jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1205777679


##########
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java:
##########
@@ -104,27 +122,63 @@ public boolean maybeUpdateProducerEpoch(short producerEpoch) {
         }
     }
 
+    public boolean maybeUpdateProducerHigherEpoch(short producerEpoch) {
+        if (this.producerEpoch < producerEpoch) {
+            batchMetadata.clear();
+            this.producerEpoch = producerEpoch;
+            return true;
+        } else {
+            return false;
+        }
+    }
+    
+    // We only set tentative sequence if no batches have been written to the log. It is used to avoid OutOfOrderSequenceExceptions
+    // when we saw a lower sequence during transaction verification. We will update the sequence when there is no batch metadata if:
+    //  a) There is no tentative sequence yet
+    //  b) A lower sequence for the same epoch is seen and should thereby block records after that
+    //  c) A higher producer epoch is found that will reset the lowest seen sequence
+    public void maybeUpdateTentativeSequence(int sequence, short producerEpoch) {
+        if (batchMetadata.isEmpty() && 

Review Comment:
   I dug into this a bit more. We actually do check sequence on bumped epoch! 
   
   ```
   if (producerEpoch != updatedEntry.producerEpoch()) {
       if (appendFirstSeq != 0) {
           if (updatedEntry.producerEpoch() != RecordBatch.NO_PRODUCER_EPOCH) {
               throw new OutOfOrderSequenceException("Invalid sequence number for new epoch of producer " + producerId +
                       "at offset " + offset + " in partition " + topicPartition + ": " + producerEpoch + " (request epoch), "
                       + appendFirstSeq + " (seq. number), " + updatedEntry.producerEpoch() + " (current producer epoch)");
   ```
                               
   Basically, the check runs when the entry's updated epoch differs from the epoch we want to use.
   The tricky part is that we already update the epoch when creating the verification state, so this
   check would no longer see a difference by the time we append. We need to do that check there
   somehow. I will look into it.
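
A minimal sketch of the condition quoted above, to make the ordering problem concrete. `EpochSequenceCheck` and `rejectsFirstSequence` are hypothetical names used only for illustration, and the `-1` sentinel is an assumed stand-in for `RecordBatch.NO_PRODUCER_EPOCH`. It models only the epoch-change branch shown in the snippet, not the rest of the sequence validation.

```java
// Hypothetical stand-in for illustration only; this is not the Kafka class.
final class EpochSequenceCheck {
    // Assumed stand-in for RecordBatch.NO_PRODUCER_EPOCH.
    static final short NO_PRODUCER_EPOCH = -1;

    private EpochSequenceCheck() { }

    /**
     * Mirrors the three nested conditions in the quoted snippet: returns true
     * when the first batch of a new epoch would be rejected with an
     * OutOfOrderSequenceException.
     */
    static boolean rejectsFirstSequence(short currentEpoch, short requestEpoch, int appendFirstSeq) {
        boolean epochChanged = requestEpoch != currentEpoch;       // outer if
        boolean nonZeroFirstSeq = appendFirstSeq != 0;             // middle if
        boolean hasKnownEpoch = currentEpoch != NO_PRODUCER_EPOCH; // inner if
        return epochChanged && nonZeroFirstSeq && hasKnownEpoch;
    }
}
```

With a stored epoch of 5, `rejectsFirstSequence((short) 5, (short) 6, 3)` returns true: a bumped epoch must start at sequence 0. If the stored epoch has already been moved to 6 before the append is validated, the same request evaluates as `rejectsFirstSequence((short) 6, (short) 6, 3)` and returns false, because the epoch-change branch is never entered.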


