Voltar ao Portfolio
Full-Stack PlatformSocial NetworkReal-time

iContei.

Rede social de contadores compartilháveis — desde estatísticas esportivas virais até marcos pessoais. Plataforma full-stack com Next.js 16, FastAPI, WebSockets, e verificação por IA.

Next.js 16React 19FastAPIPostgreSQLRedisWebSocketOpenAI/GroqPrometheus

Visão Geral

O Produto

iContei é uma plataforma social dedicada a contadores de todos os tipos — desde estatísticas esportivas virais ("Há 2.847 dias o Flamengo está sem título mundial") até marcos pessoais ("Faltam 127 dias para meu casamento") e corporativos ("1.567 dias sem acidentes de trabalho").

O foco é criar contadores visualmente impactantes para compartilhamento em redes sociais (TikTok, Instagram, Twitter) com previews dinâmicos, atualização em tempo real, e comunidade engajada.

Casos de Uso

📊 Públicos (Virais)

  • • "Há 2.847 dias o Flamengo está sem título mundial"
  • • "Faltam 180 dias para a Copa do Mundo 2026"
  • • "Real Madrid há 45 dias sem perder no Bernabéu"

💑 Pessoais

  • • "Faltam 127 dias para meu casamento"
  • • "Há 1.234 dias que nos conhecemos"

🏢 Corporativos

  • • "1.567 dias sem acidentes de trabalho"
  • • "890 dias como líder de mercado"

Arquitetura Full-Stack

┌─────────────────────────────────────────────────────────────────────────┐
│                        FRONTEND (Next.js 16)                            │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐  ┌────────────┐  │
│  │  Home Page   │  │Counter Detail│  │   Trending   │  │  Profile   │  │
│  │  (SSR+ISR)   │  │   (SSR)      │  │   Rankings   │  │   Admin    │  │
│  └──────────────┘  └──────────────┘  └──────────────┘  └────────────┘  │
│                            │                                            │
│  ┌───────────────────────────────────────────────────────────────────┐  │
│  │  React 19 + Server Components + Framer Motion + Remotion (video)  │  │
│  └───────────────────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────┬───────────────────────────┘
                                              │ HTTP + WebSocket
┌─────────────────────────────────────────────▼───────────────────────────┐
│                         BACKEND (FastAPI)                               │
│  ┌───────────────────────────────────────────────────────────────────┐  │
│  │                      API Layer (Routes)                           │  │
│  │  /counters  /rankings  /auth  /admin  /realtime  /automation     │  │
│  └────────────────────────────┬──────────────────────────────────────┘  │
│                               │                                         │
│  ┌────────────────────────────▼──────────────────────────────────────┐  │
│  │              Service Layer (Domain-Driven Design)                 │  │
│  │  CounterService  RankingService  AutomationService  AIVerify     │  │
│  └────────────────────────────┬──────────────────────────────────────┘  │
│                               │                                         │
│  ┌────────────────────────────▼──────────────────────────────────────┐  │
│  │              Repository Layer (Data Access)                       │  │
│  │  CounterRepo  UserRepo  ThemeRepo  AutomationRuleRepo            │  │
│  └───────────┬──────────────────────────────────────┬────────────────┘  │
└──────────────┼──────────────────────────────────────┼────────────────────┘
               │                                      │
    ┌──────────▼──────────┐            ┌─────────────▼─────────────┐
    │   PostgreSQL 16     │            │        Redis 7            │
    │  (dados + sessions) │            │  (cache + rankings +      │
    │  AsyncPG + SQLAlch  │            │   WebSocket pub/sub)      │
    └─────────────────────┘            └───────────────────────────┘

Frontend Stack

  • Next.js 16 — App Router, Server Components, ISR
  • React 19 — Latest features, Turbopack
  • Framer Motion — Fluid animations
  • Remotion — Programmatic video generation
  • Sentry — Error tracking & performance
  • PWA — Manifest + Service Worker

Backend Stack

  • FastAPI — Async Python, auto-docs
  • Domain-Driven Design — Well-separated layers
  • AsyncPG + SQLAlchemy — Async PostgreSQL
  • Redis — Cache, rankings (sorted sets), pub/sub
  • OpenTelemetry + Prometheus — Observability
  • MinIO — Object storage (images)

Sistema Real-time

Contadores atualizam a cada segundo no frontend. Para suportar milhares de conexões simultâneas, implementei um sistema de WebSocket com Redis pub/sub, heartbeat, e rate limiting inteligente.

