[GitHub] [kafka] hachikuji commented on a diff in pull request #12915: KAFKA-14417: Producer doesn't handle REQUEST_TIMED_OUT for InitProducerIdRequest, treats as fatal error
hachikuji commented on code in PR #12915:
URL: https://github.com/apache/kafka/pull/12915#discussion_r1038367059

## clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java: ##
@@ -1678,6 +1680,62 @@ public void testMultipleAddPartitionsPerForOneProduce() throws InterruptedExcept
         assertTrue(secondResponseFuture.isDone());
     }
 
+    @ParameterizedTest
+    @ValueSource(ints = {3, 7, 14, 51})
+    public void testRetriableErrors(int errorCode) {
+        // Tests Errors.CONCURRENT_TRANSACTIONS, Errors.COORDINATOR_LOAD_IN_PROGRESS,
+        // Errors.REQUEST_TIMED_OUT, Errors.UNKNOWN_TOPIC_OR_PARTITION
+        // We skip COORDINATOR_NOT_AVAILABLE since it breaks the logic.
+        Errors error = Errors.forCode((short) errorCode);
+
+        // Ensure FindCoordinator retries.
+        TransactionalRequestResult result = transactionManager.initializeTransactions();
+        prepareFindCoordinatorResponse(error, false, CoordinatorType.TRANSACTION, transactionalId);
+        prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId);
+        runUntil(() -> transactionManager.coordinator(CoordinatorType.TRANSACTION) != null);
+        assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION));
+
+        // Ensure InitPid retries.
+        prepareInitPidResponse(error, false, producerId, epoch);
+        prepareInitPidResponse(Errors.NONE, false, producerId, epoch);
+        runUntil(transactionManager::hasProducerId);
+
+        result.await();
+        transactionManager.beginTransaction();
+
+        // Ensure AddPartitionsToTxn retries. Since CONCURRENT_TRANSACTIONS is handled differently here, we substitute.
+        Errors addPartitionsToTxnError = errorCode == 51 ? Errors.COORDINATOR_LOAD_IN_PROGRESS : error;
+        transactionManager.maybeAddPartition(tp0);
+        prepareAddPartitionsToTxnResponse(addPartitionsToTxnError, tp0, epoch, producerId);
+        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, producerId);
+        runUntil(() -> transactionManager.transactionContainsPartition(tp0));
+
+        // Ensure txnOffsetCommit retries is tested in testRetriableErrorInTxnOffsetCommit.
+
+        // Ensure EndTxn retries.
+        TransactionalRequestResult abortResult = transactionManager.beginCommit();
+        prepareEndTxnResponse(error, TransactionResult.COMMIT, producerId, epoch);
+        prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, producerId, epoch);
+        runUntil(abortResult::isCompleted);
+        assertTrue(abortResult.isSuccessful());
+    }
+
+    @Test
+    public void testCoordinatorNotAvailable() {

Review Comment:
Sorry, I realized this is not a valid error for `FindCoordinator`, so never mind.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
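The test above follows a fail-once-then-succeed pattern: each `prepare...Response` call appears to queue one mocked reply, and `runUntil` keeps driving the client until a condition holds, so a retriable first reply must be followed by a successful retry. The sketch below reproduces that pattern in plain Java without Kafka. `Err`, `MockServer`, `Client` and this `runUntil` are hypothetical stand-ins, not Kafka test utilities, and the retriable set is an assumption chosen to match the four errors the test covers.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.EnumSet;
import java.util.Set;
import java.util.function.BooleanSupplier;

public class RetryPatternSketch {
    // Hypothetical stand-in for a few of Kafka's error codes; not the real Errors enum.
    enum Err { NONE, UNKNOWN_TOPIC_OR_PARTITION, REQUEST_TIMED_OUT, COORDINATOR_LOAD_IN_PROGRESS, CONCURRENT_TRANSACTIONS, INVALID_TXN_STATE }

    // Codes this sketch treats as retriable; everything else except NONE is fatal.
    static final Set<Err> RETRIABLE = EnumSet.of(
        Err.UNKNOWN_TOPIC_OR_PARTITION, Err.REQUEST_TIMED_OUT,
        Err.COORDINATOR_LOAD_IN_PROGRESS, Err.CONCURRENT_TRANSACTIONS);

    // Serves prepared replies in FIFO order, similar in spirit to a mock network client.
    static final class MockServer {
        private final Deque<Err> responses = new ArrayDeque<>();

        void prepareResponse(Err error) { responses.addLast(error); }

        Err send() {
            if (responses.isEmpty()) throw new IllegalStateException("no prepared response left");
            return responses.removeFirst();
        }
    }

    // Keeps one request pending until it succeeds or fails fatally.
    static final class Client {
        private final MockServer server;
        boolean done = false;
        boolean fatal = false;
        int attempts = 0;

        Client(MockServer server) { this.server = server; }

        // One pass of the send loop: send the pending request once and classify the reply.
        void runOnce() {
            if (done || fatal) return;
            attempts++;
            Err reply = server.send();
            if (reply == Err.NONE) {
                done = true;
            } else if (!RETRIABLE.contains(reply)) {
                fatal = true;
            }
            // Retriable reply: leave the request pending so the next runOnce() re-sends it.
        }
    }

    // Calls runOnce() until the condition holds, bounded so a bug cannot spin forever.
    static void runUntil(Client client, BooleanSupplier condition, int maxIterations) {
        for (int i = 0; i < maxIterations && !condition.getAsBoolean(); i++) {
            client.runOnce();
        }
        if (!condition.getAsBoolean()) throw new AssertionError("condition not met");
    }

    public static void main(String[] args) {
        MockServer server = new MockServer();
        server.prepareResponse(Err.REQUEST_TIMED_OUT); // first attempt fails with a retriable error
        server.prepareResponse(Err.NONE);              // the retry succeeds
        Client client = new Client(server);
        runUntil(client, () -> client.done || client.fatal, 10);
        System.out.println("done=" + client.done + ", attempts=" + client.attempts); // done=true, attempts=2
    }
}
```

If `REQUEST_TIMED_OUT` were missing from the retriable set, the same two prepared replies would end with `fatal=true` after one attempt, which is the failure mode KAFKA-14417 describes.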
hachikuji commented on code in PR #12915:
URL: https://github.com/apache/kafka/pull/12915#discussion_r1038362105

## clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java: ##
@@ -1678,6 +1680,62 @@ public void testMultipleAddPartitionsPerForOneProduce() throws InterruptedExcept
         assertTrue(secondResponseFuture.isDone());
     }
 
+    @ParameterizedTest
+    @ValueSource(ints = {3, 7, 14, 51})

Review Comment:
I think another way to do this is like this:
```java
@EnumSource(names = {
    "UNKNOWN_TOPIC_OR_PARTITION",
    "REQUEST_TIMED_OUT",
    "COORDINATOR_LOAD_IN_PROGRESS",
    "CONCURRENT_TRANSACTIONS"
})
public void testRetriableErrors(Errors error) {
```
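Parameterizing over enum constants instead of raw codes makes the selected errors self-describing, and the AddPartitionsToTxn substitution can then compare against `Errors.CONCURRENT_TRANSACTIONS` rather than the literal `51`. In JUnit 5 the method still needs `@ParameterizedTest`; `@EnumSource` only supplies the arguments. The plain-Java sketch below avoids the JUnit and Kafka dependencies: its `Errors` enum is a hypothetical stand-in holding only the codes used in the test (3, 7, 14 and 51, plus `NONE` as 0), and the `EnumSet` loop emulates one parameterized run per constant.

```java
import java.util.EnumSet;

public class EnumParamSketch {
    // Hypothetical stand-in for Kafka's Errors enum, limited to the codes used in the test.
    enum Errors {
        NONE((short) 0),
        UNKNOWN_TOPIC_OR_PARTITION((short) 3),
        REQUEST_TIMED_OUT((short) 7),
        COORDINATOR_LOAD_IN_PROGRESS((short) 14),
        CONCURRENT_TRANSACTIONS((short) 51);

        final short code;

        Errors(short code) { this.code = code; }

        // Linear lookup by numeric code, in the spirit of Errors.forCode.
        static Errors forCode(short code) {
            for (Errors e : values()) {
                if (e.code == code) return e;
            }
            throw new IllegalArgumentException("unknown code " + code);
        }
    }

    // Int-based version: the reader has to know that 51 means CONCURRENT_TRANSACTIONS.
    static Errors substituteByCode(int errorCode) {
        Errors error = Errors.forCode((short) errorCode);
        return errorCode == 51 ? Errors.COORDINATOR_LOAD_IN_PROGRESS : error;
    }

    // Enum-based version: the special case is visible at the comparison site.
    static Errors substituteByEnum(Errors error) {
        return error == Errors.CONCURRENT_TRANSACTIONS ? Errors.COORDINATOR_LOAD_IN_PROGRESS : error;
    }

    public static void main(String[] args) {
        // Emulates @EnumSource(names = {...}): one "run" per selected constant.
        EnumSet<Errors> params = EnumSet.of(
            Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.REQUEST_TIMED_OUT,
            Errors.COORDINATOR_LOAD_IN_PROGRESS, Errors.CONCURRENT_TRANSACTIONS);
        for (Errors error : params) {
            System.out.println(error + " -> " + substituteByEnum(error));
        }
    }
}
```

Both helpers agree on every code in `{3, 7, 14, 51}`; the difference is only how obvious the special case is where it is written.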
hachikuji commented on code in PR #12915:
URL: https://github.com/apache/kafka/pull/12915#discussion_r1038362105

## clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java: ##
@@ -1678,6 +1680,62 @@ public void testMultipleAddPartitionsPerForOneProduce() throws InterruptedExcept
         assertTrue(secondResponseFuture.isDone());
     }
 
+    @ParameterizedTest
+    @ValueSource(ints = {3, 7, 14, 51})

Review Comment:
I think another way to do this is like this:
```java
@EnumSource(names = {
    "UNKNOWN_TOPIC_OR_PARTITION",
    "REQUEST_TIMED_OUT",
    "COORDINATOR_LOAD_IN_PROGRESS",
    "CONCURRENT_TRANSACTIONS"
})
```

## clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java: ##
@@ -1678,6 +1680,62 @@ public void testMultipleAddPartitionsPerForOneProduce() throws InterruptedExcept
         assertTrue(secondResponseFuture.isDone());
     }
 
+    @ParameterizedTest
+    @ValueSource(ints = {3, 7, 14, 51})
+    public void testRetriableErrors(int errorCode) {
+        // Tests Errors.CONCURRENT_TRANSACTIONS, Errors.COORDINATOR_LOAD_IN_PROGRESS,
+        // Errors.REQUEST_TIMED_OUT, Errors.UNKNOWN_TOPIC_OR_PARTITION
+        // We skip COORDINATOR_NOT_AVAILABLE since it breaks the logic.
+        Errors error = Errors.forCode((short) errorCode);
+
+        // Ensure FindCoordinator retries.
+        TransactionalRequestResult result = transactionManager.initializeTransactions();
+        prepareFindCoordinatorResponse(error, false, CoordinatorType.TRANSACTION, transactionalId);
+        prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId);
+        runUntil(() -> transactionManager.coordinator(CoordinatorType.TRANSACTION) != null);
+        assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION));
+
+        // Ensure InitPid retries.
+        prepareInitPidResponse(error, false, producerId, epoch);
+        prepareInitPidResponse(Errors.NONE, false, producerId, epoch);
+        runUntil(transactionManager::hasProducerId);
+
+        result.await();
+        transactionManager.beginTransaction();
+
+        // Ensure AddPartitionsToTxn retries. Since CONCURRENT_TRANSACTIONS is handled differently here, we substitute.
+        Errors addPartitionsToTxnError = errorCode == 51 ? Errors.COORDINATOR_LOAD_IN_PROGRESS : error;
+        transactionManager.maybeAddPartition(tp0);
+        prepareAddPartitionsToTxnResponse(addPartitionsToTxnError, tp0, epoch, producerId);
+        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, producerId);
+        runUntil(() -> transactionManager.transactionContainsPartition(tp0));
+
+        // Ensure txnOffsetCommit retries is tested in testRetriableErrorInTxnOffsetCommit.
+
+        // Ensure EndTxn retries.
+        TransactionalRequestResult abortResult = transactionManager.beginCommit();
+        prepareEndTxnResponse(error, TransactionResult.COMMIT, producerId, epoch);
+        prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, producerId, epoch);
+        runUntil(abortResult::isCompleted);
+        assertTrue(abortResult.isSuccessful());
+    }
+
+    @Test
+    public void testCoordinatorNotAvailable() {
+        // Ensure FindCoordinator with COORDINATOR_NOT_AVAILABLE error retries.
+        TransactionalRequestResult result = transactionManager.initializeTransactions();
+        prepareFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, false, CoordinatorType.TRANSACTION, transactionalId);
+        prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId);
+        runUntil(() -> transactionManager.coordinator(CoordinatorType.TRANSACTION) != null);
+        assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION));
+
+        prepareInitPidResponse(Errors.NONE, false, producerId, epoch);
+        runUntil(transactionManager::hasProducerId);
+
+        result.await();
+    }
+

Review Comment:
nit: extra new line

## clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java: ##
@@ -1678,6 +1680,62 @@ public void testMultipleAddPartitionsPerForOneProduce() throws InterruptedExcept
         assertTrue(secondResponseFuture.isDone());
     }
 
+    @ParameterizedTest
+    @ValueSource(ints = {3, 7, 14, 51})
+    public void testRetriableErrors(int errorCode) {
+        // Tests Errors.CONCURRENT_TRANSACTIONS, Errors.COORDINATOR_LOAD_IN_PROGRESS,
+        // Errors.REQUEST_TIMED_OUT, Errors.UNKNOWN_TOPIC_OR_PARTITION
+        // We skip COORDINATOR_NOT_AVAILABLE since it breaks the logic.
+        Errors error = Errors.forCode((short) errorCode);
+
+        // Ensure FindCoordinator retries.
+        TransactionalRequestResult result = transactionManager.initializeTransactions();
+        prep
hachikuji commented on code in PR #12915:
URL: https://github.com/apache/kafka/pull/12915#discussion_r1036589456

## clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java: ##
@@ -1678,6 +1678,39 @@ public void testMultipleAddPartitionsPerForOneProduce() throws InterruptedExcept
         assertTrue(secondResponseFuture.isDone());
     }
 
+    @Test
+    public void testRetriableErrors() {

Review Comment:
Just the ones we expect. So `REQUEST_TIMED_OUT`, `CONCURRENT_TRANSACTIONS` and any of the others we've replaced with the indirect `RetriableException` check.
hachikuji commented on code in PR #12915:
URL: https://github.com/apache/kafka/pull/12915#discussion_r1036589456

## clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java: ##
@@ -1678,6 +1678,39 @@ public void testMultipleAddPartitionsPerForOneProduce() throws InterruptedExcept
         assertTrue(secondResponseFuture.isDone());
     }
 
+    @Test
+    public void testRetriableErrors() {

Review Comment:
Just the ones we expect. So `REQUEST_TIMED_OUT`, `CONCURRENT_TRANSACTIONS` and any of the others we've replaced with the implicit `RetriableException` check.
hachikuji commented on code in PR #12915:
URL: https://github.com/apache/kafka/pull/12915#discussion_r1036571472

## clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java: ##
@@ -1678,6 +1678,39 @@ public void testMultipleAddPartitionsPerForOneProduce() throws InterruptedExcept
         assertTrue(secondResponseFuture.isDone());
     }
 
+    @Test
+    public void testRetriableErrors() {

Review Comment:
Is it possible to parameterize this test case so that we can verify retriable errors other than REQUEST_TIMED_OUT (e.g. CONCURRENT_TRANSACTIONS)?
hachikuji commented on code in PR #12915:
URL: https://github.com/apache/kafka/pull/12915#discussion_r1036564976

## clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java: ##
@@ -1515,7 +1515,7 @@ public void handleResponse(AbstractResponse response) {
             } else if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) {
                 lookupCoordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
                 reenqueue();
-            } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS) {
+            } else if (error == Errors.CONCURRENT_TRANSACTIONS || error.exception() instanceof RetriableException) {

Review Comment:
Hmm, I don't think there are any dependencies outside `TransactionManager`. It seems like a safe change? It _is_ a retriable error: we just haven't marked it as such explicitly.
hachikuji commented on code in PR #12915:
URL: https://github.com/apache/kafka/pull/12915#discussion_r1036359658

## core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala: ##
@@ -167,7 +167,7 @@ class RPCProducerIdManager(brokerId: Int,
     if (nextProducerId > currentProducerIdBlock.lastProducerId) {
       val block = nextProducerIdBlock.poll(maxWaitMs, TimeUnit.MILLISECONDS)
       if (block == null) {
-        throw Errors.REQUEST_TIMED_OUT.exception("Timed out waiting for next producer ID block")
+        throw Errors.CONCURRENT_TRANSACTIONS.exception("Timed out waiting for next producer ID block")

Review Comment:
Should we do this fix separately? I thought we had considered whether a short KIP would be needed. We probably also need to consider the behavior of other clients (such as rdkafka).

## clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java: ##
@@ -1515,7 +1515,7 @@ public void handleResponse(AbstractResponse response) {
             } else if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) {
                 lookupCoordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
                 reenqueue();
-            } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS) {
+            } else if (error == Errors.CONCURRENT_TRANSACTIONS || error.exception() instanceof RetriableException) {

Review Comment:
I think `ConcurrentTransactionsException` should be retriable.
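In the `ProducerIdManager` hunk, the broker waits for the next producer ID block with `poll(maxWaitMs, TimeUnit.MILLISECONDS)` and turns a `null` result into an error for the producer: `REQUEST_TIMED_OUT` before the change, `CONCURRENT_TRANSACTIONS` in the proposed diff. Either way, the producer only recovers if its handler treats that code as retriable, which is what KAFKA-14417 is about. The Java sketch below shows the same wait-then-throw shape, assuming `nextProducerIdBlock` behaves like a `java.util.concurrent.BlockingQueue` (the `poll` call and the `null` check suggest this). `ProducerIdBlock` and `RetriableTimeoutException` are hypothetical stand-ins, not Kafka classes.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BlockPollSketch {
    // Hypothetical stand-in for a producer ID block covering [firstId, lastId].
    static final class ProducerIdBlock {
        final long firstId;
        final long lastId;

        ProducerIdBlock(long firstId, long lastId) {
            this.firstId = firstId;
            this.lastId = lastId;
        }
    }

    // Hypothetical retriable error; the real broker maps the failure to an Errors code instead.
    static final class RetriableTimeoutException extends RuntimeException {
        RetriableTimeoutException(String message) { super(message); }
    }

    // Waits up to maxWaitMs for the next block; BlockingQueue.poll returns null if the wait elapses.
    static ProducerIdBlock awaitNextBlock(BlockingQueue<ProducerIdBlock> queue, long maxWaitMs)
            throws InterruptedException {
        ProducerIdBlock block = queue.poll(maxWaitMs, TimeUnit.MILLISECONDS);
        if (block == null) {
            throw new RetriableTimeoutException("Timed out waiting for next producer ID block");
        }
        return block;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<ProducerIdBlock> queue = new LinkedBlockingQueue<>();
        try {
            awaitNextBlock(queue, 10); // nothing queued yet, so this times out
        } catch (RetriableTimeoutException e) {
            System.out.println("retriable: " + e.getMessage());
        }
        queue.put(new ProducerIdBlock(1000, 1999));
        System.out.println("first id: " + awaitNextBlock(queue, 10).firstId);
    }
}
```

A timeout here is transient by nature: a later request can succeed once the block arrives, which is why the review treats it as something the client should retry rather than a fatal error.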
hachikuji commented on code in PR #12915:
URL: https://github.com/apache/kafka/pull/12915#discussion_r1035124199

## clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java: ##
@@ -1290,7 +1290,7 @@ public void handleResponse(AbstractResponse response) {
             } else if (error == Errors.NOT_COORDINATOR || error == Errors.COORDINATOR_NOT_AVAILABLE) {
                 lookupCoordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
                 reenqueue();
-            } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS) {
+            } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS || error == Errors.REQUEST_TIMED_OUT) {

Review Comment:
Yeah, I'm suggesting that we check the timeout error code (or retriable errors generally) for all of the handlers. That protects the client from future changes on the broker if the error code is not used today.
hachikuji commented on code in PR #12915:
URL: https://github.com/apache/kafka/pull/12915#discussion_r1035081487

## clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java: ##
@@ -1290,7 +1290,7 @@ public void handleResponse(AbstractResponse response) {
             } else if (error == Errors.NOT_COORDINATOR || error == Errors.COORDINATOR_NOT_AVAILABLE) {
                 lookupCoordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
                 reenqueue();
-            } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS) {
+            } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS || error == Errors.REQUEST_TIMED_OUT) {

Review Comment:
Also, can we check the other handlers? We may as well update all of them.
hachikuji commented on code in PR #12915:
URL: https://github.com/apache/kafka/pull/12915#discussion_r1035062033

## clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java: ##
@@ -1290,7 +1290,7 @@ public void handleResponse(AbstractResponse response) {
             } else if (error == Errors.NOT_COORDINATOR || error == Errors.COORDINATOR_NOT_AVAILABLE) {
                 lookupCoordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
                 reenqueue();
-            } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS) {
+            } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS || error == Errors.REQUEST_TIMED_OUT) {

Review Comment:
I'm wondering if we should be even more lenient with the error codes and check for `RetriableException`?
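The difference between the explicit allow-list in this diff and a `RetriableException` check can be seen with a small class hierarchy: with an `instanceof` check, any error whose exception type extends `RetriableException` is retried without being named. The sketch below uses a hypothetical miniature hierarchy whose class names echo Kafka's but which is not Kafka's code. In it, `ConcurrentTransactionsException` deliberately does not extend `RetriableException`, matching the review's observation that it had not been marked retriable, so the hierarchy-style check keeps an explicit special case for it.

```java
public class RetriableCheckSketch {
    // Hypothetical miniature hierarchy loosely modeled on Kafka's error classes.
    static class ApiException extends RuntimeException {}
    static class RetriableException extends ApiException {}
    static class TimeoutException extends RetriableException {}
    static class CoordinatorLoadInProgressException extends RetriableException {}
    // Not marked retriable here, mirroring the state discussed in the review.
    static class ConcurrentTransactionsException extends ApiException {}
    static class InvalidTxnStateException extends ApiException {}

    enum Action { RETRY, FATAL }

    // Allow-list style: only the exceptions named here are retried.
    static Action allowList(ApiException e) {
        if (e instanceof CoordinatorLoadInProgressException || e instanceof ConcurrentTransactionsException) {
            return Action.RETRY;
        }
        return Action.FATAL;
    }

    // Hierarchy style: anything marked retriable is retried, plus the explicit special case.
    static Action hierarchy(ApiException e) {
        if (e instanceof ConcurrentTransactionsException || e instanceof RetriableException) {
            return Action.RETRY;
        }
        return Action.FATAL;
    }

    public static void main(String[] args) {
        ApiException timeout = new TimeoutException();
        System.out.println("allow-list: " + allowList(timeout)); // FATAL: timeouts were never listed
        System.out.println("hierarchy:  " + hierarchy(timeout)); // RETRY
    }
}
```

With the hierarchy check, a retriable error the broker does not return today is already handled if it starts appearing later, which is the protection argued for in the thread. Once `ConcurrentTransactionsException` itself extends `RetriableException`, the explicit clause becomes redundant.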