artemlivshits commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1206199912


##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -579,10 +579,28 @@ class UnifiedLog(@volatile var logStartOffset: Long,
     result
   }
 
+  def transactionNeedsVerifying(producerId: Long, producerEpoch: Short, baseSequence: Int): Optional[VerificationState] = lock synchronized {
+    val entry = producerStateManager.entryForVerification(producerId, producerEpoch, baseSequence)
+    if (entry.currentTxnFirstOffset.isPresent) {

Review Comment:
   Maybe add a comment why we don't need verification if currentTxnFirstOffset.isPresent.



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java:
##########
@@ -103,28 +115,57 @@ public boolean maybeUpdateProducerEpoch(short producerEpoch) {
             return false;
         }
     }
+    
+    public void maybeAddVerificationState() {
+        // If we already have a verification state, we can reuse it. This is because we know this is the same transaction
+        // as the state is cleared upon writing a control marker.
+        if (!this.verificationState.isPresent())
+            this.verificationState = Optional.of(new VerificationState());

Review Comment:
   This creates an object of class Optional that points to an object of class VerificationState, so we get an extra object for every producer entry.  We just need a plain field of type Object to avoid the extra overhead.
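   For illustration, something along these lines (just a sketch with made-up names, not the actual PR code) is what I have in mind -- a plain nullable field compared by reference:

   ```java
   // Sketch only: keep the verification token as a plain nullable field instead of
   // wrapping it in Optional, so the only extra allocation per producer entry is the
   // token object itself.
   public class ProducerStateEntrySketch {
       // null means "no verification in progress"; otherwise an opaque token object.
       private Object verificationState = null;

       public void maybeAddVerificationState() {
           // Reuse the existing token: it is cleared when a control marker is written,
           // so a non-null token always belongs to the current transaction.
           if (verificationState == null)
               verificationState = new Object();
       }

       public Object verificationState() {
           return verificationState;
       }
   }
   ```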



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -683,6 +702,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
       validateAndAssignOffsets = false,
       leaderEpoch = -1,
       requestLocal = None,
+      verificationState = Optional.empty(),

Review Comment:
   Do we ever go through verification logic for followers?



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -980,6 +1002,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
           if (duplicateBatch.isPresent) {
            return (updatedProducers, completedTxns.toList, Some(duplicateBatch.get()))
           }
+
+          // Verify that if the record is transactional & the append origin is client, that we either have an ongoing transaction or verified transaction state

Review Comment:
   Maybe add more comments on how we validate transaction state.  Also maybe not here but at least somewhere we should have a detailed comment about the race condition we're addressing and specifically how the verificationState solves the ABA problem.
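   To make this concrete, here is a rough sketch of the token idea (illustrative class and method names, not the PR's API): the token captured before the verification round trip is compared by reference at append time, so if the transaction was aborted and a new one started in between, the reference check fails even though producerId/epoch/sequence may look the same -- the ABA case.

   ```java
   // Illustrative sketch of an identity-based verification token.
   class VerificationTokenSketch {
       private Object token; // null when no transaction is being verified

       // Called when verification starts; the caller holds on to the returned token.
       synchronized Object beginVerification() {
           if (token == null)
               token = new Object();
           return token;
       }

       // Called when a control marker (commit/abort) is written for the producer.
       synchronized void onTransactionEnd() {
           token = null; // any in-flight verification is now stale
       }

       // Called at append time with the token captured at verification time.
       synchronized boolean mayAppend(Object tokenSeenAtVerification) {
           // Reference equality: the same object means nothing ended or restarted
           // the transaction between verification and append.
           return tokenSeenAtVerification != null && tokenSeenAtVerification == token;
       }
   }
   ```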



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/VerificationState.java:
##########
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+/**
+ * This class serves as a unique object to ensure the same transaction is being verified.
+ * When verification starts, this object is created and checked before append to ensure the producer state entry
+ * is not modified (via ending the transaction) before the record is appended.
+ */
+public class VerificationState {

Review Comment:
   Do we need a separate class for that?  I think we could just use Object, because we just compare references, not values.



##########
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##########
@@ -45,35 +45,48 @@ class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, time
   def addTxnData(node: Node, transactionData: AddPartitionsToTxnTransaction, callback: AddPartitionsToTxnManager.AppendCallback): Unit = {
     nodesToTransactions.synchronized {
       // Check if we have already have either node or individual transaction. Add the Node if it isn't there.
-      val currentNodeAndTransactionData = nodesToTransactions.getOrElseUpdate(node,
+      val existingNodeAndTransactionData = nodesToTransactions.getOrElseUpdate(node,
         new TransactionDataAndCallbacks(
           new AddPartitionsToTxnTransactionCollection(1),
           mutable.Map[String, AddPartitionsToTxnManager.AppendCallback]()))
 
-      val currentTransactionData = currentNodeAndTransactionData.transactionData.find(transactionData.transactionalId)
+      val existingTransactionData = existingNodeAndTransactionData.transactionData.find(transactionData.transactionalId)
 
-      // Check if we already have txn ID -- if the epoch is bumped, return invalid producer epoch, otherwise, the client likely disconnected and
-      // reconnected so return the retriable network exception.
-      if (currentTransactionData != null) {
-        val error = if (currentTransactionData.producerEpoch() < transactionData.producerEpoch())
-          Errors.INVALID_PRODUCER_EPOCH
-        else
-          Errors.NETWORK_EXCEPTION
-        val topicPartitionsToError = mutable.Map[TopicPartition, Errors]()
-        currentTransactionData.topics().forEach { topic =>
-          topic.partitions().forEach { partition =>
-            topicPartitionsToError.put(new TopicPartition(topic.name(), partition), error)
-          }
+      // There are 3 cases if we already have existing data
+      // 1. Incoming data has a higher epoch -- return INVALID_PRODUCER_EPOCH for existing data since it is fenced
+      // 2. Incoming data has the same epoch -- return NETWORK_EXCEPTION for existing data, since the client is likely retrying and we want another retriable exception
+      // 3. Incoming data has a lower epoch -- return INVALID_PRODUCER_EPOCH for the incoming data since it is fenced, do not add incoming data to verify
+      if (existingTransactionData != null) {
+        if (existingTransactionData.producerEpoch() <= transactionData.producerEpoch()) {
+            val error = if (existingTransactionData.producerEpoch() < transactionData.producerEpoch())
+              Errors.INVALID_PRODUCER_EPOCH
+            else
+              Errors.NETWORK_EXCEPTION
+          val oldCallback = existingNodeAndTransactionData.callbacks(transactionData.transactionalId())
+          existingNodeAndTransactionData.transactionData.remove(transactionData)
+          oldCallback(topicPartitionsToError(existingTransactionData, error))
+        } else {
+          // If the incoming transactionData's epoch is lower, we can return with INVALID_PRODUCER_EPOCH immediately.
+          callback(topicPartitionsToError(transactionData, Errors.INVALID_PRODUCER_EPOCH))
+          return
        }
-        val oldCallback = currentNodeAndTransactionData.callbacks(transactionData.transactionalId())
-        currentNodeAndTransactionData.transactionData.remove(transactionData)
-        oldCallback(topicPartitionsToError.toMap)
      }
-      currentNodeAndTransactionData.transactionData.add(transactionData)
-      currentNodeAndTransactionData.callbacks.put(transactionData.transactionalId(), callback)
+

Review Comment:
   Just to clarify -- this change isn't about "transient" state or handling a race with abort, but just an unrelated issue?



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java:
##########
@@ -103,28 +115,57 @@ public boolean maybeUpdateProducerEpoch(short producerEpoch) {
             return false;
         }
     }
+    
+    public void maybeAddVerificationState() {
+        // If we already have a verification state, we can reuse it. This is because we know this is the same transaction
+        // as the state is cleared upon writing a control marker.
+        if (!this.verificationState.isPresent())
+            this.verificationState = Optional.of(new VerificationState());
+    }
+
+    // We only set tentative sequence if this entry has never had any batch metadata written to it. (We do not need to store if we've just bumped epoch.)
+    // It is used to avoid OutOfOrderSequenceExceptions when we saw a lower sequence during transaction verification.

Review Comment:
   Maybe elaborate on the problem we're trying to address (sequence of steps that leads to the problem).
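   For reference, a toy sketch of how I read the tentativeSequence idea from the comment above (purely illustrative, not the PR code): remember the lowest sequence seen during verification for an entry that has no batch metadata yet, and reject a higher incoming sequence so the lower one is not later flagged as out of order.

   ```java
   import java.util.OptionalInt;

   // Illustrative only; mirrors the described intent, not the actual implementation.
   class TentativeSequenceSketch {
       private OptionalInt tentativeSequence = OptionalInt.empty();
       private boolean hasBatchMetadata = false; // true once real batch metadata exists

       // Record the sequence observed during verification, but only for a fresh entry.
       void onVerification(int baseSequence) {
           if (!hasBatchMetadata && !tentativeSequence.isPresent())
               tentativeSequence = OptionalInt.of(baseSequence);
       }

       // If a lower sequence was promised during verification, a higher incoming
       // sequence should not be appended ahead of it.
       boolean wouldJumpAheadOfTentative(int incomingBaseSequence) {
           return tentativeSequence.isPresent() && tentativeSequence.getAsInt() < incomingBaseSequence;
       }
   }
   ```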



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -671,6 +671,7 @@ class ReplicaManager(val config: KafkaConfig,
       val sTime = time.milliseconds
       
       val transactionalProducerIds = mutable.HashSet[Long]()
+      var verificationState: Optional[VerificationState] = Optional.empty()

Review Comment:
   I think this should be per partition -- each log has its own producer state and its own verificationState.  Do we have a unit test that tries to append to multiple partitions?  I would expect it to fail, because each producer state would have its own verificationState object, but we pass only whichever one happened to be set last, so for the others it would look like a race.
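   Roughly the shape I have in mind (illustrative only, and in Java rather than the actual Scala): carry one token per partition instead of a single variable for the whole request.

   ```java
   import java.util.HashMap;
   import java.util.Map;
   import org.apache.kafka.common.TopicPartition;

   // Illustrative sketch: each partition's log has its own producer state, so the
   // verification token has to be tracked per partition rather than shared across
   // the whole produce request.
   class PerPartitionVerificationSketch {
       private final Map<TopicPartition, Object> tokens = new HashMap<>();

       void record(TopicPartition partition, Object token) {
           tokens.put(partition, token);
       }

       Object tokenFor(TopicPartition partition) {
           return tokens.get(partition); // null if this partition was never verified
       }
   }
   ```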



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -980,6 +1007,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
           if (duplicateBatch.isPresent) {
            return (updatedProducers, completedTxns.toList, Some(duplicateBatch.get()))
           }
+
+          // Verify that if the record is transactional & the append origin is client, that we are in VERIFIED state.
+          // Also check that we are not appending a record with a higher sequence than one previously seen through verification.
+          if (batch.isTransactional && producerStateManager.producerStateManagerConfig().transactionVerificationEnabled()) {
+            if (verificationState(batch.producerId(), batch.producerEpoch()) != ProducerStateEntry.VerificationState.VERIFIED) {
+              throw new InvalidRecordException("Record was not part of an ongoing transaction")
+            } else if (maybeLastEntry.isPresent && maybeLastEntry.get.tentativeSequence.isPresent && maybeLastEntry.get.tentativeSequence.getAsInt < batch.baseSequence)

Review Comment:
   I think the verification check belongs here, and the sequencing check would be better done in ProducerAppendInfo.



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -980,6 +1002,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
           if (duplicateBatch.isPresent) {
            return (updatedProducers, completedTxns.toList, Some(duplicateBatch.get()))
           }
+
+          // Verify that if the record is transactional & the append origin is client, that we either have an ongoing transaction or verified transaction state
+          // Also check that we are not appending a record with a higher sequence than one previously seen through verification.
+          if (batch.isTransactional && producerStateManager.producerStateManagerConfig().transactionVerificationEnabled()) {
+            if (!hasOngoingTransaction(batch.producerId) && (verificationStateOpt != verificationState(batch.producerId) || !verificationStateOpt.isPresent)) {
+              throw new InvalidRecordException("Record was not part of an ongoing transaction")

Review Comment:
   Looks like we already have the producer state in the maybeLastEntry, so we don't need to do additional lookups and locks -- we can just check the state of maybeLastEntry.


