mjsax commented on a change in pull request #10042:
URL: https://github.com/apache/kafka/pull/10042#discussion_r586676476
##########
File path: streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
##########
@@ -76,6 +79,30 @@ public void testResetToSpecificOffsetWhenBetweenBeginningAndEndOffset() {
         assertEquals(3, records.count());
     }
+    @Test
+    public void testResetToSpecificOffsetWhenPartitionIsEmptyResetsToLatestOffset() {
+        final MockConsumer<byte[], byte[]> emptyConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+        emptyConsumer.assign(Collections.singletonList(topicPartition));
+
+        final Map<TopicPartition, Long> endOffsets = new HashMap<>();
+        endOffsets.put(topicPartition, 0L);

Review comment:
Nit: To make sure we don't have any default/fall-back offset of zero encoded anywhere, it might be better to test with different offset values for endOffset/beginningOffset and the target offset? At the moment, if we called `seekToBeginning` as the fallback instead of `seekToEnd`, this test would still pass. Maybe best to just use 5, 10, 20 (or similar) for start, end, target.

##########
File path: streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
##########
@@ -76,6 +79,30 @@ public void testResetToSpecificOffsetWhenBetweenBeginningAndEndOffset() {
         assertEquals(3, records.count());
     }
+    @Test
+    public void testResetToSpecificOffsetWhenPartitionIsEmptyResetsToLatestOffset() {
+        final MockConsumer<byte[], byte[]> emptyConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+        emptyConsumer.assign(Collections.singletonList(topicPartition));
+
+        final Map<TopicPartition, Long> endOffsets = new HashMap<>();
+        endOffsets.put(topicPartition, 0L);
+        emptyConsumer.updateEndOffsets(endOffsets);
+
+        final Map<TopicPartition, Long> beginningOffsets = new HashMap<>();
+        beginningOffsets.put(topicPartition, 0L);
+        emptyConsumer.updateBeginningOffsets(beginningOffsets);
+        streamsResetter.resetOffsetsTo(emptyConsumer, inputTopicPartitions, 2L);
+        emptyConsumer.position(topicPartition);
+
+        emptyConsumer.commitSync();

Review comment:
Thanks for explaining in the other comment that `resetOffsetsTo` only seeks and does not commit. For this test, I am wondering whether we can verify only that the seek happened, without calling commit in the test code? This may make the test "cleaner". In fact, `emptyConsumer.position(topicPartition);` should return the seek position, and it seems sufficient to verify its return value?

I also hope we actually have an "integration test" that verifies that both seek and commit happen when resetting (i.e., a full test for this case instead of just a partial helper-method test).

We might also add a comment that the helper method `resetOffsetsTo` only seeks but does not commit (so that others can make sense of the test more easily).

##########
File path: core/src/main/scala/kafka/tools/StreamsResetter.java
##########
@@ -503,7 +506,16 @@ private void resetToDatetime(final Consumer<byte[], byte[]> client,
         final Map<TopicPartition, OffsetAndTimestamp> topicPartitionsAndOffset = client.offsetsForTimes(topicPartitionsAndTimes);
         for (final TopicPartition topicPartition : inputTopicPartitions) {
-            client.seek(topicPartition, topicPartitionsAndOffset.get(topicPartition).offset());
+            final Optional<Long> partitionOffset = Optional.ofNullable(topicPartitionsAndOffset.get(topicPartition))
+                    .map(OffsetAndTimestamp::offset)
+                    .filter(offset -> offset != ListOffsetsResponse.UNKNOWN_OFFSET);
+            if (partitionOffset.isPresent()) {
+                client.seek(topicPartition, partitionOffset.get());
+            } else {
+                client.seekToEnd(Collections.singletonList(topicPartition));
+                System.out.println("Partition " + topicPartition.partition() + " from topic " + topicPartition.topic() +
+                        " is empty, without a committed record. Falling back to latest known offset.");

Review comment:
If @jeqo has no objections, I think we should remove it in both cases.
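
To illustrate the two test suggestions above (distinct start/end/target offsets, and asserting on the position alone without a commit), here is a rough stand-alone sketch using only the JDK. `FakeConsumer`, `resetWithEndFallback`, `resetWithBeginningFallback` and `positionAfter` are hypothetical names made up for this example; they are not Kafka's `MockConsumer` or the real `resetOffsetsTo`, and the "fall back when the target is out of range" rule is only an assumption about the intended behaviour.

```java
public class DistinctOffsetsSketch {

    /** Hypothetical single-partition stand-in; this is NOT Kafka's MockConsumer. */
    public static final class FakeConsumer {
        private final long beginningOffset;
        private final long endOffset;
        private long position;

        public FakeConsumer(final long beginningOffset, final long endOffset) {
            this.beginningOffset = beginningOffset;
            this.endOffset = endOffset;
            this.position = beginningOffset;
        }

        public void seek(final long offset) { position = offset; }
        public void seekToBeginning() { position = beginningOffset; }
        public void seekToEnd() { position = endOffset; }
        public long position() { return position; }

        public boolean inRange(final long offset) {
            return offset >= beginningOffset && offset <= endOffset;
        }
    }

    /** Assumed intended behaviour: an out-of-range target falls back to the end offset. */
    public static void resetWithEndFallback(final FakeConsumer consumer, final long target) {
        if (consumer.inRange(target)) {
            consumer.seek(target);
        } else {
            consumer.seekToEnd();
        }
    }

    /** Deliberately wrong variant: falls back to the beginning offset instead. */
    public static void resetWithBeginningFallback(final FakeConsumer consumer, final long target) {
        if (consumer.inRange(target)) {
            consumer.seek(target);
        } else {
            consumer.seekToBeginning();
        }
    }

    /** Runs one reset and returns the resulting position; only seeks, never commits. */
    public static long positionAfter(final boolean fallBackToEnd,
                                     final long begin, final long end, final long target) {
        final FakeConsumer consumer = new FakeConsumer(begin, end);
        if (fallBackToEnd) {
            resetWithEndFallback(consumer, target);
        } else {
            resetWithBeginningFallback(consumer, target);
        }
        return consumer.position();
    }

    public static void main(final String[] args) {
        // With all-zero offsets both variants land on the same position,
        // so a test asserting on it cannot tell the right fallback from the wrong one.
        System.out.println("zeros, end fallback:       " + positionAfter(true, 0L, 0L, 2L));
        System.out.println("zeros, beginning fallback: " + positionAfter(false, 0L, 0L, 2L));

        // With start=5, end=10, target=20 the two variants diverge,
        // so asserting position == 10 would catch the wrong fallback.
        System.out.println("5/10/20, end fallback:       " + positionAfter(true, 5L, 10L, 20L));
        System.out.println("5/10/20, beginning fallback: " + positionAfter(false, 5L, 10L, 20L));
    }
}
```

In the real test the same idea would presumably translate to seeding `updateBeginningOffsets`/`updateEndOffsets` with different values and asserting on the value returned by `emptyConsumer.position(topicPartition)`.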
##########
File path: streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
##########
@@ -247,6 +274,32 @@ public void shouldDetermineInternalTopicBasedOnTopicName1() {
         assertTrue(streamsResetter.matchesInternalTopicFormat("appId-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-12323232-topic"));
     }
+    @Test
+    public void testResetToDatetimeWhenPartitionIsEmptyResetsToLatestOffset() {

Review comment:
Same comments as for the first test.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org