Understanding the basics of the Kafka Binary Protocol
Taking a look inside how Kafka clients talk to the Kafka server

Apache Kafka is a distributed event streaming platform used for high-performance data pipelines. In this article, we will look at the underbelly of Kafka and see how a Kafka client and server communicate.
Fundamentals
Let's start with the basics. Kafka uses a custom binary protocol for sending and receiving messages.
The specification defines the request header (version 2) as follows:

| Field | Data type | Description |
| --- | --- | --- |
| request_api_key | INT16 | The API key for the request |
| request_api_version | INT16 | The version of the API for the request |
| correlation_id | INT32 | A unique identifier for the request, echoed back in the response |
| client_id | NULLABLE_STRING | The client ID for the request |
| TAG_BUFFER | Tagged fields | Optional tagged fields, prefixed by an UNSIGNED_VARINT count (a single 0x00 byte when there are none) |
The specification defines these data types as follows:

| Type | Description |
| --- | --- |
| INT16 | Represents an integer between -2^15 and 2^15-1 inclusive. The values are encoded using two bytes in network byte order (big-endian). |
| INT32 | Represents an integer between -2^31 and 2^31-1 inclusive. The values are encoded using four bytes in network byte order (big-endian). |
| NULLABLE_STRING | Represents a sequence of characters or null. For non-null strings, first the length N is given as an INT16. Then N bytes follow, which are the UTF-8 encoding of the character sequence. A null value is encoded with a length of -1 and no following bytes. |
| COMPACT_ARRAY | Represents a sequence of objects of a given type T. Type T can be either a primitive type (e.g. STRING) or a structure. First, the length N + 1 is given as an UNSIGNED_VARINT. Then N instances of type T follow. A null array is represented with a length of 0. In protocol documentation an array of T instances is referred to as [T]. |
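Here is a concrete look at the N + 1 convention. UNSIGNED_VARINT uses the same scheme as Protocol Buffers. Each byte stores 7 bits, the least-significant group comes first, and the high bit is set on every byte except the last. The sketch below decodes one from a `java.nio.ByteBuffer` and applies the N + 1 rule for COMPACT_ARRAY lengths. The `VarInts` class and its method names are hypothetical, chosen only for this illustration.

```java
import java.nio.ByteBuffer;

// Hypothetical helper for the variable-length integers used by compact types.
final class VarInts {
    private VarInts() {}

    // UNSIGNED_VARINT: 7 data bits per byte, least-significant group first;
    // a set high bit (0x80) means another byte follows.
    static int readUnsignedVarint(ByteBuffer buf) {
        int value = 0;
        for (int shift = 0; shift <= 28; shift += 7) {
            byte b = buf.get();
            value |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                return value;
            }
        }
        throw new IllegalArgumentException("UNSIGNED_VARINT longer than 5 bytes");
    }

    // COMPACT_ARRAY stores N + 1, so 0 means null (returned as -1) and 1 means empty.
    static int readCompactArrayLength(ByteBuffer buf) {
        return readUnsignedVarint(buf) - 1;
    }
}
```

Two examples:
- The single byte 0x02 decodes to 2. As a COMPACT_ARRAY length, that means one element.
- The two bytes 0xAC 0x02 decode to 300.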
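Every message starts with a 4-byte message_size (INT32) giving the length of the rest of the message, followed by the header. Here's an example of the start of a request message:

```
00 00 00 23  // message_size: 35
00 12        // request_api_key: 18
00 04        // request_api_version: 4
6f 7f c6 61  // correlation_id: 1870644833
...
```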
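The sketch below decodes those leading bytes with `java.nio.ByteBuffer`, whose default big-endian order matches the network byte order the protocol uses. The `HeaderPrefix` class is a hypothetical name for this illustration. Only the bytes shown above are decoded, because the rest of the message is elided.

```java
import java.nio.ByteBuffer;

// Hypothetical holder for the fixed-size fields at the start of a request.
final class HeaderPrefix {
    // The bytes from the example above; everything after correlation_id is elided.
    static final byte[] EXAMPLE_PREFIX = {
        0x00, 0x00, 0x00, 0x23,           // message_size
        0x00, 0x12,                       // request_api_key
        0x00, 0x04,                       // request_api_version
        0x6f, 0x7f, (byte) 0xc6, 0x61     // correlation_id
    };

    final int messageSize;
    final short apiKey;
    final short apiVersion;
    final int correlationId;

    private HeaderPrefix(int messageSize, short apiKey, short apiVersion, int correlationId) {
        this.messageSize = messageSize;
        this.apiKey = apiKey;
        this.apiVersion = apiVersion;
        this.correlationId = correlationId;
    }

    static HeaderPrefix decode(byte[] bytes) {
        // ByteBuffer is big-endian by default, matching network byte order.
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        int messageSize = buf.getInt();
        short apiKey = buf.getShort();
        short apiVersion = buf.getShort();
        int correlationId = buf.getInt();
        return new HeaderPrefix(messageSize, apiKey, apiVersion, correlationId);
    }
}
```

Decoding `EXAMPLE_PREFIX` yields the same values as the annotations:
- a message_size of 35
- API key 18, version 4
- correlation ID 1870644833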
Every Kafka request is an API call. The Kafka protocol defines over 70 different APIs, each of which does something different. Here are some examples:

- Produce writes events to partitions.
- CreateTopics creates new topics.
- ApiVersions returns the broker's supported API versions.

A Kafka request specifies the API it's calling via the request_api_key header field.
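Each API is identified by a numeric key. The sketch below maps a handful of request_api_key values to names, which is handy for logging and dispatch:
- Produce = 0
- Fetch = 1
- ApiVersions = 18
- CreateTopics = 19
- DescribeTopicPartitions = 75

The `ApiKey` enum is hypothetical and is not the official client's type.

```java
// Hypothetical lookup table for a few well-known request_api_key values.
enum ApiKey {
    PRODUCE(0),
    FETCH(1),
    API_VERSIONS(18),
    CREATE_TOPICS(19),
    DESCRIBE_TOPIC_PARTITIONS(75);

    final short code;

    ApiKey(int code) {
        this.code = (short) code;
    }

    // Returns the matching key, or null if this sketch does not know it.
    static ApiKey fromCode(short code) {
        for (ApiKey key : values()) {
            if (key.code == code) {
                return key;
            }
        }
        return null;
    }
}
```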
Message body
The schemas for the request and response bodies are determined by the API being called.
For example, here are some of the fields that the Produce request body contains:
- The name of the topic to write to.
- The index of the partition to write to.
- The event data to write.

On the other hand, the Produce response body contains an error code for each partition written to. These error codes indicate whether the writes succeeded.
As a reminder, requests and responses both have the following format:

- message_size (INT32): the length in bytes of the header and body that follow
- Header
- Body
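Because every message is length-prefixed, a server can first read message_size and then read exactly that many bytes, without knowing anything about the API being called. Here is a minimal sketch, assuming a blocking `java.io.DataInputStream`. The `Framing` class and `readFrame` method are hypothetical.

```java
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;

// Hypothetical helper: reads one length-prefixed Kafka message.
final class Framing {
    private Framing() {}

    // Returns the header + body bytes of the next message, or null when the
    // stream ends before a new message_size prefix could be read.
    static byte[] readFrame(DataInputStream in) throws IOException {
        int size;
        try {
            size = in.readInt(); // 4-byte big-endian message_size
        } catch (EOFException e) {
            return null;
        }
        if (size < 0) {
            throw new IOException("Invalid message_size: " + size);
        }
        byte[] frame = new byte[size];
        in.readFully(frame); // blocks until all header + body bytes have arrived
        return frame;
    }
}
```

Reading the whole frame up front means a request split across several TCP packets is still handled correctly. This works because `readFully()` keeps reading until the array is full.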
API versioning
Each API supports multiple versions, to allow for different schemas. Here's how API versioning works:
- Requests use the request_api_version header field to specify the API version being requested.
- Responses always use the same API version as the request. For example, a Produce Request (Version: 3) will always get a Produce Response (Version: 3) back.
- Each API's version history is independent, so different APIs with the same version number are unrelated. For example, Produce Request (Version: 10) is not related to Fetch Request (Version: 10).
The ApiVersions API
The ApiVersions API (API key 18) returns the broker's supported API versions. For example, ApiVersions may report that the broker supports Produce versions 5 to 11, Fetch versions 0 to 3, and so on.
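On the broker side, supporting a version comes down to a range check per API key. The sketch below is hypothetical. Its ranges (ApiVersions 0 to 4, DescribeTopicPartitions 0 to 0) simply mirror what the proof-of-concept server later in this article advertises. Error code 35 is Kafka's UNSUPPORTED_VERSION, which that server also returns.

```java
import java.util.Map;

// Hypothetical sketch of a broker-side version check.
final class SupportedVersions {
    private SupportedVersions() {}

    static final short NONE = 0;
    static final short UNSUPPORTED_VERSION = 35;

    // apiKey -> {minVersion, maxVersion}, mirroring the POC server's ApiVersions response
    private static final Map<Short, short[]> RANGES = Map.of(
            (short) 18, new short[] {0, 4},  // ApiVersions
            (short) 75, new short[] {0, 0}); // DescribeTopicPartitions

    static boolean isSupported(short apiKey, short apiVersion) {
        short[] range = RANGES.get(apiKey);
        return range != null && apiVersion >= range[0] && apiVersion <= range[1];
    }

    // Unknown API keys are also reported as unsupported here (a simplification).
    static short errorCodeFor(short apiKey, short apiVersion) {
        return isSupported(apiKey, apiVersion) ? NONE : UNSUPPORTED_VERSION;
    }
}
```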
Visualizing the Binary Protocol
Here is a great link that can help you visualize the binary protocol
Hands-on
So let’s build a proof of concept (POC) of a Kafka server.
Let’s start our server
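```java
// Assumes an SLF4J-style logger named `log` (e.g. Lombok's @Slf4j)
// and the usual java.io / java.net / java.nio imports.
public static void main(String[] args) {
    int port = 9092;
    // Create the socket unbound so SO_REUSEADDR can be set before bind()
    try (ServerSocket server = new ServerSocket()) {
        server.setReuseAddress(true);
        server.bind(new InetSocketAddress(port));
        log.info("Server started on port {}", port);
        while (true) {
            Socket client = server.accept();
            log.info("New client connected");
            handleClientAsync(client);
        }
    } catch (IOException e) {
        log.error("IOException: ", e);
    }
}
```

The server listens on port 9092 (the default Kafka port) and loops forever, waiting for client connections. Each accepted client socket is passed to handleClientAsync().

setReuseAddress(true) enables SO_REUSEADDR. This avoids "address already in use" errors when the server restarts while old connections still linger in TIME_WAIT. Its effect is undefined once a socket is already bound, so the socket is created unbound and then bound explicitly.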
Handle Multiple Clients Asynchronously
```java
private static void handleClientAsync(Socket client) {
    new Thread(() -> {
        try {
            processMessage(client);
        } catch (IOException e) {
            log.error("Error while handling client: ", e);
        } finally {
            // Always release the socket, even if processing failed
            try {
                client.close();
            } catch (IOException e) {
                log.error("Error closing client socket: ", e);
            }
        }
    }).start();
}
```

This method starts a new thread for each client connection, so the server can handle multiple clients at the same time. The actual message handling is delegated to processMessage().

This is a deliberately trivial implementation. One thread per connection does not scale well, and a real server would at least use a thread pool.
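One way to address the scaling concern is to hand connections to a fixed-size pool from `java.util.concurrent` instead of starting a new `Thread` each time. This is a sketch under a few assumptions:
- The hypothetical `ClientProcessor` interface stands in for `processMessage()`.
- The caller chooses the pool size.
- Errors go to `System.err` rather than the article's logger.

```java
import java.io.IOException;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Sketch: a bounded worker pool instead of one new Thread per client.
final class PooledClientHandler {
    // Stand-in for the article's processMessage(Socket) method.
    interface ClientProcessor {
        void process(Socket client) throws IOException;
    }

    private final ExecutorService workers;
    private final ClientProcessor processor;

    PooledClientHandler(int poolSize, ClientProcessor processor) {
        this.workers = Executors.newFixedThreadPool(poolSize);
        this.processor = processor;
    }

    void handleClientAsync(Socket client) {
        workers.execute(() -> {
            // try-with-resources closes the socket however processing ends
            try (Socket c = client) {
                processor.process(c);
            } catch (IOException e) {
                System.err.println("Error while handling client: " + e);
            }
        });
    }

    // Stops accepting new clients and waits for in-flight ones to finish.
    boolean shutdownAndWait(long timeoutSeconds) throws InterruptedException {
        workers.shutdown();
        return workers.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
    }
}
```

processMessage() loops for the lifetime of a connection, so each pooled thread stays busy with one client. With a pool of N threads, client N + 1 waits in the executor's queue until another client disconnects. That trade-off is why servers with many connections tend to move to non-blocking I/O rather than adding threads.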
Processing Client Messages
```java
private static void processMessage(Socket client) throws IOException {
    DataInputStream in = new DataInputStream(client.getInputStream());
    DataOutputStream out = new DataOutputStream(client.getOutputStream());
    while (!client.isClosed()) {
        // Read the 4-byte message_size prefix. readInt() blocks until all four
        // bytes arrive and throws EOFException when the client disconnects.
        int messageSize;
        try {
            messageSize = in.readInt();
        } catch (EOFException e) {
            log.info("Client closed the connection");
            break;
        }
        log.info("Message size: {}", messageSize);
        // Read protocol metadata
        byte[] apiKey = new byte[2];
        in.readFully(apiKey);
        byte[] apiVersion = new byte[2];
        in.readFully(apiVersion);
        byte[] correlationId = new byte[4];
        in.readFully(correlationId);
        // Read client identification: NULLABLE_STRING (INT16 length, -1 means null)
        byte[] clientIdLength = new byte[2];
        in.readFully(clientIdLength);
        short clientIdLen = ByteBuffer.wrap(clientIdLength).getShort();
        byte[] clientId = new byte[Math.max(clientIdLen, 0)];
        in.readFully(clientId);
        // Log numeric fields as numbers; new String(bytes) would print raw bytes as characters
        log.info("Request from clientID {} for apiKey {} apiVersion {} correlationId {}",
                new String(clientId, StandardCharsets.UTF_8),
                ByteBuffer.wrap(apiKey).getShort(),
                ByteBuffer.wrap(apiVersion).getShort(),
                ByteBuffer.wrap(correlationId).getInt());
```

This is where the protocol details start to show. The method first reads the 4-byte message_size prefix, then the request header fields: API key, API version, correlation ID, and client ID. DataInputStream suits this fixed-size layout because readInt() and readFully() block until the requested number of bytes has arrived, whereas a plain read() may return fewer.
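Reading each field into its own byte array works. Once the full message is in memory (for example, after reading message_size bytes with `readFully()`), the same header can also be decoded from a `ByteBuffer`.

The sketch below assumes request header v2 and uses a hypothetical `RequestHeader` class. It handles a null client_id (length -1) and stops at the header's TAG_BUFFER, leaving that to the caller.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch: the request header v2 fields that precede its TAG_BUFFER.
final class RequestHeader {
    final short apiKey;
    final short apiVersion;
    final int correlationId;
    final String clientId; // null when the NULLABLE_STRING length is -1

    private RequestHeader(short apiKey, short apiVersion, int correlationId, String clientId) {
        this.apiKey = apiKey;
        this.apiVersion = apiVersion;
        this.correlationId = correlationId;
        this.clientId = clientId;
    }

    // `frame` holds the bytes after message_size. On return, its position is at
    // the header's TAG_BUFFER, which the caller still has to consume.
    static RequestHeader parse(ByteBuffer frame) {
        short apiKey = frame.getShort();
        short apiVersion = frame.getShort();
        int correlationId = frame.getInt();
        short clientIdLength = frame.getShort(); // NULLABLE_STRING: INT16 length, -1 = null
        String clientId = null;
        if (clientIdLength >= 0) {
            byte[] bytes = new byte[clientIdLength];
            frame.get(bytes);
            clientId = new String(bytes, StandardCharsets.UTF_8);
        }
        return new RequestHeader(apiKey, apiVersion, correlationId, clientId);
    }
}
```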
Handle DescribeTopicPartitions Request
```java
        byte[] topicsArrayLength = null;
        byte[] topicNameLength = null;
        byte[] topicName = null;
        if (ByteBuffer.wrap(apiKey).getShort() == 75) {
            log.info("Received DescribeTopicPartitions request");
            // Request header v2 ends with a TAG_BUFFER (0x00 when empty)
            byte[] tagBufferLength = new byte[1];
            in.readFully(tagBufferLength);
            log.info("Tag Buffer Length: {} ", ByteBuffer.wrap(tagBufferLength).get());
            // topics: COMPACT_ARRAY, length stored as N + 1 (single byte assumed)
            topicsArrayLength = new byte[1];
            in.readFully(topicsArrayLength);
            log.info("Topics Array Length: {}", ByteBuffer.wrap(topicsArrayLength).get());
            // Topic name: compact string, length stored as N + 1 (single byte assumed)
            topicNameLength = new byte[1];
            in.readFully(topicNameLength);
            log.info("Topic Name Length: {}", ByteBuffer.wrap(topicNameLength).get());
            topicName = new byte[ByteBuffer.wrap(topicNameLength).get() - 1];
            in.readFully(topicName);
            log.info("Topic Name: {}", new String(topicName, StandardCharsets.UTF_8));
            // Per-topic TAG_BUFFER
            byte[] tagBufferLength2 = new byte[1];
            in.readFully(tagBufferLength2);
            log.info("Tag Buffer2 Length: {}", ByteBuffer.wrap(tagBufferLength2).get());
            byte[] responsePartitionLimit = new byte[4];
            in.readFully(responsePartitionLimit);
            log.info("Response Partition Limit: {}", ByteBuffer.wrap(responsePartitionLimit).getInt());
            // Cursor: 0xff means null (a non-null cursor would carry more fields)
            byte[] cursor = new byte[1];
            in.readFully(cursor);
            log.info("Cursor: {}", ByteBuffer.wrap(cursor).get());
        }
```

This block handles one specific request type: API key 75, DescribeTopicPartitions. It reads, in order:
- the header's tag buffer
- the topics array length
- the topic name
- the per-topic tag buffer
- the response partition limit
- the cursor

It takes shortcuts. It assumes exactly one topic, reads each length as a single byte (which only works while the encoded value is below 128), and only expects a null cursor (0xff).
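Below is a more general sketch of the same request body (DescribeTopicPartitions v0). It decodes every length as an UNSIGNED_VARINT, loops over all topics, and skips tagged fields instead of assuming they are empty.

It is a hypothetical helper that rests on two assumptions:
- The buffer is positioned right after the request header's TAG_BUFFER.
- Only a null cursor (0xff) is supported.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: decodes a DescribeTopicPartitions (v0) request body.
final class DescribeTopicPartitionsRequest {
    final List<String> topicNames;
    final int responsePartitionLimit;

    private DescribeTopicPartitionsRequest(List<String> topicNames, int limit) {
        this.topicNames = topicNames;
        this.responsePartitionLimit = limit;
    }

    // `body` must be positioned just after the request header's TAG_BUFFER.
    static DescribeTopicPartitionsRequest parse(ByteBuffer body) {
        int topicCount = readUnsignedVarint(body) - 1; // COMPACT_ARRAY: N + 1
        List<String> names = new ArrayList<>();
        for (int i = 0; i < topicCount; i++) {
            int nameLength = readUnsignedVarint(body) - 1; // compact string: N + 1
            byte[] name = new byte[nameLength];
            body.get(name);
            names.add(new String(name, StandardCharsets.UTF_8));
            skipTaggedFields(body); // per-topic TAG_BUFFER
        }
        int limit = body.getInt();
        byte cursor = body.get();
        if (cursor != -1) { // 0xff marks a null cursor
            throw new UnsupportedOperationException("Non-null cursor is not handled in this sketch");
        }
        skipTaggedFields(body); // request body TAG_BUFFER
        return new DescribeTopicPartitionsRequest(names, limit);
    }

    // Tagged fields: a count, then per field a tag, a size, and `size` data bytes.
    private static void skipTaggedFields(ByteBuffer buf) {
        int count = readUnsignedVarint(buf);
        for (int i = 0; i < count; i++) {
            readUnsignedVarint(buf);                 // tag
            int size = readUnsignedVarint(buf);      // length of the field's data
            buf.position(buf.position() + size);     // skip the data
        }
    }

    private static int readUnsignedVarint(ByteBuffer buf) {
        int value = 0;
        for (int shift = 0; shift <= 28; shift += 7) {
            byte b = buf.get();
            value |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                return value;
            }
        }
        throw new IllegalArgumentException("UNSIGNED_VARINT longer than 5 bytes");
    }
}
```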
Sending a Response
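```java
        // POC shortcut: drop whatever is buffered, e.g. the request's final TAG_BUFFER.
        // available() only counts bytes readable without blocking, so this is not exact.
        log.info("Remaining bytes in input stream: {}", in.available());
        in.skip(in.available());
        // Handle the request and send a response
        ByteBuffer responseBuffer = createResponseBuffer(apiKey, apiVersion, correlationId,
                topicsArrayLength, topicNameLength, topicName);
        out.write(responseBuffer.array(), 0, responseBuffer.position());
        out.flush();
        log.info("Response sent to client");
    }
}
```

After parsing the request, the code skips whatever bytes are currently buffered. It then builds a response with createResponseBuffer() and writes it back to the client. That completes one request-response cycle, and the loop waits for the next request on the same connection.

Skipping in.available() bytes is a shortcut rather than a truly defensive practice. available() only reports bytes that can be read without blocking, so it can miss part of a request split across TCP packets, or swallow the start of a pipelined next request.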
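A more exact alternative is to discard precisely message_size minus the bytes already consumed. The minimal sketch below assumes the handler keeps a running count of bytes read since the message_size prefix. That counter and the `discardRest` name are hypothetical. `readFully()` is used instead of `skip()`, since `skip()` may also skip fewer bytes than requested.

```java
import java.io.DataInputStream;
import java.io.IOException;

// Hypothetical helper: discards the unread tail of the current message.
final class MessageTail {
    private MessageTail() {}

    // messageSize is the value of the 4-byte prefix; consumed is how many
    // bytes of header + body the handler has already read.
    static void discardRest(DataInputStream in, int messageSize, int consumed) throws IOException {
        int remaining = messageSize - consumed;
        if (remaining < 0) {
            throw new IOException("Read past the end of the message by " + (-remaining) + " bytes");
        }
        // readFully blocks until every remaining byte has arrived,
        // so the next readInt() starts exactly at the next message.
        in.readFully(new byte[remaining]);
    }
}
```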
Crafting the Response using ByteBuffer
If you are interested in a JSON reference for these message schemas, the official Kafka client has one.

```java
private static ByteBuffer createResponseBuffer(byte[] apiKey, byte[] apiVersion, byte[] correlationID,
                                               byte[] topicsArrayLength, byte[] topicNameLength, byte[] topicName) {
    log.info("Creating response buffer");
    ByteBuffer responseBuffer = ByteBuffer.allocate(1024);
    responseBuffer.putInt(0);          // Placeholder for message_size, filled in at the end
    responseBuffer.put(correlationID); // Echo the request's correlation ID
    // If API Key == 0x4b (75) (DescribeTopicPartitions)
    if (ByteBuffer.wrap(apiKey).getShort() == 75) {
        // DescribeTopicPartitions response: header v1 (with TAG_BUFFER) + body
        responseBuffer.put((byte) 0);          // Response header TAG_BUFFER (empty)
        responseBuffer.putInt(0);              // throttle_time_ms
        responseBuffer.put(topicsArrayLength); // topics: COMPACT_ARRAY length, copied from the request
        responseBuffer.putShort((short) 3);    // error_code 3 = UNKNOWN_TOPIC_OR_PARTITION
        responseBuffer.put(topicNameLength);   // name: compact string length (N + 1)
        responseBuffer.put(topicName);         // name bytes
        responseBuffer.put(new byte[16]);      // topic_id: all-zero UUID
        responseBuffer.put((byte) 0);          // is_internal = false
        responseBuffer.put((byte) 1);          // partitions: COMPACT_ARRAY length 1 = empty array
        responseBuffer.putInt(0x00000DF8);     // topic_authorized_operations
        responseBuffer.put((byte) 0);          // Per-topic TAG_BUFFER (empty)
        responseBuffer.put((byte) 0xff);       // next_cursor: 0xff = null
        responseBuffer.put((byte) 0);          // Response body TAG_BUFFER (empty)
    } else {
        // Any other API key is answered with an ApiVersions response.
        // ApiVersions responses use header v0, so no TAG_BUFFER follows the correlation ID.
        short apiVersionValue = ByteBuffer.wrap(apiVersion).getShort();
        // 35 = UNSUPPORTED_VERSION for versions outside 0..4
        short errorCode = (apiVersionValue < 0 || apiVersionValue > 4) ? (short) 35 : (short) 0;
        responseBuffer.putShort(errorCode);
        responseBuffer.put((byte) 3);          // api_keys: COMPACT_ARRAY length N + 1 = 3, i.e. 2 entries
        // First API
        responseBuffer.putShort((short) 18);   // API key 18 (ApiVersions)
        responseBuffer.putShort((short) 0);    // Min version
        responseBuffer.putShort((short) 4);    // Max version
        responseBuffer.put((byte) 0);          // Tagged fields for this API (empty)
        // Second API
        responseBuffer.putShort((short) 75);   // API key 75 (DescribeTopicPartitions)
        responseBuffer.putShort((short) 0);    // Min version
        responseBuffer.putShort((short) 0);    // Max version
        responseBuffer.put((byte) 0);          // Tagged fields for this API (empty)
        responseBuffer.putInt(0);              // throttle_time_ms
        responseBuffer.put((byte) 0);          // Response body TAG_BUFFER (empty)
    }
    // Backfill message_size: everything after the 4-byte prefix
    int messageLength = responseBuffer.position() - 4;
    log.info("Message length: {}", messageLength);
    responseBuffer.putInt(0, messageLength);
    return responseBuffer;
}
```

This final method builds the response message, branching on the API key:
- Key 75 gets a DescribeTopicPartitions response.
- Every other key is answered with an ApiVersions response.

The message length is calculated last and written into the 4-byte placeholder at the start of the buffer, a common pattern in binary protocols.
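The response code writes lengths such as `(byte) 3` and `topicNameLength` as single raw bytes. That is only valid while the encoded value stays below 128. The sketch below shows a general encoder for UNSIGNED_VARINT, compact strings (length N + 1, then the UTF-8 bytes), and COMPACT_ARRAY lengths. The `ResponseWriter` class is hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helpers for writing variable-length fields into a response.
final class ResponseWriter {
    private ResponseWriter() {}

    // Writes value as an UNSIGNED_VARINT: 7 bits per byte, low bits first,
    // high bit set on every byte except the last.
    static void writeUnsignedVarint(ByteBuffer buf, int value) {
        while ((value & ~0x7F) != 0) {
            buf.put((byte) ((value & 0x7F) | 0x80));
            value >>>= 7;
        }
        buf.put((byte) value);
    }

    // Compact string: length N + 1 as an UNSIGNED_VARINT, then N UTF-8 bytes.
    static void writeCompactString(ByteBuffer buf, String s) {
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        writeUnsignedVarint(buf, bytes.length + 1);
        buf.put(bytes);
    }

    // COMPACT_ARRAY length prefix for an array with `count` elements.
    static void writeCompactArrayLength(ByteBuffer buf, int count) {
        writeUnsignedVarint(buf, count + 1);
    }
}
```

For example, `writeCompactArrayLength(buf, 2)` emits the same single 0x03 byte that the ApiVersions branch hardcodes. `writeUnsignedVarint(buf, 300)` emits 0xAC 0x02.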
Verification
To verify the server, we can hand-craft a DescribeTopicPartitions request in hex, convert it to raw bytes with xxd, send it with nc, and dump the reply:

```shell
echo -n "00000031004b0000589eecfb000c6b61666b612d746573746572000212756e6b6e6f776e2d746f7069632d73617a0000000001ff00" \
  | xxd -r -p | nc 192.168.1.6 9092 | hexdump -C
```

```
00000000 00 00 00 37 58 9e ec fb 00 00 00 00 00 02 00 03 |...7X...........|
00000010 12 75 6e 6b 6e 6f 77 6e 2d 74 6f 70 69 63 2d 73 |.unknown-topic-s|
00000020 61 7a 00 00 00 00 00 00 00 00 00 00 00 00 00 00 |az..............|
00000030 00 00 00 01 00 00 0d f8 00 ff 00 |...........|
0000003b
```
Hexdump of sent "DescribeTopicPartitions" request:

```
Idx  | Hex                                             | ASCII
-----+-------------------------------------------------+-----------------
0000 | 00 00 00 31 00 4b 00 00 58 9e ec fb 00 0c 6b 61 | ...1.K..X.....ka
0010 | 66 6b 61 2d 74 65 73 74 65 72 00 02 12 75 6e 6b | fka-tester...unk
0020 | 6e 6f 77 6e 2d 74 6f 70 69 63 2d 73 61 7a 00 00 | nown-topic-saz..
0030 | 00 00 01 ff 00                                  | .....
```

Hexdump of received "DescribeTopicPartitions" response:

```
Idx  | Hex                                             | ASCII
-----+-------------------------------------------------+-----------------
0000 | 00 00 00 37 58 9e ec fb 00 00 00 00 00 02 00 03 | ...7X...........
0010 | 12 75 6e 6b 6e 6f 77 6e 2d 74 6f 70 69 63 2d 73 | .unknown-topic-s
0020 | 61 7a 00 00 00 00 00 00 00 00 00 00 00 00 00 00 | az..............
0030 | 00 00 00 01 00 00 0d f8 00 ff 00                | ...........
```

Decoded response fields:

```
.ResponseHeader
  - .correlation_id (1486810363)
  - .TAG_BUFFER
.ResponseBody
  - .throttle_time_ms (0)
  - .topic.length (1)
  - .Topics[0]
    - .error_code (3)
    - .name (unknown-topic-saz)
    - .topic_id (00000000-0000-0000-0000-000000000000)
    - .is_internal (false)
    - .num_partitions (0)
    - .topic_authorized_operations (3576)
    - .TAG_BUFFER
  - .next_cursor (null)
  - .TAG_BUFFER
```
Thank you for reading. I hope you learnt something new.




