cadonna commented on a change in pull request #10646: URL: https://github.com/apache/kafka/pull/10646#discussion_r629968874
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java ########## @@ -128,8 +129,7 @@ public void setGlobalProcessorContext(final InternalProcessorContext globalProce } final Set<String> changelogTopics = new HashSet<>(); - for (final StateStore stateStore : globalStateStores) { - globalStoreNames.add(stateStore.name()); + for (final StateStore stateStore : topology.globalStateStores()) { Review comment: Not really related to this line. Could you verify that the state store is closed in the unit test that tests line 148? The name of the test is `shouldThrowStreamsExceptionForOldTopicPartitions()`. ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java ########## @@ -128,8 +129,7 @@ public void setGlobalProcessorContext(final InternalProcessorContext globalProce } final Set<String> changelogTopics = new HashSet<>(); - for (final StateStore stateStore : globalStateStores) { - globalStoreNames.add(stateStore.name()); + for (final StateStore stateStore : topology.globalStateStores()) { final String sourceTopic = storeToChangelogTopic.get(stateStore.name()); changelogTopics.add(sourceTopic); stateStore.init((StateStoreContext) globalProcessorContext, stateStore); Review comment: There are a a `IllegalStateException` and a couple of `IllegalArgumentException`s on the path from opening the state store within `stateStore.init()` to line 182 in `this.registerStore()`. We do not close the state stores before we throw. I do not think this is relevant for production code, but we could leak state stores in unit tests if we do not explicitly close the state stores in the unit tests. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org