This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git

commit 946df1ed8b03bc41f3aaf8851912e76c239ba550
Author: tanjialiang <tanjiali...@52tt.com>
AuthorDate: Tue Sep 26 19:10:17 2023 +0800

    [FLINK-28303] Support LatestOffsetsInitializer to avoid the latest-offset strategy losing data
    
    This closes #52.
---
 .../initializer/LatestOffsetsInitializer.java      |  47 +++++++
 .../enumerator/initializer/OffsetsInitializer.java |   3 +-
 .../ReaderHandledOffsetsInitializer.java           |   9 +-
 .../kafka/source/split/KafkaPartitionSplit.java    |  19 ++-
 .../source/enumerator/KafkaEnumeratorTest.java     |   2 +-
 .../initializer/OffsetsInitializerTest.java        |   2 +-
 .../reader/KafkaPartitionSplitReaderTest.java      |   4 +-
 .../kafka/source/reader/KafkaSourceReaderTest.java |   4 +-
 .../split/KafkaPartitionSplitSerializerTest.java   |   6 +-
 .../kafka/table/KafkaDynamicTableFactoryTest.java  |   8 +-
 .../connectors/kafka/table/KafkaTableITCase.java   | 139 +++++++++++++++++++++
 .../table/UpsertKafkaDynamicTableFactoryTest.java  |   8 +-
 .../testutils/MockPartitionOffsetsRetriever.java   |  11 ++
 13 files changed, 230 insertions(+), 32 deletions(-)
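
The change only affects jobs that start reading from the latest offsets. As a rough,
illustrative sketch (the broker address, topic and group id below are placeholders, not
values from this patch), this is how a DataStream job opts into that startup mode through
the public KafkaSource API:

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

    // With this patch, latest() is resolved by the enumerator into concrete end offsets
    // that become part of the split state, instead of a sentinel re-resolved by the reader.
    KafkaSource<String> source =
            KafkaSource.<String>builder()
                    .setBootstrapServers("localhost:9092")      // placeholder broker
                    .setTopics("demo-topic")                    // placeholder topic
                    .setGroupId("demo-group")                   // placeholder group id
                    .setStartingOffsets(OffsetsInitializer.latest())
                    .setValueOnlyDeserializer(new SimpleStringSchema())
                    .build();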

diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/LatestOffsetsInitializer.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/LatestOffsetsInitializer.java
new file mode 100644
index 00000000..b6c95a65
--- /dev/null
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/LatestOffsetsInitializer.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.source.enumerator.initializer;
+
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * An implementation of {@link OffsetsInitializer} to initialize the offsets based on a
+ * latest-offset.
+ *
+ * <p>Package private and should be instantiated via {@link OffsetsInitializer}.
+ */
+class LatestOffsetsInitializer implements OffsetsInitializer {
+    private static final long serialVersionUID = 3014700244733286989L;
+
+    @Override
+    public Map<TopicPartition, Long> getPartitionOffsets(
+            Collection<TopicPartition> partitions,
+            PartitionOffsetsRetriever partitionOffsetsRetriever) {
+        return partitionOffsetsRetriever.endOffsets(partitions);
+    }
+
+    @Override
+    public OffsetResetStrategy getAutoOffsetResetStrategy() {
+        return OffsetResetStrategy.LATEST;
+    }
+}
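
To make the before/after semantics of the new class easier to see, here is a small
standalone model; the types below are illustrative stand-ins, not the connector's own
interfaces:

    import java.util.Collection;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.function.Function;

    import org.apache.kafka.common.TopicPartition;

    // Illustrative model only; the real logic is in the initializers touched by this patch.
    public class LatestOffsetsModel {

        // Before FLINK-28303: every partition was assigned the -1 sentinel (LATEST_OFFSET)
        // and the split reader seeked to the end when it opened the split, so records
        // appended between a savepoint and the restore could be skipped.
        static Map<TopicPartition, Long> sentinelLatest(Collection<TopicPartition> partitions) {
            Map<TopicPartition, Long> offsets = new HashMap<>();
            partitions.forEach(tp -> offsets.put(tp, -1L));
            return offsets;
        }

        // After FLINK-28303: the enumerator queries the current end offsets once, and the
        // concrete values are stored in the split state, so a restored job resumes from them.
        static Map<TopicPartition, Long> eagerLatest(
                Collection<TopicPartition> partitions,
                Function<Collection<TopicPartition>, Map<TopicPartition, Long>> endOffsets) {
            return endOffsets.apply(partitions);
        }
    }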
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializer.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializer.java
index db682c6b..0f0c5d25 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializer.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializer.java
@@ -155,8 +155,7 @@ public interface OffsetsInitializer extends Serializable {
      * @return an {@link OffsetsInitializer} which initializes the offsets to the latest offsets.
      */
     static OffsetsInitializer latest() {
-        return new ReaderHandledOffsetsInitializer(
-                KafkaPartitionSplit.LATEST_OFFSET, OffsetResetStrategy.LATEST);
+        return new LatestOffsetsInitializer();
     }
 
     /**
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/ReaderHandledOffsetsInitializer.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/ReaderHandledOffsetsInitializer.java
index 026320d9..42abd577 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/ReaderHandledOffsetsInitializer.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/ReaderHandledOffsetsInitializer.java
@@ -32,9 +32,9 @@ import java.util.Properties;
 import static org.apache.flink.util.Preconditions.checkState;
 
 /**
- * A initializer that initialize the partitions to the earliest / latest / last-committed offsets.
- * The offsets initialization are taken care of by the {@code KafkaPartitionSplitReader} instead of
- * by the {@code KafkaSourceEnumerator}.
+ * A initializer that initialize the partitions to the earliest / last-committed offsets. The
+ * offsets initialization are taken care of by the {@code KafkaPartitionSplitReader} instead of by
+ * the {@code KafkaSourceEnumerator}.
  *
  * <p>Package private and should be instantiated via {@link OffsetsInitializer}.
  */
@@ -46,8 +46,7 @@ class ReaderHandledOffsetsInitializer implements OffsetsInitializer, OffsetsInit
     /**
      * The only valid value for startingOffset is following. {@link
      * KafkaPartitionSplit#EARLIEST_OFFSET EARLIEST_OFFSET}, {@link
-     * KafkaPartitionSplit#LATEST_OFFSET LATEST_OFFSET}, {@link KafkaPartitionSplit#COMMITTED_OFFSET
-     * COMMITTED_OFFSET}
+     * KafkaPartitionSplit#COMMITTED_OFFSET COMMITTED_OFFSET}
      */
     ReaderHandledOffsetsInitializer(long startingOffset, OffsetResetStrategy offsetResetStrategy) {
         this.startingOffset = startingOffset;
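
With latest() moved to the new initializer, only the earliest and committed-offset
strategies keep using this sentinel-based class. A short sketch of the public factory
methods that still resolve their sentinels in the reader (the initializer class itself
stays package private):

    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
    import org.apache.kafka.clients.consumer.OffsetResetStrategy;

    // Still reader-handled: the split reader resolves the EARLIEST_OFFSET and
    // COMMITTED_OFFSET sentinels when it opens the split.
    OffsetsInitializer earliest = OffsetsInitializer.earliest();
    OffsetsInitializer committed = OffsetsInitializer.committedOffsets();
    OffsetsInitializer committedOrLatest =
            OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST);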
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/split/KafkaPartitionSplit.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/split/KafkaPartitionSplit.java
index 8c2a1fd1..ef1b8b88 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/split/KafkaPartitionSplit.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/split/KafkaPartitionSplit.java
@@ -35,7 +35,8 @@ import java.util.Set;
 public class KafkaPartitionSplit implements SourceSplit {
     public static final long NO_STOPPING_OFFSET = Long.MIN_VALUE;
     // Indicating the split should consume from the latest.
-    public static final long LATEST_OFFSET = -1;
+    // @deprecated Only be used for compatibility with the history state, see FLINK-28303
+    @Deprecated public static final long LATEST_OFFSET = -1;
     // Indicating the split should consume from the earliest.
     public static final long EARLIEST_OFFSET = -2;
     // Indicating the split should consume from the last committed offset.
@@ -43,9 +44,9 @@ public class KafkaPartitionSplit implements SourceSplit {
 
     // Valid special starting offsets
     public static final Set<Long> VALID_STARTING_OFFSET_MARKERS =
-            new HashSet<>(Arrays.asList(EARLIEST_OFFSET, LATEST_OFFSET, COMMITTED_OFFSET));
+            new HashSet<>(Arrays.asList(EARLIEST_OFFSET, COMMITTED_OFFSET));
     public static final Set<Long> VALID_STOPPING_OFFSET_MARKERS =
-            new HashSet<>(Arrays.asList(LATEST_OFFSET, COMMITTED_OFFSET, NO_STOPPING_OFFSET));
+            new HashSet<>(Arrays.asList(COMMITTED_OFFSET, NO_STOPPING_OFFSET));
 
     private final TopicPartition tp;
     private final long startingOffset;
@@ -132,8 +133,8 @@ public class KafkaPartitionSplit implements SourceSplit {
                     String.format(
                             "Invalid starting offset %d is specified for partition %s. "
                                     + "It should either be non-negative or be one of the "
-                                    + "[%d(earliest), %d(latest), %d(committed)].",
-                            startingOffset, tp, LATEST_OFFSET, EARLIEST_OFFSET, COMMITTED_OFFSET));
+                                    + "[%d(earliest), %d(committed)].",
+                            startingOffset, tp, EARLIEST_OFFSET, COMMITTED_OFFSET));
         }
 
         if (stoppingOffset < 0 && !VALID_STOPPING_OFFSET_MARKERS.contains(stoppingOffset)) {
@@ -141,12 +142,8 @@ public class KafkaPartitionSplit implements SourceSplit {
                     String.format(
                             "Illegal stopping offset %d is specified for partition %s. "
                                     + "It should either be non-negative or be one of the "
-                                    + "[%d(latest), %d(committed), %d(Long.MIN_VALUE, no_stopping_offset)].",
-                            stoppingOffset,
-                            tp,
-                            LATEST_OFFSET,
-                            COMMITTED_OFFSET,
-                            NO_STOPPING_OFFSET));
+                                    + "[%d(committed), %d(Long.MIN_VALUE, no_stopping_offset)].",
+                            stoppingOffset, tp, COMMITTED_OFFSET, NO_STOPPING_OFFSET));
         }
     }
 }
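
With LATEST_OFFSET deprecated, a split created for the latest-offset strategy now carries
a concrete starting offset. The snippet below only illustrates that shape; the topic name
and the offset value 42 are made up:

    import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
    import org.apache.kafka.common.TopicPartition;

    // The enumerator resolves the real end offset up front; the -1 sentinel is only still
    // accepted so that state written by older connector versions can be restored.
    KafkaPartitionSplit split =
            new KafkaPartitionSplit(
                    new TopicPartition("demo-topic", 0),
                    42L,
                    KafkaPartitionSplit.NO_STOPPING_OFFSET);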
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java
index f30c660d..8b308af1 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java
@@ -300,7 +300,7 @@ public class KafkaEnumeratorTest {
                     getAllAssignSplits(context, PRE_EXISTING_TOPICS);
             assertThat(initialPartitionAssign)
                     .extracting(KafkaPartitionSplit::getStartingOffset)
-                    .containsOnly(KafkaPartitionSplit.LATEST_OFFSET);
+                    .containsOnly((long) KafkaSourceTestEnv.NUM_RECORDS_PER_PARTITION);
             List<KafkaPartitionSplit> newPartitionAssign =
                     getAllAssignSplits(context, Collections.singleton(DYNAMIC_TOPIC_NAME));
             assertThat(newPartitionAssign)
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerTest.java
index e0cd8506..46dd61a6 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerTest.java
@@ -84,7 +84,7 @@ public class OffsetsInitializerTest {
         assertThat(offsets).hasSameSizeAs(partitions);
         assertThat(offsets.keySet()).containsAll(partitions);
         for (long offset : offsets.values()) {
-            assertThat(offset).isEqualTo(KafkaPartitionSplit.LATEST_OFFSET);
+            assertThat(offset).isEqualTo(KafkaSourceTestEnv.NUM_RECORDS_PER_PARTITION);
         }
         assertThat(initializer.getAutoOffsetResetStrategy()).isEqualTo(OffsetResetStrategy.LATEST);
     }
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java
index edd41326..b592a691 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java
@@ -246,8 +246,8 @@ public class KafkaPartitionSplitReaderTest {
         final KafkaPartitionSplit emptySplit =
                 new KafkaPartitionSplit(
                         new TopicPartition(TOPIC2, 0),
-                        KafkaPartitionSplit.LATEST_OFFSET,
-                        KafkaPartitionSplit.LATEST_OFFSET);
+                        KafkaSourceTestEnv.NUM_RECORDS_PER_PARTITION,
+                        KafkaSourceTestEnv.NUM_RECORDS_PER_PARTITION);
         final KafkaPartitionSplit emptySplitWithZeroStoppingOffset =
                 new KafkaPartitionSplit(new TopicPartition(TOPIC3, 0), 0, 0);
 
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
index b350d8c6..f5aa7f5f 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
@@ -397,7 +397,9 @@ public class KafkaSourceReaderTest extends SourceReaderTestBase<KafkaPartitionSp
         // Normal split with NUM_RECORDS_PER_SPLIT records
         final KafkaPartitionSplit normalSplit =
                 new KafkaPartitionSplit(
-                        new TopicPartition(TOPIC, 0), 0, KafkaPartitionSplit.LATEST_OFFSET);
+                        new TopicPartition(TOPIC, 0),
+                        0,
+                        KafkaSourceTestEnv.NUM_RECORDS_PER_PARTITION);
         // Empty split with no record
         final KafkaPartitionSplit emptySplit =
                 new KafkaPartitionSplit(
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/split/KafkaPartitionSplitSerializerTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/split/KafkaPartitionSplitSerializerTest.java
index 4ca5c9cb..db764724 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/split/KafkaPartitionSplitSerializerTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/split/KafkaPartitionSplitSerializerTest.java
@@ -36,11 +36,7 @@ public class KafkaPartitionSplitSerializerTest {
         Long normalOffset = 1L;
         TopicPartition topicPartition = new TopicPartition(topic, 1);
         List<Long> stoppingOffsets =
-                Lists.newArrayList(
-                        KafkaPartitionSplit.COMMITTED_OFFSET,
-                        KafkaPartitionSplit.LATEST_OFFSET,
-                        offsetZero,
-                        normalOffset);
+                Lists.newArrayList(KafkaPartitionSplit.COMMITTED_OFFSET, offsetZero, normalOffset);
         KafkaPartitionSplitSerializer splitSerializer = new KafkaPartitionSplitSerializer();
         for (Long stoppingOffset : stoppingOffsets) {
             KafkaPartitionSplit kafkaPartitionSplit =
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
index 2c82fc15..1246d53a 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
@@ -477,13 +477,17 @@ public class KafkaDynamicTableFactoryTest {
                     OffsetsInitializer offsetsInitializer =
                             KafkaSourceTestUtils.getStoppingOffsetsInitializer(source);
                     TopicPartition partition = new TopicPartition(TOPIC, 0);
+                    long endOffsets = 123L;
                     Map<TopicPartition, Long> partitionOffsets =
                             offsetsInitializer.getPartitionOffsets(
                                     Collections.singletonList(partition),
-                                    MockPartitionOffsetsRetriever.noInteractions());
+                                    MockPartitionOffsetsRetriever.latest(
+                                            (tps) ->
+                                                    Collections.singletonMap(
+                                                            partition, endOffsets)));
                     assertThat(partitionOffsets)
                             .containsOnlyKeys(partition)
-                            .containsEntry(partition, KafkaPartitionSplit.LATEST_OFFSET);
+                            .containsEntry(partition, endOffsets);
                 });
     }
 
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
index 2674183f..409acd97 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
@@ -18,12 +18,19 @@
 
 package org.apache.flink.streaming.connectors.kafka.table;
 
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.core.testutils.FlinkAssertions;
+import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.api.config.TableConfigOptions;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.utils.EncodingUtils;
@@ -944,6 +951,138 @@ public class KafkaTableITCase extends KafkaTableTestBase {
         deleteTestTopic(topic);
     }
 
+    @Test
+    public void testLatestOffsetStrategyResume() throws Exception {
+        // we always use a different topic name for each parameterized topic,
+        // in order to make sure the topic can be created.
+        final String topic = "latest_offset_resume_topic_" + format + "_" + UUID.randomUUID();
+        createTestTopic(topic, 6, 1);
+        env.setParallelism(1);
+
+        // ---------- Produce data into Kafka's partition 0-6 -------------------
+
+        String groupId = getStandardProps().getProperty("group.id");
+        String bootstraps = getBootstrapServers();
+
+        final String createTable =
+                String.format(
+                        "CREATE TABLE kafka (\n"
+                                + "  `partition_id` INT,\n"
+                                + "  `value` INT\n"
+                                + ") WITH (\n"
+                                + "  'connector' = 'kafka',\n"
+                                + "  'topic' = '%s',\n"
+                                + "  'properties.bootstrap.servers' = '%s',\n"
+                                + "  'properties.group.id' = '%s',\n"
+                                + "  'scan.startup.mode' = 'latest-offset',\n"
+                                + "  'sink.partitioner' = '%s',\n"
+                                + "  'format' = '%s'\n"
+                                + ")",
+                        topic, bootstraps, groupId, TestPartitioner.class.getName(), format);
+
+        tEnv.executeSql(createTable);
+
+        String initialValues =
+                "INSERT INTO kafka VALUES (0, 0), (1, 0), (2, 0), (3, 0), (4, 0), (5, 0)";
+        tEnv.executeSql(initialValues).await();
+
+        // ---------- Consume stream from Kafka -------------------
+
+        String createSink =
+                "CREATE TABLE MySink(\n"
+                        + "  `id` INT,\n"
+                        + "  `value` INT\n"
+                        + ") WITH (\n"
+                        + "  'connector' = 'values'\n"
+                        + ")";
+        tEnv.executeSql(createSink);
+
+        String executeInsert = "INSERT INTO MySink SELECT `partition_id`, `value` FROM kafka";
+        TableResult tableResult = tEnv.executeSql(executeInsert);
+
+        // ---------- Produce data into Kafka's partition 0-2 -------------------
+
+        String moreValues = "INSERT INTO kafka VALUES (0, 1), (1, 1), (2, 1)";
+        tEnv.executeSql(moreValues).await();
+
+        final List<String> expected = Arrays.asList("+I[0, 1]", "+I[1, 1]", "+I[2, 1]");
+        KafkaTableTestUtils.waitingExpectedResults("MySink", expected, Duration.ofSeconds(5));
+
+        // ---------- Stop the consume job with savepoint  -------------------
+
+        String savepointBasePath = getTempDirPath(topic + "-savepoint");
+        assert tableResult.getJobClient().isPresent();
+        JobClient client = tableResult.getJobClient().get();
+        String savepointPath =
+                client.stopWithSavepoint(false, savepointBasePath, SavepointFormatType.DEFAULT)
+                        .get();
+
+        // ---------- Produce data into Kafka's partition 0-5 -------------------
+
+        String produceValuesBeforeResume =
+                "INSERT INTO kafka VALUES (0, 2), (1, 2), (2, 2), (3, 1), (4, 1), (5, 1)";
+        tEnv.executeSql(produceValuesBeforeResume).await();
+
+        // ---------- Resume the consume job from savepoint  -------------------
+
+        Configuration configuration = new Configuration();
+        configuration.set(SavepointConfigOptions.SAVEPOINT_PATH, savepointPath);
+        configuration.set(CoreOptions.DEFAULT_PARALLELISM, 1);
+        StreamExecutionEnvironment env =
+                StreamExecutionEnvironment.getExecutionEnvironment(configuration);
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+        env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+
+        tEnv.executeSql(createTable);
+        tEnv.executeSql(createSink);
+        tableResult = tEnv.executeSql(executeInsert);
+
+        final List<String> afterResumeExpected =
+                Arrays.asList(
+                        "+I[0, 1]",
+                        "+I[1, 1]",
+                        "+I[2, 1]",
+                        "+I[0, 2]",
+                        "+I[1, 2]",
+                        "+I[2, 2]",
+                        "+I[3, 1]",
+                        "+I[4, 1]",
+                        "+I[5, 1]");
+        KafkaTableTestUtils.waitingExpectedResults(
+                "MySink", afterResumeExpected, Duration.ofSeconds(5));
+
+        // ---------- Produce data into Kafka's partition 0-5 -------------------
+
+        String produceValuesAfterResume =
+                "INSERT INTO kafka VALUES (0, 3), (1, 3), (2, 3), (3, 2), (4, 2), (5, 2)";
+        this.tEnv.executeSql(produceValuesAfterResume).await();
+
+        final List<String> afterProduceExpected =
+                Arrays.asList(
+                        "+I[0, 1]",
+                        "+I[1, 1]",
+                        "+I[2, 1]",
+                        "+I[0, 2]",
+                        "+I[1, 2]",
+                        "+I[2, 2]",
+                        "+I[3, 1]",
+                        "+I[4, 1]",
+                        "+I[5, 1]",
+                        "+I[0, 3]",
+                        "+I[1, 3]",
+                        "+I[2, 3]",
+                        "+I[3, 2]",
+                        "+I[4, 2]",
+                        "+I[5, 2]");
+        KafkaTableTestUtils.waitingExpectedResults(
+                "MySink", afterProduceExpected, Duration.ofSeconds(5));
+
+        // ------------- cleanup -------------------
+
+        tableResult.getJobClient().ifPresent(JobClient::cancel);
+        deleteTestTopic(topic);
+    }
+
     @Test
     public void testStartFromGroupOffsetsLatest() throws Exception {
         testStartFromGroupOffsets("latest");
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java
index 41d9e7eb..15c740d2 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java
@@ -465,13 +465,17 @@ public class UpsertKafkaDynamicTableFactoryTest extends TestLogger {
                     OffsetsInitializer offsetsInitializer =
                             KafkaSourceTestUtils.getStoppingOffsetsInitializer(source);
                     TopicPartition partition = new TopicPartition(SOURCE_TOPIC, 0);
+                    long endOffsets = 123L;
                     Map<TopicPartition, Long> partitionOffsets =
                             offsetsInitializer.getPartitionOffsets(
                                     Collections.singletonList(partition),
-                                    MockPartitionOffsetsRetriever.noInteractions());
+                                    MockPartitionOffsetsRetriever.latest(
+                                            (tps) ->
+                                                    Collections.singletonMap(
+                                                            partition, endOffsets)));
                     assertThat(partitionOffsets)
                             .containsOnlyKeys(partition)
-                            .containsEntry(partition, KafkaPartitionSplit.LATEST_OFFSET);
+                            .containsEntry(partition, endOffsets);
                 });
     }
 
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockPartitionOffsetsRetriever.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockPartitionOffsetsRetriever.java
index 175bddd6..9947bc5b 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockPartitionOffsetsRetriever.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockPartitionOffsetsRetriever.java
@@ -68,6 +68,17 @@ public final class MockPartitionOffsetsRetriever
                 UNSUPPORTED_RETRIEVAL, endOffsets, UNSUPPORTED_RETRIEVAL, retriever);
     }
 
+    public static MockPartitionOffsetsRetriever latest(OffsetsRetriever endOffsets) {
+        return new MockPartitionOffsetsRetriever(
+                UNSUPPORTED_RETRIEVAL,
+                endOffsets,
+                UNSUPPORTED_RETRIEVAL,
+                partitions -> {
+                    throw new UnsupportedOperationException(
+                            "The method was not supposed to be called");
+                });
+    }
+
     private MockPartitionOffsetsRetriever(
             OffsetsRetriever committedOffsets,
             OffsetsRetriever endOffsets,
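
The new latest(...) helper lets a test stub the end offsets that the enumerator would
fetch. A hypothetical usage, mirroring what the factory tests above do:

    import java.util.Collections;
    import java.util.Map;

    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
    import org.apache.flink.streaming.connectors.kafka.testutils.MockPartitionOffsetsRetriever;
    import org.apache.kafka.common.TopicPartition;

    TopicPartition tp = new TopicPartition("demo-topic", 0);    // placeholder topic
    Map<TopicPartition, Long> offsets =
            OffsetsInitializer.latest()
                    .getPartitionOffsets(
                            Collections.singletonList(tp),
                            MockPartitionOffsetsRetriever.latest(
                                    partitions -> Collections.singletonMap(tp, 123L)));
    // offsets now maps tp to the stubbed end offset 123L instead of the old -1 sentinel.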
