jolshan commented on code in PR #13608: URL: https://github.com/apache/kafka/pull/13608#discussion_r1207368803
########## core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala: ########## @@ -45,35 +45,48 @@ class AddPartitionsToTxnManager(config: KafkaConfig, client: NetworkClient, time def addTxnData(node: Node, transactionData: AddPartitionsToTxnTransaction, callback: AddPartitionsToTxnManager.AppendCallback): Unit = { nodesToTransactions.synchronized { // Check if we already have either node or individual transaction. Add the Node if it isn't there. - val currentNodeAndTransactionData = nodesToTransactions.getOrElseUpdate(node, + val existingNodeAndTransactionData = nodesToTransactions.getOrElseUpdate(node, new TransactionDataAndCallbacks( new AddPartitionsToTxnTransactionCollection(1), mutable.Map[String, AddPartitionsToTxnManager.AppendCallback]())) - val currentTransactionData = currentNodeAndTransactionData.transactionData.find(transactionData.transactionalId) + val existingTransactionData = existingNodeAndTransactionData.transactionData.find(transactionData.transactionalId) - // Check if we already have txn ID -- if the epoch is bumped, return invalid producer epoch, otherwise, the client likely disconnected and - // reconnected so return the retriable network exception. - if (currentTransactionData != null) { - val error = if (currentTransactionData.producerEpoch() < transactionData.producerEpoch()) - Errors.INVALID_PRODUCER_EPOCH - else - Errors.NETWORK_EXCEPTION - val topicPartitionsToError = mutable.Map[TopicPartition, Errors]() - currentTransactionData.topics().forEach { topic => - topic.partitions().forEach { partition => - topicPartitionsToError.put(new TopicPartition(topic.name(), partition), error) - } + // There are 3 cases if we already have existing data + // 1. Incoming data has a higher epoch -- return INVALID_PRODUCER_EPOCH for existing data since it is fenced + // 2. Incoming data has the same epoch -- return NETWORK_EXCEPTION for existing data, since the client is likely retrying and we want another retriable exception + // 3. 
Incoming data has a lower epoch -- return INVALID_PRODUCER_EPOCH for the incoming data since it is fenced, do not add incoming data to verify + if (existingTransactionData != null) { + if (existingTransactionData.producerEpoch() <= transactionData.producerEpoch()) { + val error = if (existingTransactionData.producerEpoch() < transactionData.producerEpoch()) + Errors.INVALID_PRODUCER_EPOCH + else + Errors.NETWORK_EXCEPTION + val oldCallback = existingNodeAndTransactionData.callbacks(transactionData.transactionalId()) + existingNodeAndTransactionData.transactionData.remove(transactionData) + oldCallback(topicPartitionsToError(existingTransactionData, error)) + } else { + // If the incoming transactionData's epoch is lower, we can return with INVALID_PRODUCER_EPOCH immediately. + callback(topicPartitionsToError(transactionData, Errors.INVALID_PRODUCER_EPOCH)) + return } - val oldCallback = currentNodeAndTransactionData.callbacks(transactionData.transactionalId()) - currentNodeAndTransactionData.transactionData.remove(transactionData) - oldCallback(topicPartitionsToError.toMap) } - currentNodeAndTransactionData.transactionData.add(transactionData) - currentNodeAndTransactionData.callbacks.put(transactionData.transactionalId(), callback) + Review Comment: I'll separate this out into a new PR as I'm already splitting this up. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org