This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.1 by this push:
     new eb155a21137 MINOR: Revert "KAFKA-18913: Start state updater in task 
manager (#198… (#20186)
eb155a21137 is described below

commit eb155a211370fe6f8d3871d4178a44de6ba52fe8
Author: Lucas Brutschy <[email protected]>
AuthorDate: Thu Jul 17 17:28:08 2025 +0200

    MINOR: Revert "KAFKA-18913: Start state updater in task manager (#198… 
(#20186)
    
    This reverts commit 4d6cf3efef15b3c6b1511c28b882b5486ff1245c. It seemed
    to trigger a race condition in the state updater initialization.
    
    Reviewers: Bill Bejeck <[email protected]>
---
 .../streams/processor/internals/StreamThread.java  | 11 +++----
 .../streams/processor/internals/TaskManager.java   |  5 ---
 .../processor/internals/StreamThreadTest.java      | 38 ++++------------------
 3 files changed, 11 insertions(+), 43 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 86e12bf3f65..89cc988313b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -454,7 +454,7 @@ public class StreamThread extends Thread implements 
ProcessingThread {
         final DefaultTaskManager schedulingTaskManager =
             maybeCreateSchedulingTaskManager(processingThreadsEnabled, 
stateUpdaterEnabled, topologyMetadata, time, threadId, tasks);
         final StateUpdater stateUpdater =
-            maybeCreateStateUpdater(
+            maybeCreateAndStartStateUpdater(
                 stateUpdaterEnabled,
                 streamsMetrics,
                 config,
@@ -633,7 +633,7 @@ public class StreamThread extends Thread implements 
ProcessingThread {
         return null;
     }
 
-    private static StateUpdater maybeCreateStateUpdater(final boolean 
stateUpdaterEnabled,
+    private static StateUpdater maybeCreateAndStartStateUpdater(final boolean 
stateUpdaterEnabled,
                                                                 final 
StreamsMetricsImpl streamsMetrics,
                                                                 final 
StreamsConfig streamsConfig,
                                                                 final 
Consumer<byte[], byte[]> restoreConsumer,
@@ -644,7 +644,7 @@ public class StreamThread extends Thread implements 
ProcessingThread {
                                                                 final int 
threadIdx) {
         if (stateUpdaterEnabled) {
             final String name = clientId + STATE_UPDATER_ID_SUBSTRING + 
threadIdx;
-            return new DefaultStateUpdater(
+            final StateUpdater stateUpdater = new DefaultStateUpdater(
                 name,
                 streamsMetrics.metricsRegistry(),
                 streamsConfig,
@@ -653,6 +653,8 @@ public class StreamThread extends Thread implements 
ProcessingThread {
                 topologyMetadata,
                 time
             );
+            stateUpdater.start();
+            return stateUpdater;
         } else {
             return null;
         }
@@ -881,9 +883,6 @@ public class StreamThread extends Thread implements 
ProcessingThread {
         }
         boolean cleanRun = false;
         try {
-            if (stateUpdaterEnabled) {
-                taskManager.init();
-            }
             cleanRun = runLoop();
         } catch (final Throwable e) {
             failedStreamThreadSensor.record();
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 3ec3bb78557..9207ec74a7c 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -149,11 +149,6 @@ public class TaskManager {
         );
     }
 
-    void init() {
-        if (stateUpdater != null) {
-            this.stateUpdater.start();
-        }
-    }
     void setMainConsumer(final Consumer<byte[], byte[]> mainConsumer) {
         this.mainConsumer = mainConsumer;
     }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 9eed49110bb..aa68672b3c0 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -109,7 +109,6 @@ import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
-import org.junit.jupiter.params.provider.ValueSource;
 import org.mockito.InOrder;
 import org.mockito.Mock;
 import org.mockito.Mockito;
@@ -917,7 +916,6 @@ public class StreamThreadTest {
         thread = createStreamThread(CLIENT_ID, config);
 
         thread.setState(StreamThread.State.STARTING);
-        thread.taskManager().init();
         thread.setState(StreamThread.State.PARTITIONS_REVOKED);
 
         final TaskId task1 = new TaskId(0, t1p1.partition());
@@ -1291,7 +1289,6 @@ public class StreamThreadTest {
         thread = createStreamThread(CLIENT_ID, new StreamsConfig(props));
 
         thread.setState(StreamThread.State.STARTING);
-        thread.taskManager().init();
         
thread.rebalanceListener().onPartitionsRevoked(Collections.emptyList());
 
         final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
@@ -1549,7 +1546,6 @@ public class StreamThreadTest {
         consumer.updatePartitions(topic1, Collections.singletonList(new 
PartitionInfo(topic1, 1, null, null, null)));
 
         thread.setState(StreamThread.State.STARTING);
-        thread.taskManager().init();
         thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
 
         final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
@@ -1615,7 +1611,6 @@ public class StreamThreadTest {
         internalTopologyBuilder.addSink("out", "output", null, null, null, 
"name");
 
         thread.setState(StreamThread.State.STARTING);
-        thread.taskManager().init();
         thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
 
         final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
@@ -1698,7 +1693,6 @@ public class StreamThreadTest {
         internalTopologyBuilder.buildTopology();
 
         thread.setState(StreamThread.State.STARTING);
-        thread.taskManager().init();
         thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
 
         final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
@@ -1793,7 +1787,6 @@ public class StreamThreadTest {
         consumer.updatePartitions(topic1, Collections.singletonList(new 
PartitionInfo(topic1, 1, null, null, null)));
 
         thread.setState(StreamThread.State.STARTING);
-        thread.taskManager().init();
         thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
 
         final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
@@ -1858,7 +1851,6 @@ public class StreamThreadTest {
         internalTopologyBuilder.addSink("out", "output", null, null, null, 
"name");
 
         thread.setState(StreamThread.State.STARTING);
-        thread.taskManager().init();
         thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
 
         final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
@@ -1940,7 +1932,6 @@ public class StreamThreadTest {
         );
 
         thread.setState(StreamThread.State.STARTING);
-        thread.taskManager().init();
         thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
 
         final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
@@ -2001,7 +1992,6 @@ public class StreamThreadTest {
         restoreConsumer.updateBeginningOffsets(offsets);
 
         thread.setState(StreamThread.State.STARTING);
-        thread.taskManager().init();
         thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
 
         final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
@@ -2265,7 +2255,6 @@ public class StreamThreadTest {
         thread = createStreamThread(CLIENT_ID, config);
 
         thread.setState(StreamThread.State.STARTING);
-        thread.taskManager().init();
         thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
         final List<TopicPartition> assignedPartitions = new ArrayList<>();
 
@@ -2345,7 +2334,6 @@ public class StreamThreadTest {
         thread = createStreamThread(CLIENT_ID, stateUpdaterEnabled, 
processingThreadsEnabled);
 
         thread.setState(StreamThread.State.STARTING);
-        thread.taskManager().init();
         thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
         final List<TopicPartition> assignedPartitions = new ArrayList<>();
 
@@ -2543,7 +2531,6 @@ public class StreamThreadTest {
         thread = createStreamThread(CLIENT_ID, new StreamsConfig(properties));
 
         thread.setState(StreamThread.State.STARTING);
-        thread.taskManager().init();
         thread.setState(StreamThread.State.PARTITIONS_REVOKED);
 
         final TaskId task1 = new TaskId(0, t1p1.partition());
@@ -3030,7 +3017,6 @@ public class StreamThreadTest {
         thread = createStreamThread(CLIENT_ID, config);
 
         thread.setState(StreamThread.State.STARTING);
-        thread.taskManager().init();
         thread.setState(StreamThread.State.PARTITIONS_REVOKED);
 
         final TaskId task1 = new TaskId(0, t1p1.partition());
@@ -3404,7 +3390,6 @@ public class StreamThreadTest {
 
         thread = createStreamThread("clientId", stateUpdaterEnabled, 
processingThreadsEnabled);
         thread.setState(State.STARTING);
-        thread.taskManager().init();
 
         final Map<String, KafkaFuture<Uuid>> clientInstanceIdFutures = 
thread.clientInstanceIds(Duration.ZERO);
 
@@ -3429,7 +3414,6 @@ public class StreamThreadTest {
     public void shouldReturnErrorIfMainConsumerInstanceIdNotInitialized(final 
boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
         thread = createStreamThread("clientId", stateUpdaterEnabled, 
processingThreadsEnabled);
         thread.setState(State.STARTING);
-        thread.taskManager().init();
 
         final Map<String, KafkaFuture<Uuid>> consumerFutures = 
thread.clientInstanceIds(Duration.ZERO);
 
@@ -3446,7 +3430,6 @@ public class StreamThreadTest {
     public void 
shouldReturnErrorIfRestoreConsumerInstanceIdNotInitialized(final boolean 
stateUpdaterEnabled, final boolean processingThreadsEnabled) {
         thread = createStreamThread("clientId", stateUpdaterEnabled, 
processingThreadsEnabled);
         thread.setState(State.STARTING);
-        thread.taskManager().init();
 
         final Map<String, KafkaFuture<Uuid>> consumerFutures = 
thread.clientInstanceIds(Duration.ZERO);
 
@@ -3463,7 +3446,6 @@ public class StreamThreadTest {
     public void shouldReturnErrorIfProducerInstanceIdNotInitialized(final 
boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
         thread = createStreamThread("clientId", stateUpdaterEnabled, 
processingThreadsEnabled);
         thread.setState(State.STARTING);
-        thread.taskManager().init();
 
         final Map<String, KafkaFuture<Uuid>> producerFutures = 
thread.clientInstanceIds(Duration.ZERO);
 
@@ -3481,7 +3463,6 @@ public class StreamThreadTest {
         clientSupplier.consumer.disableTelemetry();
         thread = createStreamThread("clientId", stateUpdaterEnabled, 
processingThreadsEnabled);
         thread.setState(State.STARTING);
-        thread.taskManager().init();
 
         final Map<String, KafkaFuture<Uuid>> consumerFutures = 
thread.clientInstanceIds(Duration.ZERO);
 
@@ -3499,7 +3480,6 @@ public class StreamThreadTest {
 
         thread = createStreamThread("clientId", stateUpdaterEnabled, 
processingThreadsEnabled);
         thread.setState(State.STARTING);
-        thread.taskManager().init();
 
         final Map<String, KafkaFuture<Uuid>> consumerFutures = 
thread.clientInstanceIds(Duration.ZERO);
 
@@ -3519,7 +3499,6 @@ public class StreamThreadTest {
 
         thread = createStreamThread("clientId", stateUpdaterEnabled, 
processingThreadsEnabled);
         thread.setState(State.STARTING);
-        thread.taskManager().init();
 
         final Map<String, KafkaFuture<Uuid>> producerFutures = 
thread.clientInstanceIds(Duration.ZERO);
 
@@ -3537,7 +3516,6 @@ public class StreamThreadTest {
         clientSupplier.consumer.injectTimeoutException(-1);
         thread = createStreamThread("clientId", stateUpdaterEnabled, 
processingThreadsEnabled);
         thread.setState(State.STARTING);
-        thread.taskManager().init();
 
         final Map<String, KafkaFuture<Uuid>> consumerFutures = 
thread.clientInstanceIds(Duration.ZERO);
 
@@ -3562,7 +3540,6 @@ public class StreamThreadTest {
         clientSupplier.restoreConsumer.injectTimeoutException(-1);
         thread = createStreamThread("clientId", stateUpdaterEnabled, 
processingThreadsEnabled);
         thread.setState(State.STARTING);
-        thread.taskManager().init();
 
         final Map<String, KafkaFuture<Uuid>> consumerFutures = 
thread.clientInstanceIds(Duration.ZERO);
 
@@ -3590,7 +3567,6 @@ public class StreamThreadTest {
 
         thread = createStreamThread("clientId", stateUpdaterEnabled, 
processingThreadsEnabled);
         thread.setState(State.STARTING);
-        thread.taskManager().init();
 
         final Map<String, KafkaFuture<Uuid>> producerFutures = 
thread.clientInstanceIds(Duration.ZERO);
 
@@ -3607,10 +3583,9 @@ public class StreamThreadTest {
         );
     }
 
-    @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void testNamedTopologyWithStreamsProtocol(final boolean 
stateUpdaterEnabled) {
-        final Properties props = configProps(false, stateUpdaterEnabled, 
false);
+    @Test
+    public void testNamedTopologyWithStreamsProtocol() {
+        final Properties props = configProps(false, false, false);
         props.setProperty(StreamsConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.STREAMS.toString());
         final StreamsConfig config = new StreamsConfig(props);
         final InternalTopologyBuilder topologyBuilder = new 
InternalTopologyBuilder(
@@ -3667,10 +3642,9 @@ public class StreamThreadTest {
         assertTrue(thread.streamsRebalanceData().isEmpty());
     }
 
-    @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    public void testStreamsRebalanceDataWithExtraCopartition(final boolean 
stateUpdaterEnabled) {
-        final Properties props = configProps(false, stateUpdaterEnabled, 
false);
+    @Test
+    public void testStreamsRebalanceDataWithExtraCopartition() {
+        final Properties props = configProps(false, false, false);
         props.setProperty(StreamsConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.STREAMS.toString());
 
         internalTopologyBuilder.addSource(null, "source1", null, null, null, 
topic1);

Reply via email to