mjsax commented on a change in pull request #10042:
URL: https://github.com/apache/kafka/pull/10042#discussion_r586676476



##########
File path: 
streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
##########
@@ -76,6 +79,30 @@ public void 
testResetToSpecificOffsetWhenBetweenBeginningAndEndOffset() {
         assertEquals(3, records.count());
     }
 
+    @Test
+    public void 
testResetToSpecificOffsetWhenPartitionIsEmptyResetsToLatestOffset() {
+        final MockConsumer<byte[], byte[]> emptyConsumer = new 
MockConsumer<>(OffsetResetStrategy.EARLIEST);
+        emptyConsumer.assign(Collections.singletonList(topicPartition));
+
+        final Map<TopicPartition, Long> endOffsets = new HashMap<>();
+        endOffsets.put(topicPartition, 0L);

Review comment:
       Nit: To make sure we don't have any default/fall-back offset of zero 
encoded anywhere, it might be better to test with different offsets values for 
endOffset/beginningOffset and the target offset?
   
   Atm, if we would `seekToBeginning` as fallback instead of `seektToEnd` this 
test would still pass. Maybe best to just use 5, 10, 20 (or similar) for start, 
end, target.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
##########
@@ -76,6 +79,30 @@ public void 
testResetToSpecificOffsetWhenBetweenBeginningAndEndOffset() {
         assertEquals(3, records.count());
     }
 
+    @Test
+    public void 
testResetToSpecificOffsetWhenPartitionIsEmptyResetsToLatestOffset() {
+        final MockConsumer<byte[], byte[]> emptyConsumer = new 
MockConsumer<>(OffsetResetStrategy.EARLIEST);
+        emptyConsumer.assign(Collections.singletonList(topicPartition));
+
+        final Map<TopicPartition, Long> endOffsets = new HashMap<>();
+        endOffsets.put(topicPartition, 0L);
+        emptyConsumer.updateEndOffsets(endOffsets);
+
+        final Map<TopicPartition, Long> beginningOffsets = new HashMap<>();
+        beginningOffsets.put(topicPartition, 0L);
+        emptyConsumer.updateBeginningOffsets(beginningOffsets);
+        streamsResetter.resetOffsetsTo(emptyConsumer, inputTopicPartitions, 
2L);
+        emptyConsumer.position(topicPartition);
+
+        emptyConsumer.commitSync();

Review comment:
       Thanks for explaining on the other comment that `resetOffsetsTo` only 
seeks but does not commit.
   
   For this test, I am wondering if we can only verify if the seek happened 
without calling commit in the test code? This may make the test "cleaner". If 
fact, `emptyConsumer.position(topicPartition);` should return the seek position 
and it seems sufficient to verify its return value?
   
   I also hope we actually have a "integration test" that tests that both seek 
and commit happens when using the resetting (ie, an full test for this case 
instead of just a partial helper-method test).
   
   We might also add a comment that the helper method `resetOffsetsTo` does 
only seeks but not commits (for others to make sense of the test more easily).

##########
File path: core/src/main/scala/kafka/tools/StreamsResetter.java
##########
@@ -503,7 +506,16 @@ private void resetToDatetime(final Consumer<byte[], 
byte[]> client,
         final Map<TopicPartition, OffsetAndTimestamp> topicPartitionsAndOffset 
= client.offsetsForTimes(topicPartitionsAndTimes);
 
         for (final TopicPartition topicPartition : inputTopicPartitions) {
-            client.seek(topicPartition, 
topicPartitionsAndOffset.get(topicPartition).offset());
+            final Optional<Long> partitionOffset = 
Optional.ofNullable(topicPartitionsAndOffset.get(topicPartition))
+                    .map(OffsetAndTimestamp::offset)
+                    .filter(offset -> offset != 
ListOffsetsResponse.UNKNOWN_OFFSET);
+            if (partitionOffset.isPresent()) {
+                client.seek(topicPartition, partitionOffset.get());
+            } else {
+                client.seekToEnd(Collections.singletonList(topicPartition));
+                System.out.println("Partition " + topicPartition.partition() + 
" from topic " + topicPartition.topic() +
+                        " is empty, without a committed record. Falling back 
to latest known offset.");

Review comment:
       If @jeqo has not objections, I think we should remove it for both cases.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
##########
@@ -247,6 +274,32 @@ public void 
shouldDetermineInternalTopicBasedOnTopicName1() {
         
assertTrue(streamsResetter.matchesInternalTopicFormat("appId-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-12323232-topic"));
     }
 
+    @Test
+    public void testResetToDatetimeWhenPartitionIsEmptyResetsToLatestOffset() {

Review comment:
       Same comments as for the first test.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to