mjsax commented on code in PR #15414:
URL: https://github.com/apache/kafka/pull/15414#discussion_r1526870886


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java:
##########
@@ -236,6 +247,76 @@ private List<TopicPartition> topicPartitionsForStore(final StateStore store) {
         }
         return topicPartitions;
     }
+    @SuppressWarnings("unchecked")
+    private void reprocessState(final List<TopicPartition> topicPartitions,
+                                final Map<TopicPartition, Long> highWatermarks,
+                                final InternalTopologyBuilder.ReprocessFactory reprocessFactory,
+                                final String storeName) {
+        final Processor source = reprocessFactory.processorSupplier().get();
+        source.init(globalProcessorContext);
+
+        for (final TopicPartition topicPartition : topicPartitions) {
+            long currentDeadline = NO_DEADLINE;
+
+            globalConsumer.assign(Collections.singletonList(topicPartition));
+            long offset;
+            final Long checkpoint = checkpointFileCache.get(topicPartition);
+            if (checkpoint != null) {
+                globalConsumer.seek(topicPartition, checkpoint);
+                offset = checkpoint;
+            } else {
+                globalConsumer.seekToBeginning(Collections.singletonList(topicPartition));
+                offset = getGlobalConsumerOffset(topicPartition);
+            }
+            final Long highWatermark = highWatermarks.get(topicPartition);
+            stateRestoreListener.onRestoreStart(topicPartition, reprocessFactory.toString(), offset, highWatermark);
+
+            long restoreCount = 0L;
+
+            while (offset < highWatermark) {
+                // we add `request.timeout.ms` to `poll.ms` because `poll.ms` might be too short
+                // to give a fetch request a fair chance to actually complete and we don't want to
+                // start `task.timeout.ms` too early
+                //
+                // TODO with https://issues.apache.org/jira/browse/KAFKA-10315 we can just call
+                //      `poll(pollMS)` without adding the request timeout and do a more precise
+                //      timeout handling
+                final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(pollMsPlusRequestTimeout);
+                if (records.isEmpty()) {
+                    currentDeadline = maybeUpdateDeadlineOrThrow(currentDeadline);
+                } else {
+                    currentDeadline = NO_DEADLINE;
+                }
+
+                for (final ConsumerRecord<byte[], byte[]> record : records.records(topicPartition)) {
+                    final ProcessorRecordContext recordContext =
+                        new ProcessorRecordContext(
+                            record.timestamp(),
+                            record.offset(),
+                            record.partition(),
+                            record.topic(),
+                            record.headers());
+                    globalProcessorContext.setRecordContext(recordContext);
+
+                    if (record.key() != null) {
+                        source.process(new Record<>(
+                            reprocessFactory.keyDeserializer().deserialize(record.topic(), record.key()),

Review Comment:
   I did think about this approach, too; however, I think we should not auto-drop. Assume somebody is using a Schema Registry (SR): while the app is running and pushing data into the store, everything just works. Later, KS crashes and we need to restore global state, but during the restore there is some issue with the SR and deserialization fails. If we auto-drop, we would get data loss, right?
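   To illustrate the concern, here is a minimal, self-contained sketch (hypothetical names, not the PR's actual code) contrasting "auto-drop" with "fail fast" when a record cannot be deserialized during restore. With auto-drop, a transient SR outage silently produces a restored store that diverges from the pre-crash store; failing fast lets the operator fix the SR issue and retry the restore with no loss:

   ```java
   import java.util.ArrayList;
   import java.util.List;

   public class RestoreDeserializationSketch {

       // Stand-in for a deserializer that may fail, e.g. when the SR is unreachable.
       interface FlakyDeserializer {
           String deserialize(byte[] bytes) throws Exception;
       }

       // Auto-drop: a failed record is silently skipped, so the restored store
       // ends up missing data that was successfully written before the crash.
       static List<String> restoreWithAutoDrop(final List<byte[]> changelog,
                                               final FlakyDeserializer d) {
           final List<String> store = new ArrayList<>();
           for (final byte[] bytes : changelog) {
               try {
                   store.add(d.deserialize(bytes));
               } catch (final Exception swallowed) {
                   // record dropped -> silent data loss on restore
               }
           }
           return store;
       }

       // Fail fast: any deserialization failure aborts the restore, surfacing
       // the problem instead of losing data.
       static List<String> restoreFailFast(final List<byte[]> changelog,
                                           final FlakyDeserializer d) throws Exception {
           final List<String> store = new ArrayList<>();
           for (final byte[] bytes : changelog) {
               store.add(d.deserialize(bytes));
           }
           return store;
       }

       public static void main(final String[] args) throws Exception {
           final List<byte[]> changelog =
               List.of("a".getBytes(), "boom".getBytes(), "c".getBytes());
           final FlakyDeserializer flaky = bytes -> {
               final String s = new String(bytes);
               if (s.equals("boom")) {
                   throw new RuntimeException("SR unavailable during restore");
               }
               return s;
           };

           // Auto-drop silently loses the middle record.
           final List<String> dropped = restoreWithAutoDrop(changelog, flaky);
           if (!dropped.equals(List.of("a", "c"))) {
               throw new AssertionError("unexpected restored state: " + dropped);
           }

           // Fail fast surfaces the error instead.
           boolean failed = false;
           try {
               restoreFailFast(changelog, flaky);
           } catch (final Exception expected) {
               failed = true;
           }
           if (!failed) {
               throw new AssertionError("expected restore to fail fast");
           }
           System.out.println("ok");
       }
   }
   ```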



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
