Hi Jun,

Thanks again for the comments. More responses follow:


> 101. Compatibility during upgrade: Suppose that the brokers are upgraded to
> the new version, but the broker message format is still the old one. If a
> new producer uses the transaction feature, should the producer get an error
> in this case? A tricky case can be that the leader broker is on the new
> message format, but the follower broker is still on the old message format.
> In this case, the transactional info will be lost in the follower due to
> down conversion. Should we fail the transactional requests when the
> followers are still on the old message format?
>

This will only be an issue if applications are written to use transactions
and are deployed with the new client before all the brokers are upgraded to
the new message format.

There are a variety of engineering solutions to this problem, one of which
is for each broker to register itself as 'transaction ready' in zookeeper
when it is on the right version of the message format. Once the controller
detects that all brokers in the cluster are transaction ready, it will
signal to each broker via the UpdateMetadataRequest that the cluster is
ready for transactions. Any transactional requests received by brokers
before this point will be rejected.
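
To make the engineering option concrete, here is a minimal sketch of the
readiness gate in Python. It is a toy model only: the class and function
names are hypothetical, zookeeper registration is replaced by a method
call, and the UpdateMetadataRequest signal is replaced by asking the
controller object directly.

```python
class Controller:
    """Toy model of the 'transaction ready' gate; not actual Kafka code."""

    def __init__(self, broker_ids):
        self.broker_ids = set(broker_ids)
        self.ready = set()  # brokers that have registered as transaction ready

    def register_ready(self, broker_id):
        # Stands in for a broker writing its readiness flag to zookeeper.
        if broker_id not in self.broker_ids:
            raise ValueError(f"unknown broker {broker_id}")
        self.ready.add(broker_id)

    def cluster_ready(self):
        # True only once every broker in the cluster has registered.
        return self.ready == self.broker_ids


def handle_transactional_request(controller):
    # A real broker would learn this flag via UpdateMetadataRequest
    # (assumption: here we simply query the controller object).
    return "ACCEPTED" if controller.cluster_ready() else "REJECTED"
```

With three brokers, requests are rejected until the third one registers.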

A simpler way to solve this problem is through organizational policy: a
cluster should not be advertised to application developers as 'transaction
ready' until all brokers are on the new message format.

I think the non-engineering solution is reasonable, and as such would
prefer to not include engineering solutions in V1. It could be future work
if necessary.

We can make the problems that arise out of premature use of transactions
clear in the release notes so that operators can take the necessary
precautions. Is that reasonable?




> 102. When there is a correlated hard failure (e.g., power outage), it's
> possible that an existing commit/abort marker is lost in all replicas. This
> may not be fixed by the transaction coordinator automatically and the
> consumer may get stuck on that incomplete transaction forever. Not sure
> what's the best way to address this. Perhaps, one way is to run a tool to
> add an abort marker for all pids in all affected partitions.
>
>
This is a good point. With the abort index proposal, if a correlated hard
failure causes us to lose the markers everywhere, the LSO on the broker
would not advance and consumers would block (but not buffer). This would be
a noticeable situation.

A simple tool could use internal functions to effectively perform an
'initPID', 'beginTransaction', 'AddTopicPartitiontoTransaction', and
'commitTransaction' sequence. This would ensure that the transaction
coordinator rewrites the markers to all partitions, but it would also
fence the existing producer with the same AppId.

To make this workable, we need to make sure that the transaction
coordinator adds sufficient logging so that we know the AppId -> PID
mapping as well as the partitions participating in each transaction. The
broker should probably also log enough information to tell which
unfinished transaction (i.e., which PID) is preventing the LSO from moving
forward. Both of these will make it fairly easy to configure the tool.
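
As a rough illustration of why a lost marker pins the LSO, and of what the
broker could log, here is a small Python sketch. It assumes the LSO is the
first offset of the oldest transaction that has no commit/abort marker yet;
the function names and the dict layout are hypothetical.

```python
def last_stable_offset(log_end_offset, open_txn_first_offsets):
    """Return the LSO for one partition.

    open_txn_first_offsets maps pid -> first offset of a transaction that
    has no commit/abort marker yet (hypothetical bookkeeping structure).
    """
    if not open_txn_first_offsets:
        # No open transactions: everything up to the log end is stable.
        return log_end_offset
    # The oldest unfinished transaction holds the LSO back.
    return min(open_txn_first_offsets.values())


def blocking_pid(open_txn_first_offsets):
    """Return the PID whose unfinished transaction pins the LSO, or None."""
    if not open_txn_first_offsets:
        return None
    return min(open_txn_first_offsets, key=open_txn_first_offsets.get)
```

If the marker for PID 7's transaction is lost, the LSO stays at that
transaction's first offset no matter how far the log grows, and
blocking_pid reports 7 as the PID to hand to the tool.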

Of course, it is possible for the producer to continue onto another
transaction before the tool is run, in which case the data will be corrupt
since the second transaction will include messages from the first. But this
is no worse than Kafka's existing durability semantics which this proposal
relies on.

I think such a tool can be follow-up work, and I have added it to the
'future work' section of the document.



> 103. Currently, there is no check for producer liveness. This means that if
> a producer has not been sending transactional requests for a long time, its
> appId will be expired by the coordinator. Have we considered having
> producers sending heartbeatRequest just like the consumer to keep it alive?
>
>
In the current proposal, a producer whose AppId has expired is a zombie
which will get a fatal 'ProducerFencedException' when it tries to make any
new transactional requests. Bouncing the producer will reinitialize it,
at which point it can continue.

So while the proposed behavior is not ideal, I think a heartbeat thread is
a nice-to-have that may not be worth putting into V1. I have made a note
of this in the 'Future Work' section of the document.



> 104. The logic for handling follower truncation can be a bit trickier now.
> The truncation may rewind the sequence number for some pids. The question
> is how to quickly recover the last sequence number of those pids. Do we
> plan to reload from a PID snapshot and scan forward?
>
>
Yes, this is exactly what we intend to do.
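
For concreteness, a minimal Python sketch of 'reload from a snapshot and
scan forward'. The data layout is an assumption for illustration only: a
snapshot taken at offset o records the last sequence number per PID for
all entries below o, and the log is a list of (offset, pid, seq) tuples.

```python
def recover_sequences(snapshots, log, truncate_to):
    """Rebuild the pid -> last sequence map after truncating at truncate_to.

    snapshots:   dict offset -> {pid: last_seq}, state of all entries below
                 that offset (hypothetical snapshot format)
    log:         list of (offset, pid, seq) tuples in offset order
    truncate_to: first offset removed by the truncation
    """
    # Pick the latest snapshot that does not include truncated entries.
    usable = [offset for offset in snapshots if offset <= truncate_to]
    base = max(usable) if usable else 0
    state = dict(snapshots.get(base, {}))
    # Scan forward from the snapshot to the truncation point.
    for offset, pid, seq in log:
        if base <= offset < truncate_to:
            state[pid] = seq
    return state
```

Only the entries between the snapshot and the truncation point are read,
so recovery cost depends on how often snapshots are taken.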


> 115. Message format:
> 115.1 Epoch is int16. Does it wrap after max? If not, is int16 too small
> since it's possible for a producer to be restarted tens of thousands of times?
> 115.2 Sequence number int32. Does it wrap after max? It's possible for a
> producer to publish more than 2 billion messages in a session.
>

Yes, both the epoch and the sequence number will have a circular space and
wrap around.

In the pure idempotent producer case, i.e., where there is no AppId, each
producer session is guaranteed to have a unique PID, and the leader for the
partition validates the incoming sequence number per PID before committing
to the log. So wrapping is safe.
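
A small Python sketch of the per-PID check with wraparound. The details
are assumptions for illustration: sequence numbers start at 0, advance by
one per message, and use the non-negative int32 range, so the value after
2^31 - 1 is 0. The function names are hypothetical.

```python
SEQ_SPACE = 2 ** 31  # assumption: sequence numbers occupy 0 .. 2^31 - 1


def next_expected(last_seq):
    # Circular increment: the successor of the maximum value is 0.
    return (last_seq + 1) % SEQ_SPACE


def accept(last_seq_by_pid, pid, seq):
    """Leader-side check: accept only the next sequence number in order."""
    if pid in last_seq_by_pid:
        expected = next_expected(last_seq_by_pid[pid])
    else:
        expected = 0  # first message from a new PID
    if seq != expected:
        return False  # duplicate or out-of-order message
    last_seq_by_pid[pid] = seq
    return True
```

Because each session has a unique PID and the leader only compares against
the immediately preceding sequence number, the wrap from the maximum back
to 0 is accepted like any other increment.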

In the transactional case, the transaction coordinator assigns the epoch
and returns the current epoch in its response to the InitPIDRequest. All
transactional requests (Begin, AddTPToTransaction, Prepare) will be
accepted only if their epoch exactly matches the epoch recorded by the
coordinator. Wraparound is safe except in some degenerate cases like very
long-lived zombie producers, described below.

Say we have a producer at epoch 20 which suddenly becomes a zombie. Assume
other producers with the same id are bounced for 65536 sessions (since
epochs are 2 bytes). After this period, the zombie producer comes back
online with the same PID/epoch as the current producer. In this case, it
is possible for both to produce transactions and for this situation to go
undetected.
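
The arithmetic behind this degenerate case, as a short Python sketch. It
assumes, as in the example above, a 2-byte epoch space of 65536 values and
an exact-match check at the coordinator; the function names are
hypothetical.

```python
EPOCH_SPACE = 2 ** 16  # assumption: 2-byte epoch, 65536 distinct values


def bump(epoch, times=1):
    # Each producer bounce bumps the epoch by one, wrapping at the maximum.
    return (epoch + times) % EPOCH_SPACE


def coordinator_accepts(current_epoch, request_epoch):
    # Requests are accepted only on an exact epoch match.
    return request_epoch == current_epoch
```

After 65535 bounces the zombie at epoch 20 is still fenced, but one more
bounce brings the current epoch back to 20 and the zombie's requests pass
the check.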

We can solve this problem by including 'session ids' for producers and
validating those as well. But since the degenerate case is so rare, we
think that any such session id can be part of future work if it becomes a
real problem.

I have updated the doc to explicitly mention that the epoch and sequence
numbers may wrap.



> 119. InitPIDRequest
> <https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF
> 0wSw9ra8/edit#heading=h.z99xar1h2enr>
> : Should we write the completion of open transactions before appending the
> pid with bumped up epoch to the transaction log?
>
>
Done.


> 120. transaction.app.id: An app may have multiple concurrent instances.
> Perhaps we should name it transaction.instance.id or just instance.id?
>
>
This has been the most debated point. How about just `producer.id`?



> 121. The ordering is important with idempotent producer, which means
> that max.in.flight.requests.per.connection should be set to 1. Do we want
> to enforce this?
>

This makes sense. Doc has been updated.
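
One possible shape of such an enforcement check, sketched in Python. Only
the max.in.flight.requests.per.connection key comes from the discussion
above; the 'idempotent' argument and the function name are hypothetical,
and the fallback of 5 is my understanding of the current producer default.

```python
MAX_IN_FLIGHT = "max.in.flight.requests.per.connection"


def validate_producer_config(config, idempotent):
    """Raise ValueError if an idempotent producer allows reordering.

    config:     dict of producer settings
    idempotent: whether the idempotent producer is in use (hypothetical flag)
    """
    # Assumption: 5 is the producer default when the key is not set.
    in_flight = int(config.get(MAX_IN_FLIGHT, 5))
    if idempotent and in_flight != 1:
        raise ValueError(
            f"{MAX_IN_FLIGHT} must be 1 for an idempotent producer, got {in_flight}"
        )
```

Failing fast at construction time seems friendlier than silently
overriding the user's setting, but either would satisfy the ordering
requirement.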


>
> Thanks,
>
> Jun
>
>
> On Tue, Jan 3, 2017 at 5:38 PM, radai <radai.rosenbl...@gmail.com> wrote:
>
> > @jun - good proposal. i was willing to concede that read-uncommitted was
> > impossible under my proposal but if LSO/NSO is introduced it becomes
> > possible.
> >
> >
> > On Tue, Jan 3, 2017 at 7:50 AM, Jun Rao <j...@confluent.io> wrote:
> >
> > > Just to follow up on Radai's idea of pushing the buffering logic to the
> > > broker. It may be possible to do this efficiently if we assume aborted
> > > transactions are rare. The following is a draft proposal. For each
> > > partition, the broker maintains the last stable offset (LSO) as
> > > described in the document, and only exposes messages up to this point
> > > if the reader is in the read-committed mode. When a new stable offset
> > > (NSO) is determined, if there is no aborted message in this window, the
> > > broker simply advances the LSO to the NSO. If there is at least one
> > > aborted message, the broker first replaces the current log segment with
> > > new log segments excluding the aborted messages and then advances the
> > > LSO. To make the replacement efficient, we can replace the current log
> > > segment with 3 new segments: (1) a new "shadow" log segment that simply
> > > references the portion of the current log segment from the beginning to
> > > the LSO, (2) a log segment created by copying only committed messages
> > > between the LSO and the NSO, (3) a new "shadow" log segment that
> > > references the portion of the current log segment from the NSO (open
> > > ended). Note that only (2) involves real data copying. If aborted
> > > transactions are rare, this overhead will be insignificant. Assuming
> > > that applications typically don't abort transactions, transactions will
> > > only be aborted by transaction coordinators during hard failure of the
> > > producers, which should be rare.
> > >
> > > This way, the consumer library's logic will be simplified. We can still
> > > expose uncommitted messages to readers in the read-uncommitted mode and
> > > therefore leave the door open for speculative reader in the future.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > > On Wed, Dec 21, 2016 at 10:44 AM, Apurva Mehta <apu...@confluent.io>
> > > wrote:
> > >
> > > > Hi Joel,
> > > >
> > > > > The alternatives are embedded in the 'discussion' sections which
> > > > > are spread throughout the google doc.
> > > > >
> > > > > Admittedly, we have not covered high level alternatives like those
> > > > > which have been brought up in this thread. In particular, having a
> > > > > separate log for transactional messages and also having multiple
> > > > > producers participate in a single transaction.
> > > >
> > > > This is an omission which we will correct.
> > > >
> > > > Thanks,
> > > > Apurva
> > > >
> > > > On Wed, Dec 21, 2016 at 10:34 AM, Joel Koshy <jjkosh...@gmail.com>
> > > wrote:
> > > >
> > > > > >
> > > > > >
> > > > > > @Joel,
> > > > > >
> > > > > > > I read over your wiki, and apart from the introduction of the
> > > > > > > notion of journal partitions --whose pros and cons are already
> > > > > > > being discussed-- you also introduce the notion of a 'producer
> > > > > > > group' which enables multiple producers to participate in a
> > > > > > > single transaction. This is completely opposite of the model in
> > > > > > > the KIP where a transaction is defined by a producer id, and
> > > > > > > hence there is a 1-1 mapping between producers and
> > > > > > > transactions. Further, each producer can have exactly one
> > > > > > > in-flight transaction at a time in the KIP.
> > > > > >
> > > > >
> > > > > > Hi Apurva - yes I did notice those differences among other
> > > > > > things :) BTW, I haven't yet gone through the google-doc
> > > > > > carefully but on a skim it does not seem to contain any rejected
> > > > > > alternatives as the wiki states.
> > > > >
> > > >
> > >
> >
>
