Quick update: I have merged the abort index proposal linked above into the main design document. We are now working on tying up the loose ends raised by Jun and others.

Thanks,
Jason

On Tue, Jan 17, 2017 at 8:53 PM, Apurva Mehta <apu...@confluent.io> wrote:

>> 114. The section on Discussion on Pro-active Transaction Timeout: "If there is no other instance with the same PID yet, or it has not started a transaction, or it has not appended a message to some of the partitions, then the zombie can continue appending messages to those partitions after the abort marker whose epoch has not been incremented yet, but its commitTxn call will fail."
>> Is that correct? In earlier discussion, it seems that if a transaction is timed out by the coordinator, the coordinator will bump up the epoch and write the abort marker to those inserted partitions.
>
> You are correct. The zombie producer will not be able to write to any partitions after the abort because the epoch on each broker would have been bumped by the abort message, causing the zombie to be fenced off totally. Will correct the doc.
>
> Thanks,
> Apurva
>
> On Tue, Jan 17, 2017 at 6:17 PM, Apurva Mehta <apu...@confluent.io> wrote:
>
>> Hi Jun,
>>
>> Some answers in line.
>>
>>> 109. Could you describe when Producer.send() will receive an UnrecognizedMessageException?
>>
>> This exception will be thrown if the producer sends a sequence number which is greater than the sequence number expected by the broker (i.e. more than 1 greater than the previously sent sequence number). This can happen in two cases:
>>
>> a) There is a bug in the producer where sequence numbers are incremented more than once per message, so the producer itself sends messages with gaps in the sequence numbers.
>> b) The broker somehow lost a previous message. In a cluster configured for durability (i.e. no unclean leader elections, replication factor of 3, min.isr of 2, acks=all, etc.), this should not happen.
>>
>> So realistically, this exception will only be thrown in clusters configured for high availability, where brokers could lose messages.
>>
>> Becket raised the question of whether we should throw this exception at all in case b: it indicates a problem with a previously sent message and hence the semantics are counterintuitive. We are still discussing this point, and suggestions are most welcome!
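>>
>> To make the two cases concrete, here is a rough sketch of the broker-side check (illustrative only; the class, method, and exception names are made up and this is not the actual broker code):
>>
>>     // Sketch: per-(PID, partition) sequence check applied when a produce request arrives.
>>     public final class SequenceCheckSketch {
>>         /** lastSeq is the last sequence number the broker has recorded for this PID and partition. */
>>         public static void validate(int lastSeq, int incomingSeq) {
>>             if (incomingSeq == lastSeq + 1) {
>>                 return;                                  // the expected, in-order case
>>             }
>>             if (incomingSeq <= lastSeq) {
>>                 // A retry of a message the broker has already appended.
>>                 throw new IllegalStateException("duplicate sequence number");
>>             }
>>             // A gap: either the producer skipped numbers (case a) or the broker lost a
>>             // previously acknowledged message (case b). In the KIP this surfaces to the
>>             // producer as an UnrecognizedMessageException.
>>             throw new IllegalStateException("out-of-sequence message");
>>         }
>>     }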
>>
>>> 111. Transaction coordinator startup: "Verify that there is already an entry with the PID in the AppID map. If there is not, raise an exception."
>>> For completed transactions, it seems that it's possible that their AppId->pid has been compacted out. But that shouldn't trigger an exception?
>>
>> This exception will only be raised if the coordinator encounters a transaction status message in the log (Begin, AddTPToTransaction, Prepare, Commit/Abort). We will compact out the AppId->PID mapping along with the transaction status messages for that PID, so we should not encounter one without the other. We will clarify that status messages for completed transactions can be compacted out aggressively.
>>
>>> 113. Zombie producer:
>>> "If the zombie has an ongoing transaction with its old PID while its AppID is being expired by the coordinator, by the time the zombie is about to commit the transaction it needs to talk to the coordinator again and will be notified its PID is unrecognized and hence needs to re-register its AppID with the InitPIDRequest. At this time, if there is already another registered producer with the same AppID, then this request will be rejected with the fatal ProducerFenced error code."
>>> Is that right? According to the coordinator request handling logic, it seems that the InitPIDRequest will bump up the epoch of the pid and succeed?
>>
>> This is a good point. The InitPIDRequest will not fail, but will fence off the other producer. In this case, the CommitTxn should fail, since there would be no ongoing transaction for the PID/Epoch pair. I will update the document to reflect this.
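>>
>> One way to picture the fencing described above (a rough sketch with illustrative names, not the actual coordinator code):
>>
>>     // Sketch: InitPIDRequest bumps the epoch for the AppID's PID; a later request that
>>     // still carries the old epoch (for example the zombie's CommitTxn) is rejected.
>>     public final class EpochFenceSketch {
>>         private short currentEpoch = 0;
>>
>>         public short handleInitPidRequest() {
>>             currentEpoch++;              // fences off any instance still on the previous epoch
>>             return currentEpoch;
>>         }
>>
>>         public void checkEpoch(short requestEpoch) {
>>             if (requestEpoch < currentEpoch) {
>>                 throw new IllegalStateException("ProducerFenced");
>>             }
>>         }
>>     }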
>>
>> On Wed, Jan 4, 2017 at 10:54 AM, Jun Rao <j...@confluent.io> wrote:
>>
>>> Thanks for the proposal. A few more detailed comments.
>>>
>>> 100. Security: It seems that if an app is mistakenly configured with the appId of an existing producer, it can take over the pid and prevent the existing app from publishing. So, I am wondering if it makes sense to add ACLs at the TransactionResource level just like we do for ConsumerGroupResource. So, a user can only do transactions under a particular appId if he/she has the write permission to the TransactionResource associated with the appId.
>>>
>>> 101. Compatibility during upgrade: Suppose that the brokers are upgraded to the new version, but the broker message format is still the old one. If a new producer uses the transaction feature, should the producer get an error in this case? A tricky case can be that the leader broker is on the new message format, but the follower broker is still on the old message format. In this case, the transactional info will be lost in the follower due to down conversion. Should we fail the transactional requests when the followers are still on the old message format?
>>>
>>> 102. When there is a correlated hard failure (e.g., power outage), it's possible that an existing commit/abort marker is lost in all replicas. This may not be fixed by the transaction coordinator automatically and the consumer may get stuck on that incomplete transaction forever. Not sure what's the best way to address this. Perhaps one way is to run a tool to add an abort marker for all pids in all affected partitions.
>>>
>>> 103. Currently, there is no check for producer liveness. This means that if a producer has not been sending transactional requests for a long time, its appId will be expired by the coordinator. Have we considered having producers send a heartbeatRequest, just like the consumer, to keep it alive?
>>>
>>> 104. The logic for handling follower truncation can be a bit trickier now. The truncation may rewind the sequence number for some pids. The question is how to quickly recover the last sequence number of those pids. Do we plan to reload from a PID snapshot and scan forward?
>>>
>>> 105. When the transaction coordinator changes (due to leadership changes), it's possible for both the old and the new coordinator to send requests to a broker at the same time (for a short period of time). I am wondering if we need to add logic to fence off the old coordinator. One way to do that is to include the leaderEpoch of the partition associated with the coordinator in the coordinator-to-broker requests and control messages.
>>>
>>> 106. Compacted topics.
>>> 106.1. When all messages in a transaction are removed, we could remove the commit/abort marker for that transaction too. However, we have to be a bit careful. If the marker is removed too quickly, it's possible for a consumer to see a message in that transaction, but not to see the marker, and therefore be stuck in that transaction forever. We have a similar issue when dealing with tombstones. The solution is to preserve the tombstone for at least a preconfigured amount of time after the cleaning has passed the tombstone. Then, as long as a consumer can finish reading to the cleaning point within the configured amount of time, it's guaranteed not to miss the tombstone after it has seen a non-tombstone message on the same key. I am wondering if we should do something similar here.
>>> 106.2. "To address this problem, we propose to preserve the last epoch and sequence number written by each producer for a fixed amount of time as an empty message set. This is allowed by the new message format we are proposing in this document. The time to preserve the sequence number will be governed by the log retention settings." Could you be a bit more specific on what retention time will be used, since by default there is no retention time for a compacted (but not delete) topic?
>>> 106.3. "As for control messages, if the broker does not have any corresponding transaction cached with the PID when encountering a control message, that message can be safely removed."
>>> Do control messages have keys? If not, do we need to relax the constraint that messages in a compacted topic must have keys?
>>>
>>> 107. Could you include the default values for the newly introduced configs?
>>>
>>> 108. Could you describe the format of the PID snapshot file?
>>>
>>> 109. Could you describe when Producer.send() will receive an UnrecognizedMessageException?
>>>
>>> 110. Transaction log:
>>> 110.1. "Key => Version AppID Version" It seems that Version should really be Type?
>>> 110.2. "Value => Version Epoch Status ExpirationTime [Topic Partition]" Should we store [Topic [Partition]] instead?
>>> 110.3. To expire an AppId, do we need to insert a tombstone with the expired AppID as the key to physically remove the existing AppID entries in the transaction log?
>>>
>>> 111. Transaction coordinator startup: "Verify that there is already an entry with the PID in the AppID map. If there is not, raise an exception."
>>> For completed transactions, it seems that it's possible that their AppId->pid has been compacted out. But that shouldn't trigger an exception?
>>>
>>> 112. Control message: Will control messages be used for timestamp indexing? If so, what timestamp will we use if the timestamp type is creation time?
>>>
>>> 113. Zombie producer:
>>> "If the zombie has an ongoing transaction with its old PID while its AppID is being expired by the coordinator, by the time the zombie is about to commit the transaction it needs to talk to the coordinator again and will be notified its PID is unrecognized and hence needs to re-register its AppID with the InitPIDRequest. At this time, if there is already another registered producer with the same AppID, then this request will be rejected with the fatal ProducerFenced error code."
>>> Is that right? According to the coordinator request handling logic, it seems that the InitPIDRequest will bump up the epoch of the pid and succeed?
>>>
>>> 114. The section on Discussion on Pro-active Transaction Timeout: "If there is no other instance with the same PID yet, or it has not started a transaction, or it has not appended a message to some of the partitions, then the zombie can continue appending messages to those partitions after the abort marker whose epoch has not been incremented yet, but its commitTxn call will fail."
>>> Is that correct? In earlier discussion, it seems that if a transaction is timed out by the coordinator, the coordinator will bump up the epoch and write the abort marker to those inserted partitions.
>>>
>>> 115. Message format:
>>> 115.1. Epoch is int16. Does it wrap after max? If not, is int16 too small, since it's possible for a producer to be restarted tens of thousands of times?
>>> 115.2. Sequence number is int32. Does it wrap after max? It's possible for a producer to publish more than 2 billion messages in a session.
>>> 115.3. "Null-value bit is 1: skip the key-length (since it can now be calculated) and value fields." It seems unnatural for the format of the key to depend on the value. It seems easier to just skip the value in this case?
>>>
>>> 116. ProducerRequest: The existing format doesn't have "MessageSetSize" at the partition level.
>>>
>>> 117. UpdateTxnRequest: Could you explain the format of Marker?
>>>
>>> 118. TxnOffsetCommitRequest: How is retention time determined? Do we need a new config in the producer, or just default it to -1 as the consumer does?
>>>
>>> 119. InitPIDRequest <https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8/edit#heading=h.z99xar1h2enr>: Should we write the completion of open transactions before appending the pid with the bumped up epoch to the transaction log?
>>>
>>> 120. transaction.app.id: An app may have multiple concurrent instances. Perhaps we should name it transaction.instance.id or just instance.id?
>>>
>>> 121. The ordering is important with the idempotent producer, which means that max.in.flight.requests.per.connection should be set to 1. Do we want to enforce this?
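>>>
>>> For concreteness, the sort of producer setup this question is about. The values are illustrative, and transaction.app.id is just the name currently proposed in the doc (see 120), not an existing client config:
>>>
>>>     import java.util.Properties;
>>>
>>>     public final class ProducerConfigSketch {
>>>         public static Properties transactionalProducerProps() {
>>>             Properties props = new Properties();
>>>             props.put("bootstrap.servers", "broker1:9092");           // illustrative
>>>             props.put("acks", "all");
>>>             props.put("max.in.flight.requests.per.connection", "1");  // preserves ordering for an idempotent producer
>>>             props.put("transaction.app.id", "my-app");                // name as currently proposed in the doc
>>>             return props;
>>>         }
>>>     }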
>>>
>>> Thanks,
>>>
>>> Jun
>>>
>>> On Tue, Jan 3, 2017 at 5:38 PM, radai <radai.rosenbl...@gmail.com> wrote:
>>>
>>>> @jun - good proposal. i was willing to concede that read-uncommitted was impossible under my proposal, but if LSO/NSO is introduced it becomes possible.
>>>>
>>>> On Tue, Jan 3, 2017 at 7:50 AM, Jun Rao <j...@confluent.io> wrote:
>>>>
>>>>> Just to follow up on Radai's idea of pushing the buffering logic to the broker. It may be possible to do this efficiently if we assume aborted transactions are rare. The following is a draft proposal. For each partition, the broker maintains the last stable offset (LSO) as described in the document, and only exposes messages up to this point if the reader is in the read-committed mode. When a new stable offset (NSO) is determined, if there is no aborted message in this window, the broker simply advances the LSO to the NSO. If there is at least one aborted message, the broker first replaces the current log segment with new log segments excluding the aborted messages and then advances the LSO. To make the replacement efficient, we can replace the current log segment with 3 new segments: (1) a new "shadow" log segment that simply references the portion of the current log segment from the beginning to the LSO, (2) a log segment created by copying only the committed messages between the LSO and the NSO, (3) a new "shadow" log segment that references the portion of the current log segment from the NSO (open ended). Note that only (2) involves real data copying. If aborted transactions are rare, this overhead will be insignificant. Assuming that applications typically don't abort transactions, transactions will only be aborted by transaction coordinators during hard failures of the producers, which should be rare.
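>>>>>
>>>>> A rough sketch of the replacement step, with entirely made-up types, just to make the three-segment idea concrete (this is not proposed code):
>>>>>
>>>>>     import java.util.Arrays;
>>>>>     import java.util.List;
>>>>>
>>>>>     // "Segment" is a stand-in for a log segment; both factory methods below are hypothetical.
>>>>>     interface Segment {
>>>>>         long baseOffset();
>>>>>         Segment sliceView(long fromOffset, long toOffset);      // zero-copy view into this segment
>>>>>         Segment copyCommitted(long fromOffset, long toOffset);  // copies only committed messages
>>>>>     }
>>>>>
>>>>>     final class SegmentSplitSketch {
>>>>>         // Replace the current segment when advancing LSO -> NSO past at least one aborted message.
>>>>>         static List<Segment> replaceCurrentSegment(Segment current, long lso, long nso) {
>>>>>             Segment head = current.sliceView(current.baseOffset(), lso);   // (1) shadow segment, no copying
>>>>>             Segment middle = current.copyCommitted(lso, nso);              // (2) the only real data copy
>>>>>             Segment tail = current.sliceView(nso, Long.MAX_VALUE);         // (3) shadow segment, open ended
>>>>>             return Arrays.asList(head, middle, tail);
>>>>>         }
>>>>>     }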
>>>>>
>>>>> This way, the consumer library's logic will be simplified. We can still expose uncommitted messages to readers in the read-uncommitted mode and therefore leave the door open for speculative readers in the future.
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Jun
>>>>>
>>>>> On Wed, Dec 21, 2016 at 10:44 AM, Apurva Mehta <apu...@confluent.io> wrote:
>>>>>
>>>>>> Hi Joel,
>>>>>>
>>>>>> The alternatives are embedded in the 'discussion' sections which are spread throughout the google doc.
>>>>>>
>>>>>> Admittedly, we have not covered high-level alternatives like those which have been brought up in this thread, in particular having a separate log for transactional messages and also having multiple producers participate in a single transaction.
>>>>>>
>>>>>> This is an omission which we will correct.
>>>>>>
>>>>>> Thanks,
>>>>>> Apurva
>>>>>>
>>>>>> On Wed, Dec 21, 2016 at 10:34 AM, Joel Koshy <jjkosh...@gmail.com> wrote:
>>>>>>
>>>>>>>> @Joel,
>>>>>>>>
>>>>>>>> I read over your wiki, and apart from the introduction of the notion of journal partitions -- whose pros and cons are already being discussed -- you also introduce the notion of a 'producer group' which enables multiple producers to participate in a single transaction. This is the complete opposite of the model in the KIP, where a transaction is defined by a producer id, and hence there is a 1-1 mapping between producers and transactions. Further, each producer can have exactly one in-flight transaction at a time in the KIP.
>>>>>>>
>>>>>>> Hi Apurva - yes, I did notice those differences among other things :) BTW, I haven't yet gone through the google-doc carefully, but on a skim it does not seem to contain any rejected alternatives, as the wiki states.