WebSocket Manager

class WebSocketManager:
    """
    Gerencia conexões WebSocket e distribuição 
    de mensagens Redis -> Clientes
    """
    def __init__(self):
        # Mapeia counter_id -> Lista de WebSockets
        self.active_connections: dict[str, list[WebSocket]] = {}
        self._redis_task: asyncio.Task | None = None
        self.presence_timeout = 45  # segundos

    async def connect(
        self, 
        websocket: WebSocket, 
        counter_id: str,
        user_id: str | None = None,
        ip_address: str | None = None
    ):
        # Rate Limiting por usuário
        if user_id:
            user_conn_key = f"user_connections:{user_id}"
            current_conns = await redis_manager.scard(user_conn_key)
            if current_conns >= 5:  # max 5 conexões por user
                await websocket.close(code=4029)  # Too Many Requests
                return False
        
        # Rate Limiting por IP (anônimos)
        elif ip_address:
            ip_conn_key = f"anon_connections:{ip_address}"
            current_ip_conns = await redis_manager.scard(ip_conn_key)
            if current_ip_conns >= 20:
                await websocket.close(code=4029)
                return False
        
        await websocket.accept()
        self.active_connections[counter_id].append(websocket)
        
        # Heartbeat no Redis (Sorted Set com timestamp)
        await self.heartbeat(counter_id, connection_id)
        await self._broadcast_presence(counter_id)

Rate limiting dual: por usuário autenticado (5 conexões) e por IP para anônimos (20 conexões, considerando NAT/CGNAT). Heartbeat via Redis Sorted Set para detectar conexões mortas.

Redis Pub/Sub Distribution

async def start_redis_listener(self):
    """
    Escuta canal Redis e distribui para WebSockets locais
    Permite escalar horizontalmente com múltiplas instâncias
    """
    pubsub = redis_manager.client.pubsub()
    await pubsub.subscribe("counter_updates")
    
    async for message in pubsub.listen():
        if message["type"] == "message":
            data = json.loads(message["data"])
            counter_id = data["counter_id"]
            
            # Broadcast apenas para conexões desta instância
            if counter_id in self.active_connections:
                await self._broadcast_to_counter(
                    counter_id, 
                    data["payload"]
                )

async def publish_update(self, counter_id: str, payload: dict):
    """
    Publica atualização no Redis
    Todas as instâncias do backend receberão
    """
    await redis_manager.publish(
        "counter_updates",
        json.dumps({
            "counter_id": counter_id,
            "payload": payload
        })
    )

Com Redis pub/sub, posso escalar horizontalmente o backend — cada instância mantém suas conexões WebSocket e recebe atualizações via Redis.

Sistema de Presença

30s
Heartbeat interval
45s
Presence timeout
ZADD
Redis Sorted Set

Cliente envia heartbeat a cada 30s. Servidor usa ZRANGEBYSCORE para listar conexões ativas (timestamp > now - 45s). Conexões stale são removidas automaticamente.

Verificação por IA

Contadores oficiais passam por verificação automatizada com IA para garantir que datas e informações estão corretas. Sistema multi-provider com fallback.

AI Verification Service

class AIVerificationService:
    """
    Verifica contadores usando OpenAI ou Groq
    Retorna saída padronizada para o ReviewWorker
    """
    def __init__(
        self,
        provider: str,  # "openai" ou "groq"
        model: str,
        openai_api_key: str | None,
        groq_api_key: str | None,
    ):
        self.provider = provider
        # Endpoints compatíveis com OpenAI Chat Completions
        self._openai_url = "https://api.openai.com/v1/chat/completions"
        self._groq_url = "https://api.groq.com/openai/v1/chat/completions"

    async def verify_counter(self, counter: Counter) -> dict | None:
        """
        Retorna:
        {
          "approved": bool,
          "confidence": float (0-100),
          "reasoning": str,
          "model": str
        }
        """
        # Seleção de provider com fallback
        if self.provider == "openai" and self.openai_api_key:
            chosen = "openai"
        elif self.groq_api_key:
            chosen = "groq"  # fallback
        else:
            return None  # Manual review

        system_msg = (
            "Você é um verificador que decide se um contador "
            "é válido. Responda APENAS JSON..."
        )
        
        user_msg = self._build_user_prompt(counter)
        # ... chamada à API e parse do resultado

