wcarlson5 commented on code in PR #15414:
URL: https://github.com/apache/kafka/pull/15414#discussion_r1526810331


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java:
##########
@@ -236,6 +247,76 @@ private List<TopicPartition> topicPartitionsForStore(final StateStore store) {
         }
         return topicPartitions;
     }
+    @SuppressWarnings("unchecked")
+    private void reprocessState(final List<TopicPartition> topicPartitions,
+                                final Map<TopicPartition, Long> highWatermarks,
+                                final InternalTopologyBuilder.ReprocessFactory reprocessFactory,
+                                final String storeName) {
+        final Processor source = reprocessFactory.processorSupplier().get();
+        source.init(globalProcessorContext);
+
+        for (final TopicPartition topicPartition : topicPartitions) {
+            long currentDeadline = NO_DEADLINE;
+
+            globalConsumer.assign(Collections.singletonList(topicPartition));
+            long offset;
+            final Long checkpoint = checkpointFileCache.get(topicPartition);
+            if (checkpoint != null) {
+                globalConsumer.seek(topicPartition, checkpoint);
+                offset = checkpoint;
+            } else {
+                globalConsumer.seekToBeginning(Collections.singletonList(topicPartition));
+                offset = getGlobalConsumerOffset(topicPartition);
+            }
+            final Long highWatermark = highWatermarks.get(topicPartition);
+            stateRestoreListener.onRestoreStart(topicPartition, reprocessFactory.toString(), offset, highWatermark);

Review Comment:
   This should be the store name (`storeName`), not `reprocessFactory.toString()`.
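
One reading of this comment, as a minimal sketch rather than the actual patch: the second argument of `onRestoreStart` is meant to identify the store, so the `storeName` parameter of `reprocessState` would be passed through instead of the factory's string form. The `RestoreListener` interface, the `RecordingListener` class, and the `notifyRestoreStart` helper below are hypothetical stand-ins so the example compiles without Kafka on the classpath; only the parameter order (partition, store name, starting offset, ending offset) is taken from the call in the diff.

```java
import java.util.ArrayList;
import java.util.List;

public class RestoreStartSketch {

    // Hypothetical stand-in for the listener used in the diff.
    // The partition is simplified to a String (an assumption for this sketch).
    interface RestoreListener {
        void onRestoreStart(String topicPartition, String storeName,
                            long startingOffset, long endingOffset);
    }

    // Hypothetical listener that records the store name it was given.
    static final class RecordingListener implements RestoreListener {
        final List<String> storeNames = new ArrayList<>();

        @Override
        public void onRestoreStart(final String topicPartition, final String storeName,
                                   final long startingOffset, final long endingOffset) {
            storeNames.add(storeName);
        }
    }

    // Hypothetical helper mirroring the call site under review:
    // the store name is forwarded as the second argument.
    static void notifyRestoreStart(final RestoreListener listener,
                                   final String topicPartition,
                                   final String storeName,
                                   final long offset,
                                   final long highWatermark) {
        listener.onRestoreStart(topicPartition, storeName, offset, highWatermark);
    }

    public static void main(final String[] args) {
        final RecordingListener listener = new RecordingListener();
        notifyRestoreStart(listener, "global-topic-0", "global-store", 0L, 42L);
        System.out.println(listener.storeNames);
    }
}
```

With this shape, a listener sees the same store name the rest of the restore path uses, rather than an object description that callers cannot match against their stores.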



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

