Nice Google doc!

I probably need to go over the Google doc a few more times, but here is a
minor comment from my first pass:

In the Transaction Coordinator Request Handling section, step 2 says that
if the Transaction Coordinator doesn't already know of a producer with the
same app-id, it creates a pid and appends (app-id, pid, epoch) to the
transaction log.

What if the app-id/pid pair already exists and we increment the epoch?
Should we also append (app-id, pid, epoch++) to the transaction log? I
think we should, but step 2 doesn't mention this.
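To make the question concrete, here is a rough sketch of step 2 with the epoch bump added. It rests on my own assumptions. The class and method names (TxnCoordinatorSketch, handleInit, LogEntry) are made up and come from neither the doc nor Kafka. An in-memory list stands in for the transaction log, and new pids start at epoch 0.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of step 2 of transaction coordinator request handling.
// None of these names are real Kafka classes; the list simulates the transaction log.
class TxnCoordinatorSketch {

    // One (app-id, pid, epoch) record in the simulated transaction log.
    record LogEntry(String appId, long pid, int epoch) {}

    private final Map<String, LogEntry> current = new HashMap<>();
    private final List<LogEntry> transactionLog = new ArrayList<>();
    private long nextPid = 0;

    public LogEntry handleInit(String appId) {
        LogEntry existing = current.get(appId);
        LogEntry entry;
        if (existing == null) {
            // Unknown app-id: assign a fresh pid, starting at epoch 0 (assumed).
            entry = new LogEntry(appId, nextPid++, 0);
        } else {
            // Known app-id: keep the same pid but bump the epoch (the case step 2 omits).
            entry = new LogEntry(appId, existing.pid(), existing.epoch() + 1);
        }
        // Append in both cases, so the latest epoch is also recorded in the log.
        transactionLog.add(entry);
        current.put(appId, entry);
        return entry;
    }

    public List<LogEntry> log() {
        return transactionLog;
    }

    public static void main(String[] args) {
        TxnCoordinatorSketch coordinator = new TxnCoordinatorSketch();
        System.out.println(coordinator.handleInit("app-1")); // LogEntry[appId=app-1, pid=0, epoch=0]
        System.out.println(coordinator.handleInit("app-1")); // LogEntry[appId=app-1, pid=0, epoch=1]
        System.out.println(coordinator.log().size());        // 2
    }
}
```

The point is the else branch. If only new pids were logged, a coordinator that rebuilds its state by replaying the log would come back with the stale epoch after a failover.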

On Wed, Nov 30, 2016 at 5:35 PM, Apurva Mehta wrote:

> Thanks for your comments. Let me address your second point, about merging
> the __consumer_offsets and transaction topics.
> Needless to say, we considered doing this, but chose to keep them separate
> for the following reasons:
>    1. Your assumption that and can be the same does not hold for streams
>    applications. All colocated tasks of a streams application will share the
>    same consumer (and hence implicitly will have the same), but each task
>    will have its own producer instance. The for each producer instance will
>    still have to be distinct. So to colocate the transaction and consumer
>    group coordinators, we would now have to introduce a '' config in the
>    producer and require it to be the same as the consumer's. This seemed like
>    a very fragile option.
>    2. Following on from the above, the transaction coordinator and group
>    coordinator would _have_ to be colocated in order to be the leader for the
>    same TopicPartition, unless we wanted to make even more fundamental
>    changes to Kafka.
>    3. We don't require that the consumer coordinator and the transaction
>    coordinator have the same view of the current PID/Epoch pair. If a
>    producer instance is bounced, the epoch will be bumped. Any transactions
>    initiated by the previous instance would either be fully committed or
>    fully rolled back. Since writes to the offsets topic are just like writes
>    to a regular topic, they would enjoy the same guarantees, and the
>    inconsistency will eventually be resolved.
>    4. Finally, every application will have consumers, and hence record
>    consumer offsets, but only a very small fraction of applications would
>    use transactions. Blending the two topics would make recovering
>    transaction coordinator state unnecessarily inefficient, since the
>    coordinator has to read from the beginning of the topic to reconstruct
>    its data structures; it would have to inspect and skip the majority of
>    messages if the offsets were in the same topic.
> Thanks,
> Apurva
> On Wed, Nov 30, 2016 at 4:47 PM, Neha Narkhede wrote:
> > Thanks for initiating this KIP! I think it is well written, and I'm
> > excited to see the first step towards adding an important feature to
> > Kafka.
> >
> > I had a few initial thoughts on the KIP, mostly not as deeply thought
> > through as what you've done:
> >
> > 1. Perhaps you've already thought about how this would work: since we
> > now require a producer to specify a unique AppID across different
> > instances of an application, how would applications that run in the cloud
> > use this feature with auto-scaling?
> >
> > 2. Making it easy for applications to get exactly-once semantics for a
> > consume-process-produce workflow is a great feature to have. To enable
> > this, the proposal now includes letting a producer initiate a write to the
> > offsets topic as well (just like consumers do). The consumer coordinator
> > (which could be on a different broker than the txn coordinator) would then
> > validate that the PID and producer epoch are valid before writing to the
> > offsets topic along with the associated PID. This is a great feature,
> > though I see 2 difficulties:
> >
> > -- This needs the consumer coordinator to have a consistent view of the
> > PID/epochs that is the same as the view on the txn coordinator. However,
> > as the offsets and transaction topics are different, the 2 coordinators
> > might live on different brokers.
> > -- We now also have 2 internal topics: a transaction topic and the
> > __consumer_offsets topic.
> >
> > Maybe you've already thought about this and discarded it, but let me
> > make a somewhat crazy proposal: why don't we upgrade the transaction topic
> > to be the new offsets topic as well? For consumers that want EoS
> > guarantees for a consume-process-produce pattern, the is the same as the
> > set for the producer. Assume that the transaction topic also stores
> > consumer offsets. It stores both the transaction metadata messages and the
> > offset messages, for both transactional and non-transactional consumers.
> > Since the of the consumer and the of the producer are the same, the
> > offsets associated with a consumer group and topic-partition end up in the
> > same transaction topic partition as the transaction metadata messages. The
> > transaction coordinator and the consumer coordinator always live on the
> > same broker, since they both map to the same partition in the transaction
> > topic. Even if there are failures, they end up on the same new broker.
> > Hence, they share the same, consistent view of the PIDs, epochs and App
> > IDs, whatever it is. When it bootstraps the offsets from this new topic,
> > the consumer coordinator will skip over the transaction metadata messages
> > for consumer groups that are not involved in a transaction and don't have
> > a txn id associated with the offset message in the transaction topic. For
> > consumer groups that are involved in a txn, the consumer coordinator will
> > expose only committed offsets. It will also be able to validate the
> > OffsetCommitRequests coming from a transactional producer by ensuring that
> > they come from a valid PID and producer epoch, since it uses the same view
> > of this data created by the transaction coordinator (which lives on the
> > same broker). And we will end up with one internal topic, not two.
> >
> > This proposal offers better operational simplicity and fewer internal
> > topics, but it comes with some downsides: there are 2 types of messages in
> > one topic (txn metadata ones and offset ones). Since this internal topic
> > serves a dual purpose, it will be harder to name it and to design a
> > message format that covers the different types of messages that will live
> > in the topic. Then again, the transaction topic already needs to store 5
> > different types of messages (the AppID->PID mapping, the BeginTxn message,
> > InsertTxn, PrepareCommit, Committed/Aborted), so maybe adding the offset
> > message isn't a big deal?
> >
> > Back when we introduced the offsets topic, we discussed making it more
> > general and allowing the producer to send offset commit messages to it,
> > but ended up creating a specialized topic so that the consumer coordinator
> > could wall it off and prevent unauthorized writes from consumers outside
> > of a group. Jason can comment on the specifics, but I don't believe that
> > goal of the new consumer protocol was quite achieved.
> >
> > I have other comments on the message format, request names, etc., but
> > wanted to get your thoughts on these 2 issues first :-)
> >
> > On Wed, Nov 30, 2016 at 2:19 PM Guozhang Wang wrote:
> >
> > > Hi all,
> > >
> > > I have just created KIP-98 to enhance Kafka with exactly-once delivery
> > > semantics:
> > >
> > > 98+-+Exactly+Once+Delivery+and+Transactional+Messaging
> > >
> > > This KIP adds a transactional messaging mechanism along with an
> > > idempotent producer implementation to make sure that 1) duplicate
> > > messages sent from the same identified producer can be detected on the
> > > broker side, and 2) a group of messages sent within a transaction is
> > > atomically either reflected and fetchable to consumers as a whole, or
> > > not at all.
> > >
> > > The above wiki page provides a high-level view of the proposed changes
> > > as well as a summary of the guarantees. An initial draft of the detailed
> > > implementation design is described in this Google doc:
> > >
> > > 0wSw9ra8
> > >
> > > We would love to hear your comments and suggestions.
> > >
> > > Thanks,
> > >
> > > -- Guozhang
> > >
> > --
> > Thanks,
> > Neha
> >

