This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.3 by this push: new d424aea8c3d KAFKA-14417: Producer doesn't handle REQUEST_TIMED_OUT for InitProducerIdRequest, treats as fatal error (#12915) d424aea8c3d is described below commit d424aea8c3d847f7d5d742693e47a3f8495cee93 Author: Justine Olshan <jols...@confluent.io> AuthorDate: Tue Dec 6 11:41:31 2022 -0800 KAFKA-14417: Producer doesn't handle REQUEST_TIMED_OUT for InitProducerIdRequest, treats as fatal error (#12915) The broker may return the `REQUEST_TIMED_OUT` error in `InitProducerId` responses when allocating the ID using the `AllocateProducerIds` request. The client currently does not handle this. Instead of retrying as we would expect, the client raises a fatal exception to the application. In this patch, we address this problem by modifying the producer to handle `REQUEST_TIMED_OUT` and any other retriable errors by re-enqueuing the request. Reviewers: Jason Gustafson <ja...@confluent.io> --- .../producer/internals/TransactionManager.java | 10 ++-- .../errors/ConcurrentTransactionsException.java | 2 +- .../producer/internals/TransactionManagerTest.java | 57 ++++++++++++++++++++++ 3 files changed, 63 insertions(+), 6 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index 5aab62eaf22..de5a6ced41c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -1290,7 +1290,7 @@ public class TransactionManager { } else if (error == Errors.NOT_COORDINATOR || error == Errors.COORDINATOR_NOT_AVAILABLE) { lookupCoordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId); reenqueue(); - } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS) { + } else if 
(error.exception() instanceof RetriableException) { reenqueue(); } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED || error == Errors.CLUSTER_AUTHORIZATION_FAILED) { @@ -1347,7 +1347,7 @@ public class TransactionManager { maybeOverrideRetryBackoffMs(); reenqueue(); return; - } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.UNKNOWN_TOPIC_OR_PARTITION) { + } else if (error.exception() instanceof RetriableException) { reenqueue(); return; } else if (error == Errors.INVALID_PRODUCER_EPOCH || error == Errors.PRODUCER_FENCED) { @@ -1467,7 +1467,7 @@ public class TransactionManager { } result.done(); log.info("Discovered {} coordinator {}", coordinatorType.toString().toLowerCase(Locale.ROOT), node); - } else if (error == Errors.COORDINATOR_NOT_AVAILABLE) { + } else if (error.exception() instanceof RetriableException) { reenqueue(); } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) { fatalError(error.exception()); @@ -1515,7 +1515,7 @@ public class TransactionManager { } else if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) { lookupCoordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId); reenqueue(); - } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS) { + } else if (error.exception() instanceof RetriableException) { reenqueue(); } else if (error == Errors.INVALID_PRODUCER_EPOCH || error == Errors.PRODUCER_FENCED) { // We could still receive INVALID_PRODUCER_EPOCH from old versioned transaction coordinator, @@ -1572,7 +1572,7 @@ public class TransactionManager { } else if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) { lookupCoordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId); reenqueue(); - } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS) { + } else if (error.exception() instanceof 
RetriableException) { reenqueue(); } else if (error == Errors.UNKNOWN_PRODUCER_ID || error == Errors.INVALID_PRODUCER_ID_MAPPING) { abortableErrorIfPossible(error.exception()); diff --git a/clients/src/main/java/org/apache/kafka/common/errors/ConcurrentTransactionsException.java b/clients/src/main/java/org/apache/kafka/common/errors/ConcurrentTransactionsException.java index 6ad6b8a3ddb..118b4de50aa 100644 --- a/clients/src/main/java/org/apache/kafka/common/errors/ConcurrentTransactionsException.java +++ b/clients/src/main/java/org/apache/kafka/common/errors/ConcurrentTransactionsException.java @@ -16,7 +16,7 @@ */ package org.apache.kafka.common.errors; -public class ConcurrentTransactionsException extends ApiException { +public class ConcurrentTransactionsException extends RetriableException { private static final long serialVersionUID = 1L; public ConcurrentTransactionsException(final String message) { diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java index b6bf9e6f4f1..ce9b8052207 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java @@ -77,6 +77,8 @@ import org.apache.kafka.common.utils.ProducerIdAndEpoch; import org.apache.kafka.test.TestUtils; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; import java.nio.ByteBuffer; import java.util.Arrays; @@ -1678,6 +1680,61 @@ public class TransactionManagerTest { assertTrue(secondResponseFuture.isDone()); } + @ParameterizedTest + @EnumSource(names = { + "UNKNOWN_TOPIC_OR_PARTITION", + "REQUEST_TIMED_OUT", + "COORDINATOR_LOAD_IN_PROGRESS", + "CONCURRENT_TRANSACTIONS" + }) + public void 
testRetriableErrors2(Errors error) { + // Ensure FindCoordinator retries. + TransactionalRequestResult result = transactionManager.initializeTransactions(); + prepareFindCoordinatorResponse(error, false, CoordinatorType.TRANSACTION, transactionalId); + prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId); + runUntil(() -> transactionManager.coordinator(CoordinatorType.TRANSACTION) != null); + assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION)); + + // Ensure InitPid retries. + prepareInitPidResponse(error, false, producerId, epoch); + prepareInitPidResponse(Errors.NONE, false, producerId, epoch); + runUntil(transactionManager::hasProducerId); + + result.await(); + transactionManager.beginTransaction(); + + // Ensure AddPartitionsToTxn retries. Since CONCURRENT_TRANSACTIONS is handled differently here, we substitute. + Errors addPartitionsToTxnError = error.equals(Errors.CONCURRENT_TRANSACTIONS) ? Errors.COORDINATOR_LOAD_IN_PROGRESS : error; + transactionManager.maybeAddPartition(tp0); + prepareAddPartitionsToTxnResponse(addPartitionsToTxnError, tp0, epoch, producerId); + prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, producerId); + runUntil(() -> transactionManager.transactionContainsPartition(tp0)); + + // Ensure txnOffsetCommit retries is tested in testRetriableErrorInTxnOffsetCommit. + + // Ensure EndTxn retries. + TransactionalRequestResult abortResult = transactionManager.beginCommit(); + prepareEndTxnResponse(error, TransactionResult.COMMIT, producerId, epoch); + prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, producerId, epoch); + runUntil(abortResult::isCompleted); + assertTrue(abortResult.isSuccessful()); + } + + @Test + public void testCoordinatorNotAvailable() { + // Ensure FindCoordinator with COORDINATOR_NOT_AVAILABLE error retries. 
+ TransactionalRequestResult result = transactionManager.initializeTransactions(); + prepareFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, false, CoordinatorType.TRANSACTION, transactionalId); + prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId); + runUntil(() -> transactionManager.coordinator(CoordinatorType.TRANSACTION) != null); + assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION)); + + prepareInitPidResponse(Errors.NONE, false, producerId, epoch); + runUntil(transactionManager::hasProducerId); + + result.await(); + } + @Test public void testProducerFencedExceptionInInitProducerId() { verifyProducerFencedForInitProducerId(Errors.PRODUCER_FENCED);