This is an automated email from the ASF dual-hosted git repository.

jolshan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 62fe528f4b3 KAFKA-19082: [1/4] Add client config for enable2PC and 
overloaded initProducerId (KIP-939) (#19429)
62fe528f4b3 is described below

commit 62fe528f4b37def581cc664558cd35ad21603ba2
Author: Ritika Reddy <[email protected]>
AuthorDate: Thu Apr 24 09:41:06 2025 -0700

    KAFKA-19082: [1/4] Add client config for enable2PC and overloaded 
initProducerId (KIP-939) (#19429)
    
    This is part of the client side changes required to enable 2PC for
    KIP-939
    
    **Producer Config:**
    transaction.two.phase.commit.enable: the default is ‘false’. If set to
    ‘true’, the broker is informed that the client is participating in the
    two-phase commit protocol, and transactions that this client starts
    never expire.
    
    **Overloaded InitProducerId method**
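    A new initTransactions(boolean keepPreparedTxn) overload is added. The
    enable2PC config value and the keepPreparedTxn argument are passed
    through to the corresponding fields of the InitProducerIdRequest, so a
    field is set whenever its value is 'true'.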
    
    Reviewers: Justine Olshan <[email protected]>, Artem Livshits
     <[email protected]>
---
 checkstyle/suppressions.xml                        |   2 +-
 .../kafka/clients/producer/KafkaProducer.java      |  42 +++++--
 .../kafka/clients/producer/MockProducer.java       |   2 +-
 .../apache/kafka/clients/producer/Producer.java    |   9 +-
 .../kafka/clients/producer/ProducerConfig.java     |  24 ++++
 .../producer/internals/TransactionManager.java     |  26 +++-
 .../kafka/clients/producer/KafkaProducerTest.java  |  57 ++++++++-
 .../kafka/clients/producer/ProducerConfigTest.java |  23 ++++
 .../clients/producer/internals/SenderTest.java     |  42 +++----
 .../producer/internals/TransactionManagerTest.java | 133 ++++++++++++++++-----
 10 files changed, 287 insertions(+), 73 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 4e0f338af5d..2d4c279e707 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -98,7 +98,7 @@
               
files="(AbstractFetch|ClientTelemetryReporter|ConsumerCoordinator|CommitRequestManager|FetchCollector|OffsetFetcherUtils|KafkaProducer|Sender|ConfigDef|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager|AbstractStickyAssignor|DefaultSslEngineFactory|Authorizer|RecordAccumulator|MemoryRecords|FetchSessionHandler|MockAdminClient).java"/>
 
     <suppress checks="JavaNCSS"
-              
files="(AbstractRequest|AbstractResponse|KerberosLogin|WorkerSinkTaskTest|TransactionManagerTest|SenderTest|KafkaAdminClient|ConsumerCoordinatorTest|KafkaAdminClientTest).java"/>
+              
files="(AbstractRequest|AbstractResponse|KerberosLogin|WorkerSinkTaskTest|TransactionManagerTest|SenderTest|KafkaAdminClient|ConsumerCoordinatorTest|KafkaAdminClientTest|KafkaProducerTest).java"/>
 
     <suppress checks="NPathComplexity"
               
files="(AbstractMembershipManager|ConsumerCoordinator|BufferPool|MetricName|Node|ConfigDef|RecordBatch|SslFactory|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|TokenInformation|Agent|PluginUtils|MiniTrogdorCluster|TasksRequest|KafkaProducer|AbstractStickyAssignor|Authorizer|FetchSessionHandler|RecordAccumulator|Shell|MockConsumer).java"/>
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index baaf13388d6..d46a2204beb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -595,14 +595,17 @@ public class KafkaProducer<K, V> implements Producer<K, 
V> {
 
         if (config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG)) {
             final String transactionalId = 
config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
+            final boolean enable2PC = 
config.getBoolean(ProducerConfig.TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG);
             final int transactionTimeoutMs = 
config.getInt(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG);
             final long retryBackoffMs = 
config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
+            
             transactionManager = new TransactionManager(
                 logContext,
                 transactionalId,
                 transactionTimeoutMs,
                 retryBackoffMs,
-                apiVersions
+                apiVersions,
+                enable2PC
             );
 
             if (transactionManager.isTransactional())
@@ -617,8 +620,13 @@ public class KafkaProducer<K, V> implements Producer<K, V> 
{
     }
 
     /**
+     * Initialize the transactional state for this producer, similar to {@link 
#initTransactions()} but
+     * with additional capabilities to keep a previously prepared transaction.
+     *
      * Needs to be called before any other methods when the {@code 
transactional.id} is set in the configuration.
-     * This method does the following:
+     *
+     * When {@code keepPreparedTxn} is {@code false}, this behaves like the 
standard transactional
+     * initialization where the method does the following:
      * <ol>
      * <li>Ensures any transactions initiated by previous instances of the 
producer with the same
      *      {@code transactional.id} are completed. If the previous instance 
had failed with a transaction in
@@ -627,26 +635,38 @@ public class KafkaProducer<K, V> implements Producer<K, 
V> {
      * <li>Gets the internal producer id and epoch, used in all future 
transactional
      *      messages issued by the producer.</li>
      * </ol>
+     *
+     * <p>
+     * When {@code keepPreparedTxn} is set to {@code true}, the producer does 
<em>not</em> automatically abort existing
+     * transactions. Instead, it enters a recovery mode allowing only 
finalization of those previously
+     * prepared transactions.
+     * This behavior is especially crucial for 2PC scenarios, where 
transactions should remain intact
+     * until the external transaction manager decides whether to commit or 
abort.
+     * <p>
+     *
+     * @param keepPreparedTxn true to retain any in-flight prepared 
transactions (necessary for 2PC
+     *                        recovery), false to abort existing transactions 
and behave like
+     *                        the standard initTransactions.
+     *
      * Note that this method will raise {@link TimeoutException} if the 
transactional state cannot
      * be initialized before expiration of {@code max.block.ms}. Additionally, 
it will raise {@link InterruptException}
      * if interrupted. It is safe to retry in either case, but once the 
transactional state has been successfully
      * initialized, this method should no longer be used.
      *
-     * @throws IllegalStateException if no {@code transactional.id} has been 
configured
-     * @throws org.apache.kafka.common.errors.UnsupportedVersionException 
fatal error indicating the broker
-     *         does not support transactions (i.e. if its version is lower 
than 0.11.0.0)
-     * @throws org.apache.kafka.common.errors.AuthorizationException error 
indicating that the configured
-     *         transactional.id is not authorized, or the idempotent producer 
id is unavailable. See the exception for
-     *         more details.  User may retry this function call after fixing 
the permission.
-     * @throws KafkaException if the producer has encountered a previous fatal 
error or for any other unexpected error
+     * @throws IllegalStateException if no {@code transactional.id} is 
configured
+     * @throws org.apache.kafka.common.errors.UnsupportedVersionException if 
the broker does not
+     *         support transactions (i.e. if its version is lower than 
0.11.0.0)
+     * @throws 
org.apache.kafka.common.errors.TransactionalIdAuthorizationException if the 
configured
+     *         {@code transactional.id} is unauthorized either for normal 
transaction writes or 2PC.
+     * @throws KafkaException if the producer encounters a fatal error or any 
other unexpected error
      * @throws TimeoutException if the time taken for initialize the 
transaction has surpassed <code>max.block.ms</code>.
      * @throws InterruptException if the thread is interrupted while blocked
      */
-    public void initTransactions() {
+    public void initTransactions(boolean keepPreparedTxn) {
         throwIfNoTransactionManager();
         throwIfProducerClosed();
         long now = time.nanoseconds();
-        TransactionalRequestResult result = 
transactionManager.initializeTransactions();
+        TransactionalRequestResult result = 
transactionManager.initializeTransactions(keepPreparedTxn);
         sender.wakeup();
         result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);
         producerMetrics.recordInit(time.nanoseconds() - now);
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
index a4aac86df09..e3c5a23ca51 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -142,7 +142,7 @@ public class MockProducer<K, V> implements Producer<K, V> {
     }
 
     @Override
-    public void initTransactions() {
+    public void initTransactions(boolean keepPreparedTxn) {
         verifyNotClosed();
         verifyNotFenced();
         if (this.transactionInitialized) {
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
index 798034dda6d..a5cd92295ff 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
@@ -42,7 +42,14 @@ public interface Producer<K, V> extends Closeable {
     /**
      * See {@link KafkaProducer#initTransactions()}
      */
-    void initTransactions();
+    default void initTransactions() {
+        initTransactions(false);
+    }
+
+    /**
+     * See {@link KafkaProducer#initTransactions(boolean)}
+     */
+    void initTransactions(boolean keepPreparedTxn);
 
     /**
      * See {@link KafkaProducer#beginTransaction()}
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 949c6c167ba..362d205e8c1 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -355,6 +355,11 @@ public class ProducerConfig extends AbstractConfig {
             "By default the TransactionId is not configured, which means 
transactions cannot be used. " +
             "Note that, by default, transactions require a cluster of at least 
three brokers which is the recommended setting for production; for development 
you can change this, by adjusting broker setting 
<code>transaction.state.log.replication.factor</code>.";
 
+    /** <code> transaction.two.phase.commit.enable </code> */
+    public static final String TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG = 
"transaction.two.phase.commit.enable";
+    private static final String TRANSACTION_TWO_PHASE_COMMIT_ENABLE_DOC = "If 
set to true, then the broker is informed that the client is participating in " +
+            "two phase commit protocol and transactions that this client 
starts never expire.";
+
     /**
      * <code>security.providers</code>
      */
@@ -526,6 +531,11 @@ public class ProducerConfig extends AbstractConfig {
                                         new ConfigDef.NonEmptyString(),
                                         Importance.LOW,
                                         TRANSACTIONAL_ID_DOC)
+                                
.define(TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG,
+                                        Type.BOOLEAN,
+                                        false,
+                                        Importance.LOW,
+                                        
TRANSACTION_TWO_PHASE_COMMIT_ENABLE_DOC)
                                 
.define(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG,
                                         Type.STRING,
                                         
CommonClientConfigs.DEFAULT_METADATA_RECOVERY_STRATEGY,
@@ -609,6 +619,20 @@ public class ProducerConfig extends AbstractConfig {
         if (!idempotenceEnabled && userConfiguredTransactions) {
             throw new ConfigException("Cannot set a " + 
ProducerConfig.TRANSACTIONAL_ID_CONFIG + " without also enabling idempotence.");
         }
+
+        // Validate that transaction.timeout.ms is not set when 
transaction.two.phase.commit.enable is true
+        // In standard Kafka transactions, the broker enforces 
transaction.timeout.ms and aborts any
+        // transaction that isn't completed in time. With two-phase commit 
(2PC), an external coordinator
+        // decides when to finalize, so broker-side timeouts don't apply. 
Disallow using both.
+        boolean enable2PC = 
this.getBoolean(TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG);
+        boolean userConfiguredTransactionTimeout = 
originalConfigs.containsKey(TRANSACTION_TIMEOUT_CONFIG);
+        if (enable2PC && userConfiguredTransactionTimeout) {
+            throw new ConfigException(
+                "Cannot set " + ProducerConfig.TRANSACTION_TIMEOUT_CONFIG +
+                " when " + 
ProducerConfig.TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG +
+                " is set to true. Transactions will not expire with two-phase 
commit enabled."
+            );
+        }
     }
 
     private static String parseAcks(String acksString) {
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index b52d5d4836d..e17dc15d239 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -143,6 +143,7 @@ public class TransactionManager {
     private volatile boolean clientSideEpochBumpRequired = false;
     private volatile long latestFinalizedFeaturesEpoch = -1;
     private volatile boolean isTransactionV2Enabled = false;
+    private final boolean enable2PC;
 
     private enum State {
         UNINITIALIZED,
@@ -203,7 +204,8 @@ public class TransactionManager {
                               final String transactionalId,
                               final int transactionTimeoutMs,
                               final long retryBackoffMs,
-                              final ApiVersions apiVersions) {
+                              final ApiVersions apiVersions,
+                              final boolean enable2PC) {
         this.producerIdAndEpoch = ProducerIdAndEpoch.NONE;
         this.transactionalId = transactionalId;
         this.log = logContext.logger(TransactionManager.class);
@@ -220,6 +222,7 @@ public class TransactionManager {
         this.retryBackoffMs = retryBackoffMs;
         this.txnPartitionMap = new TxnPartitionMap(logContext);
         this.apiVersions = apiVersions;
+        this.enable2PC = enable2PC;
     }
 
     /**
@@ -279,11 +282,18 @@ public class TransactionManager {
         return Thread.currentThread() instanceof Sender.SenderThread;
     }
 
-    public synchronized TransactionalRequestResult initializeTransactions() {
-        return initializeTransactions(ProducerIdAndEpoch.NONE);
+    synchronized TransactionalRequestResult 
initializeTransactions(ProducerIdAndEpoch producerIdAndEpoch) {
+        return initializeTransactions(producerIdAndEpoch, false);
     }
 
-    synchronized TransactionalRequestResult 
initializeTransactions(ProducerIdAndEpoch producerIdAndEpoch) {
+    public synchronized TransactionalRequestResult 
initializeTransactions(boolean keepPreparedTxn) {
+        return initializeTransactions(ProducerIdAndEpoch.NONE, 
keepPreparedTxn);
+    }
+
+    synchronized TransactionalRequestResult initializeTransactions(
+        ProducerIdAndEpoch producerIdAndEpoch,
+        boolean keepPreparedTxn
+    ) {
         maybeFailWithError();
 
         boolean isEpochBump = producerIdAndEpoch != ProducerIdAndEpoch.NONE;
@@ -292,6 +302,9 @@ public class TransactionManager {
             if (!isEpochBump) {
                 transitionTo(State.INITIALIZING);
                 log.info("Invoking InitProducerId for the first time in order 
to acquire a producer ID");
+                if (keepPreparedTxn) {
+                    log.info("Invoking InitProducerId with keepPreparedTxn set 
to true for 2PC transactions");
+                }
             } else {
                 log.info("Invoking InitProducerId with current producer ID and 
epoch {} in order to bump the epoch", producerIdAndEpoch);
             }
@@ -299,7 +312,10 @@ public class TransactionManager {
                     .setTransactionalId(transactionalId)
                     .setTransactionTimeoutMs(transactionTimeoutMs)
                     .setProducerId(producerIdAndEpoch.producerId)
-                    .setProducerEpoch(producerIdAndEpoch.epoch);
+                    .setProducerEpoch(producerIdAndEpoch.epoch)
+                    .setEnable2Pc(enable2PC)
+                    .setKeepPreparedTxn(keepPreparedTxn);
+
             InitProducerIdHandler handler = new InitProducerIdHandler(new 
InitProducerIdRequest.Builder(requestData),
                     isEpochBump);
             enqueueRequest(handler);
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 48569f1a20c..d8cfe6578c0 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -75,6 +75,7 @@ import 
org.apache.kafka.common.requests.AddOffsetsToTxnResponse;
 import org.apache.kafka.common.requests.EndTxnResponse;
 import org.apache.kafka.common.requests.FindCoordinatorRequest;
 import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.requests.InitProducerIdRequest;
 import org.apache.kafka.common.requests.InitProducerIdResponse;
 import org.apache.kafka.common.requests.JoinGroupRequest;
 import org.apache.kafka.common.requests.MetadataResponse;
@@ -103,6 +104,7 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
 import org.junit.jupiter.params.provider.ValueSource;
 import org.mockito.MockedStatic;
 import org.mockito.Mockito;
@@ -1315,7 +1317,7 @@ public class KafkaProducerTest {
                     ((FindCoordinatorRequest) request).data().keyType() == 
FindCoordinatorRequest.CoordinatorType.TRANSACTION.id(),
                 FindCoordinatorResponse.prepareResponse(Errors.NONE, 
"bad-transaction", NODE));
 
-            Future<?> future = executor.submit(producer::initTransactions);
+            Future<?> future = executor.submit(() -> 
producer.initTransactions());
             TestUtils.waitForCondition(client::hasInFlightRequests,
                 "Timed out while waiting for expected `InitProducerId` request 
to be sent");
 
@@ -1390,6 +1392,59 @@ public class KafkaProducerTest {
         }
     }
 
+    @ParameterizedTest
+    @CsvSource({
+        "true, false",
+        "true, true",
+        "false, true"
+    })
+    public void 
testInitTransactionsWithKeepPreparedTxnAndTwoPhaseCommit(boolean 
keepPreparedTxn, boolean enable2PC) {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test-txn-id");
+        configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000);
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+        if (enable2PC) {
+            
configs.put(ProducerConfig.TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG, true);
+        }
+
+        Time time = new MockTime(1);
+        MetadataResponse initialUpdateResponse = 
RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
+        ProducerMetadata metadata = newMetadata(0, 0, Long.MAX_VALUE);
+        MockClient client = new MockClient(time, metadata);
+        client.updateMetadata(initialUpdateResponse);
+
+        // Capture flags from the InitProducerIdRequest
+        boolean[] requestFlags = new boolean[2]; // [keepPreparedTxn, 
enable2Pc]
+        
+        client.prepareResponse(
+            request -> request instanceof FindCoordinatorRequest &&
+                ((FindCoordinatorRequest) request).data().keyType() == 
FindCoordinatorRequest.CoordinatorType.TRANSACTION.id(),
+            FindCoordinatorResponse.prepareResponse(Errors.NONE, 
"test-txn-id", NODE));
+            
+        client.prepareResponse(
+            request -> {
+                if (request instanceof InitProducerIdRequest) {
+                    InitProducerIdRequest initRequest = 
(InitProducerIdRequest) request;
+                    requestFlags[0] = initRequest.data().keepPreparedTxn();
+                    requestFlags[1] = initRequest.data().enable2Pc();
+                    return true;
+                }
+                return false;
+            },
+            initProducerIdResponse(1L, (short) 5, Errors.NONE));
+            
+        try (Producer<String, String> producer = kafkaProducer(configs, new 
StringSerializer(),
+                new StringSerializer(), metadata, client, null, time)) {
+            producer.initTransactions(keepPreparedTxn);
+            
+            // Verify request flags match expected values
+            assertEquals(keepPreparedTxn, requestFlags[0], 
+                "keepPreparedTxn flag should match input parameter");
+            assertEquals(enable2PC, requestFlags[1], 
+                "enable2Pc flag should match producer configuration");
+        }
+    }
+    
     @Test
     public void testClusterAuthorizationFailure() throws Exception {
         int maxBlockMs = 500;
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java
index 830711c0e54..207bac6476f 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java
@@ -145,4 +145,27 @@ public class ProducerConfigTest {
         configs.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
         assertDoesNotThrow(() -> new ProducerConfig(configs));
     }
+
+    @Test
+    void testTwoPhaseCommitIncompatibleWithTransactionTimeout() {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
keySerializerClass);
+        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
valueSerializerClass);
+        configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
+        configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test-txn-id");
+        configs.put(ProducerConfig.TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG, 
true);
+        configs.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60000);
+        
+        ConfigException ce = assertThrows(ConfigException.class, () -> new 
ProducerConfig(configs));
+        
assertTrue(ce.getMessage().contains(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG));
+        
assertTrue(ce.getMessage().contains(ProducerConfig.TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG));
+        
+        // Verify that setting one but not the other is valid
+        configs.remove(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG);
+        assertDoesNotThrow(() -> new ProducerConfig(configs));
+        
+        configs.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60000);
+        configs.put(ProducerConfig.TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG, 
false);
+        assertDoesNotThrow(() -> new ProducerConfig(configs));
+    }
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index e67be76eb9b..74a56420246 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -490,7 +490,7 @@ public class SenderTest {
 
             ProducerIdAndEpoch producerIdAndEpoch = new 
ProducerIdAndEpoch(123456L, (short) 0);
             apiVersions.update("0", 
NodeApiVersions.create(ApiKeys.INIT_PRODUCER_ID.id, (short) 0, (short) 3));
-            TransactionManager txnManager = new TransactionManager(logContext, 
"testUnresolvedSeq", 60000, 100, apiVersions);
+            TransactionManager txnManager = new TransactionManager(logContext, 
"testUnresolvedSeq", 60000, 100, apiVersions, false);
 
             setupWithTransactionState(txnManager);
             doInitTransactions(txnManager, producerIdAndEpoch);
@@ -616,10 +616,10 @@ public class SenderTest {
         // Initialize transaction manager. InitProducerId will be queued up 
until metadata response
         // is processed and FindCoordinator can be sent to `leastLoadedNode`.
         TransactionManager transactionManager = new TransactionManager(new 
LogContext(), "testInitProducerIdWithPendingMetadataRequest",
-                60000, 100L, new ApiVersions());
+                60000, 100L, new ApiVersions(), false);
         setupWithTransactionState(transactionManager, false, null, false);
         ProducerIdAndEpoch producerIdAndEpoch = new 
ProducerIdAndEpoch(producerId, (short) 0);
-        transactionManager.initializeTransactions();
+        transactionManager.initializeTransactions(false);
         sender.runOnce();
 
         // Process metadata response, prepare FindCoordinator and 
InitProducerId responses.
@@ -668,10 +668,10 @@ public class SenderTest {
         client = new MockClient(time, metadata);
 
         TransactionManager transactionManager = new TransactionManager(new 
LogContext(), "testNodeNotReady",
-                60000, 100L, new ApiVersions());
+                60000, 100L, new ApiVersions(), false);
         setupWithTransactionState(transactionManager, false, null, true);
         ProducerIdAndEpoch producerIdAndEpoch = new 
ProducerIdAndEpoch(producerId, (short) 0);
-        transactionManager.initializeTransactions();
+        transactionManager.initializeTransactions(false);
         sender.runOnce();
 
         Node node = metadata.fetch().nodes().get(0);
@@ -1510,7 +1510,7 @@ public class SenderTest {
     public void testUnresolvedSequencesAreNotFatal() throws Exception {
         ProducerIdAndEpoch producerIdAndEpoch = new 
ProducerIdAndEpoch(123456L, (short) 0);
         apiVersions.update("0", 
NodeApiVersions.create(ApiKeys.INIT_PRODUCER_ID.id, (short) 0, (short) 3));
-        TransactionManager txnManager = new TransactionManager(logContext, 
"testUnresolvedSeq", 60000, 100, apiVersions);
+        TransactionManager txnManager = new TransactionManager(logContext, 
"testUnresolvedSeq", 60000, 100, apiVersions, false);
 
         setupWithTransactionState(txnManager);
         doInitTransactions(txnManager, producerIdAndEpoch);
@@ -1795,7 +1795,7 @@ public class SenderTest {
     @Test
     public void 
testTransactionalUnknownProducerHandlingWhenRetentionLimitReached() throws 
Exception {
         final long producerId = 343434L;
-        TransactionManager transactionManager = new 
TransactionManager(logContext, "testUnresolvedSeq", 60000, 100, apiVersions);
+        TransactionManager transactionManager = new 
TransactionManager(logContext, "testUnresolvedSeq", 60000, 100, apiVersions, 
false);
 
         setupWithTransactionState(transactionManager);
         doInitTransactions(transactionManager, new 
ProducerIdAndEpoch(producerId, (short) 0));
@@ -2352,7 +2352,7 @@ public class SenderTest {
     public void testTransactionalSplitBatchAndSend() throws Exception {
         ProducerIdAndEpoch producerIdAndEpoch = new 
ProducerIdAndEpoch(123456L, (short) 0);
         TopicPartition tp = new TopicPartition("testSplitBatchAndSend", 1);
-        TransactionManager txnManager = new TransactionManager(logContext, 
"testSplitBatchAndSend", 60000, 100, apiVersions);
+        TransactionManager txnManager = new TransactionManager(logContext, 
"testSplitBatchAndSend", 60000, 100, apiVersions, false);
 
         setupWithTransactionState(txnManager);
         doInitTransactions(txnManager, producerIdAndEpoch);
@@ -2694,7 +2694,7 @@ public class SenderTest {
         Metrics m = new Metrics();
         SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
         try {
-            TransactionManager txnManager = new TransactionManager(logContext, 
"testTransactionalRequestsSentOnShutdown", 6000, 100, apiVersions);
+            TransactionManager txnManager = new TransactionManager(logContext, 
"testTransactionalRequestsSentOnShutdown", 6000, 100, apiVersions, false);
             Sender sender = new Sender(logContext, client, metadata, 
this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
                     maxRetries, senderMetrics, time, REQUEST_TIMEOUT, 
RETRY_BACKOFF_MS, txnManager);
 
@@ -2727,7 +2727,7 @@ public class SenderTest {
             int lingerMs = 50;
             SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
 
-            TransactionManager txnManager = new TransactionManager(logContext, 
"txnId", 6000, 100, apiVersions);
+            TransactionManager txnManager = new TransactionManager(logContext, 
"txnId", 6000, 100, apiVersions, false);
             setupWithTransactionState(txnManager, lingerMs);
 
             Sender sender = new Sender(logContext, client, metadata, 
this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
@@ -2784,7 +2784,7 @@ public class SenderTest {
         try (Metrics m = new Metrics()) {
             SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
 
-            TransactionManager txnManager = new TransactionManager(logContext, 
"txnId", 6000, 100, apiVersions);
+            TransactionManager txnManager = new TransactionManager(logContext, 
"txnId", 6000, 100, apiVersions, false);
             setupWithTransactionState(txnManager);
 
             Sender sender = new Sender(logContext, client, metadata, 
this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
@@ -2855,7 +2855,7 @@ public class SenderTest {
         Metrics m = new Metrics();
         SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
         try {
-            TransactionManager txnManager = new TransactionManager(logContext, 
"testIncompleteTransactionAbortOnShutdown", 6000, 100, apiVersions);
+            TransactionManager txnManager = new TransactionManager(logContext, 
"testIncompleteTransactionAbortOnShutdown", 6000, 100, apiVersions, false);
             Sender sender = new Sender(logContext, client, metadata, 
this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
                     maxRetries, senderMetrics, time, REQUEST_TIMEOUT, 
RETRY_BACKOFF_MS, txnManager);
 
@@ -2889,7 +2889,7 @@ public class SenderTest {
         Metrics m = new Metrics();
         SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
         try {
-            TransactionManager txnManager = new TransactionManager(logContext, 
"testForceShutdownWithIncompleteTransaction", 6000, 100, apiVersions);
+            TransactionManager txnManager = new TransactionManager(logContext, 
"testForceShutdownWithIncompleteTransaction", 6000, 100, apiVersions, false);
             Sender sender = new Sender(logContext, client, metadata, 
this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
                     maxRetries, senderMetrics, time, REQUEST_TIMEOUT, 
RETRY_BACKOFF_MS, txnManager);
 
@@ -2919,7 +2919,7 @@ public class SenderTest {
     @Test
     public void testTransactionAbortedExceptionOnAbortWithoutError() throws 
InterruptedException {
         ProducerIdAndEpoch producerIdAndEpoch = new 
ProducerIdAndEpoch(123456L, (short) 0);
-        TransactionManager txnManager = new TransactionManager(logContext, 
"testTransactionAbortedExceptionOnAbortWithoutError", 60000, 100, apiVersions);
+        TransactionManager txnManager = new TransactionManager(logContext, 
"testTransactionAbortedExceptionOnAbortWithoutError", 60000, 100, apiVersions, 
false);
 
         setupWithTransactionState(txnManager, false, null);
         doInitTransactions(txnManager, producerIdAndEpoch);
@@ -2945,7 +2945,7 @@ public class SenderTest {
     public void testDoNotPollWhenNoRequestSent() {
         client = spy(new MockClient(time, metadata));
 
-        TransactionManager txnManager = new TransactionManager(logContext, 
"testDoNotPollWhenNoRequestSent", 6000, 100, apiVersions);
+        TransactionManager txnManager = new TransactionManager(logContext, 
"testDoNotPollWhenNoRequestSent", 6000, 100, apiVersions, false);
         ProducerIdAndEpoch producerIdAndEpoch = new 
ProducerIdAndEpoch(123456L, (short) 0);
         setupWithTransactionState(txnManager);
         doInitTransactions(txnManager, producerIdAndEpoch);
@@ -2957,7 +2957,7 @@ public class SenderTest {
     @Test
     public void testTooLargeBatchesAreSafelyRemoved() throws 
InterruptedException {
         ProducerIdAndEpoch producerIdAndEpoch = new 
ProducerIdAndEpoch(123456L, (short) 0);
-        TransactionManager txnManager = new TransactionManager(logContext, 
"testSplitBatchAndSend", 60000, 100, apiVersions);
+        TransactionManager txnManager = new TransactionManager(logContext, 
"testSplitBatchAndSend", 60000, 100, apiVersions, false);
 
         setupWithTransactionState(txnManager, false, null);
         doInitTransactions(txnManager, producerIdAndEpoch);
@@ -3026,7 +3026,7 @@ public class SenderTest {
     public void testReceiveFailedBatchTwiceWithTransactions() throws Exception 
{
         ProducerIdAndEpoch producerIdAndEpoch = new 
ProducerIdAndEpoch(123456L, (short) 0);
         apiVersions.update("0", 
NodeApiVersions.create(ApiKeys.INIT_PRODUCER_ID.id, (short) 0, (short) 3));
-        TransactionManager txnManager = new TransactionManager(logContext, 
"testFailTwice", 60000, 100, apiVersions);
+        TransactionManager txnManager = new TransactionManager(logContext, 
"testFailTwice", 60000, 100, apiVersions, false);
 
         setupWithTransactionState(txnManager);
         doInitTransactions(txnManager, producerIdAndEpoch);
@@ -3076,7 +3076,7 @@ public class SenderTest {
     public void testInvalidTxnStateIsAnAbortableError() throws Exception {
         ProducerIdAndEpoch producerIdAndEpoch = new 
ProducerIdAndEpoch(123456L, (short) 0);
         apiVersions.update("0", 
NodeApiVersions.create(ApiKeys.INIT_PRODUCER_ID.id, (short) 0, (short) 3));
-        TransactionManager txnManager = new TransactionManager(logContext, 
"testInvalidTxnState", 60000, 100, apiVersions);
+        TransactionManager txnManager = new TransactionManager(logContext, 
"testInvalidTxnState", 60000, 100, apiVersions, false);
 
         setupWithTransactionState(txnManager);
         doInitTransactions(txnManager, producerIdAndEpoch);
@@ -3115,7 +3115,7 @@ public class SenderTest {
     public void testTransactionAbortableExceptionIsAnAbortableError() throws 
Exception {
         ProducerIdAndEpoch producerIdAndEpoch = new 
ProducerIdAndEpoch(123456L, (short) 0);
         apiVersions.update("0", 
NodeApiVersions.create(ApiKeys.INIT_PRODUCER_ID.id, (short) 0, (short) 3));
-        TransactionManager txnManager = new TransactionManager(logContext, 
"textTransactionAbortableException", 60000, 100, apiVersions);
+        TransactionManager txnManager = new TransactionManager(logContext, 
"textTransactionAbortableException", 60000, 100, apiVersions, false);
 
         setupWithTransactionState(txnManager);
         doInitTransactions(txnManager, producerIdAndEpoch);
@@ -3617,7 +3617,7 @@ public class SenderTest {
     }
 
     private TransactionManager createTransactionManager() {
-        return new TransactionManager(new LogContext(), null, 0, 
RETRY_BACKOFF_MS, new ApiVersions());
+        return new TransactionManager(new LogContext(), null, 0, 
RETRY_BACKOFF_MS, new ApiVersions(), false);
     }
     
     private void setupWithTransactionState(TransactionManager 
transactionManager) {
@@ -3719,7 +3719,7 @@ public class SenderTest {
     }
 
     private void doInitTransactions(TransactionManager transactionManager, 
ProducerIdAndEpoch producerIdAndEpoch) {
-        TransactionalRequestResult result = 
transactionManager.initializeTransactions();
+        TransactionalRequestResult result = 
transactionManager.initializeTransactions(false);
         prepareFindCoordinatorResponse(Errors.NONE, 
transactionManager.transactionalId());
         sender.runOnce();
         sender.runOnce();
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 5520df03467..a19e4977314 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -159,17 +159,28 @@ public class TransactionManagerTest {
         this.client.updateMetadata(RequestTestUtils.metadataUpdateWith(1, 
singletonMap("test", 2)));
         this.brokerNode = new Node(0, "localhost", 2211);
 
-        initializeTransactionManager(Optional.of(transactionalId), false);
+        initializeTransactionManager(Optional.of(transactionalId), false, 
false);
+    }
+
+    private void initializeTransactionManager(
+        Optional<String> transactionalId,
+        boolean transactionV2Enabled
+    ) {
+        initializeTransactionManager(transactionalId, transactionV2Enabled, 
false);
     }
 
-    private void initializeTransactionManager(Optional<String> 
transactionalId, boolean transactionV2Enabled) {
+    private void initializeTransactionManager(
+        Optional<String> transactionalId,
+        boolean transactionV2Enabled,
+        boolean enable2pc
+    ) {
         Metrics metrics = new Metrics(time);
 
         apiVersions.update("0", new NodeApiVersions(Arrays.asList(
             new ApiVersion()
                 .setApiKey(ApiKeys.INIT_PRODUCER_ID.id)
                 .setMinVersion((short) 0)
-                .setMaxVersion((short) 3),
+                .setMaxVersion((short) 6),
             new ApiVersion()
                 .setApiKey(ApiKeys.PRODUCE.id)
                 .setMinVersion((short) 0)
@@ -189,7 +200,8 @@ public class TransactionManagerTest {
             finalizedFeaturesEpoch));
         finalizedFeaturesEpoch += 1;
         this.transactionManager = new TestableTransactionManager(logContext, 
transactionalId.orElse(null),
-                transactionTimeoutMs, DEFAULT_RETRY_BACKOFF_MS, apiVersions);
+                transactionTimeoutMs, DEFAULT_RETRY_BACKOFF_MS, apiVersions, 
enable2pc);
+
 
         int batchSize = 16 * 1024;
         int deliveryTimeoutMs = 3000;
@@ -1039,7 +1051,7 @@ public class TransactionManagerTest {
                 .setMinVersionLevel((short) 1)),
             0));
         this.transactionManager = new TestableTransactionManager(logContext, 
transactionalId,
-            transactionTimeoutMs, DEFAULT_RETRY_BACKOFF_MS, apiVersions);
+            transactionTimeoutMs, DEFAULT_RETRY_BACKOFF_MS, apiVersions, 
false);
 
         int batchSize = 16 * 1024;
         int deliveryTimeoutMs = 3000;
@@ -1063,7 +1075,7 @@ public class TransactionManagerTest {
     public void testDisconnectAndRetry() {
         // This is called from the initTransactions method in the producer as 
the first order of business.
         // It finds the coordinator and then gets a PID.
-        transactionManager.initializeTransactions();
+        transactionManager.initializeTransactions(false);
         prepareFindCoordinatorResponse(Errors.NONE, true, 
CoordinatorType.TRANSACTION, transactionalId);
         runUntil(() -> 
transactionManager.coordinator(CoordinatorType.TRANSACTION) == null);
 
@@ -1076,12 +1088,12 @@ public class TransactionManagerTest {
     public void testInitializeTransactionsTwiceRaisesError() {
         doInitTransactions(producerId, epoch);
         assertTrue(transactionManager.hasProducerId());
-        assertThrows(IllegalStateException.class, () -> 
transactionManager.initializeTransactions());
+        assertThrows(IllegalStateException.class, () -> 
transactionManager.initializeTransactions(false));
     }
 
     @Test
     public void testUnsupportedFindCoordinator() {
-        transactionManager.initializeTransactions();
+        transactionManager.initializeTransactions(false);
         client.prepareUnsupportedVersionResponse(body -> {
             FindCoordinatorRequest findCoordinatorRequest = 
(FindCoordinatorRequest) body;
             
assertEquals(CoordinatorType.forId(findCoordinatorRequest.data().keyType()), 
CoordinatorType.TRANSACTION);
@@ -1098,7 +1110,7 @@ public class TransactionManagerTest {
 
     @Test
     public void testUnsupportedInitTransactions() {
-        transactionManager.initializeTransactions();
+        transactionManager.initializeTransactions(false);
         prepareFindCoordinatorResponse(Errors.NONE, false, 
CoordinatorType.TRANSACTION, transactionalId);
         runUntil(() -> 
transactionManager.coordinator(CoordinatorType.TRANSACTION) != null);
         assertFalse(transactionManager.hasError());
@@ -1243,7 +1255,7 @@ public class TransactionManagerTest {
     public void testLookupCoordinatorOnDisconnectAfterSend() {
         // This is called from the initTransactions method in the producer as 
the first order of business.
         // It finds the coordinator and then gets a PID.
-        TransactionalRequestResult initPidResult = 
transactionManager.initializeTransactions();
+        TransactionalRequestResult initPidResult = 
transactionManager.initializeTransactions(false);
         prepareFindCoordinatorResponse(Errors.NONE, false, 
CoordinatorType.TRANSACTION, transactionalId);
         runUntil(() -> 
transactionManager.coordinator(CoordinatorType.TRANSACTION) != null);
         assertEquals(brokerNode, 
transactionManager.coordinator(CoordinatorType.TRANSACTION));
@@ -1275,7 +1287,7 @@ public class TransactionManagerTest {
     public void testLookupCoordinatorOnDisconnectBeforeSend() {
         // This is called from the initTransactions method in the producer as 
the first order of business.
         // It finds the coordinator and then gets a PID.
-        TransactionalRequestResult initPidResult = 
transactionManager.initializeTransactions();
+        TransactionalRequestResult initPidResult = 
transactionManager.initializeTransactions(false);
         prepareFindCoordinatorResponse(Errors.NONE, false, 
CoordinatorType.TRANSACTION, transactionalId);
         runUntil(() -> 
transactionManager.coordinator(CoordinatorType.TRANSACTION) != null);
         assertEquals(brokerNode, 
transactionManager.coordinator(CoordinatorType.TRANSACTION));
@@ -1306,7 +1318,7 @@ public class TransactionManagerTest {
     public void testLookupCoordinatorOnNotCoordinatorError() {
         // This is called from the initTransactions method in the producer as 
the first order of business.
         // It finds the coordinator and then gets a PID.
-        TransactionalRequestResult initPidResult = 
transactionManager.initializeTransactions();
+        TransactionalRequestResult initPidResult = 
transactionManager.initializeTransactions(false);
         prepareFindCoordinatorResponse(Errors.NONE, false, 
CoordinatorType.TRANSACTION, transactionalId);
         runUntil(() -> 
transactionManager.coordinator(CoordinatorType.TRANSACTION) != null);
         assertEquals(brokerNode, 
transactionManager.coordinator(CoordinatorType.TRANSACTION));
@@ -1331,7 +1343,7 @@ public class TransactionManagerTest {
 
     @Test
     public void testTransactionalIdAuthorizationFailureInFindCoordinator() {
-        TransactionalRequestResult initPidResult = 
transactionManager.initializeTransactions();
+        TransactionalRequestResult initPidResult = 
transactionManager.initializeTransactions(false);
         
prepareFindCoordinatorResponse(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED, 
false,
                 CoordinatorType.TRANSACTION, transactionalId);
 
@@ -1346,7 +1358,7 @@ public class TransactionManagerTest {
 
     @Test
     public void testTransactionalIdAuthorizationFailureInInitProducerId() {
-        TransactionalRequestResult initPidResult = 
transactionManager.initializeTransactions();
+        TransactionalRequestResult initPidResult = 
transactionManager.initializeTransactions(false);
         prepareFindCoordinatorResponse(Errors.NONE, false, 
CoordinatorType.TRANSACTION, transactionalId);
         runUntil(() -> 
transactionManager.coordinator(CoordinatorType.TRANSACTION) != null);
         assertEquals(brokerNode, 
transactionManager.coordinator(CoordinatorType.TRANSACTION));
@@ -1646,7 +1658,7 @@ public class TransactionManagerTest {
         assertFalse(result.isAcked());
         assertFalse(transactionManager.hasOngoingTransaction());
 
-        assertThrows(IllegalStateException.class, 
transactionManager::initializeTransactions);
+        assertThrows(IllegalStateException.class, () -> 
transactionManager.initializeTransactions(false));
         assertThrows(IllegalStateException.class, 
transactionManager::beginTransaction);
         assertThrows(IllegalStateException.class, 
transactionManager::beginCommit);
         assertThrows(IllegalStateException.class, () -> 
transactionManager.maybeAddPartition(tp0));
@@ -1680,7 +1692,7 @@ public class TransactionManagerTest {
         assertFalse(result.isAcked());
         assertFalse(transactionManager.hasOngoingTransaction());
 
-        assertThrows(IllegalStateException.class, 
transactionManager::initializeTransactions);
+        assertThrows(IllegalStateException.class, () -> 
transactionManager.initializeTransactions(false));
         assertThrows(IllegalStateException.class, 
transactionManager::beginTransaction);
         assertThrows(IllegalStateException.class, 
transactionManager::beginAbort);
         assertThrows(IllegalStateException.class, () -> 
transactionManager.maybeAddPartition(tp0));
@@ -1694,7 +1706,7 @@ public class TransactionManagerTest {
 
     @Test
     public void testRetryInitTransactionsAfterTimeout() {
-        TransactionalRequestResult result = 
transactionManager.initializeTransactions();
+        TransactionalRequestResult result = 
transactionManager.initializeTransactions(false);
         prepareFindCoordinatorResponse(Errors.NONE, false, 
CoordinatorType.TRANSACTION, transactionalId);
         runUntil(() -> 
transactionManager.coordinator(CoordinatorType.TRANSACTION) != null);
         assertEquals(brokerNode, 
transactionManager.coordinator(CoordinatorType.TRANSACTION));
@@ -1715,10 +1727,10 @@ public class TransactionManagerTest {
         assertThrows(IllegalStateException.class, 
transactionManager::beginCommit);
         assertThrows(IllegalStateException.class, () -> 
transactionManager.maybeAddPartition(tp0));
 
-        assertSame(result, transactionManager.initializeTransactions());
+        assertSame(result, transactionManager.initializeTransactions(false));
         result.await();
         assertTrue(result.isAcked());
-        assertThrows(IllegalStateException.class, 
transactionManager::initializeTransactions);
+        assertThrows(IllegalStateException.class, () -> 
transactionManager.initializeTransactions(false));
 
         transactionManager.beginTransaction();
         assertTrue(transactionManager.hasOngoingTransaction());
@@ -1960,7 +1972,7 @@ public class TransactionManagerTest {
     })
     public void testRetriableErrors(Errors error) {
         // Ensure FindCoordinator retries.
-        TransactionalRequestResult result = 
transactionManager.initializeTransactions();
+        TransactionalRequestResult result = 
transactionManager.initializeTransactions(false);
         prepareFindCoordinatorResponse(error, false, 
CoordinatorType.TRANSACTION, transactionalId);
         prepareFindCoordinatorResponse(Errors.NONE, false, 
CoordinatorType.TRANSACTION, transactionalId);
         runUntil(() -> 
transactionManager.coordinator(CoordinatorType.TRANSACTION) != null);
@@ -1994,7 +2006,7 @@ public class TransactionManagerTest {
     @Test
     public void testCoordinatorNotAvailable() {
         // Ensure FindCoordinator with COORDINATOR_NOT_AVAILABLE error retries.
-        TransactionalRequestResult result = 
transactionManager.initializeTransactions();
+        TransactionalRequestResult result = 
transactionManager.initializeTransactions(false);
         prepareFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, 
false, CoordinatorType.TRANSACTION, transactionalId);
         prepareFindCoordinatorResponse(Errors.NONE, false, 
CoordinatorType.TRANSACTION, transactionalId);
         runUntil(() -> 
transactionManager.coordinator(CoordinatorType.TRANSACTION) != null);
@@ -2017,7 +2029,7 @@ public class TransactionManagerTest {
     }
 
     private void verifyProducerFencedForInitProducerId(Errors error) {
-        TransactionalRequestResult result = 
transactionManager.initializeTransactions();
+        TransactionalRequestResult result = 
transactionManager.initializeTransactions(false);
         prepareFindCoordinatorResponse(Errors.NONE, false, 
CoordinatorType.TRANSACTION, transactionalId);
         runUntil(() -> 
transactionManager.coordinator(CoordinatorType.TRANSACTION) != null);
         assertEquals(brokerNode, 
transactionManager.coordinator(CoordinatorType.TRANSACTION));
@@ -3815,7 +3827,7 @@ public class TransactionManagerTest {
         assertThrows(IllegalStateException.class, () -> 
transactionManager.beginAbort());
         assertThrows(IllegalStateException.class, () -> 
transactionManager.beginCommit());
         assertThrows(IllegalStateException.class, () -> 
transactionManager.maybeAddPartition(tp0));
-        assertThrows(IllegalStateException.class, () -> 
transactionManager.initializeTransactions());
+        assertThrows(IllegalStateException.class, () -> 
transactionManager.initializeTransactions(false));
         assertThrows(IllegalStateException.class, () -> 
transactionManager.sendOffsetsToTransaction(Collections.emptyMap(), new 
ConsumerGroupMetadata("fake-group-id")));
     }
 
@@ -3852,7 +3864,7 @@ public class TransactionManagerTest {
 
     @Test
     public void testTransactionAbortableExceptionInInitProducerId() {
-        TransactionalRequestResult initPidResult = 
transactionManager.initializeTransactions();
+        TransactionalRequestResult initPidResult = 
transactionManager.initializeTransactions(false);
         prepareFindCoordinatorResponse(Errors.NONE, false, 
CoordinatorType.TRANSACTION, transactionalId);
         runUntil(() -> 
transactionManager.coordinator(CoordinatorType.TRANSACTION) != null);
         assertEquals(brokerNode, 
transactionManager.coordinator(CoordinatorType.TRANSACTION));
@@ -4035,16 +4047,39 @@ public class TransactionManagerTest {
         }, FindCoordinatorResponse.prepareResponse(error, coordinatorKey, 
brokerNode), shouldDisconnect);
     }
 
-    private void prepareInitPidResponse(Errors error, boolean 
shouldDisconnect, long producerId, short producerEpoch) {
+    private void prepareInitPidResponse(
+        Errors error,
+        boolean shouldDisconnect,
+        long producerId,
+        short producerEpoch
+    ) {
+        prepareInitPidResponse(error, shouldDisconnect, producerId, 
producerEpoch, false, false, (long) -1, (short) -1);
+    }
+
+    private void prepareInitPidResponse(
+        Errors error,
+        boolean shouldDisconnect,
+        long producerId,
+        short producerEpoch,
+        boolean keepPreparedTxn,
+        boolean enable2Pc,
+        long ongoingProducerId,
+        short ongoingProducerEpoch
+    ) {
         InitProducerIdResponseData responseData = new 
InitProducerIdResponseData()
-                .setErrorCode(error.code())
-                .setProducerEpoch(producerEpoch)
-                .setProducerId(producerId)
-                .setThrottleTimeMs(0);
+            .setErrorCode(error.code())
+            .setProducerEpoch(producerEpoch)
+            .setProducerId(producerId)
+            .setThrottleTimeMs(0)
+            .setOngoingTxnProducerId(ongoingProducerId)
+            .setOngoingTxnProducerEpoch(ongoingProducerEpoch);
+
         client.prepareResponse(body -> {
             InitProducerIdRequest initProducerIdRequest = 
(InitProducerIdRequest) body;
             assertEquals(transactionalId, 
initProducerIdRequest.data().transactionalId());
             assertEquals(transactionTimeoutMs, 
initProducerIdRequest.data().transactionTimeoutMs());
+            assertEquals(keepPreparedTxn, 
initProducerIdRequest.data().keepPreparedTxn());
+            assertEquals(enable2Pc, initProducerIdRequest.data().enable2Pc());
             return true;
         }, new InitProducerIdResponse(responseData), shouldDisconnect);
     }
@@ -4309,7 +4344,7 @@ public class TransactionManagerTest {
     }
 
     private void doInitTransactions(long producerId, short epoch) {
-        TransactionalRequestResult result = 
transactionManager.initializeTransactions();
+        TransactionalRequestResult result = 
transactionManager.initializeTransactions(false);
         prepareFindCoordinatorResponse(Errors.NONE, false, 
CoordinatorType.TRANSACTION, transactionalId);
         runUntil(() -> 
transactionManager.coordinator(CoordinatorType.TRANSACTION) != null);
         assertEquals(brokerNode, 
transactionManager.coordinator(CoordinatorType.TRANSACTION));
@@ -4373,6 +4408,39 @@ public class TransactionManagerTest {
         ProducerTestUtils.runUntil(sender, condition);
     }
 
+    @Test
+    public void testInitializeTransactionsWithKeepPreparedTxn() {
+        initializeTransactionManager(Optional.of(transactionalId), true, true);
+
+        client.prepareResponse(
+            FindCoordinatorResponse.prepareResponse(Errors.NONE, 
transactionalId, brokerNode)
+        );
+
+        // Simulate an ongoing prepared transaction (ongoingProducerId != -1).
+        long ongoingProducerId = 999L;
+        short ongoingEpoch = 10;
+        short bumpedEpoch = 11;
+
+        prepareInitPidResponse(
+            Errors.NONE,
+            false,
+            ongoingProducerId,
+            bumpedEpoch,
+            true,
+            true,
+            ongoingProducerId,
+            ongoingEpoch
+        );
+
+        transactionManager.initializeTransactions(true);
+        runUntil(transactionManager::hasProducerId);
+        
+        assertTrue(transactionManager.hasProducerId());
+        assertFalse(transactionManager.hasOngoingTransaction());
+        assertEquals(ongoingProducerId, 
transactionManager.producerIdAndEpoch().producerId);
+        assertEquals(bumpedEpoch, 
transactionManager.producerIdAndEpoch().epoch);
+    }
+
     /**
      * This subclass exists only to optionally change the default behavior 
related to poisoning the state
      * on invalid state transition attempts.
@@ -4385,8 +4453,9 @@ public class TransactionManagerTest {
                                           String transactionalId,
                                           int transactionTimeoutMs,
                                           long retryBackoffMs,
-                                          ApiVersions apiVersions) {
-            super(logContext, transactionalId, transactionTimeoutMs, 
retryBackoffMs, apiVersions);
+                                          ApiVersions apiVersions,
+                                          boolean enable2Pc) {
+            super(logContext, transactionalId, transactionTimeoutMs, 
retryBackoffMs, apiVersions, enable2Pc);
             this.shouldPoisonStateOnInvalidTransitionOverride = 
Optional.empty();
         }
 

Reply via email to