This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git
commit 54e3b70deb349538edba1ec2b051ed9d9f79b563
Author: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
AuthorDate: Mon Oct 23 10:56:53 2023 -0700

    [FLINK-28303] Allow LATEST_OFFSET marker when restoring from old checkpoints
---
 .../kafka/source/split/KafkaPartitionSplit.java | 16 ++++++++++------
 1 file changed, 10 insertions(+), 6 deletions(-)

diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/split/KafkaPartitionSplit.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/split/KafkaPartitionSplit.java
index ef1b8b88..7c04600d 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/split/KafkaPartitionSplit.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/split/KafkaPartitionSplit.java
@@ -44,9 +44,9 @@ public class KafkaPartitionSplit implements SourceSplit {
 
     // Valid special starting offsets
     public static final Set<Long> VALID_STARTING_OFFSET_MARKERS =
-            new HashSet<>(Arrays.asList(EARLIEST_OFFSET, COMMITTED_OFFSET));
+            new HashSet<>(Arrays.asList(EARLIEST_OFFSET, LATEST_OFFSET, COMMITTED_OFFSET));
     public static final Set<Long> VALID_STOPPING_OFFSET_MARKERS =
-            new HashSet<>(Arrays.asList(COMMITTED_OFFSET, NO_STOPPING_OFFSET));
+            new HashSet<>(Arrays.asList(LATEST_OFFSET, COMMITTED_OFFSET, NO_STOPPING_OFFSET));
 
     private final TopicPartition tp;
     private final long startingOffset;
@@ -133,8 +133,8 @@ public class KafkaPartitionSplit implements SourceSplit {
                     String.format(
                             "Invalid starting offset %d is specified for partition %s. "
                                     + "It should either be non-negative or be one of the "
-                                    + "[%d(earliest), %d(committed)].",
-                            startingOffset, tp, EARLIEST_OFFSET, COMMITTED_OFFSET));
+                                    + "[%d(earliest), %d(latest), %d(committed)].",
+                            startingOffset, tp, LATEST_OFFSET, EARLIEST_OFFSET, COMMITTED_OFFSET));
         }
 
         if (stoppingOffset < 0 && !VALID_STOPPING_OFFSET_MARKERS.contains(stoppingOffset)) {
@@ -142,8 +142,12 @@ public class KafkaPartitionSplit implements SourceSplit {
                     String.format(
                             "Illegal stopping offset %d is specified for partition %s. "
                                     + "It should either be non-negative or be one of the "
-                                    + "[%d(committed), %d(Long.MIN_VALUE, no_stopping_offset)].",
-                            stoppingOffset, tp, COMMITTED_OFFSET, NO_STOPPING_OFFSET));
+                                    + "[%d(latest), %d(committed), %d(Long.MIN_VALUE, no_stopping_offset)].",
+                            stoppingOffset,
+                            tp,
+                            LATEST_OFFSET,
+                            COMMITTED_OFFSET,
+                            NO_STOPPING_OFFSET));
         }
     }
 }