[GitHub] [kafka] showuon commented on a diff in pull request #12392: KAFKA-14053: Transactional producer should bump the epoch and skip ab…
showuon commented on code in PR #12392:
URL: https://github.com/apache/kafka/pull/12392#discussion_r947888637

## clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java: ##
@@ -2594,27 +2596,20 @@ public void testDropCommitOnBatchExpiry() throws InterruptedException {
         } catch (ExecutionException e) {
             assertTrue(e.getCause() instanceof TimeoutException);
         }
+
         runUntil(commitResult::isCompleted);  // the commit shouldn't be completed without being sent since the produce request failed.
         assertFalse(commitResult.isSuccessful());  // the commit shouldn't succeed since the produce request failed.
-        assertThrows(TimeoutException.class, commitResult::await);
+        assertThrows(KafkaException.class, commitResult::await);
-        assertTrue(transactionManager.hasAbortableError());
-        assertTrue(transactionManager.hasOngoingTransaction());
+        assertTrue(transactionManager.hasFatalBumpableError());
+        assertFalse(transactionManager.hasOngoingTransaction());
         assertFalse(transactionManager.isCompleting());
-        assertTrue(transactionManager.transactionContainsPartition(tp0));
-        TransactionalRequestResult abortResult = transactionManager.beginAbort();
-
-        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, producerId, epoch);
-        prepareInitPidResponse(Errors.NONE, false, producerId, (short) (epoch + 1));
-        runUntil(abortResult::isCompleted);
-        assertTrue(abortResult.isSuccessful());
-        assertFalse(transactionManager.hasOngoingTransaction());
-        assertFalse(transactionManager.transactionContainsPartition(tp0));
+        assertThrows(KafkaException.class, () -> transactionManager.beginAbort());

Review Comment:
I personally like the solution of making the producer enter the fatal error state, but I'd like to hear others' opinions, since it will affect the producer's behavior.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org

showuon commented on code in PR #12392:
URL: https://github.com/apache/kafka/pull/12392#discussion_r943219606

## clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java: ##
@@ -2594,27 +2596,20 @@ public void testDropCommitOnBatchExpiry() throws InterruptedException {
         } catch (ExecutionException e) {
             assertTrue(e.getCause() instanceof TimeoutException);
         }
+
         runUntil(commitResult::isCompleted);  // the commit shouldn't be completed without being sent since the produce request failed.
         assertFalse(commitResult.isSuccessful());  // the commit shouldn't succeed since the produce request failed.
-        assertThrows(TimeoutException.class, commitResult::await);
+        assertThrows(KafkaException.class, commitResult::await);
-        assertTrue(transactionManager.hasAbortableError());
-        assertTrue(transactionManager.hasOngoingTransaction());
+        assertTrue(transactionManager.hasFatalBumpableError());
+        assertFalse(transactionManager.hasOngoingTransaction());
         assertFalse(transactionManager.isCompleting());
-        assertTrue(transactionManager.transactionContainsPartition(tp0));
-        TransactionalRequestResult abortResult = transactionManager.beginAbort();
-
-        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, producerId, epoch);
-        prepareInitPidResponse(Errors.NONE, false, producerId, (short) (epoch + 1));
-        runUntil(abortResult::isCompleted);
-        assertTrue(abortResult.isSuccessful());
-        assertFalse(transactionManager.hasOngoingTransaction());
-        assertFalse(transactionManager.transactionContainsPartition(tp0));
+        assertThrows(KafkaException.class, () -> transactionManager.beginAbort());

Review Comment:
So it looks like, after this patch, when a batch expires or times out, the producer will enter the fatal error state after bumping the epoch, whereas before this patch we would abort the transaction and continue with transactional work. Is that right? Sorry, I didn't realize this earlier. This changes current user-facing behavior, so we need more discussion.
I'll ping some experts on this PR and hope they can provide comments. cc @artemlivshits @ijuma
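To make the behavior change concrete, here is a minimal, self-contained Java model of the two paths described above. It is a hypothetical simplification, not Kafka's `TransactionManager`: the class `ExpiryBehaviorSketch`, its `State` values, and its methods are illustrative names, and `IllegalStateException` stands in for the `KafkaException` that the revised test expects from `beginAbort()`.

```java
// Hypothetical model of the producer's state after a client-side batch expiry,
// contrasting pre-patch and post-patch behavior. Not Kafka's TransactionManager.
class ExpiryBehaviorSketch {
    enum State { IN_TRANSACTION, ABORTABLE_ERROR, FATAL_BUMPABLE_ERROR, READY }

    private final boolean postPatch;
    private State state = State.IN_TRANSACTION;

    ExpiryBehaviorSketch(boolean postPatch) {
        this.postPatch = postPatch;
    }

    State state() {
        return state;
    }

    // Called when a produce batch expires on the client side.
    void onBatchExpiry() {
        // Pre-patch: abortable error. Post-patch: fatal (after an attempted epoch bump).
        state = postPatch ? State.FATAL_BUMPABLE_ERROR : State.ABORTABLE_ERROR;
    }

    // Models the application aborting the failed transaction.
    void abort() {
        if (state == State.FATAL_BUMPABLE_ERROR) {
            // Stand-in for the KafkaException the revised test expects from beginAbort().
            throw new IllegalStateException("Cannot abort: producer is in a fatal error state");
        }
        // Pre-patch path: the abort completes and a new transaction can begin.
        state = State.READY;
    }

    public static void main(String[] args) {
        ExpiryBehaviorSketch pre = new ExpiryBehaviorSketch(false);
        pre.onBatchExpiry();
        pre.abort();
        System.out.println(pre.state()); // READY

        ExpiryBehaviorSketch post = new ExpiryBehaviorSketch(true);
        post.onBatchExpiry();
        try {
            post.abort();
        } catch (IllegalStateException e) {
            System.out.println("abort rejected in " + post.state()); // abort rejected in FATAL_BUMPABLE_ERROR
        }
    }
}
```

In the pre-patch path the application can abort and keep using the producer; in the post-patch path the abort itself is rejected, so an application would presumably have to close and recreate the producer instead.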

showuon commented on code in PR #12392:
URL: https://github.com/apache/kafka/pull/12392#discussion_r942413117

## clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java: ##
@@ -241,12 +246,40 @@ synchronized TransactionalRequestResult initializeTransactions(ProducerIdAndEpoc
                     .setProducerId(producerIdAndEpoch.producerId)
                     .setProducerEpoch(producerIdAndEpoch.epoch);
             InitProducerIdHandler handler = new InitProducerIdHandler(new InitProducerIdRequest.Builder(requestData),
-                    isEpochBump);
+                    isEpochBump, false);
             enqueueRequest(handler);
             return handler.result;
         }, State.INITIALIZING, "initTransactions");
     }
 
+    synchronized void tryTransitioningIntoFatalBumpableError(RuntimeException cause) {
+        if (currentState == State.FATAL_ERROR || currentState == State.FATAL_BUMPABLE_ERROR) {
+            // Already in a fatal state, skip
+            return;
+        }
+        String errorMessage = "Encountered unrecoverable error due to batch client side timeout";
+        RuntimeException failure = cause == null
+                ? new KafkaException(errorMessage)
+                : new KafkaException(errorMessage, cause);
+        transitionToFatalBumpableError(failure);
+
+        // If an epoch bump is possible, try to fence the current transaction by bumping
+        if (canBumpEpoch()) {
+            log.info("Invoking InitProducerId with current producer ID and epoch {} in order to bump the epoch to fence the current transaction", producerIdAndEpoch);
+            InitProducerIdRequestData requestData = new InitProducerIdRequestData()
+                    .setTransactionalId(transactionalId)
+                    .setTransactionTimeoutMs(transactionTimeoutMs)
+                    .setProducerId(producerIdAndEpoch.producerId)
+                    .setProducerEpoch(producerIdAndEpoch.epoch);
+            InitProducerIdHandler handler = new InitProducerIdHandler(new InitProducerIdRequest.Builder(requestData),
+                    false, true);
+            enqueueRequest(handler);

Review Comment:
We enqueue the request here; when will it actually be sent out?
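On the question of when an enqueued request goes out: in the Kafka client, enqueued transactional requests are, as far as I understand, drained later by the background `Sender` thread rather than sent inline. The sketch below models that split under stated assumptions. `EnqueueSketch`, `Manager`, `nextRequest`, and `runOnce` are hypothetical names, and the rule that a hard `FATAL_ERROR` blocks sending while `FATAL_BUMPABLE_ERROR` still lets the epoch bump through is an assumption about the intended design, not something the diff confirms.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model: the manager only enqueues requests; a sender loop sends them later.
// Names and the state check in nextRequest() are illustrative assumptions, not Kafka code.
class EnqueueSketch {
    enum State { IN_TRANSACTION, FATAL_BUMPABLE_ERROR, FATAL_ERROR }

    static final class Manager {
        private final Deque<String> pending = new ArrayDeque<>();
        private final boolean canBumpEpoch;
        private State state = State.IN_TRANSACTION;

        Manager(boolean canBumpEpoch) {
            this.canBumpEpoch = canBumpEpoch;
        }

        State state() {
            return state;
        }

        // Mirrors the branching of tryTransitioningIntoFatalBumpableError in the diff.
        void tryTransitioningIntoFatalBumpableError() {
            if (state == State.FATAL_ERROR || state == State.FATAL_BUMPABLE_ERROR) {
                return; // already in a fatal state, skip
            }
            state = State.FATAL_BUMPABLE_ERROR;
            if (canBumpEpoch) {
                pending.addLast("InitProducerId(bump epoch)"); // enqueued only, not sent yet
            } else {
                state = State.FATAL_ERROR;
            }
        }

        // Returns the next request the sender may send, or null if there is none.
        String nextRequest() {
            // Assumption: a hard fatal state blocks sending; the bumpable state does not.
            if (state == State.FATAL_ERROR) {
                return null;
            }
            return pending.pollFirst();
        }
    }

    // One iteration of a hypothetical background sender loop: drain and "send".
    static List<String> runOnce(Manager manager) {
        List<String> sent = new ArrayList<>();
        String request;
        while ((request = manager.nextRequest()) != null) {
            sent.add(request);
        }
        return sent;
    }

    public static void main(String[] args) {
        Manager manager = new Manager(true);
        manager.tryTransitioningIntoFatalBumpableError();
        System.out.println(runOnce(manager)); // [InitProducerId(bump epoch)]
    }
}
```

The point of the model is that `enqueueRequest` alone sends nothing: whether the epoch bump ever reaches the broker depends on the state check the sender applies on its next iteration.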

showuon commented on code in PR #12392:
URL: https://github.com/apache/kafka/pull/12392#discussion_r933989629

## clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java: ##
@@ -241,12 +246,40 @@ synchronized TransactionalRequestResult initializeTransactions(ProducerIdAndEpoc
                     .setProducerId(producerIdAndEpoch.producerId)
                     .setProducerEpoch(producerIdAndEpoch.epoch);
             InitProducerIdHandler handler = new InitProducerIdHandler(new InitProducerIdRequest.Builder(requestData),
-                    isEpochBump);
+                    isEpochBump, false);
             enqueueRequest(handler);
             return handler.result;
         }, State.INITIALIZING, "initTransactions");
     }
 
+    synchronized void tryTransitioningIntoFatalBumpableError(RuntimeException cause) {
+        if (currentState == State.FATAL_ERROR || currentState == State.FATAL_BUMPABLE_ERROR) {
+            // Already in a fatal state, skip
+            return;
+        }
+        String errorMessage = "Encountered unrecoverable error due to batch client side timeout";
+        RuntimeException failure = cause == null
+                ? new KafkaException(errorMessage)
+                : new KafkaException(errorMessage, cause);
+        transitionToFatalBumpableError(failure);
+
+        // If an epoch bump is possible, try to fence the current transaction by bumping
+        if (canBumpEpoch()) {
+            log.info("Invoking InitProducerId with current producer ID and epoch {} in order to bump the epoch to fence the current transaction", producerIdAndEpoch);
+            InitProducerIdRequestData requestData = new InitProducerIdRequestData()
+                    .setTransactionalId(transactionalId)
+                    .setTransactionTimeoutMs(transactionTimeoutMs)
+                    .setProducerId(producerIdAndEpoch.producerId)
+                    .setProducerEpoch(producerIdAndEpoch.epoch);
+            InitProducerIdHandler handler = new InitProducerIdHandler(new InitProducerIdRequest.Builder(requestData),
+                    false, true);
+            enqueueRequest(handler);
+        } else {
+            log.info("Cannot bump epoch, transitioning into fatal error");
+            transitionToFatalError(failure);

Review Comment:
Let me make sure I understand your problem and solution.
Are you saying the issue happens only when the **"timed out transaction ID" is not re-used** and the abort marker arrives earlier than the transaction records? Is my understanding correct? What we are trying to do is force an epoch bump when a timeout exception is encountered, so that the fencing mechanism aborts the previous in-flight transactions, and then enter the `fatal error` state as before. Is that right? If so, I have a question: what happens if the InitProducerId (initPid) request fails, i.e. the epoch bump fails? Will the pending transactions still occur? Thank you.
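The fencing idea and the failure question can be illustrated with a toy model of a partition leader that remembers the highest producer epoch it has seen per producer ID and rejects older epochs. This is a hypothetical simplification (`EpochFencingSketch` and `append` are illustrative names, and real brokers track much more producer state): if the abort marker carries a bumped epoch, a late write from the old epoch is rejected; if the bump failed and the marker carries the same epoch, the late write is accepted and could reopen a transaction on that partition.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical toy model of epoch fencing on a partition leader. Real brokers track far
// more producer state; this only remembers the highest epoch seen per producer ID.
class EpochFencingSketch {
    private final Map<Long, Short> latestEpoch = new HashMap<>();

    // Records a write (data batch or transaction marker); returns false if it is fenced.
    boolean append(long producerId, short epoch) {
        Short current = latestEpoch.get(producerId);
        if (current != null && epoch < current) {
            return false; // older epoch than one already seen: reject as fenced
        }
        latestEpoch.put(producerId, epoch);
        return true;
    }

    public static void main(String[] args) {
        EpochFencingSketch leader = new EpochFencingSketch();
        leader.append(42L, (short) 5);                     // transactional data at epoch 5
        leader.append(42L, (short) 6);                     // abort marker after a successful bump
        System.out.println(leader.append(42L, (short) 5)); // late epoch-5 write: prints false
    }
}
```

Replaying the same sequence with the marker at epoch 5 (the failed-bump case) returns `true` for the late write, which is exactly the scenario the question above is probing.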