During the 'consume-transform-produce' cycle, if the consumer app needs to update an external data store (e.g. update RocksDB in Kafka Streams), and that update is not idempotent (e.g. incrementing a counter in RocksDB), how do you make that update part of the Kafka transaction?
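
For concreteness, here is roughly the cycle I mean, sketched against the transactional producer API proposed in the KIP (method names follow the proposal and may change; the topic, group id, and external-store helpers are hypothetical stand-ins). An abort rolls back the Kafka writes and the offset commit, but not the RocksDB increment:

    import java.time.Duration;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.KafkaException;
    import org.apache.kafka.common.TopicPartition;

    public class ConsumeTransformProduce {
        // Stand-in for the non-idempotent external update (e.g. RocksDB counter += 1).
        static void externalIncrement(String key) { /* not covered by the Kafka txn */ }

        // Stand-in for the application's transform step.
        static String transform(String value) { return value.toUpperCase(); }

        public static void run(KafkaConsumer<String, String> consumer,
                               KafkaProducer<String, String> producer) {
            producer.initTransactions();
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                if (records.isEmpty()) continue;
                producer.beginTransaction();
                try {
                    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                    for (ConsumerRecord<String, String> r : records) {
                        externalIncrement(r.key()); // OUTSIDE the Kafka transaction
                        producer.send(new ProducerRecord<>("output", r.key(), transform(r.value())));
                        offsets.put(new TopicPartition(r.topic(), r.partition()),
                                    new OffsetAndMetadata(r.offset() + 1));
                    }
                    producer.sendOffsetsToTransaction(offsets, "my-group"); // offsets join the txn
                    producer.commitTransaction(); // output and offsets become visible atomically
                } catch (KafkaException e) {
                    producer.abortTransaction(); // Kafka state rolls back; the increment does not
                }
            }
        }
    }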
On Wed, Nov 30, 2016 at 9:51 PM, Apurva Mehta <[email protected]> wrote:

Thanks for your comment, I updated the document. Let me know if it is clear now.

Apurva

On Wed, Nov 30, 2016 at 9:42 PM, Onur Karaman <[email protected]> wrote:

@Apurva yep that's what I was trying to say.

Original message:
"If there is already an entry with the AppID in the mapping, increment the epoch number and go on to the next step. If there is no entry with the AppID in the mapping, construct a PID with initialized epoch number; append an AppID message into the transaction topic, insert into the mapping and reply with the PID / epoch / timestamp."

Just wanted to make it explicit because:
1. The "append an AppID message..." chunk was ambiguous on whether it applied to the "if exists" or "if not exists" condition.
2. I think the google doc is pretty explicit on appending to the log everywhere else.

On Wed, Nov 30, 2016 at 9:36 PM, Apurva Mehta <[email protected]> wrote:

The first line in step 2 of that section is: "If there is already an entry with the AppID in the mapping, increment the epoch number and go on to the next step."

Are you suggesting that it be made explicit that 'increment the epoch number' includes persisting the updated value to the log?

Thanks,
Apurva

On Wed, Nov 30, 2016 at 9:21 PM, Onur Karaman <[email protected]> wrote:

Nice google doc!

Probably need to go over the google doc a few more times, but a minor comment from the first pass:

In Transaction Coordinator Request Handling (https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8/edit#bookmark=id.jro89lml46du), step 2 mentions that if the Transaction Coordinator doesn't already see a producer with the same app-id, it creates a pid and appends (app-id, pid, epoch) into the transaction log.

What about if the app-id/pid pair already exists and we increment the epoch? Should we append (app-id, pid, epoch++) to the transaction log? I think we should, but step 2 doesn't mention this.
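
To pin down the two branches being discussed in this exchange, here is a hedged sketch of the coordinator's step-2 handling with both branches appending to the transaction log, as the thread converges on. Every name in it (PidEntry, pidMapping, appendToTransactionLog) is invented for illustration and does not come from the design doc:

    import java.util.HashMap;
    import java.util.Map;

    class CoordinatorSketch {
        static class PidEntry {
            final long pid;
            int epoch;
            PidEntry(long pid, int epoch) { this.pid = pid; this.epoch = epoch; }
        }

        private final Map<String, PidEntry> pidMapping = new HashMap<>();
        private long nextPid = 0;

        PidEntry handleInitPid(String appId) {
            PidEntry entry = pidMapping.get(appId);
            if (entry != null) {
                entry.epoch += 1;                     // fence the previous producer instance
                appendToTransactionLog(appId, entry); // persist the bumped epoch, too
            } else {
                entry = new PidEntry(nextPid++, 0);   // fresh PID, initialized epoch
                appendToTransactionLog(appId, entry); // persist the new mapping
                pidMapping.put(appId, entry);
            }
            return entry;                             // reply with the PID / epoch
        }

        private void appendToTransactionLog(String appId, PidEntry e) {
            // stand-in for appending an (app-id, pid, epoch) message to the txn log
        }
    }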
On Wed, Nov 30, 2016 at 5:35 PM, Apurva Mehta <[email protected]> wrote:

Thanks for your comments, let me deal with your second point regarding merging the __consumer_offsets and transactions topics.

Needless to say, we considered doing this, but chose to keep them separate for the following reasons:

1. Your assumption that group.id and transaction.app.id can be the same does not hold for streams applications. All colocated tasks of a streams application will share the same consumer (and hence implicitly will have the same group.id), but each task will have its own producer instance. The transaction.app.id for each producer instance will still have to be distinct. So to colocate the transaction and consumer group coordinators, we would now have to introduce a 'group.id' config in the producer and require it to be the same as the consumer's. This seemed like a very fragile option (see the sketch below this message).
2. Following on from the above, the transaction coordinator and group coordinator would _have_ to be colocated in order to be the leader for the same TopicPartition, unless we wanted to make even more fundamental changes to Kafka.
3. We don't require that the consumer coordinator and the transaction coordinator have the same view of the current PID/epoch pair. If a producer instance is bounced, the epoch will be bumped. Any transactions initiated by the previous instance would either be fully committed or fully rolled back. Since the writes to the offset topics are just like writes to a regular topic, these would enjoy the same guarantees, and the inconsistency will be eventually resolved.
4. Finally, every application will have consumers, and hence record consumer offsets. But a very small fraction of applications would use transactions. Blending the two topics would make recovering transaction coordinator state unnecessarily inefficient, since the coordinator has to read from the beginning of the topic to reconstruct its data structures -- it would have to inspect and skip a majority of the messages if the offsets were in the same topic.

Thanks,
Apurva
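
As a concrete illustration of point 1: a sketch of the configs a two-task streams application would carry. The key 'transaction.app.id' is the name used in this thread; the final config key and the per-task suffixing are assumptions, not part of the KIP:

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;

    public class StreamsTaskConfigs {
        public static void main(String[] args) {
            // All colocated tasks share one consumer, hence one group.id.
            Properties consumerProps = new Properties();
            consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "my-streams-app");

            // ...but each task has its own producer, and each producer needs a
            // distinct transaction.app.id (hypothetical per-task suffixes below).
            Properties task0Producer = new Properties();
            task0Producer.put("transaction.app.id", "my-streams-app-task-0");

            Properties task1Producer = new Properties();
            task1Producer.put("transaction.app.id", "my-streams-app-task-1");

            // Colocating the two coordinators would force a redundant 'group.id'
            // into every producer config as well: the fragility described above.
        }
    }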
On Wed, Nov 30, 2016 at 4:47 PM, Neha Narkhede <[email protected]> wrote:

Thanks for initiating this KIP! I think it is well written and I'm excited to see the first step towards adding an important feature in Kafka.

I had a few initial thoughts on the KIP, mostly not as deeply thought through as what you've done -

1. Perhaps you've thought about how this would work already -- since we now require a producer to specify a unique AppID across different instances of an application, how would applications that run in the cloud use this feature with auto scaling?

2. Making it easy for applications to get exactly-once semantics for a consume-process-produce workflow is a great feature to have. To enable this, the proposal now includes letting a producer initiate a write to the offset topic as well (just like consumers do). The consumer coordinator (which could be on a different broker than the txn coordinator) would then validate whether the PID and producer epoch are valid before it writes to the offset topic along with the associated PID. This is a great feature, though I see 2 difficulties:

-- This needs the consumer coordinator to have a consistent view of the PID/epochs that is the same as the view on the txn coordinator. However, as the offset and the transaction topics are different, the 2 coordinators might live on different brokers.
-- We now also have 2 internal topics - a transaction topic and the __consumer_offsets topic.

Maybe you've thought about this already and discarded it ... let me make a somewhat crazy proposal -- why don't we upgrade the transaction topic to be the new offsets topic as well? For consumers that want EoS guarantees for a consume-process-produce pattern, the group.id is the same as the transaction.app.id set for the producer. Assume that the transaction topic also stores consumer offsets. It stores both the transaction metadata messages as well as offset messages, for transactional as well as non-transactional consumers. Since the group.id of the consumer and the app.id of the producer are the same, the offsets associated with a consumer group and topic-partition end up in the same transaction topic partition as the transaction metadata messages. The transaction coordinator and the consumer coordinator always live on the same broker since they both map to the same partition in the transaction topic. Even if there are failures, they end up on the same new broker. Hence, they share the same and consistent view of the PIDs, epochs, and App IDs. The consumer coordinator will skip over the transaction metadata messages when it bootstraps the offsets from this new topic for consumer groups that are not involved in a transaction and don't have a txn id associated with the offset message in the transaction topic. The consumer coordinator will expose only committed offsets in the case of consumer groups that are involved in a txn. It will also be able to validate the OffsetCommitRequests coming from a transactional producer by ensuring that they come from a valid PID and producer epoch, since it uses the same view of this data created by the transaction coordinator (which lives on the same broker). And we will end up with one internal topic, not two.

This proposal offers better operational simplicity and fewer internal topics, but there are some downsides that come with it -- there are 2 types of messages in one topic (txn metadata ones and offset ones). Since this internal topic serves a dual purpose, it will be harder to name it and also to design a message format that includes the different types of messages that will live in the topic. Though the transaction topic already needs to write 5 different types of messages (the AppID->PID mapping, the BeginTxn message, InsertTxn, PrepareCommit, Committed/Aborted), so maybe adding the offset message isn't a big deal?
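
The colocation claim in this proposal boils down to both coordinators hashing the same id to the same partition of the shared topic. A small sketch of the idea; the hash mirrors how the offsets-topic partition is chosen today (a non-negative hashCode mod the partition count), but it is illustrative rather than the literal broker code:

    public class CoordinatorColocation {
        static int partitionFor(String id, int numPartitions) {
            return (id.hashCode() & 0x7fffffff) % numPartitions; // non-negative mod
        }

        public static void main(String[] args) {
            int numPartitions = 50;
            String id = "my-app"; // group.id and transaction.app.id are the same
            int groupCoordinatorPartition = partitionFor(id, numPartitions);
            int txnCoordinatorPartition = partitionFor(id, numPartitions);
            // Same partition, so the same leader broker hosts both coordinators.
            System.out.println(groupCoordinatorPartition == txnCoordinatorPartition); // true
        }
    }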
Back when we introduced the offsets topic, we had discussed making it more general and allowing the producer to send offset commit messages to it, but ended up creating a specialized topic to allow the consumer coordinator to wall it off and prevent unauthorized writes from consumers outside of a group. Jason can comment on the specifics, but I don't believe that goal of the new consumer protocol was quite achieved.

I have other comments on the message format, request names etc. but wanted to get your thoughts on these 2 issues first :-)

On Wed, Nov 30, 2016 at 2:19 PM Guozhang Wang <[email protected]> wrote:

Hi all,

I have just created KIP-98 to enhance Kafka with exactly once delivery semantics:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging

This KIP adds a transactional messaging mechanism along with an idempotent producer implementation to make sure that 1) duplicated messages sent from the same identified producer can be detected on the broker side, and 2) a group of messages sent within a transaction will atomically be either reflected and fetchable to consumers, or not, as a whole.

The above wiki page provides a high-level view of the proposed changes as well as summarized guarantees. An initial draft of the detailed implementation design is described in this Google doc:

https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8

We would love to hear your comments and suggestions.

Thanks,

-- Guozhang

--
Thanks,
Neha
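
As a footnote to guarantee 1) in the KIP summary above: broker-side duplicate detection can be pictured as a per-PID sequence check, roughly as below. This is an illustrative sketch, not the actual broker code; it elides per-partition tracking and the handling of out-of-sequence batches:

    import java.util.HashMap;
    import java.util.Map;

    public class SequenceDedup {
        // Last appended sequence number per PID (per partition in the real design).
        private final Map<Long, Integer> lastSeq = new HashMap<>();

        // Returns true if the batch should be appended, false if it is a duplicate.
        public boolean tryAppend(long pid, int seq) {
            Integer last = lastSeq.get(pid);
            if (last != null && seq <= last)
                return false;      // a retry of an already-appended batch: drop it
            // (In the proposal a gap, seq > last + 1, is an out-of-sequence error;
            // omitted here for brevity.)
            lastSeq.put(pid, seq);
            return true;           // append to the log
        }
    }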
