showuon commented on code in PR #12392: URL: https://github.com/apache/kafka/pull/12392#discussion_r942413117
########## clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java: ########## @@ -241,12 +246,40 @@ synchronized TransactionalRequestResult initializeTransactions(ProducerIdAndEpoc .setProducerId(producerIdAndEpoch.producerId) .setProducerEpoch(producerIdAndEpoch.epoch); InitProducerIdHandler handler = new InitProducerIdHandler(new InitProducerIdRequest.Builder(requestData), - isEpochBump); + isEpochBump, false); enqueueRequest(handler); return handler.result; }, State.INITIALIZING, "initTransactions"); } + synchronized void tryTransitioningIntoFatalBumpableError(RuntimeException cause) { + if (currentState == State.FATAL_ERROR || currentState == State.FATAL_BUMPABLE_ERROR) { + // Already in a fatal state, skip + return; + } + String errorMessage = "Encountered unrecoverable error due to batch client side timeout"; + RuntimeException failure = cause == null + ? new KafkaException(errorMessage) + : new KafkaException(errorMessage, cause); + transitionToFatalBumpableError(failure); + + // If an epoch bump is possible, try to fence the current transaction by bumping + if (canBumpEpoch()) { + log.info("Invoking InitProducerId with current producer ID and epoch {} in order to bump the epoch to fence the current transaction", producerIdAndEpoch); + InitProducerIdRequestData requestData = new InitProducerIdRequestData() + .setTransactionalId(transactionalId) + .setTransactionTimeoutMs(transactionTimeoutMs) + .setProducerId(producerIdAndEpoch.producerId) + .setProducerEpoch(producerIdAndEpoch.epoch); + InitProducerIdHandler handler = new InitProducerIdHandler(new InitProducerIdRequest.Builder(requestData), + false, true); + enqueueRequest(handler); Review Comment: We enqueue request here, and when will we send out the request? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org