This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c320e990790ecd804f34e187cd7dab4dcaa8300c
Author: Timo Walther <twal...@apache.org>
AuthorDate: Mon Oct 19 13:37:14 2020 +0200

    [FLINK-19672][connector-kafka] Update Kafka table sources and sinks
    
    This closes #13681.
---
 .../connectors/kafka/table/KafkaDynamicSink.java   | 122 +++--
 .../kafka/table/KafkaDynamicSinkBase.java          | 141 -----
 .../connectors/kafka/table/KafkaDynamicSource.java | 188 +++++--
 .../kafka/table/KafkaDynamicSourceBase.java        | 239 --------
 .../kafka/table/KafkaDynamicTableFactory.java      | 153 +++++-
 .../kafka/table/KafkaDynamicTableFactoryBase.java  | 200 -------
 .../connectors/kafka/table/KafkaOptions.java       |   2 +-
 .../kafka/table/KafkaDynamicTableFactoryTest.java  | 553 ++++++++++++++++++-
 .../table/KafkaDynamicTableFactoryTestBase.java    | 606 ---------------------
 9 files changed, 896 insertions(+), 1308 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java
index ec50a00..c13c594 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java
@@ -1,12 +1,13 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,22 +20,43 @@ package org.apache.flink.streaming.connectors.kafka.table;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.format.EncodingFormat;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.Preconditions;
 
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Properties;
 
 /**
- * Kafka table sink for writing data into Kafka.
+ * A version-agnostic Kafka {@link DynamicTableSink}.
  */
 @Internal
-public class KafkaDynamicSink extends KafkaDynamicSinkBase {
+public class KafkaDynamicSink implements DynamicTableSink {
+
+       /** Consumed data type of the table. */
+       protected final DataType consumedDataType;
+
+       /** The Kafka topic to write to. */
+       protected final String topic;
+
+       /** Properties for the Kafka producer. */
+       protected final Properties properties;
+
+       /** Sink format for encoding records to Kafka. */
+       protected final EncodingFormat<SerializationSchema<RowData>> encodingFormat;
+
+       /** Partitioner to select Kafka partition for each item. */
+       protected final Optional<FlinkKafkaPartitioner<RowData>> partitioner;
+
+       /** Sink commit semantic.*/
+       protected final KafkaSinkSemantic semantic;
 
        public KafkaDynamicSink(
                        DataType consumedDataType,
@@ -43,29 +65,27 @@ public class KafkaDynamicSink extends KafkaDynamicSinkBase {
                        Optional<FlinkKafkaPartitioner<RowData>> partitioner,
                        EncodingFormat<SerializationSchema<RowData>> encodingFormat,
                        KafkaSinkSemantic semantic) {
-               super(
-                               consumedDataType,
-                               topic,
-                               properties,
-                               partitioner,
-                               encodingFormat,
-                               semantic);
+               this.consumedDataType = Preconditions.checkNotNull(consumedDataType, "Consumed data type must not be null.");
+               this.topic = Preconditions.checkNotNull(topic, "Topic must not be null.");
+               this.properties = Preconditions.checkNotNull(properties, "Properties must not be null.");
+               this.partitioner = Preconditions.checkNotNull(partitioner, "Partitioner must not be null.");
+               this.encodingFormat = Preconditions.checkNotNull(encodingFormat, "Encoding format must not be null.");
+               this.semantic = Preconditions.checkNotNull(semantic, "Semantic must not be null.");
        }
 
        @Override
-       protected SinkFunction<RowData> createKafkaProducer(
-                       String topic,
-                       Properties properties,
-                       SerializationSchema<RowData> serializationSchema,
-                       Optional<FlinkKafkaPartitioner<RowData>> partitioner,
-                       KafkaSinkSemantic semantic) {
-               return new FlinkKafkaProducer<>(
-                               topic,
-                               serializationSchema,
-                               properties,
-                               partitioner.orElse(null),
-                               FlinkKafkaProducer.Semantic.valueOf(semantic.toString()),
-                               FlinkKafkaProducer.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
+       public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+               return this.encodingFormat.getChangelogMode();
+       }
+
+       @Override
+       public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+               final SerializationSchema<RowData> serializationSchema =
+                               this.encodingFormat.createRuntimeEncoder(context, this.consumedDataType);
+
+               final FlinkKafkaProducer<RowData> kafkaProducer = createKafkaProducer(serializationSchema);
+
+               return SinkFunctionProvider.of(kafkaProducer);
        }
 
        @Override
@@ -81,6 +101,46 @@ public class KafkaDynamicSink extends KafkaDynamicSinkBase {
 
        @Override
        public String asSummaryString() {
-               return "Kafka universal table sink";
+               return "Kafka table sink";
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+               final KafkaDynamicSink that = (KafkaDynamicSink) o;
+               return Objects.equals(consumedDataType, that.consumedDataType) &&
+                       Objects.equals(topic, that.topic) &&
+                       Objects.equals(properties, that.properties) &&
+                       Objects.equals(encodingFormat, that.encodingFormat) &&
+                       Objects.equals(partitioner, that.partitioner) &&
+                       Objects.equals(semantic, that.semantic);
+       }
+
+       @Override
+       public int hashCode() {
+               return Objects.hash(
+                       consumedDataType,
+                       topic,
+                       properties,
+                       encodingFormat,
+                       partitioner,
+                       semantic);
+       }
+
+       // --------------------------------------------------------------------------------------------
+
+       protected FlinkKafkaProducer<RowData> createKafkaProducer(SerializationSchema<RowData> serializationSchema) {
+               return new FlinkKafkaProducer<>(
+                       topic,
+                       serializationSchema,
+                       properties,
+                       partitioner.orElse(null),
+                       FlinkKafkaProducer.Semantic.valueOf(semantic.toString()),
+                       FlinkKafkaProducer.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
        }
 }
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSinkBase.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSinkBase.java
deleted file mode 100644
index 2d6e893..0000000
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSinkBase.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.table;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import org.apache.flink.table.connector.ChangelogMode;
-import org.apache.flink.table.connector.format.EncodingFormat;
-import org.apache.flink.table.connector.sink.DynamicTableSink;
-import org.apache.flink.table.connector.sink.SinkFunctionProvider;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.util.Preconditions;
-
-import java.util.Objects;
-import java.util.Optional;
-import java.util.Properties;
-
-/**
- * A version-agnostic Kafka {@link DynamicTableSink}.
- *
- * <p>The version-specific Kafka consumers need to extend this class and
- * override {@link #createKafkaProducer(String, Properties, 
SerializationSchema, Optional, KafkaSinkSemantic)}}.
- */
-@Internal
-public abstract class KafkaDynamicSinkBase implements DynamicTableSink {
-       /** Consumed data type of the table. */
-       protected final DataType consumedDataType;
-
-       /** The Kafka topic to write to. */
-       protected final String topic;
-
-       /** Properties for the Kafka producer. */
-       protected final Properties properties;
-
-       /** Sink format for encoding records to Kafka. */
-       protected final EncodingFormat<SerializationSchema<RowData>> encodingFormat;
-
-       /** Partitioner to select Kafka partition for each item. */
-       protected final Optional<FlinkKafkaPartitioner<RowData>> partitioner;
-
-       /** Sink commit semantic.*/
-       protected final KafkaSinkSemantic semantic;
-
-       protected KafkaDynamicSinkBase(
-                       DataType consumedDataType,
-                       String topic,
-                       Properties properties,
-                       Optional<FlinkKafkaPartitioner<RowData>> partitioner,
-                       EncodingFormat<SerializationSchema<RowData>> encodingFormat,
-                       KafkaSinkSemantic semantic) {
-               this.consumedDataType = Preconditions.checkNotNull(consumedDataType, "Consumed data type must not be null.");
-               this.topic = Preconditions.checkNotNull(topic, "Topic must not be null.");
-               this.properties = Preconditions.checkNotNull(properties, "Properties must not be null.");
-               this.partitioner = Preconditions.checkNotNull(partitioner, "Partitioner must not be null.");
-               this.encodingFormat = Preconditions.checkNotNull(encodingFormat, "Encoding format must not be null.");
-               this.semantic = Preconditions.checkNotNull(semantic, "Semantic must not be null.");
-       }
-
-       @Override
-       public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
-               return this.encodingFormat.getChangelogMode();
-       }
-
-       @Override
-       public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
-               SerializationSchema<RowData> serializationSchema =
-                               this.encodingFormat.createRuntimeEncoder(context, this.consumedDataType);
-
-               final SinkFunction<RowData> kafkaProducer = createKafkaProducer(
-                               this.topic,
-                               properties,
-                               serializationSchema,
-                               this.partitioner,
-                               this.semantic);
-
-               return SinkFunctionProvider.of(kafkaProducer);
-       }
-
-       /**
-        * Returns the version-specific Kafka producer.
-        *
-        * @param topic               Kafka topic to produce to.
-        * @param properties          Properties for the Kafka producer.
-        * @param serializationSchema Serialization schema to use to create Kafka records.
-        * @param partitioner         Partitioner to select Kafka partition.
-        * @return The version-specific Kafka producer
-        */
-       protected abstract SinkFunction<RowData> createKafkaProducer(
-               String topic,
-               Properties properties,
-               SerializationSchema<RowData> serializationSchema,
-               Optional<FlinkKafkaPartitioner<RowData>> partitioner,
-               KafkaSinkSemantic semantic);
-
-       @Override
-       public boolean equals(Object o) {
-               if (this == o) {
-                       return true;
-               }
-               if (o == null || getClass() != o.getClass()) {
-                       return false;
-               }
-               final KafkaDynamicSinkBase that = (KafkaDynamicSinkBase) o;
-               return Objects.equals(consumedDataType, that.consumedDataType) &&
-                       Objects.equals(topic, that.topic) &&
-                       Objects.equals(properties, that.properties) &&
-                       Objects.equals(encodingFormat, that.encodingFormat) &&
-                       Objects.equals(partitioner, that.partitioner) &&
-                       Objects.equals(semantic, that.semantic);
-       }
-
-       @Override
-       public int hashCode() {
-               return Objects.hash(
-                       consumedDataType,
-                       topic,
-                       properties,
-                       encodingFormat,
-                       partitioner,
-                       semantic);
-       }
-}
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java
index 186144c..49a5d227 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java
@@ -1,12 +1,13 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -20,42 +21,68 @@ package org.apache.flink.streaming.connectors.kafka.table;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
 import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.format.DecodingFormat;
 import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceFunctionProvider;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.sources.StreamTableSource;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nullable;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Properties;
 import java.util.regex.Pattern;
 
 /**
- * Kafka {@link org.apache.flink.table.connector.source.DynamicTableSource}.
+ * A version-agnostic Kafka {@link ScanTableSource}.
  */
 @Internal
-public class KafkaDynamicSource extends KafkaDynamicSourceBase {
-
-       /**
-        * Creates a generic Kafka {@link StreamTableSource}.
-        *
-        * @param outputDataType         Source output data type
-        * @param topics                 Kafka topic to consume
-        * @param topicPattern           Kafka topic pattern to consume
-        * @param properties             Properties for the Kafka consumer
-        * @param decodingFormat         Decoding format for decoding records from Kafka
-        * @param startupMode            Startup mode for the contained consumer
-        * @param specificStartupOffsets Specific startup offsets; only relevant when startup
-        *                               mode is {@link StartupMode#SPECIFIC_OFFSETS}
-        */
+public class KafkaDynamicSource implements ScanTableSource {
+
+       // --------------------------------------------------------------------------------------------
+       // Common attributes
+       // --------------------------------------------------------------------------------------------
+
+       protected final DataType producedDataType;
+
+       // --------------------------------------------------------------------------------------------
+       // Scan format attributes
+       // --------------------------------------------------------------------------------------------
+
+       /** Scan format for decoding records from Kafka. */
+       protected final DecodingFormat<DeserializationSchema<RowData>> decodingFormat;
+
+       // --------------------------------------------------------------------------------------------
+       // Kafka-specific attributes
+       // --------------------------------------------------------------------------------------------
+
+       /** The Kafka topics to consume. */
+       protected final List<String> topics;
+
+       /** The Kafka topic pattern to consume. */
+       protected final Pattern topicPattern;
+
+       /** Properties for the Kafka consumer. */
+       protected final Properties properties;
+
+       /** The startup mode for the contained consumer (default is {@link StartupMode#GROUP_OFFSETS}). */
+       protected final StartupMode startupMode;
+
+       /** Specific startup offsets; only relevant when startup mode is {@link StartupMode#SPECIFIC_OFFSETS}. */
+       protected final Map<KafkaTopicPartition, Long> specificStartupOffsets;
+
+       /** The start timestamp to locate partition offsets; only relevant when startup mode is {@link StartupMode#TIMESTAMP}.*/
+       protected final long startupTimestampMillis;
+
        public KafkaDynamicSource(
-                       DataType outputDataType,
+                       DataType producedDataType,
                        @Nullable List<String> topics,
                        @Nullable Pattern topicPattern,
                        Properties properties,
@@ -63,38 +90,40 @@ public class KafkaDynamicSource extends KafkaDynamicSourceBase {
                        StartupMode startupMode,
                        Map<KafkaTopicPartition, Long> specificStartupOffsets,
                        long startupTimestampMillis) {
-
-               super(
-                       outputDataType,
-                       topics,
-                       topicPattern,
-                       properties,
-                       decodingFormat,
-                       startupMode,
-                       specificStartupOffsets,
-                       startupTimestampMillis);
+               this.producedDataType = Preconditions.checkNotNull(producedDataType, "Produced data type must not be null.");
+               Preconditions.checkArgument((topics != null && topicPattern == null) ||
+                               (topics == null && topicPattern != null),
+                       "Either Topic or Topic Pattern must be set for source.");
+               this.topics = topics;
+               this.topicPattern = topicPattern;
+               this.properties = Preconditions.checkNotNull(properties, "Properties must not be null.");
+               this.decodingFormat = Preconditions.checkNotNull(
+                       decodingFormat, "Decoding format must not be null.");
+               this.startupMode = Preconditions.checkNotNull(startupMode, "Startup mode must not be null.");
+               this.specificStartupOffsets = Preconditions.checkNotNull(
+                       specificStartupOffsets, "Specific offsets must not be null.");
+               this.startupTimestampMillis = startupTimestampMillis;
        }
 
        @Override
-       protected FlinkKafkaConsumerBase<RowData> createKafkaConsumer(
-                       List<String> topics,
-                       Properties properties,
-                       DeserializationSchema<RowData> deserializationSchema) {
-               return new FlinkKafkaConsumer<>(topics, deserializationSchema, properties);
+       public ChangelogMode getChangelogMode() {
+               return this.decodingFormat.getChangelogMode();
        }
 
        @Override
-       protected FlinkKafkaConsumerBase<RowData> createKafkaConsumer(
-                       Pattern topicPattern,
-                       Properties properties,
-                       DeserializationSchema<RowData> deserializationSchema) {
-               return new FlinkKafkaConsumer<>(topicPattern, deserializationSchema, properties);
+       public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
+               final DeserializationSchema<RowData> deserializationSchema =
+                               this.decodingFormat.createRuntimeDecoder(runtimeProviderContext, this.producedDataType);
+
+               final FlinkKafkaConsumer<RowData> kafkaConsumer = createKafkaConsumer(deserializationSchema);
+
+               return SourceFunctionProvider.of(kafkaConsumer, false);
        }
 
        @Override
        public DynamicTableSource copy() {
                return new KafkaDynamicSource(
-                               this.outputDataType,
+                               this.producedDataType,
                                this.topics,
                                this.topicPattern,
                                this.properties,
@@ -106,6 +135,71 @@ public class KafkaDynamicSource extends KafkaDynamicSourceBase {
 
        @Override
        public String asSummaryString() {
-               return "Kafka";
+               return "Kafka table source";
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+               final KafkaDynamicSource that = (KafkaDynamicSource) o;
+               return Objects.equals(producedDataType, that.producedDataType) &&
+                       Objects.equals(topics, that.topics) &&
+                       Objects.equals(String.valueOf(topicPattern), String.valueOf(that.topicPattern)) &&
+                       Objects.equals(properties, that.properties) &&
+                       Objects.equals(decodingFormat, that.decodingFormat) &&
+                       startupMode == that.startupMode &&
+                       Objects.equals(specificStartupOffsets, that.specificStartupOffsets) &&
+                       startupTimestampMillis == that.startupTimestampMillis;
+       }
+
+       @Override
+       public int hashCode() {
+               return Objects.hash(
+                       producedDataType,
+                       topics,
+                       topicPattern,
+                       properties,
+                       decodingFormat,
+                       startupMode,
+                       specificStartupOffsets,
+                       startupTimestampMillis);
+       }
+
+       // --------------------------------------------------------------------------------------------
+
+       protected FlinkKafkaConsumer<RowData> createKafkaConsumer(DeserializationSchema<RowData> deserializationSchema) {
+               final FlinkKafkaConsumer<RowData> kafkaConsumer;
+               if (topics != null) {
+                       kafkaConsumer = new FlinkKafkaConsumer<>(topics, deserializationSchema, properties);
+               } else {
+                       kafkaConsumer = new FlinkKafkaConsumer<>(topicPattern, deserializationSchema, properties);
+               }
+
+               switch (startupMode) {
+                       case EARLIEST:
+                               kafkaConsumer.setStartFromEarliest();
+                               break;
+                       case LATEST:
+                               kafkaConsumer.setStartFromLatest();
+                               break;
+                       case GROUP_OFFSETS:
+                               kafkaConsumer.setStartFromGroupOffsets();
+                               break;
+                       case SPECIFIC_OFFSETS:
+                               kafkaConsumer.setStartFromSpecificOffsets(specificStartupOffsets);
+                               break;
+                       case TIMESTAMP:
+                               kafkaConsumer.setStartFromTimestamp(startupTimestampMillis);
+                               break;
+               }
+
+               kafkaConsumer.setCommitOffsetsOnCheckpoints(properties.getProperty("group.id") != null);
+
+               return kafkaConsumer;
        }
 }
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSourceBase.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSourceBase.java
deleted file mode 100644
index 7bf3667..0000000
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSourceBase.java
+++ /dev/null
@@ -1,239 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.table;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
-import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.table.connector.ChangelogMode;
-import org.apache.flink.table.connector.format.DecodingFormat;
-import org.apache.flink.table.connector.source.ScanTableSource;
-import org.apache.flink.table.connector.source.SourceFunctionProvider;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.sources.StreamTableSource;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.util.Preconditions;
-
-import javax.annotation.Nullable;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Properties;
-import java.util.regex.Pattern;
-
-/**
- * A version-agnostic Kafka {@link ScanTableSource}.
- *
- * <p>The version-specific Kafka consumers need to extend this class and
- * override {@link #createKafkaConsumer(List, Properties, DeserializationSchema)}} and
- *  {@link #createKafkaConsumer(Pattern, Properties, DeserializationSchema)}}.
- */
-@Internal
-public abstract class KafkaDynamicSourceBase implements ScanTableSource {
-
-       // --------------------------------------------------------------------------------------------
-       // Common attributes
-       // --------------------------------------------------------------------------------------------
-       protected final DataType outputDataType;
-
-       // --------------------------------------------------------------------------------------------
-       // Scan format attributes
-       // --------------------------------------------------------------------------------------------
-
-       /** Scan format for decoding records from Kafka. */
-       protected final DecodingFormat<DeserializationSchema<RowData>> decodingFormat;
-
-       // --------------------------------------------------------------------------------------------
-       // Kafka-specific attributes
-       // --------------------------------------------------------------------------------------------
-
-       /** The Kafka topics to consume. */
-       protected final List<String> topics;
-
-       /** The Kafka topic pattern to consume. */
-       protected final Pattern topicPattern;
-
-       /** Properties for the Kafka consumer. */
-       protected final Properties properties;
-
-       /** The startup mode for the contained consumer (default is {@link StartupMode#GROUP_OFFSETS}). */
-       protected final StartupMode startupMode;
-
-       /** Specific startup offsets; only relevant when startup mode is {@link StartupMode#SPECIFIC_OFFSETS}. */
-       protected final Map<KafkaTopicPartition, Long> specificStartupOffsets;
-
-       /** The start timestamp to locate partition offsets; only relevant when startup mode is {@link StartupMode#TIMESTAMP}.*/
-       protected final long startupTimestampMillis;
-
-       /** The default value when startup timestamp is not used.*/
-       private static final long DEFAULT_STARTUP_TIMESTAMP_MILLIS = 0L;
-
-       /**
-        * Creates a generic Kafka {@link StreamTableSource}.
-        * @param outputDataType         Source produced data type
-        * @param topics                 Kafka topics to consume.
-        * @param topicPattern           Kafka topic pattern to consume.
-        * @param properties             Properties for the Kafka consumer.
-        * @param decodingFormat         Decoding format for decoding records from Kafka.
-        * @param startupMode            Startup mode for the contained consumer.
-        * @param specificStartupOffsets Specific startup offsets; only relevant when startup
-        *                               mode is {@link StartupMode#SPECIFIC_OFFSETS}.
-        * @param startupTimestampMillis Startup timestamp for offsets; only relevant when startup
-        *                               mode is {@link StartupMode#TIMESTAMP}.
-        */
-       protected KafkaDynamicSourceBase(
-                       DataType outputDataType,
-                       @Nullable List<String> topics,
-                       @Nullable Pattern topicPattern,
-                       Properties properties,
-                       DecodingFormat<DeserializationSchema<RowData>> decodingFormat,
-                       StartupMode startupMode,
-                       Map<KafkaTopicPartition, Long> specificStartupOffsets,
-                       long startupTimestampMillis) {
-               this.outputDataType = Preconditions.checkNotNull(
-                               outputDataType, "Produced data type must not be null.");
-               Preconditions.checkArgument((topics != null && topicPattern == null) ||
-                               (topics == null && topicPattern != null),
-                       "Either Topic or Topic Pattern must be set for source.");
-               this.topics = topics;
-               this.topicPattern = topicPattern;
-               this.properties = Preconditions.checkNotNull(properties, "Properties must not be null.");
-               this.decodingFormat = Preconditions.checkNotNull(
-                       decodingFormat, "Decoding format must not be null.");
-               this.startupMode = Preconditions.checkNotNull(startupMode, "Startup mode must not be null.");
-               this.specificStartupOffsets = Preconditions.checkNotNull(
-                       specificStartupOffsets, "Specific offsets must not be null.");
-               this.startupTimestampMillis = startupTimestampMillis;
-       }
-
-       @Override
-       public ChangelogMode getChangelogMode() {
-               return this.decodingFormat.getChangelogMode();
-       }
-
-       @Override
-       public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
-               DeserializationSchema<RowData> deserializationSchema =
-                               this.decodingFormat.createRuntimeDecoder(runtimeProviderContext, this.outputDataType);
-               // Version-specific Kafka consumer
-               FlinkKafkaConsumerBase<RowData> kafkaConsumer = getKafkaConsumer(deserializationSchema);
-               return SourceFunctionProvider.of(kafkaConsumer, false);
-       }
-
-       @Override
-       public boolean equals(Object o) {
-               if (this == o) {
-                       return true;
-               }
-               if (o == null || getClass() != o.getClass()) {
-                       return false;
-               }
-               final KafkaDynamicSourceBase that = (KafkaDynamicSourceBase) o;
-               return Objects.equals(outputDataType, that.outputDataType) &&
-                       Objects.equals(topics, that.topics) &&
-                       Objects.equals(topicPattern, topicPattern) &&
-                       Objects.equals(properties, that.properties) &&
-                       Objects.equals(decodingFormat, that.decodingFormat) &&
-                       startupMode == that.startupMode &&
-                       Objects.equals(specificStartupOffsets, that.specificStartupOffsets) &&
-                       startupTimestampMillis == that.startupTimestampMillis;
-       }
-
-       @Override
-       public int hashCode() {
-               return Objects.hash(
-                       outputDataType,
-                       topics,
-                       topicPattern,
-                       properties,
-                       decodingFormat,
-                       startupMode,
-                       specificStartupOffsets,
-                       startupTimestampMillis);
-       }
-
-       // --------------------------------------------------------------------------------------------
-       // Abstract methods for subclasses
-       // --------------------------------------------------------------------------------------------
-
-       /**
-        * Creates a version-specific Kafka consumer.
-        *
-        * @param topics                Kafka topic to consume.
-        * @param properties            Properties for the Kafka consumer.
-        * @param deserializationSchema Deserialization schema to use for Kafka records.
-        * @return The version-specific Kafka consumer
-        */
-       protected abstract FlinkKafkaConsumerBase<RowData> createKafkaConsumer(
-                       List<String> topics,
-                       Properties properties,
-                       DeserializationSchema<RowData> deserializationSchema);
-
-       /**
-        * Creates a version-specific Kafka consumer.
-        *
-        * @param topicPattern          afka topic to consume.
-        * @param properties            Properties for the Kafka consumer.
-        * @param deserializationSchema Deserialization schema to use for Kafka records.
-        * @return The version-specific Kafka consumer
-        */
-       protected abstract FlinkKafkaConsumerBase<RowData> createKafkaConsumer(
-                       Pattern topicPattern,
-                       Properties properties,
-                       DeserializationSchema<RowData> deserializationSchema);
-
-       // --------------------------------------------------------------------------------------------
-       // Utilities
-       // --------------------------------------------------------------------------------------------
-
-       /**
-        * Returns a version-specific Kafka consumer with the start position configured.
-
-        * @param deserializationSchema Deserialization schema to use for Kafka records.
-        * @return The version-specific Kafka consumer
-        */
-       protected FlinkKafkaConsumerBase<RowData> getKafkaConsumer(DeserializationSchema<RowData> deserializationSchema) {
-               FlinkKafkaConsumerBase<RowData> kafkaConsumer = topics != null ?
-                       createKafkaConsumer(topics, properties, deserializationSchema) :
-                       createKafkaConsumer(topicPattern, properties, deserializationSchema);
-
-               switch (startupMode) {
-                       case EARLIEST:
-                               kafkaConsumer.setStartFromEarliest();
-                               break;
-                       case LATEST:
-                               kafkaConsumer.setStartFromLatest();
-                               break;
-                       case GROUP_OFFSETS:
-                               kafkaConsumer.setStartFromGroupOffsets();
-                               break;
-                       case SPECIFIC_OFFSETS:
-                               kafkaConsumer.setStartFromSpecificOffsets(specificStartupOffsets);
-                               break;
-                       case TIMESTAMP:
-                               kafkaConsumer.setStartFromTimestamp(startupTimestampMillis);
-                               break;
-                       }
-               kafkaConsumer.setCommitOffsetsOnCheckpoints(properties.getProperty("group.id") != null);
-               return kafkaConsumer;
-       }
-}
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
index 2a74f30..b304228 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
@@ -1,12 +1,13 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,32 +18,156 @@
 
 package org.apache.flink.streaming.connectors.kafka.table;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
 import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.table.connector.format.DecodingFormat;
 import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.SerializationFormatFactory;
 import org.apache.flink.table.types.DataType;
 
 import javax.annotation.Nullable;
 
+import java.time.Duration;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
+import java.util.Set;
 import java.util.regex.Pattern;
 
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.PROPERTIES_PREFIX;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.PROPS_BOOTSTRAP_SERVERS;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.PROPS_GROUP_ID;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.SCAN_STARTUP_MODE;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.SCAN_STARTUP_SPECIFIC_OFFSETS;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.SCAN_STARTUP_TIMESTAMP_MILLIS;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.SCAN_TOPIC_PARTITION_DISCOVERY;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.SINK_PARTITIONER;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.SINK_SEMANTIC;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.StartupOptions;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.TOPIC;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.TOPIC_PATTERN;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getFlinkKafkaPartitioner;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getKafkaProperties;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getSinkSemantic;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getStartupOptions;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.validateTableSinkOptions;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.validateTableSourceOptions;
+
 /**
- * Factory for creating configured instances of {@link KafkaDynamicSource}.
+ * Factory for creating configured instances of {@link KafkaDynamicSource} and {@link KafkaDynamicSink}.
  */
-public class KafkaDynamicTableFactory extends KafkaDynamicTableFactoryBase {
+@Internal
+public class KafkaDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
+
        public static final String IDENTIFIER = "kafka";
 
        @Override
-       protected KafkaDynamicSourceBase createKafkaTableSource(
+       public String factoryIdentifier() {
+               return IDENTIFIER;
+       }
+
+       @Override
+       public Set<ConfigOption<?>> requiredOptions() {
+               final Set<ConfigOption<?>> options = new HashSet<>();
+               options.add(FactoryUtil.FORMAT);
+               options.add(PROPS_BOOTSTRAP_SERVERS);
+               return options;
+       }
+
+       @Override
+       public Set<ConfigOption<?>> optionalOptions() {
+               final Set<ConfigOption<?>> options = new HashSet<>();
+               options.add(TOPIC);
+               options.add(TOPIC_PATTERN);
+               options.add(PROPS_GROUP_ID);
+               options.add(SCAN_STARTUP_MODE);
+               options.add(SCAN_STARTUP_SPECIFIC_OFFSETS);
+               options.add(SCAN_TOPIC_PARTITION_DISCOVERY);
+               options.add(SCAN_STARTUP_TIMESTAMP_MILLIS);
+               options.add(SINK_PARTITIONER);
+               options.add(SINK_SEMANTIC);
+               return options;
+       }
+
+       @Override
+       public DynamicTableSource createDynamicTableSource(Context context) {
+               FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+
+               ReadableConfig tableOptions = helper.getOptions();
+               DecodingFormat<DeserializationSchema<RowData>> decodingFormat = helper.discoverDecodingFormat(
+                               DeserializationFormatFactory.class,
+                               FactoryUtil.FORMAT);
+               // Validate the option data type.
+               helper.validateExcept(PROPERTIES_PREFIX);
+               // Validate the option values.
+               validateTableSourceOptions(tableOptions);
+
+               DataType producedDataType = context.getCatalogTable().getSchema().toPhysicalRowDataType();
+
+               final StartupOptions startupOptions = getStartupOptions(tableOptions);
+               final Properties properties = getKafkaProperties(context.getCatalogTable().getOptions());
+               // add topic-partition discovery
+               properties.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS,
+                       String.valueOf(tableOptions
+                               .getOptional(SCAN_TOPIC_PARTITION_DISCOVERY)
+                               .map(Duration::toMillis)
+                               .orElse(FlinkKafkaConsumerBase.PARTITION_DISCOVERY_DISABLED)));
+
+               return createKafkaTableSource(
+                       producedDataType,
+                       KafkaOptions.getSourceTopics(tableOptions),
+                       KafkaOptions.getSourceTopicPattern(tableOptions),
+                       properties,
+                       decodingFormat,
+                       startupOptions.startupMode,
+                       startupOptions.specificOffsets,
+                       startupOptions.startupTimestampMillis);
+       }
+
+       @Override
+       public DynamicTableSink createDynamicTableSink(Context context) {
+               FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+
+               ReadableConfig tableOptions = helper.getOptions();
+
+               EncodingFormat<SerializationSchema<RowData>> encodingFormat = helper.discoverEncodingFormat(
+                               SerializationFormatFactory.class,
+                               FactoryUtil.FORMAT);
+
+               // Validate the option values.
+               validateTableSinkOptions(tableOptions);
+               // Validate the option data type.
+               helper.validateExcept(PROPERTIES_PREFIX);
+
+               DataType consumedDataType = context.getCatalogTable().getSchema().toPhysicalRowDataType();
+               return createKafkaTableSink(
+                               consumedDataType,
+                               tableOptions.get(TOPIC).get(0),
+                               getKafkaProperties(context.getCatalogTable().getOptions()),
+                               getFlinkKafkaPartitioner(tableOptions, context.getClassLoader()),
+                               encodingFormat,
+                               getSinkSemantic(tableOptions));
+       }
+
+       // --------------------------------------------------------------------------------------------
+
+       protected KafkaDynamicSource createKafkaTableSource(
                        DataType producedDataType,
                        @Nullable List<String> topics,
                        @Nullable Pattern topicPattern,
@@ -62,8 +187,7 @@ public class KafkaDynamicTableFactory extends KafkaDynamicTableFactoryBase {
                                startupTimestampMillis);
        }
 
-       @Override
-       protected KafkaDynamicSinkBase createKafkaTableSink(
+       protected KafkaDynamicSink createKafkaTableSink(
                        DataType consumedDataType,
                        String topic,
                        Properties properties,
@@ -78,9 +202,4 @@ public class KafkaDynamicTableFactory extends KafkaDynamicTableFactoryBase {
                                encodingFormat,
                                semantic);
        }
-
-       @Override
-       public String factoryIdentifier() {
-               return IDENTIFIER;
-       }
 }
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryBase.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryBase.java
deleted file mode 100644
index 5136ba4..0000000
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryBase.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.table;
-
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
-import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
-import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import org.apache.flink.table.connector.format.DecodingFormat;
-import org.apache.flink.table.connector.format.EncodingFormat;
-import org.apache.flink.table.connector.sink.DynamicTableSink;
-import org.apache.flink.table.connector.source.DynamicTableSource;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.factories.DeserializationFormatFactory;
-import org.apache.flink.table.factories.DynamicTableSinkFactory;
-import org.apache.flink.table.factories.DynamicTableSourceFactory;
-import org.apache.flink.table.factories.FactoryUtil;
-import org.apache.flink.table.factories.SerializationFormatFactory;
-import org.apache.flink.table.types.DataType;
-
-import javax.annotation.Nullable;
-
-import java.time.Duration;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-import java.util.Set;
-import java.util.regex.Pattern;
-
-import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.PROPERTIES_PREFIX;
-import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.PROPS_BOOTSTRAP_SERVERS;
-import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.PROPS_GROUP_ID;
-import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.SCAN_STARTUP_MODE;
-import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.SCAN_STARTUP_SPECIFIC_OFFSETS;
-import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.SCAN_STARTUP_TIMESTAMP_MILLIS;
-import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.SCAN_TOPIC_PARTITION_DISCOVERY;
-import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.SINK_PARTITIONER;
-import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.SINK_SEMANTIC;
-import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.StartupOptions;
-import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.TOPIC;
-import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.TOPIC_PATTERN;
-import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getFlinkKafkaPartitioner;
-import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getKafkaProperties;
-import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getSinkSemantic;
-import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getStartupOptions;
-import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.validateTableSinkOptions;
-import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.validateTableSourceOptions;
-
-/**
- * Factory for creating configured instances of
- * {@link KafkaDynamicSourceBase} and {@link KafkaDynamicSinkBase}.
- */
-public abstract class KafkaDynamicTableFactoryBase implements
-               DynamicTableSourceFactory,
-               DynamicTableSinkFactory {
-
-       @Override
-       public DynamicTableSource createDynamicTableSource(Context context) {
-               FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
-
-               ReadableConfig tableOptions = helper.getOptions();
-               DecodingFormat<DeserializationSchema<RowData>> decodingFormat = helper.discoverDecodingFormat(
-                               DeserializationFormatFactory.class,
-                               FactoryUtil.FORMAT);
-               // Validate the option data type.
-               helper.validateExcept(PROPERTIES_PREFIX);
-               // Validate the option values.
-               validateTableSourceOptions(tableOptions);
-
-               DataType producedDataType = context.getCatalogTable().getSchema().toPhysicalRowDataType();
-
-               final StartupOptions startupOptions = getStartupOptions(tableOptions);
-               final Properties properties = getKafkaProperties(context.getCatalogTable().getOptions());
-               // add topic-partition discovery
-               properties.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS,
-                       String.valueOf(tableOptions
-                               .getOptional(SCAN_TOPIC_PARTITION_DISCOVERY)
-                               .map(Duration::toMillis)
-                               .orElse(FlinkKafkaConsumerBase.PARTITION_DISCOVERY_DISABLED)));
-
-               return createKafkaTableSource(
-                       producedDataType,
-                       KafkaOptions.getSourceTopics(tableOptions),
-                       KafkaOptions.getSourceTopicPattern(tableOptions),
-                       properties,
-                       decodingFormat,
-                       startupOptions.startupMode,
-                       startupOptions.specificOffsets,
-                       startupOptions.startupTimestampMillis);
-       }
-
-       @Override
-       public DynamicTableSink createDynamicTableSink(Context context) {
-               FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
-
-               ReadableConfig tableOptions = helper.getOptions();
-
-               EncodingFormat<SerializationSchema<RowData>> encodingFormat = helper.discoverEncodingFormat(
-                               SerializationFormatFactory.class,
-                               FactoryUtil.FORMAT);
-
-               // Validate the option values.
-               validateTableSinkOptions(tableOptions);
-               // Validate the option data type.
-               helper.validateExcept(PROPERTIES_PREFIX);
-
-               DataType consumedDataType = context.getCatalogTable().getSchema().toPhysicalRowDataType();
-               return createKafkaTableSink(
-                               consumedDataType,
-                               tableOptions.get(TOPIC).get(0),
-                               getKafkaProperties(context.getCatalogTable().getOptions()),
-                               getFlinkKafkaPartitioner(tableOptions, context.getClassLoader()),
-                               encodingFormat,
-                               getSinkSemantic(tableOptions));
-       }
-
-       /**
-        * Constructs the version-specific Kafka table source.
-        *
-        * @param producedDataType       Source produced data type
-        * @param topics                 Kafka topics to consume
-        * @param topicPattern           Kafka topic pattern to consume
-        * @param properties             Properties for the Kafka consumer
-        * @param decodingFormat         Decoding format for decoding records from Kafka
-        * @param startupMode            Startup mode for the contained consumer
-        * @param specificStartupOffsets Specific startup offsets; only relevant when startup
-        *                               mode is {@link StartupMode#SPECIFIC_OFFSETS}
-        */
-       protected abstract KafkaDynamicSourceBase createKafkaTableSource(
-                       DataType producedDataType,
-                       @Nullable List<String> topics,
-                       @Nullable Pattern topicPattern,
-                       Properties properties,
-                       DecodingFormat<DeserializationSchema<RowData>> decodingFormat,
-                       StartupMode startupMode,
-                       Map<KafkaTopicPartition, Long> specificStartupOffsets,
-                       long startupTimestampMillis);
-
-       /**
-        * Constructs the version-specific Kafka table sink.
-        *
-        * @param consumedDataType Sink consumed data type
-        * @param topic            Kafka topic to consume
-        * @param properties       Properties for the Kafka consumer
-        * @param partitioner      Partitioner to select Kafka partition for each item
-        * @param encodingFormat   Encoding format for encoding records to Kafka
-        */
-       protected abstract KafkaDynamicSinkBase createKafkaTableSink(
-                       DataType consumedDataType,
-                       String topic,
-                       Properties properties,
-                       Optional<FlinkKafkaPartitioner<RowData>> partitioner,
-                       EncodingFormat<SerializationSchema<RowData>> encodingFormat,
-                       KafkaSinkSemantic semantic);
-
-       @Override
-       public Set<ConfigOption<?>> requiredOptions() {
-               final Set<ConfigOption<?>> options = new HashSet<>();
-               options.add(FactoryUtil.FORMAT);
-               options.add(PROPS_BOOTSTRAP_SERVERS);
-               return options;
-       }
-
-       @Override
-       public Set<ConfigOption<?>> optionalOptions() {
-               final Set<ConfigOption<?>> options = new HashSet<>();
-               options.add(TOPIC);
-               options.add(TOPIC_PATTERN);
-               options.add(PROPS_GROUP_ID);
-               options.add(SCAN_STARTUP_MODE);
-               options.add(SCAN_STARTUP_SPECIFIC_OFFSETS);
-               options.add(SCAN_TOPIC_PARTITION_DISCOVERY);
-               options.add(SCAN_STARTUP_TIMESTAMP_MILLIS);
-               options.add(SINK_PARTITIONER);
-               options.add(SINK_SEMANTIC);
-               return options;
-       }
-}
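
    For context, the options validated by the factory above are the ones a user
    supplies in a table's WITH clause. The following is a minimal usage sketch and
    is not part of this patch: the class name, topic, bootstrap servers, and the
    choice of the 'json' format are placeholder assumptions for illustration only.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class KafkaSourceDdlSketch {
        public static void main(String[] args) {
            TableEnvironment tableEnv = TableEnvironment.create(
                    EnvironmentSettings.newInstance().inStreamingMode().build());
            // The option keys below map to the ConfigOptions registered in
            // requiredOptions()/optionalOptions() of the factory shown above.
            tableEnv.executeSql(
                    "CREATE TABLE kafka_source (\n"
                    + "  name STRING,\n"
                    + "  cnt DECIMAL(38, 18),\n"
                    + "  ts TIMESTAMP(3)\n"
                    + ") WITH (\n"
                    + "  'connector' = 'kafka',\n"
                    + "  'topic' = 'myTopic',\n"
                    + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
                    + "  'properties.group.id' = 'dummy',\n"
                    + "  'scan.startup.mode' = 'earliest-offset',\n"
                    + "  'format' = 'json'\n"
                    + ")");
        }
    }

    A sink table is declared the same way, except for the 'sink.partitioner' and
    'sink.semantic' options exercised by the tests further below.
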
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java
index 40c3712..c2b38e8 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
index abf6f85..14da7a3 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
@@ -1,12 +1,13 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,49 +20,484 @@ package org.apache.flink.streaming.connectors.kafka.table;
 
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
-import org.apache.flink.streaming.connectors.kafka.KafkaTableSink;
-import org.apache.flink.streaming.connectors.kafka.KafkaTableSource;
-import org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory;
 import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.connector.format.DecodingFormat;
 import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceFunctionProvider;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.TestFormatFactory;
+import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
+import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 import javax.annotation.Nullable;
 
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
+import java.util.function.Consumer;
 import java.util.regex.Pattern;
 
+import static org.apache.flink.core.testutils.FlinkMatchers.containsCause;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
 /**
- * Test for {@link KafkaTableSource} and {@link KafkaTableSink} created
- * by {@link KafkaTableSourceSinkFactory}.
+ * Test for {@link KafkaDynamicTableFactory}.
  */
-public class KafkaDynamicTableFactoryTest extends KafkaDynamicTableFactoryTestBase {
-       @Override
-       protected String factoryIdentifier() {
-               return KafkaDynamicTableFactory.IDENTIFIER;
+public class KafkaDynamicTableFactoryTest extends TestLogger {
+
+       @Rule
+       public ExpectedException thrown = ExpectedException.none();
+
+       private static final String TOPIC = "myTopic";
+       private static final String TOPICS = "myTopic-1;myTopic-2;myTopic-3";
+       private static final String TOPIC_REGEX = "myTopic-\\d+";
+       private static final List<String> TOPIC_LIST = Arrays.asList("myTopic-1", "myTopic-2", "myTopic-3");
+       private static final int PARTITION_0 = 0;
+       private static final long OFFSET_0 = 100L;
+       private static final int PARTITION_1 = 1;
+       private static final long OFFSET_1 = 123L;
+       private static final String NAME = "name";
+       private static final String COUNT = "count";
+       private static final String TIME = "time";
+       private static final String WATERMARK_EXPRESSION = TIME + " - INTERVAL '5' SECOND";
+       private static final DataType WATERMARK_DATATYPE = DataTypes.TIMESTAMP(3);
+       private static final String COMPUTED_COLUMN_NAME = "computed-column";
+       private static final String COMPUTED_COLUMN_EXPRESSION = COUNT + " + 1.0";
+       private static final DataType COMPUTED_COLUMN_DATATYPE = DataTypes.DECIMAL(10, 3);
+       private static final String DISCOVERY_INTERVAL = "1000 ms";
+
+       private static final Properties KAFKA_SOURCE_PROPERTIES = new Properties();
+       private static final Properties KAFKA_SINK_PROPERTIES = new Properties();
+       static {
+               KAFKA_SOURCE_PROPERTIES.setProperty("group.id", "dummy");
+               KAFKA_SOURCE_PROPERTIES.setProperty("bootstrap.servers", "dummy");
+               KAFKA_SOURCE_PROPERTIES.setProperty("flink.partition-discovery.interval-millis", "1000");
+
+               KAFKA_SINK_PROPERTIES.setProperty("group.id", "dummy");
+               KAFKA_SINK_PROPERTIES.setProperty("bootstrap.servers", "dummy");
+       }
+
+       private static final String PROPS_SCAN_OFFSETS =
+                       String.format("partition:%d,offset:%d;partition:%d,offset:%d",
+                                       PARTITION_0, OFFSET_0, PARTITION_1, OFFSET_1);
+
+       private static final TableSchema SOURCE_SCHEMA = TableSchema.builder()
+                       .field(NAME, DataTypes.STRING())
+                       .field(COUNT, DataTypes.DECIMAL(38, 18))
+                       .field(TIME, DataTypes.TIMESTAMP(3))
+                       .field(COMPUTED_COLUMN_NAME, COMPUTED_COLUMN_DATATYPE, COMPUTED_COLUMN_EXPRESSION)
+                               .watermark(TIME, WATERMARK_EXPRESSION, WATERMARK_DATATYPE)
+                               .build();
+
+       private static final TableSchema SINK_SCHEMA = TableSchema.builder()
+                       .field(NAME, DataTypes.STRING())
+                       .field(COUNT, DataTypes.DECIMAL(38, 18))
+                       .field(TIME, DataTypes.TIMESTAMP(3))
+                       .build();
+
+       @Test
+       public void testTableSource() {
+               // prepare parameters for Kafka table source
+               final DataType producedDataType = SOURCE_SCHEMA.toPhysicalRowDataType();
+
+               final Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
+               specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_0), OFFSET_0);
+               specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_1), OFFSET_1);
+
+               DecodingFormat<DeserializationSchema<RowData>> decodingFormat =
+                               new TestFormatFactory.DecodingFormatMock(",", true);
+
+               // Construct table source using options and table source factory
+               ObjectIdentifier objectIdentifier = ObjectIdentifier.of(
+                               "default",
+                               "default",
+                               "scanTable");
+               CatalogTable catalogTable = createKafkaSourceCatalogTable();
+               final DynamicTableSource actualSource = FactoryUtil.createTableSource(null,
+                               objectIdentifier,
+                               catalogTable,
+                               new Configuration(),
+                               Thread.currentThread().getContextClassLoader(),
+                               false);
+
+               // Test scan source equals
+               final KafkaDynamicSource expectedKafkaSource = createExpectedScanSource(
+                               producedDataType,
+                               Collections.singletonList(TOPIC),
+                               null,
+                               KAFKA_SOURCE_PROPERTIES,
+                               decodingFormat,
+                               StartupMode.SPECIFIC_OFFSETS,
+                               specificOffsets,
+                               0);
+               final KafkaDynamicSource actualKafkaSource = (KafkaDynamicSource) actualSource;
+               assertEquals(actualKafkaSource, expectedKafkaSource);
+
+               // Test Kafka consumer
+               ScanTableSource.ScanRuntimeProvider provider =
+                               actualKafkaSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
+               assertThat(provider, instanceOf(SourceFunctionProvider.class));
+               final SourceFunctionProvider sourceFunctionProvider = (SourceFunctionProvider) provider;
+               final SourceFunction<RowData> sourceFunction = sourceFunctionProvider.createSourceFunction();
+               assertThat(sourceFunction, instanceOf(FlinkKafkaConsumer.class));
+               // The commitOnCheckpoints flag should be true when a consumer group is set
+               assertTrue(((FlinkKafkaConsumer<?>) sourceFunction).getEnableCommitOnCheckpoints());
+       }
+
+       @Test
+       public void testTableSourceCommitOnCheckpointsDisabled() {
+               // Construct table source using options and table source factory
+               ObjectIdentifier objectIdentifier = ObjectIdentifier.of(
+                       "default",
+                       "default",
+                       "scanTable");
+               Map<String, String> tableOptions = getFullSourceOptions();
+               tableOptions.remove("properties.group.id");
+               CatalogTable catalogTable = createKafkaSourceCatalogTable(tableOptions);
+               final DynamicTableSource tableSource = FactoryUtil.createTableSource(null,
+                       objectIdentifier,
+                       catalogTable,
+                       new Configuration(),
+                       Thread.currentThread().getContextClassLoader(),
+                       false);
+
+               // The commitOnCheckpoints flag should be false when no consumer group is set.
+               assertThat(tableSource, instanceOf(KafkaDynamicSource.class));
+               ScanTableSource.ScanRuntimeProvider providerWithoutGroupId = ((KafkaDynamicSource) tableSource)
+                       .getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
+               assertThat(providerWithoutGroupId, instanceOf(SourceFunctionProvider.class));
+               final SourceFunctionProvider functionProviderWithoutGroupId = (SourceFunctionProvider) providerWithoutGroupId;
+               final SourceFunction<RowData> function = functionProviderWithoutGroupId.createSourceFunction();
+               assertFalse(((FlinkKafkaConsumer<?>) function).getEnableCommitOnCheckpoints());
+       }
+
+       @Test
+       public void testTableSourceWithPattern() {
+               // prepare parameters for Kafka table source
+               final DataType producedDataType = 
SOURCE_SCHEMA.toPhysicalRowDataType();
+
+               final Map<KafkaTopicPartition, Long> specificOffsets = new 
HashMap<>();
+
+               DecodingFormat<DeserializationSchema<RowData>> decodingFormat =
+                       new TestFormatFactory.DecodingFormatMock(",", true);
+
+               // Construct table source using options and table source factory
+               ObjectIdentifier objectIdentifier = ObjectIdentifier.of(
+                       "default",
+                       "default",
+                       "scanTable");
+               final Map<String, String> modifiedOptions = getModifiedOptions(
+                       getFullSourceOptions(),
+                       options -> {
+                               options.remove("topic");
+                               options.put("topic-pattern", TOPIC_REGEX);
+                               options.put("scan.startup.mode", 
KafkaOptions.SCAN_STARTUP_MODE_VALUE_EARLIEST);
+                               options.remove("scan.startup.specific-offsets");
+                       });
+               CatalogTable catalogTable = 
createKafkaSourceCatalogTable(modifiedOptions);
+
+               final DynamicTableSource actualSource = 
FactoryUtil.createTableSource(null,
+                       objectIdentifier,
+                       catalogTable,
+                       new Configuration(),
+                       Thread.currentThread().getContextClassLoader(),
+                       false);
+
+               // Test scan source equals
+               final KafkaDynamicSource expectedKafkaSource = 
createExpectedScanSource(
+                       producedDataType,
+                       null,
+                       Pattern.compile(TOPIC_REGEX),
+                       KAFKA_SOURCE_PROPERTIES,
+                       decodingFormat,
+                       StartupMode.EARLIEST,
+                       specificOffsets,
+                       0);
+               final KafkaDynamicSource actualKafkaSource = 
(KafkaDynamicSource) actualSource;
+               assertEquals(actualKafkaSource, expectedKafkaSource);
+
+               // Test Kafka consumer
+               ScanTableSource.ScanRuntimeProvider provider =
+                       
actualKafkaSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
+               assertThat(provider, instanceOf(SourceFunctionProvider.class));
+               final SourceFunctionProvider sourceFunctionProvider = 
(SourceFunctionProvider) provider;
+               final SourceFunction<RowData> sourceFunction = 
sourceFunctionProvider.createSourceFunction();
+               assertThat(sourceFunction, 
instanceOf(FlinkKafkaConsumer.class));
+       }
+
+       @Test
+       public void testTableSink() {
+               final DataType consumedDataType = 
SINK_SCHEMA.toPhysicalRowDataType();
+               EncodingFormat<SerializationSchema<RowData>> encodingFormat =
+                               new TestFormatFactory.EncodingFormatMock(",");
+
+               // Construct table sink using options and table sink factory.
+               ObjectIdentifier objectIdentifier = ObjectIdentifier.of(
+                               "default",
+                               "default",
+                               "sinkTable");
+               final CatalogTable sinkTable = createKafkaSinkCatalogTable();
+               final DynamicTableSink actualSink = FactoryUtil.createTableSink(
+                               null,
+                               objectIdentifier,
+                               sinkTable,
+                               new Configuration(),
+                               Thread.currentThread().getContextClassLoader(),
+                               false);
+
+               final DynamicTableSink expectedSink = createExpectedSink(
+                               consumedDataType,
+                               TOPIC,
+                               KAFKA_SINK_PROPERTIES,
+                               Optional.of(new FlinkFixedPartitioner<>()),
+                               encodingFormat,
+                               KafkaSinkSemantic.EXACTLY_ONCE
+                       );
+               assertEquals(expectedSink, actualSink);
+
+               // Test sink format.
+               final KafkaDynamicSink actualKafkaSink = (KafkaDynamicSink) 
actualSink;
+               assertEquals(encodingFormat, actualKafkaSink.encodingFormat);
+
+               // Test kafka producer.
+               DynamicTableSink.SinkRuntimeProvider provider =
+                               actualKafkaSink.getSinkRuntimeProvider(new 
SinkRuntimeProviderContext(false));
+               assertThat(provider, instanceOf(SinkFunctionProvider.class));
+               final SinkFunctionProvider sinkFunctionProvider = 
(SinkFunctionProvider) provider;
+               final SinkFunction<RowData> sinkFunction = 
sinkFunctionProvider.createSinkFunction();
+               assertThat(sinkFunction, instanceOf(FlinkKafkaProducer.class));
        }
 
-       @Override
-       protected Class<?> getExpectedConsumerClass() {
-               return FlinkKafkaConsumer.class;
+       // --------------------------------------------------------------------------------------------
+       // Negative tests
+       // --------------------------------------------------------------------------------------------
+       @Test
+       public void testInvalidScanStartupMode() {
+               // Construct table source using DDL and table source factory
+               ObjectIdentifier objectIdentifier = ObjectIdentifier.of(
+                               "default",
+                               "default",
+                               "scanTable");
+               final Map<String, String> modifiedOptions = getModifiedOptions(
+                               getFullSourceOptions(),
+                               options -> options.put("scan.startup.mode", 
"abc"));
+               CatalogTable catalogTable = 
createKafkaSourceCatalogTable(modifiedOptions);
+
+               thrown.expect(ValidationException.class);
+               thrown.expect(containsCause(new ValidationException("Invalid 
value for option 'scan.startup.mode'. "
+                               + "Supported values are [earliest-offset, 
latest-offset, group-offsets, specific-offsets, timestamp], "
+                               + "but was: abc")));
+               FactoryUtil.createTableSource(null,
+                               objectIdentifier,
+                               catalogTable,
+                               new Configuration(),
+                               Thread.currentThread().getContextClassLoader(),
+                               false);
+       }
+
+       @Test
+       public void testSourceTableWithTopicAndTopicPattern() {
+               // Construct table source using DDL and table source factory
+               ObjectIdentifier objectIdentifier = ObjectIdentifier.of(
+                       "default",
+                       "default",
+                       "scanTable");
+               final Map<String, String> modifiedOptions = getModifiedOptions(
+                       getFullSourceOptions(),
+                       options -> {
+                               options.put("topic", TOPICS);
+                               options.put("topic-pattern", TOPIC_REGEX);
+                       });
+               CatalogTable catalogTable = 
createKafkaSourceCatalogTable(modifiedOptions);
+
+               thrown.expect(ValidationException.class);
+               thrown.expect(containsCause(new ValidationException("Option 
'topic' and 'topic-pattern' shouldn't be set together.")));
+               FactoryUtil.createTableSource(null,
+                       objectIdentifier,
+                       catalogTable,
+                       new Configuration(),
+                       Thread.currentThread().getContextClassLoader(),
+                       false);
+       }
+
+       @Test
+       public void testMissingStartupTimestamp() {
+               // Construct table source using DDL and table source factory
+               ObjectIdentifier objectIdentifier = ObjectIdentifier.of(
+                               "default",
+                               "default",
+                               "scanTable");
+               final Map<String, String> modifiedOptions = getModifiedOptions(
+                               getFullSourceOptions(),
+                               options -> options.put("scan.startup.mode", 
"timestamp"));
+               CatalogTable catalogTable = 
createKafkaSourceCatalogTable(modifiedOptions);
+
+               thrown.expect(ValidationException.class);
+               thrown.expect(containsCause(new 
ValidationException("'scan.startup.timestamp-millis' "
+                               + "is required in 'timestamp' startup mode but 
missing.")));
+               FactoryUtil.createTableSource(null,
+                               objectIdentifier,
+                               catalogTable,
+                               new Configuration(),
+                               Thread.currentThread().getContextClassLoader(),
+                               false);
        }
 
-       @Override
-       protected Class<?> getExpectedProducerClass() {
-               return FlinkKafkaProducer.class;
+       @Test
+       public void testMissingSpecificOffsets() {
+               // Construct table source using DDL and table source factory
+               ObjectIdentifier objectIdentifier = ObjectIdentifier.of(
+                               "default",
+                               "default",
+                               "scanTable");
+               final Map<String, String> modifiedOptions = getModifiedOptions(
+                               getFullSourceOptions(),
+                               options -> 
options.remove("scan.startup.specific-offsets"));
+               CatalogTable catalogTable = 
createKafkaSourceCatalogTable(modifiedOptions);
+
+               thrown.expect(ValidationException.class);
+               thrown.expect(containsCause(new 
ValidationException("'scan.startup.specific-offsets' "
+                               + "is required in 'specific-offsets' startup 
mode but missing.")));
+               FactoryUtil.createTableSource(null,
+                               objectIdentifier,
+                               catalogTable,
+                               new Configuration(),
+                               Thread.currentThread().getContextClassLoader(),
+                               false);
        }
 
-       @Override
-       protected KafkaDynamicSourceBase getExpectedScanSource(
+       @Test
+       public void testInvalidSinkPartitioner() {
+               ObjectIdentifier objectIdentifier = ObjectIdentifier.of(
+                               "default",
+                               "default",
+                               "sinkTable");
+
+               final Map<String, String> modifiedOptions = getModifiedOptions(
+                               getFullSourceOptions(),
+                               options -> options.put("sink.partitioner", 
"abc"));
+               final CatalogTable sinkTable = 
createKafkaSinkCatalogTable(modifiedOptions);
+
+               thrown.expect(ValidationException.class);
+               thrown.expect(containsCause(new ValidationException("Could not 
find and instantiate partitioner class 'abc'")));
+               FactoryUtil.createTableSink(
+                               null,
+                               objectIdentifier,
+                               sinkTable,
+                               new Configuration(),
+                               Thread.currentThread().getContextClassLoader(),
+                               false);
+       }
+
+       @Test
+       public void testInvalidSinkSemantic(){
+               ObjectIdentifier objectIdentifier = ObjectIdentifier.of(
+                       "default",
+                       "default",
+                       "sinkTable");
+
+               final Map<String, String> modifiedOptions = getModifiedOptions(
+                       getFullSourceOptions(),
+                       options -> options.put("sink.semantic", "xyz"));
+               final CatalogTable sinkTable = 
createKafkaSinkCatalogTable(modifiedOptions);
+
+               thrown.expect(ValidationException.class);
+               thrown.expect(containsCause(new 
ValidationException("Unsupported value 'xyz' for 'sink.semantic'. Supported 
values are ['at-least-once', 'exactly-once', 'none'].")));
+               FactoryUtil.createTableSink(
+                       null,
+                       objectIdentifier,
+                       sinkTable,
+                       new Configuration(),
+                       Thread.currentThread().getContextClassLoader(),
+                       false);
+       }
+
+       @Test
+       public void testSinkWithTopicListOrTopicPattern(){
+               ObjectIdentifier objectIdentifier = ObjectIdentifier.of(
+                       "default",
+                       "default",
+                       "sinkTable");
+
+               Map<String, String> modifiedOptions = getModifiedOptions(
+                       getFullSourceOptions(),
+                       options -> {
+                               options.put("topic", TOPICS);
+                               options.put("scan.startup.mode", 
"earliest-offset");
+                               options.remove("specific-offsets");
+                       });
+               CatalogTable sinkTable = 
createKafkaSinkCatalogTable(modifiedOptions);
+               String errorMessageTemp = "Flink Kafka sink currently only 
supports single topic, but got %s: %s.";
+
+               try {
+                       FactoryUtil.createTableSink(
+                               null,
+                               objectIdentifier,
+                               sinkTable,
+                               new Configuration(),
+                               Thread.currentThread().getContextClassLoader(),
+                               false);
+               } catch (Throwable t) {
+                       assertEquals(String.format(errorMessageTemp, "'topic'", 
String.format("[%s]", String.join(", ", TOPIC_LIST))),
+                               t.getCause().getMessage());
+               }
+
+               modifiedOptions = getModifiedOptions(
+                       getFullSourceOptions(),
+                       options -> options.put("topic-pattern", TOPIC_REGEX));
+               sinkTable = createKafkaSinkCatalogTable(modifiedOptions);
+
+               try {
+                       FactoryUtil.createTableSink(
+                               null,
+                               objectIdentifier,
+                               sinkTable,
+                               new Configuration(),
+                               Thread.currentThread().getContextClassLoader(),
+                               false);
+               } catch (Throwable t) {
+                       assertEquals(String.format(errorMessageTemp, 
"'topic-pattern'", TOPIC_REGEX), t.getCause().getMessage());
+               }
+       }
+
+       // --------------------------------------------------------------------------------------------
+       // Utilities
+       // --------------------------------------------------------------------------------------------
+
+       private static KafkaDynamicSource createExpectedScanSource(
                        DataType producedDataType,
                        @Nullable List<String> topics,
                        @Nullable Pattern topicPattern,
@@ -81,8 +517,7 @@ public class KafkaDynamicTableFactoryTest extends KafkaDynamicTableFactoryTestBa
                                startupTimestamp);
        }
 
-       @Override
-       protected KafkaDynamicSinkBase getExpectedSink(
+       private static KafkaDynamicSink createExpectedSink(
                        DataType consumedDataType,
                        String topic,
                        Properties properties,
@@ -97,4 +532,70 @@ public class KafkaDynamicTableFactoryTest extends KafkaDynamicTableFactoryTestBa
                                encodingFormat,
                                semantic);
        }
+
+       private static CatalogTable createKafkaSourceCatalogTable() {
+               return createKafkaSourceCatalogTable(getFullSourceOptions());
+       }
+
+       private static CatalogTable createKafkaSinkCatalogTable() {
+               return createKafkaSinkCatalogTable(getFullSinkOptions());
+       }
+
+       private static CatalogTable createKafkaSourceCatalogTable(Map<String, String> options) {
+               return new CatalogTableImpl(SOURCE_SCHEMA, options, "scanTable");
+       }
+
+       private static CatalogTable createKafkaSinkCatalogTable(Map<String, String> options) {
+               return new CatalogTableImpl(SINK_SCHEMA, options, "sinkTable");
+       }
+
+       /**
+        * Returns the full options modified by the given consumer {@code optionModifier}.
+        *
+        * @param optionModifier Consumer to modify the options
+        */
+       private static Map<String, String> getModifiedOptions(
+                       Map<String, String> options,
+                       Consumer<Map<String, String>> optionModifier) {
+               optionModifier.accept(options);
+               return options;
+       }
+
+       private static Map<String, String> getFullSourceOptions() {
+               Map<String, String> tableOptions = new HashMap<>();
+               // Kafka specific options.
+               tableOptions.put("connector", KafkaDynamicTableFactory.IDENTIFIER);
+               tableOptions.put("topic", TOPIC);
+               tableOptions.put("properties.group.id", "dummy");
+               tableOptions.put("properties.bootstrap.servers", "dummy");
+               tableOptions.put("scan.startup.mode", "specific-offsets");
+               tableOptions.put("scan.startup.specific-offsets", PROPS_SCAN_OFFSETS);
+               tableOptions.put("scan.topic-partition-discovery.interval", DISCOVERY_INTERVAL);
+               // Format options.
+               tableOptions.put("format", TestFormatFactory.IDENTIFIER);
+               final String formatDelimiterKey = String.format("%s.%s",
+                               TestFormatFactory.IDENTIFIER, TestFormatFactory.DELIMITER.key());
+               final String failOnMissingKey = String.format("%s.%s",
+                               TestFormatFactory.IDENTIFIER, TestFormatFactory.FAIL_ON_MISSING.key());
+               tableOptions.put(formatDelimiterKey, ",");
+               tableOptions.put(failOnMissingKey, "true");
+               return tableOptions;
+       }
+
+       private static Map<String, String> getFullSinkOptions() {
+               Map<String, String> tableOptions = new HashMap<>();
+               // Kafka specific options.
+               tableOptions.put("connector", KafkaDynamicTableFactory.IDENTIFIER);
+               tableOptions.put("topic", TOPIC);
+               tableOptions.put("properties.group.id", "dummy");
+               tableOptions.put("properties.bootstrap.servers", "dummy");
+               tableOptions.put("sink.partitioner", KafkaOptions.SINK_PARTITIONER_VALUE_FIXED);
+               tableOptions.put("sink.semantic", KafkaOptions.SINK_SEMANTIC_VALUE_EXACTLY_ONCE);
+               // Format options.
+               tableOptions.put("format", TestFormatFactory.IDENTIFIER);
+               final String formatDelimiterKey = String.format("%s.%s",
+                               TestFormatFactory.IDENTIFIER, TestFormatFactory.DELIMITER.key());
+               tableOptions.put(formatDelimiterKey, ",");
+               return tableOptions;
+       }
 }
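
    For reference, the option map assembled in getFullSinkOptions() above corresponds
    to DDL along the following lines. This is a hedged sketch rather than anything
    taken from this patch: it reuses the hypothetical tableEnv and kafka_source table
    from the earlier sketch, and it uses the 'json' format instead of the test-only
    TestFormatFactory that the unit tests rely on.

    // Hypothetical sink DDL matching the keys used in getFullSinkOptions()
    // ('sink.partitioner' = 'fixed', 'sink.semantic' = 'exactly-once').
    tableEnv.executeSql(
            "CREATE TABLE kafka_sink (\n"
            + "  name STRING,\n"
            + "  cnt DECIMAL(38, 18),\n"
            + "  ts TIMESTAMP(3)\n"
            + ") WITH (\n"
            + "  'connector' = 'kafka',\n"
            + "  'topic' = 'myTopic',\n"
            + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
            + "  'sink.partitioner' = 'fixed',\n"
            + "  'sink.semantic' = 'exactly-once',\n"
            + "  'format' = 'json'\n"
            + ")");
    // Writing into the sink submits a streaming job that uses FlinkKafkaProducer,
    // as asserted by testTableSink() above.
    tableEnv.executeSql("INSERT INTO kafka_sink SELECT name, cnt, ts FROM kafka_source");
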
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTestBase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTestBase.java
deleted file mode 100644
index 5d1938f..0000000
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTestBase.java
+++ /dev/null
@@ -1,606 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.table;
-
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
-import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
-import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
-import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
-import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.catalog.CatalogTable;
-import org.apache.flink.table.catalog.CatalogTableImpl;
-import org.apache.flink.table.catalog.ObjectIdentifier;
-import org.apache.flink.table.connector.format.DecodingFormat;
-import org.apache.flink.table.connector.format.EncodingFormat;
-import org.apache.flink.table.connector.sink.DynamicTableSink;
-import org.apache.flink.table.connector.sink.SinkFunctionProvider;
-import org.apache.flink.table.connector.source.DynamicTableSource;
-import org.apache.flink.table.connector.source.ScanTableSource;
-import org.apache.flink.table.connector.source.SourceFunctionProvider;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.factories.FactoryUtil;
-import org.apache.flink.table.factories.TestFormatFactory;
-import 
org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
-import 
org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-import java.util.function.Consumer;
-import java.util.regex.Pattern;
-
-import static org.apache.flink.core.testutils.FlinkMatchers.containsCause;
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Abstract test base for {@link KafkaDynamicTableFactoryBase}.
- */
-public abstract class KafkaDynamicTableFactoryTestBase extends TestLogger {
-
-       @Rule
-       public ExpectedException thrown = ExpectedException.none();
-
-       private static final String TOPIC = "myTopic";
-       private static final String TOPICS = "myTopic-1;myTopic-2;myTopic-3";
-       private static final String TOPIC_REGEX = "myTopic-\\d+";
-       private static final List<String> TOPIC_LIST = 
Arrays.asList("myTopic-1", "myTopic-2", "myTopic-3");
-       private static final int PARTITION_0 = 0;
-       private static final long OFFSET_0 = 100L;
-       private static final int PARTITION_1 = 1;
-       private static final long OFFSET_1 = 123L;
-       private static final String NAME = "name";
-       private static final String COUNT = "count";
-       private static final String TIME = "time";
-       private static final String WATERMARK_EXPRESSION = TIME + " - INTERVAL 
'5' SECOND";
-       private static final DataType WATERMARK_DATATYPE = 
DataTypes.TIMESTAMP(3);
-       private static final String COMPUTED_COLUMN_NAME = "computed-column";
-       private static final String COMPUTED_COLUMN_EXPRESSION = COUNT + " + 
1.0";
-       private static final DataType COMPUTED_COLUMN_DATATYPE = 
DataTypes.DECIMAL(10, 3);
-       private static final String SEMANTIC = "exactly-once";
-       private static final String DISCOVERY_INTERVAL = "1000 ms";
-
-       private static final Properties KAFKA_SOURCE_PROPERTIES = new 
Properties();
-       private static final Properties KAFKA_SINK_PROPERTIES = new 
Properties();
-       static {
-               KAFKA_SOURCE_PROPERTIES.setProperty("group.id", "dummy");
-               KAFKA_SOURCE_PROPERTIES.setProperty("bootstrap.servers", 
"dummy");
-               
KAFKA_SOURCE_PROPERTIES.setProperty("flink.partition-discovery.interval-millis",
 "1000");
-
-               KAFKA_SINK_PROPERTIES.setProperty("group.id", "dummy");
-               KAFKA_SINK_PROPERTIES.setProperty("bootstrap.servers", "dummy");
-       }
-
-       private static final String PROPS_SCAN_OFFSETS =
-                       
String.format("partition:%d,offset:%d;partition:%d,offset:%d",
-                                       PARTITION_0, OFFSET_0, PARTITION_1, 
OFFSET_1);
-
-       private static final TableSchema SOURCE_SCHEMA = TableSchema.builder()
-                       .field(NAME, DataTypes.STRING())
-                       .field(COUNT, DataTypes.DECIMAL(38, 18))
-                       .field(TIME, DataTypes.TIMESTAMP(3))
-                       .field(COMPUTED_COLUMN_NAME, COMPUTED_COLUMN_DATATYPE, 
COMPUTED_COLUMN_EXPRESSION)
-                               .watermark(TIME, WATERMARK_EXPRESSION, 
WATERMARK_DATATYPE)
-                               .build();
-
-       private static final TableSchema SINK_SCHEMA = TableSchema.builder()
-                       .field(NAME, DataTypes.STRING())
-                       .field(COUNT, DataTypes.DECIMAL(38, 18))
-                       .field(TIME, DataTypes.TIMESTAMP(3))
-                       .build();
-
-       @Test
-       @SuppressWarnings("unchecked")
-       public void testTableSource() {
-               // prepare parameters for Kafka table source
-               final DataType producedDataType = 
SOURCE_SCHEMA.toPhysicalRowDataType();
-
-               final Map<KafkaTopicPartition, Long> specificOffsets = new 
HashMap<>();
-               specificOffsets.put(new KafkaTopicPartition(TOPIC, 
PARTITION_0), OFFSET_0);
-               specificOffsets.put(new KafkaTopicPartition(TOPIC, 
PARTITION_1), OFFSET_1);
-
-               DecodingFormat<DeserializationSchema<RowData>> decodingFormat =
-                               new TestFormatFactory.DecodingFormatMock(",", 
true);
-
-               // Construct table source using options and table source factory
-               ObjectIdentifier objectIdentifier = ObjectIdentifier.of(
-                               "default",
-                               "default",
-                               "scanTable");
-               CatalogTable catalogTable = createKafkaSourceCatalogTable();
-               final DynamicTableSource actualSource = 
FactoryUtil.createTableSource(null,
-                               objectIdentifier,
-                               catalogTable,
-                               new Configuration(),
-                               Thread.currentThread().getContextClassLoader(),
-                               false);
-
-               // Test scan source equals
-               final KafkaDynamicSourceBase expectedKafkaSource = 
getExpectedScanSource(
-                               producedDataType,
-                               Collections.singletonList(TOPIC),
-                               null,
-                               KAFKA_SOURCE_PROPERTIES,
-                               decodingFormat,
-                               StartupMode.SPECIFIC_OFFSETS,
-                               specificOffsets,
-                               0);
-               final KafkaDynamicSourceBase actualKafkaSource = 
(KafkaDynamicSourceBase) actualSource;
-               assertEquals(actualKafkaSource, expectedKafkaSource);
-
-               // Test Kafka consumer
-               ScanTableSource.ScanRuntimeProvider provider =
-                               
actualKafkaSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
-               assertThat(provider, instanceOf(SourceFunctionProvider.class));
-               final SourceFunctionProvider sourceFunctionProvider = 
(SourceFunctionProvider) provider;
-               final SourceFunction<RowData> sourceFunction = 
sourceFunctionProvider.createSourceFunction();
-               assertThat(sourceFunction, 
instanceOf(getExpectedConsumerClass()));
-               //  Test commitOnCheckpoints flag should be true when set 
consumer group
-               assertTrue(((FlinkKafkaConsumerBase) 
sourceFunction).getEnableCommitOnCheckpoints());
-       }
-
-       @Test
-       public void testTableSourceCommitOnCheckpointsDisabled() {
-               //Construct table source using options and table source factory
-               ObjectIdentifier objectIdentifier = ObjectIdentifier.of(
-                       "default",
-                       "default",
-                       "scanTable");
-               Map<String, String> tableOptions = getFullSourceOptions();
-               tableOptions.remove("properties.group.id");
-               CatalogTable catalogTable = 
createKafkaSourceCatalogTable(tableOptions);
-               final DynamicTableSource tableSource = 
FactoryUtil.createTableSource(null,
-                       objectIdentifier,
-                       catalogTable,
-                       new Configuration(),
-                       Thread.currentThread().getContextClassLoader(),
-                       false);
-
-               // Test commitOnCheckpoints flag should be false when do not 
set consumer group.
-               assertThat(tableSource, 
instanceOf(KafkaDynamicSourceBase.class));
-               ScanTableSource.ScanRuntimeProvider providerWithoutGroupId = 
((KafkaDynamicSourceBase) tableSource)
-                       
.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
-               assertThat(providerWithoutGroupId, 
instanceOf(SourceFunctionProvider.class));
-               final SourceFunctionProvider functionProviderWithoutGroupId = 
(SourceFunctionProvider) providerWithoutGroupId;
-               final SourceFunction<RowData> function = 
functionProviderWithoutGroupId.createSourceFunction();
-               assertFalse(((FlinkKafkaConsumerBase) 
function).getEnableCommitOnCheckpoints());
-       }
-
-       @Test
-       public void testTableSourceWithPattern() {
-               // prepare parameters for Kafka table source
-               final DataType producedDataType = 
SOURCE_SCHEMA.toPhysicalRowDataType();
-
-               final Map<KafkaTopicPartition, Long> specificOffsets = new 
HashMap<>();
-
-               DecodingFormat<DeserializationSchema<RowData>> decodingFormat =
-                       new TestFormatFactory.DecodingFormatMock(",", true);
-
-               // Construct table source using options and table source factory
-               ObjectIdentifier objectIdentifier = ObjectIdentifier.of(
-                       "default",
-                       "default",
-                       "scanTable");
-               final Map<String, String> modifiedOptions = getModifiedOptions(
-                       getFullSourceOptions(),
-                       options -> {
-                               options.remove("topic");
-                               options.put("topic-pattern", TOPIC_REGEX);
-                               options.put("scan.startup.mode", 
KafkaOptions.SCAN_STARTUP_MODE_VALUE_EARLIEST);
-                               options.remove("scan.startup.specific-offsets");
-                       });
-               CatalogTable catalogTable = 
createKafkaSourceCatalogTable(modifiedOptions);
-
-               final DynamicTableSource actualSource = 
FactoryUtil.createTableSource(null,
-                       objectIdentifier,
-                       catalogTable,
-                       new Configuration(),
-                       Thread.currentThread().getContextClassLoader(),
-                       false);
-
-               // Test scan source equals
-               final KafkaDynamicSourceBase expectedKafkaSource = 
getExpectedScanSource(
-                       producedDataType,
-                       null,
-                       Pattern.compile(TOPIC_REGEX),
-                       KAFKA_SOURCE_PROPERTIES,
-                       decodingFormat,
-                       StartupMode.EARLIEST,
-                       specificOffsets,
-                       0);
-               final KafkaDynamicSourceBase actualKafkaSource = 
(KafkaDynamicSourceBase) actualSource;
-               assertEquals(actualKafkaSource, expectedKafkaSource);
-
-               // Test Kafka consumer
-               ScanTableSource.ScanRuntimeProvider provider =
-                       
actualKafkaSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
-               assertThat(provider, instanceOf(SourceFunctionProvider.class));
-               final SourceFunctionProvider sourceFunctionProvider = 
(SourceFunctionProvider) provider;
-               final SourceFunction<RowData> sourceFunction = 
sourceFunctionProvider.createSourceFunction();
-               assertThat(sourceFunction, 
instanceOf(getExpectedConsumerClass()));
-       }
-
-       @Test
-       public void testTableSink() {
-               final DataType consumedDataType = 
SINK_SCHEMA.toPhysicalRowDataType();
-               EncodingFormat<SerializationSchema<RowData>> encodingFormat =
-                               new TestFormatFactory.EncodingFormatMock(",");
-
-               // Construct table sink using options and table sink factory.
-               ObjectIdentifier objectIdentifier = ObjectIdentifier.of(
-                               "default",
-                               "default",
-                               "sinkTable");
-               final CatalogTable sinkTable = createKafkaSinkCatalogTable();
-               final DynamicTableSink actualSink = FactoryUtil.createTableSink(
-                               null,
-                               objectIdentifier,
-                               sinkTable,
-                               new Configuration(),
-                               Thread.currentThread().getContextClassLoader(),
-                               false);
-
-               final DynamicTableSink expectedSink = getExpectedSink(
-                               consumedDataType,
-                               TOPIC,
-                               KAFKA_SINK_PROPERTIES,
-                               Optional.of(new FlinkFixedPartitioner<>()),
-                               encodingFormat,
-                               KafkaSinkSemantic.EXACTLY_ONCE
-                       );
-               assertEquals(expectedSink, actualSink);
-
-               // Test sink format.
-               final KafkaDynamicSinkBase actualKafkaSink = (KafkaDynamicSinkBase) actualSink;
-               assertEquals(encodingFormat, actualKafkaSink.encodingFormat);
-
-               // Test kafka producer.
-               DynamicTableSink.SinkRuntimeProvider provider =
-                               actualKafkaSink.getSinkRuntimeProvider(new SinkRuntimeProviderContext(false));
-               assertThat(provider, instanceOf(SinkFunctionProvider.class));
-               final SinkFunctionProvider sinkFunctionProvider = (SinkFunctionProvider) provider;
-               final SinkFunction<RowData> sinkFunction = sinkFunctionProvider.createSinkFunction();
-               assertThat(sinkFunction, instanceOf(getExpectedProducerClass()));
-       }
-
-       // --------------------------------------------------------------------------------------------
-       // Negative tests
-       // --------------------------------------------------------------------------------------------
-       @Test
-       public void testInvalidScanStartupMode() {
-               // Construct table source using DDL and table source factory
-               ObjectIdentifier objectIdentifier = ObjectIdentifier.of(
-                               "default",
-                               "default",
-                               "scanTable");
-               final Map<String, String> modifiedOptions = getModifiedOptions(
-                               getFullSourceOptions(),
-                               options -> {
-                                       options.put("scan.startup.mode", "abc");
-                               });
-               CatalogTable catalogTable = createKafkaSourceCatalogTable(modifiedOptions);
-
-               thrown.expect(ValidationException.class);
-               thrown.expect(containsCause(new ValidationException("Invalid value for option 'scan.startup.mode'. "
-                               + "Supported values are [earliest-offset, latest-offset, group-offsets, specific-offsets, timestamp], "
-                               + "but was: abc")));
-               FactoryUtil.createTableSource(null,
-                               objectIdentifier,
-                               catalogTable,
-                               new Configuration(),
-                               Thread.currentThread().getContextClassLoader(),
-                               false);
-       }
-
-       @Test
-       public void testSourceTableWithTopicAndTopicPattern() {
-               // Construct table source using DDL and table source factory
-               ObjectIdentifier objectIdentifier = ObjectIdentifier.of(
-                       "default",
-                       "default",
-                       "scanTable");
-               final Map<String, String> modifiedOptions = getModifiedOptions(
-                       getFullSourceOptions(),
-                       options -> {
-                               options.put("topic", TOPICS);
-                               options.put("topic-pattern", TOPIC_REGEX);
-                       });
-               CatalogTable catalogTable = createKafkaSourceCatalogTable(modifiedOptions);
-
-               thrown.expect(ValidationException.class);
-               thrown.expect(containsCause(new ValidationException("Option 'topic' and 'topic-pattern' shouldn't be set together.")));
-               FactoryUtil.createTableSource(null,
-                       objectIdentifier,
-                       catalogTable,
-                       new Configuration(),
-                       Thread.currentThread().getContextClassLoader(),
-                       false);
-       }
-
-       @Test
-       public void testMissingStartupTimestamp() {
-               // Construct table source using DDL and table source factory
-               ObjectIdentifier objectIdentifier = ObjectIdentifier.of(
-                               "default",
-                               "default",
-                               "scanTable");
-               final Map<String, String> modifiedOptions = getModifiedOptions(
-                               getFullSourceOptions(),
-                               options -> {
-                                       options.put("scan.startup.mode", "timestamp");
-                               });
-               CatalogTable catalogTable = createKafkaSourceCatalogTable(modifiedOptions);
-
-               thrown.expect(ValidationException.class);
-               thrown.expect(containsCause(new ValidationException("'scan.startup.timestamp-millis' "
-                               + "is required in 'timestamp' startup mode but missing.")));
-               FactoryUtil.createTableSource(null,
-                               objectIdentifier,
-                               catalogTable,
-                               new Configuration(),
-                               Thread.currentThread().getContextClassLoader(),
-                               false);
-       }
-
-       @Test
-       public void testMissingSpecificOffsets() {
-               // Construct table source using DDL and table source factory
-               ObjectIdentifier objectIdentifier = ObjectIdentifier.of(
-                               "default",
-                               "default",
-                               "scanTable");
-               final Map<String, String> modifiedOptions = getModifiedOptions(
-                               getFullSourceOptions(),
-                               options -> {
-                                       options.remove("scan.startup.specific-offsets");
-                               });
-               CatalogTable catalogTable = createKafkaSourceCatalogTable(modifiedOptions);
-
-               thrown.expect(ValidationException.class);
-               thrown.expect(containsCause(new ValidationException("'scan.startup.specific-offsets' "
-                               + "is required in 'specific-offsets' startup mode but missing.")));
-               FactoryUtil.createTableSource(null,
-                               objectIdentifier,
-                               catalogTable,
-                               new Configuration(),
-                               Thread.currentThread().getContextClassLoader(),
-                               false);
-       }
-
-       @Test
-       public void testInvalidSinkPartitioner() {
-               ObjectIdentifier objectIdentifier = ObjectIdentifier.of(
-                               "default",
-                               "default",
-                               "sinkTable");
-
-               final Map<String, String> modifiedOptions = getModifiedOptions(
-                               getFullSourceOptions(),
-                               options -> {
-                                       options.put("sink.partitioner", "abc");
-                               });
-               final CatalogTable sinkTable = createKafkaSinkCatalogTable(modifiedOptions);
-
-               thrown.expect(ValidationException.class);
-               thrown.expect(containsCause(new ValidationException("Could not find and instantiate partitioner class 'abc'")));
-               FactoryUtil.createTableSink(
-                               null,
-                               objectIdentifier,
-                               sinkTable,
-                               new Configuration(),
-                               Thread.currentThread().getContextClassLoader(),
-                               false);
-       }
-
-       @Test
-       public void testInvalidSinkSemantic(){
-               ObjectIdentifier objectIdentifier = ObjectIdentifier.of(
-                       "default",
-                       "default",
-                       "sinkTable");
-
-               final Map<String, String> modifiedOptions = getModifiedOptions(
-                       getFullSourceOptions(),
-                       options -> {
-                               options.put("sink.semantic", "xyz");
-                       });
-               final CatalogTable sinkTable = createKafkaSinkCatalogTable(modifiedOptions);
-
-               thrown.expect(ValidationException.class);
-               thrown.expect(containsCause(new ValidationException("Unsupported value 'xyz' for 'sink.semantic'. Supported values are ['at-least-once', 'exactly-once', 'none'].")));
-               FactoryUtil.createTableSink(
-                       null,
-                       objectIdentifier,
-                       sinkTable,
-                       new Configuration(),
-                       Thread.currentThread().getContextClassLoader(),
-                       false);
-       }
-
-       @Test
-       public void testSinkWithTopicListOrTopicPattern(){
-               ObjectIdentifier objectIdentifier = ObjectIdentifier.of(
-                       "default",
-                       "default",
-                       "sinkTable");
-
-               Map<String, String> modifiedOptions = getModifiedOptions(
-                       getFullSourceOptions(),
-                       options -> {
-                               options.put("topic", TOPICS);
-                               options.put("scan.startup.mode", "earliest-offset");
-                               options.remove("specific-offsets");
-                       });
-               CatalogTable sinkTable = createKafkaSinkCatalogTable(modifiedOptions);
-               String errorMessageTemp = "Flink Kafka sink currently only supports single topic, but got %s: %s.";
-
-               try {
-                       FactoryUtil.createTableSink(
-                               null,
-                               objectIdentifier,
-                               sinkTable,
-                               new Configuration(),
-                               Thread.currentThread().getContextClassLoader(),
-                               false);
-               } catch (Throwable t) {
-                       assertEquals(String.format(errorMessageTemp, "'topic'", String.format("[%s]", String.join(", ", TOPIC_LIST))),
-                               t.getCause().getMessage());
-               }
-
-               modifiedOptions = getModifiedOptions(
-                       getFullSourceOptions(),
-                       options -> {
-                               options.put("topic-pattern", TOPIC_REGEX);
-                       });
-               sinkTable = createKafkaSinkCatalogTable(modifiedOptions);
-
-               try {
-                       FactoryUtil.createTableSink(
-                               null,
-                               objectIdentifier,
-                               sinkTable,
-                               new Configuration(),
-                               Thread.currentThread().getContextClassLoader(),
-                               false);
-               } catch (Throwable t) {
-                       assertEquals(String.format(errorMessageTemp, "'topic-pattern'", TOPIC_REGEX), t.getCause().getMessage());
-               }
-       }
-
-       // --------------------------------------------------------------------------------------------
-       // Utilities
-       // --------------------------------------------------------------------------------------------
-
-       private CatalogTable createKafkaSourceCatalogTable() {
-               return createKafkaSourceCatalogTable(getFullSourceOptions());
-       }
-
-       private CatalogTable createKafkaSinkCatalogTable() {
-               return createKafkaSinkCatalogTable(getFullSinkOptions());
-       }
-
-       private CatalogTable createKafkaSourceCatalogTable(Map<String, String> options) {
-               return new CatalogTableImpl(SOURCE_SCHEMA, options, "scanTable");
-       }
-
-       protected CatalogTable createKafkaSinkCatalogTable(Map<String, String> options) {
-               return new CatalogTableImpl(SINK_SCHEMA, options, "sinkTable");
-       }
-
-       /**
-        * Returns the full options modified by the given consumer {@code optionModifier}.
-        *
-        * @param optionModifier Consumer to modify the options
-        */
-       protected static Map<String, String> getModifiedOptions(
-                       Map<String, String> options,
-                       Consumer<Map<String, String>> optionModifier) {
-               optionModifier.accept(options);
-               return options;
-       }
-
-       protected Map<String, String> getFullSourceOptions() {
-               Map<String, String> tableOptions = new HashMap<>();
-               // Kafka specific options.
-               tableOptions.put("connector", factoryIdentifier());
-               tableOptions.put("topic", TOPIC);
-               tableOptions.put("properties.group.id", "dummy");
-               tableOptions.put("properties.bootstrap.servers", "dummy");
-               tableOptions.put("scan.startup.mode", "specific-offsets");
-               tableOptions.put("scan.startup.specific-offsets", PROPS_SCAN_OFFSETS);
-               tableOptions.put("scan.topic-partition-discovery.interval", DISCOVERY_INTERVAL);
-               // Format options.
-               tableOptions.put("format", TestFormatFactory.IDENTIFIER);
-               final String formatDelimiterKey = String.format("%s.%s",
-                               TestFormatFactory.IDENTIFIER, TestFormatFactory.DELIMITER.key());
-               final String failOnMissingKey = String.format("%s.%s",
-                               TestFormatFactory.IDENTIFIER, TestFormatFactory.FAIL_ON_MISSING.key());
-               tableOptions.put(formatDelimiterKey, ",");
-               tableOptions.put(failOnMissingKey, "true");
-               return tableOptions;
-       }
-
-       protected Map<String, String> getFullSinkOptions() {
-               Map<String, String> tableOptions = new HashMap<>();
-               // Kafka specific options.
-               tableOptions.put("connector", factoryIdentifier());
-               tableOptions.put("topic", TOPIC);
-               tableOptions.put("properties.group.id", "dummy");
-               tableOptions.put("properties.bootstrap.servers", "dummy");
-               tableOptions.put("sink.partitioner", KafkaOptions.SINK_PARTITIONER_VALUE_FIXED);
-               tableOptions.put("sink.semantic", KafkaOptions.SINK_SEMANTIC_VALUE_EXACTLY_ONCE);
-               // Format options.
-               tableOptions.put("format", TestFormatFactory.IDENTIFIER);
-               final String formatDelimiterKey = String.format("%s.%s",
-                               TestFormatFactory.IDENTIFIER, TestFormatFactory.DELIMITER.key());
-               tableOptions.put(formatDelimiterKey, ",");
-               return tableOptions;
-       }
-
-       // --------------------------------------------------------------------------------------------
-       // For version-specific tests
-       // --------------------------------------------------------------------------------------------
-
-       protected abstract String factoryIdentifier();
-
-       protected abstract Class<?> getExpectedConsumerClass();
-
-       protected abstract Class<?> getExpectedProducerClass();
-
-       protected abstract KafkaDynamicSourceBase getExpectedScanSource(
-                       DataType producedDataType,
-                       List<String> topics,
-                       Pattern topicPattern,
-                       Properties properties,
-                       DecodingFormat<DeserializationSchema<RowData>> decodingFormat,
-                       StartupMode startupMode,
-                       Map<KafkaTopicPartition, Long> specificStartupOffsets,
-                       long startupTimestamp
-       );
-
-       protected abstract KafkaDynamicSinkBase getExpectedSink(
-                       DataType consumedDataType,
-                       String topic,
-                       Properties properties,
-                       Optional<FlinkKafkaPartitioner<RowData>> partitioner,
-                       EncodingFormat<SerializationSchema<RowData>> encodingFormat,
-                       KafkaSinkSemantic semantic
-       );
-}
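
For readers tracing the removed test base above: all of its negative tests lean on the same small option-override helper (getModifiedOptions) to tweak a base option map before building the catalog table. Below is a minimal, self-contained sketch of that pattern in plain Java, assuming nothing from Flink; the option keys mirror the ones in the diff, while the class name and sample values are illustrative only.

    import java.util.HashMap;
    import java.util.Map;
    import java.util.function.Consumer;

    // Illustrative stand-in class name; not part of the Flink codebase.
    public final class OptionOverrideSketch {

        // Mirrors the removed getModifiedOptions(...): applies the modifier
        // to the given option map and returns the same map.
        static Map<String, String> getModifiedOptions(
                Map<String, String> options,
                Consumer<Map<String, String>> optionModifier) {
            optionModifier.accept(options);
            return options;
        }

        public static void main(String[] args) {
            Map<String, String> options = new HashMap<>();
            options.put("connector", "kafka");                    // sample value, assumed
            options.put("scan.startup.mode", "specific-offsets");

            // Same move testInvalidScanStartupMode made before expecting a
            // ValidationException: override a single key on the full options.
            getModifiedOptions(options, opts -> opts.put("scan.startup.mode", "abc"));

            System.out.println(options.get("scan.startup.mode")); // prints: abc
        }
    }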
