artemlivshits commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1194600838


##########
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java:
##########
@@ -183,6 +184,19 @@ private void clearProducerIds() {
         producers.clear();
         producerIdCount = 0;
     }
+    
+    public ProducerStateEntry entryForVerification(long producerId, short 
producerEpoch, int firstSequence) {

Review Comment:
   "Tentative" state is generally useful for both idempotent and transactional 
producers independently of verification functionality - as soon as we've got 
batch1 it's good to remember that we've got it, so that if we fail down the 
road with a retriable error, batch2 won't get in before we have  a chance to 
retry batch1.



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java:
##########
@@ -104,27 +122,63 @@ public boolean maybeUpdateProducerEpoch(short 
producerEpoch) {
         }
     }
 
+    public boolean maybeUpdateProducerHigherEpoch(short producerEpoch) {
+        if (this.producerEpoch < producerEpoch) {
+            batchMetadata.clear();
+            this.producerEpoch = producerEpoch;
+            return true;

Review Comment:
   Do we use a return value?



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java:
##########
@@ -104,27 +122,63 @@ public boolean maybeUpdateProducerEpoch(short 
producerEpoch) {
         }
     }
 
+    public boolean maybeUpdateProducerHigherEpoch(short producerEpoch) {
+        if (this.producerEpoch < producerEpoch) {
+            batchMetadata.clear();
+            this.producerEpoch = producerEpoch;
+            return true;
+        } else {
+            return false;
+        }
+    }
+    
+    // We only set tentative sequence if no batches have been written to the 
log. It is used to avoid OutOfOrderSequenceExceptions
+    // when we saw a lower sequence during transaction verification. We will 
update the sequence when there is no batch metadata if:
+    //  a) There is no tentative sequence yet
+    //  b) A lower sequence for the same epoch is seen and should thereby 
block records after that
+    //  c) A higher producer epoch is found that will reset the lowest seen 
sequence
+    public void maybeUpdateTentativeSequence(int sequence, short 
producerEpoch) {
+        if (batchMetadata.isEmpty() && 

Review Comment:
   I don't think this is the right condition -- we only need a tentative entry 
when we literally have no entry (i.e. we don't know in which state the producer 
was when producer id got expired), not just the empty batch metadata (which can 
happen on the epoch bump).  If we got an epoch bump we know that the next 
sequence is 0 and we can reject messages that have a gap.



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java:
##########
@@ -41,18 +42,35 @@ public class ProducerStateEntry {
     private int coordinatorEpoch;
     private long lastTimestamp;
     private OptionalLong currentTxnFirstOffset;
+    
+    private VerificationState verificationState;
+    
+    // Before any batches are associated with the entry, the tentative 
sequence represents the lowest sequence seen.
+    private OptionalInt tentativeSequence;
+    
+    public enum VerificationState {
+        EMPTY,
+        VERIFYING,
+        VERIFIED
+    }

Review Comment:
   Using a finite state machine for handling race conditions is susceptible to 
ABA problem, i.e. the logic may not distinguish between the original state not 
changing concurrently and changing back to the same state.  In this case it 
looks like the following sequence is possible:
   1. Got message, started verifying transaction (transitioned to VERIFYING)
   2. Transaction is in ongoing, so replied with success.
   3. Transaction got aborted (transitioned to EMPTY).
   4. Got another message, started verifying transaction (transitioned to 
VERIFYING)
   5. The first message verification successfully completes, sees that state is 
VERIFYING and allows message to get written.
   
   This is why we (and pretty much any distributed system) use 
ever-incrementing states (e.g. epochs) to handle concurrency, so that we don't 
get into ABA problem.



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -980,6 +1007,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
           if (duplicateBatch.isPresent) {
             return (updatedProducers, completedTxns.toList, 
Some(duplicateBatch.get()))
           }
+
+          // Verify that if the record is transactional & the append origin is 
client, that we are in VERIFIED state.
+          // Also check that we are not appending a record with a higher 
sequence than one previously seen through verification.
+          if (batch.isTransactional && 
producerStateManager.producerStateManagerConfig().transactionVerificationEnabled())
 {
+            if (verificationState(batch.producerId(), batch.producerEpoch()) 
!= ProducerStateEntry.VerificationState.VERIFIED) {
+              throw new InvalidRecordException("Record was not part of an 
ongoing transaction")
+            } else if (maybeLastEntry.isPresent && 
maybeLastEntry.get.tentativeSequence.isPresent && 
maybeLastEntry.get.tentativeSequence.getAsInt < batch.baseSequence)

Review Comment:
   I think we do sequencing checks in ProducerAppendInfo, should we move this 
there as well so that all checks are in one place?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to