This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.9 by this push:
     new aaacb586754 KAFKA-19990 gracefully handle exceptions when handling AllocateProducerIdsResponse (#21138)
aaacb586754 is described below

commit aaacb5867540cbe81ff4a1ff315f439bc65cc4a8
Author: Gaurav Narula <[email protected]>
AuthorDate: Wed Dec 17 17:05:49 2025 +0000

    KAFKA-19990 gracefully handle exceptions when handling AllocateProducerIdsResponse (#21138)
    
    The handler in `RPCProducerIdManager` doesn't handle authentication
    exceptions and version mismatch exceptions gracefully, which can result
    in an NPE in 3.9 and the broker crashing with a `FatalExitError`.
    
    This change ensures we retry on such failures, as well as on an empty
    response from the controller, and adds unit tests for these scenarios.
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../transaction/ProducerIdManager.scala            | 33 +++++++--
 .../transaction/ProducerIdManagerTest.scala        | 83 +++++++++++++++++++---
 2 files changed, 101 insertions(+), 15 deletions(-)

diff --git 
a/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala 
b/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
index 322dccd0dcb..52daf42ce5e 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
@@ -236,8 +236,7 @@ class RPCProducerIdManager(brokerId: Int,
     debug("Requesting next Producer ID block")
     controllerChannel.sendRequest(request, new 
ControllerRequestCompletionHandler() {
       override def onComplete(response: ClientResponse): Unit = {
-        val message = 
response.responseBody().asInstanceOf[AllocateProducerIdsResponse]
-        handleAllocateProducerIdsResponse(message)
+        handleAllocateProducerIdsResponse(response)
       }
 
       override def onTimeout(): Unit = handleTimeout()
@@ -245,7 +244,23 @@ class RPCProducerIdManager(brokerId: Int,
   }
 
   // Visible for testing
-  private[transaction] def handleAllocateProducerIdsResponse(response: 
AllocateProducerIdsResponse): Unit = {
+  private[transaction] def handleAllocateProducerIdsResponse(clientResponse: 
ClientResponse): Unit = {
+    if (clientResponse.authenticationException != null) {
+      error("Unable to allocate producer id because of an authentication 
exception", clientResponse.authenticationException)
+      handleUnsuccessfulResponse()
+      return
+    }
+    if (clientResponse.versionMismatch != null) {
+      error("Unable to allocate producer id because of a version mismatch 
exception", clientResponse.versionMismatch)
+      handleUnsuccessfulResponse()
+      return
+    }
+    if (!clientResponse.hasResponse) {
+      error("Unable to allocate producer id because of empty response from 
controller")
+      handleUnsuccessfulResponse()
+      return
+    }
+    val response = 
clientResponse.responseBody().asInstanceOf[AllocateProducerIdsResponse]
     val data = response.data
     var successfulResponse = false
     Errors.forCode(data.errorCode()) match {
@@ -269,13 +284,17 @@ class RPCProducerIdManager(brokerId: Int,
     }
 
     if (!successfulResponse) {
-      // There is no need to compare and set because only one thread
-      // handles the AllocateProducerIds response.
-      backoffDeadlineMs.set(time.milliseconds() + RetryBackoffMs)
-      requestInFlight.set(false)
+      handleUnsuccessfulResponse()
     }
   }
 
+  private def handleUnsuccessfulResponse(): Unit = {
+    // There is no need to compare and set because only one thread
+    // handles the AllocateProducerIds response.
+    backoffDeadlineMs.set(time.milliseconds() + RetryBackoffMs)
+    requestInFlight.set(false)
+  }
+
   private def handleTimeout(): Unit = {
     warn("Timed out when requesting AllocateProducerIds from the controller.")
     requestInFlight.set(false)
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
 
b/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
index eef0b31e415..a94d2d3592b 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
@@ -19,8 +19,9 @@ package kafka.coordinator.transaction
 import kafka.coordinator.transaction.ProducerIdManager.RetryBackoffMs
 import kafka.utils.TestUtils
 import kafka.zk.{KafkaZkClient, ProducerIdBlockZNode}
+import org.apache.kafka.clients.ClientResponse
 import org.apache.kafka.common.KafkaException
-import org.apache.kafka.common.errors.CoordinatorLoadInProgressException
+import org.apache.kafka.common.errors.{AuthenticationException, 
CoordinatorLoadInProgressException, UnsupportedVersionException}
 import org.apache.kafka.common.message.AllocateProducerIdsResponseData
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.AllocateProducerIdsResponse
@@ -52,31 +53,58 @@ class ProducerIdManagerTest {
     val idLen: Int,
     val errorQueue: ConcurrentLinkedQueue[Errors] = new 
ConcurrentLinkedQueue[Errors](),
     val isErroneousBlock: Boolean = false,
-    val time: Time = Time.SYSTEM
+    val time: Time = Time.SYSTEM,
+    var hasAuthenticationException: Boolean = false,
+    var hasVersionMismatch: Boolean = false,
+    var hasNoResponse: Boolean = false
   ) extends RPCProducerIdManager(brokerId, time, () => 1, brokerToController) {
 
     private val brokerToControllerRequestExecutor = 
Executors.newSingleThreadExecutor()
     val capturedFailure: AtomicBoolean = new AtomicBoolean(false)
 
+    private def createClientResponse(authenticationException: 
AuthenticationException = null, versionException: UnsupportedVersionException = 
null, response: AllocateProducerIdsResponse = null): ClientResponse =
+      new ClientResponse(null, null, null, time.milliseconds, 
time.milliseconds, false, versionException, authenticationException, response)
+
     override private[transaction] def sendRequest(): Unit = {
 
       brokerToControllerRequestExecutor.submit(() => {
+        if (hasAuthenticationException) {
+          
handleAllocateProducerIdsResponse(createClientResponse(authenticationException 
= new AuthenticationException("Auth Failure")))
+          hasAuthenticationException = false // reset so retry works
+          return
+        }
+        if (hasVersionMismatch) {
+          
handleAllocateProducerIdsResponse(createClientResponse(versionException = new 
UnsupportedVersionException("Version Mismatch")))
+          hasVersionMismatch = false // reset so retry works
+          return
+        }
+        if (hasNoResponse) {
+          handleAllocateProducerIdsResponse(createClientResponse(null, null, 
null))
+          hasNoResponse = false // reset so retry works
+          return
+        }
         val error = errorQueue.poll()
         if (error == null || error == Errors.NONE) {
-          handleAllocateProducerIdsResponse(new AllocateProducerIdsResponse(
-            new 
AllocateProducerIdsResponseData().setProducerIdStart(idStart).setProducerIdLen(idLen)))
+          handleAllocateProducerIdsResponse(createClientResponse(
+            response = new AllocateProducerIdsResponse(
+              new 
AllocateProducerIdsResponseData().setProducerIdStart(idStart).setProducerIdLen(idLen)
+            )
+          ))
           if (!isErroneousBlock) {
             idStart += idLen
           }
         } else {
-          handleAllocateProducerIdsResponse(new AllocateProducerIdsResponse(
-            new AllocateProducerIdsResponseData().setErrorCode(error.code)))
+          handleAllocateProducerIdsResponse(createClientResponse(
+            response = new AllocateProducerIdsResponse(
+              new AllocateProducerIdsResponseData().setErrorCode(error.code)
+            )
+          ))
         }
       }, 0)
     }
 
-    override private[transaction] def 
handleAllocateProducerIdsResponse(response: AllocateProducerIdsResponse): Unit 
= {
-      super.handleAllocateProducerIdsResponse(response)
+    override private[transaction] def 
handleAllocateProducerIdsResponse(clientResponse: ClientResponse): Unit = {
+      super.handleAllocateProducerIdsResponse(clientResponse)
       capturedFailure.set(nextProducerIdBlock.get == null)
     }
   }
@@ -215,6 +243,45 @@ class ProducerIdManagerTest {
     verifyNewBlockAndProducerId(manager, new ProducerIdsBlock(0, 0, 1), 0)
   }
 
+  @Test
+  def testRetryBackoffOnAuthException(): Unit = {
+    val time = new MockTime()
+    val manager = new MockProducerIdManager(0, 0, 1, time = time, 
hasAuthenticationException = true)
+
+    verifyFailure(manager)
+
+    // We should only get a new block once retry backoff ms has passed.
+    
assertCoordinatorLoadInProgressExceptionFailure(manager.generateProducerId())
+    time.sleep(RetryBackoffMs)
+    verifyNewBlockAndProducerId(manager, new ProducerIdsBlock(0, 0, 1), 0)
+  }
+
+  @Test
+  def testRetryBackoffOnVersionMismatch(): Unit = {
+    val time = new MockTime()
+    val manager = new MockProducerIdManager(0, 0, 1, time = time, 
hasVersionMismatch = true)
+
+    verifyFailure(manager)
+
+    // We should only get a new block once retry backoff ms has passed.
+    
assertCoordinatorLoadInProgressExceptionFailure(manager.generateProducerId())
+    time.sleep(RetryBackoffMs)
+    verifyNewBlockAndProducerId(manager, new ProducerIdsBlock(0, 0, 1), 0)
+  }
+
+  @Test
+  def testRetryBackoffOnNoResponse(): Unit = {
+    val time = new MockTime()
+    val manager = new MockProducerIdManager(0, 0, 1, time = time, 
hasNoResponse = true)
+
+    verifyFailure(manager)
+
+    // We should only get a new block once retry backoff ms has passed.
+    
assertCoordinatorLoadInProgressExceptionFailure(manager.generateProducerId())
+    time.sleep(RetryBackoffMs)
+    verifyNewBlockAndProducerId(manager, new ProducerIdsBlock(0, 0, 1), 0)
+  }
+
   private def queue(errors: Errors*): ConcurrentLinkedQueue[Errors] = {
     val queue = new ConcurrentLinkedQueue[Errors]()
     errors.foreach(queue.add)

Reply via email to