wcarlson5 commented on code in PR #15414: URL: https://github.com/apache/kafka/pull/15414#discussion_r1528810589
########## streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java: ########## @@ -236,6 +252,91 @@ private List<TopicPartition> topicPartitionsForStore(final StateStore store) { } return topicPartitions; } + @SuppressWarnings("unchecked") + private void reprocessState(final List<TopicPartition> topicPartitions, + final Map<TopicPartition, Long> highWatermarks, + final InternalTopologyBuilder.ReprocessFactory reprocessFactory, + final String storeName) { + final Processor source = reprocessFactory.processorSupplier().get(); + source.init(globalProcessorContext); + + for (final TopicPartition topicPartition : topicPartitions) { + long currentDeadline = NO_DEADLINE; + + globalConsumer.assign(Collections.singletonList(topicPartition)); + long offset; + final Long checkpoint = checkpointFileCache.get(topicPartition); + if (checkpoint != null) { + globalConsumer.seek(topicPartition, checkpoint); + offset = checkpoint; + } else { + globalConsumer.seekToBeginning(Collections.singletonList(topicPartition)); + offset = getGlobalConsumerOffset(topicPartition); + } + final Long highWatermark = highWatermarks.get(topicPartition); + stateRestoreListener.onRestoreStart(topicPartition, storeName, offset, highWatermark); + + long restoreCount = 0L; + + while (offset < highWatermark) { + // we add `request.timeout.ms` to `poll.ms` because `poll.ms` might be too short + // to give a fetch request a fair chance to actually complete and we don't want to + // start `task.timeout.ms` too early + // + // TODO with https://issues.apache.org/jira/browse/KAFKA-10315 we can just call + // `poll(pollMS)` without adding the request timeout and do a more precise + // timeout handling + final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(pollMsPlusRequestTimeout); + if (records.isEmpty()) { + currentDeadline = maybeUpdateDeadlineOrThrow(currentDeadline); + } else { + currentDeadline = NO_DEADLINE; + } + + for (final ConsumerRecord<byte[], 
byte[]> record : records.records(topicPartition)) { + final ProcessorRecordContext recordContext = + new ProcessorRecordContext( + record.timestamp(), + record.offset(), + record.partition(), + record.topic(), + record.headers()); + globalProcessorContext.setRecordContext(recordContext); + + try { + if (record.key() != null) { + source.process(new Record<>( + reprocessFactory.keyDeserializer().deserialize(record.topic(), record.key()), + reprocessFactory.valueDeserializer().deserialize(record.topic(), record.value()), + record.timestamp(), + record.headers())); + restoreCount++; + } + } catch (final Exception deserializationException) { + handleDeserializationFailure( Review Comment: That was my first thought too. Maybe we could refactor it a bit more, but a `RecordDeserializer` wants a `SourceNode`. It seemed like that would be changing more surfaces than necessary. But we can -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org