
How I Built a Redis Server from Scratch in Python

#python #asyncio #distributed-systems #performance

Like many backend developers, I use Redis every day. It’s fast, reliable, and “just works.” But I realized I treated it like a black box. I knew how to use it, but not how it worked.

So I decided to build it from scratch. In Python.

The result is PyRedis: a fully functional Redis server that speaks the real RESP protocol, handles concurrent connections with asyncio, supports transactions, streams, and even master-replica replication. In this post I’ll walk you through the architecture and the most interesting problems I had to solve.


Why Build a Database from Scratch?

You might ask: “Why build a slower version of Redis in Python when the original C version exists?”

The goal was never to replace Redis. It was to understand it.

Building a database forces you to confront problems that don’t appear in typical web development:

  • Raw socket handling: Parsing binary byte streams instead of HTTP requests.
  • Concurrency without threads: Handling thousands of connections on a single thread.
  • Distributed consistency: Keeping a replica in sync with a master in real time.

Reading documentation tells you what a system does. Building it tells you why it was designed that way.


The Protocol: RESP

The first thing I had to implement was RESP (Redis Serialization Protocol). Redis doesn’t speak JSON or HTTP. It uses its own binary-safe, text-based protocol. A simple GET mykey command looks like this on the wire:

*2\r\n$3\r\nGET\r\n$5\r\nmykey\r\n

That’s a RESP Array (*2) with two Bulk Strings ($3 and $5). Every type has a prefix character: + for simple strings, - for errors, : for integers, $ for bulk strings, and * for arrays.

I implemented this as two stateless classes: a RESPParser that converts raw bytes into Python objects, and a RESPEncoder that does the reverse. The encoder inspects the Python type and automatically chooses the correct RESP format:

# Usage:
RESPEncoder.encode("hello")           # → b'$5\r\nhello\r\n'   (Bulk String)
RESPEncoder.encode(42)                # → b':42\r\n'           (Integer)
RESPEncoder.encode({"ok": "PONG"})    # → b'+PONG\r\n'         (Simple String)
RESPEncoder.encode({"error": "ERR"})  # → b'-ERR\r\n'          (Error)
RESPEncoder.encode(["GET", "mykey"])   # → b'*2\r\n$3\r\nGET\r\n$5\r\nmykey\r\n'

The key design decision was making both parser and encoder stateless: each call is independent. This made testing trivial and eliminated an entire category of bugs.
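For illustration, here is a minimal stateless parser for the one shape every client command takes: an array of bulk strings. The real RESPParser handles all five types and tracks byte offsets (splitting on \r\n, as below, would break on binary payloads), but the recursive idea is the same. The function name is mine, not the project's:

```python
def parse_command(data: bytes) -> list[str]:
    """Parse a RESP array of bulk strings, e.g. b'*2\r\n$3\r\nGET\r\n$5\r\nmykey\r\n'.

    NOTE: splitting on CRLF is only safe for this sketch; a binary-safe
    parser must honor the declared lengths instead.
    """
    lines = data.split(b"\r\n")
    if not lines[0].startswith(b"*"):
        raise ValueError("expected a RESP array")
    count = int(lines[0][1:])  # number of elements, e.g. *2 → 2

    items, i = [], 1
    for _ in range(count):
        if not lines[i].startswith(b"$"):
            raise ValueError("expected a bulk string")
        length = int(lines[i][1:])  # declared byte length, e.g. $3 → 3
        value = lines[i + 1]
        if len(value) != length:
            raise ValueError("bulk string length mismatch")
        items.append(value.decode())
        i += 2  # skip the length line and the payload line
    return items
```

Feeding it the wire bytes from above yields `["GET", "mykey"]`, ready to dispatch.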


Concurrency with asyncio

Redis is famously single-threaded. Instead of threads, it uses an event loop to multiplex thousands of connections on a single core. Python’s asyncio is a natural fit for this model.

The server itself is just a few lines:

# app/server.py

async def start_server(host="localhost", port=6379, handler=None):
    if handler is None:
        handler = handle_client

    server = await asyncio.start_server(handler, host, port)
    async with server:
        await server.serve_forever()

Each incoming connection gets its own coroutine. Inside, the handler reads bytes from the socket, parses them into a command, executes it, encodes the response, and writes it back:

# app/handler.py

async def handle_client(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
    addr = writer.get_extra_info("peername")

    try:
        while True:
            data = await reader.read(1024)
            if not data:
                break

            command = RESPParser.parse(data)
            response = await execute_command(command, connection_id=addr,
                                             reader=reader, writer=writer)
            response_bytes = RESPEncoder.encode(response)
            writer.write(response_bytes)
            await writer.drain()
    finally:
        remove_transaction_context(connection_id=addr)
        ReplicaManager.remove_replica(addr)
        writer.close()
        await writer.wait_closed()

The await reader.read() call is where the magic happens. While one client is waiting for data, the event loop is free to serve other clients. No threads, no locks, no race conditions.
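To convince yourself that one thread really can serve multiple clients, here is a self-contained toy: an uppercase-echo handler standing in for the real command dispatch, with two clients multiplexed on a single event loop:

```python
import asyncio

async def echo_handler(reader, writer):
    # Stand-in for handle_client: read, "process", write back.
    data = await reader.read(1024)
    writer.write(data.upper())
    await writer.drain()
    writer.close()
    await writer.wait_closed()

async def main():
    # Port 0 lets the OS pick a free ephemeral port.
    server = await asyncio.start_server(echo_handler, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]

    async def client(msg: bytes) -> bytes:
        reader, writer = await asyncio.open_connection("127.0.0.1", port)
        writer.write(msg)
        await writer.drain()
        reply = await reader.read(1024)
        writer.close()
        await writer.wait_closed()
        return reply

    # Both clients are served concurrently by the same thread.
    replies = await asyncio.gather(client(b"ping"), client(b"pong"))
    server.close()
    await server.wait_closed()
    return replies

replies = asyncio.run(main())
```

While one client coroutine awaits its reply, the loop is already serving the other: the same interleaving the real server relies on.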


The Command Pattern

Each Redis command is its own class, inheriting from an abstract base:

# app/commands/base.py

class BaseCommand(ABC):
    @property
    @abstractmethod
    def name(self) -> str:
        """Return the command name (e.g., 'PING', 'GET')."""

    @property
    def is_write_command(self) -> bool:
        """Whether this command should be propagated to replicas."""
        return False

    @abstractmethod
    async def execute(self, args: list[str], connection_id=None) -> Any:
        """Execute the command."""

This makes adding a new command trivial. Here’s the complete SET implementation:

# app/commands/set.py

class SetCommand(BaseCommand):
    @property
    def name(self) -> str:
        return "SET"

    @property
    def is_write_command(self) -> bool:
        return True

    async def execute(self, args: list[str], connection_id=None):
        self.validate_args(args, min_args=2)
        key, value = args[0], args[1]

        if len(args) > 2:
            if len(args) == 4 and args[2].upper() == "PX":
                px_value = int(args[3])
                storage = get_storage()
                storage.set_with_ttl(key, value, px_value)
                return {"ok": "OK"}
            else:
                raise ValueError("ERR syntax error")

        storage = get_storage()
        storage.set(key, value)
        return {"ok": "OK"}

The is_write_command property is critical: it tells the handler whether this command needs to be forwarded to replicas. Adding a new command is as simple as creating a file, implementing the interface, and registering it in the CommandRegistry.
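The registry itself doesn't appear in this post; a minimal sketch of the idea (my own names, not necessarily the project's exact code) is just a dictionary from uppercase command name to handler instance:

```python
class CommandRegistry:
    """Maps an uppercase command name to its handler instance."""
    _commands: dict[str, object] = {}

    @classmethod
    def register(cls, command) -> None:
        # command.name is the uppercase name from BaseCommand, e.g. "SET"
        cls._commands[command.name] = command

    @classmethod
    def get(cls, name: str):
        # Lookup is case-insensitive, like Redis itself.
        try:
            return cls._commands[name.upper()]
        except KeyError:
            raise ValueError(f"ERR unknown command '{name}'")
```

The handler never needs to know which commands exist; it asks the registry, and unknown names become the standard ERR reply.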


Transactions: MULTI/EXEC

Redis transactions queue commands and execute them atomically. Each connection gets a TransactionContext that tracks whether a transaction is active:

# app/transaction.py

class TransactionContext:
    def __init__(self):
        self._in_transaction = False
        self._queued_commands: List[tuple[str, List[str]]] = []

    @property
    def in_transaction(self) -> bool:
        return self._in_transaction

    def start_transaction(self):
        self._in_transaction = True
        self._queued_commands = []

    def queue_command(self, command_name: str, args: List[str]):
        self._queued_commands.append((command_name, args))

When a client is inside a MULTI block, the handler intercepts every command and queues it instead of executing immediately:

# app/handler.py (inside execute_command)

if transaction_ctx and transaction_ctx.in_transaction \
   and not command_obj.bypasses_transaction_queue:
    transaction_ctx.queue_command(command_name, command_args)
    return {"queued": "QUEUED"}

On EXEC, all queued commands run sequentially and their results are returned as an array. On DISCARD, the queue is dropped. The bypasses_transaction_queue flag ensures that MULTI, EXEC, and DISCARD themselves never get queued.
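The EXEC path can be sketched like this (my own simplification, assuming the context and registry shapes above). One subtlety worth noting: a command that fails inside EXEC does not abort the rest of the queue; its error simply becomes one element of the reply array:

```python
async def exec_transaction(ctx, registry) -> list:
    """Run queued commands in order; each result (or error) becomes
    one element of the array returned to the client."""
    results = []
    for name, args in ctx._queued_commands:
        try:
            command = registry.get(name)
            results.append(await command.execute(args))
        except Exception as e:
            # Redis keeps going: a failing command inside EXEC
            # does not roll back or skip the remaining commands.
            results.append({"error": str(e)})

    # Leave the connection back in normal (non-transaction) mode.
    ctx._in_transaction = False
    ctx._queued_commands = []
    return results
```

This is also why Redis transactions are "atomic" only in the sense of uninterrupted sequential execution, not all-or-nothing rollback.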


The Hardest Part: Replication

Implementing SET and GET is straightforward. Implementing replication (making one server mirror another) is where things get interesting.

Here’s how it works in PyRedis:

1. The Handshake. The replica connects to the master and performs a four-step handshake:

# app/replication.py

async def start_handshake(self):
    await self.connect()
    await self.send_ping()

    listening_port = ServerConfig.get_listening_port()
    await self.send_replconf_listening_port(listening_port)
    await self.send_replconf_capa()
    await self.send_psync()

First PING to verify the connection, then two REPLCONF commands to announce the replica's listening port and capabilities, then PSYNC to request synchronization. The master responds with FULLRESYNC and sends an RDB snapshot.

2. Command Propagation. After the handshake, the master forwards every write command to all replicas through the ReplicaManager:

# app/replica_manager.py

@classmethod
async def propagate_command(cls, command_name: str, args: list[str]):
    if not cls._replicas:
        return

    command_array = [command_name.upper(), *args]
    encoded = RESPEncoder.encode(command_array)

    for connection_id, (reader, writer) in cls._replicas.items():
        try:
            writer.write(encoded)
            await writer.drain()
        except Exception as e:
            logger.error(f"Error propagating to {connection_id}: {e}")

    cls._master_offset += len(encoded)

3. Acknowledgments. The WAIT command lets a client block until a certain number of replicas have confirmed they received all commands up to a given offset. This is implemented using asyncio.Condition:

# app/replica_manager.py

@classmethod
async def wait_for_replication(cls, numreplicas: int, timeout_ms: int) -> int:
    target_offset = cls._master_offset

    # Send GETACK to all replicas
    getack_command = RESPEncoder.encode(["REPLCONF", "GETACK", "*"])
    for connection_id, (reader, writer) in cls._replicas.items():
        writer.write(getack_command)
        await writer.drain()

    # Block until enough ACKs arrive, or give up after the timeout
    condition = cls._get_condition()

    async def wait_for_acks() -> int:
        async with condition:
            await condition.wait_for(
                lambda: cls._count_acks(target_offset) >= numreplicas
            )
            return cls._count_acks(target_offset)

    try:
        return await asyncio.wait_for(
            wait_for_acks(),
            timeout=timeout_ms / 1000.0
        )
    except asyncio.TimeoutError:
        return cls._count_acks(target_offset)

Getting the replication protocol right was, by far, the most time-consuming part of the project. The interplay between offset tracking, asynchronous ACKs, and timeout management required careful coordination.
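The ACK-counting side is simple once each replica's last acknowledged offset is recorded as REPLCONF ACK replies arrive. A sketch of the idea (the class, attribute, and method names here are my assumptions, not the repo's exact code):

```python
class ReplicaManagerSketch:
    # connection_id → last offset the replica confirmed via REPLCONF ACK
    _replica_offsets: dict[str, int] = {}

    @classmethod
    def record_ack(cls, connection_id: str, offset: int) -> None:
        """Called when a REPLCONF ACK <offset> arrives from a replica."""
        cls._replica_offsets[connection_id] = offset

    @classmethod
    def count_acks(cls, target_offset: int) -> int:
        """A replica counts as caught up once its ACKed offset reaches
        the master offset recorded when WAIT started."""
        return sum(1 for acked in cls._replica_offsets.values()
                   if acked >= target_offset)
```

Each incoming ACK also notifies the condition variable, which is what lets a blocked WAIT re-check the count and return early.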


Streams and Blocking Reads

The most complex data structure I implemented was Redis Streams. XADD appends timestamped entries with auto-generated IDs. XREAD BLOCK lets a client wait for new data to arrive on a stream, which requires a cooperative blocking mechanism:

# app/commands/xread.py (simplified)

async def execute(self, args, connection_id=None):
    block_timeout, streams = self._parse_args(args)

    # Try immediate read
    result = self._query_streams(streams)
    if result is not None:
        return result

    # If not blocking, return None
    if block_timeout is None:
        return None

    # Register one waiter event per requested stream key
    events = []
    for key in streams:
        event = asyncio.Event()
        register_waiter(key, event)
        events.append(event)

    # Wait for notification or timeout
    done, pending = await asyncio.wait(
        [asyncio.create_task(event.wait()) for event in events],
        timeout=block_timeout,
        return_when=asyncio.FIRST_COMPLETED
    )
    for task in pending:
        task.cancel()

    # Re-query after wake-up
    return self._query_streams(streams)

When another client runs XADD, the storage layer notifies all registered waiters, waking up the blocked XREAD coroutines.


Storage Layer

The storage layer is a Python dict with type awareness. Each key stores a typed wrapper (RedisString, RedisList, or RedisStream), and a decorator enforces type safety:

# app/storage/memory.py

class InMemoryStorage(BaseStorage):
    def __init__(self):
        self._data: dict[str, RedisValue] = {}
        self._expiry: dict[str, float] = {}  # key → monotonic expiration time

    def _is_expired(self, key: str) -> bool:
        if key not in self._expiry:
            return False
        return time.monotonic() > self._expiry[key]

    @require_type(RedisType.STRING)
    def get(self, key: str) -> Optional[str]:
        if key not in self._data:
            return None
        if self._is_expired(key):
            del self._data[key]
            del self._expiry[key]
            return None
        return self._data[key].value

TTL uses time.monotonic() instead of wall-clock time, which prevents bugs caused by system clock changes. Expiration is lazy: keys are only cleaned up when accessed.
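The require_type decorator mentioned above isn't shown in this post; a self-contained sketch of the idea (my own version and error type, not the repo's exact code) looks like this:

```python
import functools
from enum import Enum

class RedisType(Enum):
    STRING = "string"
    LIST = "list"
    STREAM = "stream"

def require_type(expected: RedisType):
    """Reject operations on a key that holds a different type,
    mirroring Redis's WRONGTYPE error."""
    def decorator(method):
        @functools.wraps(method)
        def wrapper(self, key, *args, **kwargs):
            value = self._data.get(key)
            if value is not None and value.type is not expected:
                raise TypeError(
                    "WRONGTYPE Operation against a key holding "
                    "the wrong kind of value"
                )
            return method(self, key, *args, **kwargs)
        return wrapper
    return decorator
```

A missing key passes through untouched (the wrapped method decides what None means), so only genuine type mismatches, like running GET on a stream key, are rejected.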


What I Learned

This project taught me more about systems programming than any tutorial or course I’ve taken.

  • Protocol design matters. RESP looks simple, but its prefix-based type system is extremely efficient to parse. There’s a reason Redis didn’t use JSON.
  • asyncio is powerful, not simple. The happy path is easy. Coordinating timeouts, cancellations, and blocking operations across coroutines requires real discipline.
  • Replication is a distributed systems problem. Even a simplified version forces you to think about ordering, consistency, and failure recovery.
  • The Command Pattern pays off. Having each command as an isolated unit made the codebase easy to extend and test. I went from 5 to 25+ commands without ever touching the handler logic.
  • Testing is non-negotiable. You can’t build a database without trusting your test suite. pytest-asyncio was invaluable for testing async code.

Architecture at a Glance

Client → TCP → RESP Parser → Command Handler → Command Registry → Storage
                                    ↓                    ↓
                            Transaction Mgr      Replica Manager → Replicas

The full architecture guide with Mermaid diagrams is available in the repository docs.


Try It Out

The entire codebase is open source and designed to be readable. If you’re learning asyncio, system design, or just curious about how Redis works internally, I hope you find it useful.

git clone https://github.com/albertopastormr/pyredis.git
cd pyredis
uv sync
./run-redis.sh

GitHub Repository

Feedback, issues, and contributions are welcome.