This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git
commit fbdd8d4dea4165cf0f7cd3058483342f14c84e7b Author: Mason Chen <mas.c...@berkeley.edu> AuthorDate: Thu Nov 24 23:48:28 2022 -0800 [FLINK-28185][Connector/Kafka] Handle missing timestamps when there are empty partitions for the TimestampOffsetsInitializer. This closes #20370 --- .../source/enumerator/KafkaSourceEnumerator.java | 4 ++++ .../enumerator/initializer/OffsetsInitializer.java | 2 +- .../initializer/TimestampOffsetsInitializer.java | 27 +++++++++++----------- .../initializer/OffsetsInitializerTest.java | 18 ++++++++++++++- 4 files changed, 36 insertions(+), 15 deletions(-) diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java index 27dbd22..9cf233c 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java @@ -612,6 +612,10 @@ public class KafkaSourceEnumerator OffsetSpec.forTimestamp( entry.getValue())))) .entrySet().stream() + // OffsetAndTimestamp cannot be initialized with a negative offset, which is + // possible if the timestamp does not correspond to an offset and the topic + // partition is empty + .filter(entry -> entry.getValue().offset() >= 0) .collect( Collectors.toMap( Map.Entry::getKey, diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializer.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializer.java index 5a1bac5..db682c6 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializer.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializer.java @@ -125,7 +125,7 @@ public interface OffsetsInitializer extends Serializable { /** * Get an {@link OffsetsInitializer} which initializes the offsets in each partition so that the * initialized offset is the offset of the first record whose record timestamp is greater than - * or equals the give timestamp (milliseconds). + * or equals the given timestamp (milliseconds). * * @param timestamp the timestamp (milliseconds) to start the consumption. * @return an {@link OffsetsInitializer} which initializes the offsets based on the given diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/TimestampOffsetsInitializer.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/TimestampOffsetsInitializer.java index e008161..f411307 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/TimestampOffsetsInitializer.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/TimestampOffsetsInitializer.java @@ -18,6 +18,7 @@ package org.apache.flink.connector.kafka.source.enumerator.initializer; +import org.apache.kafka.clients.consumer.OffsetAndTimestamp; import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.common.TopicPartition; @@ -53,23 +54,23 @@ class TimestampOffsetsInitializer implements OffsetsInitializer { // no message is going to be missed. Map<TopicPartition, Long> endOffsets = partitionOffsetsRetriever.endOffsets(partitions); partitions.forEach(tp -> startingTimestamps.put(tp, startingTimestamp)); - partitionOffsetsRetriever - .offsetsForTimes(startingTimestamps) - .forEach( - (tp, offsetMetadata) -> { - if (offsetMetadata != null) { - initialOffsets.put(tp, offsetMetadata.offset()); - } else { - // The timestamp does not exist in the partition yet, we will just - // consume from the latest. - initialOffsets.put(tp, endOffsets.get(tp)); - } - }); + Map<TopicPartition, OffsetAndTimestamp> topicPartitionOffsetAndTimestampMap = + partitionOffsetsRetriever.offsetsForTimes(startingTimestamps); + + for (TopicPartition tp : partitions) { + // offset may not have been resolved + if (topicPartitionOffsetAndTimestampMap.containsKey(tp)) { + initialOffsets.put(tp, topicPartitionOffsetAndTimestampMap.get(tp).offset()); + } else { + initialOffsets.put(tp, endOffsets.get(tp)); + } + } + return initialOffsets; } @Override public OffsetResetStrategy getAutoOffsetResetStrategy() { - return OffsetResetStrategy.NONE; + return OffsetResetStrategy.LATEST; } } diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerTest.java index 5bf026e..e0cd850 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerTest.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerTest.java @@ -33,6 +33,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThat; @@ -40,6 +41,7 @@ import static org.assertj.core.api.Assertions.assertThat; public class OffsetsInitializerTest { private static final String TOPIC = "topic"; private static final String TOPIC2 = "topic2"; + private static final String EMPTY_TOPIC3 = "topic3"; private static KafkaSourceEnumerator.PartitionOffsetsRetrieverImpl retriever; @BeforeClass @@ -47,6 +49,8 @@ public class OffsetsInitializerTest { KafkaSourceTestEnv.setup(); KafkaSourceTestEnv.setupTopic(TOPIC, true, true, KafkaSourceTestEnv::getRecordsForTopic); KafkaSourceTestEnv.setupTopic(TOPIC2, false, false, KafkaSourceTestEnv::getRecordsForTopic); + KafkaSourceTestEnv.createTestTopic(EMPTY_TOPIC3); + retriever = new KafkaSourceEnumerator.PartitionOffsetsRetrieverImpl( KafkaSourceTestEnv.getAdminClient(), KafkaSourceTestEnv.GROUP_ID); @@ -107,7 +111,19 @@ public class OffsetsInitializerTest { long expectedOffset = tp.partition() > 2 ? tp.partition() : 3L; assertThat((long) offset).isEqualTo(expectedOffset); }); - assertThat(initializer.getAutoOffsetResetStrategy()).isEqualTo(OffsetResetStrategy.NONE); + assertThat(initializer.getAutoOffsetResetStrategy()).isEqualTo(OffsetResetStrategy.LATEST); + } + + @Test + public void testTimestampOffsetsInitializerForEmptyPartitions() { + OffsetsInitializer initializer = OffsetsInitializer.timestamp(2001); + List<TopicPartition> partitions = KafkaSourceTestEnv.getPartitionsForTopic(EMPTY_TOPIC3); + Map<TopicPartition, Long> expectedOffsets = + partitions.stream().collect(Collectors.toMap(tp -> tp, tp -> 0L)); + assertThat(initializer.getPartitionOffsets(partitions, retriever)) + .as("offsets are equal to 0 since the timestamp is out of range.") + .isEqualTo(expectedOffsets); + assertThat(initializer.getAutoOffsetResetStrategy()).isEqualTo(OffsetResetStrategy.LATEST); } @Test