Cool. It sounds like you guys will sync up and come up with a specific proposal. I think point (3) does require full replication of the pre-commit transaction, but I'm not sure, and I would be very happy to learn otherwise. That was actually the blocker on that alternate proposal. From my point of view, 2x overhead is kind of a deal-breaker since it makes correctness so expensive that you'd have to think very hard before turning it on; but if there is a way to do it with less, and there aren't too many other negative side effects, that would be very appealing. I think we can also dive a bit into why we are so perf- and latency-sensitive as it relates to the stream processing use cases... I'm not sure how much of that is obvious from the proposal.
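A quick way to see where the 2x overhead (and the 6x figure that comes up later in the thread) comes from is a small arithmetic sketch; the function and parameter names here are mine, not anything from the KIP:

```python
def physical_writes(replication_factor, double_journal):
    """Count broker disk writes for one transactional message."""
    # With double journaling, the message is first written to a tx log,
    # and that write is itself replicated.
    tx_log_writes = replication_factor if double_journal else 0
    # The destination-partition write (the copy-back on commit, or the
    # single direct write in the KIP-98-style proposal) is also replicated.
    destination_writes = replication_factor
    return tx_log_writes + destination_writes

assert physical_writes(3, double_journal=False) == 3  # KIP-98 style: 3x
assert physical_writes(3, double_journal=True) == 6   # double journal: 6x
# The relative overhead of double journaling is 2x regardless of RF.
assert physical_writes(5, True) / physical_writes(5, False) == 2
```

So Radai's "relative factor of 2x" and Jay's "six total copies at RF 3" are both consistent; they just measure relative vs. absolute cost.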
-Jay

On Tue, Dec 20, 2016 at 9:11 AM, Joel Koshy <jjkosh...@gmail.com> wrote:

> Just got some time to go through most of this thread and the KIP - great to see this materialize and get discussed!
>
> I will add more comments in the coming days on some of the other "tracks" in this thread; but since Radai brought up the double-journaling approach that we had discussed, I thought I would move over some content from our internal wiki on double-journaling <https://cwiki.apache.org/confluence/display/KAFKA/Double+journaling+with+local+data+copy>. It is thin on details, with a few invalid statements, because I don't think we dwelt long enough on it - it was cast aside as being too expensive from a storage and latency perspective. As the immediately preceding emails state, I tend to agree that those are compelling enough reasons to take the hit in complexity/increased memory usage in the consumer. Anyway, a couple of us at LinkedIn can spend some time brainstorming a little more on this today.
>
> > 1. On write amplification: I don't see 6x the writes; at worst I see 2x the writes - once to the "tx log", then read and written again to the destination partition. If you have a replication factor != 1, then both the 1st and the 2nd writes get replicated, but it is still a relative factor of 2x. What am I missing?
>
> I think that's right - it would be six total copies if we are doing RF 3.
>
> > 3. Why do writes to a TX need the same guarantees as "plain" writes? In cases where the user can live with a TX rollback on a change of leadership/broker crash, the TX log can be unreplicated, and even live in the leader's memory. That would cut down on writes.
> > This is also an acceptable default in SQL - if your socket connection to a DB dies mid-TX, your TX is toast (MySQL is even worse).
>
> I may have misunderstood - while the above may be true for transactions in flight, a TX definitely needs the same guarantees at the point of commit, and the straightforward way to achieve that is to rely on the same guarantees while the transaction is in flight.
>
> > 4. Even if we replicate the TX log, why do we need to re-read it and re-write it to the underlying partition? If it's already written to disk, all I would need is to make that file the current segment of the "real" partition, and I've avoided the double write (at the cost of complicating segment management). If the data is replicated, fetchers could do the same.
>
> I think we had considered the above as well - i.e., if you abstract the partition's segments into segments that contain non-transactional messages and those that contain transactional messages, then it should be possible to jump from one to the other and back. It does add quite a bit of complexity though, and you still need to do buffering on reads, so the upside perhaps isn't worth the effort. I'm not convinced about that though - i.e., it may help to spend more time thinking this one through.
>
> > 5. On latency - you're right, what I'm suggesting would result in tx ordering of messages, "read committed" semantics, and therefore higher latency.
>
> *"Read committed"* only if you do the copy back to the actual log. If you don't do that (your point 4), then I think you still need to do buffering to achieve read-committed semantics.
>
> > 6. The added delay (vs. your read uncommitted) would be roughly the time span of a TX.
>
> I think it would be significantly less, given that this is local copying.
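The buffering Joel refers to - holding back messages of open transactions until their outcome is known - can be sketched as follows. The event-tuple shape here is purely illustrative, not the KIP-98 wire format:

```python
from collections import defaultdict

def read_committed(log):
    """Yield only committed messages, buffering open transactions.

    `log` is an iterable of (pid, kind, payload) tuples, where kind is
    "msg", "commit", or "abort" (an invented shape for illustration).
    """
    pending = defaultdict(list)  # open transactions, keyed by producer id
    for pid, kind, payload in log:
        if kind == "msg":
            pending[pid].append(payload)     # hold until outcome is known
        elif kind == "commit":
            yield from pending.pop(pid, [])  # release the whole transaction
        elif kind == "abort":
            pending.pop(pid, None)           # drop aborted data silently

log = [(1, "msg", "a"), (2, "msg", "x"), (1, "msg", "b"),
       (1, "commit", None), (2, "abort", None)]
assert list(read_committed(log)) == ["a", "b"]
```

Note that releasing whole transactions at commit time is what produces the "transaction ordering" discussed elsewhere in the thread; preserving offset order under read-committed would additionally require holding back later data until earlier open transactions resolve.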
> > On Mon, Dec 19, 2016 at 3:15 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > > One more thing about the double-journal proposal: when discussing this method back at LinkedIn, another issue raised besides the double writing was that it voids the offset ordering and forces people to accept "transaction ordering"; that is, the consumer will not see messages from the same partition in the order in which they were produced, but only in the order in which the corresponding transactions were committed. For some scenarios, we believe that offset ordering would still be preferred over transaction ordering, and that is why in the KIP-98 proposal we default to the former while leaving the door open if users want to switch to the latter.
> > >
> > > Guozhang
> > >
> > > On Mon, Dec 19, 2016 at 10:56 AM, Jay Kreps <j...@confluent.io> wrote:
> > >
> > > > Hey Radai,
> > > >
> > > > I'm not sure if I fully understand what you are proposing, but I interpreted it to be similar to a proposal we worked through back at LinkedIn. The proposal was to commit to a central txlog topic, and then recopy to the destination topic upon transaction commit. The observations on that approach at the time were the following:
> > > >
> > > > 1. It is cleaner since the output topics have only committed data!
> > > > 2. You need full replication on the txlog topic to ensure atomicity. We weren't able to come up with a solution where you buffer in memory or use renaming tricks the way you are describing. The reason is that once you begin committing, you must ensure that the commit eventually succeeds, to guarantee atomicity. If you use a transient store, you might commit some data and then have a server failure that causes you to lose the rest of the transaction.
> > > > 3. Having a single log allows the reader to choose a "read uncommitted" mode that hands out messages immediately. This is important for cases where latency matters, especially for stream processing topologies where these latencies stack up across multiple stages.
> > > >
> > > > For the stream processing use case, item (2) is a bit of a deal killer. This takes the cost of a transient message write (say, the intermediate result of a stream processing topology) from 3x writes (assuming 3x replication) to 6x writes. This means you basically can't default it on. If we can in fact get the cost down to a single buffered write (i.e., 1x - the data is written to memory and buffered to disk if the transaction is large) as in the KIP-98 proposal, without too many other negative side effects, I think that could be compelling.
> > > >
> > > > -Jay
> > > >
> > > > On Mon, Dec 19, 2016 at 9:36 AM, radai <radai.rosenbl...@gmail.com> wrote:
> > > >
> > > > > Regarding efficiency:
> > > > >
> > > > > I'd like to distinguish between server efficiency (resource utilization of the broker machine alone) and overall network efficiency (resource utilization on brokers, producers and consumers, including network traffic). My proposal is not as resource-efficient on the broker (although it can be, depending on a few trade-offs and implementation details). HOWEVER, if I look at the overall efficiency:
> > > > >
> > > > > 1. Clients would need to either buffer or double-read uncommitted msgs. For N clients reading the stream M times (after restarts and reconsumes) this would mean an M*N factor in either network BW or disk/memory space (depending on whether they buffer or re-read).
> > > > > Potentially N*M more broker-side reads too.
> > > > >
> > > > > 2. To reduce the broker-side cost, several things can be done (this is not an either-or list; these are cumulative):
> > > > >    2.1 - keep TX logs in memory (+overflow to disk) - trades disk writes for TX resiliency.
> > > > >    2.2 - when "appending" TX logs to real partitions - instead of reading from the (disk-based) TX log and writing to the partition log (2x disk writes), the TX log can be made a segment file (so a file rename, with associated protocol changes). This would avoid double writing by simply making the TX file part of the partition (for large enough TXs; smaller ones can be rewritten).
> > > > >    2.3 - the approach above could be combined with a background "defrag" - similar in concept to compaction - to further reduce the total number of resulting files.
> > > > >
> > > > > I think my main issue with the current proposal, more important than performance, is the lack of proper "encapsulation" of transactions - I don't think downstream consumers should see uncommitted msgs. Ever.
> > > > >
> > > > > On Sun, Dec 18, 2016 at 10:19 PM, Becket Qin <becket....@gmail.com> wrote:
> > > > >
> > > > > > @Jason
> > > > > >
> > > > > > Yes, on second thought about the number of messages included, the offset delta will probably be sufficient. The use case I encountered before for the number of messages in a message set is an embedded mirror maker on the destination broker side which fetches messages directly from the source cluster.
> > > > > > Ideally the destination cluster only needs to check the CRC and assign the offsets, because all the message verification has been done by the source cluster; but due to the lack of the number of messages in the message set, we have to decompress the message set to increment the offsets correctly. By knowing the number of messages in the message set, we can avoid doing that. The offset delta will also help. It's just that the offsets may then have holes for log-compacted topics, but that may be fine.
> > > > > >
> > > > > > @Apurva
> > > > > >
> > > > > > I am not sure it is true that the consumer will deliver either all the messages for an entire transaction or none of them from one poll() call. If we allow transactions to span partitions, then unless the consumer consumes from all the partitions involved in a transaction, it seems impossible for it to deliver *all* the messages in a transaction, right? A weaker guarantee is that we will deliver all or none of the messages that belong to the same transaction in ONE partition, but this would be different from the guarantee on the producer side.
> > > > > >
> > > > > > My two cents on Radai's sideways-partition design:
> > > > > > 1. If we consider the producer-side behavior as doing a two-phase commit, which includes committing the consumer offsets, it is a little awkward that we allow uncommitted messages to go into the main log and rely on the consumer to filter them out. So semantics-wise I think it would be better if we can avoid this.
> > > > > > Radai's suggestion is actually intuitive, because if the brokers do not want to expose uncommitted transactions to the consumer, the brokers have to buffer them.
> > > > > >
> > > > > > 2. Regarding efficiency: I think it may be worth weighing the efficiency cost vs. benefit. Efficiency includes both server-side efficiency and consumer-side efficiency.
> > > > > >
> > > > > > Regarding server-side efficiency, the current proposal would probably have better efficiency regardless of whether something goes wrong. Radai's suggestion would put more burden on the server side: if nothing goes wrong, we always pay the cost of having a double copy of the transactional messages and do not get the semantic benefit; but if something goes wrong, the efficiency cost we pay gets us a better semantic.
> > > > > >
> > > > > > For consumer-side efficiency: with Radai's suggestion there is no need to buffer uncommitted messages, while the current proposal may have to buffer them, so it would be less efficient than Radai's suggestion when a transaction aborts. When everything goes well, both designs seem to have similar performance. However, it depends on whether we are willing to loosen the consumer-side transaction guarantee that I mentioned earlier to Apurva.
> > > > > >
> > > > > > Currently the biggest pressure on the consumer side is that it has to buffer incomplete transactions. There are two reasons for this:
> > > > > > A. A transaction may be aborted, so we cannot expose the messages to the users.
> > > > > > B.
> > > > > > We want to return all or none of the messages of a transaction in ONE partition.
> > > > > >
> > > > > > While reason A is mandatory, I think reason B may be discussable. Radai's design actually removes reason A, because no uncommitted messages are exposed to the consumers. This may potentially give us a chance to significantly improve consumer-side efficiency in normal cases. It again depends on the use case, i.e., whether the user can process a transaction progressively (message by message) or it has to be buffered and returned all together. If in most cases users can process transactions message by message (most stream processing tasks probably can do so), then with Radai's proposal we don't need to buffer the transactions for the users anymore, which is a big difference. For the latter case, the consumer may have to buffer the incomplete transactions; otherwise we are just throwing the burden onto the users.
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jiangjie (Becket) Qin
> > > > > >
> > > > > > On Fri, Dec 16, 2016 at 4:56 PM, Jay Kreps <j...@confluent.io> wrote:
> > > > > >
> > > > > > > Yeah, good point. I relent!
> > > > > > >
> > > > > > > -jay
> > > > > > >
> > > > > > > On Fri, Dec 16, 2016 at 1:46 PM Jason Gustafson <ja...@confluent.io> wrote:
> > > > > > >
> > > > > > > > Jay/Ismael,
> > > > > > > >
> > > > > > > > I agree that lazy initialization of metadata seems unavoidable.
> > > > > > > > Ideally, we could follow the same pattern for transactions, but remember that in the consumer+producer use case, the initialization needs to be completed prior to setting the consumer's position. Otherwise we risk reading stale offsets. But it would be pretty awkward if you have to begin a transaction first to ensure that your consumer can read the right offset from the consumer, right? It's a bit easier to explain that you should always call `producer.init()` prior to initializing the consumer. Users would probably get this right without any special effort.
> > > > > > > >
> > > > > > > > -Jason
> > > > > > > >
> > > > > > > > On Wed, Dec 14, 2016 at 1:52 AM, Rajini Sivaram <rsiva...@pivotal.io> wrote:
> > > > > > > >
> > > > > > > > > Hi Apurva,
> > > > > > > > >
> > > > > > > > > Thank you for the answers. Just one follow-on.
> > > > > > > > >
> > > > > > > > > 15. Let me rephrase my original question. If all control messages (messages to transaction logs and markers on user logs) were acknowledged only after flushing the log segment, would transactions become durable in the traditional sense (i.e., not restricted to min.insync.replicas failures)?
> > > > > > > > > This is not a suggestion to update the KIP. It seems to me that the design enables full durability, if required in the future, with a rather non-intrusive change. I just wanted to make sure I haven't missed anything fundamental that prevents Kafka from doing this.
> > > > > > > > >
> > > > > > > > > On Wed, Dec 14, 2016 at 5:30 AM, Ben Kirwin <b...@kirw.in> wrote:
> > > > > > > > >
> > > > > > > > > > Hi Apurva,
> > > > > > > > > >
> > > > > > > > > > Thanks for the detailed answers... and sorry for the late reply!
> > > > > > > > > >
> > > > > > > > > > It does sound like, if the input-partitions-to-app-id mapping never changes, the existing fencing mechanisms should prevent duplicates. Great! I'm a bit concerned the proposed API will be delicate to program against successfully -- even in the simple case, we need to create a new producer instance per input partition, and anything fancier is going to need its own implementation of the Streams/Samza-style 'task' idea -- but that may be fine for this sort of advanced feature.
> > > > > > > > > > For the second question, I notice that Jason also elaborated on this downthread:
> > > > > > > > > >
> > > > > > > > > > > We also looked at removing the producer ID. This was discussed somewhere above, but basically the idea is to store the AppID in the message set header directly and avoid the mapping to producer ID altogether. As long as batching isn't too bad, the impact on total size may not be too bad, but we were ultimately more comfortable with a fixed-size ID.
> > > > > > > > > >
> > > > > > > > > > ...which suggests that the distinction is useful for performance, but not necessary for correctness, which makes good sense to me. (Would a 128-bit ID be a reasonable compromise? That's enough room for a UUID, or a reasonable hash of an arbitrary string, and has only a marginal impact on the message size.)
> > > > > > > > > > On Tue, Dec 6, 2016 at 11:44 PM, Apurva Mehta <apu...@confluent.io> wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi Ben,
> > > > > > > > > > >
> > > > > > > > > > > Now, on to your first question of how to deal with consumer rebalances. The short answer is that the application needs to ensure that the assignment of input partitions to appId is consistent across rebalances.
> > > > > > > > > > >
> > > > > > > > > > > For Kafka Streams, they already ensure that the mapping of input partitions to task id is invariant across rebalances by implementing a custom sticky assignor. Other non-Streams apps can trivially have one producer per input partition and have the appId be the same as the partition number to achieve the same effect.
> > > > > > > > > > >
> > > > > > > > > > > With this precondition in place, we can maintain transactions across rebalances.
> > > > > > > > > > >
> > > > > > > > > > > Hope this answers your question.
> > > > > > > > > > > Thanks,
> > > > > > > > > > > Apurva
> > > > > > > > > > >
> > > > > > > > > > > On Tue, Dec 6, 2016 at 3:22 PM, Ben Kirwin <b...@kirw.in> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Thanks for this! I'm looking forward to going through the full proposal in detail soon; a few early questions:
> > > > > > > > > > > >
> > > > > > > > > > > > First: what happens when a consumer rebalances in the middle of a transaction? The full documentation suggests that such a transaction ought to be rejected:
> > > > > > > > > > > >
> > > > > > > > > > > > > [...] if a rebalance has happened and this consumer instance becomes a zombie, even if this offset message is appended in the offset topic, the transaction will be rejected later on when it tries to commit the transaction via the EndTxnRequest.
> > > > > > > > > > > >
> > > > > > > > > > > > ...but it's unclear to me how we ensure that a transaction can't complete if a rebalance has happened.
> > > > > > > > > > > > (It's quite possible I'm missing something obvious!)
> > > > > > > > > > > >
> > > > > > > > > > > > As a concrete example: suppose a process with PID 1 adds offsets for some partition to a transaction; a consumer rebalance happens that assigns the partition to a process with PID 2, which adds some offsets to its current transaction; both processes try to commit. Allowing both commits would cause the messages to be processed twice -- how is that avoided?
> > > > > > > > > > > >
> > > > > > > > > > > > Second: App IDs normally map to a single PID. It seems like one could do away with the PID concept entirely, and just use App IDs in most places that require a PID. This feels like it would be significantly simpler, though it does increase the message size. Are there other reasons why the App ID / PID split is necessary?
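One common way to rule out the double-commit scenario Ben describes is epoch-based fencing: whoever registers an appId most recently gets a higher epoch, and commits carrying a stale epoch are rejected. The following is a toy model of that idea only (the class and method names are invented, not the KIP-98 implementation):

```python
class Coordinator:
    """Toy transaction coordinator that fences zombie producers."""

    def __init__(self):
        self.epochs = {}  # appId -> latest epoch handed out

    def register(self, app_id):
        """Called on (re)initialization; fences all earlier holders."""
        epoch = self.epochs.get(app_id, 0) + 1
        self.epochs[app_id] = epoch
        return epoch

    def commit(self, app_id, epoch):
        """Only the most recent registrant may commit."""
        return epoch == self.epochs.get(app_id)

coord = Coordinator()
e1 = coord.register("app-1")       # process 1 starts a transaction
e2 = coord.register("app-1")       # rebalance: process 2 takes over
assert coord.commit("app-1", e2)       # new owner commits fine
assert not coord.commit("app-1", e1)   # zombie's commit is rejected
```

The design choice this illustrates: as long as re-registration happens on every ownership change, at most one of two racing committers can hold the current epoch, so double processing is prevented even though both believe they own the partition.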
> > > > > > > > > > > > On Wed, Nov 30, 2016 at 2:19 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Hi all,
> > > > > > > > > > > > >
> > > > > > > > > > > > > I have just created KIP-98 to enhance Kafka with exactly-once delivery semantics:
> > > > > > > > > > > > >
> > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging
> > > > > > > > > > > > >
> > > > > > > > > > > > > This KIP adds a transactional messaging mechanism, along with an idempotent producer implementation, to make sure that 1) duplicated messages sent from the same identified producer can be detected on the broker side, and 2) a group of messages sent within a transaction will atomically be either reflected and fetchable to consumers or not, as a whole.
> > > > > > > > > > > > > The above wiki page provides a high-level view of the proposed changes as well as the summarized guarantees. An initial draft of the detailed implementation design is described in this Google doc:
> > > > > > > > > > > > >
> > > > > > > > > > > > > https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8
> > > > > > > > > > > > >
> > > > > > > > > > > > > We would love to hear your comments and suggestions.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > >
> > > > > > > > > > > > > -- Guozhang
> > >
> > > --
> > > -- Guozhang
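Guozhang's distinction earlier in the thread between offset ordering and transaction ordering can be made concrete with a toy example; the transaction labels and messages here are purely illustrative:

```python
# Two interleaved transactions on one partition; T2 commits before T1.
produced = [("T1", "m1"), ("T2", "m2"), ("T1", "m3")]
commit_order = ["T2", "T1"]

# Offset ordering (the KIP-98 default): messages are seen in the
# order they were appended to the partition.
offset_order = [m for _, m in produced]
assert offset_order == ["m1", "m2", "m3"]

# Transaction ordering (the copy-back / double-journal design):
# messages appear grouped by commit, so T2's message jumps ahead.
tx_order = [m for t in commit_order for tt, m in produced if tt == t]
assert tx_order == ["m2", "m1", "m3"]
```

This is the trade-off the thread keeps circling: copying committed transactions back to the log gives clean output but reorders it, while the in-place log preserves offsets at the cost of consumer-side filtering or buffering.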