This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8b4560e3f0f KAFKA-15767 Refactor TransactionManager to avoid use of
ThreadLocal (#19440)
8b4560e3f0f is described below
commit 8b4560e3f0f8e6cc16fe9c4c6eac95d6ae9b7c51
Author: Kirk True <[email protected]>
AuthorDate: Wed Apr 23 09:31:30 2025 -0700
KAFKA-15767 Refactor TransactionManager to avoid use of ThreadLocal (#19440)
Introduces a concrete subclass of `KafkaThread` named `SenderThread`.
The TransactionManager no longer tracks a per-thread `ThreadLocal` flag;
whether it poisons its state on an invalid state transition is now
determined by checking if the current thread is a `SenderThread`.
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../kafka/clients/producer/KafkaProducer.java | 7 +-
.../kafka/clients/producer/internals/Sender.java | 10 +-
.../producer/internals/TransactionManager.java | 112 ++++++++++-----------
.../kafka/clients/producer/KafkaProducerTest.java | 5 +-
.../producer/internals/TransactionManagerTest.java | 35 ++++++-
5 files changed, 99 insertions(+), 70 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 16512c42d5f..baaf13388d6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -75,7 +75,6 @@ import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter;
import org.apache.kafka.common.telemetry.internals.ClientTelemetryUtils;
import org.apache.kafka.common.utils.AppInfoParser;
-import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
@@ -256,7 +255,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
private final ProducerMetadata metadata;
private final RecordAccumulator accumulator;
private final Sender sender;
- private final Thread ioThread;
+ private final Sender.SenderThread ioThread;
private final Compression compression;
private final Sensor errors;
private final Time time;
@@ -454,7 +453,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
this.errors = this.metrics.sensor("errors");
this.sender = newSender(logContext, kafkaClient, this.metadata);
String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
- this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
+ this.ioThread = new Sender.SenderThread(ioThreadName, this.sender,
true);
this.ioThread.start();
config.logUnused();
AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics,
time.milliseconds());
@@ -480,7 +479,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
ProducerInterceptors<K, V> interceptors,
Partitioner partitioner,
Time time,
- KafkaThread ioThread,
+ Sender.SenderThread ioThread,
Optional<ClientTelemetryReporter> clientTelemetryReporter) {
this.producerConfig = config;
this.time = time;
diff --git
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 614fe562d87..6739facfc34 100644
---
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -52,6 +52,7 @@ import
org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.ProduceRequest;
import org.apache.kafka.common.requests.ProduceResponse;
import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
@@ -234,9 +235,6 @@ public class Sender implements Runnable {
public void run() {
log.debug("Starting Kafka producer I/O thread.");
- if (transactionManager != null)
- transactionManager.setPoisonStateOnInvalidTransition(true);
-
// main loop, runs until close is called
while (running) {
try {
@@ -1072,4 +1070,10 @@ public class Sender implements Runnable {
}
}
+ public static class SenderThread extends KafkaThread {
+
+ public SenderThread(final String name, Runnable runnable, boolean
daemon) {
+ super(name, runnable, daemon);
+ }
+ }
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index c78134c72ec..b52d5d4836d 100644
---
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -120,58 +120,6 @@ public class TransactionManager {
private final Set<TopicPartition> newPartitionsInTransaction;
private final Set<TopicPartition> pendingPartitionsInTransaction;
private final Set<TopicPartition> partitionsInTransaction;
-
- /**
- * During its normal course of operations, the transaction manager
transitions through different internal
- * states (i.e. by updating {@link #currentState}) to one of those defined
in {@link State}. These state transitions
- * result from actions on one of the following classes of threads:
- *
- * <ul>
- * <li><em>Application</em> threads that invokes {@link Producer} API
calls</li>
- * <li><em>{@link Sender}</em> thread operations</li>
- * </ul>
- *
- * When an invalid state transition is detected during execution on an
<em>application</em> thread, the
- * {@link #currentState} is <em>not updated</em> and an {@link
IllegalStateException} is thrown. This gives the
- * application the opportunity to fix the issue without permanently
poisoning the state of the
- * transaction manager. The {@link Producer} API calls that perform a
state transition include:
- *
- * <ul>
- * <li>{@link Producer#initTransactions()} calls {@link
#initializeTransactions()}</li>
- * <li>{@link Producer#beginTransaction()} calls {@link
#beginTransaction()}</li>
- * <li>{@link Producer#commitTransaction()}} calls {@link
#beginCommit()}</li>
- * <li>{@link Producer#abortTransaction()} calls {@link #beginAbort()}
- * </li>
- * <li>{@link Producer#sendOffsetsToTransaction(Map,
ConsumerGroupMetadata)} calls
- * {@link #sendOffsetsToTransaction(Map, ConsumerGroupMetadata)}
- * </li>
- * <li>{@link Producer#send(ProducerRecord)} (and its variants) calls
- * {@link #maybeAddPartition(TopicPartition)} and
- * {@link #maybeTransitionToErrorState(RuntimeException)}
- * </li>
- * </ul>
- *
- * <p/>
- *
- * The {@link Producer} is implemented such that much of its work
delegated to and performed asynchronously on the
- * <em>{@link Sender}</em> thread. This includes record batching, network
I/O, broker response handlers, etc. If an
- * invalid state transition is detected in the <em>{@link Sender}</em>
thread, in addition to throwing an
- * {@link IllegalStateException}, the transaction manager intentionally
"poisons" itself by setting its
- * {@link #currentState} to {@link State#FATAL_ERROR}, a state from which
it cannot recover.
- *
- * <p/>
- *
- * It's important to prevent possible corruption when the transaction
manager has determined that it is in a
- * fatal state. Subsequent transaction operations attempted via either the
<em>application</em> or the
- * <em>{@link Sender}</em> thread should fail. This is achieved when these
operations invoke the
- * {@link #maybeFailWithError()} method, as it causes a {@link
KafkaException} to be thrown, ensuring the stated
- * transactional guarantees are not violated.
- *
- * <p/>
- *
- * See KAFKA-14831 for more detail.
- */
- private final ThreadLocal<Boolean> shouldPoisonStateOnInvalidTransition;
private PendingStateTransition pendingTransition;
// This is used by the TxnRequestHandlers to control how long to back off
before a given request is retried.
@@ -265,7 +213,6 @@ public class TransactionManager {
this.newPartitionsInTransaction = new HashSet<>();
this.pendingPartitionsInTransaction = new HashSet<>();
this.partitionsInTransaction = new HashSet<>();
- this.shouldPoisonStateOnInvalidTransition = ThreadLocal.withInitial(()
-> false);
this.pendingRequests = new PriorityQueue<>(10,
Comparator.comparingInt(o -> o.priority().priority));
this.pendingTxnOffsetCommits = new HashMap<>();
this.partitionsWithUnresolvedSequences = new HashMap<>();
@@ -275,8 +222,61 @@ public class TransactionManager {
this.apiVersions = apiVersions;
}
- void setPoisonStateOnInvalidTransition(boolean shouldPoisonState) {
- shouldPoisonStateOnInvalidTransition.set(shouldPoisonState);
+ /**
+ * During its normal course of operations, the transaction manager
transitions through different internal
+ * states (i.e. by updating {@link #currentState}) to one of those defined
in {@link State}. These state transitions
+ * result from actions on one of the following classes of threads:
+ *
+ * <ul>
+ * <li><em>Application</em> threads that invoke {@link Producer} API
calls</li>
+ * <li><em>{@link Sender}</em> thread operations</li>
+ * </ul>
+ *
+ * When an invalid state transition is detected during execution on an
<em>application</em> thread, the
+ * {@link #currentState} is <em>not updated</em> and an {@link
IllegalStateException} is thrown. This gives the
+ * application the opportunity to fix the issue without permanently
poisoning the state of the
+ * transaction manager. The {@link Producer} API calls that perform a
state transition include:
+ *
+ * <ul>
+ * <li>{@link Producer#initTransactions()} calls {@link
#initializeTransactions()}</li>
+ * <li>{@link Producer#beginTransaction()} calls {@link
#beginTransaction()}</li>
+ * <li>{@link Producer#commitTransaction()} calls {@link
#beginCommit()}</li>
+ * <li>{@link Producer#abortTransaction()} calls {@link #beginAbort()}
+ * </li>
+ * <li>{@link Producer#sendOffsetsToTransaction(Map,
ConsumerGroupMetadata)} calls
+ * {@link #sendOffsetsToTransaction(Map, ConsumerGroupMetadata)}
+ * </li>
+ * <li>{@link Producer#send(ProducerRecord)} (and its variants) calls
+ * {@link #maybeAddPartition(TopicPartition)} and
+ * {@link #maybeTransitionToErrorState(RuntimeException)}
+ * </li>
+ * </ul>
+ *
+ * <p/>
+ *
+ * The {@link Producer} is implemented such that much of its work is
delegated to and performed asynchronously on the
+ * <em>{@link Sender}</em> thread. This includes record batching, network
I/O, broker response handlers, etc. If an
+ * invalid state transition is detected in the <em>{@link Sender}</em>
thread, in addition to throwing an
+ * {@link IllegalStateException}, the transaction manager intentionally
"poisons" itself by setting its
+ * {@link #currentState} to {@link State#FATAL_ERROR}, a state from which
it cannot recover.
+ *
+ * <p/>
+ *
+ * It's important to prevent possible corruption when the transaction
manager has determined that it is in a
+ * fatal state. Subsequent transaction operations attempted via either the
<em>application</em> or the
+ * <em>{@link Sender}</em> thread should fail. This is achieved when these
operations invoke the
+ * {@link #maybeFailWithError()} method, as it causes a {@link
KafkaException} to be thrown, ensuring the stated
+ * transactional guarantees are not violated.
+ *
+ * <p/>
+ *
+ * See KAFKA-14831 for more detail.
+ *
+ * @return {@code true} to set state to {@link State#FATAL_ERROR} before
throwing an exception,
+ * {@code false} to throw an exception without first changing the
state
+ */
+ protected boolean shouldPoisonStateOnInvalidTransition() {
+ return Thread.currentThread() instanceof Sender.SenderThread;
}
public synchronized TransactionalRequestResult initializeTransactions() {
@@ -1063,7 +1063,7 @@ public class TransactionManager {
String message = idString + "Invalid transition attempted from
state "
+ currentState.name() + " to state " + target.name();
- if (shouldPoisonStateOnInvalidTransition.get()) {
+ if (shouldPoisonStateOnInvalidTransition()) {
currentState = State.FATAL_ERROR;
lastError = new IllegalStateException(message);
throw lastError;
diff --git
a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index fb2f4f01282..48569f1a20c 100644
---
a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -87,7 +87,6 @@ import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter;
import org.apache.kafka.common.telemetry.internals.ClientTelemetrySender;
-import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
@@ -2592,7 +2591,7 @@ public class KafkaProducerTest {
private final Map<String, Object> configs;
private final Serializer<T> serializer;
private final Partitioner partitioner = mock(Partitioner.class);
- private final KafkaThread ioThread = mock(KafkaThread.class);
+ private final Sender.SenderThread senderThread =
mock(Sender.SenderThread.class);
private final List<ProducerInterceptor<T, T>> interceptors = new
ArrayList<>();
private ProducerMetadata metadata = mock(ProducerMetadata.class);
private RecordAccumulator accumulator = mock(RecordAccumulator.class);
@@ -2673,7 +2672,7 @@ public class KafkaProducerTest {
interceptors,
partitioner,
time,
- ioThread,
+ senderThread,
Optional.empty()
);
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 8b4decfb959..5520df03467 100644
---
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -149,7 +149,7 @@ public class TransactionManagerTest {
private RecordAccumulator accumulator = null;
private Sender sender = null;
- private TransactionManager transactionManager = null;
+ private TestableTransactionManager transactionManager = null;
private Node brokerNode = null;
private long finalizedFeaturesEpoch = 0;
@@ -188,7 +188,7 @@ public class TransactionManagerTest {
.setMinVersionLevel(transactionV2Enabled ? (short) 2 : (short)
1)),
finalizedFeaturesEpoch));
finalizedFeaturesEpoch += 1;
- this.transactionManager = new TransactionManager(logContext,
transactionalId.orElse(null),
+ this.transactionManager = new TestableTransactionManager(logContext,
transactionalId.orElse(null),
transactionTimeoutMs, DEFAULT_RETRY_BACKOFF_MS, apiVersions);
int batchSize = 16 * 1024;
@@ -1038,7 +1038,7 @@ public class TransactionManagerTest {
.setMaxVersionLevel((short) 1)
.setMinVersionLevel((short) 1)),
0));
- this.transactionManager = new TransactionManager(logContext,
transactionalId,
+ this.transactionManager = new TestableTransactionManager(logContext,
transactionalId,
transactionTimeoutMs, DEFAULT_RETRY_BACKOFF_MS, apiVersions);
int batchSize = 16 * 1024;
@@ -3802,7 +3802,7 @@ public class TransactionManagerTest {
doInitTransactions();
assertTrue(transactionManager.isTransactional());
- transactionManager.setPoisonStateOnInvalidTransition(true);
+
transactionManager.setShouldPoisonStateOnInvalidTransitionOverride(true);
// Intentionally perform an operation that will cause an invalid state
transition. The detection of this
// will result in a poisoning of the transaction manager for all
subsequent transactional operations since
@@ -4373,4 +4373,31 @@ public class TransactionManagerTest {
ProducerTestUtils.runUntil(sender, condition);
}
+ /**
+ * This subclass exists only to optionally change the default behavior
related to poisoning the state
+ * on invalid state transition attempts.
+ */
+ private static class TestableTransactionManager extends TransactionManager
{
+
+ private Optional<Boolean> shouldPoisonStateOnInvalidTransitionOverride;
+
+ public TestableTransactionManager(LogContext logContext,
+ String transactionalId,
+ int transactionTimeoutMs,
+ long retryBackoffMs,
+ ApiVersions apiVersions) {
+ super(logContext, transactionalId, transactionTimeoutMs,
retryBackoffMs, apiVersions);
+ this.shouldPoisonStateOnInvalidTransitionOverride =
Optional.empty();
+ }
+
+ private void setShouldPoisonStateOnInvalidTransitionOverride(boolean
override) {
+ shouldPoisonStateOnInvalidTransitionOverride =
Optional.of(override);
+ }
+
+ @Override
+ protected boolean shouldPoisonStateOnInvalidTransition() {
+ // If there's an override, use it, otherwise invoke the default
(i.e. super class) logic.
+ return
shouldPoisonStateOnInvalidTransitionOverride.orElseGet(super::shouldPoisonStateOnInvalidTransition);
+ }
+ }
}