This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git


The following commit(s) were added to refs/heads/main by this push:
     new d0b5cca3 [FLINK-38920] create Table source and factory for 
DynamicKafkaSource in Flink Table API (#212)
d0b5cca3 is described below

commit d0b5cca3e8a1cc682568530a5fe2b397b5aba3d5
Author: bowenli86 <[email protected]>
AuthorDate: Tue Jan 20 12:58:57 2026 -0800

    [FLINK-38920] create Table source and factory for DynamicKafkaSource in 
Flink Table API (#212)
---
 .../docs/connectors/table/dynamic-kafka.md         | 151 +++++
 .../content/docs/connectors/table/dynamic-kafka.md | 156 +++++
 .../kafka/table/DynamicKafkaConnectorOptions.java  |  63 ++
 .../kafka/table/DynamicKafkaTableFactory.java      | 515 +++++++++++++++
 .../kafka/table/DynamicKafkaTableSource.java       | 696 +++++++++++++++++++++
 .../org.apache.flink.table.factories.Factory       |   1 +
 .../kafka/table/DynamicKafkaTableFactoryTest.java  | 104 +++
 7 files changed, 1686 insertions(+)

diff --git a/docs/content.zh/docs/connectors/table/dynamic-kafka.md 
b/docs/content.zh/docs/connectors/table/dynamic-kafka.md
new file mode 100644
index 00000000..39ba2871
--- /dev/null
+++ b/docs/content.zh/docs/connectors/table/dynamic-kafka.md
@@ -0,0 +1,151 @@
+---
+title: Dynamic Kafka
+weight: 5
+type: docs
+aliases:
+  - /dev/table/connectors/dynamic-kafka.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Dynamic Kafka SQL 连接器
+
+{{< label "Scan Source: Unbounded" >}}
+
+Dynamic Kafka 连接器允许从可跨集群变更的 Kafka 主题读取数据,无需重启作业。
+流的解析由 Kafka 元数据服务完成,尤其适用于集群迁移和动态主题/集群变更场景。
+
+依赖
+----
+
+{{< sql_connector_download_table "kafka" >}}
+
+Kafka 连接器不包含在二进制发行版中。
+请参考 [这里]({{< ref "docs/dev/configuration/overview" >}}) 了解如何在集群执行时引入。
+
+如何创建 Dynamic Kafka 表
+------------------------
+
+下面示例使用内置的 `single-cluster` 元数据服务创建 Dynamic Kafka 表。
+在该模式下,stream id 会被解释为单集群中的 Kafka 主题。
+
+```sql
+CREATE TABLE DynamicKafkaTable (
+  `user_id` BIGINT,
+  `item_id` BIGINT,
+  `behavior` STRING,
+  `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
+) WITH (
+  'connector' = 'dynamic-kafka',
+  'stream-ids' = 'user_behavior;user_behavior_v2',
+  'metadata-service' = 'single-cluster',
+  'metadata-service.cluster-id' = 'cluster-0',
+  'properties.bootstrap.servers' = 'localhost:9092',
+  'properties.group.id' = 'testGroup',
+  'scan.startup.mode' = 'earliest-offset',
+  'format' = 'csv'
+)
+```
+
+该连接器也支持自定义元数据服务,通过 `metadata-service` 指定。服务类需实现
+`KafkaMetadataService`,并且需要提供无参构造函数或接收 `Properties` 参数的构造函数。
+连接器会将所有 `properties.*` 选项作为 Kafka 属性传入构造函数。
+
+可用元数据
+--------
+
+Dynamic Kafka 连接器暴露的元数据列与 Kafka 连接器一致。
+请参考 [Kafka SQL 连接器]({{< ref "docs/connectors/table/kafka" 
>}}#available-metadata)。
+
+连接器参数
+---------
+
+<table class="table table-bordered">
+    <thead>
+    <tr>
+      <th class="text-left" style="width: 25%">参数</th>
+      <th class="text-center" style="width: 8%">必需</th>
+      <th class="text-center" style="width: 8%">转发</th>
+      <th class="text-center" style="width: 7%">默认值</th>
+      <th class="text-center" style="width: 10%">类型</th>
+      <th class="text-center" style="width: 42%">说明</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>connector</h5></td>
+      <td>必需</td>
+      <td>否</td>
+      <td style="word-wrap: break-word;">(无)</td>
+      <td>String</td>
+      <td>指定要使用的连接器,Dynamic Kafka 使用 <code>'dynamic-kafka'</code>。</td>
+    </tr>
+    <tr>
+      <td><h5>stream-ids</h5></td>
+      <td>可选</td>
+      <td>否</td>
+      <td style="word-wrap: break-word;">(无)</td>
+      <td>String</td>
+      <td>以分号分隔的 stream id 列表。<code>stream-ids</code> 与 
<code>stream-pattern</code> 只能设置其一。</td>
+    </tr>
+    <tr>
+      <td><h5>stream-pattern</h5></td>
+      <td>可选</td>
+      <td>否</td>
+      <td style="word-wrap: break-word;">(无)</td>
+      <td>String</td>
+      <td>stream id 的正则表达式。<code>stream-ids</code> 与 
<code>stream-pattern</code> 只能设置其一。</td>
+    </tr>
+    <tr>
+      <td><h5>metadata-service</h5></td>
+      <td>必需</td>
+      <td>否</td>
+      <td style="word-wrap: break-word;">(无)</td>
+      <td>String</td>
+      <td>元数据服务标识。可使用 <code>'single-cluster'</code> 或实现 
<code>KafkaMetadataService</code> 的全限定类名。</td>
+    </tr>
+    <tr>
+      <td><h5>metadata-service.cluster-id</h5></td>
+      <td>可选</td>
+      <td>否</td>
+      <td style="word-wrap: break-word;">(无)</td>
+      <td>String</td>
+      <td><code>single-cluster</code> 元数据服务所需的集群 ID。</td>
+    </tr>
+    <tr>
+      <td><h5>stream-metadata-discovery-interval-ms</h5></td>
+      <td>可选</td>
+      <td>否</td>
+      <td style="word-wrap: break-word;">-1</td>
+      <td>Long</td>
+      <td>发现 stream 元数据变更的周期(毫秒)。非正值表示关闭发现。</td>
+    </tr>
+    <tr>
+      <td><h5>stream-metadata-discovery-failure-threshold</h5></td>
+      <td>可选</td>
+      <td>否</td>
+      <td style="word-wrap: break-word;">1</td>
+      <td>Integer</td>
+      <td>连续发现失败次数阈值,超过将触发作业失败。</td>
+    </tr>
+    </tbody>
+</table>
+
+Dynamic Kafka 连接器同样支持 Kafka 连接器的格式参数与 Kafka 客户端参数。
+完整列表请参考 [Kafka SQL 连接器]({{< ref "docs/connectors/table/kafka" 
>}}#connector-options)。
diff --git a/docs/content/docs/connectors/table/dynamic-kafka.md 
b/docs/content/docs/connectors/table/dynamic-kafka.md
new file mode 100644
index 00000000..52c7b8c6
--- /dev/null
+++ b/docs/content/docs/connectors/table/dynamic-kafka.md
@@ -0,0 +1,156 @@
+---
+title: Dynamic Kafka
+weight: 5
+type: docs
+aliases:
+  - /dev/table/connectors/dynamic-kafka.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Dynamic Kafka SQL Connector
+
+{{< label "Scan Source: Unbounded" >}}
+
+The Dynamic Kafka connector allows for reading data from Kafka topics that can 
move across
+clusters without restarting the job. Streams are resolved via a Kafka metadata 
service. This is
+especially useful for cluster migrations and dynamic topic/cluster changes.
+
+Dependencies
+------------
+
+{{< sql_connector_download_table "kafka" >}}
+
+The Kafka connector is not part of the binary distribution.
+See how to link with it for cluster execution [here]({{< ref 
"docs/dev/configuration/overview" >}}).
+
+How to create a Dynamic Kafka table
+-----------------------------------
+
+The example below shows how to create a Dynamic Kafka table using the built-in
+`single-cluster` metadata service. With this service, stream ids are 
interpreted as topics
+in a single Kafka cluster.
+
+```sql
+CREATE TABLE DynamicKafkaTable (
+  `user_id` BIGINT,
+  `item_id` BIGINT,
+  `behavior` STRING,
+  `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
+) WITH (
+  'connector' = 'dynamic-kafka',
+  'stream-ids' = 'user_behavior;user_behavior_v2',
+  'metadata-service' = 'single-cluster',
+  'metadata-service.cluster-id' = 'cluster-0',
+  'properties.bootstrap.servers' = 'localhost:9092',
+  'properties.group.id' = 'testGroup',
+  'scan.startup.mode' = 'earliest-offset',
+  'format' = 'csv'
+)
+```
+
+The connector also supports a custom metadata service via the 
`metadata-service` option. The
+service class must implement `KafkaMetadataService` and should either have a 
public no-arg
+constructor or a constructor that accepts `Properties`. The connector will 
pass the Kafka
+properties (all `properties.*` options) into the constructor when available.
+
+Available Metadata
+------------------
+
+The Dynamic Kafka connector exposes the same metadata columns as the Kafka 
connector.
+See [Kafka SQL Connector]({{< ref "docs/connectors/table/kafka" 
>}}#available-metadata) for
+the full list.
+
+Connector Options
+----------------
+
+<table class="table table-bordered">
+    <thead>
+    <tr>
+      <th class="text-left" style="width: 25%">Option</th>
+      <th class="text-center" style="width: 8%">Required</th>
+      <th class="text-center" style="width: 8%">Forwarded</th>
+      <th class="text-center" style="width: 7%">Default</th>
+      <th class="text-center" style="width: 10%">Type</th>
+      <th class="text-center" style="width: 42%">Description</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>connector</h5></td>
+      <td>required</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Specify what connector to use, for Dynamic Kafka use 
<code>'dynamic-kafka'</code>.</td>
+    </tr>
+    <tr>
+      <td><h5>stream-ids</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Semicolon-separated stream ids to subscribe. Only one of 
<code>stream-ids</code> and <code>stream-pattern</code> can be set.</td>
+    </tr>
+    <tr>
+      <td><h5>stream-pattern</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Regex pattern for stream ids to subscribe. Only one of 
<code>stream-ids</code> and <code>stream-pattern</code> can be set.</td>
+    </tr>
+    <tr>
+      <td><h5>metadata-service</h5></td>
+      <td>required</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Metadata service identifier. Use <code>'single-cluster'</code> or a 
fully qualified class name implementing <code>KafkaMetadataService</code>.</td>
+    </tr>
+    <tr>
+      <td><h5>metadata-service.cluster-id</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Cluster id required by <code>single-cluster</code> metadata 
service.</td>
+    </tr>
+    <tr>
+      <td><h5>stream-metadata-discovery-interval-ms</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">-1</td>
+      <td>Long</td>
+      <td>Interval in milliseconds for discovering stream metadata changes. A 
non-positive value disables discovery.</td>
+    </tr>
+    <tr>
+      <td><h5>stream-metadata-discovery-failure-threshold</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">1</td>
+      <td>Integer</td>
+      <td>Number of consecutive discovery failures before failing the job.</td>
+    </tr>
+    </tbody>
+</table>
+
+The connector also supports the same format options and Kafka client 
properties as the Kafka
+connector. See [Kafka SQL Connector]({{< ref "docs/connectors/table/kafka" 
>}}#connector-options)
+for the full list of format options and Kafka properties (all 
<code>properties.*</code> options).
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaConnectorOptions.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaConnectorOptions.java
new file mode 100644
index 00000000..b182487f
--- /dev/null
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaConnectorOptions.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+import java.util.List;
+
+/** Options for the Dynamic Kafka table connector. */
+@PublicEvolving
+public class DynamicKafkaConnectorOptions {
+
+    public static final ConfigOption<List<String>> STREAM_IDS =
+            ConfigOptions.key("stream-ids")
+                    .stringType()
+                    .asList()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Stream id(s) to read data from when the table is 
used as source. "
+                                    + "Only one of 'stream-ids' and 
'stream-pattern' can be set.");
+
+    public static final ConfigOption<String> STREAM_PATTERN =
+            ConfigOptions.key("stream-pattern")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Regular expression pattern for stream ids to read 
data from. "
+                                    + "Only one of 'stream-ids' and 
'stream-pattern' can be set.");
+
+    public static final ConfigOption<String> METADATA_SERVICE =
+            ConfigOptions.key("metadata-service")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Metadata service implementation identifier. Use 
'single-cluster' or "
+                                    + "a fully qualified class name 
implementing KafkaMetadataService.");
+
+    public static final ConfigOption<String> METADATA_SERVICE_CLUSTER_ID =
+            ConfigOptions.key("metadata-service.cluster-id")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Kafka cluster id for the 
'single-cluster' metadata service.");
+
+    private DynamicKafkaConnectorOptions() {}
+}
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaTableFactory.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaTableFactory.java
new file mode 100644
index 00000000..fea44744
--- /dev/null
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaTableFactory.java
@@ -0,0 +1,515 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.kafka.dynamic.metadata.KafkaMetadataService;
+import 
org.apache.flink.connector.kafka.dynamic.metadata.SingleClusterTopicMetadataService;
+import 
org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSourceOptions;
+import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
+import org.apache.flink.streaming.connectors.kafka.config.BoundedMode;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.ScanBoundedMode;
+import 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.ScanStartupMode;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.Format;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.InstantiationUtil;
+
+import javax.annotation.Nullable;
+
+import java.lang.reflect.Constructor;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+import static 
org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaConnectorOptions.METADATA_SERVICE;
+import static 
org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaConnectorOptions.METADATA_SERVICE_CLUSTER_ID;
+import static 
org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaConnectorOptions.STREAM_IDS;
+import static 
org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaConnectorOptions.STREAM_PATTERN;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FIELDS;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FIELDS_PREFIX;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FORMAT;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.PROPS_GROUP_ID;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_MODE;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_TIMESTAMP_MILLIS;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_PARALLELISM;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_MODE;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_TIMESTAMP_MILLIS;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_TOPIC_PARTITION_DISCOVERY;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FIELDS_INCLUDE;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FORMAT;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.PROPERTIES_PREFIX;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.createKeyFormatProjection;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.createValueFormatProjection;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.getKafkaProperties;
+
+/** Factory for creating configured instances of {@link 
DynamicKafkaTableSource}. */
+@Internal
+public class DynamicKafkaTableFactory implements DynamicTableSourceFactory {
+
+    public static final String IDENTIFIER = "dynamic-kafka";
+    private static final String METADATA_SERVICE_SINGLE_CLUSTER = 
"single-cluster";
+
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        final Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(METADATA_SERVICE);
+        return options;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        final Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(FactoryUtil.FORMAT);
+        options.add(KEY_FORMAT);
+        options.add(KEY_FIELDS);
+        options.add(KEY_FIELDS_PREFIX);
+        options.add(VALUE_FORMAT);
+        options.add(VALUE_FIELDS_INCLUDE);
+        options.add(STREAM_IDS);
+        options.add(STREAM_PATTERN);
+        options.add(METADATA_SERVICE_CLUSTER_ID);
+        options.add(PROPS_GROUP_ID);
+        options.add(SCAN_STARTUP_MODE);
+        options.add(SCAN_STARTUP_TIMESTAMP_MILLIS);
+        options.add(SCAN_BOUNDED_MODE);
+        options.add(SCAN_BOUNDED_TIMESTAMP_MILLIS);
+        options.add(SCAN_TOPIC_PARTITION_DISCOVERY);
+        options.add(SCAN_PARALLELISM);
+        
options.add(DynamicKafkaSourceOptions.STREAM_METADATA_DISCOVERY_INTERVAL_MS);
+        
options.add(DynamicKafkaSourceOptions.STREAM_METADATA_DISCOVERY_FAILURE_THRESHOLD);
+        return options;
+    }
+
+    @Override
+    public DynamicTableSource createDynamicTableSource(Context context) {
+        final TableFactoryHelper helper = 
FactoryUtil.createTableFactoryHelper(this, context);
+
+        final Optional<DecodingFormat<DeserializationSchema<RowData>>> 
keyDecodingFormat =
+                getKeyDecodingFormat(helper);
+
+        final DecodingFormat<DeserializationSchema<RowData>> 
valueDecodingFormat =
+                getValueDecodingFormat(helper);
+
+        helper.validateExcept(PROPERTIES_PREFIX);
+
+        final ReadableConfig tableOptions = helper.getOptions();
+
+        validateTableSourceOptions(tableOptions);
+
+        validatePKConstraints(
+                context.getObjectIdentifier(),
+                context.getPrimaryKeyIndexes(),
+                context.getCatalogTable().getOptions(),
+                valueDecodingFormat);
+
+        final StartupOptions startupOptions = getStartupOptions(tableOptions);
+        final BoundedOptions boundedOptions = getBoundedOptions(tableOptions);
+
+        final Properties properties = 
getKafkaProperties(context.getCatalogTable().getOptions());
+
+        // add topic-partition discovery
+        final Duration partitionDiscoveryInterval =
+                tableOptions.get(SCAN_TOPIC_PARTITION_DISCOVERY);
+        properties.setProperty(
+                KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.key(),
+                Long.toString(partitionDiscoveryInterval.toMillis()));
+
+        applyDynamicDiscoveryOptions(tableOptions, properties);
+
+        final KafkaMetadataService kafkaMetadataService =
+                createMetadataService(tableOptions, properties, 
context.getClassLoader());
+
+        final DataType physicalDataType = context.getPhysicalRowDataType();
+
+        final int[] keyProjection = createKeyFormatProjection(tableOptions, 
physicalDataType);
+
+        final int[] valueProjection = 
createValueFormatProjection(tableOptions, physicalDataType);
+
+        final String keyPrefix = 
tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null);
+
+        final Integer parallelism = 
tableOptions.getOptional(SCAN_PARALLELISM).orElse(null);
+
+        return new DynamicKafkaTableSource(
+                physicalDataType,
+                keyDecodingFormat.orElse(null),
+                valueDecodingFormat,
+                keyProjection,
+                valueProjection,
+                keyPrefix,
+                getStreamIds(tableOptions),
+                getStreamPattern(tableOptions),
+                kafkaMetadataService,
+                properties,
+                startupOptions.startupMode,
+                startupOptions.specificOffsets,
+                startupOptions.startupTimestampMillis,
+                boundedOptions.boundedMode,
+                boundedOptions.specificOffsets,
+                boundedOptions.boundedTimestampMillis,
+                false,
+                context.getObjectIdentifier().asSummaryString(),
+                parallelism);
+    }
+
+    private static void applyDynamicDiscoveryOptions(
+            ReadableConfig tableOptions, Properties properties) {
+        tableOptions
+                
.getOptional(DynamicKafkaSourceOptions.STREAM_METADATA_DISCOVERY_INTERVAL_MS)
+                .ifPresent(
+                        value ->
+                                properties.setProperty(
+                                        DynamicKafkaSourceOptions
+                                                
.STREAM_METADATA_DISCOVERY_INTERVAL_MS
+                                                .key(),
+                                        Long.toString(value)));
+        tableOptions
+                
.getOptional(DynamicKafkaSourceOptions.STREAM_METADATA_DISCOVERY_FAILURE_THRESHOLD)
+                .ifPresent(
+                        value ->
+                                properties.setProperty(
+                                        DynamicKafkaSourceOptions
+                                                
.STREAM_METADATA_DISCOVERY_FAILURE_THRESHOLD
+                                                .key(),
+                                        Integer.toString(value)));
+    }
+
+    private static Optional<DecodingFormat<DeserializationSchema<RowData>>> 
getKeyDecodingFormat(
+            TableFactoryHelper helper) {
+        final Optional<DecodingFormat<DeserializationSchema<RowData>>> 
keyDecodingFormat =
+                helper.discoverOptionalDecodingFormat(
+                        DeserializationFormatFactory.class, KEY_FORMAT);
+        keyDecodingFormat.ifPresent(
+                format -> {
+                    if 
(!format.getChangelogMode().containsOnly(RowKind.INSERT)) {
+                        throw new ValidationException(
+                                String.format(
+                                        "A key format should only deal with 
INSERT-only records. "
+                                                + "But %s has a changelog mode 
of %s.",
+                                        helper.getOptions().get(KEY_FORMAT),
+                                        format.getChangelogMode()));
+                    }
+                });
+        return keyDecodingFormat;
+    }
+
+    private static DecodingFormat<DeserializationSchema<RowData>> 
getValueDecodingFormat(
+            TableFactoryHelper helper) {
+        return helper.discoverOptionalDecodingFormat(
+                        DeserializationFormatFactory.class, FactoryUtil.FORMAT)
+                .orElseGet(
+                        () ->
+                                helper.discoverDecodingFormat(
+                                        DeserializationFormatFactory.class, 
VALUE_FORMAT));
+    }
+
+    private static void validatePKConstraints(
+            ObjectIdentifier tableName,
+            int[] primaryKeyIndexes,
+            Map<String, String> options,
+            Format format) {
+        if (primaryKeyIndexes.length > 0
+                && format.getChangelogMode().containsOnly(RowKind.INSERT)) {
+            Configuration configuration = Configuration.fromMap(options);
+            String formatName =
+                    configuration
+                            .getOptional(FactoryUtil.FORMAT)
+                            .orElse(configuration.get(VALUE_FORMAT));
+            throw new ValidationException(
+                    String.format(
+                            "The Dynamic Kafka table '%s' with '%s' format 
doesn't support defining PRIMARY KEY constraint"
+                                    + " on the table, because it can't 
guarantee the semantic of primary key.",
+                            tableName.asSummaryString(), formatName));
+        }
+    }
+
+    private static void validateTableSourceOptions(ReadableConfig 
tableOptions) {
+        validateStreams(tableOptions);
+        validateMetadataService(tableOptions);
+        validateScanStartupMode(tableOptions);
+        validateScanBoundedMode(tableOptions);
+    }
+
+    private static void validateStreams(ReadableConfig tableOptions) {
+        Optional<List<String>> streamIds = 
tableOptions.getOptional(STREAM_IDS);
+        Optional<String> pattern = tableOptions.getOptional(STREAM_PATTERN);
+
+        if (streamIds.isPresent() && pattern.isPresent()) {
+            throw new ValidationException(
+                    "Option 'stream-ids' and 'stream-pattern' shouldn't be set 
together.");
+        }
+
+        if (!streamIds.isPresent() && !pattern.isPresent()) {
+            throw new ValidationException("Either 'stream-ids' or 
'stream-pattern' must be set.");
+        }
+
+        streamIds.ifPresent(
+                ids -> {
+                    if (ids.isEmpty()) {
+                        throw new ValidationException("Option 'stream-ids' 
cannot be empty.");
+                    }
+                });
+
+        pattern.ifPresent(
+                value -> {
+                    try {
+                        Pattern.compile(value);
+                    } catch (PatternSyntaxException e) {
+                        throw new ValidationException(
+                                "Option 'stream-pattern' contains an invalid 
regular expression.",
+                                e);
+                    }
+                });
+    }
+
+    private static void validateMetadataService(ReadableConfig tableOptions) {
+        Optional<String> metadataService = 
tableOptions.getOptional(METADATA_SERVICE);
+        if (!metadataService.isPresent()) {
+            throw new ValidationException("Option 'metadata-service' must be 
set.");
+        }
+        if (isSingleClusterMetadataService(metadataService.get())) {
+            if 
(!tableOptions.getOptional(METADATA_SERVICE_CLUSTER_ID).isPresent()) {
+                throw new ValidationException(
+                        "Option 'metadata-service.cluster-id' is required for 
'single-cluster' metadata service.");
+            }
+            if 
(!tableOptions.getOptional(PROPS_BOOTSTRAP_SERVERS).isPresent()) {
+                throw new ValidationException(
+                        "Option 'properties.bootstrap.servers' is required for 
'single-cluster' metadata service.");
+            }
+        }
+    }
+
+    private static void validateScanStartupMode(ReadableConfig tableOptions) {
+        tableOptions
+                .getOptional(SCAN_STARTUP_MODE)
+                .ifPresent(
+                        mode -> {
+                            switch (mode) {
+                                case TIMESTAMP:
+                                    if (!tableOptions
+                                            
.getOptional(SCAN_STARTUP_TIMESTAMP_MILLIS)
+                                            .isPresent()) {
+                                        throw new ValidationException(
+                                                String.format(
+                                                        "'%s' is required in 
'%s' startup mode"
+                                                                + " but 
missing.",
+                                                        
SCAN_STARTUP_TIMESTAMP_MILLIS.key(),
+                                                        
ScanStartupMode.TIMESTAMP));
+                                    }
+                                    break;
+                                case SPECIFIC_OFFSETS:
+                                    throw new ValidationException(
+                                            "Dynamic Kafka source does not 
support 'specific-offsets' startup mode.");
+                            }
+                        });
+    }
+
+    private static void validateScanBoundedMode(ReadableConfig tableOptions) {
+        tableOptions
+                .getOptional(SCAN_BOUNDED_MODE)
+                .ifPresent(
+                        mode -> {
+                            switch (mode) {
+                                case TIMESTAMP:
+                                    if (!tableOptions
+                                            
.getOptional(SCAN_BOUNDED_TIMESTAMP_MILLIS)
+                                            .isPresent()) {
+                                        throw new ValidationException(
+                                                String.format(
+                                                        "'%s' is required in 
'%s' bounded mode"
+                                                                + " but 
missing.",
+                                                        
SCAN_BOUNDED_TIMESTAMP_MILLIS.key(),
+                                                        
ScanBoundedMode.TIMESTAMP));
+                                    }
+                                    break;
+                                case SPECIFIC_OFFSETS:
+                                    throw new ValidationException(
+                                            "Dynamic Kafka source does not 
support 'specific-offsets' bounded mode.");
+                            }
+                        });
+    }
+
+    private static List<String> getStreamIds(ReadableConfig tableOptions) {
+        return tableOptions.getOptional(STREAM_IDS).orElse(null);
+    }
+
+    private static Pattern getStreamPattern(ReadableConfig tableOptions) {
+        return 
tableOptions.getOptional(STREAM_PATTERN).map(Pattern::compile).orElse(null);
+    }
+
+    private static StartupOptions getStartupOptions(ReadableConfig 
tableOptions) {
+        final StartupOptions options = new StartupOptions();
+        final StartupMode startupMode =
+                tableOptions
+                        .getOptional(SCAN_STARTUP_MODE)
+                        .map(DynamicKafkaTableFactory::toStartupMode)
+                        .orElse(StartupMode.GROUP_OFFSETS);
+        options.startupMode = startupMode;
+        options.specificOffsets = Collections.emptyMap();
+        if (startupMode == StartupMode.TIMESTAMP) {
+            options.startupTimestampMillis = 
tableOptions.get(SCAN_STARTUP_TIMESTAMP_MILLIS);
+        }
+        return options;
+    }
+
+    private static BoundedOptions getBoundedOptions(ReadableConfig 
tableOptions) {
+        final BoundedOptions options = new BoundedOptions();
+        final BoundedMode boundedMode = 
toBoundedMode(tableOptions.get(SCAN_BOUNDED_MODE));
+        options.boundedMode = boundedMode;
+        options.specificOffsets = Collections.emptyMap();
+        if (boundedMode == BoundedMode.TIMESTAMP) {
+            options.boundedTimestampMillis = 
tableOptions.get(SCAN_BOUNDED_TIMESTAMP_MILLIS);
+        }
+        return options;
+    }
+
+    private static StartupMode toStartupMode(ScanStartupMode scanStartupMode) {
+        switch (scanStartupMode) {
+            case EARLIEST_OFFSET:
+                return StartupMode.EARLIEST;
+            case LATEST_OFFSET:
+                return StartupMode.LATEST;
+            case GROUP_OFFSETS:
+                return StartupMode.GROUP_OFFSETS;
+            case SPECIFIC_OFFSETS:
+                return StartupMode.SPECIFIC_OFFSETS;
+            case TIMESTAMP:
+                return StartupMode.TIMESTAMP;
+            default:
+                throw new ValidationException(
+                        "Unsupported startup mode. Validator should have 
checked that.");
+        }
+    }
+
+    private static BoundedMode toBoundedMode(ScanBoundedMode scanBoundedMode) {
+        switch (scanBoundedMode) {
+            case UNBOUNDED:
+                return BoundedMode.UNBOUNDED;
+            case LATEST_OFFSET:
+                return BoundedMode.LATEST;
+            case GROUP_OFFSETS:
+                return BoundedMode.GROUP_OFFSETS;
+            case TIMESTAMP:
+                return BoundedMode.TIMESTAMP;
+            case SPECIFIC_OFFSETS:
+                return BoundedMode.SPECIFIC_OFFSETS;
+            default:
+                throw new ValidationException(
+                        "Unsupported bounded mode. Validator should have 
checked that.");
+        }
+    }
+
+    private static KafkaMetadataService createMetadataService(
+            ReadableConfig tableOptions, Properties properties, ClassLoader 
classLoader) {
+        String metadataService = tableOptions.get(METADATA_SERVICE);
+        if (isSingleClusterMetadataService(metadataService)) {
+            String clusterId = tableOptions.get(METADATA_SERVICE_CLUSTER_ID);
+            return new SingleClusterTopicMetadataService(clusterId, 
properties);
+        }
+
+        try {
+            Class<?> clazz = Class.forName(metadataService, true, classLoader);
+            if (!KafkaMetadataService.class.isAssignableFrom(clazz)) {
+                throw new ValidationException(
+                        String.format(
+                                "Metadata service class '%s' should implement 
%s",
+                                metadataService, 
KafkaMetadataService.class.getName()));
+            }
+            KafkaMetadataService withProperties = 
instantiateWithProperties(clazz, properties);
+            if (withProperties != null) {
+                return withProperties;
+            }
+            return InstantiationUtil.instantiate(
+                    metadataService, KafkaMetadataService.class, classLoader);
+        } catch (ValidationException e) {
+            throw e;
+        } catch (ClassNotFoundException | FlinkException e) {
+            throw new ValidationException(
+                    String.format(
+                            "Could not find and instantiate metadata service 
class '%s'",
+                            metadataService),
+                    e);
+        } catch (ReflectiveOperationException e) {
+            throw new ValidationException(
+                    String.format(
+                            "Could not instantiate metadata service class '%s' 
with properties.",
+                            metadataService),
+                    e);
+        }
+    }
+
+    private static @Nullable KafkaMetadataService instantiateWithProperties(
+            Class<?> clazz, Properties properties) throws 
ReflectiveOperationException {
+        try {
+            Constructor<?> constructor = 
clazz.getConstructor(Properties.class);
+            return (KafkaMetadataService) constructor.newInstance(properties);
+        } catch (NoSuchMethodException e) {
+            return null;
+        }
+    }
+
+    private static boolean isSingleClusterMetadataService(String 
metadataService) {
+        return METADATA_SERVICE_SINGLE_CLUSTER.equals(metadataService)
+                || 
SingleClusterTopicMetadataService.class.getName().equals(metadataService);
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+    // Inner classes
+    // 
--------------------------------------------------------------------------------------------
+
+    private static class StartupOptions {
+        private StartupMode startupMode;
+        private Map<org.apache.kafka.common.TopicPartition, Long> 
specificOffsets;
+        private long startupTimestampMillis;
+    }
+
+    private static class BoundedOptions {
+        private BoundedMode boundedMode;
+        private Map<org.apache.kafka.common.TopicPartition, Long> 
specificOffsets;
+        private long boundedTimestampMillis;
+    }
+}
diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaTableSource.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaTableSource.java
new file mode 100644
index 00000000..c3ea65ef
--- /dev/null
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaTableSource.java
@@ -0,0 +1,696 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.connector.kafka.dynamic.metadata.KafkaMetadataService;
+import org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSource;
+import 
org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSourceBuilder;
+import 
org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
+import 
org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kafka.config.BoundedMode;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import 
org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.MetadataConverter;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.Projection;
+import org.apache.flink.table.connector.ProviderContext;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.source.DataStreamScanProvider;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import 
org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
+import 
org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Header;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+/** A {@link ScanTableSource} for {@link DynamicKafkaSource}. */
+@Internal
+public class DynamicKafkaTableSource
+        implements ScanTableSource, SupportsReadingMetadata, 
SupportsWatermarkPushDown {
+
+    private static final String KAFKA_TRANSFORMATION = "kafka";
+
+    // 
--------------------------------------------------------------------------------------------
+    // Mutable attributes
+    // 
--------------------------------------------------------------------------------------------
+
+    /** Data type that describes the final output of the source. */
+    protected DataType producedDataType;
+
+    /** Metadata that is appended at the end of a physical source row. */
+    protected List<String> metadataKeys;
+
+    /** Watermark strategy that is used to generate per-partition watermark. */
+    protected @Nullable WatermarkStrategy<RowData> watermarkStrategy;
+
+    // 
--------------------------------------------------------------------------------------------
+    // Format attributes
+    // 
--------------------------------------------------------------------------------------------
+
+    private static final String VALUE_METADATA_PREFIX = "value.";
+
+    /** Data type to configure the formats. */
+    protected final DataType physicalDataType;
+
+    /** Optional format for decoding keys from Kafka. */
+    protected final @Nullable DecodingFormat<DeserializationSchema<RowData>> 
keyDecodingFormat;
+
+    /** Format for decoding values from Kafka. */
+    protected final DecodingFormat<DeserializationSchema<RowData>> 
valueDecodingFormat;
+
+    /** Indices that determine the key fields and the target position in the 
produced row. */
+    protected final int[] keyProjection;
+
+    /** Indices that determine the value fields and the target position in the 
produced row. */
+    protected final int[] valueProjection;
+
+    /** Prefix that needs to be removed from fields when constructing the 
physical data type. */
+    protected final @Nullable String keyPrefix;
+
+    // 
--------------------------------------------------------------------------------------------
+    // Dynamic Kafka specific attributes
+    // 
--------------------------------------------------------------------------------------------
+
+    /** The stream ids to consume. */
+    protected final @Nullable List<String> streamIds;
+
+    /** The stream pattern to consume. */
+    protected final @Nullable Pattern streamPattern;
+
+    /** Metadata service for resolving streams to topics and clusters. */
+    protected final KafkaMetadataService kafkaMetadataService;
+
+    /** Properties for the Kafka consumer. */
+    protected final Properties properties;
+
+    /**
+     * The startup mode for the contained consumer (default is {@link 
StartupMode#GROUP_OFFSETS}).
+     */
+    protected final StartupMode startupMode;
+
+    /**
+     * Specific startup offsets; only relevant when startup mode is {@link
+     * StartupMode#SPECIFIC_OFFSETS}.
+     */
+    protected final Map<TopicPartition, Long> specificStartupOffsets;
+
+    /**
+     * The start timestamp to locate partition offsets; only relevant when 
startup mode is {@link
+     * StartupMode#TIMESTAMP}.
+     */
+    protected final long startupTimestampMillis;
+
+    /** The bounded mode for the contained consumer (default is an unbounded 
data stream). */
+    protected final BoundedMode boundedMode;
+
+    /**
+     * Specific end offsets; only relevant when bounded mode is {@link
+     * BoundedMode#SPECIFIC_OFFSETS}.
+     */
+    protected final Map<TopicPartition, Long> specificBoundedOffsets;
+
+    /**
+     * The bounded timestamp to locate partition offsets; only relevant when 
bounded mode is {@link
+     * BoundedMode#TIMESTAMP}.
+     */
+    protected final long boundedTimestampMillis;
+
+    /** Flag to determine source mode. In upsert mode, it will keep the 
tombstone message. * */
+    protected final boolean upsertMode;
+
+    protected final String tableIdentifier;
+
+    /** Parallelism of the physical Kafka consumer. * */
+    protected final @Nullable Integer parallelism;
+
+    public DynamicKafkaTableSource(
+            DataType physicalDataType,
+            @Nullable DecodingFormat<DeserializationSchema<RowData>> 
keyDecodingFormat,
+            DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat,
+            int[] keyProjection,
+            int[] valueProjection,
+            @Nullable String keyPrefix,
+            @Nullable List<String> streamIds,
+            @Nullable Pattern streamPattern,
+            KafkaMetadataService kafkaMetadataService,
+            Properties properties,
+            StartupMode startupMode,
+            Map<TopicPartition, Long> specificStartupOffsets,
+            long startupTimestampMillis,
+            BoundedMode boundedMode,
+            Map<TopicPartition, Long> specificBoundedOffsets,
+            long boundedTimestampMillis,
+            boolean upsertMode,
+            String tableIdentifier,
+            @Nullable Integer parallelism) {
+        // Format attributes
+        this.physicalDataType =
+                Preconditions.checkNotNull(
+                        physicalDataType, "Physical data type must not be 
null.");
+        this.keyDecodingFormat = keyDecodingFormat;
+        this.valueDecodingFormat =
+                Preconditions.checkNotNull(
+                        valueDecodingFormat, "Value decoding format must not 
be null.");
+        this.keyProjection =
+                Preconditions.checkNotNull(keyProjection, "Key projection must 
not be null.");
+        this.valueProjection =
+                Preconditions.checkNotNull(valueProjection, "Value projection 
must not be null.");
+        this.keyPrefix = keyPrefix;
+        // Mutable attributes
+        this.producedDataType = physicalDataType;
+        this.metadataKeys = Collections.emptyList();
+        this.watermarkStrategy = null;
+        // Dynamic Kafka specific attributes
+        Preconditions.checkArgument(
+                (streamIds != null && streamPattern == null)
+                        || (streamIds == null && streamPattern != null),
+                "Either stream ids or stream pattern must be set for source.");
+        this.streamIds = streamIds;
+        this.streamPattern = streamPattern;
+        this.kafkaMetadataService =
+                Preconditions.checkNotNull(
+                        kafkaMetadataService, "Kafka metadata service must not 
be null.");
+        this.properties = Preconditions.checkNotNull(properties, "Properties 
must not be null.");
+        this.startupMode =
+                Preconditions.checkNotNull(startupMode, "Startup mode must not 
be null.");
+        this.specificStartupOffsets =
+                Preconditions.checkNotNull(
+                        specificStartupOffsets, "Specific offsets must not be 
null.");
+        this.startupTimestampMillis = startupTimestampMillis;
+        this.boundedMode =
+                Preconditions.checkNotNull(boundedMode, "Bounded mode must not 
be null.");
+        this.specificBoundedOffsets =
+                Preconditions.checkNotNull(
+                        specificBoundedOffsets, "Specific bounded offsets must 
not be null.");
+        this.boundedTimestampMillis = boundedTimestampMillis;
+        this.upsertMode = upsertMode;
+        this.tableIdentifier = tableIdentifier;
+        this.parallelism = parallelism;
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode() {
+        return valueDecodingFormat.getChangelogMode();
+    }
+
+    @Override
+    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
+        final DeserializationSchema<RowData> keyDeserialization =
+                createDeserialization(context, keyDecodingFormat, 
keyProjection, keyPrefix);
+
+        final DeserializationSchema<RowData> valueDeserialization =
+                createDeserialization(context, valueDecodingFormat, 
valueProjection, null);
+
+        final TypeInformation<RowData> producedTypeInfo =
+                context.createTypeInformation(producedDataType);
+
+        final DynamicKafkaSource<RowData> kafkaSource =
+                createDynamicKafkaSource(
+                        keyDeserialization, valueDeserialization, 
producedTypeInfo);
+
+        return new DataStreamScanProvider() {
+            @Override
+            public DataStream<RowData> produceDataStream(
+                    ProviderContext providerContext, 
StreamExecutionEnvironment execEnv) {
+                if (watermarkStrategy == null) {
+                    watermarkStrategy = WatermarkStrategy.noWatermarks();
+                }
+                DataStreamSource<RowData> sourceStream =
+                        execEnv.fromSource(
+                                kafkaSource,
+                                watermarkStrategy,
+                                "DynamicKafkaSource-" + tableIdentifier);
+                
providerContext.generateUid(KAFKA_TRANSFORMATION).ifPresent(sourceStream::uid);
+                return sourceStream;
+            }
+
+            @Override
+            public boolean isBounded() {
+                return kafkaSource.getBoundedness() == Boundedness.BOUNDED;
+            }
+
+            @Override
+            public Optional<Integer> getParallelism() {
+                return Optional.ofNullable(parallelism);
+            }
+        };
+    }
+
+    @Override
+    public Map<String, DataType> listReadableMetadata() {
+        final Map<String, DataType> metadataMap = new LinkedHashMap<>();
+
+        // according to convention, the order of the final row must be
+        // PHYSICAL + FORMAT METADATA + CONNECTOR METADATA
+        // where the format metadata has highest precedence
+
+        // add value format metadata with prefix
+        valueDecodingFormat
+                .listReadableMetadata()
+                .forEach((key, value) -> metadataMap.put(VALUE_METADATA_PREFIX 
+ key, value));
+
+        // add connector metadata
+        Stream.of(ReadableMetadata.values())
+                .forEachOrdered(m -> metadataMap.putIfAbsent(m.key, 
m.dataType));
+
+        return metadataMap;
+    }
+
+    @Override
+    public void applyReadableMetadata(List<String> metadataKeys, DataType 
producedDataType) {
+        // separate connector and format metadata
+        final List<String> formatMetadataKeys =
+                metadataKeys.stream()
+                        .filter(k -> k.startsWith(VALUE_METADATA_PREFIX))
+                        .collect(Collectors.toList());
+        final List<String> connectorMetadataKeys = new 
ArrayList<>(metadataKeys);
+        connectorMetadataKeys.removeAll(formatMetadataKeys);
+
+        // push down format metadata
+        final Map<String, DataType> formatMetadata = 
valueDecodingFormat.listReadableMetadata();
+        if (formatMetadata.size() > 0) {
+            final List<String> requestedFormatMetadataKeys =
+                    formatMetadataKeys.stream()
+                            .map(k -> 
k.substring(VALUE_METADATA_PREFIX.length()))
+                            .collect(Collectors.toList());
+            
valueDecodingFormat.applyReadableMetadata(requestedFormatMetadataKeys);
+        }
+
+        this.metadataKeys = connectorMetadataKeys;
+        this.producedDataType = producedDataType;
+    }
+
+    @Override
+    public boolean supportsMetadataProjection() {
+        return false;
+    }
+
+    @Override
+    public void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
+        this.watermarkStrategy = watermarkStrategy;
+    }
+
+    @Override
+    public DynamicTableSource copy() {
+        final DynamicKafkaTableSource copy =
+                new DynamicKafkaTableSource(
+                        physicalDataType,
+                        keyDecodingFormat,
+                        valueDecodingFormat,
+                        keyProjection,
+                        valueProjection,
+                        keyPrefix,
+                        streamIds,
+                        streamPattern,
+                        kafkaMetadataService,
+                        properties,
+                        startupMode,
+                        specificStartupOffsets,
+                        startupTimestampMillis,
+                        boundedMode,
+                        specificBoundedOffsets,
+                        boundedTimestampMillis,
+                        upsertMode,
+                        tableIdentifier,
+                        parallelism);
+        copy.producedDataType = producedDataType;
+        copy.metadataKeys = metadataKeys;
+        copy.watermarkStrategy = watermarkStrategy;
+        return copy;
+    }
+
+    @Override
+    public String asSummaryString() {
+        return "Dynamic Kafka table source";
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        final DynamicKafkaTableSource that = (DynamicKafkaTableSource) o;
+        return Objects.equals(producedDataType, that.producedDataType)
+                && Objects.equals(metadataKeys, that.metadataKeys)
+                && Objects.equals(physicalDataType, that.physicalDataType)
+                && Objects.equals(keyDecodingFormat, that.keyDecodingFormat)
+                && Objects.equals(valueDecodingFormat, 
that.valueDecodingFormat)
+                && Arrays.equals(keyProjection, that.keyProjection)
+                && Arrays.equals(valueProjection, that.valueProjection)
+                && Objects.equals(keyPrefix, that.keyPrefix)
+                && Objects.equals(streamIds, that.streamIds)
+                && Objects.equals(String.valueOf(streamPattern), 
String.valueOf(that.streamPattern))
+                && Objects.equals(kafkaMetadataService, 
that.kafkaMetadataService)
+                && Objects.equals(properties, that.properties)
+                && startupMode == that.startupMode
+                && Objects.equals(specificStartupOffsets, 
that.specificStartupOffsets)
+                && startupTimestampMillis == that.startupTimestampMillis
+                && boundedMode == that.boundedMode
+                && Objects.equals(specificBoundedOffsets, 
that.specificBoundedOffsets)
+                && boundedTimestampMillis == that.boundedTimestampMillis
+                && Objects.equals(upsertMode, that.upsertMode)
+                && Objects.equals(tableIdentifier, that.tableIdentifier)
+                && Objects.equals(watermarkStrategy, that.watermarkStrategy)
+                && Objects.equals(parallelism, that.parallelism);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                producedDataType,
+                metadataKeys,
+                physicalDataType,
+                keyDecodingFormat,
+                valueDecodingFormat,
+                Arrays.hashCode(keyProjection),
+                Arrays.hashCode(valueProjection),
+                keyPrefix,
+                streamIds,
+                streamPattern,
+                kafkaMetadataService,
+                properties,
+                startupMode,
+                specificStartupOffsets,
+                startupTimestampMillis,
+                boundedMode,
+                specificBoundedOffsets,
+                boundedTimestampMillis,
+                upsertMode,
+                tableIdentifier,
+                watermarkStrategy,
+                parallelism);
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+
+    protected DynamicKafkaSource<RowData> createDynamicKafkaSource(
+            DeserializationSchema<RowData> keyDeserialization,
+            DeserializationSchema<RowData> valueDeserialization,
+            TypeInformation<RowData> producedTypeInfo) {
+
+        final KafkaRecordDeserializationSchema<RowData> kafkaDeserializer =
+                createKafkaDeserializationSchema(
+                        keyDeserialization, valueDeserialization, 
producedTypeInfo);
+
+        final DynamicKafkaSourceBuilder<RowData> dynamicKafkaSourceBuilder =
+                DynamicKafkaSource.builder();
+
+        if (streamIds != null) {
+            dynamicKafkaSourceBuilder.setStreamIds(new HashSet<>(streamIds));
+        } else {
+            dynamicKafkaSourceBuilder.setStreamPattern(streamPattern);
+        }
+
+        dynamicKafkaSourceBuilder
+                .setKafkaMetadataService(kafkaMetadataService)
+                .setDeserializer(kafkaDeserializer)
+                .setProperties(properties);
+
+        switch (startupMode) {
+            case EARLIEST:
+                
dynamicKafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.earliest());
+                break;
+            case LATEST:
+                
dynamicKafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.latest());
+                break;
+            case GROUP_OFFSETS:
+                String offsetResetConfig =
+                        properties.getProperty(
+                                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+                                OffsetResetStrategy.NONE.name());
+                OffsetResetStrategy offsetResetStrategy = 
getResetStrategy(offsetResetConfig);
+                dynamicKafkaSourceBuilder.setStartingOffsets(
+                        
OffsetsInitializer.committedOffsets(offsetResetStrategy));
+                break;
+            case SPECIFIC_OFFSETS:
+                dynamicKafkaSourceBuilder.setStartingOffsets(
+                        OffsetsInitializer.offsets(specificStartupOffsets));
+                break;
+            case TIMESTAMP:
+                dynamicKafkaSourceBuilder.setStartingOffsets(
+                        OffsetsInitializer.timestamp(startupTimestampMillis));
+                break;
+        }
+
+        switch (boundedMode) {
+            case UNBOUNDED:
+                break;
+            case LATEST:
+                
dynamicKafkaSourceBuilder.setBounded(OffsetsInitializer.latest());
+                break;
+            case GROUP_OFFSETS:
+                
dynamicKafkaSourceBuilder.setBounded(OffsetsInitializer.committedOffsets());
+                break;
+            case SPECIFIC_OFFSETS:
+                dynamicKafkaSourceBuilder.setBounded(
+                        OffsetsInitializer.offsets(specificBoundedOffsets));
+                break;
+            case TIMESTAMP:
+                dynamicKafkaSourceBuilder.setBounded(
+                        OffsetsInitializer.timestamp(boundedTimestampMillis));
+                break;
+        }
+
+        return dynamicKafkaSourceBuilder.build();
+    }
+
+    private OffsetResetStrategy getResetStrategy(String offsetResetConfig) {
+        return Arrays.stream(OffsetResetStrategy.values())
+                .filter(ors -> 
ors.name().equals(offsetResetConfig.toUpperCase(Locale.ROOT)))
+                .findAny()
+                .orElseThrow(
+                        () ->
+                                new IllegalArgumentException(
+                                        String.format(
+                                                "%s can not be set to %s. 
Valid values: [%s]",
+                                                
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+                                                offsetResetConfig,
+                                                
Arrays.stream(OffsetResetStrategy.values())
+                                                        .map(Enum::name)
+                                                        
.map(String::toLowerCase)
+                                                        
.collect(Collectors.joining(",")))));
+    }
+
+    private KafkaRecordDeserializationSchema<RowData> 
createKafkaDeserializationSchema(
+            DeserializationSchema<RowData> keyDeserialization,
+            DeserializationSchema<RowData> valueDeserialization,
+            TypeInformation<RowData> producedTypeInfo) {
+        final MetadataConverter[] metadataConverters =
+                metadataKeys.stream()
+                        .map(
+                                k ->
+                                        Stream.of(ReadableMetadata.values())
+                                                .filter(rm -> rm.key.equals(k))
+                                                .findFirst()
+                                                
.orElseThrow(IllegalStateException::new))
+                        .map(m -> m.converter)
+                        .toArray(MetadataConverter[]::new);
+
+        // check if connector metadata is used at all
+        final boolean hasMetadata = metadataKeys.size() > 0;
+
+        // adjust physical arity with value format's metadata
+        final int adjustedPhysicalArity =
+                DataType.getFieldDataTypes(producedDataType).size() - 
metadataKeys.size();
+
+        // adjust value format projection to include value format's metadata 
columns at the end
+        final int[] adjustedValueProjection =
+                IntStream.concat(
+                                IntStream.of(valueProjection),
+                                IntStream.range(
+                                        keyProjection.length + 
valueProjection.length,
+                                        adjustedPhysicalArity))
+                        .toArray();
+
+        return new DynamicKafkaDeserializationSchema(
+                adjustedPhysicalArity,
+                keyDeserialization,
+                keyProjection,
+                valueDeserialization,
+                adjustedValueProjection,
+                hasMetadata,
+                metadataConverters,
+                producedTypeInfo,
+                upsertMode);
+    }
+
+    private @Nullable DeserializationSchema<RowData> createDeserialization(
+            DynamicTableSource.Context context,
+            @Nullable DecodingFormat<DeserializationSchema<RowData>> format,
+            int[] projection,
+            @Nullable String prefix) {
+        if (format == null) {
+            return null;
+        }
+        DataType physicalFormatDataType = 
Projection.of(projection).project(this.physicalDataType);
+        if (prefix != null) {
+            physicalFormatDataType =
+                    TableDataTypeUtils.stripRowPrefix(physicalFormatDataType, 
prefix);
+        }
+        return format.createRuntimeDecoder(context, physicalFormatDataType);
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+    // Metadata handling
+    // 
--------------------------------------------------------------------------------------------
+
+    enum ReadableMetadata {
+        TOPIC(
+                "topic",
+                DataTypes.STRING().notNull(),
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object read(ConsumerRecord<?, ?> record) {
+                        return StringData.fromString(record.topic());
+                    }
+                }),
+
+        PARTITION(
+                "partition",
+                DataTypes.INT().notNull(),
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object read(ConsumerRecord<?, ?> record) {
+                        return record.partition();
+                    }
+                }),
+
+        HEADERS(
+                "headers",
+                // key and value of the map are nullable to make handling 
easier in queries
+                DataTypes.MAP(DataTypes.STRING().nullable(), 
DataTypes.BYTES().nullable())
+                        .notNull(),
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object read(ConsumerRecord<?, ?> record) {
+                        final Map<StringData, byte[]> map = new HashMap<>();
+                        for (Header header : record.headers()) {
+                            map.put(StringData.fromString(header.key()), 
header.value());
+                        }
+                        return new GenericMapData(map);
+                    }
+                }),
+
+        LEADER_EPOCH(
+                "leader-epoch",
+                DataTypes.INT().nullable(),
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object read(ConsumerRecord<?, ?> record) {
+                        return record.leaderEpoch().orElse(null);
+                    }
+                }),
+
+        OFFSET(
+                "offset",
+                DataTypes.BIGINT().notNull(),
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object read(ConsumerRecord<?, ?> record) {
+                        return record.offset();
+                    }
+                }),
+
+        TIMESTAMP(
+                "timestamp",
+                DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull(),
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object read(ConsumerRecord<?, ?> record) {
+                        return 
TimestampData.fromEpochMillis(record.timestamp());
+                    }
+                }),
+
+        TIMESTAMP_TYPE(
+                "timestamp-type",
+                DataTypes.STRING().notNull(),
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object read(ConsumerRecord<?, ?> record) {
+                        return 
StringData.fromString(record.timestampType().toString());
+                    }
+                });
+
+        final String key;
+
+        final DataType dataType;
+
+        final MetadataConverter converter;
+
+        ReadableMetadata(String key, DataType dataType, MetadataConverter 
converter) {
+            this.key = key;
+            this.dataType = dataType;
+            this.converter = converter;
+        }
+    }
+}
diff --git 
a/flink-connector-kafka/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
 
b/flink-connector-kafka/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index 9b8bf8e0..f67172a6 100644
--- 
a/flink-connector-kafka/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ 
b/flink-connector-kafka/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -15,3 +15,4 @@
 
 org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory
 
org.apache.flink.streaming.connectors.kafka.table.UpsertKafkaDynamicTableFactory
+org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaTableFactory
diff --git 
a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaTableFactoryTest.java
 
b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaTableFactoryTest.java
new file mode 100644
index 00000000..33f63c59
--- /dev/null
+++ 
b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaTableFactoryTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.streaming.connectors.kafka.config.BoundedMode;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.TestFormatFactory;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static 
org.apache.flink.table.factories.utils.FactoryMocks.createTableSource;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+
+/** Tests for {@link DynamicKafkaTableFactory}. */
+public class DynamicKafkaTableFactoryTest {
+
+    private static final ResolvedSchema SCHEMA =
+            new ResolvedSchema(
+                    Arrays.asList(
+                            Column.physical("name", 
DataTypes.STRING().notNull()),
+                            Column.physical("count", DataTypes.INT())),
+                    Collections.emptyList(),
+                    null);
+
+    @Test
+    public void testTableSourceWithSingleClusterMetadataService() {
+        final DynamicTableSource actualSource =
+                createTableSource(SCHEMA, getSingleClusterSourceOptions());
+        assertThat(actualSource).isInstanceOf(DynamicKafkaTableSource.class);
+
+        final DynamicKafkaTableSource actualKafkaSource = 
(DynamicKafkaTableSource) actualSource;
+        assertThat(actualKafkaSource.streamIds).containsExactly("stream-1", 
"stream-2");
+        assertThat(actualKafkaSource.streamPattern).isNull();
+        
assertThat(actualKafkaSource.startupMode).isEqualTo(StartupMode.EARLIEST);
+        
assertThat(actualKafkaSource.boundedMode).isEqualTo(BoundedMode.UNBOUNDED);
+        
assertThat(actualKafkaSource.properties.getProperty("bootstrap.servers"))
+                .isEqualTo("bootstrap-single:9092");
+        
assertThat(actualKafkaSource.properties.getProperty("partition.discovery.interval.ms"))
+                .isEqualTo("1000");
+    }
+
+    @Test
+    public void testInvalidStreamOptions() {
+        final Map<String, String> options = getSingleClusterSourceOptions();
+        options.put("stream-pattern", "stream-.*");
+
+        assertThatExceptionOfType(ValidationException.class)
+                .isThrownBy(() -> createTableSource(SCHEMA, options))
+                .withMessageContaining("stream-ids")
+                .withMessageContaining("stream-pattern");
+    }
+
+    private static Map<String, String> getSingleClusterSourceOptions() {
+        Map<String, String> tableOptions = new HashMap<>();
+        // Dynamic Kafka specific options.
+        tableOptions.put("connector", DynamicKafkaTableFactory.IDENTIFIER);
+        tableOptions.put("stream-ids", "stream-1;stream-2");
+        tableOptions.put("metadata-service", "single-cluster");
+        tableOptions.put("metadata-service.cluster-id", "cluster-single");
+        tableOptions.put("properties.bootstrap.servers", 
"bootstrap-single:9092");
+        tableOptions.put("properties.group.id", "dummy");
+        tableOptions.put("scan.startup.mode", "earliest-offset");
+        tableOptions.put("scan.topic-partition-discovery.interval", "1000 ms");
+        // Format options.
+        tableOptions.put("format", TestFormatFactory.IDENTIFIER);
+        final String formatDelimiterKey =
+                String.format(
+                        "%s.%s", TestFormatFactory.IDENTIFIER, 
TestFormatFactory.DELIMITER.key());
+        final String failOnMissingKey =
+                String.format(
+                        "%s.%s",
+                        TestFormatFactory.IDENTIFIER, 
TestFormatFactory.FAIL_ON_MISSING.key());
+        tableOptions.put(formatDelimiterKey, ",");
+        tableOptions.put(failOnMissingKey, "true");
+        return tableOptions;
+    }
+}

Reply via email to