This is an automated email from the ASF dual-hosted git repository.
bli pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git
The following commit(s) were added to refs/heads/main by this push:
new d0b5cca3 [FLINK-38920] create Table source and factory for DynamicKafkaSource in Flink Table API (#212)
d0b5cca3 is described below
commit d0b5cca3e8a1cc682568530a5fe2b397b5aba3d5
Author: bowenli86 <[email protected]>
AuthorDate: Tue Jan 20 12:58:57 2026 -0800
[FLINK-38920] create Table source and factory for DynamicKafkaSource in Flink Table API (#212)
---
.../docs/connectors/table/dynamic-kafka.md | 151 +++++
.../content/docs/connectors/table/dynamic-kafka.md | 156 +++++
.../kafka/table/DynamicKafkaConnectorOptions.java | 63 ++
.../kafka/table/DynamicKafkaTableFactory.java | 515 +++++++++++++++
.../kafka/table/DynamicKafkaTableSource.java | 696 +++++++++++++++++++++
.../org.apache.flink.table.factories.Factory | 1 +
.../kafka/table/DynamicKafkaTableFactoryTest.java | 104 +++
7 files changed, 1686 insertions(+)
diff --git a/docs/content.zh/docs/connectors/table/dynamic-kafka.md b/docs/content.zh/docs/connectors/table/dynamic-kafka.md
new file mode 100644
index 00000000..39ba2871
--- /dev/null
+++ b/docs/content.zh/docs/connectors/table/dynamic-kafka.md
@@ -0,0 +1,151 @@
+---
+title: Dynamic Kafka
+weight: 5
+type: docs
+aliases:
+ - /dev/table/connectors/dynamic-kafka.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Dynamic Kafka SQL Connector
+
+{{< label "Scan Source: Unbounded" >}}
+
+The Dynamic Kafka connector allows reading data from Kafka topics that can change across clusters
+without restarting the job. Streams are resolved by a Kafka metadata service, which makes the
+connector especially useful for cluster migrations and dynamic topic/cluster changes.
+
+Dependencies
+------------
+
+{{< sql_connector_download_table "kafka" >}}
+
+The Kafka connector is not part of the binary distribution.
+See [here]({{< ref "docs/dev/configuration/overview" >}}) for how to link with it for cluster execution.
+
+How to create a Dynamic Kafka table
+-----------------------------------
+
+The example below creates a Dynamic Kafka table using the built-in `single-cluster` metadata service.
+In this mode, stream ids are interpreted as Kafka topics in a single cluster.
+
+```sql
+CREATE TABLE DynamicKafkaTable (
+ `user_id` BIGINT,
+ `item_id` BIGINT,
+ `behavior` STRING,
+ `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
+) WITH (
+ 'connector' = 'dynamic-kafka',
+ 'stream-ids' = 'user_behavior;user_behavior_v2',
+ 'metadata-service' = 'single-cluster',
+ 'metadata-service.cluster-id' = 'cluster-0',
+ 'properties.bootstrap.servers' = 'localhost:9092',
+ 'properties.group.id' = 'testGroup',
+ 'scan.startup.mode' = 'earliest-offset',
+ 'format' = 'csv'
+)
+```
+
+The connector also supports a custom metadata service, specified via `metadata-service`. The service
+class must implement `KafkaMetadataService` and provide either a no-arg constructor or a constructor
+that accepts `Properties`. The connector passes all `properties.*` options to that constructor as
+Kafka properties.
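+
+For example, a table backed by a custom metadata service might be declared as follows. This is a
+hedged sketch: `com.example.MyKafkaMetadataService` is a hypothetical placeholder for your own
+`KafkaMetadataService` implementation, not a class shipped with the connector.
+
+```sql
+CREATE TABLE CustomMetadataTable (
+  `user_id` BIGINT,
+  `behavior` STRING
+) WITH (
+  'connector' = 'dynamic-kafka',
+  'stream-ids' = 'user_behavior',
+  'metadata-service' = 'com.example.MyKafkaMetadataService',
+  'properties.bootstrap.servers' = 'localhost:9092',
+  'scan.startup.mode' = 'earliest-offset',
+  'format' = 'csv'
+)
+```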
+
+Available Metadata
+------------------
+
+The Dynamic Kafka connector exposes the same metadata columns as the Kafka connector.
+See the [Kafka SQL Connector]({{< ref "docs/connectors/table/kafka" >}}#available-metadata).
+
+Connector Options
+-----------------
+
+<table class="table table-bordered">
+  <thead>
+  <tr>
+    <th class="text-left" style="width: 25%">Option</th>
+    <th class="text-center" style="width: 8%">Required</th>
+    <th class="text-center" style="width: 8%">Forwarded</th>
+    <th class="text-center" style="width: 7%">Default</th>
+    <th class="text-center" style="width: 10%">Type</th>
+    <th class="text-center" style="width: 42%">Description</th>
+  </tr>
+  </thead>
+  <tbody>
+  <tr>
+    <td><h5>connector</h5></td>
+    <td>required</td>
+    <td>no</td>
+    <td style="word-wrap: break-word;">(none)</td>
+    <td>String</td>
+    <td>Specify what connector to use; for Dynamic Kafka use <code>'dynamic-kafka'</code>.</td>
+  </tr>
+  <tr>
+    <td><h5>stream-ids</h5></td>
+    <td>optional</td>
+    <td>no</td>
+    <td style="word-wrap: break-word;">(none)</td>
+    <td>String</td>
+    <td>Semicolon-separated list of stream ids. Only one of <code>stream-ids</code> and <code>stream-pattern</code> can be set.</td>
+  </tr>
+  <tr>
+    <td><h5>stream-pattern</h5></td>
+    <td>optional</td>
+    <td>no</td>
+    <td style="word-wrap: break-word;">(none)</td>
+    <td>String</td>
+    <td>Regular expression for stream ids. Only one of <code>stream-ids</code> and <code>stream-pattern</code> can be set.</td>
+  </tr>
+  <tr>
+    <td><h5>metadata-service</h5></td>
+    <td>required</td>
+    <td>no</td>
+    <td style="word-wrap: break-word;">(none)</td>
+    <td>String</td>
+    <td>Metadata service identifier. Use <code>'single-cluster'</code> or a fully qualified class name implementing <code>KafkaMetadataService</code>.</td>
+  </tr>
+  <tr>
+    <td><h5>metadata-service.cluster-id</h5></td>
+    <td>optional</td>
+    <td>no</td>
+    <td style="word-wrap: break-word;">(none)</td>
+    <td>String</td>
+    <td>Cluster id required by the <code>single-cluster</code> metadata service.</td>
+  </tr>
+  <tr>
+    <td><h5>stream-metadata-discovery-interval-ms</h5></td>
+    <td>optional</td>
+    <td>no</td>
+    <td style="word-wrap: break-word;">-1</td>
+    <td>Long</td>
+    <td>Interval in milliseconds for discovering stream metadata changes. A non-positive value disables discovery.</td>
+  </tr>
+  <tr>
+    <td><h5>stream-metadata-discovery-failure-threshold</h5></td>
+    <td>optional</td>
+    <td>no</td>
+    <td style="word-wrap: break-word;">1</td>
+    <td>Integer</td>
+    <td>Number of consecutive discovery failures after which the job fails.</td>
+  </tr>
+  </tbody>
+</table>
+
+The Dynamic Kafka connector also supports the format options and Kafka client properties of the
+Kafka connector. For the full list, see the
+[Kafka SQL Connector]({{< ref "docs/connectors/table/kafka" >}}#connector-options).
diff --git a/docs/content/docs/connectors/table/dynamic-kafka.md b/docs/content/docs/connectors/table/dynamic-kafka.md
new file mode 100644
index 00000000..52c7b8c6
--- /dev/null
+++ b/docs/content/docs/connectors/table/dynamic-kafka.md
@@ -0,0 +1,156 @@
+---
+title: Dynamic Kafka
+weight: 5
+type: docs
+aliases:
+ - /dev/table/connectors/dynamic-kafka.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Dynamic Kafka SQL Connector
+
+{{< label "Scan Source: Unbounded" >}}
+
+The Dynamic Kafka connector allows for reading data from Kafka topics that can move across
+clusters without restarting the job. Streams are resolved via a Kafka metadata service. This is
+especially useful for cluster migrations and dynamic topic/cluster changes.
+
+Dependencies
+------------
+
+{{< sql_connector_download_table "kafka" >}}
+
+The Kafka connector is not part of the binary distribution.
+See how to link with it for cluster execution [here]({{< ref "docs/dev/configuration/overview" >}}).
+
+How to create a Dynamic Kafka table
+-----------------------------------
+
+The example below shows how to create a Dynamic Kafka table using the built-in
+`single-cluster` metadata service. With this service, stream ids are interpreted as topics
+in a single Kafka cluster.
+
+```sql
+CREATE TABLE DynamicKafkaTable (
+ `user_id` BIGINT,
+ `item_id` BIGINT,
+ `behavior` STRING,
+ `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
+) WITH (
+ 'connector' = 'dynamic-kafka',
+ 'stream-ids' = 'user_behavior;user_behavior_v2',
+ 'metadata-service' = 'single-cluster',
+ 'metadata-service.cluster-id' = 'cluster-0',
+ 'properties.bootstrap.servers' = 'localhost:9092',
+ 'properties.group.id' = 'testGroup',
+ 'scan.startup.mode' = 'earliest-offset',
+ 'format' = 'csv'
+)
+```
+
+The connector also supports a custom metadata service via the `metadata-service` option. The
+service class must implement `KafkaMetadataService` and should either have a public no-arg
+constructor or a constructor that accepts `Properties`. The connector will pass the Kafka
+properties (all `properties.*` options) into the constructor when available.
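+
+For example, a table backed by a custom metadata service might be declared as follows. This is a
+hedged sketch: `com.example.MyKafkaMetadataService` is a hypothetical placeholder for your own
+`KafkaMetadataService` implementation, not a class shipped with the connector.
+
+```sql
+CREATE TABLE CustomMetadataTable (
+  `user_id` BIGINT,
+  `behavior` STRING
+) WITH (
+  'connector' = 'dynamic-kafka',
+  'stream-ids' = 'user_behavior',
+  'metadata-service' = 'com.example.MyKafkaMetadataService',
+  'properties.bootstrap.servers' = 'localhost:9092',
+  'scan.startup.mode' = 'earliest-offset',
+  'format' = 'csv'
+)
+```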
+
+Available Metadata
+------------------
+
+The Dynamic Kafka connector exposes the same metadata columns as the Kafka connector.
+See [Kafka SQL Connector]({{< ref "docs/connectors/table/kafka" >}}#available-metadata) for
+the full list.
+
+Connector Options
+----------------
+
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 25%">Option</th>
+ <th class="text-center" style="width: 8%">Required</th>
+ <th class="text-center" style="width: 8%">Forwarded</th>
+ <th class="text-center" style="width: 7%">Default</th>
+ <th class="text-center" style="width: 10%">Type</th>
+ <th class="text-center" style="width: 42%">Description</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><h5>connector</h5></td>
+ <td>required</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+      <td>Specify what connector to use, for Dynamic Kafka use <code>'dynamic-kafka'</code>.</td>
+ </tr>
+ <tr>
+ <td><h5>stream-ids</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+      <td>Semicolon-separated stream ids to subscribe. Only one of <code>stream-ids</code> and <code>stream-pattern</code> can be set.</td>
+ </tr>
+ <tr>
+ <td><h5>stream-pattern</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+      <td>Regex pattern for stream ids to subscribe. Only one of <code>stream-ids</code> and <code>stream-pattern</code> can be set.</td>
+ </tr>
+ <tr>
+ <td><h5>metadata-service</h5></td>
+ <td>required</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+      <td>Metadata service identifier. Use <code>'single-cluster'</code> or a fully qualified class name implementing <code>KafkaMetadataService</code>.</td>
+ </tr>
+ <tr>
+ <td><h5>metadata-service.cluster-id</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+      <td>Cluster id required by the <code>single-cluster</code> metadata service.</td>
+ </tr>
+ <tr>
+ <td><h5>stream-metadata-discovery-interval-ms</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">-1</td>
+ <td>Long</td>
+      <td>Interval in milliseconds for discovering stream metadata changes. A non-positive value disables discovery.</td>
+ </tr>
+ <tr>
+ <td><h5>stream-metadata-discovery-failure-threshold</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">1</td>
+ <td>Integer</td>
+ <td>Number of consecutive discovery failures before failing the job.</td>
+ </tr>
+ </tbody>
+</table>
+
+The connector also supports the same format options and Kafka client properties as the Kafka
+connector. See [Kafka SQL Connector]({{< ref "docs/connectors/table/kafka" >}}#connector-options)
+for the full list of format options and Kafka properties (all <code>properties.*</code> options).
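+
+As a further illustration, the sketch below subscribes by pattern and enables periodic stream
+metadata discovery. The pattern and interval values are examples only, assuming topic names that
+start with `user_behavior`.
+
+```sql
+CREATE TABLE PatternKafkaTable (
+  `user_id` BIGINT,
+  `behavior` STRING
+) WITH (
+  'connector' = 'dynamic-kafka',
+  'stream-pattern' = 'user_behavior.*',
+  'metadata-service' = 'single-cluster',
+  'metadata-service.cluster-id' = 'cluster-0',
+  'properties.bootstrap.servers' = 'localhost:9092',
+  'stream-metadata-discovery-interval-ms' = '60000',
+  'stream-metadata-discovery-failure-threshold' = '3',
+  'scan.startup.mode' = 'earliest-offset',
+  'format' = 'csv'
+)
+```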
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaConnectorOptions.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaConnectorOptions.java
new file mode 100644
index 00000000..b182487f
--- /dev/null
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaConnectorOptions.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+import java.util.List;
+
+/** Options for the Dynamic Kafka table connector. */
+@PublicEvolving
+public class DynamicKafkaConnectorOptions {
+
+ public static final ConfigOption<List<String>> STREAM_IDS =
+ ConfigOptions.key("stream-ids")
+ .stringType()
+ .asList()
+ .noDefaultValue()
+ .withDescription(
+ "Stream id(s) to read data from when the table is
used as source. "
+ + "Only one of 'stream-ids' and
'stream-pattern' can be set.");
+
+ public static final ConfigOption<String> STREAM_PATTERN =
+ ConfigOptions.key("stream-pattern")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Regular expression pattern for stream ids to read
data from. "
+ + "Only one of 'stream-ids' and
'stream-pattern' can be set.");
+
+ public static final ConfigOption<String> METADATA_SERVICE =
+ ConfigOptions.key("metadata-service")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Metadata service implementation identifier. Use
'single-cluster' or "
+ + "a fully qualified class name
implementing KafkaMetadataService.");
+
+ public static final ConfigOption<String> METADATA_SERVICE_CLUSTER_ID =
+ ConfigOptions.key("metadata-service.cluster-id")
+ .stringType()
+ .noDefaultValue()
+                    .withDescription(
+                            "Kafka cluster id for the 'single-cluster' metadata service.");
+
+ private DynamicKafkaConnectorOptions() {}
+}
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaTableFactory.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaTableFactory.java
new file mode 100644
index 00000000..fea44744
--- /dev/null
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaTableFactory.java
@@ -0,0 +1,515 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.kafka.dynamic.metadata.KafkaMetadataService;
+import org.apache.flink.connector.kafka.dynamic.metadata.SingleClusterTopicMetadataService;
+import org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSourceOptions;
+import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
+import org.apache.flink.streaming.connectors.kafka.config.BoundedMode;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.ScanBoundedMode;
+import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.ScanStartupMode;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.Format;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.InstantiationUtil;
+
+import javax.annotation.Nullable;
+
+import java.lang.reflect.Constructor;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+import static org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaConnectorOptions.METADATA_SERVICE;
+import static org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaConnectorOptions.METADATA_SERVICE_CLUSTER_ID;
+import static org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaConnectorOptions.STREAM_IDS;
+import static org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaConnectorOptions.STREAM_PATTERN;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FIELDS;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FIELDS_PREFIX;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FORMAT;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.PROPS_GROUP_ID;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_MODE;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_TIMESTAMP_MILLIS;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_PARALLELISM;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_MODE;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_TIMESTAMP_MILLIS;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_TOPIC_PARTITION_DISCOVERY;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FIELDS_INCLUDE;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FORMAT;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.PROPERTIES_PREFIX;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.createKeyFormatProjection;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.createValueFormatProjection;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.getKafkaProperties;
+
+/** Factory for creating configured instances of {@link DynamicKafkaTableSource}. */
+@Internal
+public class DynamicKafkaTableFactory implements DynamicTableSourceFactory {
+
+ public static final String IDENTIFIER = "dynamic-kafka";
+    private static final String METADATA_SERVICE_SINGLE_CLUSTER = "single-cluster";
+
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ final Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(METADATA_SERVICE);
+ return options;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ final Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(FactoryUtil.FORMAT);
+ options.add(KEY_FORMAT);
+ options.add(KEY_FIELDS);
+ options.add(KEY_FIELDS_PREFIX);
+ options.add(VALUE_FORMAT);
+ options.add(VALUE_FIELDS_INCLUDE);
+ options.add(STREAM_IDS);
+ options.add(STREAM_PATTERN);
+ options.add(METADATA_SERVICE_CLUSTER_ID);
+ options.add(PROPS_GROUP_ID);
+ options.add(SCAN_STARTUP_MODE);
+ options.add(SCAN_STARTUP_TIMESTAMP_MILLIS);
+ options.add(SCAN_BOUNDED_MODE);
+ options.add(SCAN_BOUNDED_TIMESTAMP_MILLIS);
+ options.add(SCAN_TOPIC_PARTITION_DISCOVERY);
+ options.add(SCAN_PARALLELISM);
+        options.add(DynamicKafkaSourceOptions.STREAM_METADATA_DISCOVERY_INTERVAL_MS);
+        options.add(DynamicKafkaSourceOptions.STREAM_METADATA_DISCOVERY_FAILURE_THRESHOLD);
+ return options;
+ }
+
+ @Override
+ public DynamicTableSource createDynamicTableSource(Context context) {
+        final TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+
+        final Optional<DecodingFormat<DeserializationSchema<RowData>>> keyDecodingFormat =
+                getKeyDecodingFormat(helper);
+
+        final DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat =
+                getValueDecodingFormat(helper);
+
+ helper.validateExcept(PROPERTIES_PREFIX);
+
+ final ReadableConfig tableOptions = helper.getOptions();
+
+ validateTableSourceOptions(tableOptions);
+
+ validatePKConstraints(
+ context.getObjectIdentifier(),
+ context.getPrimaryKeyIndexes(),
+ context.getCatalogTable().getOptions(),
+ valueDecodingFormat);
+
+ final StartupOptions startupOptions = getStartupOptions(tableOptions);
+ final BoundedOptions boundedOptions = getBoundedOptions(tableOptions);
+
+        final Properties properties = getKafkaProperties(context.getCatalogTable().getOptions());
+
+ // add topic-partition discovery
+ final Duration partitionDiscoveryInterval =
+ tableOptions.get(SCAN_TOPIC_PARTITION_DISCOVERY);
+ properties.setProperty(
+ KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.key(),
+ Long.toString(partitionDiscoveryInterval.toMillis()));
+
+ applyDynamicDiscoveryOptions(tableOptions, properties);
+
+        final KafkaMetadataService kafkaMetadataService =
+                createMetadataService(tableOptions, properties, context.getClassLoader());
+
+ final DataType physicalDataType = context.getPhysicalRowDataType();
+
+        final int[] keyProjection = createKeyFormatProjection(tableOptions, physicalDataType);
+
+        final int[] valueProjection = createValueFormatProjection(tableOptions, physicalDataType);
+
+        final String keyPrefix = tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null);
+
+        final Integer parallelism = tableOptions.getOptional(SCAN_PARALLELISM).orElse(null);
+
+ return new DynamicKafkaTableSource(
+ physicalDataType,
+ keyDecodingFormat.orElse(null),
+ valueDecodingFormat,
+ keyProjection,
+ valueProjection,
+ keyPrefix,
+ getStreamIds(tableOptions),
+ getStreamPattern(tableOptions),
+ kafkaMetadataService,
+ properties,
+ startupOptions.startupMode,
+ startupOptions.specificOffsets,
+ startupOptions.startupTimestampMillis,
+ boundedOptions.boundedMode,
+ boundedOptions.specificOffsets,
+ boundedOptions.boundedTimestampMillis,
+ false,
+ context.getObjectIdentifier().asSummaryString(),
+ parallelism);
+ }
+
+ private static void applyDynamicDiscoveryOptions(
+ ReadableConfig tableOptions, Properties properties) {
+        tableOptions
+                .getOptional(DynamicKafkaSourceOptions.STREAM_METADATA_DISCOVERY_INTERVAL_MS)
+                .ifPresent(
+                        value ->
+                                properties.setProperty(
+                                        DynamicKafkaSourceOptions
+                                                .STREAM_METADATA_DISCOVERY_INTERVAL_MS
+                                                .key(),
+                                        Long.toString(value)));
+        tableOptions
+                .getOptional(DynamicKafkaSourceOptions.STREAM_METADATA_DISCOVERY_FAILURE_THRESHOLD)
+                .ifPresent(
+                        value ->
+                                properties.setProperty(
+                                        DynamicKafkaSourceOptions
+                                                .STREAM_METADATA_DISCOVERY_FAILURE_THRESHOLD
+                                                .key(),
+                                        Integer.toString(value)));
+ }
+
+    private static Optional<DecodingFormat<DeserializationSchema<RowData>>> getKeyDecodingFormat(
+            TableFactoryHelper helper) {
+        final Optional<DecodingFormat<DeserializationSchema<RowData>>> keyDecodingFormat =
+                helper.discoverOptionalDecodingFormat(
+                        DeserializationFormatFactory.class, KEY_FORMAT);
+        keyDecodingFormat.ifPresent(
+                format -> {
+                    if (!format.getChangelogMode().containsOnly(RowKind.INSERT)) {
+                        throw new ValidationException(
+                                String.format(
+                                        "A key format should only deal with INSERT-only records. "
+                                                + "But %s has a changelog mode of %s.",
+                                        helper.getOptions().get(KEY_FORMAT),
+                                        format.getChangelogMode()));
+                    }
+                });
+        return keyDecodingFormat;
+    }
+
+    private static DecodingFormat<DeserializationSchema<RowData>> getValueDecodingFormat(
+            TableFactoryHelper helper) {
+        return helper.discoverOptionalDecodingFormat(
+                        DeserializationFormatFactory.class, FactoryUtil.FORMAT)
+                .orElseGet(
+                        () ->
+                                helper.discoverDecodingFormat(
+                                        DeserializationFormatFactory.class, VALUE_FORMAT));
+    }
+
+ private static void validatePKConstraints(
+ ObjectIdentifier tableName,
+ int[] primaryKeyIndexes,
+ Map<String, String> options,
+ Format format) {
+ if (primaryKeyIndexes.length > 0
+ && format.getChangelogMode().containsOnly(RowKind.INSERT)) {
+ Configuration configuration = Configuration.fromMap(options);
+ String formatName =
+ configuration
+ .getOptional(FactoryUtil.FORMAT)
+ .orElse(configuration.get(VALUE_FORMAT));
+            throw new ValidationException(
+                    String.format(
+                            "The Dynamic Kafka table '%s' with '%s' format doesn't support"
+                                    + " defining PRIMARY KEY constraint on the table, because it"
+                                    + " can't guarantee the semantic of primary key.",
+                            tableName.asSummaryString(), formatName));
+ }
+ }
+
+    private static void validateTableSourceOptions(ReadableConfig tableOptions) {
+ validateStreams(tableOptions);
+ validateMetadataService(tableOptions);
+ validateScanStartupMode(tableOptions);
+ validateScanBoundedMode(tableOptions);
+ }
+
+ private static void validateStreams(ReadableConfig tableOptions) {
+        Optional<List<String>> streamIds = tableOptions.getOptional(STREAM_IDS);
+ Optional<String> pattern = tableOptions.getOptional(STREAM_PATTERN);
+
+ if (streamIds.isPresent() && pattern.isPresent()) {
+            throw new ValidationException(
+                    "Option 'stream-ids' and 'stream-pattern' shouldn't be set together.");
+ }
+
+ if (!streamIds.isPresent() && !pattern.isPresent()) {
+ throw new ValidationException("Either 'stream-ids' or
'stream-pattern' must be set.");
+ }
+
+        streamIds.ifPresent(
+                ids -> {
+                    if (ids.isEmpty()) {
+                        throw new ValidationException("Option 'stream-ids' cannot be empty.");
+ }
+ });
+
+        pattern.ifPresent(
+                value -> {
+                    try {
+                        Pattern.compile(value);
+                    } catch (PatternSyntaxException e) {
+                        throw new ValidationException(
+                                "Option 'stream-pattern' contains an invalid regular expression.",
+                                e);
+ }
+ });
+ }
+
+ private static void validateMetadataService(ReadableConfig tableOptions) {
+        Optional<String> metadataService = tableOptions.getOptional(METADATA_SERVICE);
+        if (!metadataService.isPresent()) {
+            throw new ValidationException("Option 'metadata-service' must be set.");
+        }
+        if (isSingleClusterMetadataService(metadataService.get())) {
+            if (!tableOptions.getOptional(METADATA_SERVICE_CLUSTER_ID).isPresent()) {
+                throw new ValidationException(
+                        "Option 'metadata-service.cluster-id' is required for"
+                                + " 'single-cluster' metadata service.");
+            }
+            if (!tableOptions.getOptional(PROPS_BOOTSTRAP_SERVERS).isPresent()) {
+                throw new ValidationException(
+                        "Option 'properties.bootstrap.servers' is required for"
+                                + " 'single-cluster' metadata service.");
+ }
+ }
+ }
+
+ private static void validateScanStartupMode(ReadableConfig tableOptions) {
+        tableOptions
+                .getOptional(SCAN_STARTUP_MODE)
+                .ifPresent(
+                        mode -> {
+                            switch (mode) {
+                                case TIMESTAMP:
+                                    if (!tableOptions
+                                            .getOptional(SCAN_STARTUP_TIMESTAMP_MILLIS)
+                                            .isPresent()) {
+                                        throw new ValidationException(
+                                                String.format(
+                                                        "'%s' is required in '%s' startup mode"
+                                                                + " but missing.",
+                                                        SCAN_STARTUP_TIMESTAMP_MILLIS.key(),
+                                                        ScanStartupMode.TIMESTAMP));
+                                    }
+                                    break;
+                                case SPECIFIC_OFFSETS:
+                                    throw new ValidationException(
+                                            "Dynamic Kafka source does not support"
+                                                    + " 'specific-offsets' startup mode.");
+ }
+ });
+ }
+
+ private static void validateScanBoundedMode(ReadableConfig tableOptions) {
+        tableOptions
+                .getOptional(SCAN_BOUNDED_MODE)
+                .ifPresent(
+                        mode -> {
+                            switch (mode) {
+                                case TIMESTAMP:
+                                    if (!tableOptions
+                                            .getOptional(SCAN_BOUNDED_TIMESTAMP_MILLIS)
+                                            .isPresent()) {
+                                        throw new ValidationException(
+                                                String.format(
+                                                        "'%s' is required in '%s' bounded mode"
+                                                                + " but missing.",
+                                                        SCAN_BOUNDED_TIMESTAMP_MILLIS.key(),
+                                                        ScanBoundedMode.TIMESTAMP));
+                                    }
+                                    break;
+                                case SPECIFIC_OFFSETS:
+                                    throw new ValidationException(
+                                            "Dynamic Kafka source does not support"
+                                                    + " 'specific-offsets' bounded mode.");
+ }
+ });
+ }
+
+ private static List<String> getStreamIds(ReadableConfig tableOptions) {
+ return tableOptions.getOptional(STREAM_IDS).orElse(null);
+ }
+
+ private static Pattern getStreamPattern(ReadableConfig tableOptions) {
+        return tableOptions.getOptional(STREAM_PATTERN).map(Pattern::compile).orElse(null);
+ }
+
+    private static StartupOptions getStartupOptions(ReadableConfig tableOptions) {
+ final StartupOptions options = new StartupOptions();
+ final StartupMode startupMode =
+ tableOptions
+ .getOptional(SCAN_STARTUP_MODE)
+ .map(DynamicKafkaTableFactory::toStartupMode)
+ .orElse(StartupMode.GROUP_OFFSETS);
+ options.startupMode = startupMode;
+ options.specificOffsets = Collections.emptyMap();
+ if (startupMode == StartupMode.TIMESTAMP) {
+            options.startupTimestampMillis = tableOptions.get(SCAN_STARTUP_TIMESTAMP_MILLIS);
+ }
+ return options;
+ }
+
+    private static BoundedOptions getBoundedOptions(ReadableConfig tableOptions) {
+ final BoundedOptions options = new BoundedOptions();
+        final BoundedMode boundedMode = toBoundedMode(tableOptions.get(SCAN_BOUNDED_MODE));
+ options.boundedMode = boundedMode;
+ options.specificOffsets = Collections.emptyMap();
+ if (boundedMode == BoundedMode.TIMESTAMP) {
+            options.boundedTimestampMillis = tableOptions.get(SCAN_BOUNDED_TIMESTAMP_MILLIS);
+ }
+ return options;
+ }
+
+ private static StartupMode toStartupMode(ScanStartupMode scanStartupMode) {
+ switch (scanStartupMode) {
+ case EARLIEST_OFFSET:
+ return StartupMode.EARLIEST;
+ case LATEST_OFFSET:
+ return StartupMode.LATEST;
+ case GROUP_OFFSETS:
+ return StartupMode.GROUP_OFFSETS;
+ case SPECIFIC_OFFSETS:
+ return StartupMode.SPECIFIC_OFFSETS;
+ case TIMESTAMP:
+ return StartupMode.TIMESTAMP;
+ default:
+ throw new ValidationException(
+                        "Unsupported startup mode. Validator should have checked that.");
+ }
+ }
+
+ private static BoundedMode toBoundedMode(ScanBoundedMode scanBoundedMode) {
+ switch (scanBoundedMode) {
+ case UNBOUNDED:
+ return BoundedMode.UNBOUNDED;
+ case LATEST_OFFSET:
+ return BoundedMode.LATEST;
+ case GROUP_OFFSETS:
+ return BoundedMode.GROUP_OFFSETS;
+ case TIMESTAMP:
+ return BoundedMode.TIMESTAMP;
+ case SPECIFIC_OFFSETS:
+ return BoundedMode.SPECIFIC_OFFSETS;
+ default:
+ throw new ValidationException(
+                        "Unsupported bounded mode. Validator should have checked that.");
+ }
+ }
+
+ private static KafkaMetadataService createMetadataService(
+            ReadableConfig tableOptions, Properties properties, ClassLoader classLoader) {
+ String metadataService = tableOptions.get(METADATA_SERVICE);
+ if (isSingleClusterMetadataService(metadataService)) {
+ String clusterId = tableOptions.get(METADATA_SERVICE_CLUSTER_ID);
+            return new SingleClusterTopicMetadataService(clusterId, properties);
+ }
+
+ try {
+ Class<?> clazz = Class.forName(metadataService, true, classLoader);
+ if (!KafkaMetadataService.class.isAssignableFrom(clazz)) {
+                throw new ValidationException(
+                        String.format(
+                                "Metadata service class '%s' should implement %s",
+                                metadataService, KafkaMetadataService.class.getName()));
+ }
+            KafkaMetadataService withProperties = instantiateWithProperties(clazz, properties);
+ if (withProperties != null) {
+ return withProperties;
+ }
+ return InstantiationUtil.instantiate(
+ metadataService, KafkaMetadataService.class, classLoader);
+ } catch (ValidationException e) {
+ throw e;
+ } catch (ClassNotFoundException | FlinkException e) {
+            throw new ValidationException(
+                    String.format(
+                            "Could not find and instantiate metadata service class '%s'",
+                            metadataService),
+ e);
+ } catch (ReflectiveOperationException e) {
+            throw new ValidationException(
+                    String.format(
+                            "Could not instantiate metadata service class '%s' with properties.",
+ metadataService),
+ e);
+ }
+ }
+
+ private static @Nullable KafkaMetadataService instantiateWithProperties(
+            Class<?> clazz, Properties properties) throws ReflectiveOperationException {
+ try {
+            Constructor<?> constructor = clazz.getConstructor(Properties.class);
+ return (KafkaMetadataService) constructor.newInstance(properties);
+ } catch (NoSuchMethodException e) {
+ return null;
+ }
+ }
+
+    private static boolean isSingleClusterMetadataService(String metadataService) {
+ return METADATA_SERVICE_SINGLE_CLUSTER.equals(metadataService)
+                || SingleClusterTopicMetadataService.class.getName().equals(metadataService);
+ }
+
+    // --------------------------------------------------------------------------------------------
+    // Inner classes
+    // --------------------------------------------------------------------------------------------
+
+ private static class StartupOptions {
+ private StartupMode startupMode;
+        private Map<org.apache.kafka.common.TopicPartition, Long> specificOffsets;
+ private long startupTimestampMillis;
+ }
+
+ private static class BoundedOptions {
+ private BoundedMode boundedMode;
+        private Map<org.apache.kafka.common.TopicPartition, Long> specificOffsets;
+ private long boundedTimestampMillis;
+ }
+}
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaTableSource.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaTableSource.java
new file mode 100644
index 00000000..c3ea65ef
--- /dev/null
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaTableSource.java
@@ -0,0 +1,696 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.connector.kafka.dynamic.metadata.KafkaMetadataService;
+import org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSource;
+import org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSourceBuilder;
+import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
+import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kafka.config.BoundedMode;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.MetadataConverter;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.Projection;
+import org.apache.flink.table.connector.ProviderContext;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.source.DataStreamScanProvider;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
+import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Header;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+/** A {@link ScanTableSource} for {@link DynamicKafkaSource}. */
+@Internal
+public class DynamicKafkaTableSource
+        implements ScanTableSource, SupportsReadingMetadata, SupportsWatermarkPushDown {
+
+ private static final String KAFKA_TRANSFORMATION = "kafka";
+
+    // --------------------------------------------------------------------------------------------
+    // Mutable attributes
+    // --------------------------------------------------------------------------------------------
+
+ /** Data type that describes the final output of the source. */
+ protected DataType producedDataType;
+
+ /** Metadata that is appended at the end of a physical source row. */
+ protected List<String> metadataKeys;
+
+ /** Watermark strategy that is used to generate per-partition watermark. */
+ protected @Nullable WatermarkStrategy<RowData> watermarkStrategy;
+
+    // --------------------------------------------------------------------------------------------
+    // Format attributes
+    // --------------------------------------------------------------------------------------------
+
+ private static final String VALUE_METADATA_PREFIX = "value.";
+
+ /** Data type to configure the formats. */
+ protected final DataType physicalDataType;
+
+ /** Optional format for decoding keys from Kafka. */
+    protected final @Nullable DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat;
+
+ /** Format for decoding values from Kafka. */
+    protected final DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat;
+
+    /** Indices that determine the key fields and the target position in the produced row. */
+ protected final int[] keyProjection;
+
+    /** Indices that determine the value fields and the target position in the produced row. */
+ protected final int[] valueProjection;
+
+    /** Prefix that needs to be removed from fields when constructing the physical data type. */
+ protected final @Nullable String keyPrefix;
+
+    // --------------------------------------------------------------------------------------------
+    // Dynamic Kafka specific attributes
+    // --------------------------------------------------------------------------------------------
+
+ /** The stream ids to consume. */
+ protected final @Nullable List<String> streamIds;
+
+ /** The stream pattern to consume. */
+ protected final @Nullable Pattern streamPattern;
+
+ /** Metadata service for resolving streams to topics and clusters. */
+ protected final KafkaMetadataService kafkaMetadataService;
+
+ /** Properties for the Kafka consumer. */
+ protected final Properties properties;
+
+ /**
+     * The startup mode for the contained consumer (default is
+     * {@link StartupMode#GROUP_OFFSETS}).
+ */
+ protected final StartupMode startupMode;
+
+ /**
+ * Specific startup offsets; only relevant when startup mode is {@link
+ * StartupMode#SPECIFIC_OFFSETS}.
+ */
+ protected final Map<TopicPartition, Long> specificStartupOffsets;
+
+ /**
+     * The start timestamp to locate partition offsets; only relevant when startup mode is
+     * {@link StartupMode#TIMESTAMP}.
+ */
+ protected final long startupTimestampMillis;
+
+    /** The bounded mode for the contained consumer (default is an unbounded data stream). */
+ protected final BoundedMode boundedMode;
+
+ /**
+ * Specific end offsets; only relevant when bounded mode is {@link
+ * BoundedMode#SPECIFIC_OFFSETS}.
+ */
+ protected final Map<TopicPartition, Long> specificBoundedOffsets;
+
+ /**
+     * The bounded timestamp to locate partition offsets; only relevant when bounded mode is
+     * {@link BoundedMode#TIMESTAMP}.
+ */
+ protected final long boundedTimestampMillis;
+
+    /** Flag to determine source mode. In upsert mode, it will keep the tombstone message. */
+ protected final boolean upsertMode;
+
+ protected final String tableIdentifier;
+
+ /** Parallelism of the physical Kafka consumer. * */
+ protected final @Nullable Integer parallelism;
+
+ public DynamicKafkaTableSource(
+ DataType physicalDataType,
+            @Nullable DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat,
+ DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat,
+ int[] keyProjection,
+ int[] valueProjection,
+ @Nullable String keyPrefix,
+ @Nullable List<String> streamIds,
+ @Nullable Pattern streamPattern,
+ KafkaMetadataService kafkaMetadataService,
+ Properties properties,
+ StartupMode startupMode,
+ Map<TopicPartition, Long> specificStartupOffsets,
+ long startupTimestampMillis,
+ BoundedMode boundedMode,
+ Map<TopicPartition, Long> specificBoundedOffsets,
+ long boundedTimestampMillis,
+ boolean upsertMode,
+ String tableIdentifier,
+ @Nullable Integer parallelism) {
+ // Format attributes
+        this.physicalDataType =
+                Preconditions.checkNotNull(
+                        physicalDataType, "Physical data type must not be null.");
+        this.keyDecodingFormat = keyDecodingFormat;
+        this.valueDecodingFormat =
+                Preconditions.checkNotNull(
+                        valueDecodingFormat, "Value decoding format must not be null.");
+        this.keyProjection =
+                Preconditions.checkNotNull(keyProjection, "Key projection must not be null.");
+        this.valueProjection =
+                Preconditions.checkNotNull(valueProjection, "Value projection must not be null.");
+ this.keyPrefix = keyPrefix;
+ // Mutable attributes
+ this.producedDataType = physicalDataType;
+ this.metadataKeys = Collections.emptyList();
+ this.watermarkStrategy = null;
+ // Dynamic Kafka specific attributes
+ Preconditions.checkArgument(
+ (streamIds != null && streamPattern == null)
+ || (streamIds == null && streamPattern != null),
+ "Either stream ids or stream pattern must be set for source.");
+ this.streamIds = streamIds;
+ this.streamPattern = streamPattern;
+        this.kafkaMetadataService =
+                Preconditions.checkNotNull(
+                        kafkaMetadataService, "Kafka metadata service must not be null.");
+        this.properties = Preconditions.checkNotNull(properties, "Properties must not be null.");
+        this.startupMode =
+                Preconditions.checkNotNull(startupMode, "Startup mode must not be null.");
+        this.specificStartupOffsets =
+                Preconditions.checkNotNull(
+                        specificStartupOffsets, "Specific offsets must not be null.");
+        this.startupTimestampMillis = startupTimestampMillis;
+        this.boundedMode =
+                Preconditions.checkNotNull(boundedMode, "Bounded mode must not be null.");
+        this.specificBoundedOffsets =
+                Preconditions.checkNotNull(
+                        specificBoundedOffsets, "Specific bounded offsets must not be null.");
+ this.boundedTimestampMillis = boundedTimestampMillis;
+ this.upsertMode = upsertMode;
+ this.tableIdentifier = tableIdentifier;
+ this.parallelism = parallelism;
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode() {
+ return valueDecodingFormat.getChangelogMode();
+ }
+
+ @Override
+ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
+        final DeserializationSchema<RowData> keyDeserialization =
+                createDeserialization(context, keyDecodingFormat, keyProjection, keyPrefix);
+
+        final DeserializationSchema<RowData> valueDeserialization =
+                createDeserialization(context, valueDecodingFormat, valueProjection, null);
+
+        final TypeInformation<RowData> producedTypeInfo =
+                context.createTypeInformation(producedDataType);
+
+        final DynamicKafkaSource<RowData> kafkaSource =
+                createDynamicKafkaSource(
+                        keyDeserialization, valueDeserialization, producedTypeInfo);
+
+ return new DataStreamScanProvider() {
+ @Override
+            public DataStream<RowData> produceDataStream(
+                    ProviderContext providerContext, StreamExecutionEnvironment execEnv) {
+ if (watermarkStrategy == null) {
+ watermarkStrategy = WatermarkStrategy.noWatermarks();
+ }
+ DataStreamSource<RowData> sourceStream =
+ execEnv.fromSource(
+ kafkaSource,
+ watermarkStrategy,
+ "DynamicKafkaSource-" + tableIdentifier);
+                providerContext.generateUid(KAFKA_TRANSFORMATION).ifPresent(sourceStream::uid);
+ return sourceStream;
+ }
+
+ @Override
+ public boolean isBounded() {
+ return kafkaSource.getBoundedness() == Boundedness.BOUNDED;
+ }
+
+ @Override
+ public Optional<Integer> getParallelism() {
+ return Optional.ofNullable(parallelism);
+ }
+ };
+ }
+
+ @Override
+ public Map<String, DataType> listReadableMetadata() {
+ final Map<String, DataType> metadataMap = new LinkedHashMap<>();
+
+ // according to convention, the order of the final row must be
+ // PHYSICAL + FORMAT METADATA + CONNECTOR METADATA
+ // where the format metadata has highest precedence
+
+ // add value format metadata with prefix
+ valueDecodingFormat
+ .listReadableMetadata()
+                .forEach((key, value) -> metadataMap.put(VALUE_METADATA_PREFIX + key, value));
+
+ // add connector metadata
+ Stream.of(ReadableMetadata.values())
+                .forEachOrdered(m -> metadataMap.putIfAbsent(m.key, m.dataType));
+
+ return metadataMap;
+ }
+
+ @Override
+    public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
+ // separate connector and format metadata
+ final List<String> formatMetadataKeys =
+ metadataKeys.stream()
+ .filter(k -> k.startsWith(VALUE_METADATA_PREFIX))
+ .collect(Collectors.toList());
+        final List<String> connectorMetadataKeys = new ArrayList<>(metadataKeys);
+ connectorMetadataKeys.removeAll(formatMetadataKeys);
+
+ // push down format metadata
+        final Map<String, DataType> formatMetadata = valueDecodingFormat.listReadableMetadata();
+ if (formatMetadata.size() > 0) {
+            final List<String> requestedFormatMetadataKeys =
+                    formatMetadataKeys.stream()
+                            .map(k -> k.substring(VALUE_METADATA_PREFIX.length()))
+                            .collect(Collectors.toList());
+            valueDecodingFormat.applyReadableMetadata(requestedFormatMetadataKeys);
+ }
+
+ this.metadataKeys = connectorMetadataKeys;
+ this.producedDataType = producedDataType;
+ }
+
+ @Override
+ public boolean supportsMetadataProjection() {
+ return false;
+ }
+
+ @Override
+ public void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
+ this.watermarkStrategy = watermarkStrategy;
+ }
+
+ @Override
+ public DynamicTableSource copy() {
+ final DynamicKafkaTableSource copy =
+ new DynamicKafkaTableSource(
+ physicalDataType,
+ keyDecodingFormat,
+ valueDecodingFormat,
+ keyProjection,
+ valueProjection,
+ keyPrefix,
+ streamIds,
+ streamPattern,
+ kafkaMetadataService,
+ properties,
+ startupMode,
+ specificStartupOffsets,
+ startupTimestampMillis,
+ boundedMode,
+ specificBoundedOffsets,
+ boundedTimestampMillis,
+ upsertMode,
+ tableIdentifier,
+ parallelism);
+ copy.producedDataType = producedDataType;
+ copy.metadataKeys = metadataKeys;
+ copy.watermarkStrategy = watermarkStrategy;
+ return copy;
+ }
+
+ @Override
+ public String asSummaryString() {
+ return "Dynamic Kafka table source";
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final DynamicKafkaTableSource that = (DynamicKafkaTableSource) o;
+ return Objects.equals(producedDataType, that.producedDataType)
+ && Objects.equals(metadataKeys, that.metadataKeys)
+ && Objects.equals(physicalDataType, that.physicalDataType)
+ && Objects.equals(keyDecodingFormat, that.keyDecodingFormat)
+                && Objects.equals(valueDecodingFormat, that.valueDecodingFormat)
+ && Arrays.equals(keyProjection, that.keyProjection)
+ && Arrays.equals(valueProjection, that.valueProjection)
+ && Objects.equals(keyPrefix, that.keyPrefix)
+ && Objects.equals(streamIds, that.streamIds)
+                && Objects.equals(String.valueOf(streamPattern), String.valueOf(that.streamPattern))
+                && Objects.equals(kafkaMetadataService, that.kafkaMetadataService)
+ && Objects.equals(properties, that.properties)
+ && startupMode == that.startupMode
+                && Objects.equals(specificStartupOffsets, that.specificStartupOffsets)
+ && startupTimestampMillis == that.startupTimestampMillis
+ && boundedMode == that.boundedMode
+                && Objects.equals(specificBoundedOffsets, that.specificBoundedOffsets)
+ && boundedTimestampMillis == that.boundedTimestampMillis
+ && Objects.equals(upsertMode, that.upsertMode)
+ && Objects.equals(tableIdentifier, that.tableIdentifier)
+ && Objects.equals(watermarkStrategy, that.watermarkStrategy)
+ && Objects.equals(parallelism, that.parallelism);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ producedDataType,
+ metadataKeys,
+ physicalDataType,
+ keyDecodingFormat,
+ valueDecodingFormat,
+ Arrays.hashCode(keyProjection),
+ Arrays.hashCode(valueProjection),
+ keyPrefix,
+ streamIds,
+ streamPattern,
+ kafkaMetadataService,
+ properties,
+ startupMode,
+ specificStartupOffsets,
+ startupTimestampMillis,
+ boundedMode,
+ specificBoundedOffsets,
+ boundedTimestampMillis,
+ upsertMode,
+ tableIdentifier,
+ watermarkStrategy,
+ parallelism);
+ }
+
+    // --------------------------------------------------------------------------------------------
+
+ protected DynamicKafkaSource<RowData> createDynamicKafkaSource(
+ DeserializationSchema<RowData> keyDeserialization,
+ DeserializationSchema<RowData> valueDeserialization,
+ TypeInformation<RowData> producedTypeInfo) {
+
+        final KafkaRecordDeserializationSchema<RowData> kafkaDeserializer =
+                createKafkaDeserializationSchema(
+                        keyDeserialization, valueDeserialization, producedTypeInfo);
+
+ final DynamicKafkaSourceBuilder<RowData> dynamicKafkaSourceBuilder =
+ DynamicKafkaSource.builder();
+
+ if (streamIds != null) {
+ dynamicKafkaSourceBuilder.setStreamIds(new HashSet<>(streamIds));
+ } else {
+ dynamicKafkaSourceBuilder.setStreamPattern(streamPattern);
+ }
+
+ dynamicKafkaSourceBuilder
+ .setKafkaMetadataService(kafkaMetadataService)
+ .setDeserializer(kafkaDeserializer)
+ .setProperties(properties);
+
+ switch (startupMode) {
+ case EARLIEST:
+                dynamicKafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.earliest());
+ break;
+ case LATEST:
+                dynamicKafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.latest());
+ break;
+ case GROUP_OFFSETS:
+ String offsetResetConfig =
+ properties.getProperty(
+ ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+ OffsetResetStrategy.NONE.name());
+                OffsetResetStrategy offsetResetStrategy = getResetStrategy(offsetResetConfig);
+ dynamicKafkaSourceBuilder.setStartingOffsets(
+                        OffsetsInitializer.committedOffsets(offsetResetStrategy));
+ break;
+ case SPECIFIC_OFFSETS:
+ dynamicKafkaSourceBuilder.setStartingOffsets(
+ OffsetsInitializer.offsets(specificStartupOffsets));
+ break;
+ case TIMESTAMP:
+ dynamicKafkaSourceBuilder.setStartingOffsets(
+ OffsetsInitializer.timestamp(startupTimestampMillis));
+ break;
+ }
+
+ switch (boundedMode) {
+ case UNBOUNDED:
+ break;
+ case LATEST:
+                dynamicKafkaSourceBuilder.setBounded(OffsetsInitializer.latest());
+ break;
+ case GROUP_OFFSETS:
+                dynamicKafkaSourceBuilder.setBounded(OffsetsInitializer.committedOffsets());
+ break;
+ case SPECIFIC_OFFSETS:
+ dynamicKafkaSourceBuilder.setBounded(
+ OffsetsInitializer.offsets(specificBoundedOffsets));
+ break;
+ case TIMESTAMP:
+ dynamicKafkaSourceBuilder.setBounded(
+ OffsetsInitializer.timestamp(boundedTimestampMillis));
+ break;
+ }
+
+ return dynamicKafkaSourceBuilder.build();
+ }
+
+ private OffsetResetStrategy getResetStrategy(String offsetResetConfig) {
+        return Arrays.stream(OffsetResetStrategy.values())
+                .filter(ors -> ors.name().equals(offsetResetConfig.toUpperCase(Locale.ROOT)))
+                .findAny()
+                .orElseThrow(
+                        () ->
+                                new IllegalArgumentException(
+                                        String.format(
+                                                "%s can not be set to %s. Valid values: [%s]",
+                                                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+                                                offsetResetConfig,
+                                                Arrays.stream(OffsetResetStrategy.values())
+                                                        .map(Enum::name)
+                                                        .map(String::toLowerCase)
+                                                        .collect(Collectors.joining(",")))));
+ }
+
+    private KafkaRecordDeserializationSchema<RowData> createKafkaDeserializationSchema(
+ DeserializationSchema<RowData> keyDeserialization,
+ DeserializationSchema<RowData> valueDeserialization,
+ TypeInformation<RowData> producedTypeInfo) {
+ final MetadataConverter[] metadataConverters =
+ metadataKeys.stream()
+ .map(
+ k ->
+ Stream.of(ReadableMetadata.values())
+ .filter(rm -> rm.key.equals(k))
+ .findFirst()
+                                        .orElseThrow(IllegalStateException::new))
+ .map(m -> m.converter)
+ .toArray(MetadataConverter[]::new);
+
+ // check if connector metadata is used at all
+ final boolean hasMetadata = metadataKeys.size() > 0;
+
+ // adjust physical arity with value format's metadata
+        final int adjustedPhysicalArity =
+                DataType.getFieldDataTypes(producedDataType).size() - metadataKeys.size();
+
+        // adjust value format projection to include value format's metadata columns at the end
+ final int[] adjustedValueProjection =
+ IntStream.concat(
+ IntStream.of(valueProjection),
+ IntStream.range(
+                                keyProjection.length + valueProjection.length,
+ adjustedPhysicalArity))
+ .toArray();
+
+ return new DynamicKafkaDeserializationSchema(
+ adjustedPhysicalArity,
+ keyDeserialization,
+ keyProjection,
+ valueDeserialization,
+ adjustedValueProjection,
+ hasMetadata,
+ metadataConverters,
+ producedTypeInfo,
+ upsertMode);
+ }
+
+ private @Nullable DeserializationSchema<RowData> createDeserialization(
+ DynamicTableSource.Context context,
+ @Nullable DecodingFormat<DeserializationSchema<RowData>> format,
+ int[] projection,
+ @Nullable String prefix) {
+ if (format == null) {
+ return null;
+ }
+        DataType physicalFormatDataType = Projection.of(projection).project(this.physicalDataType);
+ if (prefix != null) {
+            physicalFormatDataType =
+                    TableDataTypeUtils.stripRowPrefix(physicalFormatDataType, prefix);
+ }
+ return format.createRuntimeDecoder(context, physicalFormatDataType);
+ }
+
+    // --------------------------------------------------------------------------------------------
+    // Metadata handling
+    // --------------------------------------------------------------------------------------------
+
+ enum ReadableMetadata {
+ TOPIC(
+ "topic",
+ DataTypes.STRING().notNull(),
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object read(ConsumerRecord<?, ?> record) {
+ return StringData.fromString(record.topic());
+ }
+ }),
+
+ PARTITION(
+ "partition",
+ DataTypes.INT().notNull(),
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object read(ConsumerRecord<?, ?> record) {
+ return record.partition();
+ }
+ }),
+
+ HEADERS(
+ "headers",
+                // key and value of the map are nullable to make handling easier in queries
+                DataTypes.MAP(DataTypes.STRING().nullable(), DataTypes.BYTES().nullable())
+                        .notNull(),
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object read(ConsumerRecord<?, ?> record) {
+ final Map<StringData, byte[]> map = new HashMap<>();
+ for (Header header : record.headers()) {
+                            map.put(StringData.fromString(header.key()), header.value());
+ }
+ return new GenericMapData(map);
+ }
+ }),
+
+ LEADER_EPOCH(
+ "leader-epoch",
+ DataTypes.INT().nullable(),
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object read(ConsumerRecord<?, ?> record) {
+ return record.leaderEpoch().orElse(null);
+ }
+ }),
+
+ OFFSET(
+ "offset",
+ DataTypes.BIGINT().notNull(),
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object read(ConsumerRecord<?, ?> record) {
+ return record.offset();
+ }
+ }),
+
+ TIMESTAMP(
+ "timestamp",
+ DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull(),
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object read(ConsumerRecord<?, ?> record) {
+                        return TimestampData.fromEpochMillis(record.timestamp());
+ }
+ }),
+
+ TIMESTAMP_TYPE(
+ "timestamp-type",
+ DataTypes.STRING().notNull(),
+ new MetadataConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object read(ConsumerRecord<?, ?> record) {
+                        return StringData.fromString(record.timestampType().toString());
+ }
+ });
+
+ final String key;
+
+ final DataType dataType;
+
+ final MetadataConverter converter;
+
+        ReadableMetadata(String key, DataType dataType, MetadataConverter converter) {
+ this.key = key;
+ this.dataType = dataType;
+ this.converter = converter;
+ }
+ }
+}
diff --git a/flink-connector-kafka/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-connector-kafka/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index 9b8bf8e0..f67172a6 100644
--- a/flink-connector-kafka/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ b/flink-connector-kafka/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -15,3 +15,4 @@
org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory
org.apache.flink.streaming.connectors.kafka.table.UpsertKafkaDynamicTableFactory
+org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaTableFactory
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaTableFactoryTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaTableFactoryTest.java
new file mode 100644
index 00000000..33f63c59
--- /dev/null
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaTableFactoryTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.streaming.connectors.kafka.config.BoundedMode;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.TestFormatFactory;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSource;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+
+/** Tests for {@link DynamicKafkaTableFactory}. */
+public class DynamicKafkaTableFactoryTest {
+
+ private static final ResolvedSchema SCHEMA =
+ new ResolvedSchema(
+ Arrays.asList(
+                        Column.physical("name", DataTypes.STRING().notNull()),
+ Column.physical("count", DataTypes.INT())),
+ Collections.emptyList(),
+ null);
+
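+    // The factory should create a DynamicKafkaTableSource and pass the parsed options through.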
+ @Test
+ public void testTableSourceWithSingleClusterMetadataService() {
+ final DynamicTableSource actualSource =
+ createTableSource(SCHEMA, getSingleClusterSourceOptions());
+ assertThat(actualSource).isInstanceOf(DynamicKafkaTableSource.class);
+
+        final DynamicKafkaTableSource actualKafkaSource = (DynamicKafkaTableSource) actualSource;
+        assertThat(actualKafkaSource.streamIds).containsExactly("stream-1", "stream-2");
+        assertThat(actualKafkaSource.streamPattern).isNull();
+        assertThat(actualKafkaSource.startupMode).isEqualTo(StartupMode.EARLIEST);
+        assertThat(actualKafkaSource.boundedMode).isEqualTo(BoundedMode.UNBOUNDED);
+        assertThat(actualKafkaSource.properties.getProperty("bootstrap.servers"))
+                .isEqualTo("bootstrap-single:9092");
+        assertThat(actualKafkaSource.properties.getProperty("partition.discovery.interval.ms"))
+                .isEqualTo("1000");
+ }
+
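+    // Setting both 'stream-ids' and 'stream-pattern' must be rejected during validation.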
+ @Test
+ public void testInvalidStreamOptions() {
+ final Map<String, String> options = getSingleClusterSourceOptions();
+ options.put("stream-pattern", "stream-.*");
+
+ assertThatExceptionOfType(ValidationException.class)
+ .isThrownBy(() -> createTableSource(SCHEMA, options))
+ .withMessageContaining("stream-ids")
+ .withMessageContaining("stream-pattern");
+ }
+
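+    // Minimal options for the built-in 'single-cluster' metadata service plus a test format.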
+ private static Map<String, String> getSingleClusterSourceOptions() {
+ Map<String, String> tableOptions = new HashMap<>();
+ // Dynamic Kafka specific options.
+ tableOptions.put("connector", DynamicKafkaTableFactory.IDENTIFIER);
+ tableOptions.put("stream-ids", "stream-1;stream-2");
+ tableOptions.put("metadata-service", "single-cluster");
+ tableOptions.put("metadata-service.cluster-id", "cluster-single");
+        tableOptions.put("properties.bootstrap.servers", "bootstrap-single:9092");
+ tableOptions.put("properties.group.id", "dummy");
+ tableOptions.put("scan.startup.mode", "earliest-offset");
+ tableOptions.put("scan.topic-partition-discovery.interval", "1000 ms");
+ // Format options.
+ tableOptions.put("format", TestFormatFactory.IDENTIFIER);
+ final String formatDelimiterKey =
+ String.format(
+                        "%s.%s", TestFormatFactory.IDENTIFIER, TestFormatFactory.DELIMITER.key());
+ final String failOnMissingKey =
+ String.format(
+ "%s.%s",
+                        TestFormatFactory.IDENTIFIER, TestFormatFactory.FAIL_ON_MISSING.key());
+ tableOptions.put(formatDelimiterKey, ",");
+ tableOptions.put(failOnMissingKey, "true");
+ return tableOptions;
+ }
+}