This is an automated email from the ASF dual-hosted git repository.

jolshan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7f02c263a6e KAFKA-19082:[3/4] Add prepare txn method (KIP-939) (#19539)
7f02c263a6e is described below

commit 7f02c263a6e2490dca5632e0389e312dc14dd593
Author: Ritika Reddy <[email protected]>
AuthorDate: Wed May 14 15:11:39 2025 -0700

    KAFKA-19082:[3/4] Add prepare txn method (KIP-939) (#19539)
    
    This patch belongs to the client-side changes required to enable 2PC as
    a part of KIP-939.
    
    New method is added to KafkaProducer:  public PreparedTxnState
    prepareTransaction()
    
    This would flush all the pending messages and transition the producer
    into a mode where only .commitTransaction, .abortTransaction, or
    .completeTransaction could be called (calling other methods,  e.g. .send
    , in that mode would result in IllegalStateException being thrown).  If
    the call is successful (all messages successfully got flushed to all
    partitions) the transaction is prepared.  If the 2PC is not enabled, we
    return the INVALID_TXN_STATE error.
    
    A new state is added to the TransactionManager called
    PREPARING_TRANSACTION. There are two situations where we would move into
    this state:
    1) When prepareTransaction() is called during an ongoing transaction
    with 2PC enabled
    2) When initTransaction(true) is called after a client failure
    (keepPrepared = true)
    
    Reviewers: Artem Livshits <[email protected]>, Justine Olshan
     <[email protected]>
---
 .../kafka/clients/producer/KafkaProducer.java      |  66 +++++++++
 .../kafka/clients/producer/MockProducer.java       |  12 ++
 .../apache/kafka/clients/producer/Producer.java    |   5 +
 .../producer/internals/KafkaProducerMetrics.java   |  11 ++
 .../producer/internals/TransactionManager.java     |  53 ++++++-
 .../kafka/clients/producer/KafkaProducerTest.java  | 162 +++++++++++++++++++++
 .../producer/internals/TransactionManagerTest.java | 128 +++++++++++-----
 7 files changed, 400 insertions(+), 37 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 0fc237d70c8..b44b358dede 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -50,11 +50,13 @@ import 
org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.AuthorizationException;
 import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.errors.InvalidTxnStateException;
 import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.errors.RecordTooLargeException;
 import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.header.internals.RecordHeaders;
