mjsax commented on code in PR #15414: URL: https://github.com/apache/kafka/pull/15414#discussion_r1509703745
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java:
##########
@@ -203,13 +202,27 @@ public void registerStore(final StateStore store,
         );

         try {
-            restoreState(
-                stateRestoreCallback,
-                topicPartitions,
-                highWatermarks,
-                store.name(),
-                converterForStore(store)
-            );
+            if (topology.storeNameToReprocessOnRestore().getOrDefault(store.name(), false)) {
+                globalConsumer.assign(topicPartitions);
+                globalConsumer.seekToBeginning(topicPartitions);
+                for (final TopicPartition topicPartition : topicPartitions) {
+                    stateRestoreListener.onRestoreStart(topicPartition, store.name(),
+                        checkpointFileCache.getOrDefault(topicPartition, 0L),
+                        checkpointFileCache.getOrDefault(topicPartition, 0L));
+                    stateRestoreListener.onRestoreEnd(topicPartition, store.name(), 0L);

Review Comment:
   Where does the actual restore happen? Note that the original `restoreState()` is the "bootstrapping phase" that runs before we move to `RUNNING`, and we should preserve this behavior. It seems your PR basically skips the bootstrapping and relies on regular processing to re-read the data? In that case, we would go to `RUNNING` with an empty global store, and lookups might fail because the data is not loaded yet.
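
   To illustrate the concern, here is a minimal, self-contained sketch. It does not use the Kafka API: `BootstrapSketch`, `GlobalStore`, `TopicRecord`, `bootstrapThenRun()`, and `runWithoutBootstrap()` are all hypothetical names made up for this example, and the in-memory list only stands in for the global topic. It contrasts replaying the topic up to the high watermark before switching to `RUNNING` with switching to `RUNNING` right away.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation only; none of these types exist in Kafka Streams.
public class BootstrapSketch {

    enum State { CREATED, RUNNING }

    /** Stand-in for one record of the global topic. */
    static final class TopicRecord {
        final String key;
        final String value;

        TopicRecord(final String key, final String value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Stand-in for a global store together with its lifecycle state. */
    static final class GlobalStore {
        final Map<String, String> data = new HashMap<>();
        State state = State.CREATED;
        long restoredOffset = 0L;
    }

    /** Replays the topic up to the high watermark, then switches to RUNNING. */
    static void bootstrapThenRun(final GlobalStore store,
                                 final List<TopicRecord> topic,
                                 final long highWatermark) {
        while (store.restoredOffset < highWatermark) {
            final TopicRecord record = topic.get((int) store.restoredOffset);
            store.data.put(record.key, record.value);
            store.restoredOffset++;
        }
        store.state = State.RUNNING;
    }

    /** Switches to RUNNING immediately; the store is filled later, if at all. */
    static void runWithoutBootstrap(final GlobalStore store) {
        store.state = State.RUNNING;
    }

    public static void main(final String[] args) {
        final List<TopicRecord> topic = Arrays.asList(
            new TopicRecord("a", "1"),
            new TopicRecord("b", "2"));

        final GlobalStore bootstrapped = new GlobalStore();
        bootstrapThenRun(bootstrapped, topic, topic.size());

        final GlobalStore skipped = new GlobalStore();
        runWithoutBootstrap(skipped);

        // Both stores report RUNNING, but only the bootstrapped one can serve lookups.
        System.out.println("bootstrapped lookup: " + bootstrapped.data.get("a"));
        System.out.println("skipped lookup: " + skipped.data.get("a"));
    }
}
```

   In the sketch, both stores end up in `RUNNING`, but a lookup against the one that skipped the replay returns `null` until regular processing catches up. That window is what the bootstrapping phase is meant to close.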