jolshan commented on code in PR #13608:
URL: https://github.com/apache/kafka/pull/13608#discussion_r1207368803


##########
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##########
@@ -45,35 +45,48 @@ class AddPartitionsToTxnManager(config: KafkaConfig, 
client: NetworkClient, time
   def addTxnData(node: Node, transactionData: AddPartitionsToTxnTransaction, 
callback: AddPartitionsToTxnManager.AppendCallback): Unit = {
     nodesToTransactions.synchronized {
       // Check if we have already have either node or individual transaction. 
Add the Node if it isn't there.
-      val currentNodeAndTransactionData = 
nodesToTransactions.getOrElseUpdate(node,
+      val existingNodeAndTransactionData = 
nodesToTransactions.getOrElseUpdate(node,
         new TransactionDataAndCallbacks(
           new AddPartitionsToTxnTransactionCollection(1),
           mutable.Map[String, AddPartitionsToTxnManager.AppendCallback]()))
 
-      val currentTransactionData = 
currentNodeAndTransactionData.transactionData.find(transactionData.transactionalId)
+      val existingTransactionData = 
existingNodeAndTransactionData.transactionData.find(transactionData.transactionalId)
 
-      // Check if we already have txn ID -- if the epoch is bumped, return 
invalid producer epoch, otherwise, the client likely disconnected and 
-      // reconnected so return the retriable network exception.
-      if (currentTransactionData != null) {
-        val error = if (currentTransactionData.producerEpoch() < 
transactionData.producerEpoch())
-          Errors.INVALID_PRODUCER_EPOCH
-        else 
-          Errors.NETWORK_EXCEPTION
-        val topicPartitionsToError = mutable.Map[TopicPartition, Errors]()
-        currentTransactionData.topics().forEach { topic =>
-          topic.partitions().forEach { partition =>
-            topicPartitionsToError.put(new TopicPartition(topic.name(), 
partition), error)
-          }
+      // There are 3 cases if we already have existing data
+      // 1. Incoming data has a higher epoch -- return INVALID_PRODUCER_EPOCH 
for existing data since it is fenced
+      // 2. Incoming data has the same epoch -- return NETWORK_EXCEPTION for 
existing data, since the client is likely retrying and we want another 
retriable exception 
+      // 3. Incoming data has a lower epoch -- return INVALID_PRODUCER_EPOCH 
for the incoming data since it is fenced, do not add incoming data to verify
+      if (existingTransactionData != null) {
+        if (existingTransactionData.producerEpoch() <= 
transactionData.producerEpoch()) {
+            val error = if (existingTransactionData.producerEpoch() < 
transactionData.producerEpoch())
+              Errors.INVALID_PRODUCER_EPOCH
+            else
+              Errors.NETWORK_EXCEPTION
+          val oldCallback = 
existingNodeAndTransactionData.callbacks(transactionData.transactionalId())
+          
existingNodeAndTransactionData.transactionData.remove(transactionData)
+          oldCallback(topicPartitionsToError(existingTransactionData, error))
+        } else {
+          // If the incoming transactionData's epoch is lower, we can return 
with INVALID_PRODUCER_EPOCH immediately.
+          callback(topicPartitionsToError(transactionData, 
Errors.INVALID_PRODUCER_EPOCH))
+          return
         }
-        val oldCallback = 
currentNodeAndTransactionData.callbacks(transactionData.transactionalId())
-        currentNodeAndTransactionData.transactionData.remove(transactionData)
-        oldCallback(topicPartitionsToError.toMap)
       }
-      currentNodeAndTransactionData.transactionData.add(transactionData)
-      
currentNodeAndTransactionData.callbacks.put(transactionData.transactionalId(), 
callback)
+

Review Comment:
   I'll separate this out into a new PR as I'm already splitting this up.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to