During the 'consume-transform-produce' cycle, if the consumer app needs to
update an external data store (e.g. update RocksDB in Kafka Streams), and
that update is not idempotent (e.g. increment a counter in RocksDB), how do
you make that update part of the Kafka transaction?
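To make the failure mode behind this question concrete, here is a small simulated sketch (hypothetical, not the KIP's API): a consume-transform-produce step that increments a counter in an external store, crashes before the transaction commits, and then reprocesses the same record after redelivery. The side effect escapes the transaction and is applied twice.

```python
# Simulated sketch of the problem: a non-idempotent external-store update
# that is NOT covered by the Kafka transaction. The dict stands in for
# RocksDB; the exception stands in for a crash before commit.

store = {"clicks": 0}

def process(record, fail_before_commit):
    # 1. transform: update the external store (outside the transaction)
    store["clicks"] += record["count"]
    if fail_before_commit:
        raise RuntimeError("crash before transaction commit")
    # 2. produce + commit offsets atomically (simulated as a no-op here)
    return "committed"

record = {"count": 1}
try:
    process(record, fail_before_commit=True)   # first attempt crashes
except RuntimeError:
    pass
process(record, fail_before_commit=False)      # record is redelivered

# One logical record, but the counter moved twice: the update escaped the
# transaction, which is exactly the gap the question above is asking about.
assert store["clicks"] == 2
```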


On Wed, Nov 30, 2016 at 9:51 PM, Apurva Mehta <apu...@confluent.io> wrote:

> Thanks for your comment, I updated the document. Let me know if it is clear
> now.
>
> Apurva
>
> On Wed, Nov 30, 2016 at 9:42 PM, Onur Karaman <
> onurkaraman.apa...@gmail.com>
> wrote:
>
> > @Apurva yep that's what I was trying to say.
> >
> > Original message:
> > If there is already an entry with the AppID in the mapping, increment the
> > epoch number and go on to the next step. If there is no entry with the
> > AppID in the mapping, construct a PID with initialized epoch number;
> append
> > an AppID message into the transaction topic, insert into the mapping and
> > reply with the PID / epoch / timestamp.
> >
> > Just wanted to make it explicit because:
> > 1. The "append an AppID message..." chunk was ambiguous on whether it
> > applied to the "if exists" or "if not exists" condition
> > 2. I think the google doc is pretty explicit on appending to the log
> > everywhere else.
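The clarified coordinator behavior can be sketched in a few lines (an illustrative model, not the actual broker code): the key point is that both branches append to the transaction log, so the bumped epoch is persisted, not just incremented in memory.

```python
# Toy model of the Transaction Coordinator's AppID handling. The list stands
# in for the transaction topic; the dict is the in-memory AppID -> (PID,
# epoch) mapping.

transaction_log = []   # stand-in for the transaction topic
mapping = {}           # AppID -> (PID, epoch)
next_pid = [0]

def handle_app_id(app_id):
    if app_id in mapping:
        pid, epoch = mapping[app_id]
        epoch += 1                    # existing AppID: bump the epoch
    else:
        pid, epoch = next_pid[0], 0   # new AppID: fresh PID, epoch 0
        next_pid[0] += 1
    mapping[app_id] = (pid, epoch)
    # Persisted in BOTH cases -- this is the point being made explicit.
    transaction_log.append((app_id, pid, epoch))
    return pid, epoch

assert handle_app_id("app-1") == (0, 0)   # not in mapping: new PID
assert handle_app_id("app-1") == (0, 1)   # in mapping: epoch incremented
assert len(transaction_log) == 2          # both outcomes were appended
```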
> >
> > On Wed, Nov 30, 2016 at 9:36 PM, Apurva Mehta <apu...@confluent.io>
> wrote:
> >
> > > The first line in step 2 of that section is: "If there is already an
> > entry
> > > with the AppID in the mapping, increment the epoch number and go on to
> > the
> > > next step."
> > >
> > > Are you suggesting that it be made explicit that 'increment the epoch
> > > number' includes persisting the updated value to the log?
> > >
> > > Thanks,
> > > Apurva
> > >
> > > On Wed, Nov 30, 2016 at 9:21 PM, Onur Karaman <
> > > onurkaraman.apa...@gmail.com>
> > > wrote:
> > >
> > > > Nice google doc!
> > > >
> > > > Probably need to go over the google doc a few more times, but a minor
> > > > comment from the first pass:
> > > >
> > > > In Transaction Coordinator Request Handling (
> > > > https://docs.google.com/document/d/11Jqy_
> > GjUGtdXJK94XGsEIK7CP1SnQGdp2eF
> > > > 0wSw9ra8/edit#bookmark=id.jro89lml46du),
> > > > step 2 mentions that if the Transaction Coordinator doesn't already
> > see a
> > > > producer with the same app-id, it creates a pid and appends (app-id,
> > pid,
> > > > epoch) into the transaction log.
> > > >
> > > > What about if the app-id/pid pair already exists and we increment the
> > > > epoch? Should we append (app-id, pid, epoch++) to the transaction
> log?
> > I
> > > > think we should, but step 2 doesn't mention this.
> > > >
> > > > On Wed, Nov 30, 2016 at 5:35 PM, Apurva Mehta <apu...@confluent.io>
> > > wrote:
> > > >
> > > > > Thanks for your comments, let me deal with your second point
> > regarding
> > > > > merging the __consumer-offsets and transactions topic.
> > > > >
> > > > > Needless to say, we considered doing this, but chose to keep them
> > > > separate
> > > > > for the following reasons:
> > > > >
> > > > >    1. Your assumption that group.id and transaction.app.id can be
> > the
> > > > same
> > > > >    does not hold for streams applications. All colocated tasks of a
> > > > streams
> > > > >    application will share the same consumer (and hence implicitly
> > will
> > > > have
> > > > >    the same group.id), but each task will have its own producer
> > > > instance.
> > > > >    The transaction.app.id for each producer instance will still
> have
> > > to
> > > > be
> > > > >    distinct. So to colocate the transaction and consumer group
> > > > > coordinators,
> > > > >    we will have to now introduce a 'group.id' config in the
> producer
> > > and
> > > > >    require it to be the same as the consumer. This seemed like a
> very
> > > > > fragile
> > > > >    option.
> > > > >    2. Following on from the above, the transaction coordinator and
> > > group
> > > > >    coordinator would _have_ to be colocated inorder to be the
> leader
> > > for
> > > > > the
> > > > >    same TopicPartition, unless we wanted to make even more
> > fundamental
> > > > > changes
> > > > >    to Kafka.
> > > > >    3. We don't require that the consumer coordinator and the
> > > transaction
> > > > >    coordinator have the same view of the current PID/Epoch pair.
> If a
> > > > > producer
> > > > >    instance is bounced, the epoch will be bumped. Any transactions
> > > > > initiated
> > > > >    by the previous instance would either be fully committed or
> fully
> > > > rolled
> > > > >    back. Since the writes to the offset topics are just like writes
> > to
> > > a
> > > > >    regular topic, these would enjoy the same guarantees, and the
> > > > > inconsistency
> > > > >    will be eventually resolved.
> > > > >    4. Finally, every application will have consumers, and hence
> > record
> > > > >    consumer offsets. But a very small fraction of applications
> would
> > > use
> > > > >    transactions. Blending the two topics would make recovering
> > > > transaction
> > > > >    coordinator state unnecessarily inefficient since it has to read
> > > from
> > > > > the
> > > > >    beginning of the topic to reconstruct its data structures -- it
> > > would
> > > > > have
> > > > >    to inspect and skip a majority of the messages if the offsets
> were
> > > in
> > > > > the
> > > > >    same topic.
> > > > >
> > > > > Thanks,
> > > > > Apurva
> > > > >
> > > > > On Wed, Nov 30, 2016 at 4:47 PM, Neha Narkhede <n...@confluent.io>
> > > > wrote:
> > > > >
> > > > > > Thanks for initiating this KIP! I think it is well written and
> I'm
> > > > > excited
> > > > > > to see the first step towards adding an important feature in
> Kafka.
> > > > > >
> > > > > > I had a few initial thoughts on the KIP, mostly not as deeply
> > thought
> > > > > > through as what you've done -
> > > > > >
> > > > > > 1. Perhaps you’ve thought about how this would work already —
> since
> > > we
> > > > > now
> > > > > > require a producer to specify a unique AppID across different
> > > instances
> > > > > of
> > > > > > an application, how would applications that run in the cloud use
> > this
> > > > > > feature with auto scaling?
> > > > > >
> > > > > > 2. Making it easy for applications to get exactly-once semantics
> > for
> > > a
> > > > > > consume-process-produce workflow is a great feature to have. To
> > > enable
> > > > > > this, the proposal now includes letting a producer initiate a
> write
> > > to
> > > > > the
> > > > > > offset topic as well (just like consumers do). The consumer
> > > coordinator
> > > > > > (which could be on a different broker than the txn coordinator)
> > would
> > > > > then
> > > > > > validate that the PID and producer epoch are valid before it writes
> to
> > > the
> > > > > > offset topic along with the associated PID. This is a great
> feature
> > > > > though
> > > > > > I see 2 difficulties
> > > > > >
> > > > > > -- This needs the consumer coordinator to have a consistent view
> of
> > > the
> > > > > > PID/epochs that is the same as the view on the txn coordinator.
> > However,
> > > as
> > > > > the
> > > > > > offset and the transaction topics are different, the 2
> coordinators
> > > > might
> > > > > > live on different brokers.
> > > > > > -- We now also have 2 internal topics - a transaction topic and
> the
> > > > > > __consumer_offsets topic.
> > > > > >
> > > > > > Maybe you’ve thought about this already and discarded it ... let
> me
> > > > make
> > > > > a
> > > > > > somewhat crazy proposal — Why don’t we upgrade the transaction
> > topic
> > > to
> > > > > be
> > > > > > the new offsets topic as well? For consumers that want EoS
> > guarantees
> > > > for
> > > > > > a consume-process-produce pattern, the group.id is the same as
> the
> > > > > > transaction.app.id set for the producer. Assume that the
> > transaction
> > > > > topic
> > > > > > also stores consumer offsets. It stores both the transaction
> > metadata
> > > > > > messages as well as offset messages, both for transactional as
> well
> > > as
> > > > > > non-transactional consumers. Since the group.id of the consumer
> > and
> > > > the
> > > > > > app.id of the producer are the same, the offsets associated with
> a
> > > > > consumer
> > > > > > group and topic-partition end up in the same transaction topic
> > > > partition
> > > > > as
> > > > > > the transaction metadata messages. The transaction coordinator
> and
> > > the
> > > > > > consumer coordinator always live on the same broker since they
> both
> > > map
> > > > > to
> > > > > > the same partition in the transaction topic. Even if there are
> > > > failures,
> > > > > > they end up on the same new broker. Hence, they share the same
> and
> > > > > > consistent view of the PIDs, epochs and App IDs, whatever it is.
> > The
> > > > > > consumer coordinator will skip over the transaction metadata
> > messages
> > > > > when
> > > > > > it bootstraps the offsets from this new topic for consumer groups
> > > that
> > > > > are
> > > > > > not involved in a transaction and don’t have a txn id associated
> > with
> > > > the
> > > > > > offset message in the transaction topic. The consumer coordinator
> > > will
> > > > > > expose only committed offsets in cases of consumer groups that
> are
> > > > > involved
> > > > > > in a txn. It will also be able to validate the
> OffsetCommitRequests
> > > > > coming
> > > > > > from a transactional producer by ensuring that it is coming from
> a
> > > > valid
> > > > > > PID, producer epoch since it uses the same view of this data
> > created
> > > by
> > > > > the
> > > > > > transaction coordinator (that lives on the same broker). And we
> > will
> > > > end
> > > > > up
> > > > > > with one internal topic, not two.
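The co-location argument above rests on both coordinators deriving their leadership from the same keyed partition. A minimal sketch of that assumption (the hash and partition count are illustrative, not Kafka's actual murmur2 partitioner):

```python
# Sketch of the co-location argument: if both coordinators are chosen by
# hashing an id onto the partitions of one shared internal topic, then
# group.id == transaction.app.id implies the same partition, and therefore
# the same leader broker for both coordinators.

NUM_PARTITIONS = 50  # illustrative; mirrors the default for __consumer_offsets

def coordinator_partition(an_id):
    # deterministic stand-in for Kafka's murmur2-based key partitioning
    return sum(an_id.encode()) % NUM_PARTITIONS

group_id = "my-streams-app"        # consumer side
txn_app_id = "my-streams-app"      # producer side, same id as proposed
assert coordinator_partition(group_id) == coordinator_partition(txn_app_id)
```

The fragility Apurva points out in reason 1 is visible here too: the guarantee only holds while the two ids are forced to be equal, which streams applications cannot satisfy.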
> > > > > >
> > > > > > This proposal offers better operational simplicity and fewer
> > internal
> > > > > > topics but there are some downsides that come with it — there
> are 2
> > > > types
> > > > > > of messages in one topic (txn metadata ones and offset ones).
> Since
> > > > this
> > > > > > internal topic serves a dual purpose, it will be harder to name
> it
> > > and
> > > > > also
> > > > > > design a message format that includes the different types of
> > messages
> > > > > that
> > > > > > will live in the topic. Though the transaction topic already
> needs
> > to
> > > > > write
> > > > > > 5 different types of messages (the AppID->PID mapping, the
> BeginTxn
> > > > > > message, InsertTxn, PrepareCommit, Committed/Aborted) so maybe
> > adding
> > > > the
> > > > > > offset message isn't a big deal?
> > > > > >
> > > > > > Back when we introduced the offsets topic, we had discussed
> making
> > it
> > > > > more
> > > > > > general and allowing the producer to send offset commit messages
> to
> > > it
> > > > > but
> > > > > > ended up creating a specialized topic to allow the consumer
> > > coordinator
> > > > > to
> > > > > > wall off and prevent unauthorized writes from consumers outside
> of
> > a
> > > > > group.
> > > > > > Jason can comment on the specifics but I don't believe that goal
> of
> > > the
> > > > > new
> > > > > > consumer protocol was quite achieved.
> > > > > >
> > > > > > I have other comments on the message format, request names etc
> but
> > > > wanted
> > > > > > to get your thoughts on these 2 issues first :-)
> > > > > >
> > > > > > On Wed, Nov 30, 2016 at 2:19 PM Guozhang Wang <
> wangg...@gmail.com>
> > > > > wrote:
> > > > > >
> > > > > > > Hi all,
> > > > > > >
> > > > > > > I have just created KIP-98 to enhance Kafka with exactly once
> > > > delivery
> > > > > > > semantics:
> > > > > > >
> > > > > > > *
> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > > > 98+-+Exactly+Once+Delivery+and+Transactional+Messaging
> > > > > > > <
> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > > > 98+-+Exactly+Once+Delivery+and+Transactional+Messaging
> > > > > > > >*
> > > > > > >
> > > > > > > This KIP adds a transactional messaging mechanism along with an
> > > > > > idempotent
> > > > > > > producer implementation to make sure that 1) duplicated
> messages
> > > sent
> > > > > > from
> > > > > > > the same identified producer can be detected on the broker
> side,
> > > and
> > > > > 2) a
> > > > > > > group of messages sent within a transaction will atomically be
> > > either
> > > > > > > reflected and fetchable to consumers or not as a whole.
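Guarantee 1) above, broker-side duplicate detection for an idempotent producer, can be sketched as follows (an illustrative model, not the KIP's wire format): the broker tracks the highest sequence number appended per producer id and drops re-sends at or below it.

```python
# Toy model of idempotent-producer dedup on the broker. Each producer batch
# carries a (pid, seq); retries reuse the same seq and are ignored.

log = []
last_seq = {}   # pid -> last appended sequence number

def append(pid, seq, msg):
    if last_seq.get(pid, -1) >= seq:
        return False            # duplicate resend: dropped, not re-appended
    log.append(msg)
    last_seq[pid] = seq
    return True

assert append(1, 0, "a") is True
assert append(1, 0, "a") is False   # retry of the same batch is deduplicated
assert append(1, 1, "b") is True
assert log == ["a", "b"]            # exactly one copy of each message
```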
> > > > > > >
> > > > > > > The above wiki page provides a high-level view of the proposed
> > > > changes
> > > > > as
> > > > > > > well as summarized guarantees. Initial draft of the detailed
> > > > > > implementation
> > > > > > > design is described in this Google doc:
> > > > > > >
> > > > > > > https://docs.google.com/document/d/11Jqy_
> > > > > GjUGtdXJK94XGsEIK7CP1SnQGdp2eF
> > > > > > > 0wSw9ra8
> > > > > > > <https://docs.google.com/document/d/11Jqy_
> > > > > GjUGtdXJK94XGsEIK7CP1SnQGdp2eF
> > > > > > 0wSw9ra8>
> > > > > > >
> > > > > > >
> > > > > > > We would love to hear your comments and suggestions.
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > -- Guozhang
> > > > > > >
> > > > > > --
> > > > > > Thanks,
> > > > > > Neha
> > > > > >
> > > > >
> > > >
> > >
> >
>
