This is an automated email from the ASF dual-hosted git repository.
jolshan pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.1 by this push:
new c4cac078196 KAFKA-19414: Remove 2PC public APIs from 4.1 until release (KIP-939) (#19985)
c4cac078196 is described below
commit c4cac078196abd12665a54d6800fe2ed4ba5e85e
Author: Ritika Reddy <[email protected]>
AuthorDate: Wed Jun 25 09:06:21 2025 -0700
KAFKA-19414: Remove 2PC public APIs from 4.1 until release (KIP-939) (#19985)
We are removing some of the previously added public APIs until KIP-939 is ready to use.
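For reference, the surface being pulled back is the KIP-939 two-phase-commit flow visible in the diff below: initTransactions(boolean keepPreparedTxn), prepareTransaction(), and completeTransaction(PreparedTxnState). A rough sketch of how that deferred flow was meant to be driven, assuming 2PC is enabled via transaction.two.phase.commit.enable and that an external transaction manager stores the prepared state, looks like this (it will not compile against 4.1 once this commit lands; the external store is a hypothetical stand-in):

    // Sketch only: illustrates the deferred KIP-939 API removed by this commit.
    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.PreparedTxnState;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class TwoPhaseCommitSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "2pc-txn-id");
            props.put(ProducerConfig.TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG, true);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // keepPreparedTxn=false: abort any leftover transaction, like plain initTransactions()
                producer.initTransactions(false);
                producer.beginTransaction();
                producer.send(new ProducerRecord<>("payments", "order-1", "captured"));

                // Phase 1: flush and prepare; the returned state would be persisted externally
                PreparedTxnState prepared = producer.prepareTransaction();

                // ... external transaction manager records `prepared` and decides the outcome ...

                // Phase 2: commit if the stored state still matches, otherwise abort
                producer.completeTransaction(prepared);
            }
        }
    }

The standard flow that remains in 4.1 is unchanged: initTransactions(), beginTransaction(), send(), then commitTransaction() or abortTransaction().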
Reviewers: Justine Olshan <[email protected]>
---
.../kafka/clients/producer/KafkaProducer.java | 117 +-------
.../kafka/clients/producer/MockProducer.java | 35 +--
.../apache/kafka/clients/producer/Producer.java | 19 +-
.../producer/internals/TransactionManager.java | 4 +-
.../kafka/clients/producer/KafkaProducerTest.java | 294 ---------------------
.../producer/internals/TransactionManagerTest.java | 51 ----
6 files changed, 13 insertions(+), 507 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 71d201b71f9..1b1d7bb7e83 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -50,13 +50,11 @@ import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidTopicException;
-import org.apache.kafka.common.errors.InvalidTxnStateException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
@@ -622,13 +620,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
}
/**
- * Initialize the transactional state for this producer, similar to {@link #initTransactions()} but
- * with additional capabilities to keep a previously prepared transaction.
- *
* Needs to be called before any other methods when the {@code transactional.id} is set in the configuration.
- *
- * When {@code keepPreparedTxn} is {@code false}, this behaves like the standard transactional
- * initialization where the method does the following:
+ * This method does the following:
* <ol>
* <li>Ensures any transactions initiated by previous instances of the producer with the same
* {@code transactional.id} are completed. If the previous instance had failed with a transaction in
@@ -637,39 +630,26 @@ public class KafkaProducer<K, V> {
* <li>Gets the internal producer id and epoch, used in all future transactional
* messages issued by the producer.</li>
* </ol>
- *
- * <p>
- * When {@code keepPreparedTxn} is set to {@code true}, the producer does <em>not</em> automatically abort existing
- * transactions. Instead, it enters a recovery mode allowing only finalization of those previously
- * prepared transactions.
- * This behavior is especially crucial for 2PC scenarios, where transactions should remain intact
- * until the external transaction manager decides whether to commit or abort.
- * <p>
- *
- * @param keepPreparedTxn true to retain any in-flight prepared transactions (necessary for 2PC
- *                        recovery), false to abort existing transactions and behave like
- *                        the standard initTransactions.
- *
* Note that this method will raise {@link TimeoutException} if the transactional state cannot
* be initialized before expiration of {@code max.block.ms}. Additionally, it will raise {@link InterruptException}
* if interrupted. It is safe to retry in either case, but once the transactional state has been successfully
* initialized, this method should no longer be used.
*
- * @throws IllegalStateException if no {@code transactional.id} is configured
- * @throws org.apache.kafka.common.errors.UnsupportedVersionException if the broker does not
- *         support transactions (i.e. if its version is lower than 0.11.0.0)
- * @throws org.apache.kafka.common.errors.TransactionalIdAuthorizationException if the configured
- *         {@code transactional.id} is unauthorized either for normal transaction writes or 2PC.
- * @throws KafkaException if the producer encounters a fatal error or any other unexpected error
+ * @throws IllegalStateException if no {@code transactional.id} has been configured
+ * @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker
+ *         does not support transactions (i.e. if its version is lower than 0.11.0.0)
+ * @throws org.apache.kafka.common.errors.AuthorizationException error indicating that the configured
+ *         transactional.id is not authorized, or the idempotent producer id is unavailable. See the exception for
+ *         more details. User may retry this function call after fixing the permission.
+ * @throws KafkaException if the producer has encountered a previous fatal error or for any other unexpected error
* @throws TimeoutException if the time taken for initialize the transaction has surpassed <code>max.block.ms</code>.
* @throws InterruptException if the thread is interrupted while blocked
*/
- public void initTransactions(boolean keepPreparedTxn) {
+ public void initTransactions() {
throwIfNoTransactionManager();
throwIfProducerClosed();
- throwIfInPreparedState();
long now = time.nanoseconds();
- TransactionalRequestResult result = transactionManager.initializeTransactions(keepPreparedTxn);
+ TransactionalRequestResult result = transactionManager.initializeTransactions(false);
sender.wakeup();
result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);
producerMetrics.recordInit(time.nanoseconds() - now);
@@ -754,7 +734,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
throwIfInvalidGroupMetadata(groupMetadata);
throwIfNoTransactionManager();
throwIfProducerClosed();
- throwIfInPreparedState();
if (!offsets.isEmpty()) {
long start = time.nanoseconds();
@@ -765,48 +744,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
}
}
- /**
- * Prepares the current transaction for a two-phase commit. This method will flush all pending messages
- * and transition the producer into a mode where only {@link #commitTransaction()}, {@link #abortTransaction()},
- * or completeTransaction(PreparedTxnState) may be called.
- * <p>
- * This method is used as part of a two-phase commit protocol:
- * <ol>
- * <li>Prepare the transaction by calling this method. This returns a {@link PreparedTxnState} if successful.</li>
- * <li>Make any external system changes that need to be atomic with this transaction.</li>
- * <li>Complete the transaction by calling {@link #commitTransaction()}, {@link #abortTransaction()} or
- * completeTransaction(PreparedTxnState).</li>
- * </ol>
- *
- * @return the prepared transaction state to use when completing the transaction
- *
- * @throws IllegalStateException if no transactional.id has been configured or no transaction has been started yet.
- * @throws InvalidTxnStateException if the producer is not in a state where preparing
- *         a transaction is possible or 2PC is not enabled.
- * @throws ProducerFencedException fatal error indicating another producer with the same transactional.id is active
- * @throws UnsupportedVersionException fatal error indicating the broker
- *         does not support transactions (i.e. if its version is lower than 0.11.0.0)
- * @throws AuthorizationException fatal error indicating that the configured
- *         transactional.id is not authorized. See the exception for more details
- * @throws KafkaException if the producer has encountered a previous fatal error or for any other unexpected error
- * @throws TimeoutException if the time taken for preparing the transaction has surpassed <code>max.block.ms</code>
- * @throws InterruptException if the thread is interrupted while blocked
- */
- @Override
- public PreparedTxnState prepareTransaction() throws ProducerFencedException {
- throwIfNoTransactionManager();
- throwIfProducerClosed();
- throwIfInPreparedState();
- if (!transactionManager.is2PCEnabled()) {
- throw new InvalidTxnStateException("Cannot prepare a transaction
when 2PC is not enabled");
- }
- long now = time.nanoseconds();
- flush();
- transactionManager.prepareTransaction();
- producerMetrics.recordPrepareTxn(time.nanoseconds() - now);
- return transactionManager.preparedTransactionState();
- }
-
/**
* Commits the ongoing transaction. This method will flush any unsent records before actually committing the transaction.
* <p>
@@ -884,40 +821,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
producerMetrics.recordAbortTxn(time.nanoseconds() - abortStart);
}
- /**
- * Completes a prepared transaction by comparing the provided prepared transaction state with the
- * current prepared state on the producer.
- * If they match, the transaction is committed; otherwise, it is aborted.
- *
- * @param preparedTxnState The prepared transaction state to compare against the current state
- * @throws IllegalStateException if no transactional.id has been configured or no transaction has been started
- * @throws InvalidTxnStateException if the producer is not in prepared state
- * @throws ProducerFencedException fatal error indicating another producer with the same transactional.id is active
- * @throws KafkaException if the producer has encountered a previous fatal error or for any other unexpected error
- * @throws TimeoutException if the time taken for completing the transaction has surpassed <code>max.block.ms</code>
- * @throws InterruptException if the thread is interrupted while blocked
- */
- @Override
- public void completeTransaction(PreparedTxnState preparedTxnState) throws ProducerFencedException {
- throwIfNoTransactionManager();
- throwIfProducerClosed();
-
- if (!transactionManager.isPrepared()) {
- throw new InvalidTxnStateException("Cannot complete transaction
because no transaction has been prepared. " +
- "Call prepareTransaction() first, or make sure
initTransaction(true) was called.");
- }
-
- // Get the current prepared transaction state
- PreparedTxnState currentPreparedState = transactionManager.preparedTransactionState();
-
- // Compare the prepared transaction state token and commit or abort accordingly
- if (currentPreparedState.equals(preparedTxnState)) {
- commitTransaction();
- } else {
- abortTransaction();
- }
- }
-
/**
* Asynchronously send a record to a topic. Equivalent to <code>send(record, null)</code>.
* See {@link #send(ProducerRecord, Callback)} for details.
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
index 3e5cb9f5d5a..a4aac86df09 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -142,7 +142,7 @@ public class MockProducer<K, V> implements Producer<K, V> {
}
@Override
- public void initTransactions(boolean keepPreparedTxn) {
+ public void initTransactions() {
verifyNotClosed();
verifyNotFenced();
if (this.transactionInitialized) {
@@ -200,18 +200,6 @@ public class MockProducer<K, V> implements Producer<K, V> {
this.sentOffsets = true;
}
- @Override
- public PreparedTxnState prepareTransaction() throws ProducerFencedException {
- verifyNotClosed();
- verifyNotFenced();
- verifyTransactionsInitialized();
- verifyTransactionInFlight();
-
- // Return a new PreparedTxnState with mock values for producerId and epoch
- // Using 1000L and (short)1 as arbitrary values for a valid PreparedTxnState
- return new PreparedTxnState(1000L, (short) 1);
- }
-
@Override
public void commitTransaction() throws ProducerFencedException {
verifyNotClosed();
@@ -257,27 +245,6 @@ public class MockProducer<K, V> implements Producer<K, V> {
this.transactionInFlight = false;
}
- @Override
- public void completeTransaction(PreparedTxnState preparedTxnState) throws ProducerFencedException {
- verifyNotClosed();
- verifyNotFenced();
- verifyTransactionsInitialized();
-
- if (!this.transactionInFlight) {
- throw new IllegalStateException("There is no prepared transaction
to complete.");
- }
-
- // For testing purposes, we'll consider a prepared state with producerId=1000L and epoch=1 as valid
- // This should match what's returned in prepareTransaction()
- PreparedTxnState currentState = new PreparedTxnState(1000L, (short) 1);
-
- if (currentState.equals(preparedTxnState)) {
- commitTransaction();
- } else {
- abortTransaction();
- }
- }
-
private synchronized void verifyNotClosed() {
if (this.closed) {
throw new IllegalStateException("MockProducer is already closed.");
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
index e6e94691e34..798034dda6d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
@@ -42,14 +42,7 @@ public interface Producer<K, V> extends Closeable {
/**
* See {@link KafkaProducer#initTransactions()}
*/
- default void initTransactions() {
- initTransactions(false);
- }
-
- /**
- * See {@link KafkaProducer#initTransactions(boolean)}
- */
- void initTransactions(boolean keepPreparedTxn);
+ void initTransactions();
/**
* See {@link KafkaProducer#beginTransaction()}
@@ -62,11 +55,6 @@ public interface Producer<K, V> extends Closeable {
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
                              ConsumerGroupMetadata groupMetadata) throws ProducerFencedException;
- /**
- * See {@link KafkaProducer#prepareTransaction()}
- */
- PreparedTxnState prepareTransaction() throws ProducerFencedException;
-
/**
* See {@link KafkaProducer#commitTransaction()}
*/
@@ -77,11 +65,6 @@ public interface Producer<K, V> extends Closeable {
*/
void abortTransaction() throws ProducerFencedException;
- /**
- * See {@link KafkaProducer#completeTransaction(PreparedTxnState)}
- */
- void completeTransaction(PreparedTxnState preparedTxnState) throws ProducerFencedException;
-
/**
* @see KafkaProducer#registerMetricForSubscription(KafkaMetric)
*/
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index 5d83cbc0b1b..20804c505dd 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -320,9 +320,7 @@ public class TransactionManager {
.setTransactionalId(transactionalId)
.setTransactionTimeoutMs(transactionTimeoutMs)
.setProducerId(producerIdAndEpoch.producerId)
- .setProducerEpoch(producerIdAndEpoch.epoch)
- .setEnable2Pc(enable2PC)
- .setKeepPreparedTxn(keepPreparedTxn);
+ .setProducerEpoch(producerIdAndEpoch.epoch);
InitProducerIdHandler handler = new InitProducerIdHandler(new InitProducerIdRequest.Builder(requestData),
isEpochBump);
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 8460d0f4c5f..ac63fed3c04 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -32,7 +32,6 @@ import org.apache.kafka.clients.producer.internals.ProducerMetadata;
import org.apache.kafka.clients.producer.internals.RecordAccumulator;
import org.apache.kafka.clients.producer.internals.Sender;
import org.apache.kafka.clients.producer.internals.TransactionManager;
-import org.apache.kafka.clients.producer.internals.TransactionalRequestResult;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
@@ -47,7 +46,6 @@ import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidTopicException;
-import org.apache.kafka.common.errors.InvalidTxnStateException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
@@ -78,7 +76,6 @@ import org.apache.kafka.common.requests.AddOffsetsToTxnResponse;
import org.apache.kafka.common.requests.EndTxnResponse;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
-import org.apache.kafka.common.requests.InitProducerIdRequest;
import org.apache.kafka.common.requests.InitProducerIdResponse;
import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.requests.MetadataResponse;
@@ -107,7 +104,6 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
@@ -154,7 +150,6 @@ import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@@ -165,7 +160,6 @@ import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.notNull;
import static org.mockito.Mockito.atMostOnce;
-import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.never;
@@ -1389,294 +1383,6 @@ public class KafkaProducerTest {
}
}
- @ParameterizedTest
- @CsvSource({
- "true, false",
- "true, true",
- "false, true"
- })
- public void testInitTransactionsWithKeepPreparedTxnAndTwoPhaseCommit(boolean keepPreparedTxn, boolean enable2PC) {
- Map<String, Object> configs = new HashMap<>();
- configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test-txn-id");
- configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000);
- configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
- if (enable2PC) {
- configs.put(ProducerConfig.TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG, true);
- }
-
- Time time = new MockTime(1);
- MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
- ProducerMetadata metadata = newMetadata(0, 0, Long.MAX_VALUE);
- MockClient client = new MockClient(time, metadata);
- client.updateMetadata(initialUpdateResponse);
-
- // Capture flags from the InitProducerIdRequest
- boolean[] requestFlags = new boolean[2]; // [keepPreparedTxn, enable2Pc]
-
- client.prepareResponse(
- request -> request instanceof FindCoordinatorRequest &&
- ((FindCoordinatorRequest) request).data().keyType() == FindCoordinatorRequest.CoordinatorType.TRANSACTION.id(),
- FindCoordinatorResponse.prepareResponse(Errors.NONE, "test-txn-id", NODE));
-
- client.prepareResponse(
- request -> {
- if (request instanceof InitProducerIdRequest) {
- InitProducerIdRequest initRequest = (InitProducerIdRequest) request;
- requestFlags[0] = initRequest.data().keepPreparedTxn();
- requestFlags[1] = initRequest.data().enable2Pc();
- return true;
- }
- return false;
- },
- initProducerIdResponse(1L, (short) 5, Errors.NONE));
-
- try (Producer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
- new StringSerializer(), metadata, client, null, time)) {
- producer.initTransactions(keepPreparedTxn);
-
- // Verify request flags match expected values
- assertEquals(keepPreparedTxn, requestFlags[0],
- "keepPreparedTxn flag should match input parameter");
- assertEquals(enable2PC, requestFlags[1],
- "enable2Pc flag should match producer configuration");
- }
- }
-
- @Test
- public void testPrepareTransactionSuccess() throws Exception {
- StringSerializer serializer = new StringSerializer();
- KafkaProducerTestContext<String> ctx = new KafkaProducerTestContext<>(testInfo, serializer);
-
- when(ctx.transactionManager.isTransactionV2Enabled()).thenReturn(true);
- when(ctx.transactionManager.is2PCEnabled()).thenReturn(true);
- when(ctx.sender.isRunning()).thenReturn(true);
-
- doNothing().when(ctx.transactionManager).prepareTransaction();
-
- PreparedTxnState expectedState = mock(PreparedTxnState.class);
- when(ctx.transactionManager.preparedTransactionState()).thenReturn(expectedState);
-
- try (KafkaProducer<String, String> producer = ctx.newKafkaProducer()) {
- PreparedTxnState returned = producer.prepareTransaction();
- assertSame(expectedState, returned);
-
- verify(ctx.transactionManager).prepareTransaction();
- verify(ctx.accumulator).beginFlush();
- verify(ctx.accumulator).awaitFlushCompletion();
- }
- }
-
- @Test
- public void testSendNotAllowedInPreparedTransactionState() throws Exception {
- StringSerializer serializer = new StringSerializer();
- KafkaProducerTestContext<String> ctx = new KafkaProducerTestContext<>(testInfo, serializer);
-
- String topic = "foo";
- Cluster cluster = TestUtils.singletonCluster(topic, 1);
-
- when(ctx.sender.isRunning()).thenReturn(true);
- when(ctx.metadata.fetch()).thenReturn(cluster);
-
- // Mock transaction manager to simulate being in a prepared state
- when(ctx.transactionManager.isTransactional()).thenReturn(true);
- when(ctx.transactionManager.isPrepared()).thenReturn(true);
-
- // Create record to send
- long timestamp = ctx.time.milliseconds();
- ProducerRecord<String, String> record = new ProducerRecord<>(topic, 0, timestamp, "key", "value");
-
- try (KafkaProducer<String, String> producer = ctx.newKafkaProducer()) {
- // Verify that sending a record throws IllegalStateException with the correct message
- IllegalStateException exception = assertThrows(
- IllegalStateException.class,
- () -> producer.send(record)
- );
-
- assertTrue(exception.getMessage().contains("Cannot perform operation while the transaction is in a prepared state"));
-
- // Verify transactionManager methods were called
- verify(ctx.transactionManager).isTransactional();
- verify(ctx.transactionManager).isPrepared();
-
- // Verify that no message was actually sent (accumulator was not called)
- verify(ctx.accumulator, never()).append(
- eq(topic),
- anyInt(),
- anyLong(),
- any(),
- any(),
- any(),
- any(),
- anyLong(),
- anyLong(),
- any()
- );
- }
- }
-
- @Test
- public void testSendOffsetsNotAllowedInPreparedTransactionState() throws Exception {
- StringSerializer serializer = new StringSerializer();
- KafkaProducerTestContext<String> ctx = new KafkaProducerTestContext<>(testInfo, serializer);
-
- String topic = "foo";
- Cluster cluster = TestUtils.singletonCluster(topic, 1);
-
- when(ctx.sender.isRunning()).thenReturn(true);
- when(ctx.metadata.fetch()).thenReturn(cluster);
-
- // Mock transaction manager to simulate being in a prepared state
- when(ctx.transactionManager.isTransactional()).thenReturn(true);
- when(ctx.transactionManager.isPrepared()).thenReturn(true);
-
- // Create consumer group metadata
- String groupId = "test-group";
- Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
- offsets.put(new TopicPartition(topic, 0), new OffsetAndMetadata(100L));
- ConsumerGroupMetadata groupMetadata = new ConsumerGroupMetadata(groupId);
-
- try (KafkaProducer<String, String> producer = ctx.newKafkaProducer()) {
- // Verify that sending offsets throws IllegalStateException with the correct message
- IllegalStateException exception = assertThrows(
- IllegalStateException.class,
- () -> producer.sendOffsetsToTransaction(offsets, groupMetadata)
- );
-
- assertTrue(exception.getMessage().contains("Cannot perform operation while the transaction is in a prepared state"));
-
- // Verify transactionManager methods were called
- verify(ctx.transactionManager).isTransactional();
- verify(ctx.transactionManager).isPrepared();
-
- // Verify that no offsets were actually sent
- verify(ctx.transactionManager, never()).sendOffsetsToTransaction(
- eq(offsets),
- eq(groupMetadata)
- );
- }
- }
-
- @Test
- public void testBeginTransactionNotAllowedInPreparedTransactionState() throws Exception {
- StringSerializer serializer = new StringSerializer();
- KafkaProducerTestContext<String> ctx = new KafkaProducerTestContext<>(testInfo, serializer);
-
- when(ctx.sender.isRunning()).thenReturn(true);
-
- // Mock transaction manager to simulate being in a prepared state
- when(ctx.transactionManager.isTransactional()).thenReturn(true);
- when(ctx.transactionManager.isPrepared()).thenReturn(true);
-
- try (KafkaProducer<String, String> producer = ctx.newKafkaProducer()) {
- // Verify that calling beginTransaction throws IllegalStateException with the correct message
- IllegalStateException exception = assertThrows(
- IllegalStateException.class,
- producer::beginTransaction
- );
-
- assertTrue(exception.getMessage().contains("Cannot perform operation while the transaction is in a prepared state"));
-
- // Verify transactionManager methods were called
- verify(ctx.transactionManager).isTransactional();
- verify(ctx.transactionManager).isPrepared();
- }
- }
-
- @Test
- public void testPrepareTransactionFailsWhen2PCDisabled() {
- StringSerializer serializer = new StringSerializer();
- KafkaProducerTestContext<String> ctx = new KafkaProducerTestContext<>(testInfo, serializer);
-
- // Disable 2PC
- when(ctx.transactionManager.isTransactionV2Enabled()).thenReturn(true);
- when(ctx.transactionManager.is2PCEnabled()).thenReturn(false);
- when(ctx.sender.isRunning()).thenReturn(true);
-
- try (KafkaProducer<String, String> producer = ctx.newKafkaProducer()) {
- assertThrows(
- InvalidTxnStateException.class,
- producer::prepareTransaction,
- "prepareTransaction() should fail if 2PC is disabled"
- );
- }
- }
-
- @Test
- public void testCompleteTransactionWithMatchingState() throws Exception {
- StringSerializer serializer = new StringSerializer();
- KafkaProducerTestContext<String> ctx = new KafkaProducerTestContext<>(testInfo, serializer);
-
- when(ctx.transactionManager.isPrepared()).thenReturn(true);
- when(ctx.sender.isRunning()).thenReturn(true);
-
- // Create prepared states with matching values
- long producerId = 12345L;
- short epoch = 5;
- PreparedTxnState currentState = new PreparedTxnState(producerId, epoch);
- PreparedTxnState inputState = new PreparedTxnState(producerId, epoch);
-
- // Set up the transaction manager to return the prepared state
- when(ctx.transactionManager.preparedTransactionState()).thenReturn(currentState);
-
- // Should trigger commit when states match
- TransactionalRequestResult commitResult = mock(TransactionalRequestResult.class);
- when(ctx.transactionManager.beginCommit()).thenReturn(commitResult);
-
- try (KafkaProducer<String, String> producer = ctx.newKafkaProducer()) {
- // Call completeTransaction with the matching state
- producer.completeTransaction(inputState);
-
- // Verify methods called in order
- verify(ctx.transactionManager).isPrepared();
- verify(ctx.transactionManager).preparedTransactionState();
- verify(ctx.transactionManager).beginCommit();
-
- // Verify abort was never called
- verify(ctx.transactionManager, never()).beginAbort();
-
- // Verify sender was woken up
- verify(ctx.sender).wakeup();
- }
- }
-
- @Test
- public void testCompleteTransactionWithNonMatchingState() throws Exception {
- StringSerializer serializer = new StringSerializer();
- KafkaProducerTestContext<String> ctx = new KafkaProducerTestContext<>(testInfo, serializer);
-
- when(ctx.transactionManager.isPrepared()).thenReturn(true);
- when(ctx.sender.isRunning()).thenReturn(true);
-
- // Create txn prepared states with different values
- long producerId = 12345L;
- short epoch = 5;
- PreparedTxnState currentState = new PreparedTxnState(producerId, epoch);
- PreparedTxnState inputState = new PreparedTxnState(producerId + 1, epoch);
-
- // Set up the transaction manager to return the prepared state
- when(ctx.transactionManager.preparedTransactionState()).thenReturn(currentState);
-
- // Should trigger abort when states don't match
- TransactionalRequestResult abortResult = mock(TransactionalRequestResult.class);
- when(ctx.transactionManager.beginAbort()).thenReturn(abortResult);
-
- try (KafkaProducer<String, String> producer = ctx.newKafkaProducer()) {
- // Call completeTransaction with the non-matching state
- producer.completeTransaction(inputState);
-
- // Verify methods called in order
- verify(ctx.transactionManager).isPrepared();
- verify(ctx.transactionManager).preparedTransactionState();
- verify(ctx.transactionManager).beginAbort();
-
- // Verify commit was never called
- verify(ctx.transactionManager, never()).beginCommit();
-
- // Verify sender was woken up
- verify(ctx.sender).wakeup();
- }
- }
-
@Test
public void testClusterAuthorizationFailure() throws Exception {
int maxBlockMs = 500;
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 4668a91ed04..278e6c3e381 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -23,7 +23,6 @@ import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.clients.producer.PreparedTxnState;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
@@ -4025,56 +4024,6 @@ public class TransactionManagerTest {
assertFalse(transactionManager.hasOngoingTransaction());
}
- @Test
- public void testInitializeTransactionsWithKeepPreparedTxn() {
- doInitTransactionsWith2PCEnabled(true);
- runUntil(transactionManager::hasProducerId);
-
- // Expect a bumped epoch in the response.
- assertTrue(transactionManager.hasProducerId());
- assertFalse(transactionManager.hasOngoingTransaction());
- assertEquals(ongoingProducerId, transactionManager.producerIdAndEpoch().producerId);
- assertEquals(bumpedOngoingEpoch, transactionManager.producerIdAndEpoch().epoch);
- }
-
- @Test
- public void testPrepareTransaction() {
- doInitTransactionsWith2PCEnabled(false);
- runUntil(transactionManager::hasProducerId);
-
- // Begin a transaction
- transactionManager.beginTransaction();
- assertTrue(transactionManager.hasOngoingTransaction());
-
- // Add a partition to the transaction
- transactionManager.maybeAddPartition(tp0);
-
- // Capture the current producer ID and epoch before preparing the response
- long producerId = transactionManager.producerIdAndEpoch().producerId;
- short epoch = transactionManager.producerIdAndEpoch().epoch;
-
- // Simulate a produce request
- try {
- // Prepare the response before sending to ensure it's ready
- prepareProduceResponse(Errors.NONE, producerId, epoch);
-
- appendToAccumulator(tp0);
- // Wait until the request is processed
- runUntil(() -> !client.hasPendingResponses());
- } catch (InterruptedException e) {
- fail("Unexpected interruption: " + e);
- }
-
- transactionManager.prepareTransaction();
- assertTrue(transactionManager.isPrepared());
-
- PreparedTxnState preparedState = transactionManager.preparedTransactionState();
- // Validate the state contains the correct serialized producer ID and epoch
- assertEquals(producerId + ":" + epoch, preparedState.toString());
- assertEquals(producerId, preparedState.producerId());
- assertEquals(epoch, preparedState.epoch());
- }
-
private void prepareAddPartitionsToTxn(final Map<TopicPartition, Errors> errors) {
AddPartitionsToTxnResult result = AddPartitionsToTxnResponse.resultForTransaction(AddPartitionsToTxnResponse.V3_AND_BELOW_TXN_ID, errors);
AddPartitionsToTxnResponseData data = new AddPartitionsToTxnResponseData().setResultsByTopicV3AndBelow(result.topicResults()).setThrottleTimeMs(0);