This is an automated email from the ASF dual-hosted git repository.

jolshan pushed a commit to branch 3.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.6 by this push:
     new f13367de4ec KAFKA-15459: Convert coordinator retriable errors to a 
known producer response error (#14378)
f13367de4ec is described below

commit f13367de4ec149388432dbf8b9023054c19f53d9
Author: Justine Olshan <[email protected]>
AuthorDate: Wed Sep 13 14:21:58 2023 -0700

    KAFKA-15459: Convert coordinator retriable errors to a known producer 
response error (#14378)
    
    KIP-890 Part 1 tries to address hanging transactions on old clients. Thus, 
the produce version cannot be bumped and no new errors can be added. Before, we 
used the Java client's notion of retriable and abortable errors -- retriable 
errors are defined as such by extending the retriable error class, fatal errors 
are defined explicitly, and abortable errors are the rest. However, many 
other clients treat unspecified errors as fatal, which means many retriable 
errors kill the app [...]
    
    Stuck between having specific errors for Java clients that are handled 
correctly (i.e., we retry) and specific fatal errors for cases that should not be 
fatal, we opted for a middle ground: a non-specific error, but with a message in the 
response specifying the underlying cause.
    
    Convert some of the coordinator error codes to NOT_ENOUGH_REPLICAS, which 
is a known produce response error.
    Also correctly add the old errors to the produce response. (We were not 
doing this correctly before.)
    
    Added tests for the new errors and messages.
    
    Reviewers: Jason Gustafson <[email protected]>, David Jacot 
<[email protected]>
---
 .../main/scala/kafka/server/ReplicaManager.scala   | 111 ++++++++++++---------
 .../unit/kafka/server/ReplicaManagerTest.scala     |  69 ++++++++++++-
 2 files changed, 130 insertions(+), 50 deletions(-)

diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 948bd30d747..a574ba37716 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -753,39 +753,56 @@ class ReplicaManager(val config: KafkaConfig,
         val localProduceResults = appendToLocalLog(internalTopicsAllowed = 
internalTopicsAllowed,
           origin, verifiedEntries, requiredAcks, requestLocal, 
verificationGuards.toMap)
         debug("Produce to local log in %d ms".format(time.milliseconds - 
sTime))
-        
-        val unverifiedResults = unverifiedEntries.map { case (topicPartition, 
error) =>
-          val message = if (error == Errors.INVALID_TXN_STATE) "Partition was 
not added to the transaction" else error.message()
-          topicPartition -> LogAppendResult(
-            LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
-            Some(error.exception(message))
-          )
-        }
 
-        val errorResults = errorsPerPartition.map { case (topicPartition, 
error) =>
-          topicPartition -> LogAppendResult(
-            LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
-            Some(error.exception())
-          )
+        def produceStatusResult(appendResult: Map[TopicPartition, 
LogAppendResult],
+                                useCustomMessage: Boolean): 
Map[TopicPartition, ProducePartitionStatus] = {
+          appendResult.map { case (topicPartition, result) =>
+            topicPartition -> ProducePartitionStatus(
+              result.info.lastOffset + 1, // required offset
+              new PartitionResponse(
+                result.error,
+                result.info.firstOffset.map[Long](_.messageOffset).orElse(-1L),
+                result.info.lastOffset,
+                result.info.logAppendTime,
+                result.info.logStartOffset,
+                result.info.recordErrors,
+                if (useCustomMessage) result.exception.get.getMessage else 
result.info.errorMessage
+              )
+            ) // response status
+          }
         }
         
-        val allResults = localProduceResults ++ unverifiedResults ++ 
errorResults
+        val unverifiedResults = unverifiedEntries.map {
+          case (topicPartition, error) =>
+            val finalException =
+              error match {
+                case Errors.INVALID_TXN_STATE => error.exception("Partition 
was not added to the transaction")
+                case Errors.CONCURRENT_TRANSACTIONS |
+                     Errors.COORDINATOR_LOAD_IN_PROGRESS |
+                     Errors.COORDINATOR_NOT_AVAILABLE |
+                     Errors.NOT_COORDINATOR => new NotEnoughReplicasException(
+                         s"Unable to verify the partition has been added to 
the transaction. Underlying error: ${error.toString}")
+                case _ => error.exception()
+            }
+            topicPartition -> LogAppendResult(
+              LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
+              Some(finalException)
+            )
+        }
 
-        val produceStatus = allResults.map { case (topicPartition, result) =>
-          topicPartition -> ProducePartitionStatus(
-            result.info.lastOffset + 1, // required offset
-            new PartitionResponse(
-              result.error,
-              result.info.firstOffset.map[Long](_.messageOffset).orElse(-1L),
-              result.info.lastOffset,
-              result.info.logAppendTime,
-              result.info.logStartOffset,
-              result.info.recordErrors,
-              result.info.errorMessage
+        val errorResults = errorsPerPartition.map {
+          case (topicPartition, error) =>
+            topicPartition -> LogAppendResult(
+              LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
+              Some(error.exception())
             )
-          ) // response status
         }
 
+        val produceStatus = Set((localProduceResults, false), 
(unverifiedResults, true), (errorResults, false)).flatMap {
+          case (results, useCustomError) => produceStatusResult(results, 
useCustomError)
+        }.toMap
+        val allResults = localProduceResults ++ unverifiedResults ++ 
errorResults
+
         actionQueue.add {
           () => allResults.foreach { case (topicPartition, result) =>
             val requestKey = TopicPartitionOperationKey(topicPartition)
@@ -832,28 +849,30 @@ class ReplicaManager(val config: KafkaConfig,
         val (error, node) = 
getTransactionCoordinator(transactionStatePartition.get)
 
         if (error != Errors.NONE) {
-          throw error.exception() // Can throw coordinator not available -- 
which is retriable
-        }
+          
appendEntries(entriesPerPartition)(notYetVerifiedEntriesPerPartition.map {
+            case (tp, _) => (tp, error)
+          }.toMap)
+        } else {
+          val topicGrouping = 
notYetVerifiedEntriesPerPartition.keySet.groupBy(tp => tp.topic())
+          val topicCollection = new AddPartitionsToTxnTopicCollection()
+          topicGrouping.foreach { case (topic, tps) =>
+            topicCollection.add(new AddPartitionsToTxnTopic()
+              .setName(topic)
+              .setPartitions(tps.map(tp => 
Integer.valueOf(tp.partition())).toList.asJava))
+          }
 
-        val topicGrouping = 
notYetVerifiedEntriesPerPartition.keySet.groupBy(tp => tp.topic())
-        val topicCollection = new AddPartitionsToTxnTopicCollection()
-        topicGrouping.foreach { case (topic, tps) =>
-          topicCollection.add(new AddPartitionsToTxnTopic()
-            .setName(topic)
-            .setPartitions(tps.map(tp => 
Integer.valueOf(tp.partition())).toList.asJava))
+          // Map not yet verified partitions to a request object.
+          // We verify above that all partitions use the same producer ID.
+          val batchInfo = 
notYetVerifiedEntriesPerPartition.head._2.firstBatch()
+          val notYetVerifiedTransaction = new AddPartitionsToTxnTransaction()
+            .setTransactionalId(transactionalId)
+            .setProducerId(batchInfo.producerId())
+            .setProducerEpoch(batchInfo.producerEpoch())
+            .setVerifyOnly(true)
+            .setTopics(topicCollection)
+
+          addPartitionsToTxnManager.foreach(_.addTxnData(node, 
notYetVerifiedTransaction, 
KafkaRequestHandler.wrap(appendEntries(entriesPerPartition)(_))))
         }
-
-        // Map not yet verified partitions to a request object.
-        // We verify above that all partitions use the same producer ID.
-        val batchInfo = notYetVerifiedEntriesPerPartition.head._2.firstBatch()
-        val notYetVerifiedTransaction = new AddPartitionsToTxnTransaction()
-          .setTransactionalId(transactionalId)
-          .setProducerId(batchInfo.producerId())
-          .setProducerEpoch(batchInfo.producerEpoch())
-          .setVerifyOnly(true)
-          .setTopics(topicCollection)
-
-        addPartitionsToTxnManager.foreach(_.addTxnData(node, 
notYetVerifiedTransaction, 
KafkaRequestHandler.wrap(appendEntries(entriesPerPartition)(_))))
       }
     } else {
       // If required.acks is outside accepted range, something is wrong with 
the client
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index b4bc4f540ad..6f913208320 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -64,7 +64,7 @@ import org.apache.kafka.storage.internals.log.{AppendOrigin, 
FetchDataInfo, Fetc
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.ValueSource
+import org.junit.jupiter.params.provider.{EnumSource, ValueSource}
 import com.yammer.metrics.core.Gauge
 import kafka.log.remote.RemoteLogManager
 import org.apache.kafka.common.config.AbstractConfig
@@ -2287,8 +2287,8 @@ class ReplicaManagerTest {
 
       // Confirm we did not write to the log and instead returned error.
       val callback: AddPartitionsToTxnManager.AppendCallback = 
appendCallback.getValue()
-      callback(Map(tp0 -> Errors.NOT_COORDINATOR).toMap)
-      assertEquals(Errors.NOT_COORDINATOR, result.assertFired.error)
+      callback(Map(tp0 -> Errors.INVALID_PRODUCER_ID_MAPPING).toMap)
+      assertEquals(Errors.INVALID_PRODUCER_ID_MAPPING, 
result.assertFired.error)
       assertEquals(verificationGuard, getVerificationGuard(replicaManager, 
tp0, producerId))
 
       // Try to append a higher sequence (7) after the first one failed with a 
retriable error.
@@ -2537,7 +2537,9 @@ class ReplicaManagerTest {
       quotaManagers = quotaManager,
       metadataCache = metadataCache,
       logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
-      alterPartitionManager = alterPartitionManager)
+      alterPartitionManager = alterPartitionManager,
+      addPartitionsToTxnManager = Some(addPartitionsToTxnManager)
+    )
 
     try {
       val txnCoordinatorPartition0 = 0
@@ -2569,6 +2571,65 @@ class ReplicaManagerTest {
 
       assertEquals((Errors.NONE, node0), 
replicaManager.getTransactionCoordinator(txnCoordinatorPartition0))
       assertEquals((Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode), 
replicaManager.getTransactionCoordinator(txnCoordinatorPartition1))
+
+      // Test we convert the error correctly when trying to append and 
coordinator is not available
+      val tp0 = new TopicPartition(topic, 0)
+      val producerId = 24L
+      val producerEpoch = 0.toShort
+      val sequence = 0
+      replicaManager.becomeLeaderOrFollower(1,
+        makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), 
LeaderAndIsr(1, List(0, 1))),
+        (_, _) => ())
+      val transactionalRecords = 
MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, 
producerEpoch, sequence,
+        new SimpleRecord("message".getBytes))
+      val result = appendRecords(replicaManager, tp0, transactionalRecords, 
transactionalId = transactionalId, transactionStatePartition = 
Some(txnCoordinatorPartition1))
+      val expectedError = s"Unable to verify the partition has been added to 
the transaction. Underlying error: ${Errors.COORDINATOR_NOT_AVAILABLE.toString}"
+      assertEquals(Errors.NOT_ENOUGH_REPLICAS, result.assertFired.error)
+      assertEquals(expectedError, result.assertFired.errorMessage)
+    } finally {
+      replicaManager.shutdown(checkpointHW = false)
+    }
+  }
+
+  @ParameterizedTest
+  @EnumSource(value = classOf[Errors], names = Array("NOT_COORDINATOR", 
"CONCURRENT_TRANSACTIONS", "COORDINATOR_LOAD_IN_PROGRESS", 
"COORDINATOR_NOT_AVAILABLE"))
+  def testVerificationErrorConversions(error: Errors): Unit = {
+    val tp0 = new TopicPartition(topic, 0)
+    val producerId = 24L
+    val producerEpoch = 0.toShort
+    val sequence = 0
+    val node = new Node(0, "host1", 0)
+    val addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager])
+
+    val replicaManager = 
setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager,
 List(tp0), node)
+    try {
+      replicaManager.becomeLeaderOrFollower(1,
+        makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), 
LeaderAndIsr(1, List(0, 1))),
+        (_, _) => ())
+
+      val transactionalRecords = 
MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, 
producerEpoch, sequence,
+        new SimpleRecord("message".getBytes))
+
+      val transactionToAdd = new AddPartitionsToTxnTransaction()
+        .setTransactionalId(transactionalId)
+        .setProducerId(producerId)
+        .setProducerEpoch(producerEpoch)
+        .setVerifyOnly(true)
+        .setTopics(new AddPartitionsToTxnTopicCollection(
+          Seq(new 
AddPartitionsToTxnTopic().setName(tp0.topic).setPartitions(Collections.singletonList(tp0.partition))).iterator.asJava
+        ))
+
+      // Start verification and return the coordinator related errors.
+      val expectedMessage = s"Unable to verify the partition has been added to 
the transaction. Underlying error: ${error.toString}"
+      val result = appendRecords(replicaManager, tp0, transactionalRecords, 
transactionalId = transactionalId, transactionStatePartition = Some(0))
+      val appendCallback = 
ArgumentCaptor.forClass(classOf[AddPartitionsToTxnManager.AppendCallback])
+      verify(addPartitionsToTxnManager, 
times(1)).addTxnData(ArgumentMatchers.eq(node), 
ArgumentMatchers.eq(transactionToAdd), appendCallback.capture())
+
+      // Confirm we did not write to the log and instead returned the 
converted error with the correct error message.
+      val callback: AddPartitionsToTxnManager.AppendCallback = 
appendCallback.getValue()
+      callback(Map(tp0 -> error).toMap)
+      assertEquals(Errors.NOT_ENOUGH_REPLICAS, result.assertFired.error)
+      assertEquals(expectedMessage, result.assertFired.errorMessage)
     } finally {
       replicaManager.shutdown(checkpointHW = false)
     }

Reply via email to