mjsax commented on code in PR #15414:
URL: https://github.com/apache/kafka/pull/15414#discussion_r1509703745


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java:
##########
@@ -203,13 +202,27 @@ public void registerStore(final StateStore store,
         );
 
         try {
-            restoreState(
-                stateRestoreCallback,
-                topicPartitions,
-                highWatermarks,
-                store.name(),
-                converterForStore(store)
-            );
+            if (topology.storeNameToReprocessOnRestore().getOrDefault(store.name(), false)) {
+                globalConsumer.assign(topicPartitions);
+                globalConsumer.seekToBeginning(topicPartitions);
+                for (final TopicPartition topicPartition : topicPartitions) {
+                    stateRestoreListener.onRestoreStart(topicPartition, store.name(),
+                        checkpointFileCache.getOrDefault(topicPartition, 0L),
+                        checkpointFileCache.getOrDefault(topicPartition, 0L));
+                    stateRestoreListener.onRestoreEnd(topicPartition, store.name(), 0L);

Review Comment:
   Where does the actual restore happen?
   
   Note that the original `restoreState()` is the "bootstrapping phase" that runs
   before we move to `RUNNING`, and we should preserve this behavior. It seems your
   PR basically skips the bootstrapping and relies on regular processing to
   re-read the data? In that case, we would go to `RUNNING` with an empty global
   store, and lookups might fail because the data is not loaded yet.
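
   To make the concern concrete, here is a minimal, Kafka-free sketch. Every name
   in it (`ToyGlobalStore`, `Rec`, `bootstrap()`, `poll()`, `sampleTopic()`) is
   hypothetical and only simulates the idea; it is not the `GlobalStateManagerImpl`
   code or any Kafka Streams API. It contrasts loading the store up to the end
   offset before going to `RUNNING` with starting on an empty store and letting
   regular processing fill it in later.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only: all names are hypothetical, not Kafka Streams API.
class GlobalStoreBootstrapSketch {

    enum State { CREATED, RUNNING }

    // One key/value record of the simulated global store source topic.
    static final class Rec {
        final String key;
        final String value;
        Rec(final String key, final String value) {
            this.key = key;
            this.value = value;
        }
    }

    static final class ToyGlobalStore {
        private final Map<String, String> data = new HashMap<>();
        private final List<Rec> topic;
        private int position = 0; // next offset to read
        State state = State.CREATED;

        ToyGlobalStore(final List<Rec> topic) {
            this.topic = topic;
        }

        // Bootstrapping: apply every record up to the end offset seen at start.
        void bootstrap() {
            final int highWatermark = topic.size();
            while (position < highWatermark) {
                applyNext();
            }
        }

        // Regular processing: apply at most maxRecords records per call.
        void poll(final int maxRecords) {
            final int end = Math.min(topic.size(), position + maxRecords);
            while (position < end) {
                applyNext();
            }
        }

        void start() {
            state = State.RUNNING;
        }

        // Lookups are only allowed once the store is RUNNING.
        String lookup(final String key) {
            if (state != State.RUNNING) {
                throw new IllegalStateException("store is not RUNNING");
            }
            return data.get(key);
        }

        private void applyNext() {
            final Rec r = topic.get(position++);
            data.put(r.key, r.value);
        }
    }

    static List<Rec> sampleTopic() {
        return Arrays.asList(new Rec("k1", "v1"), new Rec("k2", "v2"), new Rec("k1", "v1b"));
    }

    public static void main(final String[] args) {
        // Bootstrap first, then RUNNING: lookups see the full data set.
        final ToyGlobalStore eager = new ToyGlobalStore(sampleTopic());
        eager.bootstrap();
        eager.start();
        System.out.println("bootstrapped: k2=" + eager.lookup("k2")); // k2=v2

        // Skip bootstrapping: RUNNING with an empty store, so the lookup misses.
        final ToyGlobalStore lazy = new ToyGlobalStore(sampleTopic());
        lazy.start();
        System.out.println("skipped:      k2=" + lazy.lookup("k2")); // k2=null

        // The data only shows up after regular processing has caught up.
        lazy.poll(10);
        System.out.println("after poll:   k2=" + lazy.lookup("k2")); // k2=v2
    }
}
```

   In the second case the store reports `RUNNING` while a lookup for `k2` still
   returns `null`, which is the window I am worried about.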


