This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git
commit 946df1ed8b03bc41f3aaf8851912e76c239ba550 Author: tanjialiang <tanjiali...@52tt.com> AuthorDate: Tue Sep 26 19:10:17 2023 +0800 [FLINK-28303] Support LatestOffsetsInitializer to avoid latest-offset strategy lose data This closes #52. --- .../initializer/LatestOffsetsInitializer.java | 47 +++++++ .../enumerator/initializer/OffsetsInitializer.java | 3 +- .../ReaderHandledOffsetsInitializer.java | 9 +- .../kafka/source/split/KafkaPartitionSplit.java | 19 ++- .../source/enumerator/KafkaEnumeratorTest.java | 2 +- .../initializer/OffsetsInitializerTest.java | 2 +- .../reader/KafkaPartitionSplitReaderTest.java | 4 +- .../kafka/source/reader/KafkaSourceReaderTest.java | 4 +- .../split/KafkaPartitionSplitSerializerTest.java | 6 +- .../kafka/table/KafkaDynamicTableFactoryTest.java | 8 +- .../connectors/kafka/table/KafkaTableITCase.java | 139 +++++++++++++++++++++ .../table/UpsertKafkaDynamicTableFactoryTest.java | 8 +- .../testutils/MockPartitionOffsetsRetriever.java | 11 ++ 13 files changed, 230 insertions(+), 32 deletions(-) diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/LatestOffsetsInitializer.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/LatestOffsetsInitializer.java new file mode 100644 index 00000000..b6c95a65 --- /dev/null +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/LatestOffsetsInitializer.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.kafka.source.enumerator.initializer; + +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.TopicPartition; + +import java.util.Collection; +import java.util.Map; + +/** + * An implementation of {@link OffsetsInitializer} to initialize the offsets based on a + * latest-offset. + * + * <p>Package private and should be instantiated via {@link OffsetsInitializer}. + */ +class LatestOffsetsInitializer implements OffsetsInitializer { + private static final long serialVersionUID = 3014700244733286989L; + + @Override + public Map<TopicPartition, Long> getPartitionOffsets( + Collection<TopicPartition> partitions, + PartitionOffsetsRetriever partitionOffsetsRetriever) { + return partitionOffsetsRetriever.endOffsets(partitions); + } + + @Override + public OffsetResetStrategy getAutoOffsetResetStrategy() { + return OffsetResetStrategy.LATEST; + } +} diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializer.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializer.java index db682c6b..0f0c5d25 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializer.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializer.java @@ -155,8 +155,7 @@ public interface OffsetsInitializer extends Serializable { * @return an 
{@link OffsetsInitializer} which initializes the offsets to the latest offsets. */ static OffsetsInitializer latest() { - return new ReaderHandledOffsetsInitializer( - KafkaPartitionSplit.LATEST_OFFSET, OffsetResetStrategy.LATEST); + return new LatestOffsetsInitializer(); } /** diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/ReaderHandledOffsetsInitializer.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/ReaderHandledOffsetsInitializer.java index 026320d9..42abd577 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/ReaderHandledOffsetsInitializer.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/ReaderHandledOffsetsInitializer.java @@ -32,9 +32,9 @@ import java.util.Properties; import static org.apache.flink.util.Preconditions.checkState; /** - * A initializer that initialize the partitions to the earliest / latest / last-committed offsets. - * The offsets initialization are taken care of by the {@code KafkaPartitionSplitReader} instead of - * by the {@code KafkaSourceEnumerator}. + * A initializer that initialize the partitions to the earliest / last-committed offsets. The + * offsets initialization are taken care of by the {@code KafkaPartitionSplitReader} instead of by + * the {@code KafkaSourceEnumerator}. * * <p>Package private and should be instantiated via {@link OffsetsInitializer}. */ @@ -46,8 +46,7 @@ class ReaderHandledOffsetsInitializer implements OffsetsInitializer, OffsetsInit /** * The only valid value for startingOffset is following. 
{@link * KafkaPartitionSplit#EARLIEST_OFFSET EARLIEST_OFFSET}, {@link - * KafkaPartitionSplit#LATEST_OFFSET LATEST_OFFSET}, {@link KafkaPartitionSplit#COMMITTED_OFFSET - * COMMITTED_OFFSET} + * KafkaPartitionSplit#COMMITTED_OFFSET COMMITTED_OFFSET} */ ReaderHandledOffsetsInitializer(long startingOffset, OffsetResetStrategy offsetResetStrategy) { this.startingOffset = startingOffset; diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/split/KafkaPartitionSplit.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/split/KafkaPartitionSplit.java index 8c2a1fd1..ef1b8b88 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/split/KafkaPartitionSplit.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/split/KafkaPartitionSplit.java @@ -35,7 +35,8 @@ import java.util.Set; public class KafkaPartitionSplit implements SourceSplit { public static final long NO_STOPPING_OFFSET = Long.MIN_VALUE; // Indicating the split should consume from the latest. - public static final long LATEST_OFFSET = -1; + // @deprecated Only used for compatibility with historical state, see FLINK-28303 + @Deprecated public static final long LATEST_OFFSET = -1; // Indicating the split should consume from the earliest. public static final long EARLIEST_OFFSET = -2; // Indicating the split should consume from the last committed offset. 
@@ -43,9 +44,9 @@ public class KafkaPartitionSplit implements SourceSplit { // Valid special starting offsets public static final Set<Long> VALID_STARTING_OFFSET_MARKERS = - new HashSet<>(Arrays.asList(EARLIEST_OFFSET, LATEST_OFFSET, COMMITTED_OFFSET)); + new HashSet<>(Arrays.asList(EARLIEST_OFFSET, COMMITTED_OFFSET)); public static final Set<Long> VALID_STOPPING_OFFSET_MARKERS = - new HashSet<>(Arrays.asList(LATEST_OFFSET, COMMITTED_OFFSET, NO_STOPPING_OFFSET)); + new HashSet<>(Arrays.asList(COMMITTED_OFFSET, NO_STOPPING_OFFSET)); private final TopicPartition tp; private final long startingOffset; @@ -132,8 +133,8 @@ public class KafkaPartitionSplit implements SourceSplit { String.format( "Invalid starting offset %d is specified for partition %s. " + "It should either be non-negative or be one of the " - + "[%d(earliest), %d(latest), %d(committed)].", - startingOffset, tp, LATEST_OFFSET, EARLIEST_OFFSET, COMMITTED_OFFSET)); + + "[%d(earliest), %d(committed)].", + startingOffset, tp, EARLIEST_OFFSET, COMMITTED_OFFSET)); } if (stoppingOffset < 0 && !VALID_STOPPING_OFFSET_MARKERS.contains(stoppingOffset)) { @@ -141,12 +142,8 @@ public class KafkaPartitionSplit implements SourceSplit { String.format( "Illegal stopping offset %d is specified for partition %s. 
" + "It should either be non-negative or be one of the " - + "[%d(latest), %d(committed), %d(Long.MIN_VALUE, no_stopping_offset)].", - stoppingOffset, - tp, - LATEST_OFFSET, - COMMITTED_OFFSET, - NO_STOPPING_OFFSET)); + + "[%d(committed), %d(Long.MIN_VALUE, no_stopping_offset)].", + stoppingOffset, tp, COMMITTED_OFFSET, NO_STOPPING_OFFSET)); } } } diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java index f30c660d..8b308af1 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java @@ -300,7 +300,7 @@ public class KafkaEnumeratorTest { getAllAssignSplits(context, PRE_EXISTING_TOPICS); assertThat(initialPartitionAssign) .extracting(KafkaPartitionSplit::getStartingOffset) - .containsOnly(KafkaPartitionSplit.LATEST_OFFSET); + .containsOnly((long) KafkaSourceTestEnv.NUM_RECORDS_PER_PARTITION); List<KafkaPartitionSplit> newPartitionAssign = getAllAssignSplits(context, Collections.singleton(DYNAMIC_TOPIC_NAME)); assertThat(newPartitionAssign) diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerTest.java index e0cd8506..46dd61a6 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerTest.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerTest.java @@ -84,7 +84,7 @@ public class OffsetsInitializerTest { assertThat(offsets).hasSameSizeAs(partitions); 
assertThat(offsets.keySet()).containsAll(partitions); for (long offset : offsets.values()) { - assertThat(offset).isEqualTo(KafkaPartitionSplit.LATEST_OFFSET); + assertThat(offset).isEqualTo(KafkaSourceTestEnv.NUM_RECORDS_PER_PARTITION); } assertThat(initializer.getAutoOffsetResetStrategy()).isEqualTo(OffsetResetStrategy.LATEST); } diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java index edd41326..b592a691 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java @@ -246,8 +246,8 @@ public class KafkaPartitionSplitReaderTest { final KafkaPartitionSplit emptySplit = new KafkaPartitionSplit( new TopicPartition(TOPIC2, 0), - KafkaPartitionSplit.LATEST_OFFSET, - KafkaPartitionSplit.LATEST_OFFSET); + KafkaSourceTestEnv.NUM_RECORDS_PER_PARTITION, + KafkaSourceTestEnv.NUM_RECORDS_PER_PARTITION); final KafkaPartitionSplit emptySplitWithZeroStoppingOffset = new KafkaPartitionSplit(new TopicPartition(TOPIC3, 0), 0, 0); diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java index b350d8c6..f5aa7f5f 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java @@ -397,7 +397,9 @@ public class KafkaSourceReaderTest extends SourceReaderTestBase<KafkaPartitionSp // Normal split with NUM_RECORDS_PER_SPLIT records final KafkaPartitionSplit 
normalSplit = new KafkaPartitionSplit( - new TopicPartition(TOPIC, 0), 0, KafkaPartitionSplit.LATEST_OFFSET); + new TopicPartition(TOPIC, 0), + 0, + KafkaSourceTestEnv.NUM_RECORDS_PER_PARTITION); // Empty split with no record final KafkaPartitionSplit emptySplit = new KafkaPartitionSplit( diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/split/KafkaPartitionSplitSerializerTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/split/KafkaPartitionSplitSerializerTest.java index 4ca5c9cb..db764724 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/split/KafkaPartitionSplitSerializerTest.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/split/KafkaPartitionSplitSerializerTest.java @@ -36,11 +36,7 @@ public class KafkaPartitionSplitSerializerTest { Long normalOffset = 1L; TopicPartition topicPartition = new TopicPartition(topic, 1); List<Long> stoppingOffsets = - Lists.newArrayList( - KafkaPartitionSplit.COMMITTED_OFFSET, - KafkaPartitionSplit.LATEST_OFFSET, - offsetZero, - normalOffset); + Lists.newArrayList(KafkaPartitionSplit.COMMITTED_OFFSET, offsetZero, normalOffset); KafkaPartitionSplitSerializer splitSerializer = new KafkaPartitionSplitSerializer(); for (Long stoppingOffset : stoppingOffsets) { KafkaPartitionSplit kafkaPartitionSplit = diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java index 2c82fc15..1246d53a 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java @@ -477,13 +477,17 @@ public class 
KafkaDynamicTableFactoryTest { OffsetsInitializer offsetsInitializer = KafkaSourceTestUtils.getStoppingOffsetsInitializer(source); TopicPartition partition = new TopicPartition(TOPIC, 0); + long endOffsets = 123L; Map<TopicPartition, Long> partitionOffsets = offsetsInitializer.getPartitionOffsets( Collections.singletonList(partition), - MockPartitionOffsetsRetriever.noInteractions()); + MockPartitionOffsetsRetriever.latest( + (tps) -> + Collections.singletonMap( + partition, endOffsets))); assertThat(partitionOffsets) .containsOnlyKeys(partition) - .containsEntry(partition, KafkaPartitionSplit.LATEST_OFFSET); + .containsEntry(partition, endOffsets); }); } diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java index 2674183f..409acd97 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java @@ -18,12 +18,19 @@ package org.apache.flink.streaming.connectors.kafka.table; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.CoreOptions; import org.apache.flink.core.execution.JobClient; +import org.apache.flink.core.execution.SavepointFormatType; import org.apache.flink.core.testutils.FlinkAssertions; +import org.apache.flink.runtime.jobgraph.SavepointConfigOptions; import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; import org.apache.flink.table.api.TableResult; +import 
org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.api.config.TableConfigOptions; import org.apache.flink.table.data.RowData; import org.apache.flink.table.utils.EncodingUtils; @@ -944,6 +951,138 @@ public class KafkaTableITCase extends KafkaTableTestBase { deleteTestTopic(topic); } + @Test + public void testLatestOffsetStrategyResume() throws Exception { + // we always use a different topic name for each parameterized topic, + // in order to make sure the topic can be created. + final String topic = "latest_offset_resume_topic_" + format + "_" + UUID.randomUUID(); + createTestTopic(topic, 6, 1); + env.setParallelism(1); + + // ---------- Produce data into Kafka's partition 0-5 ------------------- + + String groupId = getStandardProps().getProperty("group.id"); + String bootstraps = getBootstrapServers(); + + final String createTable = + String.format( + "CREATE TABLE kafka (\n" + + " `partition_id` INT,\n" + + " `value` INT\n" + + ") WITH (\n" + + " 'connector' = 'kafka',\n" + + " 'topic' = '%s',\n" + + " 'properties.bootstrap.servers' = '%s',\n" + + " 'properties.group.id' = '%s',\n" + + " 'scan.startup.mode' = 'latest-offset',\n" + + " 'sink.partitioner' = '%s',\n" + + " 'format' = '%s'\n" + + ")", + topic, bootstraps, groupId, TestPartitioner.class.getName(), format); + + tEnv.executeSql(createTable); + + String initialValues = + "INSERT INTO kafka VALUES (0, 0), (1, 0), (2, 0), (3, 0), (4, 0), (5, 0)"; + tEnv.executeSql(initialValues).await(); + + // ---------- Consume stream from Kafka ------------------- + + String createSink = + "CREATE TABLE MySink(\n" + + " `id` INT,\n" + + " `value` INT\n" + + ") WITH (\n" + + " 'connector' = 'values'\n" + + ")"; + tEnv.executeSql(createSink); + + String executeInsert = "INSERT INTO MySink SELECT `partition_id`, `value` FROM kafka"; + TableResult tableResult = tEnv.executeSql(executeInsert); + + // ---------- Produce data into Kafka's partition 0-2 ------------------- + + String 
moreValues = "INSERT INTO kafka VALUES (0, 1), (1, 1), (2, 1)"; + tEnv.executeSql(moreValues).await(); + + final List<String> expected = Arrays.asList("+I[0, 1]", "+I[1, 1]", "+I[2, 1]"); + KafkaTableTestUtils.waitingExpectedResults("MySink", expected, Duration.ofSeconds(5)); + + // ---------- Stop the consume job with savepoint ------------------- + + String savepointBasePath = getTempDirPath(topic + "-savepoint"); + assert tableResult.getJobClient().isPresent(); + JobClient client = tableResult.getJobClient().get(); + String savepointPath = + client.stopWithSavepoint(false, savepointBasePath, SavepointFormatType.DEFAULT) + .get(); + + // ---------- Produce data into Kafka's partition 0-5 ------------------- + + String produceValuesBeforeResume = + "INSERT INTO kafka VALUES (0, 2), (1, 2), (2, 2), (3, 1), (4, 1), (5, 1)"; + tEnv.executeSql(produceValuesBeforeResume).await(); + + // ---------- Resume the consume job from savepoint ------------------- + + Configuration configuration = new Configuration(); + configuration.set(SavepointConfigOptions.SAVEPOINT_PATH, savepointPath); + configuration.set(CoreOptions.DEFAULT_PARALLELISM, 1); + StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(configuration); + StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); + env.getConfig().setRestartStrategy(RestartStrategies.noRestart()); + + tEnv.executeSql(createTable); + tEnv.executeSql(createSink); + tableResult = tEnv.executeSql(executeInsert); + + final List<String> afterResumeExpected = + Arrays.asList( + "+I[0, 1]", + "+I[1, 1]", + "+I[2, 1]", + "+I[0, 2]", + "+I[1, 2]", + "+I[2, 2]", + "+I[3, 1]", + "+I[4, 1]", + "+I[5, 1]"); + KafkaTableTestUtils.waitingExpectedResults( + "MySink", afterResumeExpected, Duration.ofSeconds(5)); + + // ---------- Produce data into Kafka's partition 0-5 ------------------- + + String produceValuesAfterResume = + "INSERT INTO kafka VALUES (0, 3), (1, 3), (2, 3), (3, 2), (4, 2), (5, 2)"; + 
this.tEnv.executeSql(produceValuesAfterResume).await(); + + final List<String> afterProduceExpected = + Arrays.asList( + "+I[0, 1]", + "+I[1, 1]", + "+I[2, 1]", + "+I[0, 2]", + "+I[1, 2]", + "+I[2, 2]", + "+I[3, 1]", + "+I[4, 1]", + "+I[5, 1]", + "+I[0, 3]", + "+I[1, 3]", + "+I[2, 3]", + "+I[3, 2]", + "+I[4, 2]", + "+I[5, 2]"); + KafkaTableTestUtils.waitingExpectedResults( + "MySink", afterProduceExpected, Duration.ofSeconds(5)); + + // ------------- cleanup ------------------- + + tableResult.getJobClient().ifPresent(JobClient::cancel); + deleteTestTopic(topic); + } + @Test public void testStartFromGroupOffsetsLatest() throws Exception { testStartFromGroupOffsets("latest"); diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java index 41d9e7eb..15c740d2 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java @@ -465,13 +465,17 @@ public class UpsertKafkaDynamicTableFactoryTest extends TestLogger { OffsetsInitializer offsetsInitializer = KafkaSourceTestUtils.getStoppingOffsetsInitializer(source); TopicPartition partition = new TopicPartition(SOURCE_TOPIC, 0); + long endOffsets = 123L; Map<TopicPartition, Long> partitionOffsets = offsetsInitializer.getPartitionOffsets( Collections.singletonList(partition), - MockPartitionOffsetsRetriever.noInteractions()); + MockPartitionOffsetsRetriever.latest( + (tps) -> + Collections.singletonMap( + partition, endOffsets))); assertThat(partitionOffsets) .containsOnlyKeys(partition) - .containsEntry(partition, KafkaPartitionSplit.LATEST_OFFSET); + .containsEntry(partition, endOffsets); }); } diff --git 
a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockPartitionOffsetsRetriever.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockPartitionOffsetsRetriever.java index 175bddd6..9947bc5b 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockPartitionOffsetsRetriever.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockPartitionOffsetsRetriever.java @@ -68,6 +68,17 @@ public final class MockPartitionOffsetsRetriever UNSUPPORTED_RETRIEVAL, endOffsets, UNSUPPORTED_RETRIEVAL, retriever); } + public static MockPartitionOffsetsRetriever latest(OffsetsRetriever endOffsets) { + return new MockPartitionOffsetsRetriever( + UNSUPPORTED_RETRIEVAL, + endOffsets, + UNSUPPORTED_RETRIEVAL, + partitions -> { + throw new UnsupportedOperationException( + "The method was not supposed to be called"); + }); + } + private MockPartitionOffsetsRetriever( OffsetsRetriever committedOffsets, OffsetsRetriever endOffsets,