guozhangwang commented on code in PR #12638:
URL: https://github.com/apache/kafka/pull/12638#discussion_r972283623


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java:
##########
@@ -209,7 +212,9 @@ void registerStateStores(final List<StateStore> allStores, final InternalProcess
         processorContext.uninitialize();
         for (final StateStore store : allStores) {
             if (stores.containsKey(store.name())) {
-                maybeRegisterStoreWithChangelogReader(store.name());
+                if (!stateUpdaterEnabled) {
+                    maybeRegisterStoreWithChangelogReader(store.name());

Review Comment:
   Another case for this is when we handle EOS task corruption because no checkpoint file was detected: there we remove the corrupted task's changelogs and re-initialize them. In that case, too, the stores would be initialized but not registered with the changelog reader.
   
   In the future we can decouple the registration of state stores from `register(final TopicPartition partition, final ProcessorStateManager stateManager)`, as long as in the latter case we are sure that the `stateManager`'s stores map is already populated, which should always be true when the task is already in the stateUpdater.
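
To make the proposed decoupling concrete, here is a rough sketch, not the actual Kafka code. `StateManagerSketch`, `registerWithChangelogReader`, and the store-name-to-changelog map are hypothetical simplifications; only the `stateUpdaterEnabled` flag and the idea of a populated stores map come from the diff and comment above. It assumes registration with the changelog reader can be done as a separate, later step that fails fast if the stores map has not been populated.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for ProcessorStateManager (not the real class).
class StateManagerSketch {
    private final boolean stateUpdaterEnabled;
    // Store name -> changelog topic name; a simplification of the real stores map.
    private final Map<String, String> stores = new LinkedHashMap<>();
    // Changelogs handed to the (hypothetical) changelog reader so far.
    private final List<String> registeredChangelogs = new ArrayList<>();

    StateManagerSketch(final boolean stateUpdaterEnabled) {
        this.stateUpdaterEnabled = stateUpdaterEnabled;
    }

    // Step 1: populate the stores map. Only the legacy path (state updater
    // disabled) also registers the changelog right away.
    void registerStateStores(final Map<String, String> allStores) {
        for (final Map.Entry<String, String> entry : allStores.entrySet()) {
            stores.put(entry.getKey(), entry.getValue());
            if (!stateUpdaterEnabled) {
                registerWithChangelogReader(entry.getKey());
            }
        }
    }

    // Step 2: register with the changelog reader. Requires that the stores
    // map is already populated, which is the precondition named in the comment.
    void registerWithChangelogReader(final String storeName) {
        final String changelog = stores.get(storeName);
        if (changelog == null) {
            throw new IllegalStateException("Store " + storeName + " is not registered yet");
        }
        if (!registeredChangelogs.contains(changelog)) {
            registeredChangelogs.add(changelog);
        }
    }

    // Returns a copy so callers cannot mutate the internal list.
    List<String> registeredChangelogs() {
        return new ArrayList<>(registeredChangelogs);
    }
}
```

With the flag enabled, `registerStateStores` only fills the stores map, and whoever owns the task (the state updater in the real code) would call the second step once the map is known to be populated. With the flag disabled, both steps happen together, as before the change.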