Cool. It sounds like you guys will sync up and come up with a specific
proposal. I think point (3) does require full replication of the pre-commit
transaction, but I'm not sure, and I would be very happy to learn
otherwise. That was actually the blocker on that alternate proposal. From
my point of view, 2x overhead is kind of a deal breaker, since it makes
correctness so expensive that you'd have to think very hard before turning it
on; but if there is a way to do it with less, and there aren't too many
other negative side effects, that would be very appealing. I think we can
also dive a bit into why we are so performance- and latency-sensitive as it
relates to the stream processing use cases... I'm not sure how much of that
is obvious from the proposal.
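
To make the arithmetic concrete, here's a rough back-of-the-envelope sketch
(illustrative only: it just restates the 3x-vs-6x write counts discussed
further down the thread, assuming replication factor 3 and counting only
broker-side log appends; the class and method names are made up for the
example):

    // Illustrative only: counts broker-side log appends per transactional
    // message under the two approaches discussed in this thread.
    public class WriteAmplificationSketch {

        static int kip98Writes(int replicationFactor) {
            // KIP-98 style: one append to the destination partition, replicated.
            // Uncommitted data is filtered (or buffered) on the consumer side.
            return 1 * replicationFactor;
        }

        static int doubleJournalWrites(int replicationFactor) {
            // Double-journal style: one append to a fully replicated tx log,
            // then a second append to the destination partition on commit,
            // also replicated.
            return 2 * replicationFactor;
        }

        public static void main(String[] args) {
            int rf = 3;
            System.out.println("KIP-98:         " + kip98Writes(rf) + "x");         // 3x
            System.out.println("double-journal: " + doubleJournalWrites(rf) + "x"); // 6x
        }
    }

Relative to a plain write the tx-log approach is "only" 2x, but in absolute
terms with RF 3 that is 6 physical writes per transient message instead of 3,
which is where the "can't default it on" concern comes from.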

-Jay

On Tue, Dec 20, 2016 at 9:11 AM, Joel Koshy <jjkosh...@gmail.com> wrote:

> Just got some time to go through most of this thread and KIP - great to see
> this materialize and get discussed!!
> I will add more comments in the coming days on some of the other "tracks"
> in this thread; but since Radai brought up the double-journaling approach
> that we had discussed, I thought I would move over some content from our
> internal wiki on double-journaling
> <https://cwiki.apache.org/confluence/display/KAFKA/Double+journaling+with+local+data+copy>
> It is thin on details, with a few invalid statements, because I don't think
> we dwelt long enough on it - it was cast aside as being too expensive from
> a storage and latency perspective. As the immediately preceding emails
> state, I tend to agree that those are compelling enough reasons to take a
> hit in complexity/increased memory usage in the consumer. Anyway, a couple
> of us at LinkedIn can spend some time today brainstorming a little more on
> this.
>
> > 1. on write amplification: I don't see x6 the writes, at worst I see x2 the
> > writes - once to the "tx log", then read and again to the destination
> > partition. if you have some != 1 replication factor then both the 1st and
> > the 2nd writes get replicated, but it is still a relative factor of x2.
> > what am I missing?
> >
>
> I think that's right - it would be six total copies if we are doing RF 3.
>
>
> > 3. why do writes to a TX need the same guarantees as "plain" writes? in
> > cases where the user can live with a TX rollback on change of
> > leadership/broker crash the TX log can be unreplicated, and even live in
> > the leader's memory. that would cut down on writes. this is also an
> > acceptable default in SQL - if your socket connection to a DB dies mid-TX
> > your TX is toast (mysql is even worse)
> >
>
> I may have misunderstood - while the above may be true for transactions
> in-flight, it definitely needs the same guarantees at the point of commit
> and the straightforward way to achieve that is to rely on the same
> guarantees while the transaction is in flight.
>
> > 4. even if we replicate the TX log, why do we need to re-read it and
> > re-write it to the underlying partition? if it's already written to disk,
> > all I would need is to make that file the current segment of the "real"
> > partition and I've avoided the double write (at the cost of complicating
> > segment management). if the data is replicated, fetchers could do the same.
> >
>
> I think we had considered the above as well - i.e., if you abstract the
> partition's segments into segments that contain non-transactional messages
> and those that contain transactional messages, then it should be possible to
> jump from one to the other and back. It does add quite a bit of complexity
> though, and you still need to do buffering on reads, so the upside perhaps
> isn't worth the effort. I'm not convinced about that though - i.e., it may
> help to spend more time thinking this one through.
>
>
> > 5. on latency - you're right, what I'm suggesting would result in tx
> > ordering of messages, "read committed" semantics and therefore higher
> > latency.
>
>
> *"read committed"* only if you do the copy back to the actual log. If you
> don't do that (your point 4) then I think you still need to do buffering to
> achieve read-committed semantics.
>
>
>
> > 6. the added delay (vs your read uncommitted) would be roughly the time
> > span of a TX.
>
>
> I think it would be significantly less given that this is local copying.
>
>
>
> >
> >
> > On Mon, Dec 19, 2016 at 3:15 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > > One more thing about the double journal proposal: when discussing
> > > this method back at LinkedIn, another issue raised besides double writing
> > > was that it will void the offset ordering and force people to accept
> > > "transaction ordering"; that is, the consumer will not see messages from
> > > the same partition in the order in which they were produced, but only in
> > > the order in which the corresponding transactions were committed. For some
> > > scenarios, we believe that offset ordering would still be preferred over
> > > transaction ordering, and that is why the KIP-98 proposal defaults to the
> > > former while leaving the door open if users want to switch to the latter.
> > >
> > >
> > > Guozhang
> > >
> > > On Mon, Dec 19, 2016 at 10:56 AM, Jay Kreps <j...@confluent.io> wrote:
> > >
> > > > Hey Radai,
> > > >
> > > > I'm not sure if I fully understand what you are proposing, but I
> > > > interpreted it to be similar to a proposal we worked through back at
> > > > LinkedIn. The proposal was to commit to a central txlog topic, and then
> > > > recopy to the destination topic upon transaction commit. The
> > > > observations on that approach at the time were the following:
> > > >
> > > >    1. It is cleaner since the output topics have only committed data!
> > > >    2. You need full replication on the txlog topic to ensure atomicity.
> > > >    We weren't able to come up with a solution where you buffer in memory
> > > >    or use renaming tricks the way you are describing. The reason is that
> > > >    once you begin committing you must ensure that the commit eventually
> > > >    succeeds to guarantee atomicity. If you use a transient store you
> > > >    might commit some data and then have a server failure that causes you
> > > >    to lose the rest of the transaction.
> > > >    3. Having a single log allows the reader to choose a "read
> > > >    uncommitted" mode that hands out messages immediately. This is
> > > >    important for cases where latency is important, especially for stream
> > > >    processing topologies where these latencies stack up across multiple
> > > >    stages.
> > > >
> > > > For the stream processing use case, item (2) is a bit of a deal killer.
> > > > This takes the cost of a transient message write (say the intermediate
> > > > result of a stream processing topology) from 3x writes (assuming 3x
> > > > replication) to 6x writes. This means you basically can't default it on.
> > > > If we can in fact get the cost down to a single buffered write (i.e. 1x:
> > > > the data is written to memory and buffered to disk if the transaction is
> > > > large) as in the KIP-98 proposal without too many other negative side
> > > > effects, I think that could be compelling.
> > > >
> > > > -Jay
> > > >
> > > >
> > > >
> > > > > On Mon, Dec 19, 2016 at 9:36 AM, radai <radai.rosenbl...@gmail.com> wrote:
> > > >
> > > > > regarding efficiency:
> > > > >
> > > > > I'd like to distinguish between server efficiency (resource utilization
> > > > > of the broker machine alone) and overall network efficiency (resource
> > > > > utilization on brokers, producers and consumers, including network
> > > > > traffic). my proposal is not as resource-efficient on the broker
> > > > > (although it can be, depends on a few trade-offs and implementation
> > > > > details). HOWEVER, if I look at the overall efficiency:
> > > > >
> > > > >    1. clients would need to either buffer or double-read uncommitted
> > > > > msgs. for N clients reading the stream M times (after re-starts and
> > > > > reconsumes) this would mean an M*N factor in either network BW or
> > > > > disk/memory space (depends on if buffer vs re-read). potentially N*M
> > > > > more broker-side reads too.
> > > > >    2. to reduce the broker-side cost several things can be done (this
> > > > > is not an either-or list, these are cumulative):
> > > > >       2.1 - keep TX logs in mem (+overflow to disk) - trades disk
> > > > > writes for TX resiliency
> > > > >       2.2 - when "appending" TX logs to real partitions - instead of
> > > > > reading from the (disk-based) TX log and writing to the partition log
> > > > > (x2 disk writes) the TX log can be made a segment file (so a file
> > > > > rename, with associated protocol changes). this would avoid double
> > > > > writing by simply making the TX file part of the partition (for large
> > > > > enough TXs; smaller ones can be rewritten).
> > > > >       2.3 - the approach above could be combined with a background
> > > > > "defrag" - similar in concept to compaction - to further reduce the
> > > > > total number of resulting files.
> > > > >
> > > > > I think my main issue with the current proposal, more important than
> > > > > performance, is the lack of proper "encapsulation" of transactions - I
> > > > > don't think downstream consumers should see uncommitted msgs. ever.
> > > > >
> > > > >
> > > > > On Sun, Dec 18, 2016 at 10:19 PM, Becket Qin <becket....@gmail.com> wrote:
> > > > >
> > > > > > @Jason
> > > > > >
> > > > > > Yes, on second thought about the number of messages included, the
> > > > > > offset delta will probably be sufficient. The use case I encountered
> > > > > > before for the number of messages in a message set is an embedded
> > > > > > mirror maker on the destination broker side which fetches messages
> > > > > > directly from the source cluster. Ideally the destination cluster only
> > > > > > needs to check the CRC and assign the offsets because all the message
> > > > > > verification has been done by the source cluster, but due to the lack
> > > > > > of the number of messages in the message set, we have to decompress
> > > > > > the message set to increment offsets correctly. By knowing the number
> > > > > > of messages in the message set, we can avoid doing that. The offset
> > > > > > delta will also help. It's just that the offsets may have holes for
> > > > > > log-compacted topics, but that may be fine.
> > > > > >
> > > > > > @Apurva
> > > > > >
> > > > > > I am not sure if it is true that the consumer will either deliver all
> > > > > > the messages for an entire transaction or none of them from one poll()
> > > > > > call. If we allow transactions to be across partitions, unless the
> > > > > > consumer consumes from all the partitions involved in a transaction,
> > > > > > it seems impossible for it to deliver *all* the messages in a
> > > > > > transaction, right? A weaker guarantee is that we will deliver all or
> > > > > > none of the messages that belong to the same transaction in ONE
> > > > > > partition, but this would be different from the guarantee from the
> > > > > > producer side.
> > > > > >
> > > > > > My two cents on Radai's sideways partition design:
> > > > > > 1. If we consider the producer-side behavior as doing a two-phase
> > > > > > commit which includes committing the consumer offsets, it is a little
> > > > > > awkward that we allow uncommitted messages to go into the main log and
> > > > > > rely on the consumer to filter them out. So semantics-wise I think it
> > > > > > would be better if we can avoid this. Radai's suggestion is actually
> > > > > > intuitive because if the brokers do not want to expose uncommitted
> > > > > > transactions to the consumer, the brokers have to buffer them.
> > > > > >
> > > > > > 2. Regarding the efficiency, I think it may be worth looking at the
> > > > > > efficiency cost vs. benefit. The efficiency includes both server-side
> > > > > > efficiency and consumer-side efficiency.
> > > > > >
> > > > > > Regarding the server-side efficiency, the current proposal would
> > > > > > probably have better efficiency regardless of whether something goes
> > > > > > wrong. Radai's suggestion would put more burden on the server side. If
> > > > > > nothing goes wrong we always pay the cost of having a double copy of
> > > > > > the transactional messages and do not get the semantic benefit. But if
> > > > > > something goes wrong, the efficiency cost we pay gets us a better
> > > > > > semantic.
> > > > > >
> > > > > > For the consumer-side efficiency, with Radai's suggestion there is no
> > > > > > need to buffer the uncommitted messages. The current proposal may have
> > > > > > to potentially buffer uncommitted messages, so it would be less
> > > > > > efficient than Radai's suggestion when a transaction aborts. When
> > > > > > everything goes well, both designs seem to have similar performance.
> > > > > > However, it depends on whether we are willing to loosen the
> > > > > > consumer-side transaction guarantee that I mentioned earlier to
> > > > > > Apurva.
> > > > > >
> > > > > > Currently the biggest pressure on the consumer side is that it has to
> > > > > > buffer incomplete transactions. There are two reasons for it:
> > > > > > A. A transaction may be aborted so we cannot expose the messages to
> > > > > > the users.
> > > > > > B. We want to return all or none of the messages in a transaction in
> > > > > > ONE partition.
> > > > > >
> > > > > > While reason A is mandatory, I think reason B may be discussable.
> > > > > > Radai's design actually removes reason A because there are no
> > > > > > uncommitted messages exposed to the consumers. This may potentially
> > > > > > give us a chance to significantly improve consumer-side efficiency in
> > > > > > normal cases. It again depends on the use case, i.e. whether users can
> > > > > > process a transaction progressively (message by message) or it has to
> > > > > > be buffered and returned all together. If in most cases users can
> > > > > > process the transactions message by message (most stream processing
> > > > > > tasks probably can do so), then with Radai's proposal we don't need to
> > > > > > buffer the transactions for the users anymore, which is a big
> > > > > > difference. For the latter case, the consumer may have to buffer the
> > > > > > incomplete transactions, otherwise we are just throwing the burden
> > > > > > onto the users.
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jiangjie (Becket) Qin
> > > > > >
> > > > > > On Fri, Dec 16, 2016 at 4:56 PM, Jay Kreps <j...@confluent.io> wrote:
> > > > > >
> > > > > > > Yeah good point. I relent!
> > > > > > >
> > > > > > > -jay
> > > > > > >
> > > > > > > On Fri, Dec 16, 2016 at 1:46 PM Jason Gustafson <ja...@confluent.io> wrote:
> > > > > > >
> > > > > > > > Jay/Ismael,
> > > > > > > >
> > > > > > > > I agree that lazy initialization of metadata seems unavoidable.
> > > > > > > > Ideally, we could follow the same pattern for transactions, but
> > > > > > > > remember that in the consumer+producer use case, the initialization
> > > > > > > > needs to be completed prior to setting the consumer's position.
> > > > > > > > Otherwise we risk reading stale offsets. But it would be pretty
> > > > > > > > awkward if you have to begin a transaction first to ensure that
> > > > > > > > your consumer can read the right offset from the consumer, right?
> > > > > > > > It's a bit easier to explain that you should always call
> > > > > > > > `producer.init()` prior to initializing the consumer. Users would
> > > > > > > > probably get this right without any special effort.
> > > > > > > >
> > > > > > > > -Jason
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > On Wed, Dec 14, 2016 at 1:52 AM, Rajini Sivaram <rsiva...@pivotal.io> wrote:
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > > Hi Apurva,
> > > > > > > > >
> > > > > > > > > Thank you for the answers. Just one follow-on.
> > > > > > > > >
> > > > > > > > > 15. Let me rephrase my original question. If all control messages
> > > > > > > > > (messages to transaction logs and markers on user logs) were
> > > > > > > > > acknowledged only after flushing the log segment, would
> > > > > > > > > transactions become durable in the traditional sense (i.e. not
> > > > > > > > > restricted to min.insync.replicas failures)?
> > > > > > > > > This is not a suggestion to update the KIP. It seems to me that
> > > > > > > > > the design enables full durability if required in the future with
> > > > > > > > > a rather non-intrusive change. I just wanted to make sure I
> > > > > > > > > haven't missed anything fundamental that prevents Kafka from
> > > > > > > > > doing this.
> > > > > > > > >
> > > > > > > > > On Wed, Dec 14, 2016 at 5:30 AM, Ben Kirwin <b...@kirw.in> wrote:
> > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > > > > > Hi Apurva,
> > > > > > > > > >
> > > > > > > > > > Thanks for the detailed answers... and sorry for the late reply!
> > > > > > > > > >
> > > > > > > > > > It does sound like, if the input-partitions-to-app-id mapping
> > > > > > > > > > never changes, the existing fencing mechanisms should prevent
> > > > > > > > > > duplicates. Great!
> > > > > > > > > > I'm a bit concerned the proposed API will be delicate to program
> > > > > > > > > > against successfully -- even in the simple case, we need to
> > > > > > > > > > create a new producer instance per input partition, and anything
> > > > > > > > > > fancier is going to need its own implementation of the
> > > > > > > > > > Streams/Samza-style 'task' idea -- but that may be fine for this
> > > > > > > > > > sort of advanced feature.
> > > > > > > > > >
> > > > > > > > > > For the second question, I notice that Jason also elaborated on
> > > > > > > > > > this downthread:
> > > > > > > > > >
> > > > > > > > > > > We also looked at removing the producer ID.
> > > > > > > > > > > This was discussed somewhere above, but basically the idea is
> > > > > > > > > > > to store the AppID in the message set header directly and
> > > > > > > > > > > avoid the mapping to producer ID altogether. As long as
> > > > > > > > > > > batching isn't too bad, the impact on total size may not be
> > > > > > > > > > > too bad, but we were ultimately more comfortable with a fixed
> > > > > > > > > > > size ID.
> > > > > > > > > >
> > > > > > > > > > ...which suggests that the distinction is useful for performance,
> > > > > > > > > > but not necessary for correctness, which makes good sense to me.
> > > > > > > > > > (Would a 128-bit ID be a reasonable compromise? That's enough
> > > > > > > > > > room for a UUID, or a reasonable hash of an arbitrary string, and
> > > > > > > > > > has only a marginal increase on the message size.)
> > > > > > > >
> > > > > > > > > >
> > > > > > > >
> > > > > > > > > > On Tue, Dec 6, 2016 at 11:44 PM, Apurva Mehta <apu...@confluent.io> wrote:
> > > > > > > >
> > > > > > > > > >
> > > > > > > >
> > > > > > > > > > > Hi Ben,
> > > > > > > > > > >
> > > > > > > > > > > Now, on to your first question of how to deal with consumer
> > > > > > > > > > > rebalances. The short answer is that the application needs to
> > > > > > > > > > > ensure that the assignment of input partitions to appId is
> > > > > > > > > > > consistent across rebalances.
> > > > > > > > > > >
> > > > > > > > > > > For Kafka Streams, they already ensure that the mapping of
> > > > > > > > > > > input partitions to task Id is invariant across rebalances by
> > > > > > > > > > > implementing a custom sticky assignor. Other non-streams apps
> > > > > > > > > > > can trivially have one producer per input partition and have
> > > > > > > > > > > the appId be the same as the partition number to achieve the
> > > > > > > > > > > same effect.
> > > > > > > > > > >
> > > > > > > > > > > With this precondition in place, we can maintain transactions
> > > > > > > > > > > across rebalances.
> > > > > > > > > > >
> > > > > > > > > > > Hope this answers your question.
> > > > > > > > > > >
> > > > > > > > > > > Thanks,
> > > > > > > > > > > Apurva
> > > > > > > >
> > > > > > > > > > >
> > > > > > > >
> > > > > > > > > > > On Tue, Dec 6, 2016 at 3:22 PM, Ben Kirwin <b...@kirw.in> wrote:
> > > > > > > >
> > > > > > > > > > >
> > > > > > > >
> > > > > > > > > > > > Thanks for this! I'm looking forward to going through the
> > > > > > > > > > > > full proposal in detail soon; a few early questions:
> > > > > > > > > > > >
> > > > > > > > > > > > First: what happens when a consumer rebalances in the middle
> > > > > > > > > > > > of a transaction? The full documentation suggests that such
> > > > > > > > > > > > a transaction ought to be rejected:
> > > > > > > > > > > >
> > > > > > > > > > > > > [...] if a rebalance has happened and this consumer
> > > > > > > > > > > > > instance becomes a zombie, even if this offset message is
> > > > > > > > > > > > > appended in the offset topic, the transaction will be
> > > > > > > > > > > > > rejected later on when it tries to commit the transaction
> > > > > > > > > > > > > via the EndTxnRequest.
> > > > > > > > > > > >
> > > > > > > > > > > > ...but it's unclear to me how we ensure that a transaction
> > > > > > > > > > > > can't complete if a rebalance has happened. (It's quite
> > > > > > > > > > > > possible I'm missing something obvious!)
> > > > > > > > > > > >
> > > > > > > > > > > > As a concrete example: suppose a process with PID 1 adds
> > > > > > > > > > > > offsets for some partition to a transaction; a consumer
> > > > > > > > > > > > rebalance happens that assigns the partition to a process
> > > > > > > > > > > > with PID 2, which adds some offsets to its current
> > > > > > > > > > > > transaction; both processes try and commit. Allowing both
> > > > > > > > > > > > commits would cause the messages to be processed twice --
> > > > > > > > > > > > how is that avoided?
> > > > > > > > > > > >
> > > > > > > > > > > > Second: App IDs normally map to a single PID. It seems like
> > > > > > > > > > > > one could do away with the PID concept entirely, and just
> > > > > > > > > > > > use App IDs in most places that require a PID. This feels
> > > > > > > > > > > > like it would be significantly simpler, though it does
> > > > > > > > > > > > increase the message size. Are there other reasons why the
> > > > > > > > > > > > App ID / PID split is necessary?
> > > > > > > >
> > > > > > > > > > > >
> > > > > > > >
> > > > > > > > > > > > On Wed, Nov 30, 2016 at 2:19 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > > > >
> > > > > > > >
> > > > > > > > > > > > > Hi all,
> > > > > > > > > > > > >
> > > > > > > > > > > > > I have just created KIP-98 to enhance Kafka with exactly
> > > > > > > > > > > > > once delivery semantics:
> > > > > > > > > > > > >
> > > > > > > > > > > > > *https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging
> > > > > > > > > > > > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging>*
> > > > > > > > > > > > >
> > > > > > > > > > > > > This KIP adds a transactional messaging mechanism along
> > > > > > > > > > > > > with an idempotent producer implementation to make sure
> > > > > > > > > > > > > that 1) duplicated messages sent from the same identified
> > > > > > > > > > > > > producer can be detected on the broker side, and 2) a
> > > > > > > > > > > > > group of messages sent within a transaction will
> > > > > > > > > > > > > atomically be either reflected and fetchable to consumers
> > > > > > > > > > > > > or not as a whole.
> > > > > > > > > > > > >
> > > > > > > > > > > > > The above wiki page provides a high-level view of the
> > > > > > > > > > > > > proposed changes as well as summarized guarantees. Initial
> > > > > > > > > > > > > draft of the detailed implementation design is described
> > > > > > > > > > > > > in this Google doc:
> > > > > > > > > > > > >
> > > > > > > > > > > > > https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8
> > > > > > > > > > > > >
> > > > > > > > > > > > > We would love to hear your comments and suggestions.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > >
> > > > > > > > > > > > > -- Guozhang
> > > > > > > >
> > > > > > > > > > > > >
> > > > > > > >
> > > > > > > > > > > >
> > > > > > > >
> > > > > > > > > > >
> > > > > > > >
> > > > > > > > > >
> > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
