cadonna commented on a change in pull request #10646:
URL: https://github.com/apache/kafka/pull/10646#discussion_r632371505



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -128,8 +129,7 @@ public void setGlobalProcessorContext(final InternalProcessorContext globalProce
         }
 
         final Set<String> changelogTopics = new HashSet<>();
-        for (final StateStore stateStore : globalStateStores) {
-            globalStoreNames.add(stateStore.name());
+        for (final StateStore stateStore : topology.globalStateStores()) {
             final String sourceTopic = storeToChangelogTopic.get(stateStore.name());
             changelogTopics.add(sourceTopic);
             stateStore.init((StateStoreContext) globalProcessorContext, stateStore);

Review comment:
       On second thought, this might also be relevant for production code, since 
we can now restart a stream thread after a fatal error. That is not yet 
possible for a global stream thread, but it might become possible in the future.