Pipeline Brave Search + Groq

class GroqClient:
    """
    Pipeline de 2 estágios para extração de dados:
    1. Brave Search - Busca web
    2. Groq AI - Extrai dados estruturados em JSON
    """
    
    async def search_and_extract(
        self,
        search_query: str,
        prompt_template: str,
        search_count: int = 5,
        groq_model: str = "llama-3.3-70b-versatile",
    ) -> dict | None:
        # Etapa 1: Busca na web
        search_results = await self._brave_search(
            search_query, 
            search_count
        )
        
        # Etapa 2: Processa com IA
        extracted_data = await self._groq_process(
            search_results=search_results,
            prompt_template=prompt_template,
            model=groq_model,
        )
        
        return extracted_data

    async def _retry_with_backoff(self, operation, func, *args):
        """
        Exponential backoff + jitter para resiliência
        Retry on: 5xx, 429 (rate limit), 408 (timeout)
        """
        for attempt in range(self.max_retries + 1):
            try:
                return await func(*args)
            except (TimeoutError, ClientError):
                delay = self.retry_delay * (2 ** attempt) + random.uniform(0, 1)
                await asyncio.sleep(delay)

Para contadores que precisam de dados externos (ex: "última vitória do time X"), uso Brave Search para buscar informações recentes e Groq para extrair dados estruturados.

Sistema de Rankings

Rankings em tempo real usando Redis Sorted Sets com algoritmo de scoring 60/30/10 que balanceia likes, comentários e visualizações.

Algoritmo de Scoring

async def calculate_counter_score(self, counter: Counter) -> float:
    """
    Fórmula: score = (likes × 0.6) + (comments × 0.3) + (views × 0.1)
    
    - 60% peso para likes (engajamento forte)
    - 30% peso para comentários (interação ativa)
    - 10% peso para visualizações (alcance)
    """
    stats = await self.counter_repo.db.fetchrow("""
        SELECT
            COALESCE(likes_count, 0) as likes,
            COALESCE(comments_count, 0) as comments,
            COALESCE(total_views, 0) as views
        FROM counter_stats_current
        WHERE counter_id = $1
    """, counter.id)
    
    # Normaliza views (geralmente muito maiores)
    normalized_views = stats["views"] / 10.0
    
    base_score = (
        stats["likes"] * 0.6 + 
        stats["comments"] * 0.3 + 
        normalized_views * 0.1
    )
    
    # Multipliers opcionais
    featured_multiplier = 1.5 if counter.is_featured else 1.0
    official_multiplier = 1.2 if counter.source == "official" else 1.0
    
    return round(base_score * featured_multiplier * official_multiplier, 2)

Redis Sorted Sets

# Estrutura de chaves Redis
ranking:24h   # Trending últimas 24 horas
ranking:7d    # Trending últimos 7 dias
ranking:30d   # Trending últimos 30 dias
ranking:all   # All-time trending

async def update_rankings(self, period: str = "24h") -> dict:
    """Atualiza rankings para um período específico"""
    now = datetime.now(UTC)
    
    cutoff_date = {
        "24h": now - timedelta(hours=24),
        "7d": now - timedelta(days=7),
        "30d": now - timedelta(days=30),
        "all": None
    }[period]
    
    # Busca contadores elegíveis
    counters = await self.counter_repo.list(
        visibility=CounterVisibility.PUBLIC,
        created_after=cutoff_date
    )
    
    # Atualiza Redis Sorted Set
    for counter in counters:
        score = await self.calculate_counter_score(counter)
        await self.redis.zadd(
            f"ranking:{period}",
            {str(counter.id): score}
        )
    
    # Mantém apenas top 1000
    await self.redis.zremrangebyrank(f"ranking:{period}", 0, -1001)

ZADD para inserir, ZREVRANGE para buscar top N, ZREMRANGEBYRANK para manter tamanho controlado. O(log N) para todas as operações.

Sistema de Automação

Contadores podem ser atualizados automaticamente via regras configuráveis — desde agendamentos cron até triggers baseados em APIs externas.

Scheduled

Cron expressions para atualização periódica. Usa croniter para calcular próxima execução.

🔌

API Trigger

Integração com APIs externas (API-Football, etc.) para buscar dados automaticamente.

👥

Crowdsourced

Usuários podem sugerir atualizações que passam por aprovação antes de serem aplicadas.

Sports API Provider