@@ -665,6 +667,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
     public void initTransactions(boolean keepPreparedTxn) {
         throwIfNoTransactionManager();
         throwIfProducerClosed();
+        throwIfInPreparedState();
         long now = time.nanoseconds();
         TransactionalRequestResult result = 
transactionManager.initializeTransactions(keepPreparedTxn);
         sender.wakeup();
@@ -691,6 +694,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
     public void beginTransaction() throws ProducerFencedException {
         throwIfNoTransactionManager();
         throwIfProducerClosed();
+        throwIfInPreparedState();
         long now = time.nanoseconds();
         transactionManager.beginTransaction();
         producerMetrics.recordBeginTxn(time.nanoseconds() - now);
@@ -750,6 +754,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         throwIfInvalidGroupMetadata(groupMetadata);
         throwIfNoTransactionManager();
         throwIfProducerClosed();
+        throwIfInPreparedState();
 
         if (!offsets.isEmpty()) {
             long start = time.nanoseconds();
@@ -760,6 +765,48 @@ public class KafkaProducer<K, V> implements Producer<K, V> 
{
         }
     }
 
+    /**
+     * Prepares the current transaction for a two-phase commit. This method 
will flush all pending messages
+     * and transition the producer into a mode where only {@link 
#commitTransaction()}, {@link #abortTransaction()},
+     * or completeTransaction(PreparedTxnState) may be called.
+     * <p>
+     * This method is used as part of a two-phase commit protocol:
+     * <ol>
+     *   <li>Prepare the transaction by calling this method. This returns a 
{@link PreparedTxnState} if successful.</li>
+     *   <li>Make any external system changes that need to be atomic with this 
transaction.</li>
+     *   <li>Complete the transaction by calling {@link #commitTransaction()}, 
{@link #abortTransaction()} or
+     *       completeTransaction(PreparedTxnState).</li>
+     * </ol>
+     *
+     * @return the prepared transaction state to use when completing the 
transaction
+     *
+     * @throws IllegalStateException if no transactional.id has been 
configured or no transaction has been started yet.
+     * @throws InvalidTxnStateException if the producer is not in a state 
where preparing
+     *         a transaction is possible or 2PC is not enabled.
+     * @throws ProducerFencedException fatal error indicating another producer 
with the same transactional.id is active
+     * @throws UnsupportedVersionException fatal error indicating the broker
+     *         does not support transactions (i.e. if its version is lower 
than 0.11.0.0)
+     * @throws AuthorizationException fatal error indicating that the 
configured
+     *         transactional.id is not authorized. See the exception for more 
details
+     * @throws KafkaException if the producer has encountered a previous fatal 
error or for any other unexpected error
+     * @throws TimeoutException if the time taken for preparing the 
transaction has surpassed <code>max.block.ms</code>
+     * @throws InterruptException if the thread is interrupted while blocked
+     */
+    @Override
+    public PreparedTxnState prepareTransaction() throws 
ProducerFencedException {
+        throwIfNoTransactionManager();
+        throwIfProducerClosed();
+        throwIfInPreparedState();
+        if (!transactionManager.is2PCEnabled()) {
+            throw new InvalidTxnStateException("Cannot prepare a transaction 
when 2PC is not enabled");
+        }
+        long now = time.nanoseconds();
+        flush();
+        transactionManager.prepareTransaction();
+        producerMetrics.recordPrepareTxn(time.nanoseconds() - now);
+        return transactionManager.preparedTransactionState();
+    }
+
     /**
      * Commits the ongoing transaction. This method will flush any unsent 
records before actually committing the transaction.
      * <p>
@@ -967,6 +1014,23 @@ public class KafkaProducer<K, V> implements Producer<K, 
V> {
             throw new IllegalStateException("Cannot perform operation after 
producer has been closed");
     }
 
+    /**
+     * Throws an exception if the transaction is in a prepared state.
+     * In a two-phase commit (2PC) flow, once a transaction enters the 
prepared state,
+     * only commit, abort, or complete operations are allowed.
+     *
+     * @throws IllegalStateException if any other operation is attempted in 
the prepared state.
+     */
+    private void throwIfInPreparedState() {
+        if (transactionManager != null &&
+            transactionManager.isTransactional() &&
+            transactionManager.isPrepared()
+        ) {
+            throw new IllegalStateException("Cannot perform operation while 
the transaction is in a prepared state. " +
+                "Only commitTransaction(), abortTransaction(), or 
completeTransaction() are permitted.");
+        }
+    }
+
     /**
      * Implementation of asynchronously send a record to a topic.
      */
@@ -978,6 +1042,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> 
{
 
         try {
             throwIfProducerClosed();
+            throwIfInPreparedState();
+
             // first make sure the metadata for the topic is available
             long nowMs = time.milliseconds();
             ClusterAndWaitTime clusterAndWaitTime;
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
index e3c5a23ca51..c6e02d4ab16 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -200,6 +200,18 @@ public class MockProducer<K, V> implements Producer<K, V> {
         this.sentOffsets = true;
     }
 
+    @Override
+    public PreparedTxnState prepareTransaction() throws 
ProducerFencedException {
+        verifyNotClosed();
+        verifyNotFenced();
+        verifyTransactionsInitialized();
+        verifyTransactionInFlight();
+        
+        // Return a new PreparedTxnState with mock values for producerId and 
epoch
+        // Using 1000L and (short)1 as arbitrary values for a valid 
PreparedTxnState
+        return new PreparedTxnState(1000L, (short) 1);
+    }
+
     @Override
     public void commitTransaction() throws ProducerFencedException {
         verifyNotClosed();
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
index a5cd92295ff..db4460d6b10 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
@@ -62,6 +62,11 @@ public interface Producer<K, V> extends Closeable {
     void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> 
offsets,
                                   ConsumerGroupMetadata groupMetadata) throws 
ProducerFencedException;
 
+    /**
+     * See {@link KafkaProducer#prepareTransaction()}
+     */
+    PreparedTxnState prepareTransaction() throws ProducerFencedException;
+
     /**
      * See {@link KafkaProducer#commitTransaction()}
      */
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/KafkaProducerMetrics.java
 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/KafkaProducerMetrics.java
index 7d942d572cf..6c94466c55e 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/KafkaProducerMetrics.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/KafkaProducerMetrics.java
@@ -33,6 +33,7 @@ public class KafkaProducerMetrics implements AutoCloseable {
     private static final String TXN_SEND_OFFSETS = "txn-send-offsets";
     private static final String TXN_COMMIT = "txn-commit";
     private static final String TXN_ABORT = "txn-abort";
+    private static final String TXN_PREPARE = "txn-prepare";
     private static final String TOTAL_TIME_SUFFIX = "-time-ns-total";
     private static final String METADATA_WAIT = "metadata-wait";
 
@@ -44,6 +45,7 @@ public class KafkaProducerMetrics implements AutoCloseable {
     private final Sensor sendOffsetsSensor;
     private final Sensor commitTxnSensor;
     private final Sensor abortTxnSensor;
+    private final Sensor prepareTxnSensor;
     private final Sensor metadataWaitSensor;
 
     public KafkaProducerMetrics(Metrics metrics) {
@@ -73,6 +75,10 @@ public class KafkaProducerMetrics implements AutoCloseable {
             TXN_ABORT,
             "Total time producer has spent in abortTransaction in nanoseconds."
         );
+        prepareTxnSensor = newLatencySensor(
+            TXN_PREPARE,
+            "Total time producer has spent in prepareTransaction in 
nanoseconds."
+        );
         metadataWaitSensor = newLatencySensor(
             METADATA_WAIT,
             "Total time producer has spent waiting on topic metadata in 
nanoseconds."
@@ -87,6 +93,7 @@ public class KafkaProducerMetrics implements AutoCloseable {
         removeMetric(TXN_SEND_OFFSETS);
         removeMetric(TXN_COMMIT);
         removeMetric(TXN_ABORT);
+        removeMetric(TXN_PREPARE);
         removeMetric(METADATA_WAIT);
     }
 
@@ -114,6 +121,10 @@ public class KafkaProducerMetrics implements AutoCloseable 
{
         abortTxnSensor.record(duration);
     }
 
+    public void recordPrepareTxn(long duration) {
+        prepareTxnSensor.record(duration);
+    }
+
     public void recordMetadataWait(long duration) {
         metadataWaitSensor.record(duration);
     }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index e17dc15d239..061df29fbb2 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -23,6 +23,7 @@ import org.apache.kafka.clients.RequestCompletionHandler;
 import org.apache.kafka.clients.consumer.CommitFailedException;
 import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.PreparedTxnState;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.KafkaException;
@@ -144,12 +145,14 @@ public class TransactionManager {
     private volatile long latestFinalizedFeaturesEpoch = -1;
     private volatile boolean isTransactionV2Enabled = false;
     private final boolean enable2PC;
+    private volatile PreparedTxnState preparedTxnState;
 
     private enum State {
         UNINITIALIZED,
         INITIALIZING,
         READY,
         IN_TRANSACTION,
+        PREPARED_TRANSACTION,
         COMMITTING_TRANSACTION,
         ABORTING_TRANSACTION,
         ABORTABLE_ERROR,
@@ -165,10 +168,12 @@ public class TransactionManager {
                     return source == INITIALIZING || source == 
COMMITTING_TRANSACTION || source == ABORTING_TRANSACTION;
                 case IN_TRANSACTION:
                     return source == READY;
+                case PREPARED_TRANSACTION:
+                    return source == IN_TRANSACTION || source == INITIALIZING;
                 case COMMITTING_TRANSACTION:
-                    return source == IN_TRANSACTION;
+                    return source == IN_TRANSACTION || source == 
PREPARED_TRANSACTION;
                 case ABORTING_TRANSACTION:
-                    return source == IN_TRANSACTION || source == 
ABORTABLE_ERROR;
+                    return source == IN_TRANSACTION || source == 
PREPARED_TRANSACTION || source == ABORTABLE_ERROR;
                 case ABORTABLE_ERROR:
                     return source == IN_TRANSACTION || source == 
COMMITTING_TRANSACTION || source == ABORTABLE_ERROR
                             || source == INITIALIZING;
@@ -223,6 +228,7 @@ public class TransactionManager {
         this.txnPartitionMap = new TxnPartitionMap(logContext);
         this.apiVersions = apiVersions;
         this.enable2PC = enable2PC;
+        this.preparedTxnState = new PreparedTxnState();
     }
 
     /**
@@ -241,7 +247,7 @@ public class TransactionManager {
      * transaction manager. The {@link Producer} API calls that perform a 
state transition include:
      *
      * <ul>
-     *     <li>{@link Producer#initTransactions()} calls {@link 
#initializeTransactions()}</li>
+     *     <li>{@link Producer#initTransactions()} calls {@link 
#initializeTransactions(boolean)}</li>
      *     <li>{@link Producer#beginTransaction()} calls {@link 
#beginTransaction()}</li>
      *     <li>{@link Producer#commitTransaction()}} calls {@link 
#beginCommit()}</li>
      *     <li>{@link Producer#abortTransaction()} calls {@link #beginAbort()}
@@ -330,6 +336,22 @@ public class TransactionManager {
         transitionTo(State.IN_TRANSACTION);
     }
 
+    /**
+     * Prepare a transaction for a two-phase commit.
+     * This transitions the transaction to the PREPARED_TRANSACTION state.
+     * The preparedTxnState is set with the current producer ID and epoch.
+     */
+    public synchronized void prepareTransaction() {
+        ensureTransactional();
+        throwIfPendingState("prepareTransaction");
+        maybeFailWithError();
+        transitionTo(State.PREPARED_TRANSACTION);
+        this.preparedTxnState = new PreparedTxnState(
+            this.producerIdAndEpoch.producerId + ":" +
+            this.producerIdAndEpoch.epoch
+        );
+    }
+
     public synchronized TransactionalRequestResult beginCommit() {
         return handleCachedTransactionRequestResult(() -> {
             maybeFailWithError();
@@ -487,6 +509,10 @@ public class TransactionManager {
         return isTransactionV2Enabled;
     }
 
+    public boolean is2PCEnabled() {
+        return enable2PC;
+    }
+
     synchronized boolean hasPartitionsToAdd() {
         return !newPartitionsInTransaction.isEmpty() || 
!pendingPartitionsInTransaction.isEmpty();
     }
@@ -1058,6 +1084,15 @@ public class TransactionManager {
         return isTransactional() && currentState == State.INITIALIZING;
     }
 
+    /**
+     * Check if the transaction is in the prepared state.
+     *
+     * @return true if the current state is PREPARED_TRANSACTION
+     */
+    public synchronized boolean isPrepared() {
+        return currentState == State.PREPARED_TRANSACTION;
+    }
+
     void handleCoordinatorReady() {
         NodeApiVersions nodeApiVersions = transactionCoordinator != null ?
                 apiVersions.get(transactionCoordinator.idString()) :
@@ -1453,6 +1488,7 @@ public class TransactionManager {
                 ProducerIdAndEpoch producerIdAndEpoch = new 
ProducerIdAndEpoch(initProducerIdResponse.data().producerId(),
                         initProducerIdResponse.data().producerEpoch());
                 setProducerIdAndEpoch(producerIdAndEpoch);
+                // TO_DO Add code to handle transition to prepared_txn when 
keepPrepared = true
                 transitionTo(State.READY);
                 lastError = null;
                 if (this.isEpochBump) {
@@ -1904,5 +1940,14 @@ public class TransactionManager {
         }
     }
 
-
+    /**
+     * Returns a PreparedTxnState object containing the producer ID and epoch
+     * of the ongoing transaction.
+     * This is used when preparing a transaction for a two-phase commit.
+     *
+     * @return a PreparedTxnState with the current producer ID and epoch
+     */
+    public PreparedTxnState preparedTransactionState() {
+        return this.preparedTxnState;
+    }
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index ca6757d1f6d..2231e0338c7 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -46,6 +46,7 @@ import org.apache.kafka.common.config.SslConfigs;
 import org.apache.kafka.common.errors.ClusterAuthorizationException;
 import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.errors.InvalidTxnStateException;
 import org.apache.kafka.common.errors.RecordTooLargeException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
@@ -152,6 +153,7 @@ import static 
org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
@@ -162,6 +164,7 @@ import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.ArgumentMatchers.notNull;
 import static org.mockito.Mockito.atMostOnce;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.never;
@@ -1438,6 +1441,165 @@ public class KafkaProducerTest {
                 "enable2Pc flag should match producer configuration");
         }
     }
+
+    @Test
+    public void testPrepareTransactionSuccess() throws Exception {
+        StringSerializer serializer = new StringSerializer();
+        KafkaProducerTestContext<String> ctx = new 
KafkaProducerTestContext<>(testInfo, serializer);
+
+        when(ctx.transactionManager.isTransactionV2Enabled()).thenReturn(true);
+        when(ctx.transactionManager.is2PCEnabled()).thenReturn(true);
+        when(ctx.sender.isRunning()).thenReturn(true);
+
+        doNothing().when(ctx.transactionManager).prepareTransaction();
+
+        PreparedTxnState expectedState = mock(PreparedTxnState.class);
+        
when(ctx.transactionManager.preparedTransactionState()).thenReturn(expectedState);
+
+        try (KafkaProducer<String, String> producer = ctx.newKafkaProducer()) {
+            PreparedTxnState returned = producer.prepareTransaction();
+            assertSame(expectedState, returned);
+
+            verify(ctx.transactionManager).prepareTransaction();
+            verify(ctx.accumulator).beginFlush();
+            verify(ctx.accumulator).awaitFlushCompletion();
+        }
+    }
+
+    @Test
+    public void testSendNotAllowedInPreparedTransactionState() throws 
Exception {
+        StringSerializer serializer = new StringSerializer();
+        KafkaProducerTestContext<String> ctx = new 
KafkaProducerTestContext<>(testInfo, serializer);
+
+        String topic = "foo";
+        Cluster cluster = TestUtils.singletonCluster(topic, 1);
+
+        when(ctx.sender.isRunning()).thenReturn(true);
+        when(ctx.metadata.fetch()).thenReturn(cluster);
+
+        // Mock transaction manager to simulate being in a prepared state
+        when(ctx.transactionManager.isTransactional()).thenReturn(true);
+        when(ctx.transactionManager.isPrepared()).thenReturn(true);
+
+        // Create record to send
+        long timestamp = ctx.time.milliseconds();
+        ProducerRecord<String, String> record = new ProducerRecord<>(topic, 0, 
timestamp, "key", "value");
+
+        try (KafkaProducer<String, String> producer = ctx.newKafkaProducer()) {
+            // Verify that sending a record throws IllegalStateException with 
the correct message
+            IllegalStateException exception = assertThrows(
+                IllegalStateException.class,
+                () -> producer.send(record)
+            );
+
+            assertTrue(exception.getMessage().contains("Cannot perform 
operation while the transaction is in a prepared state"));
+
+            // Verify transactionManager methods were called
+            verify(ctx.transactionManager).isTransactional();
+            verify(ctx.transactionManager).isPrepared();
+
+            // Verify that no message was actually sent (accumulator was not 
called)
+            verify(ctx.accumulator, never()).append(
+                eq(topic),
+                anyInt(),
+                anyLong(),
+                any(),
+                any(),
+                any(),
+                any(),
+                anyLong(),
+                anyLong(),
+                any()
+            );
+        }
+    }
+
+    @Test
+    public void testSendOffsetsNotAllowedInPreparedTransactionState() throws 
Exception {
+        StringSerializer serializer = new StringSerializer();
+        KafkaProducerTestContext<String> ctx = new 
KafkaProducerTestContext<>(testInfo, serializer);
+
+        String topic = "foo";
+        Cluster cluster = TestUtils.singletonCluster(topic, 1);
+
+        when(ctx.sender.isRunning()).thenReturn(true);
+        when(ctx.metadata.fetch()).thenReturn(cluster);
+
+        // Mock transaction manager to simulate being in a prepared state
+        when(ctx.transactionManager.isTransactional()).thenReturn(true);
+        when(ctx.transactionManager.isPrepared()).thenReturn(true);
+
+        // Create consumer group metadata
+        String groupId = "test-group";
+        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+        offsets.put(new TopicPartition(topic, 0), new OffsetAndMetadata(100L));
+        ConsumerGroupMetadata groupMetadata = new 
ConsumerGroupMetadata(groupId);
+
+        try (KafkaProducer<String, String> producer = ctx.newKafkaProducer()) {
+            // Verify that sending offsets throws IllegalStateException with 
the correct message
+            IllegalStateException exception = assertThrows(
+                IllegalStateException.class,
+                () -> producer.sendOffsetsToTransaction(offsets, groupMetadata)
+            );
+
+            assertTrue(exception.getMessage().contains("Cannot perform 
operation while the transaction is in a prepared state"));
+
+            // Verify transactionManager methods were called
+            verify(ctx.transactionManager).isTransactional();
+            verify(ctx.transactionManager).isPrepared();
+
+            // Verify that no offsets were actually sent
+            verify(ctx.transactionManager, never()).sendOffsetsToTransaction(
+                eq(offsets),
+                eq(groupMetadata)
+            );
+        }
+    }
+
+    @Test
+    public void testBeginTransactionNotAllowedInPreparedTransactionState() 
throws Exception {
+        StringSerializer serializer = new StringSerializer();
+        KafkaProducerTestContext<String> ctx = new 
KafkaProducerTestContext<>(testInfo, serializer);
+
+        when(ctx.sender.isRunning()).thenReturn(true);
+
+        // Mock transaction manager to simulate being in a prepared state
+        when(ctx.transactionManager.isTransactional()).thenReturn(true);
+        when(ctx.transactionManager.isPrepared()).thenReturn(true);
+
+        try (KafkaProducer<String, String> producer = ctx.newKafkaProducer()) {
+            // Verify that calling beginTransaction throws 
IllegalStateException with the correct message
+            IllegalStateException exception = assertThrows(
+                IllegalStateException.class,
+                producer::beginTransaction
+            );
+
+            assertTrue(exception.getMessage().contains("Cannot perform 
operation while the transaction is in a prepared state"));
+
+            // Verify transactionManager methods were called
+            verify(ctx.transactionManager).isTransactional();
+            verify(ctx.transactionManager).isPrepared();
+        }
+    }
+
+    @Test
+    public void testPrepareTransactionFailsWhen2PCDisabled() {
+        StringSerializer serializer = new StringSerializer();
+        KafkaProducerTestContext<String> ctx = new 
KafkaProducerTestContext<>(testInfo, serializer);
+
+        // Disable 2PC
+        when(ctx.transactionManager.isTransactionV2Enabled()).thenReturn(true);
+        when(ctx.transactionManager.is2PCEnabled()).thenReturn(false);
+        when(ctx.sender.isRunning()).thenReturn(true);
+
+        try (KafkaProducer<String, String> producer = ctx.newKafkaProducer()) {
+            assertThrows(
+                InvalidTxnStateException.class,
+                producer::prepareTransaction,
+                "prepareTransaction() should fail if 2PC is disabled"
+            );
+        }
+    }
     
     @Test
     public void testClusterAuthorizationFailure() throws Exception {
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index e6ac3e56299..118656e47f8 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.clients.NodeApiVersions;
 import org.apache.kafka.clients.consumer.CommitFailedException;
 import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.PreparedTxnState;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
@@ -138,6 +139,8 @@ public class TransactionManagerTest {
     private final TopicPartition tp1 = new TopicPartition(topic, 1);
     private final long producerId = 13131L;
     private final short epoch = 1;
+    private final long ongoingProducerId = 999L;
+    private final short bumpedOngoingEpoch = 11;
     private final String consumerGroupId = "myConsumerGroup";
     private final String memberId = "member";
     private final int generationId = 5;
@@ -4022,6 +4025,56 @@ public class TransactionManagerTest {
         assertFalse(transactionManager.hasOngoingTransaction());
     }
 
+    @Test
+    public void testInitializeTransactionsWithKeepPreparedTxn() {
+        doInitTransactionsWith2PCEnabled(true);
+        runUntil(transactionManager::hasProducerId);
+
+        // Expect a bumped epoch in the response.
+        assertTrue(transactionManager.hasProducerId());
+        assertFalse(transactionManager.hasOngoingTransaction());
+        assertEquals(ongoingProducerId, 
transactionManager.producerIdAndEpoch().producerId);
+        assertEquals(bumpedOngoingEpoch, 
transactionManager.producerIdAndEpoch().epoch);
+    }
+
+    @Test
+    public void testPrepareTransaction() {
+        doInitTransactionsWith2PCEnabled(false);
+        runUntil(transactionManager::hasProducerId);
+
+        // Begin a transaction
+        transactionManager.beginTransaction();
+        assertTrue(transactionManager.hasOngoingTransaction());
+
+        // Add a partition to the transaction
+        transactionManager.maybeAddPartition(tp0);
+
+        // Capture the current producer ID and epoch before preparing the 
response
+        long producerId = transactionManager.producerIdAndEpoch().producerId;
+        short epoch = transactionManager.producerIdAndEpoch().epoch;
+
+        // Simulate a produce request
+        try {
+            // Prepare the response before sending to ensure it's ready
+            prepareProduceResponse(Errors.NONE, producerId, epoch);
+
+            appendToAccumulator(tp0);
+            // Wait until the request is processed
+            runUntil(() -> !client.hasPendingResponses());
+        } catch (InterruptedException e) {
+            fail("Unexpected interruption: " + e);
+        }
+
+        transactionManager.prepareTransaction();
+        assertTrue(transactionManager.isPrepared());
+
+        PreparedTxnState preparedState = 
transactionManager.preparedTransactionState();
+        // Validate the state contains the correct serialized producer ID and 
epoch
+        assertEquals(producerId + ":" + epoch, preparedState.toString());
+        assertEquals(producerId, preparedState.producerId());
+        assertEquals(epoch, preparedState.epoch());
+    }
+
     private void prepareAddPartitionsToTxn(final Map<TopicPartition, Errors> 
errors) {
         AddPartitionsToTxnResult result = 
AddPartitionsToTxnResponse.resultForTransaction(AddPartitionsToTxnResponse.V3_AND_BELOW_TXN_ID,
 errors);
         AddPartitionsToTxnResponseData data = new 
AddPartitionsToTxnResponseData().setResultsByTopicV3AndBelow(result.topicResults()).setThrottleTimeMs(0);
@@ -4361,6 +4414,48 @@ public class TransactionManagerTest {
         assertTrue(result.isAcked());
     }
 
+    private void doInitTransactionsWith2PCEnabled(boolean keepPrepared) {
+        initializeTransactionManager(Optional.of(transactionalId), true, true);
+        TransactionalRequestResult result = 
transactionManager.initializeTransactions(keepPrepared);
+
+        prepareFindCoordinatorResponse(Errors.NONE, false, 
CoordinatorType.TRANSACTION, transactionalId);
+        runUntil(() -> 
transactionManager.coordinator(CoordinatorType.TRANSACTION) != null);
+        assertEquals(brokerNode, 
transactionManager.coordinator(CoordinatorType.TRANSACTION));
+
+        if (keepPrepared) {
+            // Simulate an ongoing prepared transaction (ongoingProducerId != 
-1).
+            short ongoingEpoch = bumpedOngoingEpoch - 1;
+            prepareInitPidResponse(
+                Errors.NONE,
+                false,
+                ongoingProducerId,
+                bumpedOngoingEpoch,
+                true,
+                true,
+                ongoingProducerId,
+                ongoingEpoch
+            );
+        } else {
+            prepareInitPidResponse(
+                Errors.NONE,
+                false,
+                producerId,
+                epoch,
+                false,
+                true,
+                RecordBatch.NO_PRODUCER_ID,
+                RecordBatch.NO_PRODUCER_EPOCH
+            );
+        }
+
+        runUntil(transactionManager::hasProducerId);
+        transactionManager.maybeUpdateTransactionV2Enabled(true);
+
+        result.await();
+        assertTrue(result.isSuccessful());
+        assertTrue(result.isAcked());
+    }
+
     private void assertAbortableError(Class<? extends RuntimeException> cause) 
{
         try {
             transactionManager.beginCommit();
@@ -4411,39 +4506,6 @@ public class TransactionManagerTest {
         ProducerTestUtils.runUntil(sender, condition);
     }
 
-    @Test
-    public void testInitializeTransactionsWithKeepPreparedTxn() {
-        initializeTransactionManager(Optional.of(transactionalId), true, true);
-
-        client.prepareResponse(
-            FindCoordinatorResponse.prepareResponse(Errors.NONE, 
transactionalId, brokerNode)
-        );
-
-        // Simulate an ongoing prepared transaction (ongoingProducerId != -1).
-        long ongoingProducerId = 999L;
-        short ongoingEpoch = 10;
-        short bumpedEpoch = 11;
-
-        prepareInitPidResponse(
-            Errors.NONE,
-            false,
-            ongoingProducerId,
-            bumpedEpoch,
-            true,
-            true,
-            ongoingProducerId,
-            ongoingEpoch
-        );
-
-        transactionManager.initializeTransactions(true);
-        runUntil(transactionManager::hasProducerId);
-        
-        assertTrue(transactionManager.hasProducerId());
-        assertFalse(transactionManager.hasOngoingTransaction());
-        assertEquals(ongoingProducerId, 
transactionManager.producerIdAndEpoch().producerId);
-        assertEquals(bumpedEpoch, 
transactionManager.producerIdAndEpoch().epoch);
-    }
-
     /**
      * This subclass exists only to optionally change the default behavior 
related to poisoning the state
      * on invalid state transition attempts.


Reply via email to