This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git

commit 54e3b70deb349538edba1ec2b051ed9d9f79b563
Author: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
AuthorDate: Mon Oct 23 10:56:53 2023 -0700

    [FLINK-28303] Allow LATEST_OFFSET marker when restoring from old checkpoints
---
 .../kafka/source/split/KafkaPartitionSplit.java          | 16 ++++++++++------
 1 file changed, 10 insertions(+), 6 deletions(-)

diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/split/KafkaPartitionSplit.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/split/KafkaPartitionSplit.java
index ef1b8b88..7c04600d 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/split/KafkaPartitionSplit.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/split/KafkaPartitionSplit.java
@@ -44,9 +44,9 @@ public class KafkaPartitionSplit implements SourceSplit {
 
     // Valid special starting offsets
     public static final Set<Long> VALID_STARTING_OFFSET_MARKERS =
-            new HashSet<>(Arrays.asList(EARLIEST_OFFSET, COMMITTED_OFFSET));
+            new HashSet<>(Arrays.asList(EARLIEST_OFFSET, LATEST_OFFSET, 
COMMITTED_OFFSET));
     public static final Set<Long> VALID_STOPPING_OFFSET_MARKERS =
-            new HashSet<>(Arrays.asList(COMMITTED_OFFSET, NO_STOPPING_OFFSET));
+            new HashSet<>(Arrays.asList(LATEST_OFFSET, COMMITTED_OFFSET, 
NO_STOPPING_OFFSET));
 
     private final TopicPartition tp;
     private final long startingOffset;
@@ -133,8 +133,8 @@ public class KafkaPartitionSplit implements SourceSplit {
                     String.format(
                             "Invalid starting offset %d is specified for 
partition %s. "
                                     + "It should either be non-negative or be 
one of the "
-                                    + "[%d(earliest), %d(committed)].",
-                            startingOffset, tp, EARLIEST_OFFSET, 
COMMITTED_OFFSET));
+                                    + "[%d(earliest), %d(latest), 
%d(committed)].",
+                            startingOffset, tp, LATEST_OFFSET, 
EARLIEST_OFFSET, COMMITTED_OFFSET));
         }
 
         if (stoppingOffset < 0 && 
!VALID_STOPPING_OFFSET_MARKERS.contains(stoppingOffset)) {
@@ -142,8 +142,12 @@ public class KafkaPartitionSplit implements SourceSplit {
                     String.format(
                             "Illegal stopping offset %d is specified for 
partition %s. "
                                     + "It should either be non-negative or be 
one of the "
-                                    + "[%d(committed), %d(Long.MIN_VALUE, 
no_stopping_offset)].",
-                            stoppingOffset, tp, COMMITTED_OFFSET, 
NO_STOPPING_OFFSET));
+                                    + "[%d(latest), %d(committed), 
%d(Long.MIN_VALUE, no_stopping_offset)].",
+                            stoppingOffset,
+                            tp,
+                            LATEST_OFFSET,
+                            COMMITTED_OFFSET,
+                            NO_STOPPING_OFFSET));
         }
     }
 }

Reply via email to