I want to be upfront about something: when I started designing this notification system, I had a rough idea but several fundamental misconceptions. What follows is the actual process of working through those — the initial proposal, where it was wrong, why it was wrong, and how the architecture evolved through consequence-driven questioning.
This isn't a polished "here is the answer" post. It's a design debate, written in the order the understanding arrived.
The Initial Proposal
My starting point was something like this:
"There's going to be a service which will interact with Redis as a cache to pull the notification. When an event occurs, we store it in Redis. Instead of long polling, we keep an HTTP/2 streaming connection open. 100 clients can connect to one TCP connection over the internet — that's why I chose HTTP/2 with SSE over WebSockets."
I had three reasons for avoiding the obvious alternatives:
Against polling: With 50,000 users and a 3-second poll interval, you get 20 requests per user per minute. That's 1,000,000 requests per minute against the database. Even with connection pooling (typically 100–500 connections), you'd be queueing requests aggressively. The pool doesn't eliminate the problem — it serialises it, adding latency and backpressure.
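The arithmetic behind that number is worth making explicit. A minimal sketch, using only the figures from the paragraph above (the function name is mine, purely for illustration):

```python
# Back-of-the-envelope polling load, using the numbers from the paragraph above.
USERS = 50_000
POLL_INTERVAL_S = 3


def polling_load(users: int, interval_s: float) -> tuple:
    """Return (requests per minute, requests per second) for fixed-interval polling."""
    per_user_per_min = 60 / interval_s          # 3s interval -> 20 polls per minute
    per_min = users * per_user_per_min
    return per_min, per_min / 60


per_min, per_sec = polling_load(USERS, POLL_INTERVAL_S)
print(f"{per_min:,.0f} req/min, about {per_sec:,.0f} req/s")
```

Seen per second rather than per minute, that is roughly 16,667 queries every second, most of which return nothing new.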
Against WebSockets: WebSocket is a stateful connection. Behind a load balancer, you need sticky sessions or a session registry. Solvable, but operationally complex.
For SSE + Redis: Store events in Redis, stream via HTTP/2 SSE. Clean, simple, push-based.
That reasoning wasn't entirely wrong. But there were three significant mistakes buried in it.
Mistake 1: What "100 Clients on One TCP Connection" Actually Means
I had read that HTTP/2 allows 100 clients to share one TCP connection. This is wrong — or more precisely, it's a misunderstanding of what HTTP/2 multiplexes.
HTTP/2 multiplexes multiple streams inside one TCP connection from a single client:

    Client                        Server
      │                             │
      │══ TCP Connection ═══════════│
      │  ┌─ Stream 1 (GET /api) ────┤
      │  ├─ Stream 2 (GET /img) ────┤  ← all from the SAME client
      │  └─ Stream 3 (SSE /notif) ──┤    inside ONE TCP connection
      │                             │

Different clients do not share a TCP connection. 50,000 users = 50,000 TCP connections on your SSE server. HTTP/2 doesn't merge different clients.
What HTTP/2 actually helps with for SSE: In HTTP/1.1, browsers had a 6-connection-per-domain cap — one SSE connection would permanently eat one of those 6 slots. In HTTP/2, SSE is just one stream inside the single multiplexed connection, so it doesn't block your other API calls.
Impact on the architecture: The server still holds 50,000 open connections for 50,000 users. The per-client overhead is lower than WebSockets, but you're not magically reducing server-side connection count.
Mistake 2: Redis as the Source of Truth
Storing notification events only in Redis means:
- Data loss if Redis evicts or restarts (even with AOF persistence, replay isn't instant)
- No audit trail — notifications become ephemeral
- Missed notifications — if a user is offline when the event fires, the message is gone
The correct mental model: Redis is your delivery cache, not your source of truth. Write events to a durable store first (primary DB or a message queue), then push to Redis for live delivery.
Mistake 3: The Hot Path Problem Nobody Mentioned
When an event fires → writes to Redis → SSE streams it: who is polling Redis on the server side to detect new events?
If you're doing GET key polling on Redis from your SSE server, you've just moved the polling problem one layer deeper. You need a pub/sub mechanism:
- Your SSE server subscribes to a Redis channel per user
- Event producer publishes → Redis notifies subscribed SSE worker → streams to client
This is the piece that changes everything.
The Revised Architecture
After correcting those three mistakes, the architecture looks like this:

    Event Source (any microservice)
            │
            ▼
    Message Broker (RabbitMQ)
            │
            ▼
    Notification Worker (Celery)
      ├── Persists to DB (audit log, offline inbox)
      └── Publishes to Redis Stream (key: notif:user:{id})
            │
            ▼
    SSE Server (FastAPI + async)
      └── Each client connection reads from its own Redis Stream
            └── Streams events over HTTP/2 SSE to browser

Why Redis Over the Database for SSE Delivery
This is the most important conceptual shift. Here's what happens at the code level if you use the DB inside your SSE generator:

    # ❌ DB-polling inside SSE: moves the problem, doesn't solve it
    async def event_generator():
        while True:
            result = await db.execute(
                "SELECT * FROM notifications WHERE user_id=:id AND seen=false",
                {"id": current_user["user_id"]}
            )
            rows = result.fetchall()
            if rows:
                yield f"data: {rows}\n\n"
            await asyncio.sleep(1)

With 50,000 users connected:
- 50,000 SQL queries per second hit your DB
- Pool has ~200 connections → 49,800 queries queuing
- DB CPU spikes just serving "is there anything new?" — mostly returning empty results
With Redis Streams:

    # ✅ Redis Streams: pure push, zero idle queries
    async def event_generator():
        while True:
            messages = await redis_client.xread(
                {stream_key: cursor},
                block=5000  # sleeps here with zero CPU until data arrives
            )
            if messages:
                yield f"data: ...\n\n"

With 50,000 users connected: 50,000 coroutines all sleeping inside xread block=5000. Zero queries to DB. Zero CPU consumed while idle. Redis only wakes a coroutine when that specific user has a new notification.
The fundamental difference:
| Approach | Who queries? | DB load at idle | Latency |
|---|---|---|---|
| Long polling | Client, every 3s | High | 0–3s |
| SSE + DB poll | Server internally | High | 0–1s |
| SSE + Redis Streams | Nobody | Zero | Milliseconds |
Redis Pub/Sub vs Redis Streams
Both seem like they solve the problem. The critical question is: what happens when a user is offline?
Pub/Sub fails here:
- User offline → event fires → published to Redis → nobody listening → message gone forever
- User reconnects → nothing to receive
Streams handle it:
- User offline → event fires → written to Stream (persists)
- User reconnects → reads from last seen message ID → catches up on everything missed
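To make the offline case concrete, here is a toy in-memory model of the two behaviours. It is only a sketch: `ToyPubSub` and `ToyStream` are hypothetical stand-ins I made up for illustration, not Redis clients, and they ignore trimming, blocking and real entry IDs.

```python
from collections import defaultdict


class ToyPubSub:
    """Fire-and-forget: a message reaches only the listeners attached right now."""

    def __init__(self):
        self.listeners = defaultdict(list)      # channel -> list of inboxes

    def subscribe(self, channel):
        inbox = []
        self.listeners[channel].append(inbox)
        return inbox

    def publish(self, channel, message):
        for inbox in self.listeners[channel]:
            inbox.append(message)
        return len(self.listeners[channel])     # number of receivers


class ToyStream:
    """Append-only log: entries persist, each reader keeps its own cursor."""

    def __init__(self):
        self.entries = []                       # list of (entry_id, message)

    def add(self, message):
        entry_id = len(self.entries) + 1        # toy monotonically increasing ID
        self.entries.append((entry_id, message))
        return entry_id

    def read_after(self, last_seen_id):
        return [(i, m) for i, m in self.entries if i > last_seen_id]


# The user is offline while two events fire
pubsub, stream = ToyPubSub(), ToyStream()
pubsub.publish("notif:user:101", "rejected #1")
pubsub.publish("notif:user:101", "rejected #2")
stream.add("rejected #1")
stream.add("rejected #2")

# The user reconnects
inbox = pubsub.subscribe("notif:user:101")
print(inbox)                    # [] : nothing to receive
print(stream.read_after(0))     # [(1, 'rejected #1'), (2, 'rejected #2')]
```

The stream reader only needs to remember one thing, the last ID it saw, which is exactly what the real client passes back on reconnect.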
The pattern with Streams:

    # Writer (Celery worker)
    await redis_client.xadd(f"notif:user:{user_id}", payload)

    # Reader (SSE server): blocks until new message, replays on reconnect
    messages = await redis_client.xread(
        {f"notif:user:{user_id}": last_seen_id},
        block=5000  # sleeps zero CPU until data arrives
    )

XREAD BLOCK 5000 means the SSE server sleeps with zero CPU until a new message arrives (or 5 seconds pass): the same push behavior as Pub/Sub, but with persistence. When a user reconnects, they pass their last_seen_id and catch up on everything missed.
Decision: Use Redis Streams.
A Real Scenario: PM Rejects 20 Timesheets in Bulk
Let's trace a concrete scenario through the full architecture. A Project Manager bulk-rejects 20 employee timesheets. All 20 employees should be notified about their specific rejection.
Key design decision: One RabbitMQ message with a list of 20 IDs — not 20 separate messages. The fan-out happens inside the Celery worker, not at the API layer.
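As a rough illustration of that decision, the sketch below builds one broker message and expands it on the worker side. It uses plain dicts and no broker or Redis at all; the function names and the sample values ("Priya", "Missing hours", the ID ranges) are hypothetical.

```python
import json


def build_bulk_message(pm_name, reason, affected):
    """One broker message carrying every affected employee (what the API publishes)."""
    return json.dumps({
        "event": "TIMESHEET_BULK_REJECTED",
        "pm_name": pm_name,
        "reason": reason,
        "affected_employees": affected,
    })


def fan_out(message_body):
    """Expand the single message into one (stream_key, fields) pair per employee (worker side)."""
    data = json.loads(message_body)
    return [
        (
            f"notif:user:{emp['employee_id']}",
            {
                "type": "TIMESHEET_REJECTED",
                "timesheet_id": emp["timesheet_id"],
                "reason": data["reason"],
            },
        )
        for emp in data["affected_employees"]
    ]


# Hypothetical sample data: 20 timesheets belonging to 20 employees
affected = [{"timesheet_id": 1000 + i, "employee_id": 100 + i} for i in range(20)]
body = build_bulk_message("Priya", "Missing hours", affected)
writes = fan_out(body)
print(len(writes), writes[0][0])    # 20 notif:user:100
```

The API pays for one publish regardless of how many employees are affected; the per-employee cost lives entirely in the worker.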
Backend: User Service — Bulk Reject Endpoint

    # user_service/routers/timesheet.py
    from fastapi import APIRouter, Depends, HTTPException
    from sqlalchemy import text
    from sqlalchemy.ext.asyncio import AsyncSession
    from pydantic import BaseModel
    from typing import List
    import aio_pika, json

    # get_db and require_role are this project's own dependencies, defined elsewhere

    router = APIRouter()

    class BulkRejectRequest(BaseModel):
        timesheet_ids: List[int]
        rejection_reason: str

    @router.post("/timesheets/bulk-reject")
    async def bulk_reject_timesheets(
        payload: BulkRejectRequest,
        db: AsyncSession = Depends(get_db),
        current_user: dict = Depends(require_role("Project Manager"))
    ):
        # 1. Single DB transaction for all 20
        async with db.begin():
            result = await db.execute(
                # Raw SQL strings must be wrapped in text() for SQLAlchemy
                text("""
                    UPDATE timesheets
                    SET status = 'REJECTED',
                        rejection_reason = :reason,
                        rejected_by = :pm_id,
                        rejected_at = NOW()
                    WHERE id = ANY(:ids) AND status = 'PENDING'
                    RETURNING id, employee_id
                """),
                {
                    "reason": payload.rejection_reason,
                    "pm_id": current_user["user_id"],
                    "ids": payload.timesheet_ids
                }
            )
            updated = result.fetchall()

        if not updated:
            raise HTTPException(400, "No timesheets updated")

        # 2. ONE message to RabbitMQ, not 20
        affected = [
            {"timesheet_id": row.id, "employee_id": row.employee_id}
            for row in updated
        ]
        connection = await aio_pika.connect_robust("amqp://guest:guest@localhost/")
        async with connection:
            channel = await connection.channel()
            await channel.default_exchange.publish(
                aio_pika.Message(
                    body=json.dumps({
                        "event": "TIMESHEET_BULK_REJECTED",
                        "pm_id": current_user["user_id"],
                        "pm_name": current_user["employee_name"],
                        "reason": payload.rejection_reason,
                        "affected_employees": affected
                    }).encode()
                ),
                routing_key="notifications"
            )

        return {"rejected_count": len(updated)}

Backend: Celery Worker (Fan-out to Redis Streams)

    # notification_service/tasks.py
    from celery import Celery
    import redis.asyncio as redis
    import json, asyncio

    celery = Celery("notifications", broker="amqp://guest:guest@localhost/")

    @celery.task(acks_late=True)  # acknowledge AFTER processing, not before
    def handle_bulk_rejection(message_body: str):
        data = json.loads(message_body)
        asyncio.run(_fan_out(data))

    async def _fan_out(data: dict):
        affected = data["affected_employees"]
        # Create the async client inside the task: asyncio.run() starts a fresh event loop
        # on every call, and a module-level client would stay bound to an already-closed loop
        async with redis.Redis(host="localhost", port=6379) as redis_client:
            # Fan out to each employee's personal Redis stream
            pipe = redis_client.pipeline()
            for emp in affected:
                stream_key = f"notif:user:{emp['employee_id']}"
                pipe.xadd(
                    stream_key,
                    {
                        "type": "TIMESHEET_REJECTED",
                        "timesheet_id": emp["timesheet_id"],
                        "reason": data["reason"],
                        "rejected_by": data["pm_name"],
                        "message": f"Your timesheet was rejected by {data['pm_name']}: {data['reason']}"
                    },
                    maxlen=100  # keep last 100 notifications per user
                )
            await pipe.execute()  # all 20 writes in ONE Redis round-trip

Why pipeline matters: Without it, you make 20 separate network round-trips to Redis. With it, all 20 XADD commands are batched and sent in one shot. At 20 employees it's minor; at 2,000 bulk rejections it becomes critical.
Backend: SSE Endpoint

    # notification_service/routers/sse.py
    from fastapi import APIRouter, Depends
    from fastapi.responses import StreamingResponse
    import redis.asyncio as redis
    import json, asyncio

    # get_current_user is this project's own auth dependency, defined elsewhere

    router = APIRouter()
    redis_client = redis.Redis(host="localhost", port=6379)

    @router.get("/notifications/stream")
    async def notification_stream(
        current_user: dict = Depends(get_current_user),
        last_id: str = "0"  # client sends last seen ID on reconnect
    ):
        async def event_generator():
            stream_key = f"notif:user:{current_user['user_id']}"
            cursor = last_id
            await redis_client.sadd("online_users", current_user["user_id"])
            try:
                while True:
                    messages = await redis_client.xread(
                        {stream_key: cursor},
                        block=5000,  # sleep up to 5s, wake immediately if data arrives
                        count=10     # deliver max 10 at a time (burst control)
                    )
                    if not messages:
                        # Heartbeat: a write every 5s is what lets dead connections surface
                        yield ": heartbeat\n\n"
                        continue
                    for stream, events in messages:
                        for msg_id, fields in events:
                            cursor = msg_id  # advance cursor
                            yield f"id: {msg_id.decode()}\n"
                            yield f"data: {json.dumps({k.decode(): v.decode() for k, v in fields.items()})}\n\n"
            except asyncio.CancelledError:
                raise  # client disconnected; re-raise so cancellation propagates, cleanup runs below
            finally:
                # Always runs: keeps online count accurate
                await redis_client.srem("online_users", current_user["user_id"])

        return StreamingResponse(
            event_generator(),
            media_type="text/event-stream",
            headers={
                "Cache-Control": "no-cache",
                "X-Accel-Buffering": "no"  # critical: tells nginx not to buffer SSE
            }
        )

Frontend: React Hook + Notification Bell

    // hooks/useNotifications.js
    import { useEffect, useRef, useCallback } from "react";
    import { useNotificationStore } from "../store/notificationStore";

    export function useNotifications() {
      const eventSourceRef = useRef(null);
      const lastIdRef = useRef(localStorage.getItem("notif_last_id") || "0");
      const addNotification = useNotificationStore(s => s.addNotification);

      const connect = useCallback(() => {
        const url = `/api/notifications/stream?last_id=${lastIdRef.current}`;
        const es = new EventSource(url);

        es.onmessage = (event) => {
          const data = JSON.parse(event.data);
          // Persist last seen ID: catches up on missed notifications after reconnect
          if (event.lastEventId) {
            lastIdRef.current = event.lastEventId;
            localStorage.setItem("notif_last_id", event.lastEventId);
          }
          addNotification(data);
          if (data.type === "TIMESHEET_REJECTED") {
            showToast(`Timesheet rejected: ${data.reason}`, "error");
          }
        };

        es.onerror = () => {
          es.close();
          // Auto-reconnect: passes last_id so no notifications missed
          setTimeout(connect, 3000);
        };

        eventSourceRef.current = es;
      }, [addNotification]);

      useEffect(() => {
        connect();
        return () => eventSourceRef.current?.close();
      }, [connect]);
    }

    // store/notificationStore.js (Zustand)
    import { create } from "zustand";

    export const useNotificationStore = create((set) => ({
      notifications: [],
      unreadCount: 0,
      addNotification: (notif) => set(state => ({
        notifications: [notif, ...state.notifications].slice(0, 20),
        unreadCount: state.unreadCount + 1
      })),
      markAllRead: () => set({ unreadCount: 0 })
    }));

Why Not Do the Redis Writes Directly in the User Service?
This is a valid question. Technically it works. But here's what happens:

    PM hits API
      ├── UPDATE 20 rows in DB        ✅
      ├── Publish to RabbitMQ         ✅
      ├── XADD to 20 Redis streams    ← doing this too now
      └── return response to PM

    If Redis is slow or down:
      → PM waits
      → API timeout
      → PM sees error
      → But DB was already updated    ← inconsistency

| Concern | Inline in User Service | Via Celery Worker |
|---|---|---|
| API response time | Slower (waits for Redis) | Fast (fire and forget) |
| Redis failure | PM gets 500 error | Worker retries silently |
| Retry logic | Complex to add | Built into Celery |
| Single responsibility | User Service doing notification work | Each service owns its domain |
The golden rule: Your API should do the minimum to persist state, then return. Side effects like notifications, emails, webhooks — delegate to workers.
Connection Lifecycle: The while True and block=5000 Relationship
A question worth spending time on: does StreamingResponse always need a while True?
No. StreamingResponse just needs an async generator — it streams whatever the generator yields, then closes when the generator ends. The generator controls the lifetime.

    # Finite stream: sends 5 things and closes
    async def event_generator():
        for i in range(5):
            yield f"data: message {i}\n\n"
            await asyncio.sleep(1)
    # Connection closes after 5 messages

Notification streams need while True because a notification can arrive 5 seconds from now or 5 hours from now. The connection must stay open indefinitely.
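Whatever the generator yields has to follow the SSE wire format the endpoint above writes by hand: an optional id line, a data line, and a blank line that ends the event, with lines starting with a colon treated as comments. A small sketch of helpers for that framing; `format_sse` and `heartbeat` are hypothetical names of mine, not part of FastAPI or Starlette:

```python
import json
from typing import Optional


def format_sse(data: dict, event_id: Optional[str] = None) -> str:
    """Frame one event: optional 'id:' line, one 'data:' line, blank-line terminator."""
    lines = []
    if event_id is not None:
        lines.append(f"id: {event_id}")
    # json.dumps never emits raw newlines, so the payload fits on one data line
    lines.append(f"data: {json.dumps(data)}")
    return "\n".join(lines) + "\n\n"


def heartbeat() -> str:
    """A comment line: keeps the connection active, EventSource ignores it."""
    return ": heartbeat\n\n"


print(repr(format_sse({"type": "TIMESHEET_REJECTED"}, event_id="1700000000000-0")))
```

Keeping the id and data lines inside the same event is what makes the browser's `event.lastEventId` line up with the payload it arrived with.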
How block=5000 Actually Makes the Coroutine Sleep
There are three separate blocking mechanisms chained together — at different layers:

    Python coroutine hits: await redis_client.xread(block=5000)
            │
            ▼
    Python event loop parks the coroutine (no CPU consumed)
    Monitors TCP socket via OS (epoll/kqueue)
    Runs other coroutines freely
            │
            ▼
    Redis server receives XREAD BLOCK 5000
    Internally sets a 5000ms server-side timer
    Does NOT respond immediately, waits for:
      ├── New message arrives → responds immediately (could be T=1s)
      └── 5000ms expires → responds with empty reply
            │
            ▼
    TCP socket receives Redis response
    OS signals event loop: "data available"
    Event loop wakes the coroutine
    await returns → coroutine resumes

block=5000 tells Redis to hold the command for up to 5 seconds. await tells Python to park the coroutine until Redis replies. Neither blocks the thread: the event loop is free the entire time serving other coroutines.
This is why 50,000 coroutines work: they're all just parked, waiting for their own Redis reply. CPU usage while idle ≈ 0.
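The parking behaviour can be reproduced without Redis at all. The sketch below is an assumption-laden stand-in: an `asyncio.Event` per user plays the role of "Redis replied", `asyncio.wait_for` plays the role of the block timeout, and the user count is scaled down to 1,000 so it runs in about a second.

```python
import asyncio


async def listener(user_id, events, woken, timeouts, block_s):
    """Park until this user's event fires or the block timeout passes (like XREAD BLOCK)."""
    try:
        await asyncio.wait_for(events[user_id].wait(), timeout=block_s)
        woken.append(user_id)
    except asyncio.TimeoutError:
        timeouts.append(user_id)    # the real loop would send a heartbeat here


async def demo(n_users=1000, target=42, block_s=1.0):
    events = {uid: asyncio.Event() for uid in range(n_users)}
    woken, timeouts = [], []
    tasks = [
        asyncio.create_task(listener(uid, events, woken, timeouts, block_s))
        for uid in range(n_users)
    ]
    await asyncio.sleep(0.05)       # let every listener park
    events[target].set()            # a notification arrives for ONE user
    await asyncio.sleep(0.05)
    woken_early = list(woken)       # snapshot before any timeout has fired
    await asyncio.gather(*tasks)    # everyone else eventually times out
    return woken_early, len(timeouts)


if __name__ == "__main__":
    woken_early, timed_out = asyncio.run(demo())
    print(woken_early, timed_out)
```

Only the targeted listener wakes early; the rest stay parked on a single thread until their own timeout, which is the same shape as one coroutine per connected user waiting on its own stream.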
block=0 vs block=5000 — Ghost Connections
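It is tempting to read block=0 as "respond immediately even if empty". It is the opposite: BLOCK 0 tells Redis to block with no timeout at all. The immediate-return behaviour comes from omitting block entirely, and that is what turns while True into a CPU-burning spin loop:

    while True:
        messages = await redis_client.xread({stream_key: cursor})  # no block: returns instantly, usually empty
        # immediately loops back; 50K coroutines spinning = server melts

block=5000 transforms that CPU-burning spin into an efficient sleeping listener.
block=0 would also sleep efficiently, but it never times out, so the loop never gets a chance to send a heartbeat. That is the more important reason for block=5000: ghost connections.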
When a user loses internet without a clean TCP FIN (phone loses signal, cable pulled), FastAPI has no idea the client is gone. The coroutine stays alive.
With block=5000:
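
    T=0s   User loses internet (no TCP FIN sent)
    T=0s   Coroutine sleeping in xread...
    T=5s   xread returns empty (5s elapsed)
    T=5s   Coroutine tries: yield ": heartbeat\n\n"
           Write fails once the OS knows the peer is gone
           BrokenPipeError / ConnectionResetError raised
    T=5s+  Coroutine dies → memory freed

Best-case ghost connection lifetime is one heartbeat interval (5 seconds). The first write to a silently dead peer can still be accepted into the kernel's send buffer, so the error may only surface on a later heartbeat; treat 5 seconds as the floor, not a guarantee. Without the heartbeat, ghost connections for inactive users accumulate indefinitely: eviction only happens when the next notification arrives for that user, which could be hours.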
Second-by-Second Walkthrough
Normal flow (user online):

    T=0s   User opens browser → SSE connects → coroutine starts
    T=0s   xread starts blocking (5s timer starts in Redis)
    T=3s   PM rejects timesheet → Celery writes to Redis stream
    T=3s   xread immediately wakes up (timer irrelevant, data arrived)
    T=3s   notification yielded → browser receives it
    T=3s   xread blocks again (new 5s timer starts)
    T=8s   nothing arrived → xread returns empty after 5s
    T=8s   heartbeat yields ": heartbeat\n\n" → browser ignores it
    T=8s   xread blocks again (new 5s timer starts)

Dirty disconnect (internet drops, no TCP FIN):

    T=0s   Coroutine blocking in xread
    T=2s   User loses internet, no TCP FIN sent, FastAPI has NO idea
    T=5s   xread returns empty (5s elapsed)
    T=5s   Coroutine tries to yield heartbeat to dead socket
           OS reports the failure (on this write or a later heartbeat)
    T=5s+  Coroutine dies → memory freed → online count decremented

Each User Gets Their Own Coroutine and Stream
A question that comes up: since the SSE endpoint sends a StreamingResponse, doesn't the backend send all notifications to all clients?
No. FastAPI spins up a separate coroutine for every single connection. They don't share state.
Employee 101 connects → Coroutine 1: xread("notif:user:101", block=5000)
Employee 102 connects → Coroutine 2: xread("notif:user:102", block=5000)
Employee 103 connects → Coroutine 3: xread("notif:user:103", block=5000)
When Celery writes to notif:user:101 — only Coroutine 1 wakes up. Everyone else stays asleep. The stream key is the isolation mechanism. Each coroutine watches only its user's private stream.
100 use cases does not mean 100 stream keys per user. All use cases write to the same stream key with a type field:

    # All event types go into the SAME stream; type differentiates them
    await redis_client.xadd(
        f"notif:user:{user_id}",  # always same key
        {
            "type": "TIMESHEET_REJECTED",  # differentiator
            "payload": json.dumps(data)
        }
    )

The frontend routes by type:

    es.onmessage = (event) => {
      const data = JSON.parse(event.data);
      switch (data.type) {
        case "TIMESHEET_REJECTED": showTimesheetAlert(data); break;
        case "TASK_ASSIGNED": updateTaskBoard(data); break;
        case "LEAVE_APPROVED": showLeaveStatus(data); break;
      }
    }

Total coroutines = Total concurrent online users. Not users × use cases.
Horizontal Scaling
SSE doesn't have the sticky session problem that WebSockets has. Here's why:

            Load Balancer
                  │
              ┌───┴───┐
          Server A   Server B

    Employee 101 connected to Server A
      └── Coroutine on Server A: xread("notif:user:101", block=5000)

    Celery writes to notif:user:101
      └── Redis wakes Server A's coroutine
      └── Server B has no coroutine for user 101, so it is irrelevant

Celery doesn't care which server the user is on — it just writes to Redis. Redis wakes whichever server is blocking on that key. No coordination between servers needed.
The one real problem — Redis connections at scale:
Server A: 25K users → 25K xread connections to Redis
Server B: 25K users → 25K xread connections to Redis
Total: 50K concurrent Redis connections
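Redis's default maxclients is 10,000, so 50K blocked readers would be refused. A client-side pool caps how many connections each server opens, but it does not shrink the requirement: every coroutine parked in XREAD BLOCK holds its connection for the whole block, so a pool smaller than the number of concurrently blocked coroutines runs out and further requests fail or queue. The realistic options are raising maxclients (and the OS file-descriptor limit), or multiplexing many users' streams onto a few connections by passing several stream keys to one XREAD and routing the replies in-process. A hard per-server cap still looks like this:

    redis_client = redis.Redis(
        host="localhost",
        port=6379,
        max_connections=1000  # hard cap for this process; each blocked XREAD holds one until it returns
    )

And horizontally scale Redis itself via Redis Cluster: streams are sharded across nodes by key hash, and cluster-aware clients route each key to the correct node automatically.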
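For a feel of how a key maps to a shard, here is a sketch of the slot calculation described in the Redis Cluster specification: CRC16 (XMODEM variant) of the key modulo 16384, hashing only the `{tag}` part when the key contains a non-empty hash tag. The function name `hash_slot` is mine; the standard library's `binascii.crc_hqx` with an initial value of 0 computes that CRC variant.

```python
import binascii


def hash_slot(key: str) -> int:
    """Cluster slot for a key: CRC16/XMODEM of the key (or its hash tag) mod 16384."""
    data = key.encode()
    start = data.find(b"{")
    if start != -1:
        end = data.find(b"}", start + 1)
        if end != -1 and end != start + 1:      # non-empty {tag}: hash only the tag
            data = data[start + 1:end]
    return binascii.crc_hqx(data, 0) % 16384


# Per-user stream keys spread across the 16384 slots
for user_id in (101, 102, 103):
    print(f"notif:user:{user_id}", hash_slot(f"notif:user:{user_id}"))

# Keys sharing a hash tag land in the same slot, e.g. a stream and its unread counter
assert hash_slot("notif:{user:101}:stream") == hash_slot("notif:{user:101}:unread")
```

Note that the braces in `notif:user:{id}` elsewhere in this post are just a placeholder for the user ID. A literal `{...}` in a real key is a hash tag, which is useful only if you deliberately want related keys on the same node.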
The Burst + Server Oscillation Problem
What if a user has 50 notifications arrive within 1 second, while also oscillating between servers due to a flaky connection?
The count parameter handles the burst:

    messages = await redis_client.xread(
        {stream_key: cursor},
        block=5000,
        count=10  # deliver max 10 at a time
    )

User was offline and 80 notifications piled up:

    xread from cursor=0  → gets 1-10, cursor advances to 10
    xread from cursor=10 → gets 11-20, cursor advances to 20
    ...
    xread from cursor=70 → gets 71-80, cursor advances to 80
    xread from cursor=80 → nothing pending, BLOCKS (sleeps)

count=10 acts as a flow control valve — burst delivery in controlled batches, no memory spike.
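The batching is easy to check with a toy model. In this sketch an in-memory list of `(id, message)` pairs stands in for the stream and `xread_sim` is a hypothetical stand-in for XREAD with COUNT; real entry IDs look like `1700000000000-0`, not small integers.

```python
def xread_sim(stream, cursor, count):
    """Return up to `count` entries with an ID greater than `cursor`."""
    return [(i, m) for i, m in stream if i > cursor][:count]


def drain(stream, cursor=0, count=10):
    """Read in batches until nothing is pending; return batch sizes and final cursor."""
    batches = []
    while True:
        batch = xread_sim(stream, cursor, count)
        if not batch:
            break                   # the real loop would block here instead of exiting
        cursor = batch[-1][0]       # advance to the last delivered ID
        batches.append(len(batch))
    return batches, cursor


stream = [(i, f"notification {i}") for i in range(1, 81)]
print(drain(stream))    # ([10, 10, 10, 10, 10, 10, 10, 10], 80)
```

Memory use per connection is bounded by the batch size, not by how long the user was away.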
The server oscillation problem:
When a user reconnects and lands on a different server, you temporarily have two coroutines watching the same stream. Both will read independently from Redis Streams (unlike a queue, xread is non-destructive). This can cause duplicate delivery during the ghost connection window.
Fix: cursor ownership via Redis:

    import json, socket, uuid

    async def event_generator(user_id, last_id):
        owner_key = f"notif:owner:{user_id}"
        my_id = f"server-{socket.gethostname()}-{uuid.uuid4()}"
        # Claim ownership: the newest connection always wins
        await redis_client.set(owner_key, my_id, ex=10)
        cursor = last_id
        try:
            while True:
                # Verify I still own this stream (the key may also have expired → None)
                current_owner = await redis_client.get(owner_key)
                if current_owner is None or current_owner.decode() != my_id:
                    return  # I am the ghost, die cleanly
                # Refresh the TTL without rewriting the value, so a ghost can never
                # overwrite a newer owner on its way out
                await redis_client.expire(owner_key, 10)
                messages = await redis_client.xread(
                    {f"notif:user:{user_id}": cursor},
                    block=5000,
                    count=10
                )
                if not messages:
                    yield ": heartbeat\n\n"
                    continue
                for stream, events in messages:
                    for msg_id, fields in events:
                        cursor = msg_id
                        yield f"id: {msg_id.decode()}\n"
                        yield f"data: {json.dumps(...)}\n\n"
        finally:
            current_owner = await redis_client.get(owner_key)
            if current_owner and current_owner.decode() == my_id:
                await redis_client.delete(owner_key)

With ownership, a ghost is evicted at its next ownership check, which runs as soon as its current xread returns: at most one block interval (5 seconds), and right away when a notification arrives. Eviction no longer depends on the OS noticing the dead socket.
When to Write to the Database
Two separate things need DB writes — at different times for different reasons:
Business event → write synchronously, before anything else:

    @router.post("/timesheets/bulk-reject")
    async def bulk_reject(payload: BulkRejectRequest):
        # 1. DB write FIRST: source of truth
        async with db.begin():
            await db.execute("UPDATE timesheets SET status='REJECTED'...")
        # 2. ONLY after DB commit → publish to RabbitMQ
        await rabbitmq.publish("timesheet.rejected", payload)
        # 3. Return immediately, don't wait for notification delivery
        return {"rejected_count": 20}

Notification record → write asynchronously, in the same Celery task as the Redis write:

    async def _fan_out(data: dict):
        affected = data["affected_employees"]
        # 1. Redis writes: live delivery (milliseconds)
        pipe = redis_client.pipeline()
        for emp in affected:
            pipe.xadd(f"notif:user:{emp['employee_id']}", {...}, maxlen=100)
        await pipe.execute()
        # 2. DB writes: audit trail, history, read/unread (simultaneous, not triggered)
        await db.execute_many(
            "INSERT INTO notification_log (user_id, type, payload, created_at, read_at) VALUES (:user_id, :type, :payload, NOW(), NULL)",
            [{"user_id": emp["employee_id"], "type": "TIMESHEET_REJECTED", ...} for emp in affected]
        )

Never Redis without DB. Always both, always together, in the same worker task.
The full write timeline:
T=0ms PM clicks bulk reject
T=1ms API → DB UPDATE timesheets (synchronous, blocks)
T=5ms DB commits → API publishes to RabbitMQ
T=6ms API returns 200 to PM
T=10ms Celery picks up RabbitMQ message
T=11ms Celery → pipeline XADD to 20 Redis Streams
T=12ms 20 SSE coroutines wake up → browser notifications fire
T=15ms Celery → batch INSERT into notification_log table
Redis delivers fast. DB stores forever. They serve different purposes.
Implementing Unread Count Efficiently
The naive approach:

    SELECT COUNT(*) FROM notification_log
    WHERE user_id = 101 AND read_at IS NULL

At 50,000 users this runs constantly. Instead, maintain the count in Redis:

    # Notification created → increment
    await redis_client.incr(f"notif:unread:{user_id}")

    # User marks all read → reset
    await redis_client.set(f"notif:unread:{user_id}", 0)

    # Bell icon loads → O(1) read
    count = await redis_client.get(f"notif:unread:{user_id}")

Mark as read endpoints:

    @router.patch("/notifications/{notif_id}/read")
    async def mark_read(notif_id: str, current_user: dict = Depends(get_current_user)):
        result = await db.execute(
            # read_at IS NULL: a repeat request for an already-read row matches nothing
            text("UPDATE notification_log SET read_at = NOW() WHERE id = :id AND user_id = :uid AND read_at IS NULL"),
            {"id": notif_id, "uid": current_user["user_id"]}
        )
        if result.rowcount:
            # Decrement only when a row actually flipped to read, so the counter cannot drift below the truth
            await redis_client.decr(f"notif:unread:{current_user['user_id']}")

    @router.patch("/notifications/read-all")
    async def mark_all_read(current_user: dict = Depends(get_current_user)):
        await db.execute(
            text("UPDATE notification_log SET read_at = NOW() WHERE user_id = :id AND read_at IS NULL"),
            {"id": current_user["user_id"]}
        )
        await redis_client.set(f"notif:unread:{current_user['user_id']}", 0)

Notification History: Cursor-Based Pagination
The naive pagination approach is wrong at scale:

    -- ❌ OFFSET pagination: scans and discards N rows every time
    SELECT * FROM notification_log
    WHERE user_id = 101
    ORDER BY created_at DESC
    LIMIT 20 OFFSET 10000  -- scans 10,020 rows, throws away 10,000

Use cursor-based pagination instead:

    -- ✅ First page
    SELECT * FROM notification_log
    WHERE user_id = 101
    ORDER BY created_at DESC
    LIMIT 20;

    -- ✅ Next page: pass last received created_at as cursor
    SELECT * FROM notification_log
    WHERE user_id = 101
      AND created_at < '2024-01-15 10:30:00'  -- last seen timestamp
    ORDER BY created_at DESC
    LIMIT 20;

The database jumps directly to that timestamp via index. Performance is identical on page 1 or page 500. One caveat: if two rows for the same user can share a created_at, use (created_at, id) as the cursor so rows with a tied timestamp are not skipped at a page boundary.
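The two queries above can be tried end to end with the standard library. This is a sketch against an in-memory SQLite database rather than the production DB: the table is cut down to the columns the queries touch, the 60 sample rows are made up, and `page` is a hypothetical helper name.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE notification_log ("
    "id INTEGER PRIMARY KEY, user_id INTEGER, created_at TEXT, payload TEXT)"
)
conn.execute(
    "CREATE INDEX idx_notif_user_time ON notification_log(user_id, created_at DESC)"
)
# Hypothetical sample data: one notification per minute for user 101
rows = [(101, f"2024-01-15 10:{m:02d}:00", f"n{m}") for m in range(60)]
conn.executemany(
    "INSERT INTO notification_log (user_id, created_at, payload) VALUES (?, ?, ?)",
    rows,
)


def page(user_id, before=None, limit=20):
    """Newest-first page; `before` is the created_at of the last row already shown."""
    sql = "SELECT created_at, payload FROM notification_log WHERE user_id = ?"
    params = [user_id]
    if before is not None:
        sql += " AND created_at < ?"
        params.append(before)
    sql += " ORDER BY created_at DESC LIMIT ?"
    params.append(limit)
    return conn.execute(sql, params).fetchall()


first = page(101)
second = page(101, before=first[-1][0])     # cursor = last row of the previous page
print(first[0][1], first[-1][1], second[0][1])    # n59 n40 n39
```

The client never sends a page number, only the cursor value from the last row it rendered, so the cost of fetching a page does not grow with how deep the user has scrolled.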
The correct index:

    -- Composite index serves both the WHERE and ORDER BY in one scan
    CREATE INDEX idx_notif_user_time
    ON notification_log(user_id, created_at DESC);

What Redis and DB Each Own
By the end of this design, the responsibilities are cleanly separated:
| Layer | Owns | Purpose |
|---|---|---|
| DB (timesheets table) | Business events | Source of truth for what happened |
| DB (notification_log) | Notification history | Audit trail, read/unread state, history page |
| Redis Stream | Live delivery | SSE push to online users, recent catchup (maxlen=100) |
| Redis key (unread count) | Bell badge number | O(1) read, updated on create/read |
| Browser memory (Zustand) | Current session display | What appeared this session, capped at 20 |
On page load:

    useEffect(() => {
      // useEffect callbacks cannot be async, so the awaits live in an inner function
      async function init() {
        // 1. Load recent from DB immediately: populates bell icon
        const res = await fetch("/notifications/recent?limit=5");
        store.setInitial(await res.json());
      }
      init();
      // 2. Simultaneously reconnect SSE: streams anything new
      connect();
    }, []);

DB handles the past. SSE handles the future. Redis Streams handles the catch-up window. Zustand handles the current session display. They are not substitutes for each other.
Architecture Summary
Any Service (Timesheet, Task, Leave, etc.)
│ publishes event with key=user_id
▼
RabbitMQ (durable queue — survives worker restarts)
│ acks_late=True — message deleted only after processing
▼
Celery Workers (fan-out layer)
├── pipeline XADD → Redis Streams (notif:user:{id}) [live delivery]
└── batch INSERT → notification_log DB [persistence]
│
▼
Redis Streams (notif:user:{id}, maxlen=100)
│ one stream per user, all event types mixed in
▼
SSE Server (one coroutine per connected user)
│ xread BLOCK 5000, count=10
│ heartbeat every 5s → ghost eviction
│ ownership key → deduplication on reconnect
▼
Browser (EventSource)
│ stores last_id in localStorage
│ auto-reconnects with last_id → no gaps
▼
Zustand Store (display layer, capped at 20)
This architecture started with a rough idea and three fundamental mistakes. The final design emerged from asking one question at each stage: what are the consequences of this decision? That's the only way I know how to do system design.
Next in this series: scaling the write path with Kafka when RabbitMQ throughput becomes the bottleneck — producers, consumers, partitions, and consumer groups.