This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 9e8a99c12ce [FLINK-24456][Connectors / Kafka,Table SQL / Ecosystem] Support bounded offset in the Kafka table connector 9e8a99c12ce is described below commit 9e8a99c12ce939418086fa9555c1c4f74bcf6b59 Author: shihong90 <2572805...@qq.com> AuthorDate: Thu Dec 2 20:42:17 2021 +0800 [FLINK-24456][Connectors / Kafka,Table SQL / Ecosystem] Support bounded offset in the Kafka table connector Co-authored-by: shihong90 <2572805...@qq.com> This closes #21808 --- docs/content.zh/docs/connectors/table/kafka.md | 44 +++- docs/content/docs/connectors/table/kafka.md | 45 +++- .../flink/connector/kafka/source/KafkaSource.java | 5 + .../connectors/kafka/config/BoundedMode.java | 49 +++++ .../kafka/table/KafkaConnectorOptions.java | 60 ++++++ .../kafka/table/KafkaConnectorOptionsUtil.java | 118 ++++++++++- .../connectors/kafka/table/KafkaDynamicSource.java | 63 +++++- .../kafka/table/KafkaDynamicTableFactory.java | 20 ++ .../table/UpsertKafkaDynamicTableFactory.java | 4 + .../kafka/source/KafkaSourceTestUtils.java | 6 + .../kafka/table/KafkaDynamicTableFactoryTest.java | 228 ++++++++++++++++++++- .../connectors/kafka/table/KafkaTableITCase.java | 125 +++++++++++ .../table/UpsertKafkaDynamicTableFactoryTest.java | 4 + 13 files changed, 762 insertions(+), 9 deletions(-) diff --git a/docs/content.zh/docs/connectors/table/kafka.md b/docs/content.zh/docs/connectors/table/kafka.md index b57da402ebd..408cb1a2f05 100644 --- a/docs/content.zh/docs/connectors/table/kafka.md +++ b/docs/content.zh/docs/connectors/table/kafka.md @@ -290,7 +290,7 @@ CREATE TABLE KafkaTable ( <td><h5>scan.startup.mode</h5></td> <td>可选</td> <td style="word-wrap: break-word;">group-offsets</td> - <td>String</td> + <td>Enum</td> <td>Kafka consumer 的启动模式。有效值为:<code>'earliest-offset'</code>,<code>'latest-offset'</code>,<code>'group-offsets'</code>,<code>'timestamp'</code> 和 <code>'specific-offsets'</code>。 请参阅下方 <a href="#起始消费位点">起始消费位点</a> 以获取更多细节。</td> </tr> @@ -309,6 +309,32 @@ CREATE TABLE KafkaTable ( <td>Long</td> <td>在使用 <code>'timestamp'</code> 启动模式时指定启动的时间戳(单位毫秒)。</td> </tr> + <tr> + <td><h5>scan.bounded.mode</h5></td> + <td>optional</td> + <td style="word-wrap: break-word;">unbounded</td> + <td>Enum</td> + <td>Bounded mode for Kafka consumer, valid values are <code>'latest-offset'</code>, <code>'group-offsets'</code>, <code>'timestamp'</code> and <code>'specific-offsets'</code>. + See the following <a href="#bounded-ending-position">Bounded Ending Position</a> for more details.</td> + </tr> + <tr> + <td><h5>scan.bounded.specific-offsets</h5></td> + <td>optional</td> + <td>yes</td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>Specify offsets for each partition in case of <code>'specific-offsets'</code> bounded mode, e.g. <code>'partition:0,offset:42;partition:1,offset:300'. If an offset + for a partition is not provided it will not consume from that partition.</code>. 
+ </td> + </tr> + <tr> + <td><h5>scan.bounded.timestamp-millis</h5></td> + <td>optional</td> + <td>yes</td> + <td style="word-wrap: break-word;">(none)</td> + <td>Long</td> + <td>End at the specified epoch timestamp (milliseconds) used in case of <code>'timestamp'</code> bounded mode.</td> + </tr> <tr> <td><h5>scan.topic-partition-discovery.interval</h5></td> <td>可选</td> @@ -485,6 +511,22 @@ ROW<`version` INT, `behavior` STRING> 如果使用了 `specific-offsets`,必须使用另外一个配置项 `scan.startup.specific-offsets` 来为每个 partition 指定起始偏移量, 例如,选项值 `partition:0,offset:42;partition:1,offset:300` 表示 partition `0` 从偏移量 `42` 开始,partition `1` 从偏移量 `300` 开始。 +### Bounded Ending Position + +The config option `scan.bounded.mode` specifies the bounded mode for Kafka consumer. The valid enumerations are: +<ul> +<li><span markdown="span">`group-offsets`</span>: bounded by committed offsets in ZooKeeper / Kafka brokers of a specific consumer group. This is evaluated at the start of consumption from a given partition.</li> +<li><span markdown="span">`latest-offset`</span>: bounded by latest offsets. This is evaluated at the start of consumption from a given partition.</li> +<li><span markdown="span">`timestamp`</span>: bounded by a user-supplied timestamp.</li> +<li><span markdown="span">`specific-offsets`</span>: bounded by user-supplied specific offsets for each partition.</li> +</ul> + +If config option value `scan.bounded.mode` is not set the default is an unbounded table. + +If `timestamp` is specified, another config option `scan.bounded.timestamp-millis` is required to specify a specific bounded timestamp in milliseconds since January 1, 1970 00:00:00.000 GMT. + +If `specific-offsets` is specified, another config option `scan.bounded.specific-offsets` is required to specify specific bounded offsets for each partition, +e.g. an option value `partition:0,offset:42;partition:1,offset:300` indicates offset `42` for partition `0` and offset `300` for partition `1`. If an offset for a partition is not provided it will not consume from that partition. ### CDC 变更日志(Changelog) Source diff --git a/docs/content/docs/connectors/table/kafka.md b/docs/content/docs/connectors/table/kafka.md index a5d85989de1..3c9e739c28c 100644 --- a/docs/content/docs/connectors/table/kafka.md +++ b/docs/content/docs/connectors/table/kafka.md @@ -313,7 +313,7 @@ Connector Options <td>optional</td> <td>yes</td> <td style="word-wrap: break-word;">group-offsets</td> - <td>String</td> + <td>Enum</td> <td>Startup mode for Kafka consumer, valid values are <code>'earliest-offset'</code>, <code>'latest-offset'</code>, <code>'group-offsets'</code>, <code>'timestamp'</code> and <code>'specific-offsets'</code>. See the following <a href="#start-reading-position">Start Reading Position</a> for more details.</td> </tr> @@ -334,6 +334,32 @@ Connector Options <td>Long</td> <td>Start from the specified epoch timestamp (milliseconds) used in case of <code>'timestamp'</code> startup mode.</td> </tr> + <tr> + <td><h5>scan.bounded.mode</h5></td> + <td>optional</td> + <td style="word-wrap: break-word;">unbounded</td> + <td>Enum</td> + <td>Bounded mode for Kafka consumer, valid values are <code>'latest-offset'</code>, <code>'group-offsets'</code>, <code>'timestamp'</code> and <code>'specific-offsets'</code>. 
+ See the following <a href="#bounded-ending-position">Bounded Ending Position</a> for more details.</td> + </tr> + <tr> + <td><h5>scan.bounded.specific-offsets</h5></td> + <td>optional</td> + <td>yes</td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>Specify offsets for each partition in case of <code>'specific-offsets'</code> bounded mode, e.g. <code>'partition:0,offset:42;partition:1,offset:300'. If an offset + for a partition is not provided it will not consume from that partition.</code>. + </td> + </tr> + <tr> + <td><h5>scan.bounded.timestamp-millis</h5></td> + <td>optional</td> + <td>yes</td> + <td style="word-wrap: break-word;">(none)</td> + <td>Long</td> + <td>End at the specified epoch timestamp (milliseconds) used in case of <code>'timestamp'</code> bounded mode.</td> + </tr> <tr> <td><h5>scan.topic-partition-discovery.interval</h5></td> <td>optional</td> @@ -535,6 +561,23 @@ If `timestamp` is specified, another config option `scan.startup.timestamp-milli If `specific-offsets` is specified, another config option `scan.startup.specific-offsets` is required to specify specific startup offsets for each partition, e.g. an option value `partition:0,offset:42;partition:1,offset:300` indicates offset `42` for partition `0` and offset `300` for partition `1`. +### Bounded Ending Position + +The config option `scan.bounded.mode` specifies the bounded mode for Kafka consumer. The valid enumerations are: +<ul> +<li><span markdown="span">`group-offsets`</span>: bounded by committed offsets in ZooKeeper / Kafka brokers of a specific consumer group. This is evaluated at the start of consumption from a given partition.</li> +<li><span markdown="span">`latest-offset`</span>: bounded by latest offsets. This is evaluated at the start of consumption from a given partition.</li> +<li><span markdown="span">`timestamp`</span>: bounded by a user-supplied timestamp.</li> +<li><span markdown="span">`specific-offsets`</span>: bounded by user-supplied specific offsets for each partition.</li> +</ul> + +If config option value `scan.bounded.mode` is not set the default is an unbounded table. + +If `timestamp` is specified, another config option `scan.bounded.timestamp-millis` is required to specify a specific bounded timestamp in milliseconds since January 1, 1970 00:00:00.000 GMT. + +If `specific-offsets` is specified, another config option `scan.bounded.specific-offsets` is required to specify specific bounded offsets for each partition, +e.g. an option value `partition:0,offset:42;partition:1,offset:300` indicates offset `42` for partition `0` and offset `300` for partition `1`. If an offset for a partition is not provided it will not consume from that partition. + ### CDC Changelog Source Flink natively supports Kafka as a CDC changelog source. If messages in a Kafka topic are change event captured from other databases using a CDC tool, you can use the corresponding Flink CDC format to interpret the messages as INSERT/UPDATE/DELETE statements into a Flink SQL table. 
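As a reader's aid alongside the documentation added above, here is a minimal sketch of how the new bounded options could be exercised end to end from the Table API. The table name, topic, broker address, and schema are hypothetical placeholders; only the `scan.bounded.*` option keys and the offset string format come from the docs in this commit.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class BoundedKafkaScanSketch {
    public static void main(String[] args) {
        // A bounded scan is a natural fit for batch mode, though the options
        // also apply in streaming mode.
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

        // Hypothetical table/topic/broker names. The bounded options mirror the
        // option table above: stop partition 0 at offset 42 and partition 1 at offset 300.
        tEnv.executeSql(
                "CREATE TABLE orders (\n"
                        + "  `order_id` BIGINT,\n"
                        + "  `amount` DOUBLE\n"
                        + ") WITH (\n"
                        + "  'connector' = 'kafka',\n"
                        + "  'topic' = 'orders',\n"
                        + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
                        + "  'format' = 'json',\n"
                        + "  'scan.startup.mode' = 'earliest-offset',\n"
                        + "  'scan.bounded.mode' = 'specific-offsets',\n"
                        + "  'scan.bounded.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300'\n"
                        + ")");

        // Because the scan stops once the bounded offsets are reached, this query terminates.
        tEnv.executeSql("SELECT * FROM orders").print();
    }
}
```

Swapping `'scan.bounded.mode'` to `'timestamp'` (with `'scan.bounded.timestamp-millis'`), `'latest-offset'`, or `'group-offsets'` would bound the same table by the other ending positions described above.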
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java index 1b327504587..7a17b1ff6f9 100644 --- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java +++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java @@ -233,4 +233,9 @@ public class KafkaSource<OUT> KafkaSubscriber getKafkaSubscriber() { return subscriber; } + + @VisibleForTesting + OffsetsInitializer getStoppingOffsetsInitializer() { + return stoppingOffsetsInitializer; + } } diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/config/BoundedMode.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/config/BoundedMode.java new file mode 100644 index 00000000000..beb2306b581 --- /dev/null +++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/config/BoundedMode.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.config; + +import org.apache.flink.annotation.Internal; + +/** End modes for the Kafka Consumer. */ +@Internal +public enum BoundedMode { + + /** Do not end consuming. */ + UNBOUNDED, + + /** + * End from committed offsets in ZK / Kafka brokers of a specific consumer group. This is + * evaluated at the start of consumption from a given partition. + */ + GROUP_OFFSETS, + + /** + * End from the latest offset. This is evaluated at the start of consumption from a given + * partition. + */ + LATEST, + + /** End from user-supplied timestamp for each partition. */ + TIMESTAMP, + + /** + * End from user-supplied specific offsets for each partition. If an offset for a partition is + * not provided it will not consume from that partition. 
+ */ + SPECIFIC_OFFSETS; +} diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptions.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptions.java index 1150ab4b1c3..a6cdbcedc96 100644 --- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptions.java +++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptions.java @@ -151,6 +151,12 @@ public class KafkaConnectorOptions { .defaultValue(ScanStartupMode.GROUP_OFFSETS) .withDescription("Startup mode for Kafka consumer."); + public static final ConfigOption<ScanBoundedMode> SCAN_BOUNDED_MODE = + ConfigOptions.key("scan.bounded.mode") + .enumType(ScanBoundedMode.class) + .defaultValue(ScanBoundedMode.UNBOUNDED) + .withDescription("Bounded mode for Kafka consumer."); + public static final ConfigOption<String> SCAN_STARTUP_SPECIFIC_OFFSETS = ConfigOptions.key("scan.startup.specific-offsets") .stringType() @@ -158,6 +164,13 @@ public class KafkaConnectorOptions { .withDescription( "Optional offsets used in case of \"specific-offsets\" startup mode"); + public static final ConfigOption<String> SCAN_BOUNDED_SPECIFIC_OFFSETS = + ConfigOptions.key("scan.bounded.specific-offsets") + .stringType() + .noDefaultValue() + .withDescription( + "Optional offsets used in case of \"specific-offsets\" bounded mode"); + public static final ConfigOption<Long> SCAN_STARTUP_TIMESTAMP_MILLIS = ConfigOptions.key("scan.startup.timestamp-millis") .longType() @@ -165,6 +178,13 @@ public class KafkaConnectorOptions { .withDescription( "Optional timestamp used in case of \"timestamp\" startup mode"); + public static final ConfigOption<Long> SCAN_BOUNDED_TIMESTAMP_MILLIS = + ConfigOptions.key("scan.bounded.timestamp-millis") + .longType() + .noDefaultValue() + .withDescription( + "Optional timestamp used in case of \"timestamp\" bounded mode"); + public static final ConfigOption<Duration> SCAN_TOPIC_PARTITION_DISCOVERY = ConfigOptions.key("scan.topic-partition-discovery.interval") .durationType() @@ -291,5 +311,45 @@ public class KafkaConnectorOptions { } } + /** Bounded mode for the Kafka consumer, see {@link #SCAN_BOUNDED_MODE}. */ + public enum ScanBoundedMode implements DescribedEnum { + UNBOUNDED("unbounded", text("Do not stop consuming")), + LATEST_OFFSET( + "latest-offset", + text( + "Bounded by latest offsets. This is evaluated at the start of consumption" + + " from a given partition.")), + GROUP_OFFSETS( + "group-offsets", + text( + "Bounded by committed offsets in ZooKeeper / Kafka brokers of a specific" + + " consumer group. This is evaluated at the start of consumption" + + " from a given partition.")), + TIMESTAMP("timestamp", text("Bounded by a user-supplied timestamp.")), + SPECIFIC_OFFSETS( + "specific-offsets", + text( + "Bounded by user-supplied specific offsets for each partition. 
If an offset" + + " for a partition is not provided it will not consume from that" + + " partition.")); + private final String value; + private final InlineElement description; + + ScanBoundedMode(String value, InlineElement description) { + this.value = value; + this.description = description; + } + + @Override + public String toString() { + return value; + } + + @Override + public InlineElement getDescription() { + return description; + } + } + private KafkaConnectorOptions() {} } diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java index fb848036e88..ef70644e5d9 100644 --- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java +++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java @@ -24,10 +24,12 @@ import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.streaming.connectors.kafka.config.BoundedMode; import org.apache.flink.streaming.connectors.kafka.config.StartupMode; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; +import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.ScanBoundedMode; import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.ScanStartupMode; import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.ValueFieldsStrategy; import org.apache.flink.table.api.TableException; @@ -56,6 +58,9 @@ import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOp import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FIELDS; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FIELDS_PREFIX; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FORMAT; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_MODE; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_SPECIFIC_OFFSETS; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_TIMESTAMP_MILLIS; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_MODE; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_SPECIFIC_OFFSETS; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_TIMESTAMP_MILLIS; @@ -101,6 +106,7 @@ class KafkaConnectorOptionsUtil { public static void validateTableSourceOptions(ReadableConfig tableOptions) { validateSourceTopic(tableOptions); validateScanStartupMode(tableOptions); + validateScanBoundedMode(tableOptions); } public static void validateTableSinkOptions(ReadableConfig tableOptions) { @@ -183,6 +189,49 @@ class KafkaConnectorOptionsUtil { }); } + private static void 
validateScanBoundedMode(ReadableConfig tableOptions) { + tableOptions + .getOptional(SCAN_BOUNDED_MODE) + .ifPresent( + mode -> { + switch (mode) { + case TIMESTAMP: + if (!tableOptions + .getOptional(SCAN_BOUNDED_TIMESTAMP_MILLIS) + .isPresent()) { + throw new ValidationException( + String.format( + "'%s' is required in '%s' bounded mode" + + " but missing.", + SCAN_BOUNDED_TIMESTAMP_MILLIS.key(), + ScanBoundedMode.TIMESTAMP)); + } + + break; + case SPECIFIC_OFFSETS: + if (!tableOptions + .getOptional(SCAN_BOUNDED_SPECIFIC_OFFSETS) + .isPresent()) { + throw new ValidationException( + String.format( + "'%s' is required in '%s' bounded mode" + + " but missing.", + SCAN_BOUNDED_SPECIFIC_OFFSETS.key(), + ScanBoundedMode.SPECIFIC_OFFSETS)); + } + if (!isSingleTopic(tableOptions)) { + throw new ValidationException( + "Currently Kafka source only supports specific offset for single topic."); + } + String specificOffsets = + tableOptions.get(SCAN_BOUNDED_SPECIFIC_OFFSETS); + parseSpecificOffsets( + specificOffsets, SCAN_BOUNDED_SPECIFIC_OFFSETS.key()); + break; + } + }); + } + private static void validateSinkPartitioner(ReadableConfig tableOptions) { tableOptions .getOptional(SINK_PARTITIONER) @@ -241,6 +290,23 @@ class KafkaConnectorOptionsUtil { return options; } + public static BoundedOptions getBoundedOptions(ReadableConfig tableOptions) { + final Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>(); + final BoundedMode boundedMode = + KafkaConnectorOptionsUtil.fromOption(tableOptions.get(SCAN_BOUNDED_MODE)); + if (boundedMode == BoundedMode.SPECIFIC_OFFSETS) { + buildBoundedOffsets(tableOptions, tableOptions.get(TOPIC).get(0), specificOffsets); + } + + final BoundedOptions options = new BoundedOptions(); + options.boundedMode = boundedMode; + options.specificOffsets = specificOffsets; + if (boundedMode == BoundedMode.TIMESTAMP) { + options.boundedTimestampMillis = tableOptions.get(SCAN_BOUNDED_TIMESTAMP_MILLIS); + } + return options; + } + private static void buildSpecificOffsets( ReadableConfig tableOptions, String topic, @@ -256,6 +322,22 @@ class KafkaConnectorOptionsUtil { }); } + public static void buildBoundedOffsets( + ReadableConfig tableOptions, + String topic, + Map<KafkaTopicPartition, Long> specificOffsets) { + String specificOffsetsEndOpt = tableOptions.get(SCAN_BOUNDED_SPECIFIC_OFFSETS); + final Map<Integer, Long> offsetMap = + parseSpecificOffsets(specificOffsetsEndOpt, SCAN_BOUNDED_SPECIFIC_OFFSETS.key()); + + offsetMap.forEach( + (partition, offset) -> { + final KafkaTopicPartition topicPartition = + new KafkaTopicPartition(topic, partition); + specificOffsets.put(topicPartition, offset); + }); + } + /** * Returns the {@link StartupMode} of Kafka Consumer by passed-in table-specific {@link * ScanStartupMode}. @@ -279,6 +361,29 @@ class KafkaConnectorOptionsUtil { } } + /** + * Returns the {@link BoundedMode} of Kafka Consumer by passed-in table-specific {@link + * ScanBoundedMode}. + */ + private static BoundedMode fromOption(ScanBoundedMode scanBoundedMode) { + switch (scanBoundedMode) { + case UNBOUNDED: + return BoundedMode.UNBOUNDED; + case LATEST_OFFSET: + return BoundedMode.LATEST; + case GROUP_OFFSETS: + return BoundedMode.GROUP_OFFSETS; + case TIMESTAMP: + return BoundedMode.TIMESTAMP; + case SPECIFIC_OFFSETS: + return BoundedMode.SPECIFIC_OFFSETS; + + default: + throw new TableException( + "Unsupported bounded mode. 
Validator should have checked that."); + } + } + public static Properties getKafkaProperties(Map<String, String> tableOptions) { final Properties kafkaProperties = new Properties(); @@ -320,15 +425,15 @@ class KafkaConnectorOptionsUtil { } /** - * Parses SpecificOffsets String to Map. + * Parses specificOffsets String to Map. * - * <p>SpecificOffsets String format was given as following: + * <p>specificOffsets String format was given as following: * * <pre> * scan.startup.specific-offsets = partition:0,offset:42;partition:1,offset:300 * </pre> * - * @return SpecificOffsets with Map format, key is partition, and value is offset + * @return specificOffsets with Map format, key is partition, and value is offset */ public static Map<Integer, Long> parseSpecificOffsets( String specificOffsetsStr, String optionKey) { @@ -581,5 +686,12 @@ class KafkaConnectorOptionsUtil { public long startupTimestampMillis; } + /** Kafka bounded options. * */ + public static class BoundedOptions { + public BoundedMode boundedMode; + public Map<KafkaTopicPartition, Long> specificOffsets; + public long boundedTimestampMillis; + } + private KafkaConnectorOptionsUtil() {} } diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java index d6bf27a104b..c963da76206 100644 --- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java +++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java @@ -25,12 +25,14 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.connector.kafka.source.KafkaSourceBuilder; +import org.apache.flink.connector.kafka.source.enumerator.initializer.NoStoppingOffsetsInitializer; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema; +import org.apache.flink.streaming.connectors.kafka.config.BoundedMode; import org.apache.flink.streaming.connectors.kafka.config.StartupMode; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; import org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.MetadataConverter; @@ -149,6 +151,21 @@ public class KafkaDynamicSource */ protected final long startupTimestampMillis; + /** The bounded mode for the contained consumer (default is an unbounded data stream). */ + protected final BoundedMode boundedMode; + + /** + * Specific end offsets; only relevant when bounded mode is {@link + * BoundedMode#SPECIFIC_OFFSETS}. + */ + protected final Map<KafkaTopicPartition, Long> specificBoundedOffsets; + + /** + * The bounded timestamp to locate partition offsets; only relevant when bounded mode is {@link + * BoundedMode#TIMESTAMP}. 
+ */ + protected final long boundedTimestampMillis; + /** Flag to determine source mode. In upsert mode, it will keep the tombstone message. * */ protected final boolean upsertMode; @@ -167,6 +184,9 @@ public class KafkaDynamicSource StartupMode startupMode, Map<KafkaTopicPartition, Long> specificStartupOffsets, long startupTimestampMillis, + BoundedMode boundedMode, + Map<KafkaTopicPartition, Long> specificBoundedOffsets, + long boundedTimestampMillis, boolean upsertMode, String tableIdentifier) { // Format attributes @@ -200,6 +220,12 @@ public class KafkaDynamicSource Preconditions.checkNotNull( specificStartupOffsets, "Specific offsets must not be null."); this.startupTimestampMillis = startupTimestampMillis; + this.boundedMode = + Preconditions.checkNotNull(boundedMode, "Bounded mode must not be null."); + this.specificBoundedOffsets = + Preconditions.checkNotNull( + specificBoundedOffsets, "Specific bounded offsets must not be null."); + this.boundedTimestampMillis = boundedTimestampMillis; this.upsertMode = upsertMode; this.tableIdentifier = tableIdentifier; } @@ -314,6 +340,9 @@ public class KafkaDynamicSource startupMode, specificStartupOffsets, startupTimestampMillis, + boundedMode, + specificBoundedOffsets, + boundedTimestampMillis, upsertMode, tableIdentifier); copy.producedDataType = producedDataType; @@ -350,6 +379,9 @@ public class KafkaDynamicSource && startupMode == that.startupMode && Objects.equals(specificStartupOffsets, that.specificStartupOffsets) && startupTimestampMillis == that.startupTimestampMillis + && boundedMode == that.boundedMode + && Objects.equals(specificBoundedOffsets, that.specificBoundedOffsets) + && boundedTimestampMillis == that.boundedTimestampMillis && Objects.equals(upsertMode, that.upsertMode) && Objects.equals(tableIdentifier, that.tableIdentifier) && Objects.equals(watermarkStrategy, that.watermarkStrategy); @@ -363,8 +395,8 @@ public class KafkaDynamicSource physicalDataType, keyDecodingFormat, valueDecodingFormat, - keyProjection, - valueProjection, + Arrays.hashCode(keyProjection), + Arrays.hashCode(valueProjection), keyPrefix, topics, topicPattern, @@ -372,6 +404,9 @@ public class KafkaDynamicSource startupMode, specificStartupOffsets, startupTimestampMillis, + boundedMode, + specificBoundedOffsets, + boundedTimestampMillis, upsertMode, tableIdentifier, watermarkStrategy); @@ -427,6 +462,30 @@ public class KafkaDynamicSource break; } + switch (boundedMode) { + case UNBOUNDED: + kafkaSourceBuilder.setUnbounded(new NoStoppingOffsetsInitializer()); + break; + case LATEST: + kafkaSourceBuilder.setBounded(OffsetsInitializer.latest()); + break; + case GROUP_OFFSETS: + kafkaSourceBuilder.setBounded(OffsetsInitializer.committedOffsets()); + break; + case SPECIFIC_OFFSETS: + Map<TopicPartition, Long> offsets = new HashMap<>(); + specificBoundedOffsets.forEach( + (tp, offset) -> + offsets.put( + new TopicPartition(tp.getTopic(), tp.getPartition()), + offset)); + kafkaSourceBuilder.setBounded(OffsetsInitializer.offsets(offsets)); + break; + case TIMESTAMP: + kafkaSourceBuilder.setBounded(OffsetsInitializer.timestamp(boundedTimestampMillis)); + break; + } + kafkaSourceBuilder .setProperties(properties) .setDeserializer(KafkaRecordDeserializationSchema.of(kafkaDeserializer)); diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java 
index 5ac146d7197..48c00918aa1 100644 --- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java +++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java @@ -27,9 +27,11 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connector.kafka.source.KafkaSourceOptions; +import org.apache.flink.streaming.connectors.kafka.config.BoundedMode; import org.apache.flink.streaming.connectors.kafka.config.StartupMode; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; +import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.BoundedOptions; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.connector.format.DecodingFormat; @@ -69,6 +71,9 @@ import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOp import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FORMAT; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.PROPS_GROUP_ID; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_MODE; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_SPECIFIC_OFFSETS; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_TIMESTAMP_MILLIS; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_MODE; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_SPECIFIC_OFFSETS; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_TIMESTAMP_MILLIS; @@ -85,6 +90,7 @@ import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOp import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.autoCompleteSchemaRegistrySubject; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.createKeyFormatProjection; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.createValueFormatProjection; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.getBoundedOptions; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.getFlinkKafkaPartitioner; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.getKafkaProperties; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.getSourceTopicPattern; @@ -143,6 +149,9 @@ public class KafkaDynamicTableFactory options.add(DELIVERY_GUARANTEE); options.add(TRANSACTIONAL_ID_PREFIX); options.add(SINK_SEMANTIC); + options.add(SCAN_BOUNDED_MODE); + options.add(SCAN_BOUNDED_SPECIFIC_OFFSETS); + options.add(SCAN_BOUNDED_TIMESTAMP_MILLIS); return options; } @@ -187,6 +196,8 @@ public class KafkaDynamicTableFactory final StartupOptions startupOptions = 
getStartupOptions(tableOptions); + final BoundedOptions boundedOptions = getBoundedOptions(tableOptions); + final Properties properties = getKafkaProperties(context.getCatalogTable().getOptions()); // add topic-partition discovery @@ -217,6 +228,9 @@ public class KafkaDynamicTableFactory startupOptions.startupMode, startupOptions.specificOffsets, startupOptions.startupTimestampMillis, + boundedOptions.boundedMode, + boundedOptions.specificOffsets, + boundedOptions.boundedTimestampMillis, context.getObjectIdentifier().asSummaryString()); } @@ -378,6 +392,9 @@ public class KafkaDynamicTableFactory StartupMode startupMode, Map<KafkaTopicPartition, Long> specificStartupOffsets, long startupTimestampMillis, + BoundedMode boundedMode, + Map<KafkaTopicPartition, Long> specificEndOffsets, + long endTimestampMillis, String tableIdentifier) { return new KafkaDynamicSource( physicalDataType, @@ -392,6 +409,9 @@ public class KafkaDynamicTableFactory startupMode, specificStartupOffsets, startupTimestampMillis, + boundedMode, + specificEndOffsets, + endTimestampMillis, false, tableIdentifier); } diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java index 8682e405121..254e1bf9852 100644 --- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java +++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java @@ -25,6 +25,7 @@ import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.streaming.connectors.kafka.config.BoundedMode; import org.apache.flink.streaming.connectors.kafka.config.StartupMode; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.ResolvedCatalogTable; @@ -141,6 +142,9 @@ public class UpsertKafkaDynamicTableFactory earliest, Collections.emptyMap(), 0, + BoundedMode.UNBOUNDED, + Collections.emptyMap(), + 0, true, context.getObjectIdentifier().asSummaryString()); } diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceTestUtils.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceTestUtils.java index 572b77d0787..e95f05babe1 100644 --- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceTestUtils.java +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceTestUtils.java @@ -20,6 +20,7 @@ package org.apache.flink.connector.kafka.source; import org.apache.flink.api.connector.source.SourceReaderContext; import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.flink.connector.kafka.source.reader.KafkaSourceReader; import java.util.Collection; @@ -50,4 +51,9 @@ public class KafkaSourceTestUtils { public static Configuration getKafkaSourceConfiguration(KafkaSource<?> kafkaSource) { return kafkaSource.getConfiguration(); } + + /** Get stopping 
offsets initializer. */ + public static OffsetsInitializer getStoppingOffsetsInitializer(KafkaSource<?> kafkaSource) { + return kafkaSource.getStoppingOffsetsInitializer(); + } } diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java index 0b908abfa5f..a0b74a5b763 100644 --- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java @@ -21,6 +21,7 @@ package org.apache.flink.streaming.connectors.kafka.table; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.api.dag.Transformation; import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.Configuration; @@ -30,6 +31,7 @@ import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.connector.kafka.source.KafkaSourceOptions; import org.apache.flink.connector.kafka.source.KafkaSourceTestUtils; import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumState; +import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit; import org.apache.flink.formats.avro.AvroRowDataSerializationSchema; import org.apache.flink.formats.avro.RowDataToAvroConverters; @@ -38,6 +40,7 @@ import org.apache.flink.formats.avro.registry.confluent.debezium.DebeziumAvroSer import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.transformations.SourceTransformation; +import org.apache.flink.streaming.connectors.kafka.config.BoundedMode; import org.apache.flink.streaming.connectors.kafka.config.StartupMode; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner; @@ -73,7 +76,9 @@ import org.apache.flink.util.TestLoggerExtension; import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList; import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.junit.jupiter.api.Test; +import org.apache.kafka.clients.consumer.OffsetAndTimestamp; +import org.apache.kafka.common.TopicPartition; +import org.junit.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.NullSource; @@ -82,6 +87,7 @@ import org.junit.jupiter.params.provider.ValueSource; import javax.annotation.Nullable; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -91,6 +97,7 @@ import java.util.Optional; import java.util.Properties; import java.util.Set; import java.util.function.Consumer; +import java.util.function.Function; import java.util.regex.Pattern; import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches; @@ 
-103,7 +110,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; import static org.assertj.core.api.Assertions.assertThatThrownBy; -/** Abstract test base for {@link KafkaDynamicTableFactory}. */ +/** Tests for {@link KafkaDynamicTableFactory}. */ @ExtendWith(TestLoggerExtension.class) public class KafkaDynamicTableFactoryTest { @@ -425,6 +432,220 @@ public class KafkaDynamicTableFactoryTest { } } + @Test + public void testBoundedSpecificOffsetsValidate() { + final Map<String, String> modifiedOptions = + getModifiedOptions( + getBasicSourceOptions(), + options -> { + options.put( + KafkaConnectorOptions.SCAN_BOUNDED_MODE.key(), + "specific-offsets"); + }); + assertThatThrownBy(() -> createTableSource(SCHEMA, modifiedOptions)) + .cause() + .hasMessageContaining( + "'scan.bounded.specific-offsets' is required in 'specific-offsets' bounded mode but missing."); + } + + @Test + public void testBoundedSpecificOffsets() { + testBoundedOffsets( + "specific-offsets", + options -> { + options.put("scan.bounded.specific-offsets", "partition:0,offset:2"); + }, + source -> { + assertThat(source.getBoundedness()).isEqualTo(Boundedness.BOUNDED); + OffsetsInitializer offsetsInitializer = + KafkaSourceTestUtils.getStoppingOffsetsInitializer(source); + TopicPartition partition = new TopicPartition(TOPIC, 0); + Map<TopicPartition, Long> partitionOffsets = + offsetsInitializer.getPartitionOffsets( + Collections.singletonList(partition), + MockPartitionOffsetsRetriever.noInteractions()); + assertThat(partitionOffsets) + .containsOnlyKeys(partition) + .containsEntry(partition, 2L); + }); + } + + @Test + public void testBoundedLatestOffset() { + testBoundedOffsets( + "latest-offset", + options -> {}, + source -> { + assertThat(source.getBoundedness()).isEqualTo(Boundedness.BOUNDED); + OffsetsInitializer offsetsInitializer = + KafkaSourceTestUtils.getStoppingOffsetsInitializer(source); + TopicPartition partition = new TopicPartition(TOPIC, 0); + Map<TopicPartition, Long> partitionOffsets = + offsetsInitializer.getPartitionOffsets( + Collections.singletonList(partition), + MockPartitionOffsetsRetriever.noInteractions()); + assertThat(partitionOffsets) + .containsOnlyKeys(partition) + .containsEntry(partition, KafkaPartitionSplit.LATEST_OFFSET); + }); + } + + @Test + public void testBoundedGroupOffsets() { + testBoundedOffsets( + "group-offsets", + options -> {}, + source -> { + assertThat(source.getBoundedness()).isEqualTo(Boundedness.BOUNDED); + OffsetsInitializer offsetsInitializer = + KafkaSourceTestUtils.getStoppingOffsetsInitializer(source); + TopicPartition partition = new TopicPartition(TOPIC, 0); + Map<TopicPartition, Long> partitionOffsets = + offsetsInitializer.getPartitionOffsets( + Collections.singletonList(partition), + MockPartitionOffsetsRetriever.noInteractions()); + assertThat(partitionOffsets) + .containsOnlyKeys(partition) + .containsEntry(partition, KafkaPartitionSplit.COMMITTED_OFFSET); + }); + } + + @Test + public void testBoundedTimestamp() { + testBoundedOffsets( + "timestamp", + options -> { + options.put("scan.bounded.timestamp-millis", "1"); + }, + source -> { + assertThat(source.getBoundedness()).isEqualTo(Boundedness.BOUNDED); + OffsetsInitializer offsetsInitializer = + KafkaSourceTestUtils.getStoppingOffsetsInitializer(source); + TopicPartition partition = new TopicPartition(TOPIC, 0); + long offsetForTimestamp = 123L; + Map<TopicPartition, Long> partitionOffsets = + 
offsetsInitializer.getPartitionOffsets( + Collections.singletonList(partition), + MockPartitionOffsetsRetriever.timestampAndEnd( + partitions -> { + assertThat(partitions) + .containsOnlyKeys(partition) + .containsEntry(partition, 1L); + Map<TopicPartition, OffsetAndTimestamp> result = + new HashMap<>(); + result.put( + partition, + new OffsetAndTimestamp( + offsetForTimestamp, 1L)); + return result; + }, + partitions -> { + Map<TopicPartition, Long> result = new HashMap<>(); + result.put( + partition, + // the end offset is bigger than given by + // timestamp + // to make sure the one for timestamp is + // used + offsetForTimestamp + 1000L); + return result; + })); + assertThat(partitionOffsets) + .containsOnlyKeys(partition) + .containsEntry(partition, offsetForTimestamp); + }); + } + + private void testBoundedOffsets( + String boundedMode, + Consumer<Map<String, String>> optionsConfig, + Consumer<KafkaSource<?>> validator) { + final Map<String, String> modifiedOptions = + getModifiedOptions( + getBasicSourceOptions(), + options -> { + options.put(KafkaConnectorOptions.SCAN_BOUNDED_MODE.key(), boundedMode); + optionsConfig.accept(options); + }); + final DynamicTableSource tableSource = createTableSource(SCHEMA, modifiedOptions); + assertThat(tableSource).isInstanceOf(KafkaDynamicSource.class); + ScanTableSource.ScanRuntimeProvider provider = + ((KafkaDynamicSource) tableSource) + .getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE); + assertThat(provider).isInstanceOf(DataStreamScanProvider.class); + final KafkaSource<?> kafkaSource = assertKafkaSource(provider); + validator.accept(kafkaSource); + } + + private interface OffsetsRetriever + extends Function<Collection<TopicPartition>, Map<TopicPartition, Long>> {} + + private interface TimestampOffsetsRetriever + extends Function<Map<TopicPartition, Long>, Map<TopicPartition, OffsetAndTimestamp>> {} + + private static final class MockPartitionOffsetsRetriever + implements OffsetsInitializer.PartitionOffsetsRetriever { + + public static final OffsetsRetriever UNSUPPORTED_RETRIEVAL = + partitions -> { + throw new UnsupportedOperationException( + "The method was not supposed to be called"); + }; + private final OffsetsRetriever committedOffsets; + private final OffsetsRetriever endOffsets; + private final OffsetsRetriever beginningOffsets; + private final TimestampOffsetsRetriever offsetsForTimes; + + static MockPartitionOffsetsRetriever noInteractions() { + return new MockPartitionOffsetsRetriever( + UNSUPPORTED_RETRIEVAL, + UNSUPPORTED_RETRIEVAL, + UNSUPPORTED_RETRIEVAL, + partitions -> { + throw new UnsupportedOperationException( + "The method was not supposed to be called"); + }); + } + + static MockPartitionOffsetsRetriever timestampAndEnd( + TimestampOffsetsRetriever retriever, OffsetsRetriever endOffsets) { + return new MockPartitionOffsetsRetriever( + UNSUPPORTED_RETRIEVAL, endOffsets, UNSUPPORTED_RETRIEVAL, retriever); + } + + private MockPartitionOffsetsRetriever( + OffsetsRetriever committedOffsets, + OffsetsRetriever endOffsets, + OffsetsRetriever beginningOffsets, + TimestampOffsetsRetriever offsetsForTimes) { + this.committedOffsets = committedOffsets; + this.endOffsets = endOffsets; + this.beginningOffsets = beginningOffsets; + this.offsetsForTimes = offsetsForTimes; + } + + @Override + public Map<TopicPartition, Long> committedOffsets(Collection<TopicPartition> partitions) { + return committedOffsets.apply(partitions); + } + + @Override + public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> 
partitions) { + return endOffsets.apply(partitions); + } + + @Override + public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) { + return beginningOffsets.apply(partitions); + } + + @Override + public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes( + Map<TopicPartition, Long> timestampsToSearch) { + return offsetsForTimes.apply(timestampsToSearch); + } + } + @Test public void testTableSink() { final Map<String, String> modifiedOptions = @@ -974,6 +1195,9 @@ public class KafkaDynamicTableFactoryTest { startupMode, specificStartupOffsets, startupTimestampMillis, + BoundedMode.UNBOUNDED, + Collections.emptyMap(), + 0, false, FactoryMocks.IDENTIFIER.asSummaryString()); } diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java index 975e5cd4375..515526f935e 100644 --- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java @@ -29,6 +29,7 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.utils.EncodingUtils; import org.apache.flink.test.util.SuccessException; import org.apache.flink.types.Row; +import org.apache.flink.util.CloseableIterator; import org.apache.kafka.clients.consumer.NoOffsetForPartitionException; import org.apache.kafka.clients.consumer.OffsetAndMetadata; @@ -40,6 +41,7 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import java.time.Duration; +import java.time.Instant; import java.time.LocalDateTime; import java.util.ArrayList; import java.util.Arrays; @@ -179,6 +181,129 @@ public class KafkaTableITCase extends KafkaTableTestBase { deleteTestTopic(topic); } + @Test + public void testKafkaSourceSinkWithBoundedSpecificOffsets() throws Exception { + // we always use a different topic name for each parameterized topic, + // in order to make sure the topic can be created. 
+ final String topic = "bounded_" + format + "_" + UUID.randomUUID(); + createTestTopic(topic, 1, 1); + + // ---------- Produce an event time stream into Kafka ------------------- + String groupId = getStandardProps().getProperty("group.id"); + String bootstraps = getBootstrapServers(); + + final String createTable = + String.format( + "CREATE TABLE kafka (\n" + + " `user_id` INT,\n" + + " `item_id` INT,\n" + + " `behavior` STRING\n" + + ") WITH (\n" + + " 'connector' = '%s',\n" + + " 'topic' = '%s',\n" + + " 'properties.bootstrap.servers' = '%s',\n" + + " 'properties.group.id' = '%s',\n" + + " 'scan.startup.mode' = 'earliest-offset',\n" + + " 'scan.bounded.mode' = 'specific-offsets',\n" + + " 'scan.bounded.specific-offsets' = 'partition:0,offset:2',\n" + + " %s\n" + + ")\n", + KafkaDynamicTableFactory.IDENTIFIER, + topic, + bootstraps, + groupId, + formatOptions()); + + tEnv.executeSql(createTable); + + List<Row> values = + Arrays.asList( + Row.of(1, 1102, "behavior 1"), + Row.of(2, 1103, "behavior 2"), + Row.of(3, 1104, "behavior 3")); + tEnv.fromValues(values).insertInto("kafka").execute().await(); + + // ---------- Consume stream from Kafka ------------------- + + List<Row> results = new ArrayList<>(); + try (CloseableIterator<Row> resultsItr = + tEnv.sqlQuery("SELECT * from kafka").execute().collect()) { + while (resultsItr.hasNext()) { + results.add(resultsItr.next()); + } + } + + assertThat(results) + .containsExactly(Row.of(1, 1102, "behavior 1"), Row.of(2, 1103, "behavior 2")); + + // ------------- cleanup ------------------- + + deleteTestTopic(topic); + } + + @Test + public void testKafkaSourceSinkWithBoundedTimestamp() throws Exception { + // we always use a different topic name for each parameterized topic, + // in order to make sure the topic can be created. 
+ final String topic = "bounded_" + format + "_" + UUID.randomUUID(); + createTestTopic(topic, 1, 1); + + // ---------- Produce an event time stream into Kafka ------------------- + String groupId = getStandardProps().getProperty("group.id"); + String bootstraps = getBootstrapServers(); + + final String createTable = + String.format( + "CREATE TABLE kafka (\n" + + " `user_id` INT,\n" + + " `item_id` INT,\n" + + " `behavior` STRING,\n" + + " `event_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'" + + ") WITH (\n" + + " 'connector' = '%s',\n" + + " 'topic' = '%s',\n" + + " 'properties.bootstrap.servers' = '%s',\n" + + " 'properties.group.id' = '%s',\n" + + " 'scan.startup.mode' = 'earliest-offset',\n" + + " 'scan.bounded.mode' = 'timestamp',\n" + + " 'scan.bounded.timestamp-millis' = '5',\n" + + " %s\n" + + ")\n", + KafkaDynamicTableFactory.IDENTIFIER, + topic, + bootstraps, + groupId, + formatOptions()); + + tEnv.executeSql(createTable); + + List<Row> values = + Arrays.asList( + Row.of(1, 1102, "behavior 1", Instant.ofEpochMilli(0L)), + Row.of(2, 1103, "behavior 2", Instant.ofEpochMilli(3L)), + Row.of(3, 1104, "behavior 3", Instant.ofEpochMilli(7L))); + tEnv.fromValues(values).insertInto("kafka").execute().await(); + + // ---------- Consume stream from Kafka ------------------- + + List<Row> results = new ArrayList<>(); + try (CloseableIterator<Row> resultsItr = + tEnv.sqlQuery("SELECT * from kafka").execute().collect()) { + while (resultsItr.hasNext()) { + results.add(resultsItr.next()); + } + } + + assertThat(results) + .containsExactly( + Row.of(1, 1102, "behavior 1", Instant.ofEpochMilli(0L)), + Row.of(2, 1103, "behavior 2", Instant.ofEpochMilli(3L))); + + // ------------- cleanup ------------------- + + deleteTestTopic(topic); + } + @Test public void testKafkaTableWithMultipleTopics() throws Exception { // ---------- create source and sink tables ------------------- diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java index 79884ad0929..5caaaa0acbc 100644 --- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java @@ -34,6 +34,7 @@ import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.operators.StreamOperatorFactory; import org.apache.flink.streaming.api.transformations.SourceTransformation; +import org.apache.flink.streaming.connectors.kafka.config.BoundedMode; import org.apache.flink.streaming.connectors.kafka.config.StartupMode; import org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorFactory; import org.apache.flink.table.api.DataTypes; @@ -609,6 +610,9 @@ public class UpsertKafkaDynamicTableFactoryTest extends TestLogger { StartupMode.EARLIEST, Collections.emptyMap(), 0, + BoundedMode.UNBOUNDED, + Collections.emptyMap(), + 0, true, FactoryMocks.IDENTIFIER.asSummaryString()); }
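For readers following the `KafkaDynamicSource` changes in this commit, the sketch below paraphrases what the table options translate to at the DataStream level: the `KafkaSourceBuilder`'s `setBounded(...)` call with the matching `OffsetsInitializer`, analogous to the switch over `boundedMode` in `getScanRuntimeProvider`. Topic, group id, and bootstrap servers are placeholders, and this is an illustration under those assumptions rather than the connector's internal code.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.common.TopicPartition;

public class BoundedKafkaSourceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 'scan.bounded.mode' = 'specific-offsets' roughly corresponds to
        // setBounded(OffsetsInitializer.offsets(...)) with the parsed partition/offset map.
        Map<TopicPartition, Long> stoppingOffsets = new HashMap<>();
        stoppingOffsets.put(new TopicPartition("orders", 0), 42L);
        stoppingOffsets.put(new TopicPartition("orders", 1), 300L);

        KafkaSource<String> source =
                KafkaSource.<String>builder()
                        .setBootstrapServers("localhost:9092")
                        .setTopics("orders")
                        .setGroupId("bounded-demo")
                        .setStartingOffsets(OffsetsInitializer.earliest())
                        // The other bounded modes map to OffsetsInitializer.latest(),
                        // OffsetsInitializer.committedOffsets(), and
                        // OffsetsInitializer.timestamp(boundedTimestampMillis);
                        // UNBOUNDED maps to setUnbounded(new NoStoppingOffsetsInitializer()).
                        .setBounded(OffsetsInitializer.offsets(stoppingOffsets))
                        .setValueOnlyDeserializer(new SimpleStringSchema())
                        .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "bounded-kafka")
                .print();

        env.execute("bounded-kafka-sketch");
    }
}
```

As the switch added to `KafkaDynamicSource` suggests, the table connector's bounded mode is essentially a declarative selector for which stopping `OffsetsInitializer` gets passed to the underlying `KafkaSource` builder.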