guozhangwang commented on a change in pull request #10568:
URL: https://github.com/apache/kafka/pull/10568#discussion_r626160997



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -396,6 +396,21 @@ public void close() {
                 log.info("Skipping to close non-initialized store {}", entry.getKey());
             }
         }
+        for (final StateStore store : globalStateStores) {

Review comment:
       Surprised to see that we actually have two collections as well. I took a 
quick look at them, and I think they can indeed be consolidated. Some more 
details:
   
   1) `mgr.initialize` tries to register all stores within 
`globalStateStores`, which puts them into the other `globalStores` map one 
by one via the `registerStateStore` call. So after the `initialize` call, the 
two collections should contain the same stores.
   
   2) But note that before the `initialize` call, no stores should have been 
opened yet. So if a failure happens before that call, all stores should still 
be closed, and this logic would never be triggered.
   
   3) Within `initialize` we call `restoreState`, and only after that do we 
add the store to `globalStores`. So if a failure happens during 
`restoreState`, `globalStores` would not contain that store, and we have to 
rely on `globalStateStores` to close it.
   
   Based on that, I can file a quick follow-up fix after your PR to consolidate 
these two.
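
   To make the three points above concrete, here is a minimal sketch of the 
situation. It is not the actual `GlobalStateManagerImpl` code: `SketchStore`, 
`SketchGlobalStateManager`, and the `failOnRestore` flag are hypothetical names 
made up for illustration, and the assumption is that a store is opened before 
its restoration runs and is only added to `globalStores` once restoration 
succeeds.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for a state store; NOT the Kafka Streams StateStore interface.
final class SketchStore {
    final String name;
    final boolean failOnRestore; // illustration-only switch to simulate a restore failure
    private boolean open;

    SketchStore(final String name, final boolean failOnRestore) {
        this.name = name;
        this.failOnRestore = failOnRestore;
    }

    void init() {
        open = true;
    }

    void restore() {
        if (failOnRestore) {
            throw new IllegalStateException("restore failed for " + name);
        }
    }

    void close() {
        open = false;
    }

    boolean isOpen() {
        return open;
    }
}

// Hypothetical manager that keeps the two collections discussed above.
final class SketchGlobalStateManager {
    // Every store known up front, whether or not it was registered.
    final Set<SketchStore> globalStateStores = new LinkedHashSet<>();
    // Only stores whose restoration completed.
    final Map<String, SketchStore> globalStores = new LinkedHashMap<>();

    SketchGlobalStateManager(final SketchStore... stores) {
        for (final SketchStore store : stores) {
            globalStateStores.add(store);
        }
    }

    void initialize() {
        for (final SketchStore store : globalStateStores) {
            store.init();                        // store is open from here on
            store.restore();                     // may throw before registration
            globalStores.put(store.name, store); // reached only after a successful restore
        }
    }

    void close() {
        for (final SketchStore store : globalStores.values()) {
            store.close();
        }
        // Sweep the full set to catch stores that were opened but never registered.
        for (final SketchStore store : globalStateStores) {
            if (store.isOpen()) {
                store.close();
            }
        }
    }
}
```

   In this sketch, a store whose `restore()` throws is left open but absent 
from `globalStores`, so only the second loop in `close()` reaches it. With a 
single collection, that second loop would no longer be needed.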

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
##########
@@ -383,8 +386,7 @@ public void shouldLogAndMeterWhenSkippingNullKeyWithBuiltInMetricsVersionLatest(
     }
 
     private void shouldLogAndMeterWhenSkippingNullKeyWithBuiltInMetrics(final String builtInMetricsVersion) {
-        final InternalMockProcessorContext context = createInternalMockProcessorContext(builtInMetricsVersion);
-        processor.init(context);
+        setup(builtInMetricsVersion, true);

Review comment:
       Could you remind me why we need to enable caching here, but not in the 
other tests below?

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
##########
@@ -189,16 +199,6 @@ public void shouldCloseStateStoresOnClose() throws Exception {
         assertFalse(globalStore.isOpen());
     }
 
-    @Test
-    public void shouldTransitionToDeadOnClose() throws Exception {

Review comment:
       Why was this test removed?

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
##########
@@ -242,6 +242,7 @@ private void shouldLogAndMeterWhenSkippingNullLeftKey(final String builtInMetric
         props.setProperty(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG, builtInMetricsVersion);
 
         try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KStreamKTableJoin.class)) {
+            driver.close();

Review comment:
       Nice catch.




