Hi, Apurva,

Thanks for the update. My replies are inlined below.

On Wed, Jan 25, 2017 at 5:15 PM, Apurva Mehta <apu...@confluent.io> wrote:

> Hi Jun,
>
> Thanks again for the comments. More responses follow:
>
>
> > 101. Compatibility during upgrade: Suppose that the brokers are upgraded
> > to the new version, but the broker message format is still the old one.
> > If a new producer uses the transaction feature, should the producer get
> > an error in this case? A tricky case can be that the leader broker is on
> > the new message format, but the follower broker is still on the old
> > message format. In this case, the transactional info will be lost in the
> > follower due to down conversion. Should we fail the transactional
> > requests when the followers are still on the old message format?
> >
>
> This will only be an issue if applications are written to use transactions
> and are deployed with the new client before all the brokers are upgraded to
> the new message format.
>
> There are a variety of engineering solutions to this problem, one of which
> is for each broker to register itself as 'transaction ready' in zookeeper
> when it is on the right version of the message format. Once the controller
> detects that all brokers in the cluster are transaction ready, it will
> signal to each broker via the UpdateMetadataRequest that the cluster is
> ready for transactions. Any transactional requests received by brokers
> before this point will be rejected.
>
> A simpler way to solve this problem is through organizational policy: a
> cluster should not be advertised to application developers as 'transaction
> ready' until all brokers are on the new message format.
>
> I think the non-engineering solution is reasonable, and as such would
> prefer to not include engineering solutions in V1. It could be future work
> if necessary.
>
> We can make the problems that arise out of premature use of transactions
> clear in the release notes so that operators can take the necessary
> precautions. Is that reasonable?
>
>
Yes, in the first version, we can just document the impact in the upgrade
doc.
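
(For the record, in case we ever revisit the engineering approach, the
broker-side piece could be fairly small. A rough sketch, purely for
illustration: the znode path and class below are made up and are not part of
the KIP.)

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

import java.nio.charset.StandardCharsets;
import java.util.List;

public class TransactionReadinessRegistrar {

    // Hypothetical znode under which brokers advertise readiness.
    private static final String READY_PATH = "/brokers/transaction_ready";

    // Each broker creates an ephemeral child once it is on the new message format.
    public static void registerBroker(ZooKeeper zk, int brokerId) throws Exception {
        zk.create(READY_PATH + "/" + brokerId,
                  String.valueOf(brokerId).getBytes(StandardCharsets.UTF_8),
                  ZooDefs.Ids.OPEN_ACL_UNSAFE,
                  CreateMode.EPHEMERAL);
    }

    // The controller would check that every live broker has registered before
    // signalling (e.g. via UpdateMetadataRequest) that transactions are enabled.
    public static boolean clusterIsTransactionReady(ZooKeeper zk,
                                                    List<Integer> liveBrokers) throws Exception {
        List<String> ready = zk.getChildren(READY_PATH, false);
        return liveBrokers.stream()
                          .allMatch(id -> ready.contains(String.valueOf(id)));
    }
}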



>
>
> > 102. When there is a correlated hard failure (e.g., power outage), it's
> > possible that an existing commit/abort marker is lost in all replicas.
> > This may not be fixed by the transaction coordinator automatically, and
> > the consumer may get stuck on that incomplete transaction forever. Not
> > sure what's the best way to address this. Perhaps one way is to run a
> > tool to add an abort marker for all pids in all affected partitions.
> >
> >
> This is a good point. With the abort index proposal, if a correlated hard
> failure causes us to lose the markers everywhere, the LSO on the broker
> would not advance and consumers would block (but not buffer). This would be
> a noticeable situation.
>
> A simple tool could use internal functions to effectively perform an
> 'initPID', 'beginTransaction', 'AddTopicPartitionToTransaction', and
> 'commitTransaction' sequence. This would ensure that the markers are rewritten to
> all partitions by the transaction coordinator, but would also fence the
> existing producer with the same AppId.
>
> To make this workable, we need to make sure that the transaction
> coordinator adds sufficient logging so that we know the AppID -> PID
> mapping as well as the partitions participating in each transaction. The
> broker should also probably log information so that we know which
> unfinished transaction (i.e. which PID) is preventing the LSO from moving
> forward. Both these things will make it fairly easy to configure the tool.
>
> Of course, it is possible for the producer to continue onto another
> transaction before the tool is run, in which case the data will be corrupt
> since the second transaction will include messages from the first. But this
> is no worse than Kafka's existing durability semantics which this proposal
> relies on.
>
> I think such a tool can be a follow up work, and I have added it to the
> 'future work' section of the document.
>
>
There can be two types of tools, one for diagnosing the issue and another
for fixing the issue. I think having at least a diagnostic tool in the
first version could be helpful. For example, the tool can report things
like which producer id is preventing the LSO from being advanced. That way,
at least the users can try to fix this themselves.
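
To make the diagnostic side concrete, the tool mostly needs to surface, per
partition, which PID's open transaction is pinning the LSO; a fix tool could
then drive the initPID/begin/commit sequence described above. Below is a rough
sketch of the reporting logic only. The BrokerTxnState interface is
hypothetical and just stands in for whatever state the broker ends up logging
or exposing.

import java.util.Map;

public class StuckTransactionReporter {

    // Hypothetical view of the broker's per-partition transaction state.
    interface BrokerTxnState {
        long lastStableOffset(String topic, int partition);
        long logEndOffset(String topic, int partition);
        // PID -> first offset of that PID's oldest unfinished transaction.
        Map<Long, Long> openTransactions(String topic, int partition);
    }

    public static void report(BrokerTxnState state, String topic, int partition) {
        long lso = state.lastStableOffset(topic, partition);
        long leo = state.logEndOffset(topic, partition);
        if (lso >= leo) {
            System.out.printf("%s-%d: LSO %d has caught up with the log end; nothing is stuck%n",
                              topic, partition, lso);
            return;
        }
        // The oldest open transaction is the one holding the LSO back.
        state.openTransactions(topic, partition).entrySet().stream()
             .min(Map.Entry.comparingByValue())
             .ifPresent(e -> System.out.printf(
                 "%s-%d: LSO stuck at %d (log end %d); oldest open transaction is PID %d starting at offset %d%n",
                 topic, partition, lso, leo, e.getKey(), e.getValue()));
    }
}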


>
> > 103. Currently, there is no check for producer liveness. This means that
> > if a producer has not been sending transactional requests for a long
> > time, its appId will be expired by the coordinator. Have we considered
> > having producers send a HeartbeatRequest, just like the consumer does,
> > to keep them alive?
> >
> >
> In the current proposal, a producer whose AppId has expired is a zombie,
> which will get a fatal 'ProducerFencedException' when it tries to make any
> new transactional requests. A bounce of the producer will reinitialize it,
> at which point it can continue.
>
> As such, while the proposed behavior is not ideal, I think that a heartbeat
> thread would be a nice-to-have that may not be worth putting into V1. I
> have made a note to add this in the 'Future Work' section of the document.
>
>
>
Yes, as long as we have a path to add a heartbeat in the future, this is fine.
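
(When we do get to it, the client side of a heartbeat is likely just a small
background task. A rough sketch follows; the sendHeartbeat call is made up and
stands in for whatever request we eventually define.)

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ProducerHeartbeat {

    // Placeholder for the (not yet defined) coordinator heartbeat RPC.
    interface TransactionCoordinatorClient {
        void sendHeartbeat(String appId, long pid, short epoch);
    }

    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor();

    public void start(TransactionCoordinatorClient client, String appId,
                      long pid, short epoch, long intervalMs) {
        // Periodically tell the coordinator this AppId is still alive so it is not expired.
        scheduler.scheduleAtFixedRate(
            () -> client.sendHeartbeat(appId, pid, epoch),
            intervalMs, intervalMs, TimeUnit.MILLISECONDS);
    }

    public void stop() {
        scheduler.shutdownNow();
    }
}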


>
> > 104. The logic for handling follower truncation can be a bit trickier now.
> > The truncation may rewind the sequence number for some pids. The question
> > is how to quickly recover the last sequence number of those pids. Do we
> > plan to reload from a PID snapshot and scan forward?
> >
> >
> Yes, this is exactly what we intend to do.
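
Just to spell out the recovery path: after truncation, the follower would fall
back to the latest PID snapshot at or below the truncation point and rebuild
its in-memory sequence state by scanning the log forward from there. A rough
sketch, with simplified stand-ins for the snapshot and batch structures:

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PidStateRecovery {

    // Simplified stand-ins for the real snapshot and record-batch structures.
    record PidSnapshot(long snapshotOffset, Map<Long, Integer> lastSequencePerPid) {}
    record Batch(long baseOffset, long pid, int lastSequence) {}

    // After truncating the log to `truncationOffset`, reload the latest snapshot at or
    // below that offset and scan forward to rebuild each PID's last sequence number.
    public static Map<Long, Integer> recover(PidSnapshot snapshot,
                                             List<Batch> logFromSnapshot,
                                             long truncationOffset) {
        Map<Long, Integer> lastSeq = new HashMap<>(snapshot.lastSequencePerPid());
        for (Batch batch : logFromSnapshot) {
            if (batch.baseOffset() >= truncationOffset) {
                break; // everything at or beyond the truncation point has been discarded
            }
            lastSeq.put(batch.pid(), batch.lastSequence());
        }
        return lastSeq;
    }
}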
>
>
> > 115. Message format:
> > 115.1 Epoch is int16. Does it wrap after max? If not, is int16 too small
> > since it's possible for a producer to be restarted tens of thousands of
> > times?
> > 115.2 Sequence number int32. Does it wrap after max? It's possible for a
> > producer to publish more than 2 billion messages in a session.
> >
>
> Yes, both the epoch and the sequence number will have a circular space and
> wrap around.
>
> In the pure idempotent producer case, i.e. where there is no AppId, each
> producer session is guaranteed to have a unique PID and the leader for the
> partition validates the incoming sequence number per PID before committing
> to the log. So wrapping is safe.
>
> In the transactional case, the transaction coordinator assigns the epoch
> and returns the current epoch as part of the InitPIDRequest. All
> transaction requests (Begin, AddTPToTransaction, Prepare) will be accepted
> only if their epoch exactly matches the epoch recorded by the coordinator.
> Wrap around is safe except in some degenerate cases like very long lived
> zombie producers, described below.
>
> Say we have a producer at epoch 20, who suddenly becomes a zombie. Assume
> other producers with the same id are bounced through 65536 sessions (since
> the epoch is 2 bytes). Now, after this period, the zombie producer comes back
> online and would have the same PID/epoch as the current producer. In this
> case, it is possible for both to produce transactions and for this
> situation to go undetected.
>
> We can solve this problem by including 'session ids' for producers and
> validating that as well. But since the degenerate case is so rare, we think
> that any such session id can be part of future work if it becomes a real
> problem.
>
> I have updated the doc to explicitly mention that the epoch and sequence
> numbers may wrap.
>
>
>
Sounds good.
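
One way to see why the wrap is safe in the idempotent case: the leader only
ever checks that the incoming sequence is exactly one greater than the last
one it has for that PID, with the successor computed modulo the int32 space.
A small sketch of that check (assuming, purely for illustration, that the
counter wraps from Integer.MAX_VALUE back to 0):

public class SequenceCheck {

    // Next expected sequence for a PID, wrapping from Integer.MAX_VALUE back to 0.
    static int nextSequence(int lastSequence) {
        return lastSequence == Integer.MAX_VALUE ? 0 : lastSequence + 1;
    }

    // The partition leader accepts a batch only if it carries exactly the next sequence.
    static boolean inSequence(int lastSequence, int incomingSequence) {
        return incomingSequence == nextSequence(lastSequence);
    }

    public static void main(String[] args) {
        System.out.println(inSequence(41, 42));               // true
        System.out.println(inSequence(41, 43));               // false: a gap means lost or reordered data
        System.out.println(inSequence(Integer.MAX_VALUE, 0)); // true: wrap-around is still "in sequence"
    }
}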


>
> > 119. InitPIDRequest
> > <https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8/edit#heading=h.z99xar1h2enr>:
> > Should we write the completion of open transactions before appending the
> > pid with the bumped-up epoch to the transaction log?
> >
> >
> Done.
>
>
> > 120. transaction.app.id: An app may have multiple concurrent instances.
> > Perhaps we should name it transaction.instance.id or just instance.id?
> >
> >
> This has been the most debated point. How about just `producer.id`?
>
>
The only thing is that this will be set in the producer config, so the
"producer" part seems redundant.


>
>
> > 121. The ordering is important with the idempotent producer, which means
> > that max.in.flight.requests.per.connection should be set to 1. Do we want
> > to enforce this?
> >
>
> This makes sense. Doc has been updated.
>
>
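
For the doc update on 121, the client-side expectation could be illustrated
with a configuration like the one below. The idempotence flag follows the KIP
draft naming and may still change; the remaining settings are existing
producer configs.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

public class IdempotentProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringSerializer");

        // Name per the KIP draft; may change before release.
        props.put("enable.idempotence", "true");
        // Ordering is only guaranteed with a single in-flight request per connection.
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
        // Idempotence only makes sense with acks=all and retries enabled.
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // ... send records as usual; the broker enforces per-PID sequence checks.
        }
    }
}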
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Tue, Jan 3, 2017 at 5:38 PM, radai <radai.rosenbl...@gmail.com>
> wrote:
> >
> > > @jun - good proposal. i was willing to concede that read-uncommitted was
> > > impossible under my proposal, but if LSO/NSO is introduced it becomes
> > > possible.
> > >
> > >
> > > On Tue, Jan 3, 2017 at 7:50 AM, Jun Rao <j...@confluent.io> wrote:
> > >
> > > > Just to follow up on Radai's idea of pushing the buffering logic to
> > > > the broker. It may be possible to do this efficiently if we assume
> > > > aborted transactions are rare. The following is a draft proposal. For
> > > > each partition, the broker maintains the last stable offset (LSO) as
> > > > described in the document, and only exposes messages up to this point
> > > > if the reader is in the read-committed mode. When a new stable offset
> > > > (NSO) is determined, if there is no aborted message in this window,
> > > > the broker simply advances the LSO to the NSO. If there is at least
> > > > one aborted message, the broker first replaces the current log segment
> > > > with new log segments excluding the aborted messages and then advances
> > > > the LSO. To make the replacement efficient, we can replace the current
> > > > log segment with 3 new segments: (1) a new "shadow" log segment that
> > > > simply references the portion of the current log segment from the
> > > > beginning to the LSO, (2) a log segment created by copying only
> > > > committed messages between the LSO and the NSO, (3) a new "shadow" log
> > > > segment that references the portion of the current log segment from
> > > > the NSO (open ended). Note that only (2) involves real data copying.
> > > > If aborted transactions are rare, this overhead will be insignificant.
> > > > Assuming that applications typically don't abort transactions,
> > > > transactions will only be aborted by transaction coordinators during
> > > > hard failure of the producers, which should be rare.
> > > >
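
(To illustrate the segment-replacement step in my proposal above, here is a
rough sketch. The types below are simplified stand-ins, not the broker's real
log classes; as noted, only step (2) copies data.)

import java.util.ArrayList;
import java.util.List;

public class AbortedTxnCompactionSketch {

    record Message(long offset, boolean aborted) {}

    // A segment is either a "shadow" view over an offset range of the existing
    // segment, or a newly written copy of committed messages.
    sealed interface Segment permits Shadow, Copied {}
    record Shadow(long startOffset, long endOffsetExclusive) implements Segment {}
    record Copied(List<Message> messages) implements Segment {}

    // Replace the current segment with three segments when advancing the LSO to the NSO:
    // (1) a shadow of [segmentStart, LSO), (2) a copy of the committed messages in
    // [LSO, NSO), and (3) a shadow of [NSO, end). Only (2) involves real data copying.
    static List<Segment> replace(List<Message> segment, long segmentStart,
                                 long lso, long nso, long end) {
        List<Message> committed = new ArrayList<>();
        for (Message m : segment) {
            if (m.offset() >= lso && m.offset() < nso && !m.aborted()) {
                committed.add(m);
            }
        }
        return List.of(new Shadow(segmentStart, lso),
                       new Copied(committed),
                       new Shadow(nso, end));
    }
}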
> > > > This way, the consumer library's logic will be simplified. We can
> > > > still expose uncommitted messages to readers in the read-uncommitted
> > > > mode and therefore leave the door open for speculative readers in the
> > > > future.
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > >
> > > > On Wed, Dec 21, 2016 at 10:44 AM, Apurva Mehta <apu...@confluent.io>
> > > > wrote:
> > > >
> > > > > Hi Joel,
> > > > >
> > > > > The alternatives are embedded in the 'discussion' sections which are
> > > > > spread throughout the google doc.
> > > > >
> > > > > Admittedly, we have not covered high-level alternatives like those
> > > > > which have been brought up in this thread. In particular, having a
> > > > > separate log for transactional messages and also having multiple
> > > > > producers participate in a single transaction.
> > > > >
> > > > > This is an omission which we will correct.
> > > > >
> > > > > Thanks,
> > > > > Apurva
> > > > >
> > > > > On Wed, Dec 21, 2016 at 10:34 AM, Joel Koshy <jjkosh...@gmail.com>
> > > > wrote:
> > > > >
> > > > > > >
> > > > > > >
> > > > > > > @Joel,
> > > > > > >
> > > > > > > I read over your wiki, and apart from the introduction of the
> > > notion
> > > > of
> > > > > > > journal partitions --whose pros and cons are already being
> > > > discussed--
> > > > > > you
> > > > > > > also introduce the notion of a 'producer group' which enables
> > > > multiple
> > > > > > > producers to participate in a single transaction. This is
> > > completely
> > > > > > > opposite of the model in the KIP where a transaction is defined
> > by
> > > a
> > > > > > > producer id, and hence there is a 1-1 mapping between producers
> > and
> > > > > > > transactions. Further, each producer can have exactly one
> > in-flight
> > > > > > > transaction at a time in the KIP.
> > > > > > >
> > > > > >
> > > > > > Hi Apurva - yes I did notice those differences among other things
> > :)
> > > > > BTW, I
> > > > > > haven't yet gone through the google-doc carefully but on a skim
> it
> > > does
> > > > > not
> > > > > > seem to contain any rejected alternatives as the wiki states.
> > > > > >
> > > > >
> > > >
> > >
> >
>
