Designing a Notification System: From Naive Proposal to Production Architecture

A consequence-driven design walkthrough — starting from a half-baked proposal and evolving through debate, edge cases, and code into a production-grade notification system using SSE, Redis Streams, and Celery.

2025-05-01 · 21 min read
system-design · redis · fastapi · sse · distributed-systems · python

I want to be upfront about something: when I started designing this notification system, I had a rough idea but several fundamental misconceptions. What follows is the actual process of working through those — the initial proposal, where it was wrong, why it was wrong, and how the architecture evolved through consequence-driven questioning.

This isn't a polished "here is the answer" post. It's a design debate, written in the order the understanding arrived.


The Initial Proposal

My starting point was something like this:

"There's going to be a service which will interact with Redis as a cache to pull the notification. When an event occurs, we store it in Redis. Instead of long polling, we keep an HTTP/2 streaming connection open. 100 clients can connect to one TCP connection over the internet — that's why I chose HTTP/2 with SSE over WebSockets."

I had three reasons for avoiding the obvious alternatives:

Against polling: With 50,000 users and a 3-second poll interval, you get 20 requests per user per minute. That's 1,000,000 requests per minute against the database. Even with connection pooling (typically 100–500 connections), you'd be queueing requests aggressively. The pool doesn't eliminate the problem — it serialises it, adding latency and backpressure.
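As a quick sanity check on those numbers, the arithmetic can be written out directly. The variable names are only illustrative:

```python
# Back-of-the-envelope load for client polling, using the figures above.
users = 50_000
poll_interval_s = 3

polls_per_user_per_min = 60 // poll_interval_s       # 20
requests_per_min = users * polls_per_user_per_min    # 1,000,000
requests_per_sec = requests_per_min / 60             # roughly 16,667

print(f"{requests_per_min:,} requests/min = {requests_per_sec:,.0f} requests/sec")
# 1,000,000 requests/min = 16,667 requests/sec
```

Seen per second rather than per minute, that is close to 17,000 queries every second, almost all of them answering "nothing new".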

Against WebSockets: WebSocket is a stateful connection. Behind a load balancer, you need sticky sessions or a session registry. Solvable, but operationally complex.

For SSE + Redis: Store events in Redis, stream via HTTP/2 SSE. Clean, simple, push-based.

That reasoning wasn't entirely wrong. But there were three significant mistakes buried in it.


Mistake 1: What "100 Clients on One TCP Connection" Actually Means

I had read that HTTP/2 allows 100 clients to share one TCP connection. This is wrong — or more precisely, it's a misunderstanding of what HTTP/2 multiplexes.

HTTP/2 multiplexes multiple streams inside one TCP connection from a single client:

Client                    Server
  │                          │
  │══ TCP Connection ════════│
  │  ┌─ Stream 1 (GET /api) ─┤
  │  ├─ Stream 2 (GET /img) ─┤  ← all from the SAME client
  │  └─ Stream 3 (SSE /notif)┤     inside ONE TCP connection
  │                          │

Different clients do not share a TCP connection. 50,000 users = 50,000 TCP connections on your SSE server. HTTP/2 doesn't merge different clients.

What HTTP/2 actually helps with for SSE: In HTTP/1.1, browsers had a 6-connection-per-domain cap — one SSE connection would permanently eat one of those 6 slots. In HTTP/2, SSE is just one stream inside the single multiplexed connection, so it doesn't block your other API calls.

Impact on the architecture: The server still holds 50,000 open connections for 50,000 users. The per-client overhead is lower than WebSockets, but you're not magically reducing server-side connection count.


Mistake 2: Redis as the Source of Truth

Storing notification events only in Redis means:

  • Data loss if Redis evicts or restarts (even with AOF persistence, replay isn't instant)
  • No audit trail — notifications become ephemeral
  • Missed notifications — if a user is offline when the event fires, the message is gone

The correct mental model: Redis is your delivery cache, not your source of truth. Write events to a durable store first (primary DB or a message queue), then push to Redis for live delivery.


Mistake 3: The Hot Path Problem Nobody Mentioned

When an event fires → writes to Redis → SSE streams it: who is polling Redis on the server side to detect new events?

If you're doing GET key polling on Redis from your SSE server, you've just moved the polling problem one layer deeper. You need a pub/sub mechanism:

  • Your SSE server subscribes to a Redis channel per user
  • Event producer publishes → Redis notifies subscribed SSE worker → streams to client

This is the piece that changes everything.


The Revised Architecture

After correcting those three mistakes, the architecture looks like this:

Event Source (any microservice)
        │
        ▼
Message Broker (RabbitMQ)
        │
        ▼
Notification Worker (Celery)
    ├── Persists to DB (audit log, offline inbox)
    └── Publishes to Redis Stream (key: notif:user:{id})
        │
        ▼
        SSE Server (FastAPI + async)
        └── Each client connection reads from its own Redis Stream
            └── Streams events over HTTP/2 SSE to browser

Why Redis Over the Database for SSE Delivery

This is the most important conceptual shift. Here's what happens at the code level if you use the DB inside your SSE generator:

# ❌ DB-polling inside SSE — moves the problem, doesn't solve it
async def event_generator():
    while True:
        result = await db.execute(
            "SELECT * FROM notifications WHERE user_id=:id AND seen=false",
            {"id": current_user["user_id"]}
        )
        rows = result.fetchall()
        if rows:
            yield f"data: {rows}\n\n"
        await asyncio.sleep(1)

With 50,000 users connected:

  • 50,000 SQL queries per second hit your DB
  • Pool has ~200 connections → 49,800 queries queuing
  • DB CPU spikes just serving "is there anything new?" — mostly returning empty results

With Redis Streams:

# ✅ Redis Streams — pure push, zero idle queries
async def event_generator():
    while True:
        messages = await redis_client.xread(
            {stream_key: cursor},
            block=5000  # sleeps here with zero CPU until data arrives
        )
        if messages:
            yield f"data: ...\n\n"

With 50,000 users connected: 50,000 coroutines all sleeping inside xread block=5000. Zero queries to DB. Zero CPU consumed while idle. Redis only wakes a coroutine when that specific user has a new notification.

The fundamental difference:

| Approach | Who queries? | DB load at idle | Latency |
|---|---|---|---|
| Long polling | Client, every 3s | High | 0–3s |
| SSE + DB poll | Server internally | High | 0–1s |
| SSE + Redis Streams | Nobody | Zero | Milliseconds |

Redis Pub/Sub vs Redis Streams

Both seem like they solve the problem. The critical question is: what happens when a user is offline?

Pub/Sub fails here:

  • User offline → event fires → published to Redis → nobody listening → message gone forever
  • User reconnects → nothing to receive

Streams handle it:

  • User offline → event fires → written to Stream (persists)
  • User reconnects → reads from last seen message ID → catches up on everything missed

The pattern with Streams:

# Writer (Celery worker)
await redis_client.xadd(f"notif:user:{user_id}", payload)
 
# Reader (SSE server) — blocks until new message, replays on reconnect
messages = await redis_client.xread(
    {f"notif:user:{user_id}": last_seen_id},
    block=5000  # sleeps zero CPU until data arrives
)

XREAD BLOCK 5000 means the SSE server sleeps with zero CPU until a new message arrives — same push behavior as Pub/Sub, but with persistence. When a user reconnects, they pass their last_seen_id and catch up on everything missed.
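To make the offline case concrete, here is a toy in-memory model of the two delivery styles. It is only a sketch: ToyPubSub and ToyStream are hypothetical stand-ins that mimic the semantics described above, not Redis itself.

```python
from collections import defaultdict

class ToyPubSub:
    """Fire-and-forget: a message reaches only the callbacks subscribed right now."""
    def __init__(self):
        self.subscribers = defaultdict(list)   # channel -> list of callbacks

    def subscribe(self, channel, callback):
        self.subscribers[channel].append(callback)

    def publish(self, channel, message):
        for callback in self.subscribers[channel]:
            callback(message)                  # nobody subscribed: message is dropped

class ToyStream:
    """Append-only log: readers pull everything after the last ID they saw."""
    def __init__(self):
        self.entries = []                      # list of (id, message), ids increase

    def add(self, message):
        entry_id = len(self.entries) + 1
        self.entries.append((entry_id, message))
        return entry_id

    def read_after(self, last_seen_id):
        return [(i, m) for i, m in self.entries if i > last_seen_id]

# The user is offline while the event fires...
pubsub, stream = ToyPubSub(), ToyStream()
pubsub.publish("notif:user:101", "timesheet rejected")
stream.add("timesheet rejected")

# ...then reconnects and asks for anything missed.
received_via_pubsub = []
pubsub.subscribe("notif:user:101", received_via_pubsub.append)
received_via_stream = stream.read_after(0)

print(received_via_pubsub)   # []
print(received_via_stream)   # [(1, 'timesheet rejected')]
```

The pub/sub subscriber arrives too late and sees nothing, while the stream reader recovers the entry simply by presenting the last ID it knows about.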

Decision: Use Redis Streams.


A Real Scenario: PM Rejects 20 Timesheets in Bulk

Let's trace a concrete scenario through the full architecture. A Project Manager bulk-rejects 20 employee timesheets. All 20 employees should be notified about their specific rejection.

Key design decision: One RabbitMQ message with a list of 20 IDs — not 20 separate messages. The fan-out happens inside the Celery worker, not at the API layer.

Backend: User Service — Bulk Reject Endpoint

# user_service/routers/timesheet.py
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
from pydantic import BaseModel
from typing import List
import aio_pika, json

# get_db and require_role are app-specific dependencies defined elsewhere

router = APIRouter()

class BulkRejectRequest(BaseModel):
    timesheet_ids: List[int]
    rejection_reason: str

@router.post("/timesheets/bulk-reject")
async def bulk_reject_timesheets(
    payload: BulkRejectRequest,
    db: AsyncSession = Depends(get_db),
    current_user: dict = Depends(require_role("Project Manager"))
):
    # 1. Single DB transaction for all 20
    async with db.begin():
        result = await db.execute(
            # AsyncSession.execute() requires raw SQL to be wrapped in text()
            text("""
            UPDATE timesheets
            SET status = 'REJECTED',
                rejection_reason = :reason,
                rejected_by = :pm_id,
                rejected_at = NOW()
            WHERE id = ANY(:ids) AND status = 'PENDING'
            RETURNING id, employee_id
            """),
            {
                "reason": payload.rejection_reason,
                "pm_id": current_user["user_id"],
                "ids": payload.timesheet_ids
            }
        )
        updated = result.fetchall()
 
    if not updated:
        raise HTTPException(400, "No timesheets updated")
 
    # 2. ONE message to RabbitMQ — not 20
    affected = [
        {"timesheet_id": row.id, "employee_id": row.employee_id}
        for row in updated
    ]
 
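    # Note: a stock Celery worker only consumes messages in Celery's own task
    # protocol. A raw JSON message like this one needs a plain consumer on the
    # "notifications" queue, or the producer can call celery.send_task(...) instead.
    connection = await aio_pika.connect_robust("amqp://guest:guest@localhost/")
    async with connection:
        channel = await connection.channel()
        await channel.default_exchange.publish(
            aio_pika.Message(
                body=json.dumps({
                    "event": "TIMESHEET_BULK_REJECTED",
                    "pm_id": current_user["user_id"],
                    "pm_name": current_user["employee_name"],
                    "reason": payload.rejection_reason,
                    "affected_employees": affected
                }).encode(),
                # Persisted to disk when the queue itself is declared durable
                delivery_mode=aio_pika.DeliveryMode.PERSISTENT
            ),
            routing_key="notifications"
        )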
 
    return {"rejected_count": len(updated)}

Backend: Celery Worker — Fan-out to Redis Streams

# notification_service/tasks.py
from celery import Celery
import redis.asyncio as redis
import json, asyncio
 
celery = Celery("notifications", broker="amqp://guest:guest@localhost/")

@celery.task(acks_late=True)  # acknowledge AFTER processing, not before
def handle_bulk_rejection(message_body: str):
    data = json.loads(message_body)
    asyncio.run(_fan_out(data))

async def _fan_out(data: dict):
    affected = data["affected_employees"]

    # Create the client inside the coroutine: asyncio.run() starts a new event loop
    # for every task, and a module-level async client would reuse connections tied
    # to a loop that has already been closed.
    async with redis.Redis(host="localhost", port=6379) as redis_client:
        # Fan out to each employee's personal Redis stream
        pipe = redis_client.pipeline()

        for emp in affected:
            stream_key = f"notif:user:{emp['employee_id']}"
            pipe.xadd(
                stream_key,
                {
                    "type": "TIMESHEET_REJECTED",
                    "timesheet_id": emp["timesheet_id"],
                    "reason": data["reason"],
                    "rejected_by": data["pm_name"],
                    "message": f"Your timesheet was rejected by {data['pm_name']}: {data['reason']}"
                },
                maxlen=100  # keep last 100 notifications per user
            )

        await pipe.execute()  # all 20 writes in ONE Redis round-trip

Why pipeline matters: Without it, you make 20 separate network round-trips to Redis. With it, all 20 XADD commands are batched and sent in one shot. At 20 employees it's minor — at 2,000 bulk rejections it becomes critical.

Backend: SSE Endpoint

# notification_service/routers/sse.py
from fastapi import APIRouter, Depends
from fastapi.responses import StreamingResponse
import redis.asyncio as redis
import json, asyncio
 
router = APIRouter()
redis_client = redis.Redis(host="localhost", port=6379)
 
@router.get("/notifications/stream")
async def notification_stream(
    current_user: dict = Depends(get_current_user),
    last_id: str = "0"  # client sends last seen ID on reconnect
):
    async def event_generator():
        stream_key = f"notif:user:{current_user['user_id']}"
        cursor = last_id
 
        await redis_client.sadd("online_users", current_user["user_id"])
 
        try:
            while True:
                messages = await redis_client.xread(
                    {stream_key: cursor},
                    block=5000,  # sleep up to 5s, wake immediately if data arrives
                    count=10     # deliver max 10 at a time (burst control)
                )
 
                if not messages:
                    # Heartbeat: forces a write every 5s so a dead connection surfaces as a write error
                    yield ": heartbeat\n\n"
                    continue
 
                for stream, events in messages:
                    for msg_id, fields in events:
                        cursor = msg_id  # advance cursor
                        yield f"id: {msg_id.decode()}\n"
                        yield f"data: {json.dumps({k.decode(): v.decode() for k, v in fields.items()})}\n\n"
 
        except asyncio.CancelledError:
            pass  # client disconnected cleanly
        finally:
            # Always runs — keeps online count accurate
            await redis_client.srem("online_users", current_user["user_id"])
 
    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no"  # critical — tells nginx not to buffer SSE
        }
    )
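The two yield statements in the loop above are building Server-Sent Events frames by hand. A small helper makes the framing rules explicit. This is a sketch, and the names format_sse and format_heartbeat are illustrative rather than part of FastAPI or any library:

```python
import json
from typing import Optional

def format_sse(data: dict, event_id: Optional[str] = None) -> str:
    """Serialize one Server-Sent Events frame."""
    lines = []
    if event_id is not None:
        # The browser echoes this back as event.lastEventId
        lines.append(f"id: {event_id}")
    # json.dumps escapes newlines, so the payload always fits on one data: line
    lines.append(f"data: {json.dumps(data)}")
    return "\n".join(lines) + "\n\n"   # a blank line terminates the event

def format_heartbeat() -> str:
    """A line starting with ':' is a comment; EventSource ignores it."""
    return ": heartbeat\n\n"

frame = format_sse(
    {"type": "TIMESHEET_REJECTED", "timesheet_id": "42"},
    event_id="1700000000000-0",
)
print(frame, end="")
# id: 1700000000000-0
# data: {"type": "TIMESHEET_REJECTED", "timesheet_id": "42"}
```

Sending the Redis stream ID as the SSE id field is what lets the client read event.lastEventId and resume from exactly that entry.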

Frontend: React Hook + Notification Bell

// hooks/useNotifications.js
import { useEffect, useRef, useCallback } from "react";
import { useNotificationStore } from "../store/notificationStore";
 
export function useNotifications() {
  const eventSourceRef = useRef(null);
  const lastIdRef = useRef(localStorage.getItem("notif_last_id") || "0");
  const addNotification = useNotificationStore(s => s.addNotification);
 
  const connect = useCallback(() => {
    const url = `/api/notifications/stream?last_id=${lastIdRef.current}`;
    const es = new EventSource(url);
 
    es.onmessage = (event) => {
      const data = JSON.parse(event.data);
 
      // Persist last seen ID — catches up on missed notifications after reconnect
      if (event.lastEventId) {
        lastIdRef.current = event.lastEventId;
        localStorage.setItem("notif_last_id", event.lastEventId);
      }
 
      addNotification(data);
 
      if (data.type === "TIMESHEET_REJECTED") {
        showToast(`Timesheet rejected: ${data.reason}`, "error");
      }
    };
 
    es.onerror = () => {
      es.close();
      // Auto-reconnect — passes last_id so no notifications missed
      setTimeout(connect, 3000);
    };
 
    eventSourceRef.current = es;
  }, [addNotification]);
 
  useEffect(() => {
    connect();
    return () => eventSourceRef.current?.close();
  }, [connect]);
}

// store/notificationStore.js (Zustand)
import { create } from "zustand";
 
export const useNotificationStore = create((set) => ({
  notifications: [],
  unreadCount: 0,
  addNotification: (notif) => set(state => ({
    notifications: [notif, ...state.notifications].slice(0, 20),
    unreadCount: state.unreadCount + 1
  })),
  markAllRead: () => set({ unreadCount: 0 })
}));

Why Not Do the Redis Writes Directly in the User Service?

This is a valid question. Technically it works. But here's what happens:

PM hits API
    ├── UPDATE 20 rows in DB  ✅
    ├── Publish to RabbitMQ   ✅
    ├── XADD to 20 Redis streams  ← doing this too now
    └── return response to PM

If Redis is slow or down:
    → PM waits
    → API timeout
    → PM sees error
    → But DB was already updated ← inconsistency

| Concern | Inline in User Service | Via Celery Worker |
|---|---|---|
| API response time | Slower (waits for Redis) | Fast (fire and forget) |
| Redis failure | PM gets 500 error | Worker retries silently |
| Retry logic | Complex to add | Built into Celery |
| Single responsibility | User Service doing notification work | Each service owns its domain |

The golden rule: Your API should do the minimum to persist state, then return. Side effects like notifications, emails, webhooks — delegate to workers.


Connection Lifecycle: The while True and block=5000 Relationship

A question worth spending time on: does StreamingResponse always need a while True?

No. StreamingResponse just needs an async generator — it streams whatever the generator yields, then closes when the generator ends. The generator controls the lifetime.

# Finite stream — sends 5 things and closes
async def event_generator():
    for i in range(5):
        yield f"data: message {i}\n\n"
        await asyncio.sleep(1)
# Connection closes after 5 messages

Notification streams need while True because a notification can arrive 5 seconds from now or 5 hours from now. The connection must stay open indefinitely.

How block=5000 Actually Makes the Coroutine Sleep

There are three separate blocking mechanisms chained together — at different layers:

Python coroutine hits: await redis_client.xread(block=5000)
        │
        ▼
Python event loop parks the coroutine (no CPU consumed)
Monitors TCP socket via OS (epoll/kqueue)
Runs other coroutines freely
        │
        ▼
Redis server receives XREAD BLOCK 5000
Internally sets a 5000ms server-side timer
Does NOT respond immediately: waits for:
    ├── New message arrives → responds immediately (could be T=1s)
    └── 5000ms expires     → responds with empty reply
        │
        ▼
TCP socket receives Redis response
OS signals event loop: "data available"
Event loop wakes the coroutine
await returns → coroutine resumes

block=5000 tells Redis to hold the command for up to 5 seconds. await tells Python to park the coroutine until Redis replies. Neither blocks the thread — the event loop is free the entire time serving other coroutines.

This is why 50,000 coroutines work: they're all just parked, waiting for their own Redis reply. CPU usage while idle ≈ 0.
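The parking behaviour can be observed without Redis at all. In the sketch below an asyncio.Event stands in for "Redis replied on this coroutine's socket"; the listener function and the user counts are assumptions for illustration only.

```python
import asyncio

async def listener(user_id, events, woken):
    # Parks here until this user's event is set: no polling, no busy loop
    await events[user_id].wait()
    woken.append(user_id)

async def main(n_users=10_000, target=101):
    events = {uid: asyncio.Event() for uid in range(n_users)}
    woken = []
    tasks = [asyncio.create_task(listener(uid, events, woken)) for uid in events]

    await asyncio.sleep(0)        # let every listener start and park
    events[target].set()          # "a notification arrives" for one user
    await asyncio.sleep(0.01)     # give the loop a moment to resume that listener

    still_parked = sum(not t.done() for t in tasks)
    for t in tasks:               # clean up the listeners that never woke
        t.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    return woken, still_parked

woken, still_parked = asyncio.run(main())
print(woken, still_parked)   # [101] 9999
```

Ten thousand coroutines exist, but the event loop only ever runs the one whose event fired. The rest cost memory, not CPU.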

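Omitting block vs block=0 vs block=5000: Spin Loops and Ghost Connections

Calling xread without a block argument makes it non-blocking: Redis responds immediately even if the stream has nothing new. Your while True becomes a CPU-burning spin loop:

while True:
    messages = await redis_client.xread({...})  # no block: returns instantly with nothing
    # immediately loops back: 50K coroutines spinning = server melts

block=5000 transforms a CPU-burning spin into an efficient sleeping listener.

block=0 is not the spin-loop case: in Redis, BLOCK 0 means "wait forever". The coroutine sleeps efficiently but never wakes up on its own, which leads to the more important reason for a finite block=5000: ghost connections.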

When a user loses internet without a clean TCP FIN (phone loses signal, cable pulled), FastAPI has no idea the client is gone. The coroutine stays alive.

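With block=5000:

T=0s    User loses internet (no TCP FIN sent)
T=0s    Coroutine sleeping in xread...
T=5s    xread returns empty (5s elapsed)
T=5s    Coroutine yields ": heartbeat\n\n"
        The write usually succeeds locally: the bytes sit in the kernel send buffer
        while TCP retransmits to a peer that no longer answers
T=10s+  Heartbeats keep writing until TCP gives up (retransmission timeout or RST)
        The write fails → server cancels the generator
        Coroutine dies → memory freed

The heartbeat bounds ghost connection lifetime: the server attempts a write every 5 seconds, so a dead peer is detected as soon as the OS reports the connection broken. That takes at least one heartbeat cycle and often longer, depending on TCP retransmission settings. Without a periodic wake-up, ghost connections for inactive users accumulate indefinitely: eviction only happens when the next notification arrives for that user, which could be hours.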


Second-by-Second Walkthrough

Normal flow (user online):

T=0s    User opens browser → SSE connects → coroutine starts
T=0s    xread starts blocking (5s timer starts in Redis)
T=3s    PM rejects timesheet → Celery writes to Redis stream
T=3s    xread immediately wakes up (timer irrelevant, data arrived)
T=3s    notification yielded → browser receives it
T=3s    xread blocks again (new 5s timer starts)
T=8s    nothing arrived → xread returns None after 5s
T=8s    heartbeat yields ": heartbeat\n\n" → browser ignores it
T=8s    xread blocks again (new 5s timer starts)

Dirty disconnect (internet drops, no TCP FIN):

T=0s    Coroutine blocking in xread
T=2s    User loses internet: no TCP FIN sent, FastAPI has NO idea
T=5s    xread returns empty (5s elapsed)
T=5s    Coroutine yields heartbeat; the kernel accepts the write and starts retransmitting
T=10s+  A later heartbeat write fails once TCP declares the connection dead
        Generator is torn down → memory freed → online count decremented

Each User Gets Their Own Coroutine and Stream

A question that comes up: since the SSE endpoint sends a StreamingResponse, doesn't the backend send all notifications to all clients?

No. FastAPI spins up a separate coroutine for every single connection. They don't share state.

Employee 101 connects → Coroutine 1: xread("notif:user:101", block=5000)
Employee 102 connects → Coroutine 2: xread("notif:user:102", block=5000)
Employee 103 connects → Coroutine 3: xread("notif:user:103", block=5000)

When Celery writes to notif:user:101 — only Coroutine 1 wakes up. Everyone else stays asleep. The stream key is the isolation mechanism. Each coroutine watches only its user's private stream.

100 use cases does not mean 100 stream keys per user. All use cases write to the same stream key with a type field:

# All event types go into the SAME stream — type differentiates them
await redis_client.xadd(
    f"notif:user:{user_id}",  # always same key
    {
        "type": "TIMESHEET_REJECTED",  # differentiator
        "payload": json.dumps(data)
    }
)

The frontend routes by type:

es.onmessage = (event) => {
    const data = JSON.parse(event.data);
    switch(data.type) {
        case "TIMESHEET_REJECTED": showTimesheetAlert(data); break;
        case "TASK_ASSIGNED":      updateTaskBoard(data);    break;
        case "LEAVE_APPROVED":     showLeaveStatus(data);    break;
    }
}

Total coroutines = Total concurrent online users. Not users × use cases.


Horizontal Scaling

SSE doesn't have the sticky session problem that WebSockets has. Here's why:

Load Balancer
      │
   ┌──┴──┐
Server A  Server B

Employee 101 connected to Server A
    └── Coroutine on Server A: xread("notif:user:101", block=5000)

Celery writes to notif:user:101
    └── Redis wakes Server A's coroutine
    └── Server B has no coroutine for user 101 — irrelevant

Celery doesn't care which server the user is on — it just writes to Redis. Redis wakes whichever server is blocking on that key. No coordination between servers needed.

The one real problem — Redis connections at scale:

Server A: 25K users → 25K xread connections to Redis
Server B: 25K users → 25K xread connections to Redis
Total: 50K concurrent Redis connections

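Redis's default maxclients limit is 10,000, and every xread that is blocked holds its connection until Redis replies. A client-side pool caps how many connections each server opens, but it cannot make 25K simultaneously blocked reads fit into fewer connections: once the pool is exhausted, further xread calls fail or wait.

redis_client = redis.Redis(
    host="localhost",
    port=6379,
    max_connections=1000  # hard cap for this server process, not a multiplexer
)

Pooling therefore protects Redis from overload rather than removing the limit. To serve more users than connections, either raise maxclients (and the OS file descriptor limits) or have each SSE server read many streams over a few shared connections.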
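Because each blocked xread occupies one connection, one option worth considering is a single shared reader per server process: it issues one XREAD covering every locally connected user's stream and hands entries to per-user asyncio queues, which the SSE generators then await. The sketch below is an assumption about how that could look, not part of the design above; route and reader_loop are hypothetical names, and only the routing step is exercised here.

```python
import asyncio

def route(xread_reply, queues, cursors):
    """Fan one XREAD reply out to per-user queues and advance each stream's cursor.

    xread_reply has the shape redis-py returns without decode_responses:
    [(stream_key, [(entry_id, fields), ...]), ...]
    """
    for stream_key, entries in xread_reply:
        for entry_id, fields in entries:
            cursors[stream_key] = entry_id
            queue = queues.get(stream_key)
            if queue is not None:          # user may have disconnected meanwhile
                queue.put_nowait((entry_id, fields))

async def reader_loop(redis_client, queues, cursors):
    """Hypothetical shared reader: ONE blocking XREAD covers every local user."""
    while True:
        if not cursors:
            await asyncio.sleep(0.1)       # nobody connected on this server yet
            continue
        reply = await redis_client.xread(dict(cursors), block=5000, count=100)
        route(reply or [], queues, cursors)

# Simulated reply: two users on this server receive one entry each.
queues = {b"notif:user:101": asyncio.Queue(), b"notif:user:102": asyncio.Queue()}
cursors = {b"notif:user:101": b"0", b"notif:user:102": b"0"}
reply = [
    (b"notif:user:101", [(b"1-0", {b"type": b"TIMESHEET_REJECTED"})]),
    (b"notif:user:102", [(b"2-0", {b"type": b"TASK_ASSIGNED"})]),
]
route(reply, queues, cursors)
print(queues[b"notif:user:101"].qsize(), cursors[b"notif:user:102"])   # 1 b'2-0'
```

The trade-offs are real: a user who connects while the shared read is blocked is only picked up on the next cycle, and under Redis Cluster a multi-key XREAD is limited to keys in the same hash slot, so the reader would need one call per slot or node.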

And horizontally scale Redis itself via Redis Cluster — streams are sharded across nodes by key hash. Clients hash to the correct node automatically.


The Burst + Server Oscillation Problem

What if a user has 50 notifications arrive within 1 second, while also oscillating between servers due to a flaky connection?

The count parameter handles the burst:

messages = await redis_client.xread(
    {stream_key: cursor},
    block=5000,
    count=10  # deliver max 10 at a time
)

User was offline and 80 notifications piled up:

xread from cursor=0  → gets 1–10,  cursor advances to 10
xread from cursor=10 → gets 11–20, cursor advances to 20
...
xread from cursor=70 → gets 71–80, cursor advances to 80
xread from cursor=80 → nothing pending, BLOCKS (sleeps)

count=10 acts as a flow control valve — burst delivery in controlled batches, no memory spike.
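The batching arithmetic is easy to check with a toy model. Here xread_toy is a hypothetical stand-in that mimics only the cursor and count behaviour of XREAD, using plain integers as IDs like the walkthrough above:

```python
def xread_toy(stream, cursor, count):
    """Return up to `count` entries with id greater than `cursor`."""
    return [(i, payload) for i, payload in stream if i > cursor][:count]

stream = [(i, f"notification {i}") for i in range(1, 81)]   # 80 entries piled up

cursor, batch_sizes = 0, []
while True:
    batch = xread_toy(stream, cursor, count=10)
    if not batch:
        break                      # a real XREAD would block here instead
    batch_sizes.append(len(batch))
    cursor = batch[-1][0]          # advance to the last delivered id

print(batch_sizes, cursor)   # [10, 10, 10, 10, 10, 10, 10, 10] 80
```

Eight reads drain the backlog, and at no point does the server hold more than ten entries for this user in memory.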

The server oscillation problem:

When a user reconnects and lands on a different server, you temporarily have two coroutines watching the same stream. Both will read independently from Redis Streams (unlike a queue, xread is non-destructive). This can cause duplicate delivery during the ghost connection window.

Fix: cursor ownership via Redis:

import socket, uuid

async def event_generator(user_id, last_id):
    owner_key = f"notif:owner:{user_id}"
    my_id = f"server-{socket.gethostname()}-{uuid.uuid4()}"

    # Claim ownership: last writer wins, so the newest connection takes over
    await redis_client.set(owner_key, my_id, ex=10)
    cursor = last_id

    try:
        while True:
            messages = await redis_client.xread(
                {f"notif:user:{user_id}": cursor},
                block=5000,
                count=10
            )

            # Verify I still own this stream BEFORE sending or refreshing anything
            current_owner = await redis_client.get(owner_key)
            if current_owner is not None and current_owner.decode() != my_id:
                return  # a newer connection took over: I am the ghost, die cleanly

            # Still the owner (or the key expired): refresh the TTL on every cycle.
            # GET followed by SET is not atomic; a Lua script would close that gap.
            await redis_client.set(owner_key, my_id, ex=10)

            if not messages:
                yield ": heartbeat\n\n"
                continue

            for stream, events in messages:
                for msg_id, fields in events:
                    cursor = msg_id
                    yield f"id: {msg_id.decode()}\n"
                    yield f"data: {json.dumps(...)}\n\n"
    finally:
        current_owner = await redis_client.get(owner_key)
        if current_owner and current_owner.decode() == my_id:
            await redis_client.delete(owner_key)

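With ownership, a ghost coroutine exits at its next ownership check instead of lingering until a write to the dead socket finally fails. That check runs once per xread cycle, so the ghost is gone within about 5 seconds, or sooner if a notification arrives.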


When to Write to the Database

Two separate things need DB writes — at different times for different reasons:

Business event → write synchronously, before anything else:

@router.post("/timesheets/bulk-reject")
async def bulk_reject(payload: BulkRejectRequest):
    # 1. DB write FIRST — source of truth
    async with db.begin():
        await db.execute("UPDATE timesheets SET status='REJECTED'...")
 
    # 2. ONLY after DB commit → publish to RabbitMQ
    await rabbitmq.publish("timesheet.rejected", payload)
 
    # 3. Return immediately — don't wait for notification delivery
    return {"rejected_count": 20}

Notification record → write asynchronously, in the same Celery task as the Redis write:

async def _fan_out(data: dict):
    affected = data["affected_employees"]
 
    # 1. Redis writes — live delivery (milliseconds)
    pipe = redis_client.pipeline()
    for emp in affected:
        pipe.xadd(f"notif:user:{emp['employee_id']}", {...}, maxlen=100)
    await pipe.execute()
 
    # 2. DB writes: audit trail, history, read/unread (same task, not triggered by the Redis write)
    await db.execute_many(
        "INSERT INTO notification_log (user_id, type, payload, created_at, read_at) VALUES (:user_id, :type, :payload, NOW(), NULL)",
        [{"user_id": emp["employee_id"], "type": "TIMESHEET_REJECTED", ...} for emp in affected]
    )

Never Redis without DB. Always both, always together, in the same worker task.

The full write timeline:

T=0ms    PM clicks bulk reject
T=1ms    API → DB UPDATE timesheets (synchronous, blocks)
T=5ms    DB commits → API publishes to RabbitMQ
T=6ms    API returns 200 to PM
T=10ms   Celery picks up RabbitMQ message
T=11ms   Celery → pipeline XADD to 20 Redis Streams
T=12ms   20 SSE coroutines wake up → browser notifications fire
T=15ms   Celery → batch INSERT into notification_log table

Redis delivers fast. DB stores forever. They serve different purposes.


Implementing Unread Count Efficiently

The naive approach:

SELECT COUNT(*) FROM notification_log
WHERE user_id = 101 AND read_at IS NULL

At 50,000 users this runs constantly. Instead, maintain the count in Redis:

# Notification created → increment
await redis_client.incr(f"notif:unread:{user_id}")
 
# User marks all read → reset
await redis_client.set(f"notif:unread:{user_id}", 0)
 
# Bell icon loads → O(1) read
count = await redis_client.get(f"notif:unread:{user_id}")

Mark as read endpoints:

@router.patch("/notifications/{notif_id}/read")
async def mark_read(notif_id: str, current_user: dict = Depends(get_current_user)):
    result = await db.execute(
        "UPDATE notification_log SET read_at = NOW() WHERE id = :id AND user_id = :uid AND read_at IS NULL",
        {"id": notif_id, "uid": current_user["user_id"]}
    )
    # Decrement only if a row actually flipped from unread to read;
    # otherwise repeated calls would drive the counter negative
    if result.rowcount:
        await redis_client.decr(f"notif:unread:{current_user['user_id']}")
 
@router.patch("/notifications/read-all")
async def mark_all_read(current_user: dict = Depends(get_current_user)):
    await db.execute(
        "UPDATE notification_log SET read_at = NOW() WHERE user_id = :id AND read_at IS NULL",
        {"id": current_user["user_id"]}
    )
    await redis_client.set(f"notif:unread:{current_user['user_id']}", 0)

Notification History: Cursor-Based Pagination

The naive pagination approach is wrong at scale:

-- ❌ OFFSET pagination — scans and discards N rows every time
SELECT * FROM notification_log
WHERE user_id = 101
ORDER BY created_at DESC
LIMIT 20 OFFSET 10000  -- scans 10,020 rows, throws away 10,000

Use cursor-based pagination instead:

-- ✅ First page
SELECT * FROM notification_log
WHERE user_id = 101
ORDER BY created_at DESC
LIMIT 20;
 
-- ✅ Next page — pass last received created_at as cursor
SELECT * FROM notification_log
WHERE user_id = 101
AND created_at < '2024-01-15 10:30:00'  -- last seen timestamp
ORDER BY created_at DESC
LIMIT 20;

The database jumps directly to that timestamp via index. Performance is identical on page 1 or page 500.

The correct index:

-- Composite index serves both the WHERE and ORDER BY in one scan
CREATE INDEX idx_notif_user_time
ON notification_log(user_id, created_at DESC);
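Here is a runnable sketch of the cursor approach, using SQLite as a stand-in for the production database. One detail is an addition on my part rather than something from the queries above: the cursor is the pair (created_at, id) instead of created_at alone, because rows that share a timestamp would otherwise be skipped at a page boundary. The fetch_page helper and the sample data are hypothetical, and the row-value comparison assumes SQLite 3.15 or newer.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE notification_log (
        id INTEGER PRIMARY KEY,
        user_id INTEGER NOT NULL,
        created_at TEXT NOT NULL
    )
""")
conn.execute(
    "CREATE INDEX idx_notif_user_time "
    "ON notification_log(user_id, created_at DESC, id DESC)"
)
# 50 rows for user 101; neighbouring rows share a timestamp to exercise the tie-breaker
rows = [(i, 101, f"2024-01-15 10:{i // 2:02d}:00") for i in range(1, 51)]
conn.executemany("INSERT INTO notification_log VALUES (?, ?, ?)", rows)

def fetch_page(user_id, cursor=None, limit=20):
    """cursor is the (created_at, id) of the last row on the previous page."""
    if cursor is None:
        sql = ("SELECT id, created_at FROM notification_log WHERE user_id = ? "
               "ORDER BY created_at DESC, id DESC LIMIT ?")
        params = (user_id, limit)
    else:
        sql = ("SELECT id, created_at FROM notification_log WHERE user_id = ? "
               "AND (created_at, id) < (?, ?) "
               "ORDER BY created_at DESC, id DESC LIMIT ?")
        params = (user_id, cursor[0], cursor[1], limit)
    return conn.execute(sql, params).fetchall()

seen, cursor = [], None
while True:
    page = fetch_page(101, cursor)
    if not page:
        break
    seen.extend(row[0] for row in page)
    cursor = (page[-1][1], page[-1][0])   # (created_at, id) of the last row

print(len(seen), seen[:3])   # 50 [50, 49, 48]
```

Every row is returned exactly once across the three pages, even though half the timestamps are duplicated.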

What Redis and DB Each Own

By the end of this design, the responsibilities are cleanly separated:

| Layer | Owns | Purpose |
|---|---|---|
| DB (timesheets table) | Business events | Source of truth for what happened |
| DB (notification_log) | Notification history | Audit trail, read/unread state, history page |
| Redis Stream | Live delivery | SSE push to online users, recent catchup (maxlen=100) |
| Redis key (unread count) | Bell badge number | O(1) read, updated on create/read |
| Browser memory (Zustand) | Current session display | What appeared this session, capped at 20 |

On page load:

useEffect(() => {
    // 1. Load recent from DB immediately: populates bell icon
    // (an effect callback cannot be async, so chain the promise instead of using await)
    fetch("/notifications/recent?limit=5")
        .then(res => res.json())
        .then(recent => store.setInitial(recent))

    // 2. Simultaneously reconnect SSE: streams anything new
    connect()
}, [])

DB handles the past. SSE handles the future. Redis Streams handles the catch-up window. Zustand handles the current session display. They are not substitutes for each other.


Architecture Summary

Any Service (Timesheet, Task, Leave, etc.)
        │  publishes event with key=user_id
        ▼
RabbitMQ (durable queue — survives worker restarts)
        │  acks_late=True — message deleted only after processing
        ▼
Celery Workers (fan-out layer)
        ├── pipeline XADD → Redis Streams (notif:user:{id})  [live delivery]
        └── batch INSERT → notification_log DB               [persistence]
        │
        ▼
Redis Streams (notif:user:{id}, maxlen=100)
        │  one stream per user, all event types mixed in
        ▼
SSE Server (one coroutine per connected user)
        │  xread BLOCK 5000, count=10
        │  heartbeat every 5s → ghost eviction
        │  ownership key → deduplication on reconnect
        ▼
Browser (EventSource)
        │  stores last_id in localStorage
        │  auto-reconnects with last_id → no gaps
        ▼
Zustand Store (display layer, capped at 20)

This architecture started with a rough idea and three fundamental mistakes. The final design emerged from asking one question at each stage: what are the consequences of this decision? That's the only way I know how to do system design.

Next in this series: scaling the write path with Kafka when RabbitMQ throughput becomes the bottleneck — producers, consumers, partitions, and consumer groups.