This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.9 by this push:
new aaacb586754 KAFKA-19990 gracefully handle exceptions when handling
AllocateProducerIdsResponse (#21138)
aaacb586754 is described below
commit aaacb5867540cbe81ff4a1ff315f439bc65cc4a8
Author: Gaurav Narula <[email protected]>
AuthorDate: Wed Dec 17 17:05:49 2025 +0000
KAFKA-19990 gracefully handle exceptions when handling
AllocateProducerIdsResponse (#21138)
The handler in `RPCProducerIdManager` does not gracefully handle an
authentication exception, a version-mismatch exception, or a response
with no body. In 3.9 these cases can result in an NPE and crash the broker
with a `FatalExitError`.
This change backs off and retries the request on such failures, and adds
unit tests for these scenarios.
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../transaction/ProducerIdManager.scala | 33 +++++++--
.../transaction/ProducerIdManagerTest.scala | 83 +++++++++++++++++++---
2 files changed, 101 insertions(+), 15 deletions(-)
diff --git
a/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
b/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
index 322dccd0dcb..52daf42ce5e 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala
@@ -236,8 +236,7 @@ class RPCProducerIdManager(brokerId: Int,
debug("Requesting next Producer ID block")
controllerChannel.sendRequest(request, new
ControllerRequestCompletionHandler() {
override def onComplete(response: ClientResponse): Unit = {
- val message =
response.responseBody().asInstanceOf[AllocateProducerIdsResponse]
- handleAllocateProducerIdsResponse(message)
+ handleAllocateProducerIdsResponse(response)
}
override def onTimeout(): Unit = handleTimeout()
@@ -245,7 +244,23 @@ class RPCProducerIdManager(brokerId: Int,
}
// Visible for testing
- private[transaction] def handleAllocateProducerIdsResponse(response:
AllocateProducerIdsResponse): Unit = {
+ private[transaction] def handleAllocateProducerIdsResponse(clientResponse:
ClientResponse): Unit = {
+ if (clientResponse.authenticationException != null) {
+ error("Unable to allocate producer id because of an authentication
exception", clientResponse.authenticationException)
+ handleUnsuccessfulResponse()
+ return
+ }
+ if (clientResponse.versionMismatch != null) {
+ error("Unable to allocate producer id because of a version mismatch
exception", clientResponse.versionMismatch)
+ handleUnsuccessfulResponse()
+ return
+ }
+ if (!clientResponse.hasResponse) {
+ error("Unable to allocate producer id because of empty response from
controller")
+ handleUnsuccessfulResponse()
+ return
+ }
+ val response =
clientResponse.responseBody().asInstanceOf[AllocateProducerIdsResponse]
val data = response.data
var successfulResponse = false
Errors.forCode(data.errorCode()) match {
@@ -269,13 +284,17 @@ class RPCProducerIdManager(brokerId: Int,
}
if (!successfulResponse) {
- // There is no need to compare and set because only one thread
- // handles the AllocateProducerIds response.
- backoffDeadlineMs.set(time.milliseconds() + RetryBackoffMs)
- requestInFlight.set(false)
+ handleUnsuccessfulResponse()
}
}
+ private def handleUnsuccessfulResponse(): Unit = {
+ // There is no need to compare and set because only one thread
+ // handles the AllocateProducerIds response.
+ backoffDeadlineMs.set(time.milliseconds() + RetryBackoffMs)
+ requestInFlight.set(false)
+ }
+
private def handleTimeout(): Unit = {
warn("Timed out when requesting AllocateProducerIds from the controller.")
requestInFlight.set(false)
diff --git
a/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
b/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
index eef0b31e415..a94d2d3592b 100644
---
a/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
+++
b/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
@@ -19,8 +19,9 @@ package kafka.coordinator.transaction
import kafka.coordinator.transaction.ProducerIdManager.RetryBackoffMs
import kafka.utils.TestUtils
import kafka.zk.{KafkaZkClient, ProducerIdBlockZNode}
+import org.apache.kafka.clients.ClientResponse
import org.apache.kafka.common.KafkaException
-import org.apache.kafka.common.errors.CoordinatorLoadInProgressException
+import org.apache.kafka.common.errors.{AuthenticationException,
CoordinatorLoadInProgressException, UnsupportedVersionException}
import org.apache.kafka.common.message.AllocateProducerIdsResponseData
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.AllocateProducerIdsResponse
@@ -52,31 +53,58 @@ class ProducerIdManagerTest {
val idLen: Int,
val errorQueue: ConcurrentLinkedQueue[Errors] = new
ConcurrentLinkedQueue[Errors](),
val isErroneousBlock: Boolean = false,
- val time: Time = Time.SYSTEM
+ val time: Time = Time.SYSTEM,
+ var hasAuthenticationException: Boolean = false,
+ var hasVersionMismatch: Boolean = false,
+ var hasNoResponse: Boolean = false
) extends RPCProducerIdManager(brokerId, time, () => 1, brokerToController) {
private val brokerToControllerRequestExecutor =
Executors.newSingleThreadExecutor()
val capturedFailure: AtomicBoolean = new AtomicBoolean(false)
+ private def createClientResponse(authenticationException:
AuthenticationException = null, versionException: UnsupportedVersionException =
null, response: AllocateProducerIdsResponse = null): ClientResponse =
+ new ClientResponse(null, null, null, time.milliseconds,
time.milliseconds, false, versionException, authenticationException, response)
+
override private[transaction] def sendRequest(): Unit = {
brokerToControllerRequestExecutor.submit(() => {
+ if (hasAuthenticationException) {
+
handleAllocateProducerIdsResponse(createClientResponse(authenticationException
= new AuthenticationException("Auth Failure")))
+ hasAuthenticationException = false // reset so retry works
+ return
+ }
+ if (hasVersionMismatch) {
+
handleAllocateProducerIdsResponse(createClientResponse(versionException = new
UnsupportedVersionException("Version Mismatch")))
+ hasVersionMismatch = false // reset so retry works
+ return
+ }
+ if (hasNoResponse) {
+ handleAllocateProducerIdsResponse(createClientResponse(null, null,
null))
+ hasNoResponse = false // reset so retry works
+ return
+ }
val error = errorQueue.poll()
if (error == null || error == Errors.NONE) {
- handleAllocateProducerIdsResponse(new AllocateProducerIdsResponse(
- new
AllocateProducerIdsResponseData().setProducerIdStart(idStart).setProducerIdLen(idLen)))
+ handleAllocateProducerIdsResponse(createClientResponse(
+ response = new AllocateProducerIdsResponse(
+ new
AllocateProducerIdsResponseData().setProducerIdStart(idStart).setProducerIdLen(idLen)
+ )
+ ))
if (!isErroneousBlock) {
idStart += idLen
}
} else {
- handleAllocateProducerIdsResponse(new AllocateProducerIdsResponse(
- new AllocateProducerIdsResponseData().setErrorCode(error.code)))
+ handleAllocateProducerIdsResponse(createClientResponse(
+ response = new AllocateProducerIdsResponse(
+ new AllocateProducerIdsResponseData().setErrorCode(error.code)
+ )
+ ))
}
}, 0)
}
- override private[transaction] def
handleAllocateProducerIdsResponse(response: AllocateProducerIdsResponse): Unit
= {
- super.handleAllocateProducerIdsResponse(response)
+ override private[transaction] def
handleAllocateProducerIdsResponse(clientResponse: ClientResponse): Unit = {
+ super.handleAllocateProducerIdsResponse(clientResponse)
capturedFailure.set(nextProducerIdBlock.get == null)
}
}
@@ -215,6 +243,45 @@ class ProducerIdManagerTest {
verifyNewBlockAndProducerId(manager, new ProducerIdsBlock(0, 0, 1), 0)
}
+ @Test
+ def testRetryBackoffOnAuthException(): Unit = {
+ val time = new MockTime()
+ val manager = new MockProducerIdManager(0, 0, 1, time = time,
hasAuthenticationException = true)
+
+ verifyFailure(manager)
+
+ // We should only get a new block once retry backoff ms has passed.
+
assertCoordinatorLoadInProgressExceptionFailure(manager.generateProducerId())
+ time.sleep(RetryBackoffMs)
+ verifyNewBlockAndProducerId(manager, new ProducerIdsBlock(0, 0, 1), 0)
+ }
+
+ @Test
+ def testRetryBackoffOnVersionMismatch(): Unit = {
+ val time = new MockTime()
+ val manager = new MockProducerIdManager(0, 0, 1, time = time,
hasVersionMismatch = true)
+
+ verifyFailure(manager)
+
+ // We should only get a new block once retry backoff ms has passed.
+
assertCoordinatorLoadInProgressExceptionFailure(manager.generateProducerId())
+ time.sleep(RetryBackoffMs)
+ verifyNewBlockAndProducerId(manager, new ProducerIdsBlock(0, 0, 1), 0)
+ }
+
+ @Test
+ def testRetryBackoffOnNoResponse(): Unit = {
+ val time = new MockTime()
+ val manager = new MockProducerIdManager(0, 0, 1, time = time,
hasNoResponse = true)
+
+ verifyFailure(manager)
+
+ // We should only get a new block once retry backoff ms has passed.
+
assertCoordinatorLoadInProgressExceptionFailure(manager.generateProducerId())
+ time.sleep(RetryBackoffMs)
+ verifyNewBlockAndProducerId(manager, new ProducerIdsBlock(0, 0, 1), 0)
+ }
+
private def queue(errors: Errors*): ConcurrentLinkedQueue[Errors] = {
val queue = new ConcurrentLinkedQueue[Errors]()
errors.foreach(queue.add)