artemlivshits commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1197145142


##########
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java:
##########
@@ -41,18 +42,35 @@ public class ProducerStateEntry {
     private int coordinatorEpoch;
     private long lastTimestamp;
     private OptionalLong currentTxnFirstOffset;
+    
+    private VerificationState verificationState;
+    
+    // Before any batches are associated with the entry, the tentative sequence represents the lowest sequence seen.
+    private OptionalInt tentativeSequence;
+    
+    public enum VerificationState {
+        EMPTY,
+        VERIFYING,
+        VERIFIED
+    }

Review Comment:
   As discussed offline, this race can cause hanging transactions:
   1. The producer properly adds the partition to the transaction.
   2. The producer has trouble sending messages and retries a few times (all messages have the same epoch + sequence).
   3. The partition leader gets the first message, sets the state to VERIFYING, and issues a request to the transaction coordinator (TC).
   4. The TC sees the open transaction and replies with success.
   5. The producer gives up and aborts the transaction.
   6. The partition leader gets the abort marker and sets the state to EMPTY.
   7. The partition leader gets the second message, sets the state to VERIFYING, and issues a request to the TC.
   8. The successful reply from step 4 finally arrives, sees that the state is VERIFYING, and sets it to VERIFIED.
   9. The first message is written on top of the abort marker (after it in the log), resulting in a "hanging" transaction.

   The issue is that the transition VERIFYING -> EMPTY -> VERIFYING is indistinguishable from plain VERIFYING. This is the classic ABA problem: https://en.wikipedia.org/wiki/ABA_problem
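
   To make the race concrete, here is a minimal single-threaded sketch that replays steps 3 to 8 against two toy entries. It is not the code in this PR and not a proposed patch: `NaiveEntry`, `GuardedEntry`, the `generation` counter, and all method names are hypothetical, and only the `VerificationState` enum mirrors the diff. The guarded variant shows one common way to defeat ABA (tagging each verification attempt with a token), assuming the reply handler can carry that token back.

```java
// Illustrative sketch only; class and method names are assumptions, not Kafka APIs.
final class VerificationAbaSketch {
    enum VerificationState { EMPTY, VERIFYING, VERIFIED }

    // Completion checks only the current state, so it cannot tell
    // VERIFYING -> EMPTY -> VERIFYING apart from a single VERIFYING.
    static final class NaiveEntry {
        VerificationState state = VerificationState.EMPTY;

        void beginVerification() { state = VerificationState.VERIFYING; }

        void onTxnMarker() { state = VerificationState.EMPTY; }

        boolean completeVerification() {
            if (state == VerificationState.VERIFYING) {
                state = VerificationState.VERIFIED;
                return true;
            }
            return false;
        }
    }

    // Each verification attempt receives a token; a reply is accepted only
    // if its token still matches the entry's current generation.
    static final class GuardedEntry {
        VerificationState state = VerificationState.EMPTY;
        long generation = 0;

        long beginVerification() {
            state = VerificationState.VERIFYING;
            return ++generation;
        }

        void onTxnMarker() {
            state = VerificationState.EMPTY;
            generation++; // invalidates any in-flight verification reply
        }

        boolean completeVerification(long token) {
            if (state == VerificationState.VERIFYING && token == generation) {
                state = VerificationState.VERIFIED;
                return true;
            }
            return false;
        }
    }

    // Replays steps 3, 6, 7, 8 from the comment against the naive entry.
    static boolean naiveAcceptsStaleReply() {
        NaiveEntry entry = new NaiveEntry();
        entry.beginVerification();           // step 3: first message
        entry.onTxnMarker();                 // step 6: abort marker
        entry.beginVerification();           // step 7: second message
        return entry.completeVerification(); // step 8: stale reply from step 4
    }

    // Same replay against the guarded entry, delivering the first token late.
    static boolean guardedAcceptsStaleReply() {
        GuardedEntry entry = new GuardedEntry();
        long firstToken = entry.beginVerification();
        entry.onTxnMarker();
        entry.beginVerification();
        return entry.completeVerification(firstToken);
    }

    public static void main(String[] args) {
        System.out.println("naive accepts stale reply: " + naiveAcceptsStaleReply());
        System.out.println("guarded accepts stale reply: " + guardedAcceptsStaleReply());
    }
}
```

   In the naive variant the stale reply flips the entry to VERIFIED, which is exactly step 8. In the guarded variant the stale token no longer matches, so the reply is dropped and the second verification stays pending. Whether a token, a fresh state object per attempt, or some other mechanism fits the real code is a separate design question.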



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java:
##########
@@ -104,27 +122,63 @@ public boolean maybeUpdateProducerEpoch(short producerEpoch) {
         }
     }
 
+    public boolean maybeUpdateProducerHigherEpoch(short producerEpoch) {
+        if (this.producerEpoch < producerEpoch) {
+            batchMetadata.clear();
+            this.producerEpoch = producerEpoch;
+            return true;
+        } else {
+            return false;
+        }
+    }
+    
+    // We only set tentative sequence if no batches have been written to the log. It is used to avoid OutOfOrderSequenceExceptions
+    // when we saw a lower sequence during transaction verification. We will update the sequence when there is no batch metadata if:
+    //  a) There is no tentative sequence yet
+    //  b) A lower sequence for the same epoch is seen and should thereby block records after that
+    //  c) A higher producer epoch is found that will reset the lowest seen sequence
+    public void maybeUpdateTentativeSequence(int sequence, short producerEpoch) {
+        if (batchMetadata.isEmpty() && 

Review Comment:
   We only need the tentative state when we don't have an entry for something that
we've written. I'm not sure that doing something we don't need, just so that we
don't rely on it later, makes the code simpler :-).


