vvcephei commented on a change in pull request #8248:
URL: https://github.com/apache/kafka/pull/8248#discussion_r418851326



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
##########
@@ -70,18 +69,16 @@ static void registerStateStores(final Logger log,
                 e
             );
         }
+
         log.debug("Acquired state directory lock");
 
         final boolean storeDirsEmpty = 
stateDirectory.directoryForTaskIsEmpty(id);
 
         // We should only load checkpoint AFTER the corresponding state 
directory lock has been acquired and
         // the state stores have been registered; we should not try to load at 
the state manager construction time.
         // See https://issues.apache.org/jira/browse/KAFKA-8574
-        for (final StateStore store : topology.stateStores()) {
-            processorContext.uninitialize();
-            store.init(processorContext, store);
-            log.trace("Registered state store {}", store.name());
-        }
+        stateMgr.registerStateStores(topology.stateStores(), processorContext);

Review comment:
       Ah. Makes sense.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to