mjsax commented on a change in pull request #10042:
URL: https://github.com/apache/kafka/pull/10042#discussion_r579446056



##########
File path: 
streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
##########
@@ -247,6 +268,27 @@ public void 
shouldDetermineInternalTopicBasedOnTopicName1() {
         
assertTrue(streamsResetter.matchesInternalTopicFormat("appId-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-12323232-topic"));
     }
 
+    @Test
+    public void emptyPartitionsAreCorrectlyHandledWhenResettingByDateAndTime() 
{
+        final MockConsumer<byte[], byte[]> emptyConsumer = new 
EmptyPartitionConsumer<>(OffsetResetStrategy.EARLIEST);
+        emptyConsumer.assign(Collections.singletonList(topicPartition));
+
+        final Map<TopicPartition, Long> endOffsets = new HashMap<>();
+        endOffsets.put(topicPartition, 0L);
+        emptyConsumer.updateEndOffsets(endOffsets);
+
+        final Map<TopicPartition, Long> beginningOffsets = new HashMap<>();
+        beginningOffsets.put(topicPartition, 0L);
+        emptyConsumer.updateBeginningOffsets(beginningOffsets);
+
+        final long yesterdayTimestamp = 
Instant.now().minus(Duration.ofDays(1)).toEpochMilli();
+
+        streamsResetter.resetToDatetime(emptyConsumer, inputTopicPartitions, 
yesterdayTimestamp);
+
+        final ConsumerRecords<byte[], byte[]> records = 
emptyConsumer.poll(Duration.ofMillis(500));
+        assertEquals(0, records.count());

Review comment:
       Some comment as above.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
##########
@@ -76,6 +78,25 @@ public void 
testResetToSpecificOffsetWhenBetweenBeginningAndEndOffset() {
         assertEquals(3, records.count());
     }
 
+    @Test
+    public void testResetToSpecificOffsetWhenPartitionIsEmpty() {
+        final MockConsumer<byte[], byte[]> emptyConsumer = new 
MockConsumer<>(OffsetResetStrategy.EARLIEST);
+        emptyConsumer.assign(Collections.singletonList(topicPartition));
+
+        final Map<TopicPartition, Long> endOffsets = new HashMap<>();
+        endOffsets.put(topicPartition, 0L);
+        emptyConsumer.updateEndOffsets(endOffsets);
+
+        final Map<TopicPartition, Long> beginningOffsets = new HashMap<>();
+        beginningOffsets.put(topicPartition, 0L);
+        emptyConsumer.updateBeginningOffsets(beginningOffsets);
+
+        streamsResetter.resetOffsetsTo(emptyConsumer, inputTopicPartitions, 
2L);
+
+        final ConsumerRecords<byte[], byte[]> records = 
emptyConsumer.poll(Duration.ofMillis(500));
+        assertEquals(0, records.count());

Review comment:
       Not sure if I understand this test. As we use a `MockConsumer` and we 
never call `addRecord()` this condition should be `true` independent of 
`StreamsResetter`.
   
   Should we not rather verify if `streamsResetter` did _commit_ offsets as 
expected?

##########
File path: core/src/main/scala/kafka/tools/StreamsResetter.java
##########
@@ -503,7 +506,16 @@ private void resetToDatetime(final Consumer<byte[], 
byte[]> client,
         final Map<TopicPartition, OffsetAndTimestamp> topicPartitionsAndOffset 
= client.offsetsForTimes(topicPartitionsAndTimes);
 
         for (final TopicPartition topicPartition : inputTopicPartitions) {
-            client.seek(topicPartition, 
topicPartitionsAndOffset.get(topicPartition).offset());
+            final Optional<Long> partitionOffset = 
Optional.ofNullable(topicPartitionsAndOffset.get(topicPartition))
+                    .map(OffsetAndTimestamp::offset)
+                    .filter(offset -> offset != 
ListOffsetsResponse.UNKNOWN_OFFSET);
+            if (partitionOffset.isPresent()) {
+                client.seek(topicPartition, partitionOffset.get());
+            } else {
+                client.seekToEnd(Collections.singletonList(topicPartition));
+                System.out.println("Partition " + topicPartition.partition() + 
" from topic " + topicPartition.topic() +
+                        " is empty, without a committed record. Falling back 
to latest known offset.");

Review comment:
       > without a committed record
   
   Not sure what this means? Does `is empty` need any clarification?

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
##########
@@ -76,6 +78,25 @@ public void 
testResetToSpecificOffsetWhenBetweenBeginningAndEndOffset() {
         assertEquals(3, records.count());
     }
 
+    @Test
+    public void testResetToSpecificOffsetWhenPartitionIsEmpty() {

Review comment:
       This test seems to be orthogonal to the fix. Was just wondering why we 
add it and what it's purpose is? It's always good to close testing gaps, just 
not sure if I understand what this test really tests?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to