This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git

commit fbdd8d4dea4165cf0f7cd3058483342f14c84e7b
Author: Mason Chen <mas.c...@berkeley.edu>
AuthorDate: Thu Nov 24 23:48:28 2022 -0800

    [FLINK-28185][Connector/Kafka] Handle missing timestamps when there are empty partitions for the TimestampOffsetsInitializer. This closes #20370
---
 .../source/enumerator/KafkaSourceEnumerator.java   |  4 ++++
 .../enumerator/initializer/OffsetsInitializer.java |  2 +-
 .../initializer/TimestampOffsetsInitializer.java   | 27 +++++++++++-----------
 .../initializer/OffsetsInitializerTest.java        | 18 ++++++++++++++-
 4 files changed, 36 insertions(+), 15 deletions(-)

diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
index 27dbd22..9cf233c 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
@@ -612,6 +612,10 @@ public class KafkaSourceEnumerator
                                                             OffsetSpec.forTimestamp(
                                                                     entry.getValue()))))
                     .entrySet().stream()
+                    // OffsetAndTimestamp cannot be initialized with a negative offset, which is
+                    // possible if the timestamp does not correspond to an offset and the topic
+                    // partition is empty
+                    .filter(entry -> entry.getValue().offset() >= 0)
                     .collect(
                             Collectors.toMap(
                                     Map.Entry::getKey,
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializer.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializer.java
index 5a1bac5..db682c6 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializer.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializer.java
@@ -125,7 +125,7 @@ public interface OffsetsInitializer extends Serializable {
     /**
      * Get an {@link OffsetsInitializer} which initializes the offsets in each partition so that the
      * initialized offset is the offset of the first record whose record timestamp is greater than
-     * or equals the give timestamp (milliseconds).
+     * or equals the given timestamp (milliseconds).
      *
      * @param timestamp the timestamp (milliseconds) to start the consumption.
      * @return an {@link OffsetsInitializer} which initializes the offsets based on the given
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/TimestampOffsetsInitializer.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/TimestampOffsetsInitializer.java
index e008161..f411307 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/TimestampOffsetsInitializer.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/TimestampOffsetsInitializer.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.connector.kafka.source.enumerator.initializer;
 
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.TopicPartition;
 
@@ -53,23 +54,23 @@ class TimestampOffsetsInitializer implements OffsetsInitializer {
         // no message is going to be missed.
         Map<TopicPartition, Long> endOffsets = partitionOffsetsRetriever.endOffsets(partitions);
         partitions.forEach(tp -> startingTimestamps.put(tp, startingTimestamp));
-        partitionOffsetsRetriever
-                .offsetsForTimes(startingTimestamps)
-                .forEach(
-                        (tp, offsetMetadata) -> {
-                            if (offsetMetadata != null) {
-                                initialOffsets.put(tp, offsetMetadata.offset());
-                            } else {
-                                // The timestamp does not exist in the partition yet, we will just
-                                // consume from the latest.
-                                initialOffsets.put(tp, endOffsets.get(tp));
-                            }
-                        });
+        Map<TopicPartition, OffsetAndTimestamp> topicPartitionOffsetAndTimestampMap =
+                partitionOffsetsRetriever.offsetsForTimes(startingTimestamps);
+
+        for (TopicPartition tp : partitions) {
+            // offset may not have been resolved
+            if (topicPartitionOffsetAndTimestampMap.containsKey(tp)) {
+                initialOffsets.put(tp, topicPartitionOffsetAndTimestampMap.get(tp).offset());
+            } else {
+                initialOffsets.put(tp, endOffsets.get(tp));
+            }
+        }
+
         return initialOffsets;
     }
 
     @Override
     public OffsetResetStrategy getAutoOffsetResetStrategy() {
-        return OffsetResetStrategy.NONE;
+        return OffsetResetStrategy.LATEST;
     }
 }
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerTest.java
index 5bf026e..e0cd850 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerTest.java
@@ -33,6 +33,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -40,6 +41,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 public class OffsetsInitializerTest {
     private static final String TOPIC = "topic";
     private static final String TOPIC2 = "topic2";
+    private static final String EMPTY_TOPIC3 = "topic3";
     private static KafkaSourceEnumerator.PartitionOffsetsRetrieverImpl retriever;
 
     @BeforeClass
@@ -47,6 +49,8 @@ public class OffsetsInitializerTest {
         KafkaSourceTestEnv.setup();
         KafkaSourceTestEnv.setupTopic(TOPIC, true, true, KafkaSourceTestEnv::getRecordsForTopic);
         KafkaSourceTestEnv.setupTopic(TOPIC2, false, false, KafkaSourceTestEnv::getRecordsForTopic);
+        KafkaSourceTestEnv.createTestTopic(EMPTY_TOPIC3);
+
         retriever =
                 new KafkaSourceEnumerator.PartitionOffsetsRetrieverImpl(
                         KafkaSourceTestEnv.getAdminClient(), KafkaSourceTestEnv.GROUP_ID);
@@ -107,7 +111,19 @@ public class OffsetsInitializerTest {
                     long expectedOffset = tp.partition() > 2 ? tp.partition() : 3L;
                     assertThat((long) offset).isEqualTo(expectedOffset);
                 });
-        assertThat(initializer.getAutoOffsetResetStrategy()).isEqualTo(OffsetResetStrategy.NONE);
+        assertThat(initializer.getAutoOffsetResetStrategy()).isEqualTo(OffsetResetStrategy.LATEST);
+    }
+
+    @Test
+    public void testTimestampOffsetsInitializerForEmptyPartitions() {
+        OffsetsInitializer initializer = OffsetsInitializer.timestamp(2001);
+        List<TopicPartition> partitions = KafkaSourceTestEnv.getPartitionsForTopic(EMPTY_TOPIC3);
+        Map<TopicPartition, Long> expectedOffsets =
+                partitions.stream().collect(Collectors.toMap(tp -> tp, tp -> 0L));
+        assertThat(initializer.getPartitionOffsets(partitions, retriever))
+                .as("offsets are equal to 0 since the timestamp is out of range.")
+                .isEqualTo(expectedOffsets);
+        assertThat(initializer.getAutoOffsetResetStrategy()).isEqualTo(OffsetResetStrategy.LATEST);
     }
 
     @Test

Reply via email to