This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 22bb3e7a585 KAFKA-14417: Address incompatible error code returned by broker from `InitProducerId` (#12968) 22bb3e7a585 is described below commit 22bb3e7a5853c8f54fc030273f83f1db8c99648f Author: Justine Olshan <jols...@confluent.io> AuthorDate: Mon Dec 19 09:33:11 2022 -0800 KAFKA-14417: Address incompatible error code returned by broker from `InitProducerId` (#12968) Older clients cannot handle the `REQUEST_TIMED_OUT` error that is returned from `InitProducerId` when the next producerId block cannot be fetched from the controller. In this patch, we return `COORDINATOR_LOAD_IN_PROGRESS` instead, which is retriable. Reviewers: Jason Gustafson <ja...@confluent.io> --- .../kafka/coordinator/transaction/ProducerIdManager.scala | 5 +++-- .../coordinator/transaction/ProducerIdManagerTest.scala | 13 +++++++++++-- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala b/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala index e1f46eb3712..f16785a7b6c 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala @@ -167,7 +167,9 @@ class RPCProducerIdManager(brokerId: Int, if (nextProducerId > currentProducerIdBlock.lastProducerId) { val block = nextProducerIdBlock.poll(maxWaitMs, TimeUnit.MILLISECONDS) if (block == null) { - throw Errors.REQUEST_TIMED_OUT.exception("Timed out waiting for next producer ID block") + // Return COORDINATOR_LOAD_IN_PROGRESS rather than REQUEST_TIMED_OUT since older clients treat the error as fatal + // when it should be retriable like COORDINATOR_LOAD_IN_PROGRESS. 
+ throw Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Timed out waiting for next producer ID block") } else { block match { case Success(nextBlock) => @@ -236,7 +238,6 @@ class RPCProducerIdManager(brokerId: Int, private[transaction] def handleTimeout(): Unit = { warn("Timed out when requesting AllocateProducerIds from the controller.") requestInFlight.set(false) - nextProducerIdBlock.put(Failure(Errors.REQUEST_TIMED_OUT.exception)) maybeRequestNextBlock() } } diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala index eefe61d17d6..666a3c363ff 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala @@ -19,6 +19,7 @@ package kafka.coordinator.transaction import kafka.server.BrokerToControllerChannelManager import kafka.zk.{KafkaZkClient, ProducerIdBlockZNode} import org.apache.kafka.common.KafkaException +import org.apache.kafka.common.errors.CoordinatorLoadInProgressException import org.apache.kafka.common.message.AllocateProducerIdsResponseData import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.AllocateProducerIdsResponse @@ -30,7 +31,6 @@ import org.junit.jupiter.params.provider.{EnumSource, ValueSource} import org.mockito.ArgumentCaptor import org.mockito.ArgumentMatchers.{any, anyString} import org.mockito.Mockito.{mock, when} - import java.util.stream.IntStream class ProducerIdManagerTest { @@ -39,10 +39,13 @@ class ProducerIdManagerTest { val zkClient: KafkaZkClient = mock(classOf[KafkaZkClient]) // Mutable test implementation that lets us easily set the idStart and error - class MockProducerIdManager(val brokerId: Int, var idStart: Long, val idLen: Int, var error: Errors = Errors.NONE) + class MockProducerIdManager(val brokerId: Int, var idStart: Long, val idLen: Int, 
var error: Errors = Errors.NONE, timeout: Boolean = false) extends RPCProducerIdManager(brokerId, () => 1, brokerToController, 100) { override private[transaction] def sendRequest(): Unit = { + if (timeout) + return + if (error == Errors.NONE) { handleAllocateProducerIdsResponse(new AllocateProducerIdsResponse( new AllocateProducerIdsResponseData().setProducerIdStart(idStart).setProducerIdLen(idLen))) @@ -93,6 +96,12 @@ class ProducerIdManagerTest { assertEquals(pid2 + ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE * 2, manager2.generateProducerId()) } + @Test + def testRPCProducerIdManagerThrowsConcurrentTransactions(): Unit = { + val manager1 = new MockProducerIdManager(0, 0, 0, timeout = true) + assertThrows(classOf[CoordinatorLoadInProgressException], () => manager1.generateProducerId()) + } + @Test def testExceedProducerIdLimitZk(): Unit = { when(zkClient.getDataAndVersion(anyString)).thenAnswer(_ => {