This is an automated email from the ASF dual-hosted git repository.

jolshan pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.1 by this push:
     new c4cac078196 KAFKA-19414: Remove 2PC public APIs from 4.1 until release (KIP-939) (#19985)
c4cac078196 is described below

commit c4cac078196abd12665a54d6800fe2ed4ba5e85e
Author: Ritika Reddy <[email protected]>
AuthorDate: Wed Jun 25 09:06:21 2025 -0700

    KAFKA-19414: Remove 2PC public APIs from 4.1 until release (KIP-939) (#19985)
    
    We are removing some of the previously added public APIs until KIP-939
    is ready to use.
    
    Reviewers: Justine Olshan <[email protected]>
---
 .../kafka/clients/producer/KafkaProducer.java      | 117 +-------
 .../kafka/clients/producer/MockProducer.java       |  35 +--
 .../apache/kafka/clients/producer/Producer.java    |  19 +-
 .../producer/internals/TransactionManager.java     |   4 +-
 .../kafka/clients/producer/KafkaProducerTest.java  | 294 ---------------------
 .../producer/internals/TransactionManagerTest.java |  51 ----
 6 files changed, 13 insertions(+), 507 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 71d201b71f9..1b1d7bb7e83 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -50,13 +50,11 @@ import 
org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.AuthorizationException;
 import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.InvalidTopicException;
-import org.apache.kafka.common.errors.InvalidTxnStateException;
 import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.errors.RecordTooLargeException;
 import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.header.internals.RecordHeaders;
@@ -622,13 +620,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> 
{
     }
 
     /**
-     * Initialize the transactional state for this producer, similar to {@link 
#initTransactions()} but
-     * with additional capabilities to keep a previously prepared transaction.
-     *
      * Needs to be called before any other methods when the {@code 
transactional.id} is set in the configuration.
-     *
-     * When {@code keepPreparedTxn} is {@code false}, this behaves like the 
standard transactional
-     * initialization where the method does the following:
+     * This method does the following:
      * <ol>
      * <li>Ensures any transactions initiated by previous instances of the 
producer with the same
      *      {@code transactional.id} are completed. If the previous instance 
had failed with a transaction in
@@ -637,39 +630,26 @@ public class KafkaProducer<K, V> implements Producer<K, 
V> {
      * <li>Gets the internal producer id and epoch, used in all future 
transactional
      *      messages issued by the producer.</li>
      * </ol>
-     *
-     * <p>
-     * When {@code keepPreparedTxn} is set to {@code true}, the producer does 
<em>not</em> automatically abort existing
-     * transactions. Instead, it enters a recovery mode allowing only 
finalization of those previously
-     * prepared transactions.
-     * This behavior is especially crucial for 2PC scenarios, where 
transactions should remain intact
-     * until the external transaction manager decides whether to commit or 
abort.
-     * <p>
-     *
-     * @param keepPreparedTxn true to retain any in-flight prepared 
transactions (necessary for 2PC
-     *                        recovery), false to abort existing transactions 
and behave like
-     *                        the standard initTransactions.
-     *
      * Note that this method will raise {@link TimeoutException} if the 
transactional state cannot
      * be initialized before expiration of {@code max.block.ms}. Additionally, 
it will raise {@link InterruptException}
      * if interrupted. It is safe to retry in either case, but once the 
transactional state has been successfully
      * initialized, this method should no longer be used.
      *
-     * @throws IllegalStateException if no {@code transactional.id} is 
configured
-     * @throws org.apache.kafka.common.errors.UnsupportedVersionException if 
the broker does not
-     *         support transactions (i.e. if its version is lower than 
0.11.0.0)
-     * @throws 
org.apache.kafka.common.errors.TransactionalIdAuthorizationException if the 
configured
-     *         {@code transactional.id} is unauthorized either for normal 
transaction writes or 2PC.
-     * @throws KafkaException if the producer encounters a fatal error or any 
other unexpected error
+     * @throws IllegalStateException if no {@code transactional.id} has been 
configured
+     * @throws org.apache.kafka.common.errors.UnsupportedVersionException 
fatal error indicating the broker
+     *         does not support transactions (i.e. if its version is lower 
than 0.11.0.0)
+     * @throws org.apache.kafka.common.errors.AuthorizationException error 
indicating that the configured
+     *         transactional.id is not authorized, or the idempotent producer 
id is unavailable. See the exception for
+     *         more details.  User may retry this function call after fixing 
the permission.
+     * @throws KafkaException if the producer has encountered a previous fatal 
error or for any other unexpected error
      * @throws TimeoutException if the time taken for initialize the 
transaction has surpassed <code>max.block.ms</code>.
      * @throws InterruptException if the thread is interrupted while blocked
      */
-    public void initTransactions(boolean keepPreparedTxn) {
+    public void initTransactions() {
         throwIfNoTransactionManager();
         throwIfProducerClosed();
-        throwIfInPreparedState();
         long now = time.nanoseconds();
-        TransactionalRequestResult result = 
transactionManager.initializeTransactions(keepPreparedTxn);
+        TransactionalRequestResult result = 
transactionManager.initializeTransactions(false);
         sender.wakeup();
         result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);
         producerMetrics.recordInit(time.nanoseconds() - now);
@@ -754,7 +734,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         throwIfInvalidGroupMetadata(groupMetadata);
         throwIfNoTransactionManager();
         throwIfProducerClosed();
-        throwIfInPreparedState();
 
         if (!offsets.isEmpty()) {
             long start = time.nanoseconds();
@@ -765,48 +744,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> 
{
         }
     }
 
-    /**
-     * Prepares the current transaction for a two-phase commit. This method 
will flush all pending messages
-     * and transition the producer into a mode where only {@link 
#commitTransaction()}, {@link #abortTransaction()},
-     * or completeTransaction(PreparedTxnState) may be called.
-     * <p>
-     * This method is used as part of a two-phase commit protocol:
-     * <ol>
-     *   <li>Prepare the transaction by calling this method. This returns a 
{@link PreparedTxnState} if successful.</li>
-     *   <li>Make any external system changes that need to be atomic with this 
transaction.</li>
-     *   <li>Complete the transaction by calling {@link #commitTransaction()}, 
{@link #abortTransaction()} or
-     *       completeTransaction(PreparedTxnState).</li>
-     * </ol>
-     *
-     * @return the prepared transaction state to use when completing the 
transaction
-     *
-     * @throws IllegalStateException if no transactional.id has been 
configured or no transaction has been started yet.
-     * @throws InvalidTxnStateException if the producer is not in a state 
where preparing
-     *         a transaction is possible or 2PC is not enabled.
-     * @throws ProducerFencedException fatal error indicating another producer 
with the same transactional.id is active
-     * @throws UnsupportedVersionException fatal error indicating the broker
-     *         does not support transactions (i.e. if its version is lower 
than 0.11.0.0)
-     * @throws AuthorizationException fatal error indicating that the 
configured
-     *         transactional.id is not authorized. See the exception for more 
details
-     * @throws KafkaException if the producer has encountered a previous fatal 
error or for any other unexpected error
-     * @throws TimeoutException if the time taken for preparing the 
transaction has surpassed <code>max.block.ms</code>
-     * @throws InterruptException if the thread is interrupted while blocked
-     */
-    @Override
-    public PreparedTxnState prepareTransaction() throws 
ProducerFencedException {
-        throwIfNoTransactionManager();
-        throwIfProducerClosed();
-        throwIfInPreparedState();
-        if (!transactionManager.is2PCEnabled()) {
-            throw new InvalidTxnStateException("Cannot prepare a transaction 
when 2PC is not enabled");
-        }
-        long now = time.nanoseconds();
-        flush();
-        transactionManager.prepareTransaction();
-        producerMetrics.recordPrepareTxn(time.nanoseconds() - now);
-        return transactionManager.preparedTransactionState();
-    }
-
     /**
      * Commits the ongoing transaction. This method will flush any unsent 
records before actually committing the transaction.
      * <p>
@@ -884,40 +821,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> 
{
         producerMetrics.recordAbortTxn(time.nanoseconds() - abortStart);
     }
 
-    /**
-     * Completes a prepared transaction by comparing the provided prepared 
transaction state with the
-     * current prepared state on the producer.
-     * If they match, the transaction is committed; otherwise, it is aborted.
-     * 
-     * @param preparedTxnState              The prepared transaction state to 
compare against the current state
-     * @throws IllegalStateException if no transactional.id has been 
configured or no transaction has been started
-     * @throws InvalidTxnStateException if the producer is not in prepared 
state
-     * @throws ProducerFencedException fatal error indicating another producer 
with the same transactional.id is active
-     * @throws KafkaException if the producer has encountered a previous fatal 
error or for any other unexpected error
-     * @throws TimeoutException if the time taken for completing the 
transaction has surpassed <code>max.block.ms</code>
-     * @throws InterruptException if the thread is interrupted while blocked
-     */
-    @Override
-    public void completeTransaction(PreparedTxnState preparedTxnState) throws 
ProducerFencedException {
-        throwIfNoTransactionManager();
-        throwIfProducerClosed();
-        
-        if (!transactionManager.isPrepared()) {
-            throw new InvalidTxnStateException("Cannot complete transaction 
because no transaction has been prepared. " +
-                "Call prepareTransaction() first, or make sure 
initTransaction(true) was called.");
-        }
-        
-        // Get the current prepared transaction state
-        PreparedTxnState currentPreparedState = 
transactionManager.preparedTransactionState();
-        
-        // Compare the prepared transaction state token and commit or abort 
accordingly
-        if (currentPreparedState.equals(preparedTxnState)) {
-            commitTransaction();
-        } else {
-            abortTransaction();
-        }
-    }
-
     /**
      * Asynchronously send a record to a topic. Equivalent to 
<code>send(record, null)</code>.
      * See {@link #send(ProducerRecord, Callback)} for details.
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
index 3e5cb9f5d5a..a4aac86df09 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -142,7 +142,7 @@ public class MockProducer<K, V> implements Producer<K, V> {
     }
 
     @Override
-    public void initTransactions(boolean keepPreparedTxn) {
+    public void initTransactions() {
         verifyNotClosed();
         verifyNotFenced();
         if (this.transactionInitialized) {
@@ -200,18 +200,6 @@ public class MockProducer<K, V> implements Producer<K, V> {
         this.sentOffsets = true;
     }
 
-    @Override
-    public PreparedTxnState prepareTransaction() throws 
ProducerFencedException {
-        verifyNotClosed();
-        verifyNotFenced();
-        verifyTransactionsInitialized();
-        verifyTransactionInFlight();
-        
-        // Return a new PreparedTxnState with mock values for producerId and 
epoch
-        // Using 1000L and (short)1 as arbitrary values for a valid 
PreparedTxnState
-        return new PreparedTxnState(1000L, (short) 1);
-    }
-
     @Override
     public void commitTransaction() throws ProducerFencedException {
         verifyNotClosed();
@@ -257,27 +245,6 @@ public class MockProducer<K, V> implements Producer<K, V> {
         this.transactionInFlight = false;
     }
 
-    @Override
-    public void completeTransaction(PreparedTxnState preparedTxnState) throws 
ProducerFencedException {
-        verifyNotClosed();
-        verifyNotFenced();
-        verifyTransactionsInitialized();
-        
-        if (!this.transactionInFlight) {
-            throw new IllegalStateException("There is no prepared transaction 
to complete.");
-        }
-
-        // For testing purposes, we'll consider a prepared state with 
producerId=1000L and epoch=1 as valid
-        // This should match what's returned in prepareTransaction()
-        PreparedTxnState currentState = new PreparedTxnState(1000L, (short) 1);
-        
-        if (currentState.equals(preparedTxnState)) {
-            commitTransaction();
-        } else {
-            abortTransaction();
-        }
-    }
-
     private synchronized void verifyNotClosed() {
         if (this.closed) {
             throw new IllegalStateException("MockProducer is already closed.");
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
index e6e94691e34..798034dda6d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
@@ -42,14 +42,7 @@ public interface Producer<K, V> extends Closeable {
     /**
      * See {@link KafkaProducer#initTransactions()}
      */
-    default void initTransactions() {
-        initTransactions(false);
-    }
-
-    /**
-     * See {@link KafkaProducer#initTransactions(boolean)}
-     */
-    void initTransactions(boolean keepPreparedTxn);
+    void initTransactions();
 
     /**
      * See {@link KafkaProducer#beginTransaction()}
@@ -62,11 +55,6 @@ public interface Producer<K, V> extends Closeable {
     void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> 
offsets,
                                   ConsumerGroupMetadata groupMetadata) throws 
ProducerFencedException;
 
-    /**
-     * See {@link KafkaProducer#prepareTransaction()}
-     */
-    PreparedTxnState prepareTransaction() throws ProducerFencedException;
-
     /**
      * See {@link KafkaProducer#commitTransaction()}
      */
@@ -77,11 +65,6 @@ public interface Producer<K, V> extends Closeable {
      */
     void abortTransaction() throws ProducerFencedException;
 
-    /**
-     * See {@link KafkaProducer#completeTransaction(PreparedTxnState)}
-     */
-    void completeTransaction(PreparedTxnState preparedTxnState) throws 
ProducerFencedException;
-
     /**
      * @see KafkaProducer#registerMetricForSubscription(KafkaMetric) 
      */
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index 5d83cbc0b1b..20804c505dd 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -320,9 +320,7 @@ public class TransactionManager {
                     .setTransactionalId(transactionalId)
                     .setTransactionTimeoutMs(transactionTimeoutMs)
                     .setProducerId(producerIdAndEpoch.producerId)
-                    .setProducerEpoch(producerIdAndEpoch.epoch)
-                    .setEnable2Pc(enable2PC)
-                    .setKeepPreparedTxn(keepPreparedTxn);
+                    .setProducerEpoch(producerIdAndEpoch.epoch);
 
             InitProducerIdHandler handler = new InitProducerIdHandler(new 
InitProducerIdRequest.Builder(requestData),
                     isEpochBump);
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 8460d0f4c5f..ac63fed3c04 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -32,7 +32,6 @@ import 
org.apache.kafka.clients.producer.internals.ProducerMetadata;
 import org.apache.kafka.clients.producer.internals.RecordAccumulator;
 import org.apache.kafka.clients.producer.internals.Sender;
 import org.apache.kafka.clients.producer.internals.TransactionManager;
-import org.apache.kafka.clients.producer.internals.TransactionalRequestResult;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Metric;
@@ -47,7 +46,6 @@ import org.apache.kafka.common.config.SslConfigs;
 import org.apache.kafka.common.errors.ClusterAuthorizationException;
 import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.InvalidTopicException;
-import org.apache.kafka.common.errors.InvalidTxnStateException;
 import org.apache.kafka.common.errors.RecordTooLargeException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
@@ -78,7 +76,6 @@ import 
org.apache.kafka.common.requests.AddOffsetsToTxnResponse;
 import org.apache.kafka.common.requests.EndTxnResponse;
 import org.apache.kafka.common.requests.FindCoordinatorRequest;
 import org.apache.kafka.common.requests.FindCoordinatorResponse;
-import org.apache.kafka.common.requests.InitProducerIdRequest;
 import org.apache.kafka.common.requests.InitProducerIdResponse;
 import org.apache.kafka.common.requests.JoinGroupRequest;
 import org.apache.kafka.common.requests.MetadataResponse;
@@ -107,7 +104,6 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.CsvSource;
 import org.junit.jupiter.params.provider.ValueSource;
 import org.mockito.MockedStatic;
 import org.mockito.Mockito;
@@ -154,7 +150,6 @@ import static 
org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
@@ -165,7 +160,6 @@ import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.ArgumentMatchers.notNull;
 import static org.mockito.Mockito.atMostOnce;
-import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.never;
@@ -1389,294 +1383,6 @@ public class KafkaProducerTest {
         }
     }
 
-    @ParameterizedTest
-    @CsvSource({
-        "true, false",
-        "true, true",
-        "false, true"
-    })
-    public void 
testInitTransactionsWithKeepPreparedTxnAndTwoPhaseCommit(boolean 
keepPreparedTxn, boolean enable2PC) {
-        Map<String, Object> configs = new HashMap<>();
-        configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test-txn-id");
-        configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000);
-        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
-        if (enable2PC) {
-            
configs.put(ProducerConfig.TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG, true);
-        }
-
-        Time time = new MockTime(1);
-        MetadataResponse initialUpdateResponse = 
RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
-        ProducerMetadata metadata = newMetadata(0, 0, Long.MAX_VALUE);
-        MockClient client = new MockClient(time, metadata);
-        client.updateMetadata(initialUpdateResponse);
-
-        // Capture flags from the InitProducerIdRequest
-        boolean[] requestFlags = new boolean[2]; // [keepPreparedTxn, 
enable2Pc]
-        
-        client.prepareResponse(
-            request -> request instanceof FindCoordinatorRequest &&
-                ((FindCoordinatorRequest) request).data().keyType() == 
FindCoordinatorRequest.CoordinatorType.TRANSACTION.id(),
-            FindCoordinatorResponse.prepareResponse(Errors.NONE, 
"test-txn-id", NODE));
-            
-        client.prepareResponse(
-            request -> {
-                if (request instanceof InitProducerIdRequest) {
-                    InitProducerIdRequest initRequest = 
(InitProducerIdRequest) request;
-                    requestFlags[0] = initRequest.data().keepPreparedTxn();
-                    requestFlags[1] = initRequest.data().enable2Pc();
-                    return true;
-                }
-                return false;
-            },
-            initProducerIdResponse(1L, (short) 5, Errors.NONE));
-            
-        try (Producer<String, String> producer = kafkaProducer(configs, new 
StringSerializer(),
-                new StringSerializer(), metadata, client, null, time)) {
-            producer.initTransactions(keepPreparedTxn);
-            
-            // Verify request flags match expected values
-            assertEquals(keepPreparedTxn, requestFlags[0], 
-                "keepPreparedTxn flag should match input parameter");
-            assertEquals(enable2PC, requestFlags[1], 
-                "enable2Pc flag should match producer configuration");
-        }
-    }
-
-    @Test
-    public void testPrepareTransactionSuccess() throws Exception {
-        StringSerializer serializer = new StringSerializer();
-        KafkaProducerTestContext<String> ctx = new 
KafkaProducerTestContext<>(testInfo, serializer);
-
-        when(ctx.transactionManager.isTransactionV2Enabled()).thenReturn(true);
-        when(ctx.transactionManager.is2PCEnabled()).thenReturn(true);
-        when(ctx.sender.isRunning()).thenReturn(true);
-
-        doNothing().when(ctx.transactionManager).prepareTransaction();
-
-        PreparedTxnState expectedState = mock(PreparedTxnState.class);
-        
when(ctx.transactionManager.preparedTransactionState()).thenReturn(expectedState);
-
-        try (KafkaProducer<String, String> producer = ctx.newKafkaProducer()) {
-            PreparedTxnState returned = producer.prepareTransaction();
-            assertSame(expectedState, returned);
-
-            verify(ctx.transactionManager).prepareTransaction();
-            verify(ctx.accumulator).beginFlush();
-            verify(ctx.accumulator).awaitFlushCompletion();
-        }
-    }
-
-    @Test
-    public void testSendNotAllowedInPreparedTransactionState() throws 
Exception {
-        StringSerializer serializer = new StringSerializer();
-        KafkaProducerTestContext<String> ctx = new 
KafkaProducerTestContext<>(testInfo, serializer);
-
-        String topic = "foo";
-        Cluster cluster = TestUtils.singletonCluster(topic, 1);
-
-        when(ctx.sender.isRunning()).thenReturn(true);
-        when(ctx.metadata.fetch()).thenReturn(cluster);
-
-        // Mock transaction manager to simulate being in a prepared state
-        when(ctx.transactionManager.isTransactional()).thenReturn(true);
-        when(ctx.transactionManager.isPrepared()).thenReturn(true);
-
-        // Create record to send
-        long timestamp = ctx.time.milliseconds();
-        ProducerRecord<String, String> record = new ProducerRecord<>(topic, 0, 
timestamp, "key", "value");
-
-        try (KafkaProducer<String, String> producer = ctx.newKafkaProducer()) {
-            // Verify that sending a record throws IllegalStateException with 
the correct message
-            IllegalStateException exception = assertThrows(
-                IllegalStateException.class,
-                () -> producer.send(record)
-            );
-
-            assertTrue(exception.getMessage().contains("Cannot perform 
operation while the transaction is in a prepared state"));
-
-            // Verify transactionManager methods were called
-            verify(ctx.transactionManager).isTransactional();
-            verify(ctx.transactionManager).isPrepared();
-
-            // Verify that no message was actually sent (accumulator was not 
called)
-            verify(ctx.accumulator, never()).append(
-                eq(topic),
-                anyInt(),
-                anyLong(),
-                any(),
-                any(),
-                any(),
-                any(),
-                anyLong(),
-                anyLong(),
-                any()
-            );
-        }
-    }
-
-    @Test
-    public void testSendOffsetsNotAllowedInPreparedTransactionState() throws 
Exception {
-        StringSerializer serializer = new StringSerializer();
-        KafkaProducerTestContext<String> ctx = new 
KafkaProducerTestContext<>(testInfo, serializer);
-
-        String topic = "foo";
-        Cluster cluster = TestUtils.singletonCluster(topic, 1);
-
-        when(ctx.sender.isRunning()).thenReturn(true);
-        when(ctx.metadata.fetch()).thenReturn(cluster);
-
-        // Mock transaction manager to simulate being in a prepared state
-        when(ctx.transactionManager.isTransactional()).thenReturn(true);
-        when(ctx.transactionManager.isPrepared()).thenReturn(true);
-
-        // Create consumer group metadata
-        String groupId = "test-group";
-        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
-        offsets.put(new TopicPartition(topic, 0), new OffsetAndMetadata(100L));
-        ConsumerGroupMetadata groupMetadata = new 
ConsumerGroupMetadata(groupId);
-
-        try (KafkaProducer<String, String> producer = ctx.newKafkaProducer()) {
-            // Verify that sending offsets throws IllegalStateException with 
the correct message
-            IllegalStateException exception = assertThrows(
-                IllegalStateException.class,
-                () -> producer.sendOffsetsToTransaction(offsets, groupMetadata)
-            );
-
-            assertTrue(exception.getMessage().contains("Cannot perform 
operation while the transaction is in a prepared state"));
-
-            // Verify transactionManager methods were called
-            verify(ctx.transactionManager).isTransactional();
-            verify(ctx.transactionManager).isPrepared();
-
-            // Verify that no offsets were actually sent
-            verify(ctx.transactionManager, never()).sendOffsetsToTransaction(
-                eq(offsets),
-                eq(groupMetadata)
-            );
-        }
-    }
-
-    @Test
-    public void testBeginTransactionNotAllowedInPreparedTransactionState() 
throws Exception {
-        StringSerializer serializer = new StringSerializer();
-        KafkaProducerTestContext<String> ctx = new 
KafkaProducerTestContext<>(testInfo, serializer);
-
-        when(ctx.sender.isRunning()).thenReturn(true);
-
-        // Mock transaction manager to simulate being in a prepared state
-        when(ctx.transactionManager.isTransactional()).thenReturn(true);
-        when(ctx.transactionManager.isPrepared()).thenReturn(true);
-
-        try (KafkaProducer<String, String> producer = ctx.newKafkaProducer()) {
-            // Verify that calling beginTransaction throws 
IllegalStateException with the correct message
-            IllegalStateException exception = assertThrows(
-                IllegalStateException.class,
-                producer::beginTransaction
-            );
-
-            assertTrue(exception.getMessage().contains("Cannot perform 
operation while the transaction is in a prepared state"));
-
-            // Verify transactionManager methods were called
-            verify(ctx.transactionManager).isTransactional();
-            verify(ctx.transactionManager).isPrepared();
-        }
-    }
-
-    @Test
-    public void testPrepareTransactionFailsWhen2PCDisabled() {
-        StringSerializer serializer = new StringSerializer();
-        KafkaProducerTestContext<String> ctx = new 
KafkaProducerTestContext<>(testInfo, serializer);
-
-        // Disable 2PC
-        when(ctx.transactionManager.isTransactionV2Enabled()).thenReturn(true);
-        when(ctx.transactionManager.is2PCEnabled()).thenReturn(false);
-        when(ctx.sender.isRunning()).thenReturn(true);
-
-        try (KafkaProducer<String, String> producer = ctx.newKafkaProducer()) {
-            assertThrows(
-                InvalidTxnStateException.class,
-                producer::prepareTransaction,
-                "prepareTransaction() should fail if 2PC is disabled"
-            );
-        }
-    }
-    
-    @Test
-    public void testCompleteTransactionWithMatchingState() throws Exception {
-        StringSerializer serializer = new StringSerializer();
-        KafkaProducerTestContext<String> ctx = new 
KafkaProducerTestContext<>(testInfo, serializer);
-
-        when(ctx.transactionManager.isPrepared()).thenReturn(true);
-        when(ctx.sender.isRunning()).thenReturn(true);
-        
-        // Create prepared states with matching values
-        long producerId = 12345L;
-        short epoch = 5;
-        PreparedTxnState currentState = new PreparedTxnState(producerId, 
epoch);
-        PreparedTxnState inputState = new PreparedTxnState(producerId, epoch);
-        
-        // Set up the transaction manager to return the prepared state
-        
when(ctx.transactionManager.preparedTransactionState()).thenReturn(currentState);
-        
-        // Should trigger commit when states match
-        TransactionalRequestResult commitResult = 
mock(TransactionalRequestResult.class);
-        when(ctx.transactionManager.beginCommit()).thenReturn(commitResult);
-        
-        try (KafkaProducer<String, String> producer = ctx.newKafkaProducer()) {
-            // Call completeTransaction with the matching state
-            producer.completeTransaction(inputState);
-            
-            // Verify methods called in order
-            verify(ctx.transactionManager).isPrepared();
-            verify(ctx.transactionManager).preparedTransactionState();
-            verify(ctx.transactionManager).beginCommit();
-            
-            // Verify abort was never called
-            verify(ctx.transactionManager, never()).beginAbort();
-            
-            // Verify sender was woken up
-            verify(ctx.sender).wakeup();
-        }
-    }
-    
-    @Test
-    public void testCompleteTransactionWithNonMatchingState() throws Exception 
{
-        StringSerializer serializer = new StringSerializer();
-        KafkaProducerTestContext<String> ctx = new 
KafkaProducerTestContext<>(testInfo, serializer);
-
-        when(ctx.transactionManager.isPrepared()).thenReturn(true);
-        when(ctx.sender.isRunning()).thenReturn(true);
-        
-        // Create txn prepared states with different values
-        long producerId = 12345L;
-        short epoch = 5;
-        PreparedTxnState currentState = new PreparedTxnState(producerId, 
epoch);
-        PreparedTxnState inputState = new PreparedTxnState(producerId + 1, 
epoch);
-        
-        // Set up the transaction manager to return the prepared state
-        
when(ctx.transactionManager.preparedTransactionState()).thenReturn(currentState);
-        
-        // Should trigger abort when states don't match
-        TransactionalRequestResult abortResult = 
mock(TransactionalRequestResult.class);
-        when(ctx.transactionManager.beginAbort()).thenReturn(abortResult);
-        
-        try (KafkaProducer<String, String> producer = ctx.newKafkaProducer()) {
-            // Call completeTransaction with the non-matching state
-            producer.completeTransaction(inputState);
-            
-            // Verify methods called in order
-            verify(ctx.transactionManager).isPrepared();
-            verify(ctx.transactionManager).preparedTransactionState();
-            verify(ctx.transactionManager).beginAbort();
-            
-            // Verify commit was never called
-            verify(ctx.transactionManager, never()).beginCommit();
-            
-            // Verify sender was woken up
-            verify(ctx.sender).wakeup();
-        }
-    }
-    
     @Test
     public void testClusterAuthorizationFailure() throws Exception {
         int maxBlockMs = 500;
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 4668a91ed04..278e6c3e381 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -23,7 +23,6 @@ import org.apache.kafka.clients.NodeApiVersions;
 import org.apache.kafka.clients.consumer.CommitFailedException;
 import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.clients.producer.PreparedTxnState;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
@@ -4025,56 +4024,6 @@ public class TransactionManagerTest {
         assertFalse(transactionManager.hasOngoingTransaction());
     }
 
-    @Test
-    public void testInitializeTransactionsWithKeepPreparedTxn() {
-        doInitTransactionsWith2PCEnabled(true);
-        runUntil(transactionManager::hasProducerId);
-
-        // Expect a bumped epoch in the response.
-        assertTrue(transactionManager.hasProducerId());
-        assertFalse(transactionManager.hasOngoingTransaction());
-        assertEquals(ongoingProducerId, transactionManager.producerIdAndEpoch().producerId);
-        assertEquals(bumpedOngoingEpoch, transactionManager.producerIdAndEpoch().epoch);
-    }
-
-    @Test
-    public void testPrepareTransaction() {
-        doInitTransactionsWith2PCEnabled(false);
-        runUntil(transactionManager::hasProducerId);
-
-        // Begin a transaction
-        transactionManager.beginTransaction();
-        assertTrue(transactionManager.hasOngoingTransaction());
-
-        // Add a partition to the transaction
-        transactionManager.maybeAddPartition(tp0);
-
-        // Capture the current producer ID and epoch before preparing the response
-        long producerId = transactionManager.producerIdAndEpoch().producerId;
-        short epoch = transactionManager.producerIdAndEpoch().epoch;
-
-        // Simulate a produce request
-        try {
-            // Prepare the response before sending to ensure it's ready
-            prepareProduceResponse(Errors.NONE, producerId, epoch);
-
-            appendToAccumulator(tp0);
-            // Wait until the request is processed
-            runUntil(() -> !client.hasPendingResponses());
-        } catch (InterruptedException e) {
-            fail("Unexpected interruption: " + e);
-        }
-
-        transactionManager.prepareTransaction();
-        assertTrue(transactionManager.isPrepared());
-
-        PreparedTxnState preparedState = transactionManager.preparedTransactionState();
-        // Validate the state contains the correct serialized producer ID and epoch
-        assertEquals(producerId + ":" + epoch, preparedState.toString());
-        assertEquals(producerId, preparedState.producerId());
-        assertEquals(epoch, preparedState.epoch());
-    }
-
     private void prepareAddPartitionsToTxn(final Map<TopicPartition, Errors> errors) {
         AddPartitionsToTxnResult result = AddPartitionsToTxnResponse.resultForTransaction(AddPartitionsToTxnResponse.V3_AND_BELOW_TXN_ID, errors);
         AddPartitionsToTxnResponseData data = new AddPartitionsToTxnResponseData().setResultsByTopicV3AndBelow(result.topicResults()).setThrottleTimeMs(0);


Reply via email to