artemlivshits commented on code in PR #13608: URL: https://github.com/apache/kafka/pull/13608#discussion_r1197145142
########## storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java: ##########
@@ -41,18 +42,35 @@ public class ProducerStateEntry {
     private int coordinatorEpoch;
     private long lastTimestamp;
     private OptionalLong currentTxnFirstOffset;
+
+    private VerificationState verificationState;
+
+    // Before any batches are associated with the entry, the tentative sequence represents the lowest sequence seen.
+    private OptionalInt tentativeSequence;
+
+    public enum VerificationState {
+        EMPTY,
+        VERIFYING,
+        VERIFIED
+    }

Review Comment:
As discussed offline, this race can cause hanging transactions:

1. The producer properly adds the partition to the transaction.
2. The producer has trouble sending messages and retries a few times (all messages have the same epoch + sequence).
3. The partition leader gets the first message, sets the state to VERIFYING, and issues a request to the transaction coordinator (TC).
4. The TC sees the open transaction and replies with success.
5. The producer gives up and aborts the transaction.
6. The partition leader gets the abort marker and sets the state to EMPTY.
7. The partition leader gets the second message, sets the state to VERIFYING, and issues another request to the TC.
8. The successful reply from step 4 finally arrives; the leader sees that the state is VERIFYING and sets it to VERIFIED.
9. The first message is written on top of the abort marker, resulting in a "hanging" transaction.

The issue is that, to the reply handler, the transition VERIFYING -> EMPTY -> VERIFYING looks the same as staying in VERIFYING: the classic ABA problem (https://en.wikipedia.org/wiki/ABA_problem).

########## storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java: ##########
@@ -104,27 +122,63 @@ public boolean maybeUpdateProducerEpoch(short producerEpoch) {
         }
     }
+    public boolean maybeUpdateProducerHigherEpoch(short producerEpoch) {
+        if (this.producerEpoch < producerEpoch) {
+            batchMetadata.clear();
+            this.producerEpoch = producerEpoch;
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    // We only set tentative sequence if no batches have been written to the log.
It is used to avoid OutOfOrderSequenceExceptions
+    // when we saw a lower sequence during transaction verification. We will update the sequence when there is no batch metadata if:
+    //  a) There is no tentative sequence yet
+    //  b) A lower sequence for the same epoch is seen and should thereby block records after that
+    //  c) A higher producer epoch is found that will reset the lowest seen sequence
+    public void maybeUpdateTentativeSequence(int sequence, short producerEpoch) {
+        if (batchMetadata.isEmpty() &&

Review Comment:
We only need the tentative state when we don't have an entry for anything we've written yet. I'm not sure that tracking something we don't need, just so that we don't rely on it later, makes the code simpler :-).

-- 
This is an automated message from the Apache Git Service.
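The race in the first review comment can be reproduced with a small, self-contained model. This is a hypothetical sketch: `VerificationAbaSketch`, `StateOnlyEntry`, `GuardedEntry`, and their methods are illustrative names, not Kafka APIs, and tagging each verification attempt with a fresh object identity is one common way to defeat ABA, not necessarily what this PR adopted. The state-only entry accepts the stale reply from step 4 after the abort, while the guarded entry rejects it because that reply carries the identity of an earlier attempt.

```java
public class VerificationAbaSketch {

    enum VerificationState { EMPTY, VERIFYING, VERIFIED }

    // Hypothetical entry that tracks only the enum state, mirroring the diff above.
    static final class StateOnlyEntry {
        VerificationState state = VerificationState.EMPTY;

        void startVerification() { state = VerificationState.VERIFYING; }

        void onAbortMarker() { state = VerificationState.EMPTY; }

        // Any reply is accepted as long as the entry currently "looks" VERIFYING.
        boolean onVerificationReply() {
            if (state == VerificationState.VERIFYING) {
                state = VerificationState.VERIFIED;
                return true;
            }
            return false;
        }
    }

    // Hypothetical entry that tags each verification attempt with a unique object.
    static final class GuardedEntry {
        private Object guard;      // null means no verification is in flight
        private boolean verified;

        Object startVerification() {
            guard = new Object();  // fresh identity per attempt, so A -> B -> A is detectable
            verified = false;
            return guard;          // would travel with the request to the TC
        }

        void onAbortMarker() {
            guard = null;
            verified = false;
        }

        // Only the reply for the current attempt may complete verification.
        boolean onVerificationReply(Object replyGuard) {
            if (replyGuard != null && replyGuard == guard) {
                verified = true;
                return true;
            }
            return false;
        }

        boolean isVerified() { return verified; }
    }

    public static void main(String[] args) {
        StateOnlyEntry naive = new StateOnlyEntry();
        naive.startVerification();                      // step 3: first request to the TC
        naive.onAbortMarker();                          // step 6: abort marker resets to EMPTY
        naive.startVerification();                      // step 7: retry issues a second request
        boolean naiveAccepted = naive.onVerificationReply(); // step 8: stale reply arrives
        System.out.println("state-only entry accepts stale reply: " + naiveAccepted);

        GuardedEntry guarded = new GuardedEntry();
        Object first = guarded.startVerification();     // step 3
        guarded.onAbortMarker();                        // step 6
        Object second = guarded.startVerification();    // step 7
        boolean staleAccepted = guarded.onVerificationReply(first);   // step 8
        boolean currentAccepted = guarded.onVerificationReply(second);
        System.out.println("guarded entry accepts stale reply: " + staleAccepted);
        System.out.println("guarded entry accepts current reply: " + currentAccepted);
    }
}
```

Comparing object identity rather than the enum value is what makes VERIFYING -> EMPTY -> VERIFYING distinguishable from an uninterrupted VERIFYING. A monotonically increasing generation counter carried with each request would serve the same purpose.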