[GitHub] [kafka] hachikuji commented on a diff in pull request #12915: KAFKA-14417: Producer doesn't handle REQUEST_TIMED_OUT for InitProducerIdRequest, treats as fatal error

2022-12-02 Thread GitBox


hachikuji commented on code in PR #12915:
URL: https://github.com/apache/kafka/pull/12915#discussion_r1038367059


##
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
##
@@ -1678,6 +1680,62 @@ public void testMultipleAddPartitionsPerForOneProduce() throws InterruptedExcept
         assertTrue(secondResponseFuture.isDone());
     }
 
+    @ParameterizedTest
+    @ValueSource(ints = {3, 7, 14, 51})
+    public void testRetriableErrors(int errorCode) {
+        // Tests Errors.CONCURRENT_TRANSACTIONS, Errors.COORDINATOR_LOAD_IN_PROGRESS,
+        // Errors.REQUEST_TIMED_OUT, Errors.UNKNOWN_TOPIC_OR_PARTITION
+        // We skip COORDINATOR_NOT_AVAILABLE since it breaks the logic.
+        Errors error = Errors.forCode((short) errorCode);
+
+        // Ensure FindCoordinator retries.
+        TransactionalRequestResult result = transactionManager.initializeTransactions();
+        prepareFindCoordinatorResponse(error, false, CoordinatorType.TRANSACTION, transactionalId);
+        prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId);
+        runUntil(() -> transactionManager.coordinator(CoordinatorType.TRANSACTION) != null);
+        assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION));
+
+        // Ensure InitPid retries.
+        prepareInitPidResponse(error, false, producerId, epoch);
+        prepareInitPidResponse(Errors.NONE, false, producerId, epoch);
+        runUntil(transactionManager::hasProducerId);
+
+        result.await();
+        transactionManager.beginTransaction();
+
+        // Ensure AddPartitionsToTxn retries. Since CONCURRENT_TRANSACTIONS is handled differently here, we substitute.
+        Errors addPartitionsToTxnError = errorCode == 51 ? Errors.COORDINATOR_LOAD_IN_PROGRESS : error;
+        transactionManager.maybeAddPartition(tp0);
+        prepareAddPartitionsToTxnResponse(addPartitionsToTxnError, tp0, epoch, producerId);
+        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, producerId);
+        runUntil(() -> transactionManager.transactionContainsPartition(tp0));
+
+        // Ensure txnOffsetCommit retries is tested in testRetriableErrorInTxnOffsetCommit.
+
+        // Ensure EndTxn retries.
+        TransactionalRequestResult abortResult = transactionManager.beginCommit();
+        prepareEndTxnResponse(error, TransactionResult.COMMIT, producerId, epoch);
+        prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, producerId, epoch);
+        runUntil(abortResult::isCompleted);
+        assertTrue(abortResult.isSuccessful());
+    }
+
+    @Test
+    public void testCoordinatorNotAvailable() {

Review Comment:
   Sorry, I realized this is not a valid error for `FindCoordinator`, so
never mind.

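The test above follows a "fail once, then succeed" pattern: it queues an error response, then a `NONE` response, and drives the client until the request completes. A minimal plain-Java model of that pattern is sketched below. `ScriptedBroker`, `RetryingSender`, and the `Err` enum are hypothetical stand-ins, not Kafka's `MockClient` or `TransactionManager` API, and the retriable set simply mirrors the errors the test is parameterized over.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.EnumSet;
import java.util.Set;

// Hypothetical subset of error names, for illustration only.
enum Err { NONE, UNKNOWN_TOPIC_OR_PARTITION, REQUEST_TIMED_OUT, COORDINATOR_LOAD_IN_PROGRESS, CONCURRENT_TRANSACTIONS, INVALID_TXN_STATE }

// Hypothetical mock broker: hands out prepared responses in FIFO order.
final class ScriptedBroker {
    private final Deque<Err> prepared = new ArrayDeque<>();
    int requestsSeen = 0;

    void prepareResponse(Err error) { prepared.addLast(error); }

    Err handle() {
        requestsSeen++;
        Err next = prepared.pollFirst();
        if (next == null) throw new IllegalStateException("no prepared response");
        return next;
    }
}

final class RetryingSender {
    // Assumption: the errors this sketch treats as retriable mirror the test's parameters.
    static final Set<Err> RETRIABLE = EnumSet.of(Err.UNKNOWN_TOPIC_OR_PARTITION,
            Err.REQUEST_TIMED_OUT, Err.COORDINATOR_LOAD_IN_PROGRESS, Err.CONCURRENT_TRANSACTIONS);

    // Resends until the broker answers NONE or a non-retriable error; returns that final error.
    static Err sendUntilDone(ScriptedBroker broker) {
        while (true) {
            Err error = broker.handle();
            if (error == Err.NONE || !RETRIABLE.contains(error)) return error;
            // Retriable: loop and send the request again (the "re-enqueue" step).
        }
    }
}
```

With `REQUEST_TIMED_OUT` followed by `NONE` prepared, `sendUntilDone` returns `NONE` after two requests; with a non-retriable error first, it stops after one request, which is the "treated as fatal" behavior the PR title describes.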


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hachikuji commented on a diff in pull request #12915: KAFKA-14417: Producer doesn't handle REQUEST_TIMED_OUT for InitProducerIdRequest, treats as fatal error

2022-12-02 Thread GitBox


hachikuji commented on code in PR #12915:
URL: https://github.com/apache/kafka/pull/12915#discussion_r1038362105


##
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
##
@@ -1678,6 +1680,62 @@ public void testMultipleAddPartitionsPerForOneProduce() throws InterruptedExcept
         assertTrue(secondResponseFuture.isDone());
     }
 
+    @ParameterizedTest
+    @ValueSource(ints = {3, 7, 14, 51})

Review Comment:
   I think another way to do this would be:
   ```java
   @EnumSource(names = {
       "UNKNOWN_TOPIC_OR_PARTITION",
       "REQUEST_TIMED_OUT",
       "COORDINATOR_LOAD_IN_PROGRESS",
       "CONCURRENT_TRANSACTIONS"
   })
   public void testRetriableErrors(Errors error) {
   ```
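To see that the two parameter sources select the same errors, they can be modeled in plain Java. This is a sketch, not JUnit itself: `Errors` below is a hypothetical mirror of four constants of Kafka's `Errors` enum with their protocol codes (3, 7, 14, 51), and `fromNames()` only approximates what `@EnumSource(names = ...)` selects.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical mirror of a few constants of Kafka's Errors enum, with their protocol codes.
enum Errors {
    NONE(0), UNKNOWN_TOPIC_OR_PARTITION(3), REQUEST_TIMED_OUT(7),
    COORDINATOR_LOAD_IN_PROGRESS(14), CONCURRENT_TRANSACTIONS(51);

    private final short code;
    Errors(int code) { this.code = (short) code; }
    short code() { return code; }

    // Linear lookup by code, standing in for Errors.forCode in this sketch.
    static Errors forCode(short code) {
        for (Errors e : values()) if (e.code == code) return e;
        throw new IllegalArgumentException("unknown code " + code);
    }
}

final class ParamSources {
    // What @ValueSource(ints = {3, 7, 14, 51}) yields once each int is converted.
    static List<Errors> fromCodes() {
        return IntStream.of(3, 7, 14, 51)
                .mapToObj(c -> Errors.forCode((short) c))
                .collect(Collectors.toList());
    }

    // Roughly what @EnumSource(names = {...}) selects: the constants named explicitly.
    static List<Errors> fromNames() {
        return Arrays.stream(new String[] {
                    "UNKNOWN_TOPIC_OR_PARTITION", "REQUEST_TIMED_OUT",
                    "COORDINATOR_LOAD_IN_PROGRESS", "CONCURRENT_TRANSACTIONS"})
                .map(Errors::valueOf)
                .collect(Collectors.toList());
    }

    // With an enum parameter, the AddPartitionsToTxn substitution no longer needs the magic number 51.
    static Errors addPartitionsError(Errors error) {
        return error == Errors.CONCURRENT_TRANSACTIONS ? Errors.COORDINATOR_LOAD_IN_PROGRESS : error;
    }
}
```

The enum form keeps the test's intent readable at the call site and lets special cases compare against named constants instead of raw codes.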






[GitHub] [kafka] hachikuji commented on a diff in pull request #12915: KAFKA-14417: Producer doesn't handle REQUEST_TIMED_OUT for InitProducerIdRequest, treats as fatal error

2022-12-02 Thread GitBox


hachikuji commented on code in PR #12915:
URL: https://github.com/apache/kafka/pull/12915#discussion_r1038362105


##
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
##
@@ -1678,6 +1680,62 @@ public void testMultipleAddPartitionsPerForOneProduce() throws InterruptedExcept
         assertTrue(secondResponseFuture.isDone());
     }
 
+    @ParameterizedTest
+    @ValueSource(ints = {3, 7, 14, 51})

Review Comment:
   I think another way to do this would be:
   ```java
   @EnumSource(names = {
       "UNKNOWN_TOPIC_OR_PARTITION",
       "REQUEST_TIMED_OUT",
       "COORDINATOR_LOAD_IN_PROGRESS",
       "CONCURRENT_TRANSACTIONS"
   })
   ```



##
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
##
@@ -1678,6 +1680,62 @@ public void testMultipleAddPartitionsPerForOneProduce() throws InterruptedExcept
         assertTrue(secondResponseFuture.isDone());
     }
 
+    @ParameterizedTest
+    @ValueSource(ints = {3, 7, 14, 51})
+    public void testRetriableErrors(int errorCode) {
+        // Tests Errors.CONCURRENT_TRANSACTIONS, Errors.COORDINATOR_LOAD_IN_PROGRESS,
+        // Errors.REQUEST_TIMED_OUT, Errors.UNKNOWN_TOPIC_OR_PARTITION
+        // We skip COORDINATOR_NOT_AVAILABLE since it breaks the logic.
+        Errors error = Errors.forCode((short) errorCode);
+
+        // Ensure FindCoordinator retries.
+        TransactionalRequestResult result = transactionManager.initializeTransactions();
+        prepareFindCoordinatorResponse(error, false, CoordinatorType.TRANSACTION, transactionalId);
+        prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId);
+        runUntil(() -> transactionManager.coordinator(CoordinatorType.TRANSACTION) != null);
+        assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION));
+
+        // Ensure InitPid retries.
+        prepareInitPidResponse(error, false, producerId, epoch);
+        prepareInitPidResponse(Errors.NONE, false, producerId, epoch);
+        runUntil(transactionManager::hasProducerId);
+
+        result.await();
+        transactionManager.beginTransaction();
+
+        // Ensure AddPartitionsToTxn retries. Since CONCURRENT_TRANSACTIONS is handled differently here, we substitute.
+        Errors addPartitionsToTxnError = errorCode == 51 ? Errors.COORDINATOR_LOAD_IN_PROGRESS : error;
+        transactionManager.maybeAddPartition(tp0);
+        prepareAddPartitionsToTxnResponse(addPartitionsToTxnError, tp0, epoch, producerId);
+        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, producerId);
+        runUntil(() -> transactionManager.transactionContainsPartition(tp0));
+
+        // Ensure txnOffsetCommit retries is tested in testRetriableErrorInTxnOffsetCommit.
+
+        // Ensure EndTxn retries.
+        TransactionalRequestResult abortResult = transactionManager.beginCommit();
+        prepareEndTxnResponse(error, TransactionResult.COMMIT, producerId, epoch);
+        prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, producerId, epoch);
+        runUntil(abortResult::isCompleted);
+        assertTrue(abortResult.isSuccessful());
+    }
+
+    @Test
+    public void testCoordinatorNotAvailable() {
+        // Ensure FindCoordinator with COORDINATOR_NOT_AVAILABLE error retries.
+        TransactionalRequestResult result = transactionManager.initializeTransactions();
+        prepareFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, false, CoordinatorType.TRANSACTION, transactionalId);
+        prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId);
+        runUntil(() -> transactionManager.coordinator(CoordinatorType.TRANSACTION) != null);
+        assertEquals(brokerNode, transactionManager.coordinator(CoordinatorType.TRANSACTION));
+
+        prepareInitPidResponse(Errors.NONE, false, producerId, epoch);
+        runUntil(transactionManager::hasProducerId);
+
+        result.await();
+    }
+

Review Comment:
   nit: extra newline




[GitHub] [kafka] hachikuji commented on a diff in pull request #12915: KAFKA-14417: Producer doesn't handle REQUEST_TIMED_OUT for InitProducerIdRequest, treats as fatal error

2022-12-01 Thread GitBox


hachikuji commented on code in PR #12915:
URL: https://github.com/apache/kafka/pull/12915#discussion_r1036589456


##
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
##
@@ -1678,6 +1678,39 @@ public void testMultipleAddPartitionsPerForOneProduce() throws InterruptedExcept
         assertTrue(secondResponseFuture.isDone());
     }
 
+    @Test
+    public void testRetriableErrors() {

Review Comment:
   Just the ones we expect. So `REQUEST_TIMED_OUT`, `CONCURRENT_TRANSACTIONS`,
and any of the others whose explicit checks we've replaced with the indirect
`RetriableException` check.






[GitHub] [kafka] hachikuji commented on a diff in pull request #12915: KAFKA-14417: Producer doesn't handle REQUEST_TIMED_OUT for InitProducerIdRequest, treats as fatal error

2022-11-30 Thread GitBox


hachikuji commented on code in PR #12915:
URL: https://github.com/apache/kafka/pull/12915#discussion_r1036589456


##
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
##
@@ -1678,6 +1678,39 @@ public void testMultipleAddPartitionsPerForOneProduce() throws InterruptedExcept
         assertTrue(secondResponseFuture.isDone());
     }
 
+    @Test
+    public void testRetriableErrors() {

Review Comment:
   Just the ones we expect. So `REQUEST_TIMED_OUT`, `CONCURRENT_TRANSACTIONS`,
and any of the others whose explicit checks we've replaced with the implicit
`RetriableException` check.






[GitHub] [kafka] hachikuji commented on a diff in pull request #12915: KAFKA-14417: Producer doesn't handle REQUEST_TIMED_OUT for InitProducerIdRequest, treats as fatal error

2022-11-30 Thread GitBox


hachikuji commented on code in PR #12915:
URL: https://github.com/apache/kafka/pull/12915#discussion_r1036571472


##
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
##
@@ -1678,6 +1678,39 @@ public void testMultipleAddPartitionsPerForOneProduce() throws InterruptedExcept
         assertTrue(secondResponseFuture.isDone());
     }
 
+    @Test
+    public void testRetriableErrors() {

Review Comment:
   Is it possible to parameterize this test case so that we can verify 
retriable errors other than REQUEST_TIMED_OUT (e.g. CONCURRENT_TRANSACTIONS)?






[GitHub] [kafka] hachikuji commented on a diff in pull request #12915: KAFKA-14417: Producer doesn't handle REQUEST_TIMED_OUT for InitProducerIdRequest, treats as fatal error

2022-11-30 Thread GitBox


hachikuji commented on code in PR #12915:
URL: https://github.com/apache/kafka/pull/12915#discussion_r1036564976


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -1515,7 +1515,7 @@ public void handleResponse(AbstractResponse response) {
             } else if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) {
                 lookupCoordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
                 reenqueue();
-            } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS) {
+            } else if (error == Errors.CONCURRENT_TRANSACTIONS || error.exception() instanceof RetriableException) {

Review Comment:
   Hmm, I don't think there are any dependencies outside `TransactionManager`. 
It seems like a safe change? It _is_ a retriable error: we just haven't marked 
it as such explicitly.






[GitHub] [kafka] hachikuji commented on a diff in pull request #12915: KAFKA-14417: Producer doesn't handle REQUEST_TIMED_OUT for InitProducerIdRequest, treats as fatal error

2022-11-30 Thread GitBox


hachikuji commented on code in PR #12915:
URL: https://github.com/apache/kafka/pull/12915#discussion_r1036359658


##
core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala:
##
@@ -167,7 +167,7 @@ class RPCProducerIdManager(brokerId: Int,
     if (nextProducerId > currentProducerIdBlock.lastProducerId) {
       val block = nextProducerIdBlock.poll(maxWaitMs, TimeUnit.MILLISECONDS)
       if (block == null) {
-        throw Errors.REQUEST_TIMED_OUT.exception("Timed out waiting for next producer ID block")
+        throw Errors.CONCURRENT_TRANSACTIONS.exception("Timed out waiting for next producer ID block")

Review Comment:
   Should we do this fix separately? I thought we had considered whether a 
short KIP would be needed. We probably also need to consider the behavior of 
other clients (such as rdkafka).
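The trade-off here can be illustrated with a simplified model of how a client classifies InitProducerId errors before and after the client-side patch. This is a sketch under stated assumptions: `InitPidError`, `Outcome`, and `InitPidHandling` are hypothetical, the old branches are taken from the `@@ -1290` diff in this thread, and "anything else is fatal" is the behavior named in the PR title, not a full model of `TransactionManager`.

```java
// Hypothetical subset of errors relevant to InitProducerId handling.
enum InitPidError { NOT_COORDINATOR, COORDINATOR_NOT_AVAILABLE, COORDINATOR_LOAD_IN_PROGRESS, CONCURRENT_TRANSACTIONS, REQUEST_TIMED_OUT }

enum Outcome { FIND_COORDINATOR_AND_RETRY, RETRY, FATAL }

final class InitPidHandling {
    // Simplified model of the pre-patch client branches shown in the diff.
    static Outcome oldClient(InitPidError e) {
        switch (e) {
            case NOT_COORDINATOR:
            case COORDINATOR_NOT_AVAILABLE:
                return Outcome.FIND_COORDINATOR_AND_RETRY;
            case COORDINATOR_LOAD_IN_PROGRESS:
            case CONCURRENT_TRANSACTIONS:
                return Outcome.RETRY;
            default:
                return Outcome.FATAL; // e.g. REQUEST_TIMED_OUT before the fix
        }
    }

    // Simplified model of a patched client that also retries REQUEST_TIMED_OUT.
    static Outcome newClient(InitPidError e) {
        return e == InitPidError.REQUEST_TIMED_OUT ? Outcome.RETRY : oldClient(e);
    }
}
```

In this model, changing the broker to return `CONCURRENT_TRANSACTIONS` would make already-deployed clients retry, while fixing only the client helps only upgraded clients. Changing what the broker returns, however, affects every client implementation, which is why the comment suggests handling it separately.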



##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -1515,7 +1515,7 @@ public void handleResponse(AbstractResponse response) {
             } else if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) {
                 lookupCoordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
                 reenqueue();
-            } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS) {
+            } else if (error == Errors.CONCURRENT_TRANSACTIONS || error.exception() instanceof RetriableException) {

Review Comment:
   I think `ConcurrentTransactionsException` should be retriable.
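The patched condition retries `CONCURRENT_TRANSACTIONS` explicitly and any other error only if its exception type extends `RetriableException`. The sketch below models that with a hypothetical, trimmed-down exception hierarchy. The class names echo Kafka's, but these are local stand-ins, and the `exception()` mapping is assumed. It shows why the explicit `CONCURRENT_TRANSACTIONS` term is needed as long as `ConcurrentTransactionsException` is not a `RetriableException` subclass.

```java
// Local stand-ins for Kafka's exception hierarchy (hypothetical, trimmed down).
class ApiException extends RuntimeException {}
class RetriableException extends ApiException {}
class TimeoutException extends RetriableException {}
class CoordinatorLoadInProgressException extends RetriableException {}
// Simplified: modeled as a direct RetriableException subclass.
class UnknownTopicOrPartitionException extends RetriableException {}
// Not marked retriable: the situation the review comment questions.
class ConcurrentTransactionsException extends ApiException {}
class InvalidTxnStateException extends ApiException {}

enum Errors {
    REQUEST_TIMED_OUT, COORDINATOR_LOAD_IN_PROGRESS, UNKNOWN_TOPIC_OR_PARTITION,
    CONCURRENT_TRANSACTIONS, INVALID_TXN_STATE;

    // Assumed error-to-exception mapping, standing in for Errors.exception().
    ApiException exception() {
        switch (this) {
            case REQUEST_TIMED_OUT: return new TimeoutException();
            case COORDINATOR_LOAD_IN_PROGRESS: return new CoordinatorLoadInProgressException();
            case UNKNOWN_TOPIC_OR_PARTITION: return new UnknownTopicOrPartitionException();
            case CONCURRENT_TRANSACTIONS: return new ConcurrentTransactionsException();
            default: return new InvalidTxnStateException();
        }
    }
}

final class RetryCheck {
    // Only the exception-type test, without the special case.
    static boolean retriableByType(Errors error) {
        return error.exception() instanceof RetriableException;
    }

    // The condition from the patch: explicit CONCURRENT_TRANSACTIONS plus the type test.
    static boolean shouldRetry(Errors error) {
        return error == Errors.CONCURRENT_TRANSACTIONS || retriableByType(error);
    }
}
```

If `ConcurrentTransactionsException` extended `RetriableException`, `shouldRetry` could collapse to `retriableByType`.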






[GitHub] [kafka] hachikuji commented on a diff in pull request #12915: KAFKA-14417: Producer doesn't handle REQUEST_TIMED_OUT for InitProducerIdRequest, treats as fatal error

2022-11-29 Thread GitBox


hachikuji commented on code in PR #12915:
URL: https://github.com/apache/kafka/pull/12915#discussion_r1035124199


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -1290,7 +1290,7 @@ public void handleResponse(AbstractResponse response) {
             } else if (error == Errors.NOT_COORDINATOR || error == Errors.COORDINATOR_NOT_AVAILABLE) {
                 lookupCoordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
                 reenqueue();
-            } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS) {
+            } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS || error == Errors.REQUEST_TIMED_OUT) {

Review Comment:
   Yeah, I'm suggesting that we check for the timeout error code (or retriable
errors generally) in all of the handlers. That protects the client from future
changes on the broker, even if the error code is not used today.
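Routing every handler's fallback through one shared predicate is what makes the client robust to a broker that later starts returning a retriable error no handler listed explicitly. A minimal sketch, assuming hypothetical stand-in classes (`SomeFutureRetriableException` represents an error a future broker might return; it is not a Kafka class) and modeling only the fallback branch of each handler, not their other special cases:

```java
// Hypothetical stand-ins; not Kafka's classes.
class ApiException extends RuntimeException {}
class RetriableException extends ApiException {}
class CoordinatorLoadInProgressException extends RetriableException {}
// Imagine a broker release that starts returning a retriable error this client never listed.
class SomeFutureRetriableException extends RetriableException {}
class FencedException extends ApiException {}

enum Action { RETRY, FAIL }

final class Handlers {
    // One shared predicate instead of per-handler lists of error codes.
    static boolean isRetriable(ApiException e) {
        return e instanceof RetriableException;
    }

    private static Action classify(ApiException e) {
        return isRetriable(e) ? Action.RETRY : Action.FAIL;
    }

    // Hypothetical handlers for different transactional requests; only the fallback branch is modeled.
    static Action handleInitProducerId(ApiException e) { return classify(e); }
    static Action handleAddPartitions(ApiException e) { return classify(e); }
    static Action handleEndTxn(ApiException e) { return classify(e); }
}
```

Because the check is by type rather than by an enumerated list, every handler retries `SomeFutureRetriableException` without any client change, while non-retriable errors still fail.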






[GitHub] [kafka] hachikuji commented on a diff in pull request #12915: KAFKA-14417: Producer doesn't handle REQUEST_TIMED_OUT for InitProducerIdRequest, treats as fatal error

2022-11-29 Thread GitBox


hachikuji commented on code in PR #12915:
URL: https://github.com/apache/kafka/pull/12915#discussion_r1035081487


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -1290,7 +1290,7 @@ public void handleResponse(AbstractResponse response) {
             } else if (error == Errors.NOT_COORDINATOR || error == Errors.COORDINATOR_NOT_AVAILABLE) {
                 lookupCoordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
                 reenqueue();
-            } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS) {
+            } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS || error == Errors.REQUEST_TIMED_OUT) {

Review Comment:
   Also, can we check the other handlers? We may as well update all of them.






[GitHub] [kafka] hachikuji commented on a diff in pull request #12915: KAFKA-14417: Producer doesn't handle REQUEST_TIMED_OUT for InitProducerIdRequest, treats as fatal error

2022-11-29 Thread GitBox


hachikuji commented on code in PR #12915:
URL: https://github.com/apache/kafka/pull/12915#discussion_r1035062033


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -1290,7 +1290,7 @@ public void handleResponse(AbstractResponse response) {
             } else if (error == Errors.NOT_COORDINATOR || error == Errors.COORDINATOR_NOT_AVAILABLE) {
                 lookupCoordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
                 reenqueue();
-            } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS) {
+            } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS || error == Errors.REQUEST_TIMED_OUT) {

Review Comment:
   I'm wondering if we should be even more lenient with the error codes and 
check for `RetriableException`?


