guozhangwang commented on a change in pull request #10568: URL: https://github.com/apache/kafka/pull/10568#discussion_r626160997
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java ########## @@ -396,6 +396,21 @@ public void close() { log.info("Skipping to close non-initialized store {}", entry.getKey()); } } + for (final StateStore store : globalStateStores) { Review comment: Surprised to see we actually have two as well. Did a quick look at them, and I think they can be consolidated indeed. Some more details: 1) The `mgr.initialize` would try to register all stores within `globalStateStores`, which would put them into the other `globalStores` map one by one via the `registerStateStore` call. So after the `initialize` call, the two stores should contain the same piece of metadata. 2) But note that, before `initialize` call, no stores should be opened yet. So if a failure happens before that call, then all stores should be in `closed` form, and this logic would never be triggered. 3) Within `initialize` we call `restoreState`, and only after that we would add the stores to the `globalStores` here. So if a failure happens during `restoreState`, the `globalStores` would not contain it while we have to rely on `globalStateStores`. Based on that, I can file a quick follow-up fix after your PR to consolidate these two. ########## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java ########## @@ -383,8 +386,7 @@ public void shouldLogAndMeterWhenSkippingNullKeyWithBuiltInMetricsVersionLatest( } private void shouldLogAndMeterWhenSkippingNullKeyWithBuiltInMetrics(final String builtInMetricsVersion) { - final InternalMockProcessorContext context = createInternalMockProcessorContext(builtInMetricsVersion); - processor.init(context); + setup(builtInMetricsVersion, true); Review comment: Could you remind me: why we need to enable caching here, but not in others below? ########## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java ########## @@ -189,16 +199,6 @@ public void shouldCloseStateStoresOnClose() throws Exception { assertFalse(globalStore.isOpen()); } - @Test - public void shouldTransitionToDeadOnClose() throws Exception { Review comment: Why remove this test? ########## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java ########## @@ -242,6 +242,7 @@ private void shouldLogAndMeterWhenSkippingNullLeftKey(final String builtInMetric props.setProperty(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG, builtInMetricsVersion); try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KStreamKTableJoin.class)) { + driver.close(); Review comment: Nice catch. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org