rreddy-22 commented on code in PR #19429:
URL: https://github.com/apache/kafka/pull/19429#discussion_r2053117512
##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -654,6 +657,46 @@ public void initTransactions() {
transactionManager.maybeUpdateTransactionV2Enabled(true);
}
+ /**
+ * Initialize the transactional state for this producer, similar to {@link #initTransactions()} but
+ * with additional handling for two-phase commit (2PC). Must be called before any send operations
+ * that require a {@code transactionalId}.
+ * <p>
+ * Unlike the standard {@link #initTransactions()}, when {@code keepPreparedTxn} is set to
+ * {@code true}, the producer does <em>not</em> automatically abort existing transactions
+ * in the “prepare” phase. Instead, it enters a recovery mode allowing only finalization
+ * of those previously prepared transactions. This behavior is crucial for 2PC scenarios,
+ * where transactions should remain intact until the external transaction manager decides
+ * whether to commit or abort.
+ * <p>
+ * When {@code keepPreparedTxn} is {@code false}, this behaves like the normal transactional
+ * initialization, aborting any unfinished transactions and resetting the producer for
+ * new writes.
+ *
+ * @param keepPreparedTxn true to retain any in-flight prepared transactions (necessary for 2PC
+ * recovery), false to abort existing transactions and behave like
+ * the standard initTransactions
+ *
+ * @throws IllegalStateException if no {@code transactional.id} is configured
+ * @throws org.apache.kafka.common.errors.UnsupportedVersionException if the broker does not
+ * support transactions (i.e. if its version is lower than 0.11.0.0)
+ * @throws org.apache.kafka.common.errors.TransactionalIdAuthorizationException if the configured
+ * {@code transactional.id} is unauthorized either for normal transaction writes or 2PC.
+ * @throws KafkaException if the producer encounters a fatal error or any other unexpected error
+ * @throws TimeoutException if the time taken to initialize the transaction has surpassed <code>max.block.ms</code>.
+ * @throws InterruptException if the thread is interrupted while blocked
+ */
+ public void initTransactions(boolean keepPreparedTxn) {
Review Comment:
Discussed offline, removed initTransactions() and merged the javadocs for both
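
For readers skimming the thread, the behavior the Javadoc describes for `keepPreparedTxn` can be summarized with a small in-memory sketch. This is not Kafka code: the class `InitTransactionsSketch`, the `Mode` enum, the `Result` type, and the `init` helper are all hypothetical names made up for illustration. It also assumes that when no prepared transaction exists, the producer is simply ready for new writes, which the Javadoc does not spell out.

```java
/** Hypothetical model of the Javadoc's description; it does not call any Kafka API. */
public class InitTransactionsSketch {

    /** NORMAL: ready for new writes. RECOVERY: only finalizing the prepared transaction is allowed. */
    enum Mode { NORMAL, RECOVERY }

    /** Outcome of initialization in this toy model. */
    static final class Result {
        final Mode mode;
        final boolean preparedTxnAborted;

        Result(Mode mode, boolean preparedTxnAborted) {
            this.mode = mode;
            this.preparedTxnAborted = preparedTxnAborted;
        }
    }

    /**
     * Models initialization when a previous producer instance may have left a prepared transaction.
     *
     * @param hasPreparedTxn  whether a prepared transaction is left over (illustrative input)
     * @param keepPreparedTxn mirrors the flag discussed in the Javadoc
     */
    static Result init(boolean hasPreparedTxn, boolean keepPreparedTxn) {
        if (hasPreparedTxn && keepPreparedTxn) {
            // The prepared transaction is kept; the external transaction manager decides its fate.
            return new Result(Mode.RECOVERY, false);
        }
        // Standard behavior: any leftover transaction is aborted and the producer is reset.
        // Assumption: with nothing left over, the flag makes no difference in this model.
        return new Result(Mode.NORMAL, hasPreparedTxn);
    }

    public static void main(String[] args) {
        Result kept = init(true, true);
        Result aborted = init(true, false);
        System.out.println("keepPreparedTxn=true:  mode=" + kept.mode
                + ", aborted=" + kept.preparedTxnAborted);
        System.out.println("keepPreparedTxn=false: mode=" + aborted.mode
                + ", aborted=" + aborted.preparedTxnAborted);
    }
}
```

The point of the sketch is only the branching: the flag changes the outcome solely when a prepared transaction is actually present.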