cadonna commented on a change in pull request #10646:
URL: https://github.com/apache/kafka/pull/10646#discussion_r629968874



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -128,8 +129,7 @@ public void setGlobalProcessorContext(final InternalProcessorContext globalProce
         }
 
         final Set<String> changelogTopics = new HashSet<>();
-        for (final StateStore stateStore : globalStateStores) {
-            globalStoreNames.add(stateStore.name());
+        for (final StateStore stateStore : topology.globalStateStores()) {

Review comment:
       Not really related to this line. Could you verify that the state store 
is closed in the unit test that tests line 148? The name of the test is 
`shouldThrowStreamsExceptionForOldTopicPartitions()`.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -128,8 +129,7 @@ public void setGlobalProcessorContext(final InternalProcessorContext globalProce
         }
 
         final Set<String> changelogTopics = new HashSet<>();
-        for (final StateStore stateStore : globalStateStores) {
-            globalStoreNames.add(stateStore.name());
+        for (final StateStore stateStore : topology.globalStateStores()) {
             final String sourceTopic = storeToChangelogTopic.get(stateStore.name());
             changelogTopics.add(sourceTopic);
             stateStore.init((StateStoreContext) globalProcessorContext, stateStore);

Review comment:
       There are an `IllegalStateException` and a couple of 
`IllegalArgumentException`s on the path from opening the state store within 
`stateStore.init()` to line 182 in `this.registerStore()`. We do not close the 
state stores before we throw. I do not think this is relevant for production 
code, but we could leak state stores in unit tests if we do not explicitly 
close them there.
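
       To illustrate the concern, here is a minimal, self-contained sketch. It 
does not use the real `StateStore` interface or `GlobalStateManagerImpl`; 
`FakeStore`, `initAll()`, and the `failOnRegister` flag are hypothetical 
stand-ins that I made up for the example. It only shows the general pattern: a 
store is opened during init, a later check throws, and whoever expects the 
exception has to make sure the store is closed again.

```java
import java.util.ArrayList;
import java.util.List;

public class CloseOnInitFailureSketch {

    /** Hypothetical stand-in for a state store; NOT the Kafka Streams StateStore interface. */
    static final class FakeStore implements AutoCloseable {
        private final String name;
        private final boolean failOnRegister;
        private boolean open = false;

        FakeStore(final String name, final boolean failOnRegister) {
            this.name = name;
            this.failOnRegister = failOnRegister;
        }

        void init() {
            // Opening the store acquires the resource ...
            open = true;
            // ... and a later validation step may throw while the store is still open.
            if (failOnRegister) {
                throw new IllegalStateException("Store " + name + " could not be registered");
            }
        }

        boolean isOpen() {
            return open;
        }

        @Override
        public void close() {
            open = false;
        }
    }

    /** Initializes all stores; if any init throws, closes every store before rethrowing. */
    static void initAll(final List<FakeStore> stores) {
        try {
            for (final FakeStore store : stores) {
                store.init();
            }
        } catch (final RuntimeException e) {
            // Closing a store that was never opened is a no-op in this sketch.
            for (final FakeStore store : stores) {
                store.close();
            }
            throw e;
        }
    }

    public static void main(final String[] args) {
        final List<FakeStore> stores = new ArrayList<>();
        stores.add(new FakeStore("a", false));
        stores.add(new FakeStore("b", true));
        try {
            initAll(stores);
        } catch (final IllegalStateException expected) {
            // A unit test would assert on this exception here.
        }
        for (final FakeStore store : stores) {
            System.out.println(store.name + " open: " + store.isOpen());
        }
    }
}
```

       In the sketch the cleanup lives in `initAll()`, but the same effect can 
be had by closing the stores in the unit test itself after the expected 
exception, which is what I am asking about above.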




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

