Dragon Pastoral

凯龘牧歌

Chapter 20 - Distributed Message Queue

Posted at # SystemDesign

Distributed Message Queue

We’ll be designing a distributed message queue in this chapter.

Benefits of message queues:

Some popular message queue implementations - Kafka, RabbitMQ, RocketMQ, Apache Pulsar, ActiveMQ, ZeroMQ.

Strictly speaking, Kafka and Pulsar are not message queues; they are event streaming platforms. There is, however, a convergence of features which blurs the distinction between message queues and event streaming platforms.

For example, RabbitMQ, which is a typical message queue, added an optional streams feature to allow repeated message consumption and long message retention, and its implementation uses an append-only log, much like an event streaming platform would.

In this chapter, we will design a distributed message queue with additional features, such as long data retention and repeated consumption of messages, that are typically only available on event streaming platforms.

Step 1 - Understand the problem and establish design scope

Message queues ought to support a few basic features: producers produce messages and consumers consume them. There are, however, different considerations with regard to performance, message delivery, data retention, etc.

Here’s a set of potential questions between Candidate and Interviewer:

Functional requirements:

Non-functional requirements:

Traditional message queues typically don’t support long data retention and don’t provide ordering guarantees, which greatly simplifies their design. We’ll discuss how our design handles both in the deep dive.

Step 2 - Propose high-level design and get buy-in

Key components of a message queue: message-queue-components

Messaging models

The first type of messaging model is point-to-point and it’s commonly found in traditional message queues: point-to-point-model

On the other hand, the publish-subscribe model is more common for event streaming platforms: publish-subscribe-model

Topics, partitions and brokers

What if the data volume for a topic is too large? One way to scale is by splitting a topic into partitions (aka sharding): partitions

Consumer groups

A consumer group is a set of consumers working together to consume messages from a topic: consumer-groups

High-level architecture

high-level-architecture

Step 3 - Design Deep Dive

In order to achieve high throughput while meeting the long data retention requirement, we made some important design choices:

Data storage

In order to find the best data store for messages, we must examine a message’s properties:

What are our options:

WAL files are extremely efficient when used with traditional HDDs.

There is a misconception that HDD access is slow, but that hugely depends on the access pattern. When the access pattern is sequential (as in our case), HDDs can achieve well over 100 MB/s of sequential read/write throughput, which is sufficient for our needs. We also piggyback on the fact that the OS caches disk data in memory aggressively.
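To make the sequential access pattern concrete, here is a minimal sketch of an append-only segment writer (the class and method names are illustrative; real systems add index files, fsync policies, and segment rotation):

    import os
    import struct

    class SegmentWriter:
        """Append-only log segment: records are only ever written at the end,
        so the disk sees a purely sequential write pattern."""

        def __init__(self, path):
            # O_APPEND guarantees every write lands at the current end of the file.
            self.fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_APPEND, 0o644)
            self.next_offset = 0

        def append(self, payload: bytes) -> int:
            # Length-prefix each record so a reader knows where it ends.
            os.write(self.fd, struct.pack(">I", len(payload)) + payload)
            offset = self.next_offset
            self.next_offset += 1
            return offset

        def close(self):
            os.close(self.fd)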

Message data structure

It is important that the message schema stays consistent across producer, queue, and consumer to avoid extra copying, which allows much more efficient processing.

Example message structure: message-structure

The key of the message specifies which partition a message belongs to. An example mapping is hash(key) % numPartitions. For more flexibility, the producer can override the default mapping to control which partition a message is sent to.
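A sketch of that default mapping (the hash function and the override parameter are assumptions for illustration):

    import hashlib

    def choose_partition(key: str, num_partitions: int, override: int | None = None) -> int:
        """Default mapping: hash(key) % numPartitions.
        A producer-supplied override takes precedence; a missing key could
        fall back to round-robin (not shown)."""
        if override is not None:
            return override
        digest = hashlib.md5(key.encode()).digest()
        return int.from_bytes(digest[:8], "big") % num_partitions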

The message value is the payload of a message. It can be plaintext or a compressed binary block.

Note: Unlike keys in traditional KV stores, message keys need not be unique. Duplicate keys are acceptable, and a key may even be missing.

Other message files:

Additional features such as filtering can be supported by adding additional fields.
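Putting the pieces together, a message record might be modeled roughly as follows (only the key and value come from the text above; the remaining fields, such as the offset and CRC, are common additions and should be treated as assumptions):

    from dataclasses import dataclass
    from typing import Optional
    import zlib

    @dataclass
    class Message:
        key: Optional[bytes]   # decides the partition; may be missing or duplicated
        value: bytes           # payload: plaintext or a compressed binary block
        topic: str
        partition: int
        offset: int            # position within the partition, assigned by the broker
        timestamp_ms: int
        crc: int = 0           # integrity check computed over the value

        def with_crc(self) -> "Message":
            self.crc = zlib.crc32(self.value)
            return self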

Batching

Batching is critical for the performance of our system. We apply it in the producer, consumer and message queue.

It is critical because:

There is a trade-off between latency and throughput:

If the system is deployed as a traditional message queue where low latency matters, it could be tuned to use a smaller batch size.

If tuned this way, we might need more partitions per topic to compensate for the slower sequential disk write throughput that smaller batches cause.
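As an illustration, a producer-side batch accumulator could trade latency for throughput by flushing either when the batch is full or when a linger timeout expires (batch_size and linger_ms are illustrative knobs, not names from the text):

    import time

    class BatchAccumulator:
        def __init__(self, send_fn, batch_size=16_384, linger_ms=5):
            self.send_fn = send_fn        # sends one batch to the broker
            self.batch_size = batch_size  # larger batches -> higher throughput
            self.linger_ms = linger_ms    # smaller linger -> lower latency
            self.buffer = []
            self.bytes = 0
            self.oldest = None

        def add(self, record: bytes):
            if self.oldest is None:
                self.oldest = time.monotonic()
            self.buffer.append(record)
            self.bytes += len(record)
            self.maybe_flush()

        def maybe_flush(self):
            full = self.bytes >= self.batch_size
            expired = self.oldest is not None and \
                (time.monotonic() - self.oldest) * 1000 >= self.linger_ms
            if full or expired:
                self.send_fn(self.buffer)
                self.buffer, self.bytes, self.oldest = [], 0, None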

Producer flow

If a producer wants to send a message to a partition, which broker should it connect to?

One option is to introduce a routing layer, which routes messages to the correct broker. If replication is enabled, the correct broker is the leader replica: routing-layer

The reason for having replicas is to enable fault tolerance.

This approach works but has some drawbacks:

To mitigate these issues, we can embed the routing layer into the producer: routing-layer-producer

The batch size choice is a classical trade-off between throughput and latency. batch-size-throughput-vs-latency

Consumer flow

The consumer specifies its offset in a partition and receives a chunk of messages, beginning from that offset: consumer-example

One important consideration when designing the consumer is whether to use a push or a pull model:

Hence, most message queues, including ours, choose the pull model. consumer-flow
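A pull-based consumer loop might look roughly like this (fetch and process are placeholders for the broker fetch API and the application handler):

    def consume(fetch, process, partition, start_offset, max_batch=500):
        """Pull model: the consumer asks for messages starting at its own
        offset and advances only after processing them."""
        offset = start_offset
        while True:
            batch = fetch(partition, offset, max_batch)  # may return an empty list
            if not batch:
                continue  # a real consumer would back off or long-poll here
            for message in batch:
                process(message)
            offset += len(batch)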

Consumer rebalancing

Consumer rebalancing decides which consumer is responsible for which partition.

This process occurs when a consumer joins/leaves or a partition is added/removed.

The broker, acting as a coordinator, plays a central role in orchestrating the rebalancing workflow. consumer-rebalancing

When the coordinator stops receiving heartbeats from the consumers in a group, a rebalancing is triggered: consumer-rebalance-example

Let’s explore what happens when a consumer joins a group: consumer-join-group-usecase

Here’s what happens when a consumer leaves the group: consumer-leaves-group-usecase

The process is similar when a consumer doesn’t send a heartbeat for a long time: consumer-no-heartbeat-usecase
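In all of these cases, the coordinator's partition assignment can be as simple as spreading partitions round-robin across the live consumers of the group, recomputed on every membership change (a sketch; real rebalancing protocols also handle generations and sticky assignment):

    def assign_partitions(partitions, consumers):
        """Round-robin assignment: recomputed whenever a consumer joins,
        leaves, or stops sending heartbeats."""
        assignment = {c: [] for c in consumers}
        for i, partition in enumerate(sorted(partitions)):
            owner = sorted(consumers)[i % len(consumers)]
            assignment[owner].append(partition)
        return assignment

    # e.g. assign_partitions([0, 1, 2, 3], ["consumer-a", "consumer-b"])
    # -> {"consumer-a": [0, 2], "consumer-b": [1, 3]}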

State storage

The state storage stores mapping between partitions and consumers, as well as the last consumed offsets for a partition. state-storage

Group 1’s offset is at 6, meaning all previous messages have been consumed. If a consumer crashes, the new consumer will continue from that message onward.

Data access patterns for consumer states:

Given these requirements, a fast KV store like ZooKeeper is ideal.
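Conceptually, the state store is just a small keyed map of last-consumed offsets, written frequently and read on rebalance or consumer restart. A sketch, with an in-memory dict standing in for ZooKeeper or another fast KV store:

    class ConsumerStateStore:
        """Maps (group, topic, partition) -> last committed offset."""

        def __init__(self):
            self.offsets = {}  # stand-in for ZooKeeper / a fast KV store

        def commit(self, group, topic, partition, offset):
            self.offsets[(group, topic, partition)] = offset

        def last_offset(self, group, topic, partition, default=0):
            # A restarted or replacement consumer resumes from here.
            return self.offsets.get((group, topic, partition), default)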

Metadata storage

The metadata storage stores configuration and topic properties - partition number, retention period, replica distribution.

Metadata doesn’t change often and volume is small, but there is a high consistency requirement. Zookeeper is a good choice for this storage.

ZooKeeper

Zookeeper is essential for building distributed message queues.

It is a hierarchical key-value store, commonly used for distributed configuration, synchronization services, and naming registries (i.e. service discovery). zookeeper

With ZooKeeper in place, the broker only needs to maintain the message data; metadata and consumer state are stored in ZooKeeper.

Zookeeper also helps with leader election of the broker replicas.

Replication

In distributed systems, hardware issues are inevitable. We can tackle this via replication to achieve high availability. replication-example

In-sync replicas

One problem we need to tackle is keeping messages in-sync between the leader and the followers for a given partition.

In-sync replicas (ISR) are replicas for a partition that stay in-sync with the leader.

The replica.lag.max.messages setting defines how many messages a replica can lag behind the leader and still be considered in-sync.

in-sync-replicas-example
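A sketch of how the leader might derive the ISR from each follower's replicated offset using replica.lag.max.messages (names and example numbers are illustrative):

    def in_sync_replicas(leader_offset, follower_offsets, lag_max_messages):
        """Followers whose lag behind the leader is within the threshold
        stay in the ISR; the leader itself is always in sync."""
        isr = ["leader"]
        for replica, offset in follower_offsets.items():
            if leader_offset - offset <= lag_max_messages:
                isr.append(replica)
        return isr

    # e.g. in_sync_replicas(13, {"replica-2": 12, "replica-3": 6}, lag_max_messages=4)
    # -> ["leader", "replica-2"]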

ISR reflects a trade-off between performance and durability.

Acknowledgment handling is configurable.

ACK=all means that all replicas in the ISR have to sync a message. Message sending is slow, but message durability is highest. ack-all

ACK=1 means that the producer receives an acknowledgment once the leader persists the message. Message sending is fast, but message durability is lower. ack-1

ACK=0 means that the producer sends messages without waiting for any acknowledgment from the leader. Message sending is fastest, but message durability is lowest. ack-0
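The leader's acknowledgment logic then reduces to how many ISR members must confirm a write before the producer is answered (a sketch mirroring the settings above; the function name is an assumption):

    def required_acks(ack_setting, isr_size):
        """How many confirmations the leader waits for before replying."""
        if ack_setting == "all":
            return isr_size  # every in-sync replica: slow, most durable
        if ack_setting == 1:
            return 1         # leader only: fast, weaker durability
        if ack_setting == 0:
            return 0         # fire and forget: fastest, may lose messages
        raise ValueError(f"unknown ack setting: {ack_setting}")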

On the consumer side, we can connect all consumers to the leader for a partition and let them read messages from it:

The ISR list is maintained by the leader, which tracks the lag between itself and each replica.

Scalability

Let’s evaluate how we can scale different parts of the system.

Producer

The producer is much simpler than the consumer. Its scalability can easily be achieved by adding or removing producer instances.

Consumer

Consumer groups are isolated from each other. It is easy to add/remove consumer groups at will.

Rebalancing helps handle the case where consumers are added to or removed from a group gracefully.

Consumer groups and rebalancing help us achieve scalability and fault tolerance.

Broker

How do brokers handle failure? broker-failure-recovery

Additional considerations to make the broker fault-tolerant:

How do we handle redistribution of replicas when a new broker is added? broker-replica-redistribution

Partition

Whenever a new partition is added, the producer is notified and consumer rebalancing is triggered.

In terms of data storage, we only store new messages in the new partition rather than trying to copy over all old ones: partition-example

Decreasing the number of partitions is more involved: partition-decrease

Data delivery semantics

Let’s discuss different delivery semantics.

At-most once

With this guarantee, a message is delivered at most once: it may not be delivered at all, but it is never delivered twice. at-most-once

At-least once

A message can be delivered more than once, but no message is left unprocessed. at-least-once
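On the consumer side, the difference between at-most-once and at-least-once often comes down to whether the offset is committed before or after processing. A sketch, where process and commit are placeholders for the application handler and the offset commit call:

    def consume_at_most_once(messages, process, commit):
        for msg in messages:
            commit(msg.offset + 1)  # commit first: a crash after this line loses the message
            process(msg)

    def consume_at_least_once(messages, process, commit):
        for msg in messages:
            process(msg)            # process first: a crash before commit causes a redelivery,
            commit(msg.offset + 1)  # so consumers must be idempotent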

Exactly once

Exactly once is extremely costly to implement for the system, although it is the friendliest guarantee for users: exactly-once

Advanced features

Let’s discuss some advanced features that might come up in an interview.

Message filtering

Some consumers might want to only consume messages of a certain type within a partition.

This can be achieved by building a separate topic for each subset of messages, but that becomes costly if the system has many differing use-cases.

We can resolve this using message filtering.
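One common approach is to attach a tag to each message and let the broker (or the consumer) drop messages whose tag the subscription did not ask for. A sketch, where the tag field is illustrative and not part of the message structure defined earlier:

    def filter_messages(messages, wanted_tags):
        """Broker-side filtering: only messages whose tag matches the
        subscription are delivered, so consumers never see the rest."""
        return [m for m in messages if m.get("tag") in wanted_tags]

    # e.g. filter_messages([{"tag": "order", "value": b"..."},
    #                       {"tag": "payment", "value": b"..."}], {"payment"})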

Delayed messages & scheduled messages

For some use-cases, we might want to delay or schedule message delivery. For example, we might schedule a payment verification check 30 minutes from now, which triggers the consumer to check whether a payment was successful.

This can be achieved by sending messages to temporary storage in the broker and moving the message to the partition at the right time: delayed-message-implementation
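A minimal sketch of such temporary storage: a min-heap keyed by delivery time, drained by a periodic timer that moves due messages into their target partition (deliver_to_partition is a placeholder):

    import heapq
    import itertools
    import time

    class DelayQueue:
        """Holds messages until their scheduled delivery time, then hands
        them to the real partition."""

        def __init__(self, deliver_to_partition):
            self.heap = []
            self.counter = itertools.count()  # tie-breaker so messages never get compared
            self.deliver = deliver_to_partition

        def schedule(self, message, delay_seconds):
            deliver_at = time.time() + delay_seconds
            heapq.heappush(self.heap, (deliver_at, next(self.counter), message))

        def tick(self):
            # Called periodically by a timer: move every due message out.
            now = time.time()
            while self.heap and self.heap[0][0] <= now:
                _, _, message = heapq.heappop(self.heap)
                self.deliver(message)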

Step 4 - Wrap up

Additional talking points: