This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch 4.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.2 by this push:
new ca9a1b10375 KAFKA-19990 gracefully handle exceptions when handling
AllocateProducerIdsResponse (#21135)
ca9a1b10375 is described below
commit ca9a1b103755217643500b1f01f4dbc16d7da83d
Author: Gaurav Narula <[email protected]>
AuthorDate: Wed Dec 17 16:43:32 2025 +0000
KAFKA-19990 gracefully handle exceptions when handling
AllocateProducerIdsResponse (#21135)
The handler in `RPCProducerIdManager` doesn't gracefully handle
authentication exceptions, version-mismatch exceptions, or responses
that carry no body.
This change makes the manager back off and retry on such failures, and
adds unit tests for these scenarios.
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../transaction/RPCProducerIdManager.java | 37 +++++--
.../transaction/ProducerIdManagerTest.java | 110 ++++++++++++++++++---
2 files changed, 122 insertions(+), 25 deletions(-)
diff --git
a/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/RPCProducerIdManager.java
b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/RPCProducerIdManager.java
index 32e43880ac1..e4e3772de60 100644
---
a/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/RPCProducerIdManager.java
+++
b/transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/RPCProducerIdManager.java
@@ -51,7 +51,8 @@ public class RPCProducerIdManager implements
ProducerIdManager {
private final String logPrefix;
private final int brokerId;
- private final Time time;
+ // Visible for testing
+ final Time time;
private final Supplier<Long> brokerEpochSupplier;
private final NodeToControllerChannelManager controllerChannel;
@@ -129,9 +130,7 @@ public class RPCProducerIdManager implements
ProducerIdManager {
@Override
public void onComplete(ClientResponse response) {
- if (response.responseBody() instanceof
AllocateProducerIdsResponse) {
-
handleAllocateProducerIdsResponse((AllocateProducerIdsResponse)
response.responseBody());
- }
+ handleAllocateProducerIdsResponse(response);
}
@Override
@@ -142,7 +141,30 @@ public class RPCProducerIdManager implements
ProducerIdManager {
});
}
- protected void
handleAllocateProducerIdsResponse(AllocateProducerIdsResponse response) {
+ private void handleUnsuccessfulResponse() {
+ // There is no need to compare and set because only one thread
+ // handles the AllocateProducerIds response.
+ backoffDeadlineMs.set(time.milliseconds() + RETRY_BACKOFF_MS);
+ requestInFlight.set(false);
+ }
+
+ protected void handleAllocateProducerIdsResponse(ClientResponse
clientResponse) {
+ if (clientResponse.authenticationException() != null) {
+ log.error("{} Unable to allocate producer id because of an
authentication exception", logPrefix, clientResponse.authenticationException());
+ handleUnsuccessfulResponse();
+ return;
+ }
+ if (clientResponse.versionMismatch() != null) {
+ log.error("{} Unable to allocate producer id because of a version
mismatch exception", logPrefix, clientResponse.versionMismatch());
+ handleUnsuccessfulResponse();
+ return;
+ }
+ if (!clientResponse.hasResponse()) {
+ log.error("{} Unable to allocate producer id because of empty
response from controller", logPrefix);
+ handleUnsuccessfulResponse();
+ return;
+ }
+ AllocateProducerIdsResponse response = (AllocateProducerIdsResponse)
clientResponse.responseBody();
var data = response.data();
var successfulResponse = false;
var errors = Errors.forCode(data.errorCode());
@@ -161,10 +183,7 @@ public class RPCProducerIdManager implements
ProducerIdManager {
log.error("{} Received error code {} from the controller.",
logPrefix, errors);
}
if (!successfulResponse) {
- // There is no need to compare and set because only one thread
- // handles the AllocateProducerIds response.
- backoffDeadlineMs.set(time.milliseconds() + RETRY_BACKOFF_MS);
- requestInFlight.set(false);
+ handleUnsuccessfulResponse();
}
}
diff --git
a/transaction-coordinator/src/test/java/org/apache/kafka/coordinator/transaction/ProducerIdManagerTest.java
b/transaction-coordinator/src/test/java/org/apache/kafka/coordinator/transaction/ProducerIdManagerTest.java
index 72aba6b407f..13ce730660f 100644
---
a/transaction-coordinator/src/test/java/org/apache/kafka/coordinator/transaction/ProducerIdManagerTest.java
+++
b/transaction-coordinator/src/test/java/org/apache/kafka/coordinator/transaction/ProducerIdManagerTest.java
@@ -16,7 +16,10 @@
*/
package org.apache.kafka.coordinator.transaction;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.CoordinatorLoadInProgressException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.AllocateProducerIdsResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AllocateProducerIdsResponse;
@@ -61,43 +64,82 @@ public class ProducerIdManagerTest {
private final ExecutorService brokerToControllerRequestExecutor =
Executors.newSingleThreadExecutor();
private final int idLen;
private Long idStart;
+ private boolean hasAuthenticationException;
+ private boolean hasVersionMismatch;
+ private boolean hasNoResponse;
MockProducerIdManager(int brokerId,
long idStart,
int idLen,
Queue<Errors> errorQueue,
boolean isErroneousBlock,
- Time time) {
+ Time time,
+ boolean hasAuthenticationException,
+ boolean hasVersionMismatch,
+ boolean hasNoResponse) {
super(brokerId, time, () -> 1L, brokerToController);
this.idStart = idStart;
this.idLen = idLen;
this.errorQueue = errorQueue;
this.isErroneousBlock = isErroneousBlock;
+ this.hasAuthenticationException = hasAuthenticationException;
+ this.hasVersionMismatch = hasVersionMismatch;
+ this.hasNoResponse = hasNoResponse;
+ }
+
+ private ClientResponse createClientResponse(
+ AuthenticationException authenticationException,
+ UnsupportedVersionException versionException,
+ AllocateProducerIdsResponse response
+ ) {
+ return new ClientResponse(null, null, null, time.milliseconds(),
time.milliseconds(),
+ false, versionException, authenticationException,
response);
}
@Override
protected void sendRequest() {
brokerToControllerRequestExecutor.submit(() -> {
+ if (hasAuthenticationException) {
+ handleAllocateProducerIdsResponse(createClientResponse(new
AuthenticationException("Auth Failure"), null, null));
+ hasAuthenticationException = false; // reset so retry works
+ return;
+ }
+ if (hasVersionMismatch) {
+
handleAllocateProducerIdsResponse(createClientResponse(null, new
UnsupportedVersionException("Version Mismatch"), null));
+ hasVersionMismatch = false; // reset so retry works
+ return;
+ }
+ if (hasNoResponse) {
+
handleAllocateProducerIdsResponse(createClientResponse(null, null, null));
+ hasNoResponse = false; // reset so retry works
+ return;
+ }
Errors error = errorQueue.poll();
if (error == null || error == Errors.NONE) {
- handleAllocateProducerIdsResponse(new
AllocateProducerIdsResponse(
- new AllocateProducerIdsResponseData()
- .setProducerIdStart(idStart)
- .setProducerIdLen(idLen)
- ));
+ handleAllocateProducerIdsResponse(createClientResponse(
+ null,
+ null,
+ new AllocateProducerIdsResponse(
+ new AllocateProducerIdsResponseData()
+ .setProducerIdStart(idStart)
+ .setProducerIdLen(idLen)
+ )));
if (!isErroneousBlock) {
idStart += idLen;
}
} else {
- handleAllocateProducerIdsResponse(new
AllocateProducerIdsResponse(
- new
AllocateProducerIdsResponseData().setErrorCode(error.code())
- ));
+ handleAllocateProducerIdsResponse(createClientResponse(
+ null,
+ null,
+ new AllocateProducerIdsResponse(
+ new
AllocateProducerIdsResponseData().setErrorCode(error.code())
+ )));
}
}, 0);
}
@Override
- protected void
handleAllocateProducerIdsResponse(AllocateProducerIdsResponse response) {
+ protected void handleAllocateProducerIdsResponse(ClientResponse
response) {
super.handleAllocateProducerIdsResponse(response);
capturedFailure.set(nextProducerIdBlock.get() == null);
}
@@ -112,7 +154,7 @@ public class ProducerIdManagerTest {
var numThreads = 5;
var latch = new CountDownLatch(idBlockLen * 3);
var manager = new MockProducerIdManager(0, 0, idBlockLen,
- new ConcurrentLinkedQueue<>(), false, Time.SYSTEM);
+ new ConcurrentLinkedQueue<>(), false, Time.SYSTEM, false,
false, false);
var requestHandlerThreadPool =
Executors.newFixedThreadPool(numThreads);
Map<Long, Integer> pidMap = new ConcurrentHashMap<>();
@@ -149,7 +191,7 @@ public class ProducerIdManagerTest {
@EnumSource(value = Errors.class, names = {"UNKNOWN_SERVER_ERROR",
"INVALID_REQUEST"})
public void testUnrecoverableErrors(Errors error) throws Exception {
var time = new MockTime();
- var manager = new MockProducerIdManager(0, 0, 1, queue(Errors.NONE,
error), false, time);
+ var manager = new MockProducerIdManager(0, 0, 1, queue(Errors.NONE,
error), false, time, false, false, false);
verifyNewBlockAndProducerId(manager, new ProducerIdsBlock(0, 0, 1), 0);
verifyFailureWithoutGenerateProducerId(manager);
@@ -159,20 +201,56 @@ public class ProducerIdManagerTest {
@Test
public void testInvalidRanges() throws InterruptedException {
- var manager = new MockProducerIdManager(0, -1, 10, new
ConcurrentLinkedQueue<>(), true, Time.SYSTEM);
+ var manager = new MockProducerIdManager(0, -1, 10, new
ConcurrentLinkedQueue<>(), true, Time.SYSTEM, false, false, false);
verifyFailure(manager);
- manager = new MockProducerIdManager(0, 0, -1, new
ConcurrentLinkedQueue<>(), true, Time.SYSTEM);
+ manager = new MockProducerIdManager(0, 0, -1, new
ConcurrentLinkedQueue<>(), true, Time.SYSTEM, false, false, false);
verifyFailure(manager);
- manager = new MockProducerIdManager(0, Long.MAX_VALUE - 1, 10, new
ConcurrentLinkedQueue<>(), true, Time.SYSTEM);
+ manager = new MockProducerIdManager(0, Long.MAX_VALUE - 1, 10, new
ConcurrentLinkedQueue<>(), true, Time.SYSTEM, false, false, false);
verifyFailure(manager);
}
@Test
public void testRetryBackoff() throws Exception {
var time = new MockTime();
- var manager = new MockProducerIdManager(0, 0, 1,
queue(Errors.UNKNOWN_SERVER_ERROR), false, time);
+ var manager = new MockProducerIdManager(0, 0, 1,
queue(Errors.UNKNOWN_SERVER_ERROR), false, time, false, false, false);
+
+ verifyFailure(manager);
+
+ assertThrows(CoordinatorLoadInProgressException.class,
manager::generateProducerId);
+ time.sleep(RETRY_BACKOFF_MS);
+ verifyNewBlockAndProducerId(manager, new ProducerIdsBlock(0, 0, 1), 0);
+ }
+
+ @Test
+ public void testRetryBackoffOnAuthException() throws Exception {
+ var time = new MockTime();
+ var manager = new MockProducerIdManager(0, 0, 1, new
ConcurrentLinkedQueue<>(), false, time, true, false, false);
+
+ verifyFailure(manager);
+
+ assertThrows(CoordinatorLoadInProgressException.class,
manager::generateProducerId);
+ time.sleep(RETRY_BACKOFF_MS);
+ verifyNewBlockAndProducerId(manager, new ProducerIdsBlock(0, 0, 1), 0);
+ }
+
+ @Test
+ public void testRetryBackoffOnVersionMismatch() throws Exception {
+ var time = new MockTime();
+ var manager = new MockProducerIdManager(0, 0, 1, new
ConcurrentLinkedQueue<>(), false, time, false, true, false);
+
+ verifyFailure(manager);
+
+ assertThrows(CoordinatorLoadInProgressException.class,
manager::generateProducerId);
+ time.sleep(RETRY_BACKOFF_MS);
+ verifyNewBlockAndProducerId(manager, new ProducerIdsBlock(0, 0, 1), 0);
+ }
+
+ @Test
+ public void testRetryBackoffOnNoResponse() throws Exception {
+ var time = new MockTime();
+ var manager = new MockProducerIdManager(0, 0, 1, new
ConcurrentLinkedQueue<>(), false, time, false, false, true);
verifyFailure(manager);