This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9a239c6142a KAFKA-16955: fix synchronization of streams threadState 
(#16337)
9a239c6142a is described below

commit 9a239c6142a8f2eb36f1600d1012224c31e58e71
Author: Rohan <desai.p.ro...@gmail.com>
AuthorDate: Fri Jun 14 10:44:36 2024 -0700

    KAFKA-16955: fix synchronization of streams threadState (#16337)
    
    Each KafkaStreams instance maintains a map from threadId to state,
    which it aggregates into the KafkaStreams app state. The map is updated
    on every state change, and when a new thread is created. State change
    updates are done in synchronized blocks; however, the update that
    happens on thread creation is not, which can raise
    ConcurrentModificationException. This patch moves this update
    into the listener object and protects it using the object's lock.
    It also moves ownership of the state map into the listener so that
    it's less likely that future changes access it without locking.
    
    Reviewers: Matthias J. Sax <matth...@confluent.io>
---
 .../org/apache/kafka/streams/KafkaStreams.java     | 64 ++++++++++------------
 1 file changed, 30 insertions(+), 34 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java 
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 9cf91f163e2..fcc999e92af 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -176,7 +176,6 @@ public class KafkaStreams implements AutoCloseable {
     private final long totalCacheSize;
     private final StreamStateListener streamStateListener;
     private final DelegatingStateRestoreListener 
delegatingStateRestoreListener;
-    private final Map<Long, StreamThread.State> threadState;
     private final UUID processId;
     private final KafkaClientSupplier clientSupplier;
     protected final TopologyMetadata topologyMetadata;
@@ -633,17 +632,13 @@ public class KafkaStreams implements AutoCloseable {
     /**
      * Class that handles stream thread transitions
      */
-    final class StreamStateListener implements StreamThread.StateListener {
+    private final class StreamStateListener implements 
StreamThread.StateListener {
         private final Map<Long, StreamThread.State> threadState;
         private GlobalStreamThread.State globalThreadState;
-        // this lock should always be held before the state lock
-        private final Object threadStatesLock;
 
-        StreamStateListener(final Map<Long, StreamThread.State> threadState,
-                            final GlobalStreamThread.State globalThreadState) {
-            this.threadState = threadState;
+        StreamStateListener(final GlobalStreamThread.State globalThreadState) {
+            this.threadState = new HashMap<>();
             this.globalThreadState = globalThreadState;
-            this.threadStatesLock = new Object();
         }
 
         /**
@@ -675,33 +670,35 @@ public class KafkaStreams implements AutoCloseable {
         public synchronized void onChange(final Thread thread,
                                           final ThreadStateTransitionValidator 
abstractNewState,
                                           final ThreadStateTransitionValidator 
abstractOldState) {
-            synchronized (threadStatesLock) {
-                // StreamThreads first
-                if (thread instanceof StreamThread) {
-                    final StreamThread.State newState = (StreamThread.State) 
abstractNewState;
-                    threadState.put(thread.getId(), newState);
-
-                    if (newState == StreamThread.State.PARTITIONS_REVOKED || 
newState == StreamThread.State.PARTITIONS_ASSIGNED) {
-                        setState(State.REBALANCING);
-                    } else if (newState == StreamThread.State.RUNNING) {
-                        maybeSetRunning();
-                    }
-                } else if (thread instanceof GlobalStreamThread) {
-                    // global stream thread has different invariants
-                    final GlobalStreamThread.State newState = 
(GlobalStreamThread.State) abstractNewState;
-                    globalThreadState = newState;
-
-                    if (newState == GlobalStreamThread.State.RUNNING) {
-                        maybeSetRunning();
-                    } else if (newState == GlobalStreamThread.State.DEAD) {
-                        if (state != State.PENDING_SHUTDOWN) {
-                            log.error("Global thread has died. The streams 
application or client will now close to ERROR.");
-                            closeToError();
-                        }
+            // StreamThreads first
+            if (thread instanceof StreamThread) {
+                final StreamThread.State newState = (StreamThread.State) 
abstractNewState;
+                threadState.put(thread.getId(), newState);
+
+                if (newState == StreamThread.State.PARTITIONS_REVOKED || 
newState == StreamThread.State.PARTITIONS_ASSIGNED) {
+                    setState(State.REBALANCING);
+                } else if (newState == StreamThread.State.RUNNING) {
+                    maybeSetRunning();
+                }
+            } else if (thread instanceof GlobalStreamThread) {
+                // global stream thread has different invariants
+                final GlobalStreamThread.State newState = 
(GlobalStreamThread.State) abstractNewState;
+                globalThreadState = newState;
+
+                if (newState == GlobalStreamThread.State.RUNNING) {
+                    maybeSetRunning();
+                } else if (newState == GlobalStreamThread.State.DEAD) {
+                    if (state != State.PENDING_SHUTDOWN) {
+                        log.error("Global thread has died. The streams 
application or client will now close to ERROR.");
+                        closeToError();
                     }
                 }
             }
         }
+
+        private synchronized void registerStreamThread(final StreamThread 
streamThread) {
+            threadState.put(streamThread.getId(), streamThread.state());
+        }
     }
 
     static final class DelegatingStateRestoreListener implements 
StateRestoreListener {
@@ -1047,8 +1044,7 @@ public class KafkaStreams implements AutoCloseable {
             globalThreadState = globalStreamThread.state();
         }
 
-        threadState = new HashMap<>(numStreamThreads);
-        streamStateListener = new StreamStateListener(threadState, 
globalThreadState);
+        streamStateListener = new StreamStateListener(globalThreadState);
 
         final GlobalStateStoreProvider globalStateStoreProvider = new 
GlobalStateStoreProvider(this.topologyMetadata.globalStateStores());
 
@@ -1084,9 +1080,9 @@ public class KafkaStreams implements AutoCloseable {
             KafkaStreams.this::closeToError,
             streamsUncaughtExceptionHandler
         );
+        streamStateListener.registerStreamThread(streamThread);
         streamThread.setStateListener(streamStateListener);
         threads.add(streamThread);
-        threadState.put(streamThread.getId(), streamThread.state());
         
queryableStoreProvider.addStoreProviderForThread(streamThread.getName(), new 
StreamThreadStateStoreProvider(streamThread));
         return streamThread;
     }

Reply via email to