cadonna commented on a change in pull request #10646: URL: https://github.com/apache/kafka/pull/10646#discussion_r632371505
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java ########## @@ -128,8 +129,7 @@ public void setGlobalProcessorContext(final InternalProcessorContext globalProce } final Set<String> changelogTopics = new HashSet<>(); - for (final StateStore stateStore : globalStateStores) { - globalStoreNames.add(stateStore.name()); + for (final StateStore stateStore : topology.globalStateStores()) { final String sourceTopic = storeToChangelogTopic.get(stateStore.name()); changelogTopics.add(sourceTopic); stateStore.init((StateStoreContext) globalProcessorContext, stateStore); Review comment: On a second thought, it might also be relevant for production code since we now can restart the stream thread after a fatal error. This is not yet possible for a global stream thread, but it might be possible in future. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org