Kafka Transactions

With the rise of "read-process-write" applications, it has become necessary to ensure "exactly-once" message delivery (think of financial applications). This topic involves several aspects, such as the acknowledgment mechanism, idempotent producers, and transactions.

This post describes how Kafka implements transactions.

Using a producer and a consumer configured with at-least-once, message duplicates can occur when:

  • producer.send() writes the same message more than once because of internal retries
  • the processing of message A is re-executed because the application stops after writing B but before marking A as consumed.
  • the application temporarily loses connectivity with the rest of the system, and a new instance is started to replace the lost one. Through this process, we could have multiple instances processing the same input topics and writing to the same output topics, causing duplicate output and violating the “exactly-once” semantics. This problem is called “zombie instances.”

Transactional Semantics

Exactly once, in order, delivery per partition

To avoid duplicates generated by the producer, each producer is assigned a “producerId” and an “epoch.” Every message the producer sends carries a “sequence number” in its metadata. This metadata is replicated to the other brokers together with the message, so the guarantee still holds if the leader fails.

The producer becomes idempotent because the leader broker that receives the message checks whether that sequence number has already been written for that producerId. If it has, the broker returns the acknowledgment without writing to the log (as it is a duplicate), guaranteeing “exactly-once” delivery per partition. When a producer registers with a transactional.id, Kafka assigns the “producerId,” advances the epoch, and completes any open transactions with the same transactional.id. Once the epoch is advanced, any producer with the same transactional.id and an older epoch is considered a zombie, and further writes from those producers are rejected.
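The deduplication and fencing rules above can be illustrated with a small toy model. This is only a sketch, not Kafka code: the class `PartitionLeader`, its return strings, and the way the leader learns about a new epoch are all assumptions made for illustration.

```python
class PartitionLeader:
    """Toy partition leader: deduplicates by sequence number, fences by epoch."""

    def __init__(self):
        self.log = []        # payloads actually written to the partition
        self.epochs = {}     # producer_id -> highest epoch seen so far
        self.last_seq = {}   # producer_id -> last sequence number written

    def append(self, producer_id, epoch, seq, payload):
        known_epoch = self.epochs.get(producer_id, -1)
        if epoch < known_epoch:
            return "FENCED"              # zombie: an instance with a newer epoch exists
        if epoch > known_epoch:
            self.epochs[producer_id] = epoch
            self.last_seq[producer_id] = -1   # sequences restart with a new epoch
        if seq <= self.last_seq[producer_id]:
            return "DUPLICATE_ACK"       # already written: acknowledge, do not append
        if seq != self.last_seq[producer_id] + 1:
            return "OUT_OF_ORDER"        # a gap would break in-order delivery
        self.log.append(payload)
        self.last_seq[producer_id] = seq
        return "APPENDED"


leader = PartitionLeader()
first = leader.append(producer_id=7, epoch=0, seq=0, payload="a")
retry = leader.append(producer_id=7, epoch=0, seq=0, payload="a")    # retried send
gap = leader.append(producer_id=7, epoch=0, seq=2, payload="x")      # skips seq 1
new_instance = leader.append(producer_id=7, epoch=1, seq=0, payload="b")
zombie = leader.append(producer_id=7, epoch=0, seq=1, payload="c")   # old instance
```

The retried send is acknowledged without being written twice, and the old instance is rejected as soon as a newer epoch has been seen, so the log ends up containing only "a" and "b".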

Multi-partition atomic writes

At a high level, the concept of a transaction is the same as that of a database, i.e., atomic writes to different topics. All messages will be written correctly, or none of them. If an error occurs during this process, the transaction is aborted, and none of the messages can be read by the consumers (if correctly configured).

When an application consumes message A at offset X from partition pN of topic tp0 and writes a message B to topic tp1, the operation is atomic only if either both things happen (A is marked as consumed and B is written) or neither does.

The message A is marked as consumed only when the offset X is marked as consumed, i.e., when the offset is committed, which is done by writing to an internal topic called the “offsets topic” (__consumer_offsets). Therefore, the atomicity of writing to different topics and partitions (including the offsets topic) also guarantees the atomicity of the read-process-write cycle. Committing the offset X to the offsets topic and writing message B to topic tp1 are part of the same transaction.
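A minimal sketch of this read-process-write cycle follows. It is a toy model under stated assumptions: `ToyCluster`, `ToyTransaction`, `process_one`, and the group name are hypothetical, and the sketch buffers writes until commit for simplicity, whereas Kafka writes the messages first and hides them until the commit marker arrives. In the Java client, the offset part corresponds to producer.sendOffsetsToTransaction().

```python
class ToyCluster:
    def __init__(self):
        self.topics = {"tp1": []}   # visible output messages
        self.offsets = {}           # (group, topic, partition) -> next offset to read


class ToyTransaction:
    """Collects output messages and consumed offsets, then applies both or neither."""

    def __init__(self, cluster):
        self.cluster = cluster
        self.pending_messages = []
        self.pending_offsets = {}

    def send(self, topic, message):
        self.pending_messages.append((topic, message))

    def send_offsets(self, group, topic, partition, next_offset):
        self.pending_offsets[(group, topic, partition)] = next_offset

    def commit(self):
        for topic, message in self.pending_messages:
            self.cluster.topics[topic].append(message)
        self.cluster.offsets.update(self.pending_offsets)

    def abort(self):
        self.pending_messages.clear()
        self.pending_offsets.clear()


def process_one(cluster, input_partition, fail=False):
    key = ("group-1", "tp0", 0)
    offset = cluster.offsets.get(key, 0)      # resume from the committed offset
    message_a = input_partition[offset]       # read A at offset X
    txn = ToyTransaction(cluster)
    txn.send("tp1", message_a.upper())        # write B
    txn.send_offsets(*key, offset + 1)        # mark A as consumed
    if fail:
        txn.abort()
    else:
        txn.commit()


cluster = ToyCluster()
tp0_partition = ["a", "b"]
process_one(cluster, tp0_partition, fail=True)   # aborted: no output, offset unchanged
after_abort = (list(cluster.topics["tp1"]), dict(cluster.offsets))
process_one(cluster, tp0_partition)              # retried: "a" is processed once
process_one(cluster, tp0_partition)
```

Because the offset only advances together with the output, the aborted attempt leaves no trace and the retry processes "a" exactly once.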

Reading Transactional Messages

Kafka guarantees that a correctly configured consumer reads only non-transactional messages or messages from committed transactions: it delivers messages from completed transactions and filters out messages from aborted or still-pending transactions. Consumers can be configured to take transactions into account:

  • read_committed means that only messages that are not part of a transaction or have a completed transaction are read. As described in more detail in the “Data Flow” paragraph, messages are written to the various partitions for a topic before the two-phase commit begins, but their headers indicate that they are part of a transaction. The transaction is completed only when an additional “marker” message is sent to the same partitions/topics. See symbol “C” in Figure 1. This marker is visible to the consumer but transparent to the application. In the absence of this marker, the transaction is still pending, and the message is not consumed.
  • read_uncommitted reads all messages.
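The difference between the two isolation levels can be sketched with a toy partition log. The record layout (`kind`, `txn`, `value`) and the `read` function are assumptions for illustration only; real records carry a producerId and a transactional flag, and the markers are control records. The sketch also assumes that a read_committed consumer does not read past the first message of a transaction that is still open.

```python
def read(log, isolation_level):
    """Return the values an application would see under the given isolation level."""
    if isolation_level == "read_uncommitted":
        return [r["value"] for r in log if r["kind"] == "data"]

    # Outcome of every transaction that already has a marker in this partition.
    outcome = {r["txn"]: r["value"] for r in log if r["kind"] == "marker"}
    visible = []
    for r in log:
        if r["kind"] == "marker":
            continue                      # markers are never handed to the application
        if r["txn"] is None:
            visible.append(r["value"])    # non-transactional message
        elif r["txn"] not in outcome:
            break                         # open transaction: stop reading here
        elif outcome[r["txn"]] == "COMMIT":
            visible.append(r["value"])    # committed transactional message
        # messages of aborted transactions are skipped
    return visible


log = [
    {"kind": "data", "txn": None, "value": "n1"},
    {"kind": "data", "txn": "t1", "value": "a1"},
    {"kind": "data", "txn": "t2", "value": "b1"},
    {"kind": "marker", "txn": "t1", "value": "COMMIT"},
    {"kind": "marker", "txn": "t2", "value": "ABORT"},
    {"kind": "data", "txn": "t3", "value": "c1"},   # t3 has no marker yet
    {"kind": "data", "txn": None, "value": "n2"},
]

uncommitted_view = read(log, "read_uncommitted")
committed_view = read(log, "read_committed")
```

Here the read_uncommitted view contains every data message, while the read_committed view contains only "n1" and "a1": "b1" was aborted, and everything from "c1" onward waits until t3 completes.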

How Transactions Work

Figure 1

Data Flow

The components introduced with the transaction API in version 0.11.0 are the Transaction Coordinator and the Transaction Log, shown on the right side of the diagram in Figure 1. The Transaction Coordinator is a module running within each Kafka broker. The transaction log is an internal Kafka topic. Each Transaction Coordinator owns the partitions of the transaction log for which its broker is the leader.

The first operation performed by the producer is producer.initTransactions(), which registers the transactional.id with the Transaction Coordinator. The transactional.id identifies the producer across all instances of that producer and, together with the epoch, ensures that only one producer is active at a time. The coordinator closes all pending transactions with that transactional.id and increases the epoch to fence off the “zombie instances.”

Each transactional.id is mapped to a specific partition of the transaction log through a simple hash function. This means that only one coordinator owns a given transactional.id. The Kafka replication protocol and leader election processes are leveraged to ensure that the transaction coordinator is always available and that all transaction states are durably stored. This achieves a transaction with a consistent state across different producer instances even if the application fails.
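The mapping can be sketched as a hash modulo the number of transaction-log partitions. The exact hash function here is an assumption (a Java-style string hash, valid for IDs made of basic characters), and the helper names are hypothetical; 50 is the default value of the broker setting transaction.state.log.num.partitions.

```python
def java_string_hash(text):
    """Reproduce Java's String.hashCode() as a signed 32-bit integer."""
    h = 0
    for ch in text:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF
    return h - 0x100000000 if h >= 0x80000000 else h


def transaction_log_partition(transactional_id, num_partitions=50):
    """Map a transactional.id to one partition of the transaction log."""
    h = java_string_hash(transactional_id)
    non_negative = 0 if h == -2**31 else abs(h)   # guard the one value abs() cannot fix in 32 bits
    return non_negative % num_partitions


# The same id always lands on the same partition, hence on the same coordinator.
partition_first = transaction_log_partition("orders-processor-1")
partition_again = transaction_log_partition("orders-processor-1")
```

Since the result depends only on the ID and the partition count, a restarted instance that reuses its transactional.id reaches the coordinator that already holds its transaction state.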

The transaction log stores only the last state of a transaction and not the actual messages in the transaction. The messages are stored only in the actual partitions of the topics, as we will see in the next point.

At this point, messages are sent (producer.send()). The first time a message in the transaction targets a new partition, that partition is registered in the transaction log, which therefore contains the set of unique partitions involved in the transaction. After this registration, the messages are actually written to their topics and partitions. They are marked as transactional so that they can be filtered correctly at read time (if the consumer is configured with read_committed).

To commit the transaction (producer.commitTransaction()), a two-phase commit is used:

  • phase 1: a “prepare commit” message is written to the transaction log. Once this message is written and replicated, the transaction is guaranteed to be committed even if the application fails.
  • phase 2: the Transaction Coordinator writes the transaction commit markers to the topic partitions that are part of the transaction. The markers are based on an idea introduced by Chandy and Lamport about 30 years ago, called the Snapshot Marker Model. These markers are read and interpreted by the consumers but are not visible to the application. At this point, a “committed” message is written to the transaction log, and the transaction is closed.
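The two phases can be sketched as a toy coordinator. The class `ToyCoordinator` and its methods are hypothetical; the state names Ongoing, PrepareCommit, and CompleteCommit are the ones Kafka uses for transaction states, but everything else is a simplification (one transaction, no replication, no abort path).

```python
class ToyCoordinator:
    def __init__(self):
        self.txn_log = []        # (state, partitions) entries of the transaction log
        self.partitions = {}     # partition name -> list of records
        self.registered = set()  # partitions that are part of the current transaction

    def write(self, partition, value):
        if partition not in self.registered:
            # Register the partition in the transaction log before writing to it.
            self.registered.add(partition)
            self.txn_log.append(("Ongoing", sorted(self.registered)))
        self.partitions.setdefault(partition, []).append(("data", value))

    def commit(self, crash_after_prepare=False):
        # Phase 1: persist the decision to commit.
        self.txn_log.append(("PrepareCommit", sorted(self.registered)))
        if crash_after_prepare:
            return               # simulate a failure between the two phases
        self._write_markers()

    def recover(self):
        # A coordinator that finds PrepareCommit as the last state finishes the job.
        if self.txn_log and self.txn_log[-1][0] == "PrepareCommit":
            self.registered = set(self.txn_log[-1][1])
            self._write_markers()

    def _write_markers(self):
        # Phase 2: write a commit marker to every partition, then close the transaction.
        for partition in sorted(self.registered):
            self.partitions[partition].append(("marker", "COMMIT"))
        self.txn_log.append(("CompleteCommit", []))
        self.registered = set()


coordinator = ToyCoordinator()
coordinator.write("tp1-0", "B1")
coordinator.write("tp1-1", "B2")
coordinator.write("tp1-0", "B3")
coordinator.commit(crash_after_prepare=True)
states_after_crash = [state for state, _ in coordinator.txn_log]
coordinator.recover()
states_after_recovery = [state for state, _ in coordinator.txn_log]
```

The crash leaves the log at PrepareCommit with no markers written, and recovery completes the commit from the logged partition set, which is why phase 1 is the point of no return.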

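On the producer side, the following configurations guarantee exactly-once, in-order delivery per partition:

  • enable.idempotence = true
  • max.in.flight.requests.per.connection = 1
  • acks = all
  • retries > 0 (preferably Integer.MAX_VALUE)
  • transactional.id = some unique id

On the consumer side:

  • isolation.level = read_committed

For Kafka Streams applications:

  • processing.guarantee = exactly_once (exactly_once_v2 in recent versions)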

What Transactions Cannot Do

The main restriction of transactions is that they work only when the input comes from Kafka and the output is written to a Kafka topic. If the application calls an external service (e.g., via HTTP), updates a database, writes to stdout, or does anything other than reading from and writing to the Kafka broker, the transactional guarantees do not apply, and those calls may be duplicated. This is exactly how a transactional database works: the transaction applies only within the database. Since Kafka is often used to connect systems, however, this can be confusing. In other words, Kafka transactions are not inter-system transactions like those provided by technologies implementing XA. If a transaction involved n resource managers and Kafka was the only one not supporting XA, Kafka should be the last one to be committed.





