Rede social de contadores compartilháveis — desde estatísticas esportivas virais até marcos pessoais. Plataforma full-stack com Next.js 16, FastAPI, WebSockets, e verificação por IA.
iContei é uma plataforma social dedicada a contadores de todos os tipos — desde estatísticas esportivas virais ("Há 2.847 dias o Flamengo está sem título mundial") até marcos pessoais ("Faltam 127 dias para meu casamento") e corporativos ("1.567 dias sem acidentes de trabalho").
O foco é criar contadores visualmente impactantes para compartilhamento em redes sociais (TikTok, Instagram, Twitter) com previews dinâmicos, atualização em tempo real, e comunidade engajada.
┌─────────────────────────────────────────────────────────────────────────┐
│ FRONTEND (Next.js 16) │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ ┌────────────┐ │
│ │ Home Page │ │Counter Detail│ │ Trending │ │ Profile │ │
│ │ (SSR+ISR) │ │ (SSR) │ │ Rankings │ │ Admin │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ └────────────┘ │
│ │ │
│ ┌───────────────────────────────────────────────────────────────────┐ │
│ │ React 19 + Server Components + Framer Motion + Remotion (video) │ │
│ └───────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────┬───────────────────────────┘
│ HTTP + WebSocket
┌─────────────────────────────────────────────▼───────────────────────────┐
│ BACKEND (FastAPI) │
│ ┌───────────────────────────────────────────────────────────────────┐ │
│ │ API Layer (Routes) │ │
│ │ /counters /rankings /auth /admin /realtime /automation │ │
│ └────────────────────────────┬──────────────────────────────────────┘ │
│ │ │
│ ┌────────────────────────────▼──────────────────────────────────────┐ │
│ │ Service Layer (Domain-Driven Design) │ │
│ │ CounterService RankingService AutomationService AIVerify │ │
│ └────────────────────────────┬──────────────────────────────────────┘ │
│ │ │
│ ┌────────────────────────────▼──────────────────────────────────────┐ │
│ │ Repository Layer (Data Access) │ │
│ │ CounterRepo UserRepo ThemeRepo AutomationRuleRepo │ │
│ └───────────┬──────────────────────────────────────┬────────────────┘ │
└──────────────┼──────────────────────────────────────┼────────────────────┘
│ │
┌──────────▼──────────┐ ┌─────────────▼─────────────┐
│ PostgreSQL 16 │ │ Redis 7 │
│ (dados + sessions) │ │ (cache + rankings + │
│ AsyncPG + SQLAlch │ │ WebSocket pub/sub) │
└─────────────────────┘ └───────────────────────────┘Contadores atualizam a cada segundo no frontend. Para suportar milhares de conexões simultâneas, implementei um sistema de WebSocket com Redis pub/sub, heartbeat, e rate limiting inteligente.
class WebSocketManager:
"""
Gerencia conexões WebSocket e distribuição
de mensagens Redis -> Clientes
"""
def __init__(self):
# Mapeia counter_id -> Lista de WebSockets
self.active_connections: dict[str, list[WebSocket]] = {}
self._redis_task: asyncio.Task | None = None
self.presence_timeout = 45 # segundos
async def connect(
self,
websocket: WebSocket,
counter_id: str,
user_id: str | None = None,
ip_address: str | None = None
):
# Rate Limiting por usuário
if user_id:
user_conn_key = f"user_connections:{user_id}"
current_conns = await redis_manager.scard(user_conn_key)
if current_conns >= 5: # max 5 conexões por user
await websocket.close(code=4029) # Too Many Requests
return False
# Rate Limiting por IP (anônimos)
elif ip_address:
ip_conn_key = f"anon_connections:{ip_address}"
current_ip_conns = await redis_manager.scard(ip_conn_key)
if current_ip_conns >= 20:
await websocket.close(code=4029)
return False
await websocket.accept()
self.active_connections[counter_id].append(websocket)
# Heartbeat no Redis (Sorted Set com timestamp)
await self.heartbeat(counter_id, connection_id)
await self._broadcast_presence(counter_id)Rate limiting dual: por usuário autenticado (5 conexões) e por IP para anônimos (20 conexões, considerando NAT/CGNAT). Heartbeat via Redis Sorted Set para detectar conexões mortas.
async def start_redis_listener(self):
"""
Escuta canal Redis e distribui para WebSockets locais
Permite escalar horizontalmente com múltiplas instâncias
"""
pubsub = redis_manager.client.pubsub()
await pubsub.subscribe("counter_updates")
async for message in pubsub.listen():
if message["type"] == "message":
data = json.loads(message["data"])
counter_id = data["counter_id"]
# Broadcast apenas para conexões desta instância
if counter_id in self.active_connections:
await self._broadcast_to_counter(
counter_id,
data["payload"]
)
async def publish_update(self, counter_id: str, payload: dict):
"""
Publica atualização no Redis
Todas as instâncias do backend receberão
"""
await redis_manager.publish(
"counter_updates",
json.dumps({
"counter_id": counter_id,
"payload": payload
})
)Com Redis pub/sub, posso escalar horizontalmente o backend — cada instância mantém suas conexões WebSocket e recebe atualizações via Redis.
Cliente envia heartbeat a cada 30s. Servidor usa ZRANGEBYSCORE para listar conexões ativas (timestamp > now - 45s). Conexões stale são removidas automaticamente.
Contadores oficiais passam por verificação automatizada com IA para garantir que datas e informações estão corretas. Sistema multi-provider com fallback.
class AIVerificationService:
"""
Verifica contadores usando OpenAI ou Groq
Retorna saída padronizada para o ReviewWorker
"""
def __init__(
self,
provider: str, # "openai" ou "groq"
model: str,
openai_api_key: str | None,
groq_api_key: str | None,
):
self.provider = provider
# Endpoints compatíveis com OpenAI Chat Completions
self._openai_url = "https://api.openai.com/v1/chat/completions"
self._groq_url = "https://api.groq.com/openai/v1/chat/completions"
async def verify_counter(self, counter: Counter) -> dict | None:
"""
Retorna:
{
"approved": bool,
"confidence": float (0-100),
"reasoning": str,
"model": str
}
"""
# Seleção de provider com fallback
if self.provider == "openai" and self.openai_api_key:
chosen = "openai"
elif self.groq_api_key:
chosen = "groq" # fallback
else:
return None # Manual review
system_msg = (
"Você é um verificador que decide se um contador "
"é válido. Responda APENAS JSON..."
)
user_msg = self._build_user_prompt(counter)
# ... chamada à API e parse do resultadoclass GroqClient:
"""
Pipeline de 2 estágios para extração de dados:
1. Brave Search - Busca web
2. Groq AI - Extrai dados estruturados em JSON
"""
async def search_and_extract(
self,
search_query: str,
prompt_template: str,
search_count: int = 5,
groq_model: str = "llama-3.3-70b-versatile",
) -> dict | None:
# Etapa 1: Busca na web
search_results = await self._brave_search(
search_query,
search_count
)
# Etapa 2: Processa com IA
extracted_data = await self._groq_process(
search_results=search_results,
prompt_template=prompt_template,
model=groq_model,
)
return extracted_data
async def _retry_with_backoff(self, operation, func, *args):
"""
Exponential backoff + jitter para resiliência
Retry on: 5xx, 429 (rate limit), 408 (timeout)
"""
for attempt in range(self.max_retries + 1):
try:
return await func(*args)
except (TimeoutError, ClientError):
delay = self.retry_delay * (2 ** attempt) + random.uniform(0, 1)
await asyncio.sleep(delay)Para contadores que precisam de dados externos (ex: "última vitória do time X"), uso Brave Search para buscar informações recentes e Groq para extrair dados estruturados.
Rankings em tempo real usando Redis Sorted Sets com algoritmo de scoring 60/30/10 que balanceia likes, comentários e visualizações.
async def calculate_counter_score(self, counter: Counter) -> float:
"""
Fórmula: score = (likes × 0.6) + (comments × 0.3) + (views × 0.1)
- 60% peso para likes (engajamento forte)
- 30% peso para comentários (interação ativa)
- 10% peso para visualizações (alcance)
"""
stats = await self.counter_repo.db.fetchrow("""
SELECT
COALESCE(likes_count, 0) as likes,
COALESCE(comments_count, 0) as comments,
COALESCE(total_views, 0) as views
FROM counter_stats_current
WHERE counter_id = $1
""", counter.id)
# Normaliza views (geralmente muito maiores)
normalized_views = stats["views"] / 10.0
base_score = (
stats["likes"] * 0.6 +
stats["comments"] * 0.3 +
normalized_views * 0.1
)
# Multipliers opcionais
featured_multiplier = 1.5 if counter.is_featured else 1.0
official_multiplier = 1.2 if counter.source == "official" else 1.0
return round(base_score * featured_multiplier * official_multiplier, 2)# Estrutura de chaves Redis
ranking:24h # Trending últimas 24 horas
ranking:7d # Trending últimos 7 dias
ranking:30d # Trending últimos 30 dias
ranking:all # All-time trending
async def update_rankings(self, period: str = "24h") -> dict:
"""Atualiza rankings para um período específico"""
now = datetime.now(UTC)
cutoff_date = {
"24h": now - timedelta(hours=24),
"7d": now - timedelta(days=7),
"30d": now - timedelta(days=30),
"all": None
}[period]
# Busca contadores elegíveis
counters = await self.counter_repo.list(
visibility=CounterVisibility.PUBLIC,
created_after=cutoff_date
)
# Atualiza Redis Sorted Set
for counter in counters:
score = await self.calculate_counter_score(counter)
await self.redis.zadd(
f"ranking:{period}",
{str(counter.id): score}
)
# Mantém apenas top 1000
await self.redis.zremrangebyrank(f"ranking:{period}", 0, -1001)ZADD para inserir, ZREVRANGE para buscar top N, ZREMRANGEBYRANK para manter tamanho controlado. O(log N) para todas as operações.
Contadores podem ser atualizados automaticamente via regras configuráveis — desde agendamentos cron até triggers baseados em APIs externas.
Cron expressions para atualização periódica. Usa croniter para calcular próxima execução.
Integração com APIs externas (API-Football, etc.) para buscar dados automaticamente.
Usuários podem sugerir atualizações que passam por aprovação antes de serem aplicadas.
class SportsAPIProvider(BaseDataProvider):
"""
Provider para API-Football
Endpoints: fixtures, standings, teams/statistics, players
source_config exemplo:
{
"endpoint": "fixtures",
"query_params": {"team": 128, "season": 2024, "last": 1},
"extract_path": "response[0].goals.home"
}
"""
async def fetch_data(self, source_config: dict) -> ProviderResponse:
endpoint = source_config["endpoint"]
query_params = source_config.get("query_params", {})
extract_path = source_config["extract_path"]
# Construir URL e headers
url = f"{self.base_url}/{endpoint}"
headers = {"x-apisports-key": self.api_key}
async with httpx.AsyncClient(timeout=self.timeout_seconds) as client:
response = await client.get(url, headers=headers, params=query_params)
data = response.json()
# Extrai valor usando JSONPath
value = self._extract_value(data, extract_path)
return self._create_success_response(
value=str(value),
confidence=95.0, # API-Football é altamente confiável
metadata={"raw_response": data, "endpoint": endpoint}
)Provider Pattern permite adicionar novas fontes de dados facilmente. Cada provider implementa fetch_data() e retorna resposta padronizada.
Stack completa de observabilidade para monitorar a plataforma em produção.
/metricsCobertura completa com pytest (backend) e vitest + Playwright (frontend).
Processamento assíncrono para tarefas pesadas, mantendo a API responsiva.
Executa regras de automação a cada 5 minutos. Calcula próximo run com croniter.
Atualiza Redis Sorted Sets com scores calculados para cada período (24h, 7d, 30d, all).
Processa filas de verificação de contadores, coordena com AI Verification Service.
Monitora contadores marcados para verificação contínua, atualiza quando dados mudam.
Este é um dos meus projetos pessoais. Explore também o Optimus (plataforma de IA conversacional) e outros case studies no meu portfolio.