
Evolving my Python Redis Clone: From Request-Response to Pub/Sub

#python#asyncio#architecture#redis

When I first set out to build a Redis engine from scratch in Python, my initial focus was parsing standard RESP (Redis Serialization Protocol) and serving GET/SET requests. It functioned essentially as an in-memory key-value database over TCP.
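
For readers who have not seen RESP before, here is a minimal sketch of the encoding side. The function name encode_resp is hypothetical and is not the encoder used in this project; it only covers integers, bulk strings, nulls, and arrays, which is enough to show what a command looks like on the wire.

```python
def encode_resp(value) -> bytes:
    """Encode a Python value as RESP2 bytes (illustrative subset only)."""
    if value is None:
        return b"$-1\r\n"                      # null bulk string
    if isinstance(value, int):
        return b":%d\r\n" % value              # integer reply
    if isinstance(value, str):
        value = value.encode("utf-8")
    if isinstance(value, bytes):
        # bulk string: length prefix, then the raw bytes
        return b"$%d\r\n%s\r\n" % (len(value), value)
    if isinstance(value, (list, tuple)):
        # array: element count, then each element encoded in turn
        return b"*%d\r\n" % len(value) + b"".join(encode_resp(v) for v in value)
    raise TypeError(f"cannot encode {type(value).__name__} as RESP")


print(encode_resp(["GET", "username"]))
```

A client command such as GET username travels as an array of two bulk strings, and the Pub/Sub push messages later in this post use the same array framing.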

However, a server like that follows a strict request-and-response lifecycle: every reply is triggered by a request. To bring the engine closer to real Redis functionality, I had to rework the architecture to support Pub/Sub (Publish/Subscribe).

Pub/Sub breaks that mold. Clients don’t just ask for data; they keep a long-lived, stateful connection open (much like a WebSocket stream), and the server pushes messages to them whenever something is published.

Implementing this required tackling three major architectural challenges:


A. The “Long-Lived Connection” Problem

In standard HTTP or basic database queries, a connection is brief. The client sends a request (like GET username), the server computes the response, flushes the socket, and ideally closes or pools the connection.

In Pub/Sub, a client sends SUBSCRIBE channel_1, and from then on the server must keep that connection open indefinitely.

The immediate threat is resource exhaustion. With a traditional threaded networking model where every connection gets its own OS thread, a few thousand idle subscribers would cost memory for every thread stack and add context-switching overhead as the scheduler juggles them.

The Flex:

Because my core TCP loop was built on Python’s asyncio (asyncio.start_server), handling thousands of idle subscribers became shockingly cheap. Each connection is a coroutine that stays suspended at await reader.read(...) until bytes arrive on the socket. This let me hold long-lived connections open almost for free, without dedicating an OS thread to each user. I only needed a way to look up each client’s StreamWriter from elsewhere in the server when it was time to push a message to it.


B. The “Fan-Out” Challenge

The next major hurdle was delivery resolution. Let’s say Client A issues PUBLISH system_alerts "Database Restarting".

If 10,000 clients are currently subscribed to the system_alerts channel, the server suddenly has to dynamically resolve 10,000 specific TCP writer objects and perform 10,000 network writes from a single trigger.

Initially, I tracked connections in a flat dictionary mapping clients to their PubSubContext (which held a set of their subscribed channels). But when a PUBLISH command triggered, I had to loop across every connected client to check if "system_alerts" was in their set. This was an expensive O(N) operation that would severely degrade performance under scale.

The Flex:

I restructured the subscription model into an inverted index. I created a global _channels: dict[str, set[PubSubContext]] registry.

Now each channel maps directly to the set of connections subscribed to it. Routing cost dropped from O(N) over all connected clients to O(K) over that channel’s subscribers, so PUBLISH no longer slows down as unrelated clients connect.

Here is a snippet showing how simple and decoupled the dispatch becomes, with an O(1) channel lookup followed by one write per subscriber:

import logging

logger = logging.getLogger(__name__)

# RESPEncoder and _channels are defined elsewhere in the project.
async def publish_message(channel: str, message: str) -> int:
    # 1. Build the push frame ["message", channel, message] and serialize it once
    response_payload = ["message", channel, message]
    response_bytes = RESPEncoder.encode(response_payload)
    delivery_count = 0

    # 2. O(1) lookup: fetch only the subscribers of this channel
    subscribers = _channels.get(channel, set())

    # 3. Fan-out: iterate over a copy, since the set may change while we await
    for ctx in list(subscribers):
        if ctx.writer is None:
            continue
        try:
            # write() only buffers the bytes; drain() waits if that buffer
            # is above its high-water mark (backpressure)
            ctx.writer.write(response_bytes)
            await ctx.writer.drain()
            delivery_count += 1
        except Exception as e:
            logger.error(f"Failed to publish to subscriber on '{channel}': {e}")

    return delivery_count

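Calling await ctx.writer.drain() is what applies TCP backpressure: if a slow client’s write buffer is above its high-water mark, the coroutine pauses until the buffer empties, and the event loop keeps serving other connections in the meantime. The trade-off is that the loop above is sequential, so one stalled subscriber delays delivery to the subscribers behind it in the same broadcast.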
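
One way to keep a stalled subscriber from holding up the others is to drain all writers concurrently and cap how long any single one may take. This is a sketch of that alternative, not what the project currently does; the helper names and the one-second timeout are assumptions.

```python
import asyncio


async def _send(writer, payload: bytes, timeout: float) -> bool:
    """Write to one subscriber; return False if it errors or stalls too long."""
    try:
        writer.write(payload)
        # Give up on this subscriber if its buffer does not drain in time.
        await asyncio.wait_for(writer.drain(), timeout)
        return True
    except (OSError, asyncio.TimeoutError):
        return False


async def fan_out(writers, payload: bytes, timeout: float = 1.0) -> int:
    """Deliver payload to every writer concurrently; return the success count."""
    results = await asyncio.gather(*(_send(w, payload, timeout) for w in writers))
    return sum(results)
```

With this shape the whole broadcast takes about as long as the slowest subscriber or the timeout, whichever is shorter, rather than the sum of all drains. What to do with a subscriber that timed out (retry, disconnect, or drop the message) is a policy decision left open here.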


C. The Protocol Shift

The last edge case requires fundamentally altering how the server interprets incoming commands.

Once a client sends SUBSCRIBE, the rules for that connection change: it enters “Subscribed Mode”. In this state it can no longer send regular commands like SET, DEL, or ECHO. It can only listen for messages or issue the few commands that manage the subscription itself, such as SUBSCRIBE, UNSUBSCRIBE, and PING.

The Flex:

Instead of hardcoding a massive, ugly if block checking for strings like ["SET", "GET", "INCR"] inside the central command handler, I pushed the decision into the command objects themselves.

I attached a state flag to my client context: is_in_subscribed_mode, which is true whenever the client has at least one active subscription (len(self.subscribed_channels) > 0).

At the same time, I gave every Command class in my standard registry a boolean property, allowed_in_subscribed_mode (declared with @property). By default, BaseCommand returns False. Only the commands that are valid in that mode, such as SubscribeCommand, override it to return True.

In my core handler, the guard is wonderfully succinct:

if pubsub_ctx.is_in_subscribed_mode and not command_obj.allowed_in_subscribed_mode:
    return {"error": f"ERR Can't execute '{command_name.lower()}': only (P|S)SUBSCRIBE / (P|S)UNSUBSCRIBE / PING / QUIT are allowed in this context"}
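
To make the pattern concrete, here is a self-contained sketch of the pieces around that guard. The class bodies, the name attribute, and the check_allowed helper are assumptions for illustration; only the property names and the error text come from the description above.

```python
from typing import Optional


class BaseCommand:
    name = "BASE"

    @property
    def allowed_in_subscribed_mode(self) -> bool:
        # Default: a command is rejected once the client has subscribed.
        return False


class SetCommand(BaseCommand):
    name = "SET"


class SubscribeCommand(BaseCommand):
    name = "SUBSCRIBE"

    @property
    def allowed_in_subscribed_mode(self) -> bool:
        return True


class PingCommand(BaseCommand):
    name = "PING"

    @property
    def allowed_in_subscribed_mode(self) -> bool:
        return True


class PubSubContext:
    def __init__(self) -> None:
        self.subscribed_channels: set[str] = set()

    @property
    def is_in_subscribed_mode(self) -> bool:
        # Derived from the set, so it cannot drift out of sync with it.
        return len(self.subscribed_channels) > 0


def check_allowed(ctx: PubSubContext, command: BaseCommand) -> Optional[str]:
    """Return an error string if the command is blocked, else None."""
    if ctx.is_in_subscribed_mode and not command.allowed_in_subscribed_mode:
        return (f"ERR Can't execute '{command.name.lower()}': only "
                "(P|S)SUBSCRIBE / (P|S)UNSUBSCRIBE / PING / QUIT "
                "are allowed in this context")
    return None
```

Adding a new command then requires no change to the handler: it is blocked in subscribed mode unless its class opts in.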

Conclusion

Tackling Pub/Sub forced me to think beyond mere serialization or synchronous transaction loops.

Building O(1) channel lookup with O(K) fan-out, keeping long-lived sockets cheap in memory, and managing explicit protocol state changes reinforced why asyncio is arguably one of Python’s most powerful native networking abstractions.