[GitHub] [kafka] showuon commented on a diff in pull request #12392: KAFKA-14053: Transactional producer should bump the epoch and skip ab…

2022-08-17 Thread GitBox


showuon commented on code in PR #12392:
URL: https://github.com/apache/kafka/pull/12392#discussion_r947888637


##
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
##
@@ -2596,27 +2596,20 @@ public void testDropCommitOnBatchExpiry() throws InterruptedException {
 } catch (ExecutionException e) {
 assertTrue(e.getCause() instanceof  TimeoutException);
 }
+
 runUntil(commitResult::isCompleted);  // the commit shouldn't be completed without being sent since the produce request failed.
 assertFalse(commitResult.isSuccessful());  // the commit shouldn't succeed since the produce request failed.
-assertThrows(TimeoutException.class, commitResult::await);
+assertThrows(KafkaException.class, commitResult::await);
 
-assertTrue(transactionManager.hasAbortableError());
-assertTrue(transactionManager.hasOngoingTransaction());
+assertTrue(transactionManager.hasFatalBumpableError());
+assertFalse(transactionManager.hasOngoingTransaction());
 assertFalse(transactionManager.isCompleting());
-assertTrue(transactionManager.transactionContainsPartition(tp0));
 
-TransactionalRequestResult abortResult = transactionManager.beginAbort();
-
-prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, producerId, epoch);
-prepareInitPidResponse(Errors.NONE, false, producerId, (short) (epoch + 1));
-runUntil(abortResult::isCompleted);
-assertTrue(abortResult.isSuccessful());
-assertFalse(transactionManager.hasOngoingTransaction());
-assertFalse(transactionManager.transactionContainsPartition(tp0));
+assertThrows(KafkaException.class, () -> transactionManager.beginAbort());

Review Comment:
   I personally like the solution of making the producer enter the fatal error state, but I'd like to hear others' opinions since it changes the producer's behavior.
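To make the behavior change under discussion concrete, here is a minimal toy model of the two outcomes the test asserts. It is not the real `TransactionManager`: `ToyTransactionManager`, its `State` values, `onBatchExpired` and the `patched` flag are hypothetical names, and the model assumes only what the diff shows. Before the patch an expired batch leaves an abortable error and `beginAbort()` succeeds; after it, the manager is in a fatal (bumpable) error, there is no ongoing transaction, and `beginAbort()` throws. The real test expects a `KafkaException`; the toy throws `IllegalStateException` to stay dependency-free.

```java
// Hypothetical toy model; names do not match the real Kafka client internals.
class ToyTransactionManager {
    enum State { IN_TRANSACTION, ABORTABLE_ERROR, FATAL_BUMPABLE_ERROR, READY }

    private final boolean patched; // true = behavior proposed in the PR
    private State state = State.IN_TRANSACTION;

    ToyTransactionManager(boolean patched) {
        this.patched = patched;
    }

    State state() {
        return state;
    }

    // Called when a batch expires on the client side while committing.
    void onBatchExpired() {
        state = patched ? State.FATAL_BUMPABLE_ERROR : State.ABORTABLE_ERROR;
    }

    boolean hasOngoingTransaction() {
        return state == State.IN_TRANSACTION || state == State.ABORTABLE_ERROR;
    }

    // Before the patch: abort and keep using the producer.
    // After the patch: the producer is unusable, so aborting is rejected.
    void beginAbort() {
        if (state == State.FATAL_BUMPABLE_ERROR) {
            throw new IllegalStateException("Producer is in a fatal error state; close it");
        }
        state = State.READY;
    }
}
```

The single `patched` flag keeps both code paths side by side, which is the comparison reviewers are being asked to weigh.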



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on a diff in pull request #12392: KAFKA-14053: Transactional producer should bump the epoch and skip ab…

2022-08-11 Thread GitBox


showuon commented on code in PR #12392:
URL: https://github.com/apache/kafka/pull/12392#discussion_r943219606


##
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
##
@@ -2596,27 +2596,20 @@ public void testDropCommitOnBatchExpiry() throws InterruptedException {
 } catch (ExecutionException e) {
 assertTrue(e.getCause() instanceof  TimeoutException);
 }
+
 runUntil(commitResult::isCompleted);  // the commit shouldn't be completed without being sent since the produce request failed.
 assertFalse(commitResult.isSuccessful());  // the commit shouldn't succeed since the produce request failed.
-assertThrows(TimeoutException.class, commitResult::await);
+assertThrows(KafkaException.class, commitResult::await);
 
-assertTrue(transactionManager.hasAbortableError());
-assertTrue(transactionManager.hasOngoingTransaction());
+assertTrue(transactionManager.hasFatalBumpableError());
+assertFalse(transactionManager.hasOngoingTransaction());
 assertFalse(transactionManager.isCompleting());
-assertTrue(transactionManager.transactionContainsPartition(tp0));
 
-TransactionalRequestResult abortResult = transactionManager.beginAbort();
-
-prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, producerId, epoch);
-prepareInitPidResponse(Errors.NONE, false, producerId, (short) (epoch + 1));
-runUntil(abortResult::isCompleted);
-assertTrue(abortResult.isSuccessful());
-assertFalse(transactionManager.hasOngoingTransaction());
-assertFalse(transactionManager.transactionContainsPartition(tp0));
+assertThrows(KafkaException.class, () -> transactionManager.beginAbort());

Review Comment:
   So it looks like, after this patch, when a batch expires or hits a timeout error, the producer will enter the fatal error state after bumping the epoch, whereas before this patch we would abort the transaction and continue with transactional work. Is that right?
   
   Sorry, I didn't realize this earlier. This changes the current behavior for users, so we need more discussion. I'll ping some experts on this PR and hope they can provide comments.
   cc @artemlivshits @ijuma 
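From an application's point of view, the behavior change is about which recovery path a caller takes after a failed commit. The sketch below assumes a hypothetical `TxnProducer` interface and two hypothetical exception types standing in for the client's abortable and fatal error categories; it is not the Kafka client API. It only illustrates the usual pattern: on an abortable error, abort and keep the same producer; on a fatal error, close the producer and create a new one.

```java
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in for an error after which the transaction can be aborted.
class AbortableTxnException extends RuntimeException {
    AbortableTxnException(String msg) { super(msg); }
}

// Hypothetical stand-in for an error after which the producer must be closed.
class FatalTxnException extends RuntimeException {
    FatalTxnException(String msg) { super(msg); }
}

// Hypothetical minimal producer surface; not the real KafkaProducer API.
interface TxnProducer {
    void beginTransaction();
    void send(String record);
    void commitTransaction();
    void abortTransaction();
    void close();
}

class TxnLoop {
    // Runs one transaction and returns the producer to use for the next one.
    static TxnProducer runOnce(TxnProducer producer, Supplier<TxnProducer> factory,
                               String record, List<String> log) {
        try {
            producer.beginTransaction();
            producer.send(record);
            producer.commitTransaction();
            log.add("committed");
            return producer;
        } catch (AbortableTxnException e) {
            // Pre-patch outcome of a batch timeout: abort, keep the same instance.
            producer.abortTransaction();
            log.add("aborted");
            return producer;
        } catch (FatalTxnException e) {
            // Post-patch outcome: this instance is unusable, so replace it.
            producer.close();
            log.add("recreated");
            return factory.get();
        }
    }
}
```

Applications written only for the first branch would start hitting the second one after the patch, which is why the change needs wider discussion.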






[GitHub] [kafka] showuon commented on a diff in pull request #12392: KAFKA-14053: Transactional producer should bump the epoch and skip ab…

2022-08-10 Thread GitBox


showuon commented on code in PR #12392:
URL: https://github.com/apache/kafka/pull/12392#discussion_r942413117


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -241,12 +246,40 @@ synchronized TransactionalRequestResult initializeTransactions(ProducerIdAndEpoc
 .setProducerId(producerIdAndEpoch.producerId)
 .setProducerEpoch(producerIdAndEpoch.epoch);
 InitProducerIdHandler handler = new InitProducerIdHandler(new InitProducerIdRequest.Builder(requestData),
-isEpochBump);
+isEpochBump, false);
 enqueueRequest(handler);
 return handler.result;
 }, State.INITIALIZING, "initTransactions");
 }
 
+synchronized void tryTransitioningIntoFatalBumpableError(RuntimeException cause) {
+if (currentState == State.FATAL_ERROR || currentState == State.FATAL_BUMPABLE_ERROR) {
+// Already in a fatal state, skip
+return;
+}
+String errorMessage = "Encountered unrecoverable error due to batch client side timeout";
+RuntimeException failure = cause == null
+? new KafkaException(errorMessage)
+: new KafkaException(errorMessage, cause);
+transitionToFatalBumpableError(failure);
+
+// If an epoch bump is possible, try to fence the current transaction by bumping
+if (canBumpEpoch()) {
+log.info("Invoking InitProducerId with current producer ID and epoch {} in order to bump the epoch to fence the current transaction", producerIdAndEpoch);
+InitProducerIdRequestData requestData = new InitProducerIdRequestData()
+.setTransactionalId(transactionalId)
+.setTransactionTimeoutMs(transactionTimeoutMs)
+.setProducerId(producerIdAndEpoch.producerId)
+.setProducerEpoch(producerIdAndEpoch.epoch);
+InitProducerIdHandler handler = new InitProducerIdHandler(new InitProducerIdRequest.Builder(requestData),
+false, true);
+enqueueRequest(handler);

Review Comment:
   We enqueue the request here, but when will it actually be sent?
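The question concerns the hand-off between the calling thread, which only enqueues, and a background I/O loop, which drains the queue and does the actual send. The sketch below models that hand-off under stated assumptions: `ToyTxnManager`, `nextRequest`, `ToySender` and the `drainWhenBumpable` flag are hypothetical, and the rule that the loop stops draining on a fatal error is an assumed guard, not a quote of the Kafka code. It shows why the question matters: an enqueued bump only goes out if the loop keeps draining while the manager is in the new bumpable state.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical toy; not the real TransactionManager/Sender pair.
class ToyTxnManager {
    enum State { READY, FATAL_BUMPABLE_ERROR, FATAL_ERROR }

    private final Deque<String> pending = new ArrayDeque<>();
    State state = State.READY;

    // Caller side: only records the request; nothing is sent here.
    synchronized void enqueueRequest(String request) {
        pending.add(request);
    }

    // Sender side: returns the next request, or null if none may be sent now.
    synchronized String nextRequest(boolean drainWhenBumpable) {
        if (state == State.FATAL_ERROR) {
            return null; // assumed guard: nothing leaves a fully fatal manager
        }
        if (state == State.FATAL_BUMPABLE_ERROR && !drainWhenBumpable) {
            return null; // if the guard also covers this state, the bump is stuck
        }
        return pending.poll();
    }
}

class ToySender {
    final List<String> sent = new ArrayList<>();

    // One iteration of the background I/O loop.
    void senderStep(ToyTxnManager manager, boolean drainWhenBumpable) {
        String request = manager.nextRequest(drainWhenBumpable);
        if (request != null) {
            sent.add(request); // stands in for the network send
        }
    }
}
```

In this model, the answer to "when will it be sent?" is "on the next loop iteration, provided the loop's guard lets the bumpable state through".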






[GitHub] [kafka] showuon commented on a diff in pull request #12392: KAFKA-14053: Transactional producer should bump the epoch and skip ab…

2022-07-31 Thread GitBox


showuon commented on code in PR #12392:
URL: https://github.com/apache/kafka/pull/12392#discussion_r933989629


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -241,12 +246,40 @@ synchronized TransactionalRequestResult initializeTransactions(ProducerIdAndEpoc
 .setProducerId(producerIdAndEpoch.producerId)
 .setProducerEpoch(producerIdAndEpoch.epoch);
 InitProducerIdHandler handler = new InitProducerIdHandler(new InitProducerIdRequest.Builder(requestData),
-isEpochBump);
+isEpochBump, false);
 enqueueRequest(handler);
 return handler.result;
 }, State.INITIALIZING, "initTransactions");
 }
 
+synchronized void tryTransitioningIntoFatalBumpableError(RuntimeException cause) {
+if (currentState == State.FATAL_ERROR || currentState == State.FATAL_BUMPABLE_ERROR) {
+// Already in a fatal state, skip
+return;
+}
+String errorMessage = "Encountered unrecoverable error due to batch client side timeout";
+RuntimeException failure = cause == null
+? new KafkaException(errorMessage)
+: new KafkaException(errorMessage, cause);
+transitionToFatalBumpableError(failure);
+
+// If an epoch bump is possible, try to fence the current transaction by bumping
+if (canBumpEpoch()) {
+log.info("Invoking InitProducerId with current producer ID and epoch {} in order to bump the epoch to fence the current transaction", producerIdAndEpoch);
+InitProducerIdRequestData requestData = new InitProducerIdRequestData()
+.setTransactionalId(transactionalId)
+.setTransactionTimeoutMs(transactionTimeoutMs)
+.setProducerId(producerIdAndEpoch.producerId)
+.setProducerEpoch(producerIdAndEpoch.epoch);
+InitProducerIdHandler handler = new InitProducerIdHandler(new InitProducerIdRequest.Builder(requestData),
+false, true);
+enqueueRequest(handler);
+} else {
+log.info("Cannot bump epoch, transitioning into fatal error");
+transitionToFatalError(failure);

Review Comment:
   Let me make sure I understand your problem and solution. Are you saying the issue happens only when the **"timed out transaction ID" is not re-used**, and the abort marker arrives earlier than the transactional records? Is my understanding correct?
   
   And what we are trying to do is force an epoch bump when encountering a timeout exception, so that the fencing mechanism helps us abort the previous in-flight transactions, and then enter the `fatal error` state as before. Is that right?
   
   If so, then I have a question: what happens if the initPid request fails (i.e., the epoch bump fails)? Would the pending transactions still occur?
   
   Thank you.
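The fencing idea discussed in this comment can be illustrated with a toy partition that remembers the latest epoch seen per producer ID and rejects writes from older epochs. `ToyPartition` and its methods are hypothetical, and the model deliberately ignores sequence numbers and the transaction coordinator. It assumes that when the bump succeeds, the abort marker is written with the bumped epoch. Under that assumption it shows both halves of the question: with the bump, a late, timed-out batch is fenced after the abort marker; if the bump never happens (for example because InitProducerId fails), the same late batch is accepted and leaves an open transaction behind.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical toy of broker-side epoch fencing; not Kafka's actual log code.
class ToyPartition {
    private final Map<Long, Short> latestEpoch = new HashMap<>();
    private final Map<Long, Boolean> openTxn = new HashMap<>();

    // Transactional data append; returns false if the write is fenced.
    boolean append(long producerId, short epoch) {
        Short current = latestEpoch.get(producerId);
        if (current != null && epoch < current) {
            return false; // stale epoch: rejected
        }
        latestEpoch.put(producerId, epoch);
        openTxn.put(producerId, true); // data after a marker starts a new txn
        return true;
    }

    // Abort marker written at the given epoch; it closes the open txn.
    void writeAbortMarker(long producerId, short epoch) {
        latestEpoch.merge(producerId, epoch, (a, b) -> (short) Math.max(a, b));
        openTxn.put(producerId, false);
    }

    boolean hasOpenTransaction(long producerId) {
        return openTxn.getOrDefault(producerId, false);
    }
}
```

In the "no bump" case the late batch carries the same epoch as the marker, so nothing distinguishes it from a new transaction; that is the gap the epoch bump is meant to close.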





