Thanks, Andrew, for the careful read.
> (1) "Not much I can do with TransactionSession that I cannot do with
> KafkaProducer today."
There are three things Txn Session enables that KafkaProducer structurally
cannot, and all of them are needed in practice:
1. Resuming a prepared transaction without reflection. Today Flink resumes
transactions through FlinkKafkaInternalProducer.resumeTransaction(producerId,
epoch) [1], which uses reflection on TransactionManager internals
("Reflection; manually forces state into TransactionManager internals").
TransactionSession.resume(...) replaces that with a stable public API. KIP-939's
prepareTransaction() / completeTransaction() are exactly the operations that
need this; they live on KafkaProducer today even though they don't produce
records.
2. Sharing one transaction identity across two distinct clients. Today a single
(producerId, epoch, state) cannot be held by both a KafkaProducer and a
KafkaShareConsumer simultaneously, because that state is private to a
KafkaProducer instance.
3. Custom 2PC coordinators. Today these require holding a KafkaProducer (with
its buffer pool, sender thread and serializers) purely to drive the transaction
lifecycle for a workload that produces no records.
So the argument is about what Txn Session can do without forcing the user to
instantiate a KafkaProducer they don't otherwise need, or to reach into private
implementation details.
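To make the lifecycle point concrete, here is a toy, in-memory model of the state a session object would own, including resuming a prepared transaction from nothing but (producerId, epoch). It is a sketch under assumptions, not the KIP's interface: ToyTransactionSession, its states and its method names are hypothetical, and it never talks to a broker (in Kafka the broker assigns producerId and epoch via InitProducerId).

```java
// Hypothetical, in-memory model of the lifecycle a transaction session would own.
// It never talks to a broker; class, state and method names are illustrative only.
final class ToyTransactionSession {
    enum State { UNINITIALIZED, READY, IN_TRANSACTION, PREPARED }
    enum Outcome { NONE, COMMITTED, ABORTED }

    private State state;
    private long producerId;
    private short epoch;
    private Outcome lastOutcome = Outcome.NONE;

    ToyTransactionSession() {
        this(State.UNINITIALIZED, -1L, (short) -1);
    }

    private ToyTransactionSession(State state, long producerId, short epoch) {
        this.state = state;
        this.producerId = producerId;
        this.epoch = epoch;
    }

    // Stand-in for InitProducerId: in Kafka the broker assigns these values.
    synchronized void initialize(long producerId, short epoch) {
        require(State.UNINITIALIZED);
        this.producerId = producerId;
        this.epoch = epoch;
        state = State.READY;
    }

    // Rebuilds a session around a transaction that an earlier process prepared,
    // using only its identity (no producer instance, no reflection).
    static ToyTransactionSession resume(long producerId, short epoch) {
        return new ToyTransactionSession(State.PREPARED, producerId, epoch);
    }

    synchronized void beginTransaction() {
        require(State.READY);
        state = State.IN_TRANSACTION;
    }

    // First phase of two-phase commit (cf. KIP-939's prepareTransaction()).
    synchronized void prepare() {
        require(State.IN_TRANSACTION);
        state = State.PREPARED;
    }

    // Second phase: an open or prepared transaction can be committed or aborted.
    synchronized void commitTransaction() { finish(Outcome.COMMITTED); }

    synchronized void abortTransaction() { finish(Outcome.ABORTED); }

    synchronized State state() { return state; }
    synchronized long producerId() { return producerId; }
    synchronized short epoch() { return epoch; }
    synchronized Outcome lastOutcome() { return lastOutcome; }

    private void finish(Outcome outcome) {
        if (state != State.IN_TRANSACTION && state != State.PREPARED) {
            throw new IllegalStateException("No transaction to finish in state " + state);
        }
        lastOutcome = outcome;
        state = State.READY;
    }

    private void require(State expected) {
        if (state != expected) {
            throw new IllegalStateException("Expected " + expected + " but was " + state);
        }
    }
}
```

The design point the sketch tries to capture is that resume() needs only the transaction identity, so a 2PC coordinator could hold this one object instead of a KafkaProducer with a buffer pool and sender thread it never uses.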
------
> (2) "Two variants of transactional KafkaProducer."
The KIP currently keeps both paths and labels the producer-bound methods
"convenience wrappers". We can document the producer-bound methods as the
recommended API for simple produce-and-commit workloads and Txn Session as the
API for advanced cases; later on we can consider deprecating the producer-bound
APIs.
The number of objects in a consume-transform-produce (CTP) loop is in fact
unchanged: TS is essentially today's TransactionManager inside KafkaProducer,
just named and made public. Before KIP-1310: one Producer, one Consumer, one
(private) TransactionManager. After KIP-1310: one Producer, one Consumer, one
(public) TransactionSession. Same count; one of them is now properly named.
------
> (3) "Consumer is still unaware of transactions; with TS there are even more
> moving parts."
I agree with this, and I would even like to keep TS unbound from both the
producer and the consumer.
We can treat consumer-awareness as a follow-up KIP (provisionally
"Transaction-Aware KafkaConsumer") that adds something like
KafkaConsumer.bindTransactionSession(session). After binding, the consumer
would refuse to commit offsets independently, roll its in-memory cursor back
on abortTransaction(), and block poll() during commit/abort transitions.
That binding needs a public object to bind to: we cannot ask KafkaConsumer to
bind to "a part of KafkaProducer's internal state".
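A rough sketch of the bound-consumer behavior, to show what that follow-up KIP would need to specify. Everything here is hypothetical: ToyBoundConsumer is not KafkaConsumer, its no-argument bindTransactionSession() and on*() callbacks are stand-ins for whatever a bound session would actually deliver, partitions are plain strings, and where the proposal says poll() would block during commit/abort, this single-threaded sketch throws instead.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a consumer bound to a transaction session. It is not
// KafkaConsumer: partitions are plain strings and poll() just advances a cursor.
final class ToyBoundConsumer {
    private final Map<String, Long> positions = new HashMap<>(); // in-memory cursor
    private Map<String, Long> snapshot;   // cursor as of beginTransaction()
    private boolean bound;
    private boolean completing;           // commit/abort started but not finished

    void bindTransactionSession() { bound = true; }

    void seek(String partition, long offset) { positions.put(partition, offset); }

    long position(String partition) { return positions.getOrDefault(partition, 0L); }

    // Pretends to fetch `records` records and advances the cursor past them.
    void poll(String partition, int records) {
        if (completing) {
            // The proposal says poll() blocks here; this sketch rejects instead.
            throw new IllegalStateException("poll() during commit/abort");
        }
        positions.merge(partition, (long) records, Long::sum);
    }

    void commitSync() {
        if (bound) {
            throw new IllegalStateException("bound consumer cannot commit independently");
        }
        // An unbound consumer would commit its positions to the group here.
    }

    void onBeginTransaction() { snapshot = new HashMap<>(positions); }

    void onCompletionStarted() { completing = true; }

    // On abort the cursor rolls back to where the transaction began;
    // on commit the advanced positions are kept.
    void onCompletionFinished(boolean committed) {
        if (snapshot == null) {
            throw new IllegalStateException("no transaction in progress");
        }
        if (!committed) {
            positions.clear();
            positions.putAll(snapshot);
        }
        snapshot = null;
        completing = false;
    }
}
```

The rollback on abort is exactly what today forces applications to discard and re-create the consumer; with an explicit session to bind to, the consumer can restore its own cursor instead.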
------
References:
[1] https://github.com/apache/flink-connector-kafka/blob/0caa1c9c826f93ac37fb4db454b81c1cf83b996c/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/FlinkKafkaInternalProducer.java#L300
KIP: https://cwiki.apache.org/confluence/x/nJY8G
Thanks,
Shekharrajak
On Tuesday 21 April 2026 at 03:09:41 am GMT+5:30, Andrew Schofield wrote:
Hi,
I've taken a good look at KIP-1310 and referred back to KIP-939 to refresh my
memory. Thanks for putting in the effort to write the proposal.
While I don't like the fact that the transactional logic is locked inside
KafkaProducer, it doesn't seem to me that the benefits of TransactionSession
are really that great in practice. There's not much I can do using
TransactionSession that I cannot do with KafkaProducer today. The main benefit
is that if I want to use a consumer transactionally without sending any
records, I no longer need to create a KafkaProducer. In these situations, the
KafkaProducer is really just a wrapper for the transaction manager, so you've
made that a bit clearer, but it's still very fiddly. I need to construct a
TransactionSession and then a KafkaConsumer, and use them together in much the
same way as I have to use the KafkaProducer today.
The introduction of TransactionSession doesn't make the API any tidier. For
example, there are now two variants of transactional KafkaProducer, either one
that was instantiated with transactional.id configuration, or one that was
instantiated with a TransactionSession. With the former, you can use
KafkaProducer.commitTransaction(), while with the latter, you cannot.
The familiar usability problems of KafkaConsumer being unaware of transactions
remain, meaning that if you use KafkaProducer to send offsets to a transaction
and it goes wrong, you need to discard the KafkaConsumer because it needs to be
re-initialized from the last committed offset. With TransactionSession, there
are even more moving parts.
On reflection, it seems to me that a more straightforward way of adding EoS
support to share groups would be preferable. That is a big piece of work in
itself.
Thanks,
Andrew
On 2026/04/07 15:00:44 Shekhar Rajak wrote:
>
> Hi everyone,
>
> I’ve published KIP-1310: General Transaction Session and would like to open
> the floor for discussion.
>
> Historically, Kafka’s transaction logic has been locked inside KafkaProducer.
> This worked well for simple "write-and-commit" patterns, but as the ecosystem
> matures, this monolithic design has become a hurdle. We now have multiple
> entities, such as Flink/external coordinators, share group consumers
> (KIP-1289) and Kafka Connect, that need to participate in or complete transactions without
> needing the heavy baggage of a full KafkaProducer (record batching,
> serializers, sender threads, etc.).
>
> KIP-1310 extracts transaction identity and lifecycle management into a
> first-class TransactionSession client.
>
> Proposal:
>
> - New TransactionSession Class: A lightweight, thread-safe object for identity
>   (producerId/epoch) and lifecycle (initialize, beginTransaction, commit).
>
> - Reflection-Free Recovery: Replaces reflection workarounds with a public
>   TransactionSession.resume() API for external coordinators.
>
> - Decoupled Architecture: Allows KafkaProducer and KafkaShareConsumer to share
>   a single transaction identity for atomic "Exactly-Once" Kafka-to-Kafka
>   pipelines (KIP-1302).
>
> - Automatic Heartbeats: Provides a dedicated home for the background
>   transaction heartbeat (KAFKA-20381), independent of the producer's
>   data-sending loop.
>
>
> This change is fully backward compatible and requires no wire protocol
> changes; it simply aligns the client API with what the Kafka protocol already
> supports.
>
> I’m looking forward to your feedback on the proposed interfaces and the
> refactoring of TransactionManager.
> KIP Link: https://cwiki.apache.org/confluence/x/nJY8G
> Best regards,
> Shekhar
> https://github.com/Shekharrajak