This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.4 by this push:
     new 3ff56ebfc7a KAFKA-14417: Address incompatible error code returned by 
broker from `InitProducerId` (#12968)
3ff56ebfc7a is described below

commit 3ff56ebfc7ac06bb8b0b604ff9780ddcf6da6f2c
Author: Justine Olshan <jols...@confluent.io>
AuthorDate: Mon Dec 19 09:33:11 2022 -0800

    KAFKA-14417: Address incompatible error code returned by broker from 
`InitProducerId` (#12968)
    
    Older clients cannot handle the `REQUEST_TIMED_OUT` error that is returned 
from `InitProducerId` when the next producerId block cannot be fetched from the 
controller. In this patch, we return `COORDINATOR_LOAD_IN_PROGRESS` instead, 
which is retriable.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 .../kafka/coordinator/transaction/ProducerIdManager.scala   |  5 +++--
 .../coordinator/transaction/ProducerIdManagerTest.scala     | 13 +++++++++++--
 2 files changed, 14 insertions(+), 4 deletions(-)

diff --git 
a/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala 
b/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
index e1f46eb3712..f16785a7b6c 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
@@ -167,7 +167,9 @@ class RPCProducerIdManager(brokerId: Int,
       if (nextProducerId > currentProducerIdBlock.lastProducerId) {
         val block = nextProducerIdBlock.poll(maxWaitMs, TimeUnit.MILLISECONDS)
         if (block == null) {
-          throw Errors.REQUEST_TIMED_OUT.exception("Timed out waiting for next 
producer ID block")
+          // Return COORDINATOR_LOAD_IN_PROGRESS rather than REQUEST_TIMED_OUT 
since older clients treat the error as fatal
+          // when it should be retriable like COORDINATOR_LOAD_IN_PROGRESS.
+          throw Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Timed out 
waiting for next producer ID block")
         } else {
           block match {
             case Success(nextBlock) =>
@@ -236,7 +238,6 @@ class RPCProducerIdManager(brokerId: Int,
   private[transaction] def handleTimeout(): Unit = {
     warn("Timed out when requesting AllocateProducerIds from the controller.")
     requestInFlight.set(false)
-    nextProducerIdBlock.put(Failure(Errors.REQUEST_TIMED_OUT.exception))
     maybeRequestNextBlock()
   }
 }
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
 
b/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
index eefe61d17d6..666a3c363ff 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
@@ -19,6 +19,7 @@ package kafka.coordinator.transaction
 import kafka.server.BrokerToControllerChannelManager
 import kafka.zk.{KafkaZkClient, ProducerIdBlockZNode}
 import org.apache.kafka.common.KafkaException
+import org.apache.kafka.common.errors.CoordinatorLoadInProgressException
 import org.apache.kafka.common.message.AllocateProducerIdsResponseData
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.AllocateProducerIdsResponse
@@ -30,7 +31,6 @@ import org.junit.jupiter.params.provider.{EnumSource, 
ValueSource}
 import org.mockito.ArgumentCaptor
 import org.mockito.ArgumentMatchers.{any, anyString}
 import org.mockito.Mockito.{mock, when}
-
 import java.util.stream.IntStream
 
 class ProducerIdManagerTest {
@@ -39,10 +39,13 @@ class ProducerIdManagerTest {
   val zkClient: KafkaZkClient = mock(classOf[KafkaZkClient])
 
   // Mutable test implementation that lets us easily set the idStart and error
-  class MockProducerIdManager(val brokerId: Int, var idStart: Long, val idLen: 
Int, var error: Errors = Errors.NONE)
+  class MockProducerIdManager(val brokerId: Int, var idStart: Long, val idLen: 
Int, var error: Errors = Errors.NONE, timeout: Boolean = false)
     extends RPCProducerIdManager(brokerId, () => 1, brokerToController, 100) {
 
     override private[transaction] def sendRequest(): Unit = {
+      if (timeout)
+        return
+
       if (error == Errors.NONE) {
         handleAllocateProducerIdsResponse(new AllocateProducerIdsResponse(
           new 
AllocateProducerIdsResponseData().setProducerIdStart(idStart).setProducerIdLen(idLen)))
@@ -93,6 +96,12 @@ class ProducerIdManagerTest {
     assertEquals(pid2 + ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE * 2, 
manager2.generateProducerId())
   }
 
+  @Test
+  def testRPCProducerIdManagerThrowsConcurrentTransactions(): Unit = {
+    val manager1 = new MockProducerIdManager(0, 0, 0, timeout = true)
+    assertThrows(classOf[CoordinatorLoadInProgressException], () => 
manager1.generateProducerId())
+  }
+
   @Test
   def testExceedProducerIdLimitZk(): Unit = {
     when(zkClient.getDataAndVersion(anyString)).thenAnswer(_ => {

Reply via email to