class SportsAPIProvider(BaseDataProvider):
    """
    Provider para API-Football
    
    Endpoints: fixtures, standings, teams/statistics, players
    
    source_config exemplo:
    {
        "endpoint": "fixtures",
        "query_params": {"team": 128, "season": 2024, "last": 1},
        "extract_path": "response[0].goals.home"
    }
    """
    
    async def fetch_data(self, source_config: dict) -> ProviderResponse:
        endpoint = source_config["endpoint"]
        query_params = source_config.get("query_params", {})
        extract_path = source_config["extract_path"]
        
        # Construir URL e headers
        url = f"{self.base_url}/{endpoint}"
        headers = {"x-apisports-key": self.api_key}
        
        async with httpx.AsyncClient(timeout=self.timeout_seconds) as client:
            response = await client.get(url, headers=headers, params=query_params)
            data = response.json()
        
        # Extrai valor usando JSONPath
        value = self._extract_value(data, extract_path)
        
        return self._create_success_response(
            value=str(value),
            confidence=95.0,  # API-Football é altamente confiável
            metadata={"raw_response": data, "endpoint": endpoint}
        )

Provider Pattern permite adicionar novas fontes de dados facilmente. Cada provider implementa fetch_data() e retorna resposta padronizada.

Observabilidade

Stack completa de observabilidade para monitorar a plataforma em produção.

Prometheus + Metrics

  • • HTTP request duration histograms
  • • Request count by status code
  • • Active WebSocket connections
  • • Redis operations latency
  • • Database query timing
  • • Endpoint: /metrics

OpenTelemetry

  • • Distributed tracing
  • • Auto-instrumentation FastAPI
  • • Redis instrumentation
  • • httpx instrumentation
  • • SQLAlchemy instrumentation
  • • Configurable sample rate

Structured Logging

  • • structlog for structured logs
  • • JSON format in production
  • • Request/response middleware
  • • Correlation IDs
  • • Error context enrichment
  • • Sentry integration

Testes Automatizados

Cobertura completa com pytest (backend) e vitest + Playwright (frontend).

Backend (pytest)

267
testes
passando
  • • Critical endpoints (health, counters, rankings)
  • • Pagination and filter validation
  • • Error handling (404, 400, 403)
  • • Rate limiting (RATE_LIMIT_ENABLED=false for tests)
  • • Admin routes and auth flows

Frontend (vitest + Playwright)

21
unit tests
E2E
Playwright
  • • Complete API client (getCounters, getTrending, etc)
  • • URL construction with parameters
  • • Error handling (APIError)
  • • Global fetch mock
  • • E2E with Playwright (headless and headed)

Background Workers

Processamento assíncrono para tarefas pesadas, mantendo a API responsiva.

Automation Scheduler

Executa regras de automação a cada 5 minutos. Calcula próximo run com croniter.

Ranking Worker

Atualiza Redis Sorted Sets com scores calculados para cada período (24h, 7d, 30d, all).

Review Worker

Processa filas de verificação de contadores, coordena com AI Verification Service.

AI Monitoring

Monitora contadores marcados para verificação contínua, atualiza quando dados mudam.

Roadmap

Fase 1 - MVP ✅
  • ✓ CRUD de contadores
  • ✓ Sistema de temas
  • ✓ Rankings (Trending)
  • ✓ OG images dinâmicas
  • ✓ Autenticação
  • ✓ Seguir + reações
Fase 2 - Social
  • • Feed personalizado
  • • Comentários e discussões
  • • Notificações push
  • • Badges e gamificação
  • • Perfis públicos robustos
Fase 3 - Empresas
  • • Perfis corporativos
  • • Multi-usuário por org
  • • Dashboards analytics
  • • SSO corporativo
  • • Temas licenciados
Fase 4 - Creators
  • • Geração de vídeos (Remotion)
  • • SDK para redes sociais
  • • API pública
  • • Widgets embeddable
  • • Marketplace de templates

Stack Completa

⚛️
Next.js 16
React 19
🐍
FastAPI
🐘
PostgreSQL
🔴
Redis
🔌
WebSocket
🤖
OpenAI/Groq
📊
Prometheus
🔍
OpenTelemetry
🎬
Remotion
🧪
Playwright
🐳
Docker

Quer ver mais projetos?

Este é um dos meus projetos pessoais. Explore também o Optimus (plataforma de IA conversacional) e outros case studies no meu portfolio.