mjsax commented on code in PR #19188:
URL: https://github.com/apache/kafka/pull/19188#discussion_r1992830047
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java:
##########
@@ -1157,6 +1157,42 @@ public void
shouldSkipOnDeserializationErrorsWhenReprocessing() {
assertEquals(0, stateRestoreCallback.restored.size());
}
+ @Test
+ public void shouldListenForRestoreBatchEventsWhenReprocessing() {
Review Comment:
Thanks for adding this test. I was looking into this file, and found method
`shouldListenForRestoreEvents()` -- seems we could just add a line there
instead of adding a new test method?
We might still need to update the existing test to ensure we get multiple
batches restored though, to avoid that "total restored" and "batch restored"
are different: or (maybe better) we could also update `MockConsumer` adding a
new (optional) parameter `max.poll.records`?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java:
##########
@@ -300,6 +300,7 @@ private void reprocessState(final List<TopicPartition>
topicPartitions,
currentDeadline = NO_DEADLINE;
}
+ int batchRestoreCount = 0;
Review Comment:
We pass a `long` into `onBatchRestored(...)` as last parameter.
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java:
##########
@@ -300,6 +300,7 @@ private void reprocessState(final List<TopicPartition>
topicPartitions,
currentDeadline = NO_DEADLINE;
}
+ int batchRestoreCount = 0;
Review Comment:
```suggestion
long batchRestoreCount = 0;
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]