guozhangwang commented on code in PR #12638:
URL: https://github.com/apache/kafka/pull/12638#discussion_r972283623


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java:
##########
@@ -209,7 +212,9 @@ void registerStateStores(final List<StateStore> allStores, 
final InternalProcess
         processorContext.uninitialize();
         for (final StateStore store : allStores) {
             if (stores.containsKey(store.name())) {
-                maybeRegisterStoreWithChangelogReader(store.name());
+                if (!stateUpdaterEnabled) {
+                    maybeRegisterStoreWithChangelogReader(store.name());

Review Comment:
   Another case for this is when we handle EOS task corruption due to no 
checkpoint file detected: in that case we would remove the corrupted task's 
changelogs, and re-initialize them. In this case the stores would be 
initialized but changelog readers not registered as well.
   
   In the future we can decouple the registration of state stores and the 
`register(final TopicPartition partition, final ProcessorStateManager 
stateManager)` as long as in the latter case, we are sure that the 
`stateManager`'s stores map are already populated, which should always be true 
when the task is already in stateUpdater.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to