This is an automated email from the ASF dual-hosted git repository.
jolshan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
     new 62fe528f4b3 KAFKA-19082: [1/4] Add client config for enable2PC and overloaded initProducerId (KIP-939) (#19429)
62fe528f4b3 is described below
commit 62fe528f4b37def581cc664558cd35ad21603ba2
Author: Ritika Reddy <[email protected]>
AuthorDate: Thu Apr 24 09:41:06 2025 -0700
KAFKA-19082: [1/4] Add client config for enable2PC and overloaded initProducerId (KIP-939) (#19429)
This is part of the client-side changes required to enable 2PC for KIP-939.

**Producer Config:**
transaction.two.phase.commit.enable: the default is ‘false’. If set to ‘true’, the broker is informed that the client is participating in the two-phase commit protocol, and transactions that this client starts never expire.

**Overloaded InitProducerId method**
If the keepPreparedTxn argument is 'true', the corresponding field is set in the InitProducerIdRequest.
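For illustration only (not part of this patch): a minimal sketch of how a 2PC-capable client could combine the new config with the overloaded initialization call. The bootstrap server, serializers, and transactional.id below are placeholder values.

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class TwoPhaseCommitProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "external-tm-txn-id");
        // New in this patch: opt in to 2PC; transactions started by this producer never expire.
        // Note that transaction.timeout.ms must not be set together with this flag.
        props.put(ProducerConfig.TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG, true);

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // New overload: keepPreparedTxn = true retains a previously prepared transaction
            // so the external transaction manager can later decide to commit or abort it.
            producer.initTransactions(true);
            // ... finalize the recovered transaction or begin new ones as usual ...
        }
    }
}
```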
Reviewers: Justine Olshan <[email protected]>, Artem Livshits <[email protected]>
---
checkstyle/suppressions.xml | 2 +-
.../kafka/clients/producer/KafkaProducer.java | 42 +++++--
.../kafka/clients/producer/MockProducer.java | 2 +-
.../apache/kafka/clients/producer/Producer.java | 9 +-
.../kafka/clients/producer/ProducerConfig.java | 24 ++++
.../producer/internals/TransactionManager.java | 26 +++-
.../kafka/clients/producer/KafkaProducerTest.java | 57 ++++++++-
.../kafka/clients/producer/ProducerConfigTest.java | 23 ++++
.../clients/producer/internals/SenderTest.java | 42 +++----
.../producer/internals/TransactionManagerTest.java | 133 ++++++++++++++++-----
10 files changed, 287 insertions(+), 73 deletions(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 4e0f338af5d..2d4c279e707 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -98,7 +98,7 @@
files="(AbstractFetch|ClientTelemetryReporter|ConsumerCoordinator|CommitRequestManager|FetchCollector|OffsetFetcherUtils|KafkaProducer|Sender|ConfigDef|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager|AbstractStickyAssignor|DefaultSslEngineFactory|Authorizer|RecordAccumulator|MemoryRecords|FetchSessionHandler|MockAdminClient).java"/>
<suppress checks="JavaNCSS"
-              files="(AbstractRequest|AbstractResponse|KerberosLogin|WorkerSinkTaskTest|TransactionManagerTest|SenderTest|KafkaAdminClient|ConsumerCoordinatorTest|KafkaAdminClientTest).java"/>
+              files="(AbstractRequest|AbstractResponse|KerberosLogin|WorkerSinkTaskTest|TransactionManagerTest|SenderTest|KafkaAdminClient|ConsumerCoordinatorTest|KafkaAdminClientTest|KafkaProducerTest).java"/>
<suppress checks="NPathComplexity"
files="(AbstractMembershipManager|ConsumerCoordinator|BufferPool|MetricName|Node|ConfigDef|RecordBatch|SslFactory|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|TokenInformation|Agent|PluginUtils|MiniTrogdorCluster|TasksRequest|KafkaProducer|AbstractStickyAssignor|Authorizer|FetchSessionHandler|RecordAccumulator|Shell|MockConsumer).java"/>
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index baaf13388d6..d46a2204beb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -595,14 +595,17 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
if (config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG)) {
             final String transactionalId = config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
+            final boolean enable2PC = config.getBoolean(ProducerConfig.TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG);
             final int transactionTimeoutMs = config.getInt(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG);
             final long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
+
transactionManager = new TransactionManager(
logContext,
transactionalId,
transactionTimeoutMs,
retryBackoffMs,
- apiVersions
+ apiVersions,
+ enable2PC
);
if (transactionManager.isTransactional())
@@ -617,8 +620,13 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
     }
/**
+     * Initialize the transactional state for this producer, similar to {@link #initTransactions()} but
+     * with additional capabilities to keep a previously prepared transaction.
+     *
      * Needs to be called before any other methods when the {@code transactional.id} is set in the configuration.
-     * This method does the following:
+     *
+     * When {@code keepPreparedTxn} is {@code false}, this behaves like the standard transactional
+     * initialization where the method does the following:
      * <ol>
      * <li>Ensures any transactions initiated by previous instances of the producer with the same
      *    {@code transactional.id} are completed. If the previous instance had failed with a transaction in
@@ -627,26 +635,38 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
      * <li>Gets the internal producer id and epoch, used in all future transactional
      *    messages issued by the producer.</li>
      * </ol>
+     *
+     * <p>
+     * When {@code keepPreparedTxn} is set to {@code true}, the producer does <em>not</em> automatically abort existing
+     * transactions. Instead, it enters a recovery mode allowing only finalization of those previously
+     * prepared transactions.
+     * This behavior is especially crucial for 2PC scenarios, where transactions should remain intact
+     * until the external transaction manager decides whether to commit or abort.
+     * <p>
+     *
+     * @param keepPreparedTxn true to retain any in-flight prepared transactions (necessary for 2PC
+     *                        recovery), false to abort existing transactions and behave like
+     *                        the standard initTransactions.
+     *
      * Note that this method will raise {@link TimeoutException} if the transactional state cannot
      * be initialized before expiration of {@code max.block.ms}. Additionally, it will raise {@link InterruptException}
      * if interrupted. It is safe to retry in either case, but once the transactional state has been successfully
      * initialized, this method should no longer be used.
      *
-     * @throws IllegalStateException if no {@code transactional.id} has been configured
-     * @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker
-     *         does not support transactions (i.e. if its version is lower than 0.11.0.0)
-     * @throws org.apache.kafka.common.errors.AuthorizationException error indicating that the configured
-     *         transactional.id is not authorized, or the idempotent producer id is unavailable. See the exception for
-     *         more details. User may retry this function call after fixing the permission.
-     * @throws KafkaException if the producer has encountered a previous fatal error or for any other unexpected error
+     * @throws IllegalStateException if no {@code transactional.id} is configured
+     * @throws org.apache.kafka.common.errors.UnsupportedVersionException if the broker does not
+     *         support transactions (i.e. if its version is lower than 0.11.0.0)
+     * @throws org.apache.kafka.common.errors.TransactionalIdAuthorizationException if the configured
+     *         {@code transactional.id} is unauthorized either for normal transaction writes or 2PC.
+     * @throws KafkaException if the producer encounters a fatal error or any other unexpected error
      * @throws TimeoutException if the time taken for initialize the transaction has surpassed <code>max.block.ms</code>.
      * @throws InterruptException if the thread is interrupted while blocked
      */
- public void initTransactions() {
+ public void initTransactions(boolean keepPreparedTxn) {
throwIfNoTransactionManager();
throwIfProducerClosed();
long now = time.nanoseconds();
-        TransactionalRequestResult result = transactionManager.initializeTransactions();
+        TransactionalRequestResult result = transactionManager.initializeTransactions(keepPreparedTxn);
sender.wakeup();
result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);
producerMetrics.recordInit(time.nanoseconds() - now);
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
index a4aac86df09..e3c5a23ca51 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -142,7 +142,7 @@ public class MockProducer<K, V> implements Producer<K, V> {
}
@Override
- public void initTransactions() {
+ public void initTransactions(boolean keepPreparedTxn) {
verifyNotClosed();
verifyNotFenced();
if (this.transactionInitialized) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
index 798034dda6d..a5cd92295ff 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
@@ -42,7 +42,14 @@ public interface Producer<K, V> extends Closeable {
/**
* See {@link KafkaProducer#initTransactions()}
*/
- void initTransactions();
+ default void initTransactions() {
+ initTransactions(false);
+ }
+
+ /**
+ * See {@link KafkaProducer#initTransactions(boolean)}
+ */
+ void initTransactions(boolean keepPreparedTxn);
/**
* See {@link KafkaProducer#beginTransaction()}
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 949c6c167ba..362d205e8c1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -355,6 +355,11 @@ public class ProducerConfig extends AbstractConfig {
"By default the TransactionId is not configured, which means
transactions cannot be used. " +
"Note that, by default, transactions require a cluster of at least
three brokers which is the recommended setting for production; for development
you can change this, by adjusting broker setting
<code>transaction.state.log.replication.factor</code>.";
+ /** <code> transaction.two.phase.commit.enable </code> */
+ public static final String TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG =
"transaction.two.phase.commit.enable";
+ private static final String TRANSACTION_TWO_PHASE_COMMIT_ENABLE_DOC = "If
set to true, then the broker is informed that the client is participating in " +
+ "two phase commit protocol and transactions that this client
starts never expire.";
+
/**
* <code>security.providers</code>
*/
@@ -526,6 +531,11 @@ public class ProducerConfig extends AbstractConfig {
new ConfigDef.NonEmptyString(),
Importance.LOW,
TRANSACTIONAL_ID_DOC)
+                                .define(TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG,
+                                        Type.BOOLEAN,
+                                        false,
+                                        Importance.LOW,
+                                        TRANSACTION_TWO_PHASE_COMMIT_ENABLE_DOC)
.define(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG,
Type.STRING,
CommonClientConfigs.DEFAULT_METADATA_RECOVERY_STRATEGY,
@@ -609,6 +619,20 @@ public class ProducerConfig extends AbstractConfig {
         if (!idempotenceEnabled && userConfiguredTransactions) {
             throw new ConfigException("Cannot set a " + ProducerConfig.TRANSACTIONAL_ID_CONFIG + " without also enabling idempotence.");
         }
+
+        // Validate that transaction.timeout.ms is not set when transaction.two.phase.commit.enable is true
+        // In standard Kafka transactions, the broker enforces transaction.timeout.ms and aborts any
+        // transaction that isn't completed in time. With two-phase commit (2PC), an external coordinator
+        // decides when to finalize, so broker-side timeouts don't apply. Disallow using both.
+        boolean enable2PC = this.getBoolean(TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG);
+        boolean userConfiguredTransactionTimeout = originalConfigs.containsKey(TRANSACTION_TIMEOUT_CONFIG);
+        if (enable2PC && userConfiguredTransactionTimeout) {
+            throw new ConfigException(
+                "Cannot set " + ProducerConfig.TRANSACTION_TIMEOUT_CONFIG +
+                " when " + ProducerConfig.TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG +
+                " is set to true. Transactions will not expire with two-phase commit enabled."
+            );
+        }
}
private static String parseAcks(String acksString) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index b52d5d4836d..e17dc15d239 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -143,6 +143,7 @@ public class TransactionManager {
private volatile boolean clientSideEpochBumpRequired = false;
private volatile long latestFinalizedFeaturesEpoch = -1;
private volatile boolean isTransactionV2Enabled = false;
+ private final boolean enable2PC;
private enum State {
UNINITIALIZED,
@@ -203,7 +204,8 @@ public class TransactionManager {
final String transactionalId,
final int transactionTimeoutMs,
final long retryBackoffMs,
- final ApiVersions apiVersions) {
+ final ApiVersions apiVersions,
+ final boolean enable2PC) {
this.producerIdAndEpoch = ProducerIdAndEpoch.NONE;
this.transactionalId = transactionalId;
this.log = logContext.logger(TransactionManager.class);
@@ -220,6 +222,7 @@ public class TransactionManager {
this.retryBackoffMs = retryBackoffMs;
this.txnPartitionMap = new TxnPartitionMap(logContext);
this.apiVersions = apiVersions;
+ this.enable2PC = enable2PC;
}
/**
@@ -279,11 +282,18 @@ public class TransactionManager {
return Thread.currentThread() instanceof Sender.SenderThread;
}
-    public synchronized TransactionalRequestResult initializeTransactions() {
-        return initializeTransactions(ProducerIdAndEpoch.NONE);
+    synchronized TransactionalRequestResult initializeTransactions(ProducerIdAndEpoch producerIdAndEpoch) {
+        return initializeTransactions(producerIdAndEpoch, false);
     }
-    synchronized TransactionalRequestResult initializeTransactions(ProducerIdAndEpoch producerIdAndEpoch) {
+    public synchronized TransactionalRequestResult initializeTransactions(boolean keepPreparedTxn) {
+        return initializeTransactions(ProducerIdAndEpoch.NONE, keepPreparedTxn);
+    }
+
+    synchronized TransactionalRequestResult initializeTransactions(
+        ProducerIdAndEpoch producerIdAndEpoch,
+        boolean keepPreparedTxn
+    ) {
maybeFailWithError();
boolean isEpochBump = producerIdAndEpoch != ProducerIdAndEpoch.NONE;
@@ -292,6 +302,9 @@ public class TransactionManager {
             if (!isEpochBump) {
                 transitionTo(State.INITIALIZING);
                 log.info("Invoking InitProducerId for the first time in order to acquire a producer ID");
+                if (keepPreparedTxn) {
+                    log.info("Invoking InitProducerId with keepPreparedTxn set to true for 2PC transactions");
+                }
             } else {
                 log.info("Invoking InitProducerId with current producer ID and epoch {} in order to bump the epoch", producerIdAndEpoch);
             }
@@ -299,7 +312,10 @@ public class TransactionManager {
.setTransactionalId(transactionalId)
.setTransactionTimeoutMs(transactionTimeoutMs)
.setProducerId(producerIdAndEpoch.producerId)
- .setProducerEpoch(producerIdAndEpoch.epoch);
+ .setProducerEpoch(producerIdAndEpoch.epoch)
+ .setEnable2Pc(enable2PC)
+ .setKeepPreparedTxn(keepPreparedTxn);
+
             InitProducerIdHandler handler = new InitProducerIdHandler(new InitProducerIdRequest.Builder(requestData),
                     isEpochBump);
enqueueRequest(handler);
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 48569f1a20c..d8cfe6578c0 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -75,6 +75,7 @@ import org.apache.kafka.common.requests.AddOffsetsToTxnResponse;
import org.apache.kafka.common.requests.EndTxnResponse;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.requests.InitProducerIdRequest;
import org.apache.kafka.common.requests.InitProducerIdResponse;
import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.requests.MetadataResponse;
@@ -103,6 +104,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
@@ -1315,7 +1317,7 @@ public class KafkaProducerTest {
                 ((FindCoordinatorRequest) request).data().keyType() == FindCoordinatorRequest.CoordinatorType.TRANSACTION.id(),
             FindCoordinatorResponse.prepareResponse(Errors.NONE, "bad-transaction", NODE));
-        Future<?> future = executor.submit(producer::initTransactions);
+        Future<?> future = executor.submit(() -> producer.initTransactions());
         TestUtils.waitForCondition(client::hasInFlightRequests,
             "Timed out while waiting for expected `InitProducerId` request to be sent");
@@ -1390,6 +1392,59 @@ public class KafkaProducerTest {
}
}
+    @ParameterizedTest
+    @CsvSource({
+        "true, false",
+        "true, true",
+        "false, true"
+    })
+    public void testInitTransactionsWithKeepPreparedTxnAndTwoPhaseCommit(boolean keepPreparedTxn, boolean enable2PC) {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test-txn-id");
+        configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000);
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+        if (enable2PC) {
+            configs.put(ProducerConfig.TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG, true);
+        }
+
+        Time time = new MockTime(1);
+        MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
+        ProducerMetadata metadata = newMetadata(0, 0, Long.MAX_VALUE);
+        MockClient client = new MockClient(time, metadata);
+        client.updateMetadata(initialUpdateResponse);
+
+        // Capture flags from the InitProducerIdRequest
+        boolean[] requestFlags = new boolean[2]; // [keepPreparedTxn, enable2Pc]
+
+        client.prepareResponse(
+            request -> request instanceof FindCoordinatorRequest &&
+                ((FindCoordinatorRequest) request).data().keyType() == FindCoordinatorRequest.CoordinatorType.TRANSACTION.id(),
+            FindCoordinatorResponse.prepareResponse(Errors.NONE, "test-txn-id", NODE));
+
+        client.prepareResponse(
+            request -> {
+                if (request instanceof InitProducerIdRequest) {
+                    InitProducerIdRequest initRequest = (InitProducerIdRequest) request;
+                    requestFlags[0] = initRequest.data().keepPreparedTxn();
+                    requestFlags[1] = initRequest.data().enable2Pc();
+                    return true;
+                }
+                return false;
+            },
+            initProducerIdResponse(1L, (short) 5, Errors.NONE));
+
+        try (Producer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
+            new StringSerializer(), metadata, client, null, time)) {
+            producer.initTransactions(keepPreparedTxn);
+
+            // Verify request flags match expected values
+            assertEquals(keepPreparedTxn, requestFlags[0],
+                "keepPreparedTxn flag should match input parameter");
+            assertEquals(enable2PC, requestFlags[1],
+                "enable2Pc flag should match producer configuration");
+        }
+    }
+
@Test
public void testClusterAuthorizationFailure() throws Exception {
int maxBlockMs = 500;
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java
index 830711c0e54..207bac6476f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java
@@ -145,4 +145,27 @@ public class ProducerConfigTest {
configs.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
assertDoesNotThrow(() -> new ProducerConfig(configs));
}
+
+    @Test
+    void testTwoPhaseCommitIncompatibleWithTransactionTimeout() {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass);
+        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass);
+        configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
+        configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test-txn-id");
+        configs.put(ProducerConfig.TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG, true);
+        configs.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60000);
+
+        ConfigException ce = assertThrows(ConfigException.class, () -> new ProducerConfig(configs));
+        assertTrue(ce.getMessage().contains(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG));
+        assertTrue(ce.getMessage().contains(ProducerConfig.TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG));
+
+        // Verify that setting one but not the other is valid
+        configs.remove(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG);
+        assertDoesNotThrow(() -> new ProducerConfig(configs));
+
+        configs.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60000);
+        configs.put(ProducerConfig.TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG, false);
+        assertDoesNotThrow(() -> new ProducerConfig(configs));
+    }
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index e67be76eb9b..74a56420246 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -490,7 +490,7 @@ public class SenderTest {
ProducerIdAndEpoch producerIdAndEpoch = new
ProducerIdAndEpoch(123456L, (short) 0);
apiVersions.update("0",
NodeApiVersions.create(ApiKeys.INIT_PRODUCER_ID.id, (short) 0, (short) 3));
- TransactionManager txnManager = new TransactionManager(logContext,
"testUnresolvedSeq", 60000, 100, apiVersions);
+ TransactionManager txnManager = new TransactionManager(logContext,
"testUnresolvedSeq", 60000, 100, apiVersions, false);
setupWithTransactionState(txnManager);
doInitTransactions(txnManager, producerIdAndEpoch);
@@ -616,10 +616,10 @@ public class SenderTest {
// Initialize transaction manager. InitProducerId will be queued up
until metadata response
// is processed and FindCoordinator can be sent to `leastLoadedNode`.
TransactionManager transactionManager = new TransactionManager(new
LogContext(), "testInitProducerIdWithPendingMetadataRequest",
- 60000, 100L, new ApiVersions());
+ 60000, 100L, new ApiVersions(), false);
setupWithTransactionState(transactionManager, false, null, false);
ProducerIdAndEpoch producerIdAndEpoch = new
ProducerIdAndEpoch(producerId, (short) 0);
- transactionManager.initializeTransactions();
+ transactionManager.initializeTransactions(false);
sender.runOnce();
// Process metadata response, prepare FindCoordinator and
InitProducerId responses.
@@ -668,10 +668,10 @@ public class SenderTest {
client = new MockClient(time, metadata);
TransactionManager transactionManager = new TransactionManager(new
LogContext(), "testNodeNotReady",
- 60000, 100L, new ApiVersions());
+ 60000, 100L, new ApiVersions(), false);
setupWithTransactionState(transactionManager, false, null, true);
ProducerIdAndEpoch producerIdAndEpoch = new
ProducerIdAndEpoch(producerId, (short) 0);
- transactionManager.initializeTransactions();
+ transactionManager.initializeTransactions(false);
sender.runOnce();
Node node = metadata.fetch().nodes().get(0);
@@ -1510,7 +1510,7 @@ public class SenderTest {
public void testUnresolvedSequencesAreNotFatal() throws Exception {
ProducerIdAndEpoch producerIdAndEpoch = new
ProducerIdAndEpoch(123456L, (short) 0);
apiVersions.update("0",
NodeApiVersions.create(ApiKeys.INIT_PRODUCER_ID.id, (short) 0, (short) 3));
- TransactionManager txnManager = new TransactionManager(logContext,
"testUnresolvedSeq", 60000, 100, apiVersions);
+ TransactionManager txnManager = new TransactionManager(logContext,
"testUnresolvedSeq", 60000, 100, apiVersions, false);
setupWithTransactionState(txnManager);
doInitTransactions(txnManager, producerIdAndEpoch);
@@ -1795,7 +1795,7 @@ public class SenderTest {
@Test
public void
testTransactionalUnknownProducerHandlingWhenRetentionLimitReached() throws
Exception {
final long producerId = 343434L;
- TransactionManager transactionManager = new
TransactionManager(logContext, "testUnresolvedSeq", 60000, 100, apiVersions);
+ TransactionManager transactionManager = new
TransactionManager(logContext, "testUnresolvedSeq", 60000, 100, apiVersions,
false);
setupWithTransactionState(transactionManager);
doInitTransactions(transactionManager, new
ProducerIdAndEpoch(producerId, (short) 0));
@@ -2352,7 +2352,7 @@ public class SenderTest {
public void testTransactionalSplitBatchAndSend() throws Exception {
ProducerIdAndEpoch producerIdAndEpoch = new
ProducerIdAndEpoch(123456L, (short) 0);
TopicPartition tp = new TopicPartition("testSplitBatchAndSend", 1);
- TransactionManager txnManager = new TransactionManager(logContext,
"testSplitBatchAndSend", 60000, 100, apiVersions);
+ TransactionManager txnManager = new TransactionManager(logContext,
"testSplitBatchAndSend", 60000, 100, apiVersions, false);
setupWithTransactionState(txnManager);
doInitTransactions(txnManager, producerIdAndEpoch);
@@ -2694,7 +2694,7 @@ public class SenderTest {
Metrics m = new Metrics();
SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
try {
- TransactionManager txnManager = new TransactionManager(logContext,
"testTransactionalRequestsSentOnShutdown", 6000, 100, apiVersions);
+ TransactionManager txnManager = new TransactionManager(logContext,
"testTransactionalRequestsSentOnShutdown", 6000, 100, apiVersions, false);
Sender sender = new Sender(logContext, client, metadata,
this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
maxRetries, senderMetrics, time, REQUEST_TIMEOUT,
RETRY_BACKOFF_MS, txnManager);
@@ -2727,7 +2727,7 @@ public class SenderTest {
int lingerMs = 50;
SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
- TransactionManager txnManager = new TransactionManager(logContext,
"txnId", 6000, 100, apiVersions);
+ TransactionManager txnManager = new TransactionManager(logContext,
"txnId", 6000, 100, apiVersions, false);
setupWithTransactionState(txnManager, lingerMs);
Sender sender = new Sender(logContext, client, metadata,
this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
@@ -2784,7 +2784,7 @@ public class SenderTest {
try (Metrics m = new Metrics()) {
SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
- TransactionManager txnManager = new TransactionManager(logContext,
"txnId", 6000, 100, apiVersions);
+ TransactionManager txnManager = new TransactionManager(logContext,
"txnId", 6000, 100, apiVersions, false);
setupWithTransactionState(txnManager);
Sender sender = new Sender(logContext, client, metadata,
this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
@@ -2855,7 +2855,7 @@ public class SenderTest {
Metrics m = new Metrics();
SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
try {
- TransactionManager txnManager = new TransactionManager(logContext,
"testIncompleteTransactionAbortOnShutdown", 6000, 100, apiVersions);
+ TransactionManager txnManager = new TransactionManager(logContext,
"testIncompleteTransactionAbortOnShutdown", 6000, 100, apiVersions, false);
Sender sender = new Sender(logContext, client, metadata,
this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
maxRetries, senderMetrics, time, REQUEST_TIMEOUT,
RETRY_BACKOFF_MS, txnManager);
@@ -2889,7 +2889,7 @@ public class SenderTest {
Metrics m = new Metrics();
SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
try {
- TransactionManager txnManager = new TransactionManager(logContext,
"testForceShutdownWithIncompleteTransaction", 6000, 100, apiVersions);
+ TransactionManager txnManager = new TransactionManager(logContext,
"testForceShutdownWithIncompleteTransaction", 6000, 100, apiVersions, false);
Sender sender = new Sender(logContext, client, metadata,
this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
maxRetries, senderMetrics, time, REQUEST_TIMEOUT,
RETRY_BACKOFF_MS, txnManager);
@@ -2919,7 +2919,7 @@ public class SenderTest {
@Test
public void testTransactionAbortedExceptionOnAbortWithoutError() throws
InterruptedException {
ProducerIdAndEpoch producerIdAndEpoch = new
ProducerIdAndEpoch(123456L, (short) 0);
- TransactionManager txnManager = new TransactionManager(logContext,
"testTransactionAbortedExceptionOnAbortWithoutError", 60000, 100, apiVersions);
+ TransactionManager txnManager = new TransactionManager(logContext,
"testTransactionAbortedExceptionOnAbortWithoutError", 60000, 100, apiVersions,
false);
setupWithTransactionState(txnManager, false, null);
doInitTransactions(txnManager, producerIdAndEpoch);
@@ -2945,7 +2945,7 @@ public class SenderTest {
public void testDoNotPollWhenNoRequestSent() {
client = spy(new MockClient(time, metadata));
- TransactionManager txnManager = new TransactionManager(logContext,
"testDoNotPollWhenNoRequestSent", 6000, 100, apiVersions);
+ TransactionManager txnManager = new TransactionManager(logContext,
"testDoNotPollWhenNoRequestSent", 6000, 100, apiVersions, false);
ProducerIdAndEpoch producerIdAndEpoch = new
ProducerIdAndEpoch(123456L, (short) 0);
setupWithTransactionState(txnManager);
doInitTransactions(txnManager, producerIdAndEpoch);
@@ -2957,7 +2957,7 @@ public class SenderTest {
@Test
public void testTooLargeBatchesAreSafelyRemoved() throws
InterruptedException {
ProducerIdAndEpoch producerIdAndEpoch = new
ProducerIdAndEpoch(123456L, (short) 0);
- TransactionManager txnManager = new TransactionManager(logContext,
"testSplitBatchAndSend", 60000, 100, apiVersions);
+ TransactionManager txnManager = new TransactionManager(logContext,
"testSplitBatchAndSend", 60000, 100, apiVersions, false);
setupWithTransactionState(txnManager, false, null);
doInitTransactions(txnManager, producerIdAndEpoch);
@@ -3026,7 +3026,7 @@ public class SenderTest {
public void testReceiveFailedBatchTwiceWithTransactions() throws Exception
{
ProducerIdAndEpoch producerIdAndEpoch = new
ProducerIdAndEpoch(123456L, (short) 0);
apiVersions.update("0",
NodeApiVersions.create(ApiKeys.INIT_PRODUCER_ID.id, (short) 0, (short) 3));
- TransactionManager txnManager = new TransactionManager(logContext,
"testFailTwice", 60000, 100, apiVersions);
+ TransactionManager txnManager = new TransactionManager(logContext,
"testFailTwice", 60000, 100, apiVersions, false);
setupWithTransactionState(txnManager);
doInitTransactions(txnManager, producerIdAndEpoch);
@@ -3076,7 +3076,7 @@ public class SenderTest {
public void testInvalidTxnStateIsAnAbortableError() throws Exception {
ProducerIdAndEpoch producerIdAndEpoch = new
ProducerIdAndEpoch(123456L, (short) 0);
apiVersions.update("0",
NodeApiVersions.create(ApiKeys.INIT_PRODUCER_ID.id, (short) 0, (short) 3));
- TransactionManager txnManager = new TransactionManager(logContext,
"testInvalidTxnState", 60000, 100, apiVersions);
+ TransactionManager txnManager = new TransactionManager(logContext,
"testInvalidTxnState", 60000, 100, apiVersions, false);
setupWithTransactionState(txnManager);
doInitTransactions(txnManager, producerIdAndEpoch);
@@ -3115,7 +3115,7 @@ public class SenderTest {
public void testTransactionAbortableExceptionIsAnAbortableError() throws
Exception {
ProducerIdAndEpoch producerIdAndEpoch = new
ProducerIdAndEpoch(123456L, (short) 0);
apiVersions.update("0",
NodeApiVersions.create(ApiKeys.INIT_PRODUCER_ID.id, (short) 0, (short) 3));
- TransactionManager txnManager = new TransactionManager(logContext,
"textTransactionAbortableException", 60000, 100, apiVersions);
+ TransactionManager txnManager = new TransactionManager(logContext,
"textTransactionAbortableException", 60000, 100, apiVersions, false);
setupWithTransactionState(txnManager);
doInitTransactions(txnManager, producerIdAndEpoch);
@@ -3617,7 +3617,7 @@ public class SenderTest {
}
private TransactionManager createTransactionManager() {
- return new TransactionManager(new LogContext(), null, 0,
RETRY_BACKOFF_MS, new ApiVersions());
+ return new TransactionManager(new LogContext(), null, 0,
RETRY_BACKOFF_MS, new ApiVersions(), false);
}
private void setupWithTransactionState(TransactionManager
transactionManager) {
@@ -3719,7 +3719,7 @@ public class SenderTest {
}
private void doInitTransactions(TransactionManager transactionManager,
ProducerIdAndEpoch producerIdAndEpoch) {
- TransactionalRequestResult result =
transactionManager.initializeTransactions();
+ TransactionalRequestResult result =
transactionManager.initializeTransactions(false);
prepareFindCoordinatorResponse(Errors.NONE,
transactionManager.transactionalId());
sender.runOnce();
sender.runOnce();
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 5520df03467..a19e4977314 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -159,17 +159,28 @@ public class TransactionManagerTest {
         this.client.updateMetadata(RequestTestUtils.metadataUpdateWith(1, singletonMap("test", 2)));
         this.brokerNode = new Node(0, "localhost", 2211);
-        initializeTransactionManager(Optional.of(transactionalId), false);
+        initializeTransactionManager(Optional.of(transactionalId), false, false);
+    }
+
+    private void initializeTransactionManager(
+        Optional<String> transactionalId,
+        boolean transactionV2Enabled
+    ) {
+        initializeTransactionManager(transactionalId, transactionV2Enabled, false);
     }
-    private void initializeTransactionManager(Optional<String> transactionalId, boolean transactionV2Enabled) {
+    private void initializeTransactionManager(
+        Optional<String> transactionalId,
+        boolean transactionV2Enabled,
+        boolean enable2pc
+    ) {
         Metrics metrics = new Metrics(time);
         apiVersions.update("0", new NodeApiVersions(Arrays.asList(
             new ApiVersion()
                 .setApiKey(ApiKeys.INIT_PRODUCER_ID.id)
                 .setMinVersion((short) 0)
-                .setMaxVersion((short) 3),
+                .setMaxVersion((short) 6),
new ApiVersion()
.setApiKey(ApiKeys.PRODUCE.id)
.setMinVersion((short) 0)
@@ -189,7 +200,8 @@ public class TransactionManagerTest {
finalizedFeaturesEpoch));
finalizedFeaturesEpoch += 1;
         this.transactionManager = new TestableTransactionManager(logContext, transactionalId.orElse(null),
-            transactionTimeoutMs, DEFAULT_RETRY_BACKOFF_MS, apiVersions);
+            transactionTimeoutMs, DEFAULT_RETRY_BACKOFF_MS, apiVersions, enable2pc);
+
int batchSize = 16 * 1024;
int deliveryTimeoutMs = 3000;
@@ -1039,7 +1051,7 @@ public class TransactionManagerTest {
.setMinVersionLevel((short) 1)),
0));
         this.transactionManager = new TestableTransactionManager(logContext, transactionalId,
-            transactionTimeoutMs, DEFAULT_RETRY_BACKOFF_MS, apiVersions);
+            transactionTimeoutMs, DEFAULT_RETRY_BACKOFF_MS, apiVersions, false);
int batchSize = 16 * 1024;
int deliveryTimeoutMs = 3000;
@@ -1063,7 +1075,7 @@ public class TransactionManagerTest {
public void testDisconnectAndRetry() {
// This is called from the initTransactions method in the producer as
the first order of business.
// It finds the coordinator and then gets a PID.
- transactionManager.initializeTransactions();
+ transactionManager.initializeTransactions(false);
prepareFindCoordinatorResponse(Errors.NONE, true,
CoordinatorType.TRANSACTION, transactionalId);
runUntil(() ->
transactionManager.coordinator(CoordinatorType.TRANSACTION) == null);
@@ -1076,12 +1088,12 @@ public class TransactionManagerTest {
public void testInitializeTransactionsTwiceRaisesError() {
doInitTransactions(producerId, epoch);
assertTrue(transactionManager.hasProducerId());
- assertThrows(IllegalStateException.class, () ->
transactionManager.initializeTransactions());
+ assertThrows(IllegalStateException.class, () ->
transactionManager.initializeTransactions(false));
}
@Test
public void testUnsupportedFindCoordinator() {
- transactionManager.initializeTransactions();
+ transactionManager.initializeTransactions(false);
client.prepareUnsupportedVersionResponse(body -> {
FindCoordinatorRequest findCoordinatorRequest =
(FindCoordinatorRequest) body;
assertEquals(CoordinatorType.forId(findCoordinatorRequest.data().keyType()),
CoordinatorType.TRANSACTION);
@@ -1098,7 +1110,7 @@ public class TransactionManagerTest {
@Test
public void testUnsupportedInitTransactions() {
- transactionManager.initializeTransactions();
+ transactionManager.initializeTransactions(false);
prepareFindCoordinatorResponse(Errors.NONE, false,
CoordinatorType.TRANSACTION, transactionalId);
runUntil(() ->
transactionManager.coordinator(CoordinatorType.TRANSACTION) != null);
assertFalse(transactionManager.hasError());
@@ -1243,7 +1255,7 @@ public class TransactionManagerTest {
public void testLookupCoordinatorOnDisconnectAfterSend() {
// This is called from the initTransactions method in the producer as
the first order of business.
// It finds the coordinator and then gets a PID.
- TransactionalRequestResult initPidResult =
transactionManager.initializeTransactions();
+ TransactionalRequestResult initPidResult =
transactionManager.initializeTransactions(false);
prepareFindCoordinatorResponse(Errors.NONE, false,
CoordinatorType.TRANSACTION, transactionalId);
runUntil(() ->
transactionManager.coordinator(CoordinatorType.TRANSACTION) != null);
assertEquals(brokerNode,
transactionManager.coordinator(CoordinatorType.TRANSACTION));
@@ -1275,7 +1287,7 @@ public class TransactionManagerTest {
public void testLookupCoordinatorOnDisconnectBeforeSend() {
// This is called from the initTransactions method in the producer as
the first order of business.
// It finds the coordinator and then gets a PID.
- TransactionalRequestResult initPidResult =
transactionManager.initializeTransactions();
+ TransactionalRequestResult initPidResult =
transactionManager.initializeTransactions(false);
prepareFindCoordinatorResponse(Errors.NONE, false,
CoordinatorType.TRANSACTION, transactionalId);
runUntil(() ->
transactionManager.coordinator(CoordinatorType.TRANSACTION) != null);
assertEquals(brokerNode,
transactionManager.coordinator(CoordinatorType.TRANSACTION));
@@ -1306,7 +1318,7 @@ public class TransactionManagerTest {
public void testLookupCoordinatorOnNotCoordinatorError() {
// This is called from the initTransactions method in the producer as
the first order of business.
// It finds the coordinator and then gets a PID.
- TransactionalRequestResult initPidResult =
transactionManager.initializeTransactions();
+ TransactionalRequestResult initPidResult =
transactionManager.initializeTransactions(false);
prepareFindCoordinatorResponse(Errors.NONE, false,
CoordinatorType.TRANSACTION, transactionalId);
runUntil(() ->
transactionManager.coordinator(CoordinatorType.TRANSACTION) != null);
assertEquals(brokerNode,
transactionManager.coordinator(CoordinatorType.TRANSACTION));
@@ -1331,7 +1343,7 @@ public class TransactionManagerTest {
@Test
public void testTransactionalIdAuthorizationFailureInFindCoordinator() {
- TransactionalRequestResult initPidResult =
transactionManager.initializeTransactions();
+ TransactionalRequestResult initPidResult =
transactionManager.initializeTransactions(false);
prepareFindCoordinatorResponse(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED,
false,
CoordinatorType.TRANSACTION, transactionalId);
@@ -1346,7 +1358,7 @@ public class TransactionManagerTest {
@Test
public void testTransactionalIdAuthorizationFailureInInitProducerId() {
- TransactionalRequestResult initPidResult =
transactionManager.initializeTransactions();
+ TransactionalRequestResult initPidResult =
transactionManager.initializeTransactions(false);
prepareFindCoordinatorResponse(Errors.NONE, false,
CoordinatorType.TRANSACTION, transactionalId);
runUntil(() ->
transactionManager.coordinator(CoordinatorType.TRANSACTION) != null);
assertEquals(brokerNode,
transactionManager.coordinator(CoordinatorType.TRANSACTION));
@@ -1646,7 +1658,7 @@ public class TransactionManagerTest {
assertFalse(result.isAcked());
assertFalse(transactionManager.hasOngoingTransaction());
- assertThrows(IllegalStateException.class,
transactionManager::initializeTransactions);
+ assertThrows(IllegalStateException.class, () ->
transactionManager.initializeTransactions(false));
assertThrows(IllegalStateException.class,
transactionManager::beginTransaction);
assertThrows(IllegalStateException.class,
transactionManager::beginCommit);
assertThrows(IllegalStateException.class, () ->
transactionManager.maybeAddPartition(tp0));
@@ -1680,7 +1692,7 @@ public class TransactionManagerTest {
assertFalse(result.isAcked());
assertFalse(transactionManager.hasOngoingTransaction());
- assertThrows(IllegalStateException.class,
transactionManager::initializeTransactions);
+ assertThrows(IllegalStateException.class, () ->
transactionManager.initializeTransactions(false));
assertThrows(IllegalStateException.class,
transactionManager::beginTransaction);
assertThrows(IllegalStateException.class,
transactionManager::beginAbort);
assertThrows(IllegalStateException.class, () ->
transactionManager.maybeAddPartition(tp0));
@@ -1694,7 +1706,7 @@ public class TransactionManagerTest {
@Test
public void testRetryInitTransactionsAfterTimeout() {
- TransactionalRequestResult result =
transactionManager.initializeTransactions();
+ TransactionalRequestResult result =
transactionManager.initializeTransactions(false);
prepareFindCoordinatorResponse(Errors.NONE, false,
CoordinatorType.TRANSACTION, transactionalId);
runUntil(() ->
transactionManager.coordinator(CoordinatorType.TRANSACTION) != null);
assertEquals(brokerNode,
transactionManager.coordinator(CoordinatorType.TRANSACTION));
@@ -1715,10 +1727,10 @@ public class TransactionManagerTest {
assertThrows(IllegalStateException.class,
transactionManager::beginCommit);
assertThrows(IllegalStateException.class, () ->
transactionManager.maybeAddPartition(tp0));
- assertSame(result, transactionManager.initializeTransactions());
+ assertSame(result, transactionManager.initializeTransactions(false));
result.await();
assertTrue(result.isAcked());
- assertThrows(IllegalStateException.class,
transactionManager::initializeTransactions);
+ assertThrows(IllegalStateException.class, () ->
transactionManager.initializeTransactions(false));
transactionManager.beginTransaction();
assertTrue(transactionManager.hasOngoingTransaction());
@@ -1960,7 +1972,7 @@ public class TransactionManagerTest {
})
public void testRetriableErrors(Errors error) {
// Ensure FindCoordinator retries.
- TransactionalRequestResult result =
transactionManager.initializeTransactions();
+ TransactionalRequestResult result =
transactionManager.initializeTransactions(false);
prepareFindCoordinatorResponse(error, false,
CoordinatorType.TRANSACTION, transactionalId);
prepareFindCoordinatorResponse(Errors.NONE, false,
CoordinatorType.TRANSACTION, transactionalId);
runUntil(() ->
transactionManager.coordinator(CoordinatorType.TRANSACTION) != null);
@@ -1994,7 +2006,7 @@ public class TransactionManagerTest {
@Test
public void testCoordinatorNotAvailable() {
// Ensure FindCoordinator with COORDINATOR_NOT_AVAILABLE error retries.
- TransactionalRequestResult result =
transactionManager.initializeTransactions();
+ TransactionalRequestResult result =
transactionManager.initializeTransactions(false);
prepareFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE,
false, CoordinatorType.TRANSACTION, transactionalId);
prepareFindCoordinatorResponse(Errors.NONE, false,
CoordinatorType.TRANSACTION, transactionalId);
runUntil(() ->
transactionManager.coordinator(CoordinatorType.TRANSACTION) != null);
@@ -2017,7 +2029,7 @@ public class TransactionManagerTest {
}
private void verifyProducerFencedForInitProducerId(Errors error) {
- TransactionalRequestResult result =
transactionManager.initializeTransactions();
+ TransactionalRequestResult result =
transactionManager.initializeTransactions(false);
prepareFindCoordinatorResponse(Errors.NONE, false,
CoordinatorType.TRANSACTION, transactionalId);
runUntil(() ->
transactionManager.coordinator(CoordinatorType.TRANSACTION) != null);
assertEquals(brokerNode,
transactionManager.coordinator(CoordinatorType.TRANSACTION));
@@ -3815,7 +3827,7 @@ public class TransactionManagerTest {
assertThrows(IllegalStateException.class, () ->
transactionManager.beginAbort());
assertThrows(IllegalStateException.class, () ->
transactionManager.beginCommit());
assertThrows(IllegalStateException.class, () ->
transactionManager.maybeAddPartition(tp0));
- assertThrows(IllegalStateException.class, () ->
transactionManager.initializeTransactions());
+ assertThrows(IllegalStateException.class, () ->
transactionManager.initializeTransactions(false));
assertThrows(IllegalStateException.class, () ->
transactionManager.sendOffsetsToTransaction(Collections.emptyMap(), new
ConsumerGroupMetadata("fake-group-id")));
}
@@ -3852,7 +3864,7 @@ public class TransactionManagerTest {
@Test
public void testTransactionAbortableExceptionInInitProducerId() {
- TransactionalRequestResult initPidResult =
transactionManager.initializeTransactions();
+ TransactionalRequestResult initPidResult =
transactionManager.initializeTransactions(false);
prepareFindCoordinatorResponse(Errors.NONE, false,
CoordinatorType.TRANSACTION, transactionalId);
runUntil(() ->
transactionManager.coordinator(CoordinatorType.TRANSACTION) != null);
assertEquals(brokerNode,
transactionManager.coordinator(CoordinatorType.TRANSACTION));
@@ -4035,16 +4047,39 @@ public class TransactionManagerTest {
         }, FindCoordinatorResponse.prepareResponse(error, coordinatorKey, brokerNode), shouldDisconnect);
     }
-    private void prepareInitPidResponse(Errors error, boolean shouldDisconnect, long producerId, short producerEpoch) {
+    private void prepareInitPidResponse(
+        Errors error,
+        boolean shouldDisconnect,
+        long producerId,
+        short producerEpoch
+    ) {
+        prepareInitPidResponse(error, shouldDisconnect, producerId, producerEpoch, false, false, (long) -1, (short) -1);
+    }
+
+    private void prepareInitPidResponse(
+        Errors error,
+        boolean shouldDisconnect,
+        long producerId,
+        short producerEpoch,
+        boolean keepPreparedTxn,
+        boolean enable2Pc,
+        long ongoingProducerId,
+        short ongoingProducerEpoch
+    ) {
         InitProducerIdResponseData responseData = new InitProducerIdResponseData()
-            .setErrorCode(error.code())
-            .setProducerEpoch(producerEpoch)
-            .setProducerId(producerId)
-            .setThrottleTimeMs(0);
+            .setErrorCode(error.code())
+            .setProducerEpoch(producerEpoch)
+            .setProducerId(producerId)
+            .setThrottleTimeMs(0)
+            .setOngoingTxnProducerId(ongoingProducerId)
+            .setOngoingTxnProducerEpoch(ongoingProducerEpoch);
+
         client.prepareResponse(body -> {
             InitProducerIdRequest initProducerIdRequest = (InitProducerIdRequest) body;
             assertEquals(transactionalId, initProducerIdRequest.data().transactionalId());
             assertEquals(transactionTimeoutMs, initProducerIdRequest.data().transactionTimeoutMs());
+            assertEquals(keepPreparedTxn, initProducerIdRequest.data().keepPreparedTxn());
+            assertEquals(enable2Pc, initProducerIdRequest.data().enable2Pc());
             return true;
         }, new InitProducerIdResponse(responseData), shouldDisconnect);
     }
@@ -4309,7 +4344,7 @@ public class TransactionManagerTest {
}
private void doInitTransactions(long producerId, short epoch) {
- TransactionalRequestResult result =
transactionManager.initializeTransactions();
+ TransactionalRequestResult result =
transactionManager.initializeTransactions(false);
prepareFindCoordinatorResponse(Errors.NONE, false,
CoordinatorType.TRANSACTION, transactionalId);
runUntil(() ->
transactionManager.coordinator(CoordinatorType.TRANSACTION) != null);
assertEquals(brokerNode,
transactionManager.coordinator(CoordinatorType.TRANSACTION));
@@ -4373,6 +4408,39 @@ public class TransactionManagerTest {
ProducerTestUtils.runUntil(sender, condition);
}
+    @Test
+    public void testInitializeTransactionsWithKeepPreparedTxn() {
+        initializeTransactionManager(Optional.of(transactionalId), true, true);
+
+        client.prepareResponse(
+            FindCoordinatorResponse.prepareResponse(Errors.NONE, transactionalId, brokerNode)
+        );
+
+        // Simulate an ongoing prepared transaction (ongoingProducerId != -1).
+        long ongoingProducerId = 999L;
+        short ongoingEpoch = 10;
+        short bumpedEpoch = 11;
+
+        prepareInitPidResponse(
+            Errors.NONE,
+            false,
+            ongoingProducerId,
+            bumpedEpoch,
+            true,
+            true,
+            ongoingProducerId,
+            ongoingEpoch
+        );
+
+        transactionManager.initializeTransactions(true);
+        runUntil(transactionManager::hasProducerId);
+
+        assertTrue(transactionManager.hasProducerId());
+        assertFalse(transactionManager.hasOngoingTransaction());
+        assertEquals(ongoingProducerId, transactionManager.producerIdAndEpoch().producerId);
+        assertEquals(bumpedEpoch, transactionManager.producerIdAndEpoch().epoch);
+    }
+
     /**
      * This subclass exists only to optionally change the default behavior related to poisoning the state
      * on invalid state transition attempts.
@@ -4385,8 +4453,9 @@ public class TransactionManagerTest {
String transactionalId,
int transactionTimeoutMs,
long retryBackoffMs,
- ApiVersions apiVersions) {
- super(logContext, transactionalId, transactionTimeoutMs,
retryBackoffMs, apiVersions);
+ ApiVersions apiVersions,
+ boolean enable2Pc) {
+ super(logContext, transactionalId, transactionTimeoutMs,
retryBackoffMs, apiVersions, enable2Pc);
this.shouldPoisonStateOnInvalidTransitionOverride =
Optional.empty();
}