This is an automated email from the ASF dual-hosted git repository.

jolshan pushed a commit to branch 4.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.2 by this push:
     new 26c2d47186b KAFKA-19414: Remove 2PC public APIs from 4.2 until release 
(KIP-939) … (#21159)
26c2d47186b is described below

commit 26c2d47186b12fc1636b5d1e52141befa5e3b3e6
Author: Calvin Liu <[email protected]>
AuthorDate: Tue Dec 16 08:04:52 2025 -0800

    KAFKA-19414: Remove 2PC public APIs from 4.2 until release (KIP-939) … 
(#21159)
    
    We are removing some of the previously added public APIs until KIP-939
    is ready to use.
    
    Resolved simple conflicts in KafkaProducer, KafkaProducerTest, and
    TransactionManagerTest by removing the newly added code.
    
    Reviewers: Justine Olshan <[email protected]>
    
    ---------
    
    Co-authored-by: Ritika Reddy <[email protected]>
---
 .../kafka/clients/producer/KafkaProducer.java      | 120 +--------
 .../kafka/clients/producer/MockProducer.java       |  35 +--
 .../apache/kafka/clients/producer/Producer.java    |  19 +-
 .../producer/internals/TransactionManager.java     |   4 +-
 .../kafka/clients/producer/KafkaProducerTest.java  | 298 ---------------------
 .../producer/internals/TransactionManagerTest.java | 131 ---------
 6 files changed, 13 insertions(+), 594 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 9672b595e36..7f6edfb37ea 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -50,13 +50,11 @@ import 
org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.AuthorizationException;
 import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.InvalidTopicException;
-import org.apache.kafka.common.errors.InvalidTxnStateException;
 import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.errors.RecordTooLargeException;
 import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.header.internals.RecordHeaders;
@@ -78,7 +76,6 @@ import 
org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter;
 import org.apache.kafka.common.telemetry.internals.ClientTelemetryUtils;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.common.utils.ProducerIdAndEpoch;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Timer;
 import org.apache.kafka.common.utils.Utils;
@@ -623,13 +620,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> 
{
     }
 
     /**
-     * Initialize the transactional state for this producer, similar to {@link 
#initTransactions()} but
-     * with additional capabilities to keep a previously prepared transaction.
-     *
      * Needs to be called before any other methods when the {@code 
transactional.id} is set in the configuration.
-     *
-     * When {@code keepPreparedTxn} is {@code false}, this behaves like the 
standard transactional
-     * initialization where the method does the following:
+     * This method does the following:
      * <ol>
      * <li>Ensures any transactions initiated by previous instances of the 
producer with the same
      *      {@code transactional.id} are completed. If the previous instance 
had failed with a transaction in
@@ -638,39 +630,26 @@ public class KafkaProducer<K, V> implements Producer<K, 
V> {
      * <li>Gets the internal producer id and epoch, used in all future 
transactional
      *      messages issued by the producer.</li>
      * </ol>
-     *
-     * <p>
-     * When {@code keepPreparedTxn} is set to {@code true}, the producer does 
<em>not</em> automatically abort existing
-     * transactions. Instead, it enters a recovery mode allowing only 
finalization of those previously
-     * prepared transactions.
-     * This behavior is especially crucial for 2PC scenarios, where 
transactions should remain intact
-     * until the external transaction manager decides whether to commit or 
abort.
-     * <p>
-     *
-     * @param keepPreparedTxn true to retain any in-flight prepared 
transactions (necessary for 2PC
-     *                        recovery), false to abort existing transactions 
and behave like
-     *                        the standard initTransactions.
-     *
      * Note that this method will raise {@link TimeoutException} if the 
transactional state cannot
      * be initialized before expiration of {@code max.block.ms}. Additionally, 
it will raise {@link InterruptException}
      * if interrupted. It is safe to retry in either case, but once the 
transactional state has been successfully
      * initialized, this method should no longer be used.
      *
-     * @throws IllegalStateException if no {@code transactional.id} is 
configured
-     * @throws org.apache.kafka.common.errors.UnsupportedVersionException if 
the broker does not
-     *         support transactions (i.e. if its version is lower than 
0.11.0.0)
-     * @throws 
org.apache.kafka.common.errors.TransactionalIdAuthorizationException if the 
configured
-     *         {@code transactional.id} is unauthorized either for normal 
transaction writes or 2PC.
-     * @throws KafkaException if the producer encounters a fatal error or any 
other unexpected error
+     * @throws IllegalStateException if no {@code transactional.id} has been 
configured
+     * @throws org.apache.kafka.common.errors.UnsupportedVersionException 
fatal error indicating the broker
+     *         does not support transactions (i.e. if its version is lower 
than 0.11.0.0)
+     * @throws org.apache.kafka.common.errors.AuthorizationException error 
indicating that the configured
+     *         transactional.id is not authorized, or the idempotent producer 
id is unavailable. See the exception for
+     *         more details.  User may retry this function call after fixing 
the permission.
+     * @throws KafkaException if the producer has encountered a previous fatal 
error or for any other unexpected error
      * @throws TimeoutException if the time taken for initialize the 
transaction has surpassed <code>max.block.ms</code>.
      * @throws InterruptException if the thread is interrupted while blocked
      */
-    public void initTransactions(boolean keepPreparedTxn) {
+    public void initTransactions() {
         throwIfNoTransactionManager();
         throwIfProducerClosed();
-        throwIfInPreparedState();
         long now = time.nanoseconds();
-        TransactionalRequestResult result = 
transactionManager.initializeTransactions(keepPreparedTxn);
+        TransactionalRequestResult result = 
transactionManager.initializeTransactions(false);
         sender.wakeup();
         result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);
         producerMetrics.recordInit(time.nanoseconds() - now);
@@ -755,7 +734,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         throwIfInvalidGroupMetadata(groupMetadata);
         throwIfNoTransactionManager();
         throwIfProducerClosed();
-        throwIfInPreparedState();
 
         if (!offsets.isEmpty()) {
             long start = time.nanoseconds();
@@ -766,49 +744,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> 
{
         }
     }
 
-    /**
-     * Prepares the current transaction for a two-phase commit. This method 
will flush all pending messages
-     * and transition the producer into a mode where only {@link 
#commitTransaction()}, {@link #abortTransaction()},
-     * or completeTransaction(PreparedTxnState) may be called.
-     * <p>
-     * This method is used as part of a two-phase commit protocol:
-     * <ol>
-     *   <li>Prepare the transaction by calling this method. This returns a 
{@link PreparedTxnState} if successful.</li>
-     *   <li>Make any external system changes that need to be atomic with this 
transaction.</li>
-     *   <li>Complete the transaction by calling {@link #commitTransaction()}, 
{@link #abortTransaction()} or
-     *       completeTransaction(PreparedTxnState).</li>
-     * </ol>
-     *
-     * @return the prepared transaction state to use when completing the 
transaction
-     *
-     * @throws IllegalStateException if no transactional.id has been 
configured or no transaction has been started yet.
-     * @throws InvalidTxnStateException if the producer is not in a state 
where preparing
-     *         a transaction is possible or 2PC is not enabled.
-     * @throws ProducerFencedException fatal error indicating another producer 
with the same transactional.id is active
-     * @throws UnsupportedVersionException fatal error indicating the broker
-     *         does not support transactions (i.e. if its version is lower 
than 0.11.0.0)
-     * @throws AuthorizationException fatal error indicating that the 
configured
-     *         transactional.id is not authorized. See the exception for more 
details
-     * @throws KafkaException if the producer has encountered a previous fatal 
error or for any other unexpected error
-     * @throws TimeoutException if the time taken for preparing the 
transaction has surpassed <code>max.block.ms</code>
-     * @throws InterruptException if the thread is interrupted while blocked
-     */
-    @Override
-    public PreparedTxnState prepareTransaction() throws 
ProducerFencedException {
-        throwIfNoTransactionManager();
-        throwIfProducerClosed();
-        throwIfInPreparedState();
-        if (!transactionManager.is2PCEnabled()) {
-            throw new InvalidTxnStateException("Cannot prepare a transaction 
when 2PC is not enabled");
-        }
-        long now = time.nanoseconds();
-        flush();
-        transactionManager.prepareTransaction();
-        producerMetrics.recordPrepareTxn(time.nanoseconds() - now);
-        ProducerIdAndEpoch producerIdAndEpoch = 
transactionManager.preparedTransactionState();
-        return new PreparedTxnState(producerIdAndEpoch.producerId, 
producerIdAndEpoch.epoch);
-    }
-
     /**
      * Commits the ongoing transaction. This method will flush any unsent 
records before actually committing the transaction.
      * <p>
@@ -886,41 +821,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> 
{
         producerMetrics.recordAbortTxn(time.nanoseconds() - abortStart);
     }
 
-    /**
-     * Completes a prepared transaction by comparing the provided prepared 
transaction state with the
-     * current prepared state on the producer.
-     * If they match, the transaction is committed; otherwise, it is aborted.
-     * 
-     * @param preparedTxnState              The prepared transaction state to 
compare against the current state
-     * @throws IllegalStateException if no transactional.id has been 
configured or no transaction has been started
-     * @throws InvalidTxnStateException if the producer is not in prepared 
state
-     * @throws ProducerFencedException fatal error indicating another producer 
with the same transactional.id is active
-     * @throws KafkaException if the producer has encountered a previous fatal 
error or for any other unexpected error
-     * @throws TimeoutException if the time taken for completing the 
transaction has surpassed <code>max.block.ms</code>
-     * @throws InterruptException if the thread is interrupted while blocked
-     */
-    @Override
-    public void completeTransaction(PreparedTxnState preparedTxnState) throws 
ProducerFencedException {
-        throwIfNoTransactionManager();
-        throwIfProducerClosed();
-        
-        if (!transactionManager.isPrepared()) {
-            throw new InvalidTxnStateException("Cannot complete transaction 
because no transaction has been prepared. " +
-                "Call prepareTransaction() first, or make sure 
initTransaction(true) was called.");
-        }
-        
-        // Get the current prepared transaction state
-        ProducerIdAndEpoch currentProducerIdAndEpoch = 
transactionManager.preparedTransactionState();
-        PreparedTxnState currentPreparedState = new 
PreparedTxnState(currentProducerIdAndEpoch.producerId, 
currentProducerIdAndEpoch.epoch);
-        
-        // Compare the prepared transaction state token and commit or abort 
accordingly
-        if (currentPreparedState.equals(preparedTxnState)) {
-            commitTransaction();
-        } else {
-            abortTransaction();
-        }
-    }
-
     /**
      * Asynchronously send a record to a topic. Equivalent to 
<code>send(record, null)</code>.
      * See {@link #send(ProducerRecord, Callback)} for details.
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
index 3e5cb9f5d5a..a4aac86df09 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -142,7 +142,7 @@ public class MockProducer<K, V> implements Producer<K, V> {
     }
 
     @Override
-    public void initTransactions(boolean keepPreparedTxn) {
+    public void initTransactions() {
         verifyNotClosed();
         verifyNotFenced();
         if (this.transactionInitialized) {
@@ -200,18 +200,6 @@ public class MockProducer<K, V> implements Producer<K, V> {
         this.sentOffsets = true;
     }
 
-    @Override
-    public PreparedTxnState prepareTransaction() throws 
ProducerFencedException {
-        verifyNotClosed();
-        verifyNotFenced();
-        verifyTransactionsInitialized();
-        verifyTransactionInFlight();
-        
-        // Return a new PreparedTxnState with mock values for producerId and 
epoch
-        // Using 1000L and (short)1 as arbitrary values for a valid 
PreparedTxnState
-        return new PreparedTxnState(1000L, (short) 1);
-    }
-
     @Override
     public void commitTransaction() throws ProducerFencedException {
         verifyNotClosed();
@@ -257,27 +245,6 @@ public class MockProducer<K, V> implements Producer<K, V> {
         this.transactionInFlight = false;
     }
 
-    @Override
-    public void completeTransaction(PreparedTxnState preparedTxnState) throws 
ProducerFencedException {
-        verifyNotClosed();
-        verifyNotFenced();
-        verifyTransactionsInitialized();
-        
-        if (!this.transactionInFlight) {
-            throw new IllegalStateException("There is no prepared transaction 
to complete.");
-        }
-
-        // For testing purposes, we'll consider a prepared state with 
producerId=1000L and epoch=1 as valid
-        // This should match what's returned in prepareTransaction()
-        PreparedTxnState currentState = new PreparedTxnState(1000L, (short) 1);
-        
-        if (currentState.equals(preparedTxnState)) {
-            commitTransaction();
-        } else {
-            abortTransaction();
-        }
-    }
-
     private synchronized void verifyNotClosed() {
         if (this.closed) {
             throw new IllegalStateException("MockProducer is already closed.");
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
index e6e94691e34..798034dda6d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
@@ -42,14 +42,7 @@ public interface Producer<K, V> extends Closeable {
     /**
      * See {@link KafkaProducer#initTransactions()}
      */
-    default void initTransactions() {
-        initTransactions(false);
-    }
-
-    /**
-     * See {@link KafkaProducer#initTransactions(boolean)}
-     */
-    void initTransactions(boolean keepPreparedTxn);
+    void initTransactions();
 
     /**
      * See {@link KafkaProducer#beginTransaction()}
@@ -62,11 +55,6 @@ public interface Producer<K, V> extends Closeable {
     void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> 
offsets,
                                   ConsumerGroupMetadata groupMetadata) throws 
ProducerFencedException;
 
-    /**
-     * See {@link KafkaProducer#prepareTransaction()}
-     */
-    PreparedTxnState prepareTransaction() throws ProducerFencedException;
-
     /**
      * See {@link KafkaProducer#commitTransaction()}
      */
@@ -77,11 +65,6 @@ public interface Producer<K, V> extends Closeable {
      */
     void abortTransaction() throws ProducerFencedException;
 
-    /**
-     * See {@link KafkaProducer#completeTransaction(PreparedTxnState)}
-     */
-    void completeTransaction(PreparedTxnState preparedTxnState) throws 
ProducerFencedException;
-
     /**
      * @see KafkaProducer#registerMetricForSubscription(KafkaMetric) 
      */
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index 969085809e6..79c2df0a2b3 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -318,9 +318,7 @@ public class TransactionManager {
                     .setTransactionalId(transactionalId)
                     .setTransactionTimeoutMs(transactionTimeoutMs)
                     .setProducerId(producerIdAndEpoch.producerId)
-                    .setProducerEpoch(producerIdAndEpoch.epoch)
-                    .setEnable2Pc(enable2PC)
-                    .setKeepPreparedTxn(keepPreparedTxn);
+                    .setProducerEpoch(producerIdAndEpoch.epoch);
 
             InitProducerIdHandler handler = new InitProducerIdHandler(new 
InitProducerIdRequest.Builder(requestData),
                     isEpochBump);
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 22bccd72743..0694091a487 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -32,7 +32,6 @@ import 
org.apache.kafka.clients.producer.internals.ProducerMetadata;
 import org.apache.kafka.clients.producer.internals.RecordAccumulator;
 import org.apache.kafka.clients.producer.internals.Sender;
 import org.apache.kafka.clients.producer.internals.TransactionManager;
-import org.apache.kafka.clients.producer.internals.TransactionalRequestResult;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Metric;
@@ -47,7 +46,6 @@ import org.apache.kafka.common.config.SslConfigs;
 import org.apache.kafka.common.errors.ClusterAuthorizationException;
 import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.InvalidTopicException;
-import org.apache.kafka.common.errors.InvalidTxnStateException;
 import org.apache.kafka.common.errors.RecordTooLargeException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
@@ -78,7 +76,6 @@ import 
org.apache.kafka.common.requests.AddOffsetsToTxnResponse;
 import org.apache.kafka.common.requests.EndTxnResponse;
 import org.apache.kafka.common.requests.FindCoordinatorRequest;
 import org.apache.kafka.common.requests.FindCoordinatorResponse;
-import org.apache.kafka.common.requests.InitProducerIdRequest;
 import org.apache.kafka.common.requests.InitProducerIdResponse;
 import org.apache.kafka.common.requests.JoinGroupRequest;
 import org.apache.kafka.common.requests.MetadataResponse;
@@ -94,7 +91,6 @@ import 
org.apache.kafka.common.telemetry.internals.ClientTelemetrySender;
 import org.apache.kafka.common.utils.LogCaptureAppender;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.common.utils.ProducerIdAndEpoch;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.test.MockMetricsReporter;
 import org.apache.kafka.test.MockPartitioner;
@@ -108,7 +104,6 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.CsvSource;
 import org.junit.jupiter.params.provider.ValueSource;
 import org.mockito.MockedStatic;
 import org.mockito.Mockito;
@@ -165,7 +160,6 @@ import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.ArgumentMatchers.notNull;
 import static org.mockito.Mockito.atMostOnce;
-import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.never;
@@ -1392,298 +1386,6 @@ public class KafkaProducerTest {
         }
     }
 
-    @ParameterizedTest
-    @CsvSource({
-        "true, false",
-        "true, true",
-        "false, true"
-    })
-    public void 
testInitTransactionsWithKeepPreparedTxnAndTwoPhaseCommit(boolean 
keepPreparedTxn, boolean enable2PC) {
-        Map<String, Object> configs = new HashMap<>();
-        configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test-txn-id");
-        configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000);
-        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
-        if (enable2PC) {
-            
configs.put(ProducerConfig.TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG, true);
-        }
-
-        Time time = new MockTime(1);
-        MetadataResponse initialUpdateResponse = 
RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
-        ProducerMetadata metadata = newMetadata(0, 0, Long.MAX_VALUE);
-        MockClient client = new MockClient(time, metadata);
-        client.updateMetadata(initialUpdateResponse);
-
-        // Capture flags from the InitProducerIdRequest
-        boolean[] requestFlags = new boolean[2]; // [keepPreparedTxn, 
enable2Pc]
-
-        client.prepareResponse(
-            request -> request instanceof FindCoordinatorRequest &&
-                ((FindCoordinatorRequest) request).data().keyType() == 
FindCoordinatorRequest.CoordinatorType.TRANSACTION.id(),
-            FindCoordinatorResponse.prepareResponse(Errors.NONE, 
"test-txn-id", NODE));
-
-        client.prepareResponse(
-            request -> {
-                if (request instanceof InitProducerIdRequest) {
-                    InitProducerIdRequest initRequest = 
(InitProducerIdRequest) request;
-                    requestFlags[0] = initRequest.data().keepPreparedTxn();
-                    requestFlags[1] = initRequest.data().enable2Pc();
-                    return true;
-                }
-                return false;
-            },
-            initProducerIdResponse(1L, (short) 5, Errors.NONE));
-
-        try (Producer<String, String> producer = kafkaProducer(configs, new 
StringSerializer(),
-                new StringSerializer(), metadata, client, null, time)) {
-            producer.initTransactions(keepPreparedTxn);
-
-            // Verify request flags match expected values
-            assertEquals(keepPreparedTxn, requestFlags[0],
-                "keepPreparedTxn flag should match input parameter");
-            assertEquals(enable2PC, requestFlags[1],
-                "enable2Pc flag should match producer configuration");
-        }
-    }
-
-    @Test
-    public void testPrepareTransactionSuccess() throws Exception {
-        StringSerializer serializer = new StringSerializer();
-        KafkaProducerTestContext<String> ctx = new 
KafkaProducerTestContext<>(testInfo, serializer);
-
-        when(ctx.transactionManager.isTransactionV2Enabled()).thenReturn(true);
-        when(ctx.transactionManager.is2PCEnabled()).thenReturn(true);
-        when(ctx.sender.isRunning()).thenReturn(true);
-
-        doNothing().when(ctx.transactionManager).prepareTransaction();
-
-        long expectedProducerId = 12345L;
-        short expectedEpoch = 5;
-        ProducerIdAndEpoch expectedProducerIdAndEpoch = new 
ProducerIdAndEpoch(expectedProducerId, expectedEpoch);
-        
when(ctx.transactionManager.preparedTransactionState()).thenReturn(expectedProducerIdAndEpoch);
-
-        try (KafkaProducer<String, String> producer = ctx.newKafkaProducer()) {
-            PreparedTxnState returned = producer.prepareTransaction();
-            assertEquals(expectedProducerId, returned.producerId());
-            assertEquals(expectedEpoch, returned.epoch());
-
-            verify(ctx.transactionManager).prepareTransaction();
-            verify(ctx.accumulator).beginFlush();
-            verify(ctx.accumulator).awaitFlushCompletion();
-        }
-    }
-
-    @Test
-    public void testSendNotAllowedInPreparedTransactionState() throws 
Exception {
-        StringSerializer serializer = new StringSerializer();
-        KafkaProducerTestContext<String> ctx = new 
KafkaProducerTestContext<>(testInfo, serializer);
-
-        String topic = "foo";
-        Cluster cluster = TestUtils.singletonCluster(topic, 1);
-
-        when(ctx.sender.isRunning()).thenReturn(true);
-        when(ctx.metadata.fetch()).thenReturn(cluster);
-
-        // Mock transaction manager to simulate being in a prepared state
-        when(ctx.transactionManager.isTransactional()).thenReturn(true);
-        when(ctx.transactionManager.isPrepared()).thenReturn(true);
-
-        // Create record to send
-        long timestamp = ctx.time.milliseconds();
-        ProducerRecord<String, String> record = new ProducerRecord<>(topic, 0, 
timestamp, "key", "value");
-
-        try (KafkaProducer<String, String> producer = ctx.newKafkaProducer()) {
-            // Verify that sending a record throws IllegalStateException with 
the correct message
-            IllegalStateException exception = assertThrows(
-                IllegalStateException.class,
-                () -> producer.send(record)
-            );
-
-            assertTrue(exception.getMessage().contains("Cannot perform 
operation while the transaction is in a prepared state"));
-
-            // Verify transactionManager methods were called
-            verify(ctx.transactionManager).isTransactional();
-            verify(ctx.transactionManager).isPrepared();
-
-            // Verify that no message was actually sent (accumulator was not 
called)
-            verify(ctx.accumulator, never()).append(
-                eq(topic),
-                anyInt(),
-                anyLong(),
-                any(),
-                any(),
-                any(),
-                any(),
-                anyLong(),
-                anyLong(),
-                any()
-            );
-        }
-    }
-
-    @SuppressWarnings("removal")
-    @Test
-    public void testSendOffsetsNotAllowedInPreparedTransactionState() throws 
Exception {
-        StringSerializer serializer = new StringSerializer();
-        KafkaProducerTestContext<String> ctx = new 
KafkaProducerTestContext<>(testInfo, serializer);
-
-        String topic = "foo";
-        Cluster cluster = TestUtils.singletonCluster(topic, 1);
-
-        when(ctx.sender.isRunning()).thenReturn(true);
-        when(ctx.metadata.fetch()).thenReturn(cluster);
-
-        // Mock transaction manager to simulate being in a prepared state
-        when(ctx.transactionManager.isTransactional()).thenReturn(true);
-        when(ctx.transactionManager.isPrepared()).thenReturn(true);
-
-        // Create consumer group metadata
-        String groupId = "test-group";
-        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
-        offsets.put(new TopicPartition(topic, 0), new OffsetAndMetadata(100L));
-        ConsumerGroupMetadata groupMetadata = new 
ConsumerGroupMetadata(groupId);
-
-        try (KafkaProducer<String, String> producer = ctx.newKafkaProducer()) {
-            // Verify that sending offsets throws IllegalStateException with 
the correct message
-            IllegalStateException exception = assertThrows(
-                IllegalStateException.class,
-                () -> producer.sendOffsetsToTransaction(offsets, groupMetadata)
-            );
-
-            assertTrue(exception.getMessage().contains("Cannot perform 
operation while the transaction is in a prepared state"));
-
-            // Verify transactionManager methods were called
-            verify(ctx.transactionManager).isTransactional();
-            verify(ctx.transactionManager).isPrepared();
-
-            // Verify that no offsets were actually sent
-            verify(ctx.transactionManager, never()).sendOffsetsToTransaction(
-                eq(offsets),
-                eq(groupMetadata)
-            );
-        }
-    }
-
-    @Test
-    public void testBeginTransactionNotAllowedInPreparedTransactionState() 
throws Exception {
-        StringSerializer serializer = new StringSerializer();
-        KafkaProducerTestContext<String> ctx = new 
KafkaProducerTestContext<>(testInfo, serializer);
-
-        when(ctx.sender.isRunning()).thenReturn(true);
-
-        // Mock transaction manager to simulate being in a prepared state
-        when(ctx.transactionManager.isTransactional()).thenReturn(true);
-        when(ctx.transactionManager.isPrepared()).thenReturn(true);
-
-        try (KafkaProducer<String, String> producer = ctx.newKafkaProducer()) {
-            // Verify that calling beginTransaction throws 
IllegalStateException with the correct message
-            IllegalStateException exception = assertThrows(
-                IllegalStateException.class,
-                producer::beginTransaction
-            );
-
-            assertTrue(exception.getMessage().contains("Cannot perform 
operation while the transaction is in a prepared state"));
-
-            // Verify transactionManager methods were called
-            verify(ctx.transactionManager).isTransactional();
-            verify(ctx.transactionManager).isPrepared();
-        }
-    }
-
-    @Test
-    public void testPrepareTransactionFailsWhen2PCDisabled() {
-        StringSerializer serializer = new StringSerializer();
-        KafkaProducerTestContext<String> ctx = new 
KafkaProducerTestContext<>(testInfo, serializer);
-
-        // Disable 2PC
-        when(ctx.transactionManager.isTransactionV2Enabled()).thenReturn(true);
-        when(ctx.transactionManager.is2PCEnabled()).thenReturn(false);
-        when(ctx.sender.isRunning()).thenReturn(true);
-
-        try (KafkaProducer<String, String> producer = ctx.newKafkaProducer()) {
-            assertThrows(
-                InvalidTxnStateException.class,
-                producer::prepareTransaction,
-                "prepareTransaction() should fail if 2PC is disabled"
-            );
-        }
-    }
-
-    @Test
-    public void testCompleteTransactionWithMatchingState() throws Exception {
-        StringSerializer serializer = new StringSerializer();
-        KafkaProducerTestContext<String> ctx = new 
KafkaProducerTestContext<>(testInfo, serializer);
-
-        when(ctx.transactionManager.isPrepared()).thenReturn(true);
-        when(ctx.sender.isRunning()).thenReturn(true);
-
-        // Create prepared states with matching values
-        long producerId = 12345L;
-        short epoch = 5;
-        PreparedTxnState inputState = new PreparedTxnState(producerId, epoch);
-        ProducerIdAndEpoch currentProducerIdAndEpoch = new 
ProducerIdAndEpoch(producerId, epoch);
-
-        // Set up the transaction manager to return the prepared state
-        
when(ctx.transactionManager.preparedTransactionState()).thenReturn(currentProducerIdAndEpoch);
-
-        // Should trigger commit when states match
-        TransactionalRequestResult commitResult = 
mock(TransactionalRequestResult.class);
-        when(ctx.transactionManager.beginCommit()).thenReturn(commitResult);
-
-        try (KafkaProducer<String, String> producer = ctx.newKafkaProducer()) {
-            // Call completeTransaction with the matching state
-            producer.completeTransaction(inputState);
-
-            // Verify methods called in order
-            verify(ctx.transactionManager).isPrepared();
-            verify(ctx.transactionManager).preparedTransactionState();
-            verify(ctx.transactionManager).beginCommit();
-
-            // Verify abort was never called
-            verify(ctx.transactionManager, never()).beginAbort();
-
-            // Verify sender was woken up
-            verify(ctx.sender).wakeup();
-        }
-    }
-
-    @Test
-    public void testCompleteTransactionWithNonMatchingState() throws Exception 
{
-        StringSerializer serializer = new StringSerializer();
-        KafkaProducerTestContext<String> ctx = new 
KafkaProducerTestContext<>(testInfo, serializer);
-
-        when(ctx.transactionManager.isPrepared()).thenReturn(true);
-        when(ctx.sender.isRunning()).thenReturn(true);
-
-        // Create txn prepared states with different values
-        long producerId = 12345L;
-        short epoch = 5;
-        PreparedTxnState inputState = new PreparedTxnState(producerId + 1, 
epoch);
-        ProducerIdAndEpoch currentProducerIdAndEpoch = new 
ProducerIdAndEpoch(producerId, epoch);
-
-        // Set up the transaction manager to return the prepared state
-        
when(ctx.transactionManager.preparedTransactionState()).thenReturn(currentProducerIdAndEpoch);
-
-        // Should trigger abort when states don't match
-        TransactionalRequestResult abortResult = 
mock(TransactionalRequestResult.class);
-        when(ctx.transactionManager.beginAbort()).thenReturn(abortResult);
-
-        try (KafkaProducer<String, String> producer = ctx.newKafkaProducer()) {
-            // Call completeTransaction with the non-matching state
-            producer.completeTransaction(inputState);
-
-            // Verify methods called in order
-            verify(ctx.transactionManager).isPrepared();
-            verify(ctx.transactionManager).preparedTransactionState();
-            verify(ctx.transactionManager).beginAbort();
-
-            // Verify commit was never called
-            verify(ctx.transactionManager, never()).beginCommit();
-
-            // Verify sender was woken up
-            verify(ctx.sender).wakeup();
-        }
-    }
-
     @Test
     public void testClusterAuthorizationFailure() throws Exception {
         int maxBlockMs = 500;
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 15539e40bef..130c395d2c7 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -4025,137 +4025,6 @@ public class TransactionManagerTest {
         assertFalse(transactionManager.hasOngoingTransaction());
     }
 
-    @Test
-    public void testInitializeTransactionsWithKeepPreparedTxn() {
-        doInitTransactionsWith2PCEnabled(true);
-        runUntil(transactionManager::hasProducerId);
-
-        // Expect a bumped epoch in the response.
-        assertTrue(transactionManager.hasProducerId());
-        assertFalse(transactionManager.hasOngoingTransaction());
-        assertEquals(ongoingProducerId, 
transactionManager.producerIdAndEpoch().producerId);
-        assertEquals(bumpedOngoingEpoch, 
transactionManager.producerIdAndEpoch().epoch);
-    }
-
-    @Test
-    public void testPrepareTransaction() {
-        doInitTransactionsWith2PCEnabled(false);
-        runUntil(transactionManager::hasProducerId);
-
-        // Begin a transaction
-        transactionManager.beginTransaction();
-        assertTrue(transactionManager.hasOngoingTransaction());
-
-        // Add a partition to the transaction
-        transactionManager.maybeAddPartition(tp0);
-
-        // Capture the current producer ID and epoch before preparing the 
response
-        long producerId = transactionManager.producerIdAndEpoch().producerId;
-        short epoch = transactionManager.producerIdAndEpoch().epoch;
-
-        // Simulate a produce request
-        try {
-            // Prepare the response before sending to ensure it's ready
-            prepareProduceResponse(Errors.NONE, producerId, epoch);
-
-            appendToAccumulator(tp0);
-            // Wait until the request is processed
-            runUntil(() -> !client.hasPendingResponses());
-        } catch (InterruptedException e) {
-            fail("Unexpected interruption: " + e);
-        }
-
-        transactionManager.prepareTransaction();
-        assertTrue(transactionManager.isPrepared());
-
-        ProducerIdAndEpoch preparedState = 
transactionManager.preparedTransactionState();
-        // Validate the state contains the correct producer ID and epoch
-        assertEquals(producerId, preparedState.producerId);
-        assertEquals(epoch, preparedState.epoch);
-    }
-
-    @Test
-    public void testInitPidResponseWithKeepPreparedTrueAndOngoingTransaction() 
{
-        // Initialize transaction manager with 2PC enabled
-        initializeTransactionManager(Optional.of(transactionalId), true, true);
-        
-        // Start initializeTransactions with keepPreparedTxn=true
-        TransactionalRequestResult result = 
transactionManager.initializeTransactions(true);
-        
-        // Prepare coordinator response
-        prepareFindCoordinatorResponse(Errors.NONE, false, 
CoordinatorType.TRANSACTION, transactionalId);
-        runUntil(() -> 
transactionManager.coordinator(CoordinatorType.TRANSACTION) != null);
-        
-        // Simulate InitProducerId response with ongoing transaction
-        long ongoingPid = 12345L;
-        short ongoingEpoch = 5;
-        prepareInitPidResponse(
-            Errors.NONE,
-            false,
-            producerId,
-            epoch,
-            true,
-            true,
-            ongoingPid,
-            ongoingEpoch
-        );
-        
-        runUntil(transactionManager::hasProducerId);
-        transactionManager.maybeUpdateTransactionV2Enabled(true);
-        
-        result.await();
-        assertTrue(result.isSuccessful());
-        
-        // Verify transaction manager transitioned to PREPARED_TRANSACTION 
state
-        assertTrue(transactionManager.isPrepared());
-        
-        // Verify preparedTxnState was set with ongoing producer ID and epoch
-        ProducerIdAndEpoch preparedState = 
transactionManager.preparedTransactionState();
-        assertNotNull(preparedState);
-        assertEquals(ongoingPid, preparedState.producerId);
-        assertEquals(ongoingEpoch, preparedState.epoch);
-    }
-
-    @Test
-    public void 
testInitPidResponseWithKeepPreparedTrueAndNoOngoingTransaction() {
-        // Initialize transaction manager without 2PC enabled
-        // keepPrepared can be true even when enable2Pc is false, and we 
expect the same behavior
-        initializeTransactionManager(Optional.of(transactionalId), true, 
false);
-        
-        // Start initializeTransactions with keepPreparedTxn=true
-        TransactionalRequestResult result = 
transactionManager.initializeTransactions(true);
-        
-        // Prepare coordinator response
-        prepareFindCoordinatorResponse(Errors.NONE, false, 
CoordinatorType.TRANSACTION, transactionalId);
-        runUntil(() -> 
transactionManager.coordinator(CoordinatorType.TRANSACTION) != null);
-        
-        // Simulate InitProducerId response without ongoing transaction
-        prepareInitPidResponse(
-            Errors.NONE,
-            false,
-            producerId,
-            epoch,
-            true,
-            false,
-            RecordBatch.NO_PRODUCER_ID,
-            RecordBatch.NO_PRODUCER_EPOCH
-        );
-        
-        runUntil(transactionManager::hasProducerId);
-        transactionManager.maybeUpdateTransactionV2Enabled(true);
-        
-        result.await();
-        assertTrue(result.isSuccessful());
-        
-        // Verify transaction manager transitioned to READY state (not 
PREPARED_TRANSACTION)
-        assertFalse(transactionManager.isPrepared());
-        assertTrue(transactionManager.isReady());
-        
-        // Verify preparedTxnState was not set or is empty
-        ProducerIdAndEpoch preparedState = 
transactionManager.preparedTransactionState();
-        assertEquals(ProducerIdAndEpoch.NONE, preparedState);
-    }
-
     private void prepareAddPartitionsToTxn(final Map<TopicPartition, Errors> 
errors) {
         AddPartitionsToTxnResult result = 
AddPartitionsToTxnResponse.resultForTransaction(AddPartitionsToTxnResponse.V3_AND_BELOW_TXN_ID,
 errors);
         AddPartitionsToTxnResponseData data = new 
AddPartitionsToTxnResponseData().setResultsByTopicV3AndBelow(result.topicResults()).setThrottleTimeMs(0);


Reply via email to