mjsax commented on a change in pull request #10042:
URL: https://github.com/apache/kafka/pull/10042#discussion_r586676476
##########
File path:
streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
##########
@@ -76,6 +79,30 @@ public void
testResetToSpecificOffsetWhenBetweenBeginningAndEndOffset() {
assertEquals(3, records.count());
}
+ @Test
+ public void
testResetToSpecificOffsetWhenPartitionIsEmptyResetsToLatestOffset() {
+ final MockConsumer<byte[], byte[]> emptyConsumer = new
MockConsumer<>(OffsetResetStrategy.EARLIEST);
+ emptyConsumer.assign(Collections.singletonList(topicPartition));
+
+ final Map<TopicPartition, Long> endOffsets = new HashMap<>();
+ endOffsets.put(topicPartition, 0L);
Review comment:
Nit: To make sure we don't have any default/fall-back offset of zero
encoded anywhere, it might be better to test with different offsets values for
endOffset/beginningOffset and the target offset?
Atm, if we would `seekToBeginning` as fallback instead of `seektToEnd` this
test would still pass. Maybe best to just use 5, 10, 20 (or similar) for start,
end, target.
##########
File path:
streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
##########
@@ -76,6 +79,30 @@ public void
testResetToSpecificOffsetWhenBetweenBeginningAndEndOffset() {
assertEquals(3, records.count());
}
+ @Test
+ public void
testResetToSpecificOffsetWhenPartitionIsEmptyResetsToLatestOffset() {
+ final MockConsumer<byte[], byte[]> emptyConsumer = new
MockConsumer<>(OffsetResetStrategy.EARLIEST);
+ emptyConsumer.assign(Collections.singletonList(topicPartition));
+
+ final Map<TopicPartition, Long> endOffsets = new HashMap<>();
+ endOffsets.put(topicPartition, 0L);
+ emptyConsumer.updateEndOffsets(endOffsets);
+
+ final Map<TopicPartition, Long> beginningOffsets = new HashMap<>();
+ beginningOffsets.put(topicPartition, 0L);
+ emptyConsumer.updateBeginningOffsets(beginningOffsets);
+ streamsResetter.resetOffsetsTo(emptyConsumer, inputTopicPartitions,
2L);
+ emptyConsumer.position(topicPartition);
+
+ emptyConsumer.commitSync();
Review comment:
Thanks for explaining on the other comment that `resetOffsetsTo` only
seeks but does not commit.
For this test, I am wondering if we can only verify if the seek happened
without calling commit in the test code? This may make the test "cleaner". If
fact, `emptyConsumer.position(topicPartition);` should return the seek position
and it seems sufficient to verify its return value?
I also hope we actually have a "integration test" that tests that both seek
and commit happens when using the resetting (ie, an full test for this case
instead of just a partial helper-method test).
We might also add a comment that the helper method `resetOffsetsTo` does
only seeks but not commits (for others to make sense of the test more easily).
##########
File path: core/src/main/scala/kafka/tools/StreamsResetter.java
##########
@@ -503,7 +506,16 @@ private void resetToDatetime(final Consumer<byte[],
byte[]> client,
final Map<TopicPartition, OffsetAndTimestamp> topicPartitionsAndOffset
= client.offsetsForTimes(topicPartitionsAndTimes);
for (final TopicPartition topicPartition : inputTopicPartitions) {
- client.seek(topicPartition,
topicPartitionsAndOffset.get(topicPartition).offset());
+ final Optional<Long> partitionOffset =
Optional.ofNullable(topicPartitionsAndOffset.get(topicPartition))
+ .map(OffsetAndTimestamp::offset)
+ .filter(offset -> offset !=
ListOffsetsResponse.UNKNOWN_OFFSET);
+ if (partitionOffset.isPresent()) {
+ client.seek(topicPartition, partitionOffset.get());
+ } else {
+ client.seekToEnd(Collections.singletonList(topicPartition));
+ System.out.println("Partition " + topicPartition.partition() +
" from topic " + topicPartition.topic() +
+ " is empty, without a committed record. Falling back
to latest known offset.");
Review comment:
If @jeqo has not objections, I think we should remove it for both cases.
##########
File path:
streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
##########
@@ -247,6 +274,32 @@ public void
shouldDetermineInternalTopicBasedOnTopicName1() {
assertTrue(streamsResetter.matchesInternalTopicFormat("appId-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-12323232-topic"));
}
+ @Test
+ public void testResetToDatetimeWhenPartitionIsEmptyResetsToLatestOffset() {
Review comment:
Same comments as for the first test.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]