This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.1 by this push:
new eb155a21137 MINOR: Revert "KAFKA-18913: Start state updater in task
manager (#198… (#20186)
eb155a21137 is described below
commit eb155a211370fe6f8d3871d4178a44de6ba52fe8
Author: Lucas Brutschy <[email protected]>
AuthorDate: Thu Jul 17 17:28:08 2025 +0200
MINOR: Revert "KAFKA-18913: Start state updater in task manager (#198…
(#20186)
This reverts commit 4d6cf3efef15b3c6b1511c28b882b5486ff1245c. It seemed
to trigger a race condition in the state updater initialization.
Reviewers: Bill Bejeck <[email protected]>
---
.../streams/processor/internals/StreamThread.java | 11 +++----
.../streams/processor/internals/TaskManager.java | 5 ---
.../processor/internals/StreamThreadTest.java | 38 ++++------------------
3 files changed, 11 insertions(+), 43 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 86e12bf3f65..89cc988313b 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -454,7 +454,7 @@ public class StreamThread extends Thread implements
ProcessingThread {
final DefaultTaskManager schedulingTaskManager =
maybeCreateSchedulingTaskManager(processingThreadsEnabled,
stateUpdaterEnabled, topologyMetadata, time, threadId, tasks);
final StateUpdater stateUpdater =
- maybeCreateStateUpdater(
+ maybeCreateAndStartStateUpdater(
stateUpdaterEnabled,
streamsMetrics,
config,
@@ -633,7 +633,7 @@ public class StreamThread extends Thread implements
ProcessingThread {
return null;
}
- private static StateUpdater maybeCreateStateUpdater(final boolean
stateUpdaterEnabled,
+ private static StateUpdater maybeCreateAndStartStateUpdater(final boolean
stateUpdaterEnabled,
final
StreamsMetricsImpl streamsMetrics,
final
StreamsConfig streamsConfig,
final
Consumer<byte[], byte[]> restoreConsumer,
@@ -644,7 +644,7 @@ public class StreamThread extends Thread implements
ProcessingThread {
final int
threadIdx) {
if (stateUpdaterEnabled) {
final String name = clientId + STATE_UPDATER_ID_SUBSTRING +
threadIdx;
- return new DefaultStateUpdater(
+ final StateUpdater stateUpdater = new DefaultStateUpdater(
name,
streamsMetrics.metricsRegistry(),
streamsConfig,
@@ -653,6 +653,8 @@ public class StreamThread extends Thread implements
ProcessingThread {
topologyMetadata,
time
);
+ stateUpdater.start();
+ return stateUpdater;
} else {
return null;
}
@@ -881,9 +883,6 @@ public class StreamThread extends Thread implements
ProcessingThread {
}
boolean cleanRun = false;
try {
- if (stateUpdaterEnabled) {
- taskManager.init();
- }
cleanRun = runLoop();
} catch (final Throwable e) {
failedStreamThreadSensor.record();
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 3ec3bb78557..9207ec74a7c 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -149,11 +149,6 @@ public class TaskManager {
);
}
- void init() {
- if (stateUpdater != null) {
- this.stateUpdater.start();
- }
- }
void setMainConsumer(final Consumer<byte[], byte[]> mainConsumer) {
this.mainConsumer = mainConsumer;
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 9eed49110bb..aa68672b3c0 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -109,7 +109,6 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
-import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.Mockito;
@@ -917,7 +916,6 @@ public class StreamThreadTest {
thread = createStreamThread(CLIENT_ID, config);
thread.setState(StreamThread.State.STARTING);
- thread.taskManager().init();
thread.setState(StreamThread.State.PARTITIONS_REVOKED);
final TaskId task1 = new TaskId(0, t1p1.partition());
@@ -1291,7 +1289,6 @@ public class StreamThreadTest {
thread = createStreamThread(CLIENT_ID, new StreamsConfig(props));
thread.setState(StreamThread.State.STARTING);
- thread.taskManager().init();
thread.rebalanceListener().onPartitionsRevoked(Collections.emptyList());
final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
@@ -1549,7 +1546,6 @@ public class StreamThreadTest {
consumer.updatePartitions(topic1, Collections.singletonList(new
PartitionInfo(topic1, 1, null, null, null)));
thread.setState(StreamThread.State.STARTING);
- thread.taskManager().init();
thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
@@ -1615,7 +1611,6 @@ public class StreamThreadTest {
internalTopologyBuilder.addSink("out", "output", null, null, null,
"name");
thread.setState(StreamThread.State.STARTING);
- thread.taskManager().init();
thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
@@ -1698,7 +1693,6 @@ public class StreamThreadTest {
internalTopologyBuilder.buildTopology();
thread.setState(StreamThread.State.STARTING);
- thread.taskManager().init();
thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
@@ -1793,7 +1787,6 @@ public class StreamThreadTest {
consumer.updatePartitions(topic1, Collections.singletonList(new
PartitionInfo(topic1, 1, null, null, null)));
thread.setState(StreamThread.State.STARTING);
- thread.taskManager().init();
thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
@@ -1858,7 +1851,6 @@ public class StreamThreadTest {
internalTopologyBuilder.addSink("out", "output", null, null, null,
"name");
thread.setState(StreamThread.State.STARTING);
- thread.taskManager().init();
thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
@@ -1940,7 +1932,6 @@ public class StreamThreadTest {
);
thread.setState(StreamThread.State.STARTING);
- thread.taskManager().init();
thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
@@ -2001,7 +1992,6 @@ public class StreamThreadTest {
restoreConsumer.updateBeginningOffsets(offsets);
thread.setState(StreamThread.State.STARTING);
- thread.taskManager().init();
thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
@@ -2265,7 +2255,6 @@ public class StreamThreadTest {
thread = createStreamThread(CLIENT_ID, config);
thread.setState(StreamThread.State.STARTING);
- thread.taskManager().init();
thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
final List<TopicPartition> assignedPartitions = new ArrayList<>();
@@ -2345,7 +2334,6 @@ public class StreamThreadTest {
thread = createStreamThread(CLIENT_ID, stateUpdaterEnabled,
processingThreadsEnabled);
thread.setState(StreamThread.State.STARTING);
- thread.taskManager().init();
thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
final List<TopicPartition> assignedPartitions = new ArrayList<>();
@@ -2543,7 +2531,6 @@ public class StreamThreadTest {
thread = createStreamThread(CLIENT_ID, new StreamsConfig(properties));
thread.setState(StreamThread.State.STARTING);
- thread.taskManager().init();
thread.setState(StreamThread.State.PARTITIONS_REVOKED);
final TaskId task1 = new TaskId(0, t1p1.partition());
@@ -3030,7 +3017,6 @@ public class StreamThreadTest {
thread = createStreamThread(CLIENT_ID, config);
thread.setState(StreamThread.State.STARTING);
- thread.taskManager().init();
thread.setState(StreamThread.State.PARTITIONS_REVOKED);
final TaskId task1 = new TaskId(0, t1p1.partition());
@@ -3404,7 +3390,6 @@ public class StreamThreadTest {
thread = createStreamThread("clientId", stateUpdaterEnabled,
processingThreadsEnabled);
thread.setState(State.STARTING);
- thread.taskManager().init();
final Map<String, KafkaFuture<Uuid>> clientInstanceIdFutures =
thread.clientInstanceIds(Duration.ZERO);
@@ -3429,7 +3414,6 @@ public class StreamThreadTest {
public void shouldReturnErrorIfMainConsumerInstanceIdNotInitialized(final
boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
thread = createStreamThread("clientId", stateUpdaterEnabled,
processingThreadsEnabled);
thread.setState(State.STARTING);
- thread.taskManager().init();
final Map<String, KafkaFuture<Uuid>> consumerFutures =
thread.clientInstanceIds(Duration.ZERO);
@@ -3446,7 +3430,6 @@ public class StreamThreadTest {
public void
shouldReturnErrorIfRestoreConsumerInstanceIdNotInitialized(final boolean
stateUpdaterEnabled, final boolean processingThreadsEnabled) {
thread = createStreamThread("clientId", stateUpdaterEnabled,
processingThreadsEnabled);
thread.setState(State.STARTING);
- thread.taskManager().init();
final Map<String, KafkaFuture<Uuid>> consumerFutures =
thread.clientInstanceIds(Duration.ZERO);
@@ -3463,7 +3446,6 @@ public class StreamThreadTest {
public void shouldReturnErrorIfProducerInstanceIdNotInitialized(final
boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
thread = createStreamThread("clientId", stateUpdaterEnabled,
processingThreadsEnabled);
thread.setState(State.STARTING);
- thread.taskManager().init();
final Map<String, KafkaFuture<Uuid>> producerFutures =
thread.clientInstanceIds(Duration.ZERO);
@@ -3481,7 +3463,6 @@ public class StreamThreadTest {
clientSupplier.consumer.disableTelemetry();
thread = createStreamThread("clientId", stateUpdaterEnabled,
processingThreadsEnabled);
thread.setState(State.STARTING);
- thread.taskManager().init();
final Map<String, KafkaFuture<Uuid>> consumerFutures =
thread.clientInstanceIds(Duration.ZERO);
@@ -3499,7 +3480,6 @@ public class StreamThreadTest {
thread = createStreamThread("clientId", stateUpdaterEnabled,
processingThreadsEnabled);
thread.setState(State.STARTING);
- thread.taskManager().init();
final Map<String, KafkaFuture<Uuid>> consumerFutures =
thread.clientInstanceIds(Duration.ZERO);
@@ -3519,7 +3499,6 @@ public class StreamThreadTest {
thread = createStreamThread("clientId", stateUpdaterEnabled,
processingThreadsEnabled);
thread.setState(State.STARTING);
- thread.taskManager().init();
final Map<String, KafkaFuture<Uuid>> producerFutures =
thread.clientInstanceIds(Duration.ZERO);
@@ -3537,7 +3516,6 @@ public class StreamThreadTest {
clientSupplier.consumer.injectTimeoutException(-1);
thread = createStreamThread("clientId", stateUpdaterEnabled,
processingThreadsEnabled);
thread.setState(State.STARTING);
- thread.taskManager().init();
final Map<String, KafkaFuture<Uuid>> consumerFutures =
thread.clientInstanceIds(Duration.ZERO);
@@ -3562,7 +3540,6 @@ public class StreamThreadTest {
clientSupplier.restoreConsumer.injectTimeoutException(-1);
thread = createStreamThread("clientId", stateUpdaterEnabled,
processingThreadsEnabled);
thread.setState(State.STARTING);
- thread.taskManager().init();
final Map<String, KafkaFuture<Uuid>> consumerFutures =
thread.clientInstanceIds(Duration.ZERO);
@@ -3590,7 +3567,6 @@ public class StreamThreadTest {
thread = createStreamThread("clientId", stateUpdaterEnabled,
processingThreadsEnabled);
thread.setState(State.STARTING);
- thread.taskManager().init();
final Map<String, KafkaFuture<Uuid>> producerFutures =
thread.clientInstanceIds(Duration.ZERO);
@@ -3607,10 +3583,9 @@ public class StreamThreadTest {
);
}
- @ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void testNamedTopologyWithStreamsProtocol(final boolean
stateUpdaterEnabled) {
- final Properties props = configProps(false, stateUpdaterEnabled,
false);
+ @Test
+ public void testNamedTopologyWithStreamsProtocol() {
+ final Properties props = configProps(false, false, false);
props.setProperty(StreamsConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.STREAMS.toString());
final StreamsConfig config = new StreamsConfig(props);
final InternalTopologyBuilder topologyBuilder = new
InternalTopologyBuilder(
@@ -3667,10 +3642,9 @@ public class StreamThreadTest {
assertTrue(thread.streamsRebalanceData().isEmpty());
}
- @ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void testStreamsRebalanceDataWithExtraCopartition(final boolean
stateUpdaterEnabled) {
- final Properties props = configProps(false, stateUpdaterEnabled,
false);
+ @Test
+ public void testStreamsRebalanceDataWithExtraCopartition() {
+ final Properties props = configProps(false, false, false);
props.setProperty(StreamsConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.STREAMS.toString());
internalTopologyBuilder.addSource(null, "source1", null, null, null,
topic1);