Since 0.10.0.0, Kafka clients and brokers are bidirectionally compatible by design: new clients can talk to old brokers, and old clients can talk to new brokers. But how did Kafka manage to achieve this feat?
Moreover, because Kafka is inherently a partitioned system, clients typically need to maintain connections to multiple brokers in order to direct their produce or fetch requests to the appropriate broker for the topic partitions they send to or fetch from. So the question is: how does a client find out which topics and partitions exist, and which brokers currently host those topic partitions, so that it can direct its requests to the right hosts? Where is this metadata stored, and how is it built, distributed, maintained, and kept up to date over time?
This article tries to address these questions succinctly, and I hope you find it useful.
API Versioning and API Version Request
Kafka uses a binary protocol over TCP for connections between clients and brokers. The protocol currently defines about 68 APIs, each of which is a request and response message pair. These message pairs are schematised and versioned so that clients and brokers can communicate consistently and evolve over time in a compatible manner.
To work against multiple broker versions, clients find out which versions of the various APIs a broker supports and then use the highest API version supported by both sides in their requests. Brokers expose their list of supported API versions to clients via the `ApiVersionsRequest` API.
Here is the sequence of steps that clients follow to communicate with brokers bidirectionally and compatibly:
- Determining the latest API version supported by both client and broker
  - After establishing a connection with a broker, and before sending any other request, the client sends an `ApiVersionsRequest` to the broker.
  - On receiving the `ApiVersionsRequest`, the broker returns the full list of ApiKeys and versions it supports.
  - On receiving the list, the client chooses the highest API version supported by both itself and the broker (a minimal sketch of this rule follows this list).
  - If no such version exists, the client reports an error.
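The rule in the last two steps is simple enough to sketch. Below is an illustrative Java snippet (not Kafka's actual client code) that picks the highest commonly supported version of a single API from the two sides' version ranges; against a live broker, the same list can be inspected with Kafka's `kafka-broker-api-versions.sh` tool.

```java
public class VersionNegotiation {
    // Illustrative only: pick the highest version of one API supported by both
    // sides, given each side's [min, max] range from the ApiVersions exchange.
    static short negotiate(short clientMin, short clientMax,
                           short brokerMin, short brokerMax) {
        short highestCommon = (short) Math.min(clientMax, brokerMax);
        if (highestCommon < Math.max(clientMin, brokerMin)) {
            // The two ranges do not overlap: the client reports an error.
            throw new IllegalStateException("No commonly supported API version");
        }
        return highestCommon;
    }

    public static void main(String[] args) {
        // e.g. client supports v0-v9 of an API, broker supports v0-v7 -> use v7
        System.out.println(negotiate((short) 0, (short) 9, (short) 0, (short) 7));
    }
}
```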
- Fetching the cluster metadata
  - The client (Producer, Consumer, or AdminClient) sends a `MetadataRequest` to a broker via its configured Kafka bootstrap server endpoint. The client should use the cluster load balancer endpoint (recommended where available) or a list of statically configured Kafka hosts as its bootstrap servers.
  - When any broker receives the `MetadataRequest`, it returns the cluster metadata describing the current state of the cluster: the topics with their partitions, the leaders for those topic partitions, and the host and port information for the brokers.
  - On receiving the metadata, the client caches it. It does not repeatedly poll the cluster for updates; instead it refreshes the cache every `metadata.max.age.ms` interval, or as soon as a request fails with an error such as `NotLeaderForPartition`, which indicates that its cached metadata is out of date. When this happens, the client refreshes its metadata by issuing a new `MetadataRequest` and then retries the failed request (see the sketch after this step).
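To see this from a client's perspective, here is a minimal sketch using the Java `KafkaProducer`: `partitionsFor()` answers from the client's cached metadata, fetching it first if the cache is empty or stale. The bootstrap hosts and the `orders` topic are hypothetical.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.StringSerializer;

public class MetadataInspect {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka-1:9092,kafka-2:9092"); // hypothetical hosts
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        // How long the client may serve metadata from its cache before refreshing.
        props.put("metadata.max.age.ms", "300000");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // partitionsFor() serves from cached metadata, issuing a
            // MetadataRequest under the covers when the cache is empty or stale.
            for (PartitionInfo p : producer.partitionsFor("orders")) { // hypothetical topic
                System.out.printf("partition=%d leader=%s replicas=%d%n",
                        p.partition(), p.leader(), p.replicas().length);
            }
        }
    }
}
```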
- Updating the cluster metadata
  - All the brokers in a Kafka cluster can respond to a `MetadataRequest` because they all maintain a local cache of the cluster metadata.
  - The broker designated as the controller is responsible for building the cluster metadata, and for initialising and updating the metadata caches of the other brokers via `UpdateMetadataRequest` (a sketch for identifying the current controller follows this list).
  - The controller sends the `UpdateMetadataRequest` to all the brokers in any of the following scenarios:
    - A broker change (a new broker added or an existing broker removed)
    - A new offline broker
    - A new leader election
    - An ISR change notification
    - A broker modification
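Which broker currently holds the controller role is itself part of the metadata a client can query. A minimal sketch with the Java `AdminClient`, again with a hypothetical bootstrap endpoint:

```java
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.Node;

public class ControllerCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka-lb.example.com:9092"); // hypothetical endpoint

        try (AdminClient admin = AdminClient.create(props)) {
            // describeCluster() issues a MetadataRequest under the covers.
            Node controller = admin.describeCluster().controller().get();
            System.out.printf("Controller: broker %d at %s:%d%n",
                    controller.id(), controller.host(), controller.port());
        }
    }
}
```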
*Clients send `MetadataRequest` to fetch cluster metadata*
*A broker designated as the controller sends `UpdateMetadataRequest` to the other brokers*
Metadata Design Changes without ZooKeeper (KIP-500)
So, how is metadata stored, maintained, and synchronised among the brokers in a Kafka ecosystem without ZooKeeper?
With reference to the Pre KIP-500 diagram above, I highlight the Post KIP-500 design changes below:
- Metadata will be stored in Kafka itself as an event log, giving timeliness, ordering, robustness, and scalability in cluster metadata management.
- Everything currently stored in ZooKeeper, such as brokers, topics, partitions, ISRs, and configurations, will be stored in this log.
- A self-managed Raft quorum of controllers will replace the ZooKeeper quorum (see the configuration sketch after this list).
- One of the controllers in the Raft quorum is elected as the leader of the metadata topic partition.
- The leader of the metadata log is called the active controller.
- The active controller handles all RPCs made by the brokers.
- Brokers register themselves with the Raft quorum via the active controller, replacing Step 2 (registration with the ZooKeeper quorum) in the diagram above.
- The active controller removes a broker from the cluster metadata if it has not sent a `MetadataFetch` heartbeat for a while, replacing the equivalent broker de-registration functionality of the ZooKeeper quorum.
- The follower controllers replicate the data written to the active controller and serve as hot standbys.
- In the Pre KIP-500 world, the controller publishes metadata to the other brokers (Step 10 in the diagram above) via the `UpdateMetadata` request; Post KIP-500, the brokers fetch metadata updates from the active controller via the `MetadataFetch` request.
- The brokers will persist the metadata they fetch to disk, for faster recovery and metadata durability.
- The brokers will periodically ask the active controller for metadata updates.
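For concreteness, in Kafka releases that ship this design (KRaft mode), the controller quorum is configured through broker properties along these lines; the node IDs, hostnames, and ports below are hypothetical:

```properties
# A combined broker+controller node (KRaft mode); "controller" alone is also valid.
process.roles=broker,controller
node.id=1

# The Raft quorum of controllers that maintains the metadata log,
# as id@host:port for each voter.
controller.quorum.voters=1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093

# One listener for clients, one for controller-quorum traffic.
listeners=PLAINTEXT://kafka-1:9092,CONTROLLER://kafka-1:9093
controller.listener.names=CONTROLLER
inter.broker.listener.name=PLAINTEXT
```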
Further Reading
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol