@Radai, regarding the replication for inflight transactional messages.

I think Jay and Joel have addressed the need for transactional messages to
be persisted durably at the moment we enter the pre-commit phase. If we
don't have durable persistence of these messages, we can't have idempotent
and atomic copying into the main log, and your proposal to date does not
show otherwise.

Additionally, I would like to point out that both the proposed solutions
for the copy operation in the transaction journal approach are pretty
invasive changes at the core of the Kafka log manager layer and below: you
either have to 'splice in' segments, or else you have to guarantee that a set
of messages will be copied from one log to another idempotently and
atomically even in the case of failures, which means reliably keeping track
of messages already copied, reliably knowing from where to resume the copy,
etc.
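
To illustrate the bookkeeping the second option implies, here is a minimal
Python sketch. It is purely illustrative and not taken from either proposal:
the names (copy_transaction, checkpoint, crash_after) are hypothetical, and
the in-memory lists and dict stand in for durable logs and a durable
checkpoint.

```python
def copy_transaction(journal, destination, checkpoint, txn_id, crash_after=None):
    """Copy journal messages to destination, resuming from the checkpoint.

    checkpoint maps txn_id -> number of messages already copied. In a real
    broker this record would have to be durable and updated atomically with
    the append; here both are plain in-memory structures (an assumption).
    """
    copied = checkpoint.get(txn_id, 0)
    for index in range(copied, len(journal)):
        if crash_after is not None and index - copied >= crash_after:
            raise RuntimeError("simulated broker crash mid-copy")
        destination.append(journal[index])
        checkpoint[txn_id] = index + 1  # record progress after each append
    return len(journal) - copied


journal = ["m1", "m2", "m3", "m4"]
destination, checkpoint = [], {}
try:
    copy_transaction(journal, destination, checkpoint, "txn-1", crash_after=2)
except RuntimeError:
    pass  # the copy is retried after the simulated failure
copy_transaction(journal, destination, checkpoint, "txn-1")
```

The part the sketch glosses over is the window between the append and the
checkpoint update: a crash there would duplicate a message on retry unless
the two steps are made atomic, which is where the invasive changes come in.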

The proposal in the KIP does not require major changes to Kafka at the Log
manager level and below: every partition involved in the transaction
(including the transaction log) is just another partition, so we inherit
all the durability guarantees for these partitions.

I don't think significantly complicating the log manager level is a deal
breaker, but I would like to point out the costs of the two-log approach
from an implementation perspective.

@Joel,

I read over your wiki, and apart from the introduction of the notion of
journal partitions --whose pros and cons are already being discussed-- you
also introduce the notion of a 'producer group' which enables multiple
producers to participate in a single transaction. This is the opposite
of the model in the KIP, where a transaction is defined by a
producer id, and hence there is a 1-1 mapping between producers and
transactions. Further, each producer can have exactly one in-flight
transaction at a time in the KIP.
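
As a rough sketch of that model, assuming hypothetical names that are not the
KIP's API, the 1-1 mapping amounts to keying open transactions by producer id
and rejecting a second in-flight transaction for the same producer:

```python
class OneTransactionPerProducer:
    """Tracks at most one open transaction per producer id (hypothetical)."""

    def __init__(self):
        self._open = {}  # producer_id -> messages sent in the open transaction

    def begin(self, producer_id):
        if producer_id in self._open:
            raise RuntimeError(f"{producer_id} already has an open transaction")
        self._open[producer_id] = []

    def send(self, producer_id, message):
        self._open[producer_id].append(message)

    def commit(self, producer_id):
        # Committing closes the transaction and frees the producer for a new one.
        return self._open.pop(producer_id)


coordinator = OneTransactionPerProducer()
coordinator.begin("producer-1")
coordinator.send("producer-1", "m1")
try:
    coordinator.begin("producer-1")  # a second in-flight transaction
    second_begin_rejected = False
except RuntimeError:
    second_begin_rejected = True
committed = coordinator.commit("producer-1")
```

A 'producer group' model would instead need a transaction id shared by
several producer ids, which this structure deliberately cannot express.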

The motivation for the model in the KIP is the streams use-case, where a
1-1 mapping between producers and transactions is natural. I am curious
about the use cases you have in mind for a many-to-one mapping between
producers and transactions.

@all,

As Jay and Sriram have alluded to, the current proposal is geared toward
enabling transactions for streaming applications. However, the details of
these use-cases and the features they need are missing from the KIP. In
particular, enabling deep stream topologies with low end-to-end processing
time necessitates speculative execution, and is one of the driving factors
behind the present proposal. We will update the document with these
details.
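
As a rough illustration of why this matters, the following Python sketch
works through the arithmetic using the figures from Jay's mail quoted below
(a 100ms commit interval and a 10-stage topology). The function names are
made up for this example, and the speculative case is idealized: it assumes
no failures and ignores per-stage processing time.

```python
def naive_latency_ms(commit_interval_ms, num_stages):
    # Each stage waits for the upstream commit before it reads, so the
    # commit intervals add up across the stages.
    return commit_interval_ms * num_stages


def speculative_latency_ms(commit_interval_ms, num_stages):
    # Idealized: stages read uncommitted data and the commit is coordinated,
    # so end-to-end latency approaches a single commit interval.
    return commit_interval_ms


print(naive_latency_ms(100, 10))        # 1000
print(speculative_latency_ms(100, 10))  # 100
```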

Regards,
Apurva


On Tue, Dec 20, 2016 at 11:28 AM, Jay Kreps <j...@confluent.io> wrote:

> I don't think the simple approach of writing to a local store (in memory or
> on disk) and then copying out to the destination topics would work but
> there could well be more sophisticated things that would. As you say, it is
> fine for the data to be un-replicated while you are accumulating the
> transaction, because you can always just abort the transaction if that node
> fails, but once you decided to commit and begin the process of copying out
> data you must guarantee you eventually will copy out the full transaction.
> If you have a non-durable store on one broker, and that broker crashes in
> the middle of copying out the transaction to the destination brokers, then
> some of the writes may have already succeeded while the others are now
> lost, and you would violate atomicity.
>
> This is similar to classic two-phase commit protocols: a post-condition of
> a successful prepare commit is a promise that the transaction will
> eventually be successfully committed if requested, so full durability is
> required in the pre-commit phase.
>
> But the flaw in the simple approach doesn't mean there isn't some less
> obvious solution that hasn't been thought of yet.
>
> For latency, yeah you're exactly right. We're assuming the latency of
> transactions can be pushed down to almost the duration of the transaction
> and obviously it can't be less than that. Let me try to flesh out the
> motivation for caring about latency (I think Sriram touched on this):
>
>    - We're primarily motivated by uses that fit a generalized notion of
>    correct, stateful stream processing. That is you consume/process/produce
>    potentially with associated local state in the processing. This fits KS
>    and Samza, but potentially a whole world of things that do transformation
>    of data. I think this is a really general notion of stream processing as
>    a kind of "protocol" and the proposed semantics give a kind of "closure"
>    to Kafka's producer and consumer protocols so they can be correctly
>    chained.
>    - These use cases end up being a kind of DAG of transformations, often
>    even a fairly simple flow will have a depth of 5 stages and more
>    realistic flows can be more like 10.
>    - The transaction size is proportional to the efficiency since the
>    overhead of the transaction is fixed irrespective of the number of
>    messages. A transaction with two messages will be extremely inefficient,
>    but one with a few thousand should be much better. So you can't
>    comfortably make the transactions too small but yes you probably wouldn't
>    need them to be multisecond.
>    - The latency of the transactions stack up with the stages in the DAG in
>    a naive usage. Say you commit every 100ms, if you have 10 stages your
>    latency is going to be 1 second.
>    - This latency is definitely a concern in many domains. This is why we
>    are interested in having the option of supporting speculative execution.
>    For speculative execution you assume likely processes won't fail and
>    you go ahead and compute downstream results but co-ordinate the commit.
>    This trades more work rolling back when there are failures for lower
>    latency. This lets you push the end-to-end latency closer to 100ms rather
>    than the 100ms*num_stages.
>
> Hopefully that gives a bit more color on the latency concern and desire for
> "read uncommitted".
>
> -Jay
>
> On Tue, Dec 20, 2016 at 10:33 AM, radai <radai.rosenbl...@gmail.com> wrote:
>
> > obviously anything committed would need to be replicated to all followers -
> > just like current msgs.
> >
> > what im trying to say is that in-flight data (written as part of an ongoing
> > TX and not committed yet) does not necessarily need to be replicated, or
> > even written out to disk. taken to the extreme it means i can buffer in
> > memory on the leader alone and incur no extra writes at all.
> >
> > if you dont want to just buffer in-memory on the leader (or are forced to
> > spool to disk because of size) you could still avoid a double write by
> > messing around with segment files (so the TX file becomes part of the
> > "linked-list" of segment files instead of reading it and appending it's
> > contents verbatim to the current segment file).
> >
> > the area when this does inevitably come short is latency and "read
> > uncommitted" (which are related). the added delay (after cutting all the
> > corners above) would really be the "time span" of a TX - the amount of time
> > from the moment the producer started the TX to the time when it was
> > committed. in my mind this time span is very short. am I failing to
> > understand the proposed "typical" use case? is the plan to use long-running
> > transactions and only commit at, say, 5 minute "checkpoints" ?
> >
> > On Tue, Dec 20, 2016 at 10:00 AM, Jay Kreps <j...@confluent.io> wrote:
> >
> > > Cool. It sounds like you guys will sync up and come up with a specific
> > > proposal. I think point (3) does require full replication of the
> > > pre-commit transaction, but I'm not sure, and I would be very happy to
> > > learn otherwise. That was actually the blocker on that alternate proposal.
> > > From my point of view 2x overhead is kind of a deal breaker since it makes
> > > correctness so expensive you'd have to think very hard before turning it
> > > on, but if there is a way to do it with less and there aren't too many
> > > other negative side effects that would be very appealing. I think we can
> > > also dive a bit into why we are so perf and latency sensitive as it
> > > relates to the stream processing use cases...I'm not sure how much of that
> > > is obvious from the proposal.
> > >
> > > -Jay
> > >
> > >
> > >
> > >
> > >
> > > On Tue, Dec 20, 2016 at 9:11 AM, Joel Koshy <jjkosh...@gmail.com> wrote:
> > >
> > > > Just got some time to go through most of this thread and KIP - great to
> > > > see this materialize and discussed!!
> > > > I will add more comments in the coming days on some of the other
> > > > "tracks" in this thread; but since Radai brought up the
> > > > double-journaling approach that we had discussed I thought I would move
> > > > over some content from our internal wiki on double-journalling
> > > > <https://cwiki.apache.org/confluence/display/KAFKA/Double+journaling+with+local+data+copy>
> > > > It is thin on details with a few invalid statements because I don't
> > > > think we dwelt long enough on it - it was cast aside as being too
> > > > expensive from a storage and latency perspective. As the immediately
> > > > preceding emails state, I tend to agree that those are compelling enough
> > > > reasons to take a hit in complexity/increased memory usage in the
> > > > consumer. Anyway, couple of us at LinkedIn can spend some time today
> > > > brainstorming a little more on this today.
> > > >
> > > > > 1. on write amplification: i dont see x6 the writes, at worst i see x2
> > > > > the writes - once to the "tx log", then read and again to the
> > > > > destination partition. if you have some != 1 replication factor than
> > > > > both the 1st and the 2nd writes get replicated, but it is still a
> > > > > relative factor of x2. what am I missing?
> > > > >
> > > >
> > > > I think that's right - it would be six total copies if we are doing RF 3.
> > > >
> > > >
> > > > > 3. why do writes to a TX need the same guarantees as "plain" writes?
> > > > > in cases where the user can live with a TX rollback on change of
> > > > > leadership/broker crash the TX log can be unreplicated, and even live
> > > > > in the leader's memory. that would cut down on writes. this is also an
> > > > > acceptable default in SQL - if your socket connection to a DB dies
> > > > > mid-TX your TX is toast (mysql is even worse)
> > > > >
> > > >
> > > > I may have misunderstood - while the above may be true for transactions
> > > > in-flight, it definitely needs the same guarantees at the point of
> > > > commit and the straightforward way to achieve that is to rely on the
> > > > same guarantees while the transaction is in flight.
> > > >
> > > > > 4. even if we replicate the TX log, why do we need to re-read it and
> > > > > re-write it to the underlying partition? if its already written to
> > > > > disk all I would need is to make that file the current segment of the
> > > > > "real" partition and i've avoided the double write (at the cost of
> > > > > complicating segment management). if the data is replicated fetchers
> > > > > could do the same.
> > > > >
> > > >
> > > > I think we had considered the above as well - i.e., if you abstract the
> > > > partition's segments into segments that contain non-transactional
> > > > messages and those that contain transactional messages then it should be
> > > > possible to jump from one to the other and back. It does add quite a bit
> > > > of complexity though and you still need to do buffering on reads so the
> > > > upside perhaps isn't worth the effort. I'm not convinced about that
> > > > though - i.e., may help to spend more time thinking this one through.
> > > >
> > > >
> > > > > 5. on latency - youre right, what im suggesting would result in tx
> > > > > ordering of messages ,"read committed" semantics and therefore higher
> > > > > latency.
> > > >
> > > >
> > > > *"read committed"* only if you do the copy back to actual log. If you
> > > > don't do that (your point 4) then I think you still need to do buffering
> > > > to achieve read-committed semantics.
> > > >
> > > >
> > > >
> > > > > 6. the added delay (vs your read uncommitted) would be roughly the time
> > > > > span of a TX.
> > > >
> > > >
> > > > I think it would be significantly less given that this is local copying.
> > > >
> > > >
> > > >
> > > > >
> > > > >
> > > > > On Mon, Dec 19, 2016 at 3:15 PM, Guozhang Wang <wangg...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > One more thing about the double journal proposal: when discussing
> > > > > > about this method back at LinkedIn, another raised issue besides
> > > > > > double writing was that it will void the offset ordering and enforce
> > > > > > people to accept "transaction ordering", that is, consumer will not
> > > > > > see messages from the same partition in the order where they were
> > > > > > produced, but only in the order of when the corresponding
> > > > > > transaction was committed. For some scenarios, we believe that
> > > > > > offset ordering would still be preferred than transaction ordering
> > > > > > and that is why in KIP-98 proposal we default to the former while
> > > > > > leave the door open if users want to switch to the latter case.
> > > > > >
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > > On Mon, Dec 19, 2016 at 10:56 AM, Jay Kreps <j...@confluent.io>
> > > > > > wrote:
> > > > > >
> > > > > > > Hey Radai,
> > > > > > >
> > > > > > > I'm not sure if I fully understand what you are proposing, but I
> > > > > > > interpreted it to be similar to a proposal we worked through back
> > > > > > > at LinkedIn. The proposal was to commit to a central txlog topic,
> > > > > > > and then recopy to the destination topic upon transaction commit.
> > > > > > > The observation on that approach at the time were the following:
> > > > > > >
> > > > > > >    1. It is cleaner since the output topics have only committed
> > > > > > >    data!
> > > > > > >    2. You need full replication on the txlog topic to ensure
> > > > > > >    atomicity. We weren't able to come up with a solution where you
> > > > > > >    buffer in memory or use renaming tricks the way you are
> > > > > > >    describing. The reason is that once you begin committing you
> > > > > > >    must ensure that the commit eventually succeeds to guarantee
> > > > > > >    atomicity. If you use a transient store you might commit some
> > > > > > >    data and then have a server failure that causes you to lose the
> > > > > > >    rest of the transaction.
> > > > > > >    3. Having a single log allows the reader to choose a "read
> > > > > > >    uncommitted" mode that hands out messages immediately. This is
> > > > > > >    important for cases where latency is important, especially for
> > > > > > >    stream processing topologies where these latencies stack up
> > > > > > >    across multiple stages.
> > > > > > >
> > > > > > > For the stream processing use case, item (2) is a bit of a deal
> > > > > > > killer. This takes the cost of a transient message write (say the
> > > > > > > intermediate result of a stream processing topology) from 3x
> > > > > > > writes (assuming 3x replication) to 6x writes. This means you
> > > > > > > basically can't default it on. If we can in fact get the cost down
> > > > > > > to a single buffered write (i.e. 1x the data is written to memory
> > > > > > > and buffered to disk if the transaction is large) as in the KIP-98
> > > > > > > proposal without too many other negative side effects I think that
> > > > > > > could be compelling.
> > > > > > >
> > > > > > > -Jay
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On Mon, Dec 19, 2016 at 9:36 AM, radai <
> > radai.rosenbl...@gmail.com
> > > >
> > > > > > wrote:
> > > > > > >
> > > > > > > > regarding efficiency:
> > > > > > > >
> > > > > > > > I'd like to distinguish between server efficiency (resource
> > > > > utilization
> > > > > > > of
> > > > > > > > the broker machine alone) and overall network efficiency
> > > (resource
> > > > > > > > utilization on brokers, producers and consumers, including
> > > network
> > > > > > > > traffic).
> > > > > > > > my proposal is not as resource-efficient on the broker
> > (although
> > > it
> > > > > can
> > > > > > > be,
> > > > > > > > depends on a few trade offs and implementation details).
> > HOWEVER,
> > > > if
> > > > > i
> > > > > > > look
> > > > > > > > at the overall efficiency:
> > > > > > > >
> > > > > > > >    1.clients would need to either buffer or double-read
> > > uncommitted
> > > > > > msgs.
> > > > > > > > for N clients reading the stream M times (after re-starts and
> > > > > > reconsumes)
> > > > > > > > this would mean a M*N factor in either network BW or
> > disk/memory
> > > > > space
> > > > > > > > (depends on if buffer vs re-read). potentially N*M more
> > > broker-side
> > > > > > reads
> > > > > > > > too.
> > > > > > > >    2 to reduce the broker side cost several things can be
> done
> > > > (this
> > > > > is
> > > > > > > not
> > > > > > > > an either-or list, these are commulative):
> > > > > > > >       2.1 - keep TX logs in mem (+overflow to disk) - trades
> > disk
> > > > > > writes
> > > > > > > > for TX resiliency
> > > > > > > >       2.2 - when "appending" TX logs to real partitions -
> > instead
> > > > of
> > > > > > > > reading from (disk-based) TX log and writing to partition log
> > (x2
> > > > > disk
> > > > > > > > writes) the TX log can be made a segment file (so file
> rename,
> > > with
> > > > > > > > associated protocol changes). this would avoid double writing
> > by
> > > > > simply
> > > > > > > > making the TX file part of the partition (for large enough
> TXs.
> > > > > smaller
> > > > > > > > ones can be rewritten).
> > > > > > > >       2.3 - the approach above could be combined with a
> > > background
> > > > > > > "defrag"
> > > > > > > > - similar in concept to compaction - to further reduce the
> > total
> > > of
> > > > > > > > resulting number of files.
> > > > > > > >
> > > > > > > > I think my main issue with the current proposal, more
> important
> > > > than
> > > > > > > > performance, is lack of proper "encapsulation" of
> transactions
> > -
> > > I
> > > > > dont
> > > > > > > > think downstream consumers should see uncommitted msgs. ever.
> > > > > > > >
> > > > > > > >
> > > > > > > > On Sun, Dec 18, 2016 at 10:19 PM, Becket Qin <
> > > becket....@gmail.com
> > > > >
> > > > > > > wrote:
> > > > > > > >
> > > > > > > > > @Jason
> > > > > > > > >
> > > > > > > > > Yes, second thought on the number of messages included, the
> > > > offset
> > > > > > > delta
> > > > > > > > > will probably be sufficient. The use case I encounter
> before
> > > for
> > > > > > number
> > > > > > > > of
> > > > > > > > > messages in a message set is an embedded mirror maker on
> the
> > > > > > > destination
> > > > > > > > > broker side which fetches message directly from the source
> > > > cluster.
> > > > > > > > Ideally
> > > > > > > > > the destination cluster only needs to check CRC and assign
> > the
> > > > > > offsets
> > > > > > > > > because all the message verification has been done by the
> > > source
> > > > > > > cluster,
> > > > > > > > > but due to the lack of the number of messages in the
> message
> > > set,
> > > > > we
> > > > > > > have
> > > > > > > > > to decompress the message set to increment offsets
> correctly.
> > > By
> > > > > > > knowing
> > > > > > > > > the number of the messages in the message set, we can avoid
> > > doing
> > > > > > that.
> > > > > > > > The
> > > > > > > > > offset delta will also help. It's just then the offsets may
> > > have
> > > > > > holes
> > > > > > > > for
> > > > > > > > > log compacted topics, but that may be fine.
> > > > > > > > >
> > > > > > > > > @Apurva
> > > > > > > > >
> > > > > > > > > I am not sure if it is true that the consumer will either
> > > deliver
> > > > > all
> > > > > > > the
> > > > > > > > > message for the entire transaction or none of them from one
> > > > poll()
> > > > > > > call.
> > > > > > > > If
> > > > > > > > > we allow the transactions to be across partitions, unless
> the
> > > > > > consumer
> > > > > > > > > consumes from all the partitions involved in a
> transactions,
> > it
> > > > > seems
> > > > > > > > > impossible for it to deliver *all* the messages in a
> > > transaction,
> > > > > > > right?
> > > > > > > > A
> > > > > > > > > weaker guarantee is we will deliver all or none of the
> > messages
> > > > > that
> > > > > > > > belong
> > > > > > > > > to the same transaction in ONE partition, but this would be
> > > > > different
> > > > > > > > from
> > > > > > > > > the guarantee from the producer side.
> > > > > > > > >
> > > > > > > > > My two cents on Radai's sideways partition design:
> > > > > > > > > 1. If we consider the producer side behavior as doing a two
> > > phase
> > > > > > > commit
> > > > > > > > > which including the committing the consumer offsets, it is
> a
> > > > little
> > > > > > > > awkward
> > > > > > > > > that we allow uncommitted message goes into the main log
> and
> > > rely
> > > > > on
> > > > > > > the
> > > > > > > > > consumer to filter out. So semantic wise I think it would
> be
> > > > better
> > > > > > if
> > > > > > > we
> > > > > > > > > can avoid this. Radai's suggestion is actually intuitive
> > > because
> > > > if
> > > > > > the
> > > > > > > > > brokers do not want to expose uncommitted transactions to
> the
> > > > > > consumer,
> > > > > > > > the
> > > > > > > > > brokers have to buffer it.
> > > > > > > > >
> > > > > > > > > 2. Regarding the efficiency. I think may be it worth
> looking
> > at
> > > > the
> > > > > > > > > efficiency cost v.s benefit. The efficiency includes both
> > > server
> > > > > side
> > > > > > > > > efficiency and consumer side efficiency.
> > > > > > > > >
> > > > > > > > > Regarding the server side efficiency, the current proposal
> > > would
> > > > > > > probably
> > > > > > > > > have better efficiency regardless of whether something goes
> > > > wrong.
> > > > > > > > Radai's
> > > > > > > > > suggestion would put more burden on the server side. If
> > nothing
> > > > > goes
> > > > > > > > wrong
> > > > > > > > > we always pay the cost of having double copy of the
> > > transactional
> > > > > > > > messages
> > > > > > > > > and do not get the semantic benefit. But if something goes
> > > wrong,
> > > > > the
> > > > > > > > > efficiency cost we pay we get us a better semantic.
> > > > > > > > >
> > > > > > > > > For the consumer side efficiency, because there is no need
> to
> > > > > buffer
> > > > > > > the
> > > > > > > > > uncommitted messages. The current proposal may have to
> > > > potentially
> > > > > > > buffer
> > > > > > > > > uncommitted messages so it would be less efficient than
> > Radai's
> > > > > > > > suggestion
> > > > > > > > > when a transaction aborts. When everything goes well, both
> > > design
> > > > > > seems
> > > > > > > > > having the similar performance. However, it depends on
> > whether
> > > we
> > > > > are
> > > > > > > > > willing to loosen the consumer side transaction guarantee
> > that
> > > I
> > > > > > > > mentioned
> > > > > > > > > earlier to Apurva.
> > > > > > > > >
> > > > > > > > > Currently the biggest pressure on the consumer side is that
> > it
> > > > has
> > > > > to
> > > > > > > > > buffer incomplete transactions. There are two reasons for
> it,
> > > > > > > > > A. A transaction may be aborted so we cannot expose the
> > > messages
> > > > to
> > > > > > the
> > > > > > > > > users.
> > > > > > > > > B. We want to return all or none of the messages in a
> > > transaction
> > > > > in
> > > > > > > ONE
> > > > > > > > > partition.
> > > > > > > > >
> > > > > > > > > While reason A is mandatory, I think reason B may be
> > > discussable.
> > > > > > > Radai's
> > > > > > > > > design actually removes reason A because there is no
> > > uncommitted
> > > > > > > messages
> > > > > > > > > exposed to the consumers. This may potentially give us a
> > chance
> > > > to
> > > > > > > > > significantly improve consumer side efficiency in normal
> > cases.
> > > > It
> > > > > > > again
> > > > > > > > > depends on the use case, i.e. whether user can process a
> > > > > transaction
> > > > > > > > > progressively (message by message) or it has to be buffered
> > and
> > > > > > > returned
> > > > > > > > > all together. If in most cases, users can process the
> > > > transactions
> > > > > > > > message
> > > > > > > > > by message (most stream processing tasks probably can do
> so),
> > > > then
> > > > > > with
> > > > > > > > > Radai's proposal we don't need to buffer the transactions
> for
> > > the
> > > > > > users
> > > > > > > > > anymore, which is a big difference. For the latter case,
> the
> > > > > consumer
> > > > > > > may
> > > > > > > > > have to buffer the incomplete transactions otherwise we are
> > > just
> > > > > > > throwing
> > > > > > > > > the burden onto the users.
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > >
> > > > > > > > > Jiangjie (Becket) Qin
> > > > > > > > >
> > > > > > > > > On Fri, Dec 16, 2016 at 4:56 PM, Jay Kreps <
> j...@confluent.io
> > >
> > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Yeah good point. I relent!
> > > > > > > > > >
> > > > > > > > > > -jay
> > > > > > > > > >
> > > > > > > > > > On Fri, Dec 16, 2016 at 1:46 PM Jason Gustafson <ja...@confluent.io>
> > > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Jay/Ismael,
> > > > > > > > > > >
> > > > > > > > > > > I agree that lazy initialization of metadata seems unavoidable.
> > > > > > > > > > > Ideally, we could follow the same pattern for transactions, but
> > > > > > > > > > > remember that in the consumer+producer use case, the
> > > > > > > > > > > initialization needs to be completed prior to setting the
> > > > > > > > > > > consumer's position. Otherwise we risk reading stale offsets.
> > > > > > > > > > > But it would be pretty awkward if you have to begin a
> > > > > > > > > > > transaction first to ensure that your consumer can read the
> > > > > > > > > > > right offset from the consumer, right? It's a bit easier to
> > > > > > > > > > > explain that you should always call `producer.init()` prior to
> > > > > > > > > > > initializing the consumer. Users would probably get this right
> > > > > > > > > > > without any special effort.
> > > > > > > > > > >
> > > > > > > > > > > -Jason
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > On Wed, Dec 14, 2016 at 1:52 AM, Rajini Sivaram <
> > > > > > > rsiva...@pivotal.io
> > > > > > > > >
> > > > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > > Hi Apurva,
> > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > > Thank you for the answers. Just one follow-on.
> > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > > 15. Let me rephrase my original question. If all
> > control
> > > > > > messages
> > > > > > > > > > > (messages
> > > > > > > > > > >
> > > > > > > > > > > > to transaction logs and markers on user logs) were
> > > > > acknowledged
> > > > > > > > only
> > > > > > > > > > > after
> > > > > > > > > > >
> > > > > > > > > > > > flushing the log segment, will transactions become
> > > durable
> > > > in
> > > > > > the
> > > > > > > > > > >
> > > > > > > > > > > > traditional sense (i.e. not restricted to
> > > > min.insync.replicas
> > > > > > > > > > failures) ?
> > > > > > > > > > >
> > > > > > > > > > > > This is not a suggestion to update the KIP. It seems
> to
> > > me
> > > > > that
> > > > > > > the
> > > > > > > > > > > design
> > > > > > > > > > >
> > > > > > > > > > > > enables full durability if required in the future
> with
> > a
> > > > > rather
> > > > > > > > > > >
> > > > > > > > > > > > non-intrusive change. I just wanted to make sure I
> > > haven't
> > > > > > missed
> > > > > > > > > > > anything
> > > > > > > > > > >
> > > > > > > > > > > > fundamental that prevents Kafka from doing this.
> > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > > On Wed, Dec 14, 2016 at 5:30 AM, Ben Kirwin <
> > b...@kirw.in
> > > >
> > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > > > Hi Apurva,
> > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > > > Thanks for the detailed answers... and sorry for
> the
> > > late
> > > > > > > reply!
> > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > > > It does sound like, if the
> input-partitions-to-app-id
> > > > > mapping
> > > > > > > > never
> > > > > > > > > > >
> > > > > > > > > > > > > changes, the existing fencing mechanisms should
> > prevent
> > > > > > > > duplicates.
> > > > > > > > > > >
> > > > > > > > > > > > Great!
> > > > > > > > > > >
> > > > > > > > > > > > > I'm a bit concerned the proposed API will be
> delicate
> > > to
> > > > > > > program
> > > > > > > > > > > against
> > > > > > > > > > >
> > > > > > > > > > > > > successfully -- even in the simple case, we need to
> > > > create
> > > > > a
> > > > > > > new
> > > > > > > > > > > producer
> > > > > > > > > > >
> > > > > > > > > > > > > instance per input partition, and anything fancier
> is
> > > > going
> > > > > > to
> > > > > > > > need
> > > > > > > > > > its
> > > > > > > > > > >
> > > > > > > > > > > > own
> > > > > > > > > > >
> > > > > > > > > > > > > implementation of the Streams/Samza-style 'task'
> idea
> > > --
> > > > > but
> > > > > > > that
> > > > > > > > > may
> > > > > > > > > > > be
> > > > > > > > > > >
> > > > > > > > > > > > > fine for this sort of advanced feature.
> > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > > > For the second question, I notice that Jason also
> > > > > elaborated
> > > > > > on
> > > > > > > > > this
> > > > > > > > > > >
> > > > > > > > > > > > > downthread:
> > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > > > > We also looked at removing the producer ID.
> > > > > > > > > > >
> > > > > > > > > > > > > > This was discussed somewhere above, but basically
> > the
> > > > > idea
> > > > > > is
> > > > > > > > to
> > > > > > > > > > > store
> > > > > > > > > > >
> > > > > > > > > > > > > the
> > > > > > > > > > >
> > > > > > > > > > > > > > AppID in the message set header directly and
> avoid
> > > the
> > > > > > > mapping
> > > > > > > > to
> > > > > > > > > > >
> > > > > > > > > > > > > producer
> > > > > > > > > > >
> > > > > > > > > > > > > > ID altogether. As long as batching isn't too bad,
> > the
> > > > > > impact
> > > > > > > on
> > > > > > > > > > total
> > > > > > > > > > >
> > > > > > > > > > > > > size
> > > > > > > > > > >
> > > > > > > > > > > > > > may not be too bad, but we were ultimately more
> > > > > comfortable
> > > > > > > > with
> > > > > > > > > a
> > > > > > > > > > >
> > > > > > > > > > > > fixed
> > > > > > > > > > >
> > > > > > > > > > > > > > size ID.
> > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > > > ...which suggests that the distinction is useful
> for
> > > > > > > performance,
> > > > > > > > > but
> > > > > > > > > > > not
> > > > > > > > > > >
> > > > > > > > > > > > > necessary for correctness, which makes good sense
> to
> > > me.
> > > > > > > (Would a
> > > > > > > > > > > 128-bit
> > > > > > > > > > >
> > > > > > > > > > > > > ID be a reasonable compromise? That's enough room
> > for a
> > > > > UUID,
> > > > > > > or
> > > > > > > > a
> > > > > > > > > > >
> > > > > > > > > > > > > reasonable hash of an arbitrary string, and has
> only
> > a
> > > > > > marginal
> > > > > > > > > > > increase
> > > > > > > > > > >
> > > > > > > > > > > > on
> > > > > > > > > > >
> > > > > > > > > > > > > the message size.)
> > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > > > On Tue, Dec 6, 2016 at 11:44 PM, Apurva Mehta <
> > > > > > > > apu...@confluent.io
> > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > > > > Hi Ben,
> > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > > > > > Now, on to your first question of how to deal with
> > > > consumer
> > > > > > > > > > rebalances.
> > > > > > > > > > >
> > > > > > > > > > > > The
> > > > > > > > > > >
> > > > > > > > > > > > > > short answer is that the application needs to
> > ensure
> > > > that
> > > > > > the
> > > > > > > > > > >
> > > > > > > > > > > > > > assignment of input partitions to appId is
> > consistent
> > > > > > across
> > > > > > > > > > >
> > > > > > > > > > > > rebalances.
> > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > > > > For Kafka streams, they already ensure that the
> > > mapping
> > > > > of
> > > > > > > > input
> > > > > > > > > > >
> > > > > > > > > > > > > partitions
> > > > > > > > > > >
> > > > > > > > > > > > > > to task Id is invariant across rebalances by
> > > > > implementing a
> > > > > > > > > custom
> > > > > > > > > > >
> > > > > > > > > > > > sticky
> > > > > > > > > > >
> > > > > > > > > > > > > > assignor. Other non-streams apps can trivially
> have
> > > one
> > > > > > > > producer
> > > > > > > > > > per
> > > > > > > > > > >
> > > > > > > > > > > > > input
> > > > > > > > > > >
> > > > > > > > > > > > > > partition and have the appId be the same as the
> > > > partition
> > > > > > > > number
> > > > > > > > > to
> > > > > > > > > > >
> > > > > > > > > > > > > achieve
> > > > > > > > > > >
> > > > > > > > > > > > > > the same effect.
> > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > > > > With this precondition in place, we can maintain
> > > > > > transactions
> > > > > > > > > > across
> > > > > > > > > > >
> > > > > > > > > > > > > > rebalances.
> > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > > > > Hope this answers your question.
> > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > > > > Thanks,
> > > > > > > > > > >
> > > > > > > > > > > > > > Apurva
> > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > > > > On Tue, Dec 6, 2016 at 3:22 PM, Ben Kirwin <
> > > > b...@kirw.in>
> > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > > > > > Thanks for this! I'm looking forward to going
> > > through
> > > > > the
> > > > > > > > full
> > > > > > > > > > >
> > > > > > > > > > > > proposal
> > > > > > > > > > >
> > > > > > > > > > > > > > in
> > > > > > > > > > >
> > > > > > > > > > > > > > > detail soon; a few early questions:
> > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > > > > > First: what happens when a consumer rebalances
> in
> > > the
> > > > > > > middle
> > > > > > > > > of a
> > > > > > > > > > >
> > > > > > > > > > > > > > > transaction? The full documentation suggests
> that
> > > > such
> > > > > a
> > > > > > > > > > > transaction
> > > > > > > > > > >
> > > > > > > > > > > > > > ought
> > > > > > > > > > >
> > > > > > > > > > > > > > > to be rejected:
> > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > > > > > > [...] if a rebalance has happened and this
> > > consumer
> > > > > > > > > > >
> > > > > > > > > > > > > > > > instance becomes a zombie, even if this
> offset
> > > > > message
> > > > > > is
> > > > > > > > > > > appended
> > > > > > > > > > >
> > > > > > > > > > > > in
> > > > > > > > > > >
> > > > > > > > > > > > > > the
> > > > > > > > > > >
> > > > > > > > > > > > > > > > offset topic, the transaction will be
> rejected
> > > > later
> > > > > on
> > > > > > > > when
> > > > > > > > > it
> > > > > > > > > > >
> > > > > > > > > > > > tries
> > > > > > > > > > >
> > > > > > > > > > > > > > to
> > > > > > > > > > >
> > > > > > > > > > > > > > > > commit the transaction via the EndTxnRequest.
> > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > > > > > ...but it's unclear to me how we ensure that a
> > > > > > transaction
> > > > > > > > > can't
> > > > > > > > > > >
> > > > > > > > > > > > > complete
> > > > > > > > > > >
> > > > > > > > > > > > > > > if a rebalance has happened. (It's quite
> possible
> > > I'm
> > > > > > > missing
> > > > > > > > > > >
> > > > > > > > > > > > something
> > > > > > > > > > >
> > > > > > > > > > > > > > > obvious!)
> > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > > > > > As a concrete example: suppose a process with
> > PID 1
> > > > > adds
> > > > > > > > > offsets
> > > > > > > > > > > for
> > > > > > > > > > >
> > > > > > > > > > > > > some
> > > > > > > > > > >
> > > > > > > > > > > > > > > partition to a transaction; a consumer
> rebalance
> > > > > happens
> > > > > > > that
> > > > > > > > > > > assigns
> > > > > > > > > > >
> > > > > > > > > > > > > the
> > > > > > > > > > >
> > > > > > > > > > > > > > > partition to a process with PID 2, which adds
> > some
> > > > > > offsets
> > > > > > > to
> > > > > > > > > its
> > > > > > > > > > >
> > > > > > > > > > > > > current
> > > > > > > > > > >
> > > > > > > > > > > > > > > transaction; both processes try and commit.
> > > Allowing
> > > > > both
> > > > > > > > > commits
> > > > > > > > > > >
> > > > > > > > > > > > would
> > > > > > > > > > >
> > > > > > > > > > > > > > > cause the messages to be processed twice -- how
> > is
> > > > that
> > > > > > > > > avoided?
> > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > > > > > Second: App IDs normally map to a single PID.
> It
> > > > seems
> > > > > > like
> > > > > > > > one
> > > > > > > > > > > could
> > > > > > > > > > >
> > > > > > > > > > > > > do
> > > > > > > > > > >
> > > > > > > > > > > > > > > away with the PID concept entirely, and just
> use
> > > App
> > > > > IDs
> > > > > > in
> > > > > > > > > most
> > > > > > > > > > >
> > > > > > > > > > > > places
> > > > > > > > > > >
> > > > > > > > > > > > > > > that require a PID. This feels like it would be
> > > > > > > significantly
> > > > > > > > > > >
> > > > > > > > > > > > simpler,
> > > > > > > > > > >
> > > > > > > > > > > > > > > though it does increase the message size. Are
> > there
> > > > > other
> > > > > > > > > reasons
> > > > > > > > > > > why
> > > > > > > > > > >
> > > > > > > > > > > > > the
> > > > > > > > > > >
> > > > > > > > > > > > > > > App ID / PID split is necessary?
> > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > > > > > On Wed, Nov 30, 2016 at 2:19 PM, Guozhang Wang
> <
> > > > > > > > > > wangg...@gmail.com
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > > > > > > Hi all,
> > > > > > > > > > >
> > > > > > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > > > > > > I have just created KIP-98 to enhance Kafka
> > with
> > > > > > exactly
> > > > > > > > once
> > > > > > > > > > >
> > > > > > > > > > > > > delivery
> > > > > > > > > > >
> > > > > > > > > > > > > > > > semantics:
> > > > > > > > > > >
> > > > > > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > > > > > > *https://cwiki.apache.org/
> > > > > > confluence/display/KAFKA/KIP-
> > > > > > > > > > >
> > > > > > > > > > > > > > > > 98+-+Exactly+Once+Delivery+
> > > > > and+Transactional+Messaging
> > > > > > > > > > >
> > > > > > > > > > > > > > > > <https://cwiki.apache.org/
> > > > > > confluence/display/KAFKA/KIP-
> > > > > > > > > > >
> > > > > > > > > > > > > > > > 98+-+Exactly+Once+Delivery+
> > > > > > and+Transactional+Messaging>*
> > > > > > > > > > >
> > > > > > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > > > > > > This KIP adds a transactional messaging
> > mechanism
> > > > > along
> > > > > > > > with
> > > > > > > > > an
> > > > > > > > > > >
> > > > > > > > > > > > > > > idempotent
> > > > > > > > > > >
> > > > > > > > > > > > > > > > producer implementation to make sure that 1)
> > > > > duplicated
> > > > > > > > > > messages
> > > > > > > > > > >
> > > > > > > > > > > > sent
> > > > > > > > > > >
> > > > > > > > > > > > > > > from
> > > > > > > > > > >
> > > > > > > > > > > > > > > > the same identified producer can be detected
> on
> > > the
> > > > > > > broker
> > > > > > > > > > side,
> > > > > > > > > > >
> > > > > > > > > > > > and
> > > > > > > > > > >
> > > > > > > > > > > > > > 2) a
> > > > > > > > > > >
> > > > > > > > > > > > > > > > group of messages sent within a transaction
> > will
> > > > > > > atomically
> > > > > > > > > be
> > > > > > > > > > >
> > > > > > > > > > > > either
> > > > > > > > > > >
> > > > > > > > > > > > > > > > reflected and fetchable to consumers or not
> as
> > a
> > > > > whole.
> > > > > > > > > > >
> > > > > > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > > > > > > The above wiki page provides a high-level
> view
> > of
> > > > the
> > > > > > > > > proposed
> > > > > > > > > > >
> > > > > > > > > > > > > changes
> > > > > > > > > > >
> > > > > > > > > > > > > > as
> > > > > > > > > > >
> > > > > > > > > > > > > > > > well as summarized guarantees. Initial draft
> of
> > > the
> > > > > > > > detailed
> > > > > > > > > > >
> > > > > > > > > > > > > > > implementation
> > > > > > > > > > >
> > > > > > > > > > > > > > > > design is described in this Google doc:
> > > > > > > > > > >
> > > > > > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > > > > > > https://docs.google.com/document/d/11Jqy_
> > > > > > > > > > >
> > > > > > > > > > > > > > GjUGtdXJK94XGsEIK7CP1SnQGdp2eF
> > > > > > > > > > >
> > > > > > > > > > > > > > > > 0wSw9ra8
> > > > > > > > > > >
> > > > > > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > > > > > > We would love to hear your comments and
> > > > suggestions.
> > > > > > > > > > >
> > > > > > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > >
> > > > > > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > > > > > > -- Guozhang
> > > > > > > > > > >
> > > > > > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > > > > >
> > > > >
> > > >
> > >
> >
>
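
To make the rebalance question quoted above concrete, here is a minimal toy model of the fencing idea: if the mapping from input partition to appId is stable, the process that takes over a partition registers under the same appId, and the older holder can then be rejected at commit time. This is only a sketch. The `ToyCoordinator` class, its `register` and `end_txn` methods, the `app_id_for` helper, and the use of a per-appId epoch counter are all assumptions made for illustration; none of it is the Kafka client or broker API.

```python
class FencedError(Exception):
    """Raised when a producer holding a stale epoch tries to commit."""


class ToyCoordinator:
    """Hypothetical in-memory stand-in for a transaction coordinator."""

    def __init__(self):
        self._epochs = {}    # app_id -> most recent epoch handed out
        self.committed = []  # (app_id, epoch, offsets) tuples that were accepted

    def register(self, app_id):
        """A new producer instance claims app_id; earlier holders become zombies."""
        epoch = self._epochs.get(app_id, -1) + 1
        self._epochs[app_id] = epoch
        return epoch

    def end_txn(self, app_id, epoch, offsets):
        """Accept a commit only from the most recent holder of app_id."""
        if epoch != self._epochs.get(app_id):
            raise FencedError(f"{app_id}: epoch {epoch} is stale")
        self.committed.append((app_id, epoch, dict(offsets)))


def app_id_for(input_partition):
    # Assumption: the app ID depends only on the input partition, so it
    # stays the same no matter which process owns the partition.
    return f"my-app-{input_partition}"


coordinator = ToyCoordinator()

# Process A owns input partition 0 and starts working on it.
epoch_a = coordinator.register(app_id_for(0))

# A rebalance hands partition 0 to process B, which registers the same app ID.
epoch_b = coordinator.register(app_id_for(0))

# Both processes try to commit offsets for partition 0.
coordinator.end_txn(app_id_for(0), epoch_b, {0: 42})
try:
    coordinator.end_txn(app_id_for(0), epoch_a, {0: 42})
    zombie_fenced = False
except FencedError:
    zombie_fenced = True
```

In this model only process B's commit is recorded. If the two processes had used different app IDs for the same input partition, both commits would be accepted, which is the duplicate-processing case raised in the question.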

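
On the 128-bit ID suggestion quoted above, the following sketch shows the two options mentioned there: a random UUID, or a fixed-size hash of an arbitrary appId string. The helper name `fixed_size_id` and the choice of truncated SHA-256 are assumptions for illustration only; the thread does not specify a hash function, and this is not how Kafka assigns producer IDs.

```python
import hashlib
import uuid


def fixed_size_id(app_id: str) -> bytes:
    """Map an arbitrary app ID string to a 16-byte (128-bit) identifier.

    Truncated SHA-256 is an illustrative choice, not something the
    proposal prescribes.
    """
    return hashlib.sha256(app_id.encode("utf-8")).digest()[:16]


# Option 1: a random UUID is already exactly 128 bits.
random_id = uuid.uuid4().bytes

# Option 2: a deterministic hash of the application-chosen string.
hashed_id = fixed_size_id("my-app-0")
```

Either way the per-message-set overhead is a fixed 16 bytes regardless of how long the appId string is, which is the trade-off the quoted message is weighing against a shorter broker-assigned ID.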