This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit c320e990790ecd804f34e187cd7dab4dcaa8300c Author: Timo Walther <twal...@apache.org> AuthorDate: Mon Oct 19 13:37:14 2020 +0200 [FLINK-19672][connector-kafka] Update Kafka table sources and sinks This closes #13681. --- .../connectors/kafka/table/KafkaDynamicSink.java | 122 +++-- .../kafka/table/KafkaDynamicSinkBase.java | 141 ----- .../connectors/kafka/table/KafkaDynamicSource.java | 188 +++++-- .../kafka/table/KafkaDynamicSourceBase.java | 239 -------- .../kafka/table/KafkaDynamicTableFactory.java | 153 +++++- .../kafka/table/KafkaDynamicTableFactoryBase.java | 200 ------- .../connectors/kafka/table/KafkaOptions.java | 2 +- .../kafka/table/KafkaDynamicTableFactoryTest.java | 553 ++++++++++++++++++- .../table/KafkaDynamicTableFactoryTestBase.java | 606 --------------------- 9 files changed, 896 insertions(+), 1308 deletions(-) diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java index ec50a00..c13c594 100644 --- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java +++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java @@ -1,12 +1,13 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,22 +20,43 @@ package org.apache.flink.streaming.connectors.kafka.table; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.serialization.SerializationSchema; -import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; +import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.format.EncodingFormat; import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.sink.SinkFunctionProvider; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.DataType; +import org.apache.flink.util.Preconditions; +import java.util.Objects; import java.util.Optional; import java.util.Properties; /** - * Kafka table sink for writing data into Kafka. + * A version-agnostic Kafka {@link DynamicTableSink}. 
*/ @Internal -public class KafkaDynamicSink extends KafkaDynamicSinkBase { +public class KafkaDynamicSink implements DynamicTableSink { + + /** Consumed data type of the table. */ + protected final DataType consumedDataType; + + /** The Kafka topic to write to. */ + protected final String topic; + + /** Properties for the Kafka producer. */ + protected final Properties properties; + + /** Sink format for encoding records to Kafka. */ + protected final EncodingFormat<SerializationSchema<RowData>> encodingFormat; + + /** Partitioner to select Kafka partition for each item. */ + protected final Optional<FlinkKafkaPartitioner<RowData>> partitioner; + + /** Sink commit semantic.*/ + protected final KafkaSinkSemantic semantic; public KafkaDynamicSink( DataType consumedDataType, @@ -43,29 +65,27 @@ public class KafkaDynamicSink extends KafkaDynamicSinkBase { Optional<FlinkKafkaPartitioner<RowData>> partitioner, EncodingFormat<SerializationSchema<RowData>> encodingFormat, KafkaSinkSemantic semantic) { - super( - consumedDataType, - topic, - properties, - partitioner, - encodingFormat, - semantic); + this.consumedDataType = Preconditions.checkNotNull(consumedDataType, "Consumed data type must not be null."); + this.topic = Preconditions.checkNotNull(topic, "Topic must not be null."); + this.properties = Preconditions.checkNotNull(properties, "Properties must not be null."); + this.partitioner = Preconditions.checkNotNull(partitioner, "Partitioner must not be null."); + this.encodingFormat = Preconditions.checkNotNull(encodingFormat, "Encoding format must not be null."); + this.semantic = Preconditions.checkNotNull(semantic, "Semantic must not be null."); } @Override - protected SinkFunction<RowData> createKafkaProducer( - String topic, - Properties properties, - SerializationSchema<RowData> serializationSchema, - Optional<FlinkKafkaPartitioner<RowData>> partitioner, - KafkaSinkSemantic semantic) { - return new FlinkKafkaProducer<>( - topic, - serializationSchema, - properties, - partitioner.orElse(null), - FlinkKafkaProducer.Semantic.valueOf(semantic.toString()), - FlinkKafkaProducer.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE); + public ChangelogMode getChangelogMode(ChangelogMode requestedMode) { + return this.encodingFormat.getChangelogMode(); + } + + @Override + public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { + final SerializationSchema<RowData> serializationSchema = + this.encodingFormat.createRuntimeEncoder(context, this.consumedDataType); + + final FlinkKafkaProducer<RowData> kafkaProducer = createKafkaProducer(serializationSchema); + + return SinkFunctionProvider.of(kafkaProducer); } @Override @@ -81,6 +101,46 @@ public class KafkaDynamicSink extends KafkaDynamicSinkBase { @Override public String asSummaryString() { - return "Kafka universal table sink"; + return "Kafka table sink"; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final KafkaDynamicSink that = (KafkaDynamicSink) o; + return Objects.equals(consumedDataType, that.consumedDataType) && + Objects.equals(topic, that.topic) && + Objects.equals(properties, that.properties) && + Objects.equals(encodingFormat, that.encodingFormat) && + Objects.equals(partitioner, that.partitioner) && + Objects.equals(semantic, that.semantic); + } + + @Override + public int hashCode() { + return Objects.hash( + consumedDataType, + topic, + properties, + encodingFormat, + partitioner, + semantic); + } + + // 
-------------------------------------------------------------------------------------------- + + protected FlinkKafkaProducer<RowData> createKafkaProducer(SerializationSchema<RowData> serializationSchema) { + return new FlinkKafkaProducer<>( + topic, + serializationSchema, + properties, + partitioner.orElse(null), + FlinkKafkaProducer.Semantic.valueOf(semantic.toString()), + FlinkKafkaProducer.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE); } } diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSinkBase.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSinkBase.java deleted file mode 100644 index 2d6e893..0000000 --- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSinkBase.java +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka.table; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.serialization.SerializationSchema; -import org.apache.flink.streaming.api.functions.sink.SinkFunction; -import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; -import org.apache.flink.table.connector.ChangelogMode; -import org.apache.flink.table.connector.format.EncodingFormat; -import org.apache.flink.table.connector.sink.DynamicTableSink; -import org.apache.flink.table.connector.sink.SinkFunctionProvider; -import org.apache.flink.table.data.RowData; -import org.apache.flink.table.types.DataType; -import org.apache.flink.util.Preconditions; - -import java.util.Objects; -import java.util.Optional; -import java.util.Properties; - -/** - * A version-agnostic Kafka {@link DynamicTableSink}. - * - * <p>The version-specific Kafka consumers need to extend this class and - * override {@link #createKafkaProducer(String, Properties, SerializationSchema, Optional, KafkaSinkSemantic)}}. - */ -@Internal -public abstract class KafkaDynamicSinkBase implements DynamicTableSink { - /** Consumed data type of the table. */ - protected final DataType consumedDataType; - - /** The Kafka topic to write to. */ - protected final String topic; - - /** Properties for the Kafka producer. */ - protected final Properties properties; - - /** Sink format for encoding records to Kafka. */ - protected final EncodingFormat<SerializationSchema<RowData>> encodingFormat; - - /** Partitioner to select Kafka partition for each item. 
*/ - protected final Optional<FlinkKafkaPartitioner<RowData>> partitioner; - - /** Sink commit semantic.*/ - protected final KafkaSinkSemantic semantic; - - protected KafkaDynamicSinkBase( - DataType consumedDataType, - String topic, - Properties properties, - Optional<FlinkKafkaPartitioner<RowData>> partitioner, - EncodingFormat<SerializationSchema<RowData>> encodingFormat, - KafkaSinkSemantic semantic) { - this.consumedDataType = Preconditions.checkNotNull(consumedDataType, "Consumed data type must not be null."); - this.topic = Preconditions.checkNotNull(topic, "Topic must not be null."); - this.properties = Preconditions.checkNotNull(properties, "Properties must not be null."); - this.partitioner = Preconditions.checkNotNull(partitioner, "Partitioner must not be null."); - this.encodingFormat = Preconditions.checkNotNull(encodingFormat, "Encoding format must not be null."); - this.semantic = Preconditions.checkNotNull(semantic, "Semantic must not be null."); - } - - @Override - public ChangelogMode getChangelogMode(ChangelogMode requestedMode) { - return this.encodingFormat.getChangelogMode(); - } - - @Override - public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { - SerializationSchema<RowData> serializationSchema = - this.encodingFormat.createRuntimeEncoder(context, this.consumedDataType); - - final SinkFunction<RowData> kafkaProducer = createKafkaProducer( - this.topic, - properties, - serializationSchema, - this.partitioner, - this.semantic); - - return SinkFunctionProvider.of(kafkaProducer); - } - - /** - * Returns the version-specific Kafka producer. - * - * @param topic Kafka topic to produce to. - * @param properties Properties for the Kafka producer. - * @param serializationSchema Serialization schema to use to create Kafka records. - * @param partitioner Partitioner to select Kafka partition. - * @return The version-specific Kafka producer - */ - protected abstract SinkFunction<RowData> createKafkaProducer( - String topic, - Properties properties, - SerializationSchema<RowData> serializationSchema, - Optional<FlinkKafkaPartitioner<RowData>> partitioner, - KafkaSinkSemantic semantic); - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - final KafkaDynamicSinkBase that = (KafkaDynamicSinkBase) o; - return Objects.equals(consumedDataType, that.consumedDataType) && - Objects.equals(topic, that.topic) && - Objects.equals(properties, that.properties) && - Objects.equals(encodingFormat, that.encodingFormat) && - Objects.equals(partitioner, that.partitioner) && - Objects.equals(semantic, that.semantic); - } - - @Override - public int hashCode() { - return Objects.hash( - consumedDataType, - topic, - properties, - encodingFormat, - partitioner, - semantic); - } -} diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java index 186144c..49a5d227 100644 --- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java +++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java @@ -1,12 +1,13 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -20,42 +21,68 @@ package org.apache.flink.streaming.connectors.kafka.table; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase; import org.apache.flink.streaming.connectors.kafka.config.StartupMode; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; +import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.format.DecodingFormat; import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.ScanTableSource; +import org.apache.flink.table.connector.source.SourceFunctionProvider; import org.apache.flink.table.data.RowData; -import org.apache.flink.table.sources.StreamTableSource; import org.apache.flink.table.types.DataType; +import org.apache.flink.util.Preconditions; import javax.annotation.Nullable; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Properties; import java.util.regex.Pattern; /** - * Kafka {@link org.apache.flink.table.connector.source.DynamicTableSource}. + * A version-agnostic Kafka {@link ScanTableSource}. */ @Internal -public class KafkaDynamicSource extends KafkaDynamicSourceBase { - - /** - * Creates a generic Kafka {@link StreamTableSource}. - * - * @param outputDataType Source output data type - * @param topics Kafka topic to consume - * @param topicPattern Kafka topic pattern to consume - * @param properties Properties for the Kafka consumer - * @param decodingFormat Decoding format for decoding records from Kafka - * @param startupMode Startup mode for the contained consumer - * @param specificStartupOffsets Specific startup offsets; only relevant when startup - * mode is {@link StartupMode#SPECIFIC_OFFSETS} - */ +public class KafkaDynamicSource implements ScanTableSource { + + // -------------------------------------------------------------------------------------------- + // Common attributes + // -------------------------------------------------------------------------------------------- + + protected final DataType producedDataType; + + // -------------------------------------------------------------------------------------------- + // Scan format attributes + // -------------------------------------------------------------------------------------------- + + /** Scan format for decoding records from Kafka. 
*/ + protected final DecodingFormat<DeserializationSchema<RowData>> decodingFormat; + + // -------------------------------------------------------------------------------------------- + // Kafka-specific attributes + // -------------------------------------------------------------------------------------------- + + /** The Kafka topics to consume. */ + protected final List<String> topics; + + /** The Kafka topic pattern to consume. */ + protected final Pattern topicPattern; + + /** Properties for the Kafka consumer. */ + protected final Properties properties; + + /** The startup mode for the contained consumer (default is {@link StartupMode#GROUP_OFFSETS}). */ + protected final StartupMode startupMode; + + /** Specific startup offsets; only relevant when startup mode is {@link StartupMode#SPECIFIC_OFFSETS}. */ + protected final Map<KafkaTopicPartition, Long> specificStartupOffsets; + + /** The start timestamp to locate partition offsets; only relevant when startup mode is {@link StartupMode#TIMESTAMP}.*/ + protected final long startupTimestampMillis; + public KafkaDynamicSource( - DataType outputDataType, + DataType producedDataType, @Nullable List<String> topics, @Nullable Pattern topicPattern, Properties properties, @@ -63,38 +90,40 @@ public class KafkaDynamicSource extends KafkaDynamicSourceBase { StartupMode startupMode, Map<KafkaTopicPartition, Long> specificStartupOffsets, long startupTimestampMillis) { - - super( - outputDataType, - topics, - topicPattern, - properties, - decodingFormat, - startupMode, - specificStartupOffsets, - startupTimestampMillis); + this.producedDataType = Preconditions.checkNotNull(producedDataType, "Produced data type must not be null."); + Preconditions.checkArgument((topics != null && topicPattern == null) || + (topics == null && topicPattern != null), + "Either Topic or Topic Pattern must be set for source."); + this.topics = topics; + this.topicPattern = topicPattern; + this.properties = Preconditions.checkNotNull(properties, "Properties must not be null."); + this.decodingFormat = Preconditions.checkNotNull( + decodingFormat, "Decoding format must not be null."); + this.startupMode = Preconditions.checkNotNull(startupMode, "Startup mode must not be null."); + this.specificStartupOffsets = Preconditions.checkNotNull( + specificStartupOffsets, "Specific offsets must not be null."); + this.startupTimestampMillis = startupTimestampMillis; } @Override - protected FlinkKafkaConsumerBase<RowData> createKafkaConsumer( - List<String> topics, - Properties properties, - DeserializationSchema<RowData> deserializationSchema) { - return new FlinkKafkaConsumer<>(topics, deserializationSchema, properties); + public ChangelogMode getChangelogMode() { + return this.decodingFormat.getChangelogMode(); } @Override - protected FlinkKafkaConsumerBase<RowData> createKafkaConsumer( - Pattern topicPattern, - Properties properties, - DeserializationSchema<RowData> deserializationSchema) { - return new FlinkKafkaConsumer<>(topicPattern, deserializationSchema, properties); + public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) { + final DeserializationSchema<RowData> deserializationSchema = + this.decodingFormat.createRuntimeDecoder(runtimeProviderContext, this.producedDataType); + + final FlinkKafkaConsumer<RowData> kafkaConsumer = createKafkaConsumer(deserializationSchema); + + return SourceFunctionProvider.of(kafkaConsumer, false); } @Override public DynamicTableSource copy() { return new KafkaDynamicSource( - this.outputDataType, + 
this.producedDataType, this.topics, this.topicPattern, this.properties, @@ -106,6 +135,71 @@ public class KafkaDynamicSource extends KafkaDynamicSourceBase { @Override public String asSummaryString() { - return "Kafka"; + return "Kafka table source"; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final KafkaDynamicSource that = (KafkaDynamicSource) o; + return Objects.equals(producedDataType, that.producedDataType) && + Objects.equals(topics, that.topics) && + Objects.equals(String.valueOf(topicPattern), String.valueOf(that.topicPattern)) && + Objects.equals(properties, that.properties) && + Objects.equals(decodingFormat, that.decodingFormat) && + startupMode == that.startupMode && + Objects.equals(specificStartupOffsets, that.specificStartupOffsets) && + startupTimestampMillis == that.startupTimestampMillis; + } + + @Override + public int hashCode() { + return Objects.hash( + producedDataType, + topics, + topicPattern, + properties, + decodingFormat, + startupMode, + specificStartupOffsets, + startupTimestampMillis); + } + + // -------------------------------------------------------------------------------------------- + + protected FlinkKafkaConsumer<RowData> createKafkaConsumer(DeserializationSchema<RowData> deserializationSchema) { + final FlinkKafkaConsumer<RowData> kafkaConsumer; + if (topics != null) { + kafkaConsumer = new FlinkKafkaConsumer<>(topics, deserializationSchema, properties); + } else { + kafkaConsumer = new FlinkKafkaConsumer<>(topicPattern, deserializationSchema, properties); + } + + switch (startupMode) { + case EARLIEST: + kafkaConsumer.setStartFromEarliest(); + break; + case LATEST: + kafkaConsumer.setStartFromLatest(); + break; + case GROUP_OFFSETS: + kafkaConsumer.setStartFromGroupOffsets(); + break; + case SPECIFIC_OFFSETS: + kafkaConsumer.setStartFromSpecificOffsets(specificStartupOffsets); + break; + case TIMESTAMP: + kafkaConsumer.setStartFromTimestamp(startupTimestampMillis); + break; + } + + kafkaConsumer.setCommitOffsetsOnCheckpoints(properties.getProperty("group.id") != null); + + return kafkaConsumer; } } diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSourceBase.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSourceBase.java deleted file mode 100644 index 7bf3667..0000000 --- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSourceBase.java +++ /dev/null @@ -1,239 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.connectors.kafka.table; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.serialization.DeserializationSchema; -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase; -import org.apache.flink.streaming.connectors.kafka.config.StartupMode; -import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; -import org.apache.flink.table.connector.ChangelogMode; -import org.apache.flink.table.connector.format.DecodingFormat; -import org.apache.flink.table.connector.source.ScanTableSource; -import org.apache.flink.table.connector.source.SourceFunctionProvider; -import org.apache.flink.table.data.RowData; -import org.apache.flink.table.sources.StreamTableSource; -import org.apache.flink.table.types.DataType; -import org.apache.flink.util.Preconditions; - -import javax.annotation.Nullable; - -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Properties; -import java.util.regex.Pattern; - -/** - * A version-agnostic Kafka {@link ScanTableSource}. - * - * <p>The version-specific Kafka consumers need to extend this class and - * override {@link #createKafkaConsumer(List, Properties, DeserializationSchema)}} and - * {@link #createKafkaConsumer(Pattern, Properties, DeserializationSchema)}}. - */ -@Internal -public abstract class KafkaDynamicSourceBase implements ScanTableSource { - - // -------------------------------------------------------------------------------------------- - // Common attributes - // -------------------------------------------------------------------------------------------- - protected final DataType outputDataType; - - // -------------------------------------------------------------------------------------------- - // Scan format attributes - // -------------------------------------------------------------------------------------------- - - /** Scan format for decoding records from Kafka. */ - protected final DecodingFormat<DeserializationSchema<RowData>> decodingFormat; - - // -------------------------------------------------------------------------------------------- - // Kafka-specific attributes - // -------------------------------------------------------------------------------------------- - - /** The Kafka topics to consume. */ - protected final List<String> topics; - - /** The Kafka topic pattern to consume. */ - protected final Pattern topicPattern; - - /** Properties for the Kafka consumer. */ - protected final Properties properties; - - /** The startup mode for the contained consumer (default is {@link StartupMode#GROUP_OFFSETS}). */ - protected final StartupMode startupMode; - - /** Specific startup offsets; only relevant when startup mode is {@link StartupMode#SPECIFIC_OFFSETS}. */ - protected final Map<KafkaTopicPartition, Long> specificStartupOffsets; - - /** The start timestamp to locate partition offsets; only relevant when startup mode is {@link StartupMode#TIMESTAMP}.*/ - protected final long startupTimestampMillis; - - /** The default value when startup timestamp is not used.*/ - private static final long DEFAULT_STARTUP_TIMESTAMP_MILLIS = 0L; - - /** - * Creates a generic Kafka {@link StreamTableSource}. - * @param outputDataType Source produced data type - * @param topics Kafka topics to consume. - * @param topicPattern Kafka topic pattern to consume. - * @param properties Properties for the Kafka consumer. - * @param decodingFormat Decoding format for decoding records from Kafka. 
- * @param startupMode Startup mode for the contained consumer. - * @param specificStartupOffsets Specific startup offsets; only relevant when startup - * mode is {@link StartupMode#SPECIFIC_OFFSETS}. - * @param startupTimestampMillis Startup timestamp for offsets; only relevant when startup - * mode is {@link StartupMode#TIMESTAMP}. - */ - protected KafkaDynamicSourceBase( - DataType outputDataType, - @Nullable List<String> topics, - @Nullable Pattern topicPattern, - Properties properties, - DecodingFormat<DeserializationSchema<RowData>> decodingFormat, - StartupMode startupMode, - Map<KafkaTopicPartition, Long> specificStartupOffsets, - long startupTimestampMillis) { - this.outputDataType = Preconditions.checkNotNull( - outputDataType, "Produced data type must not be null."); - Preconditions.checkArgument((topics != null && topicPattern == null) || - (topics == null && topicPattern != null), - "Either Topic or Topic Pattern must be set for source."); - this.topics = topics; - this.topicPattern = topicPattern; - this.properties = Preconditions.checkNotNull(properties, "Properties must not be null."); - this.decodingFormat = Preconditions.checkNotNull( - decodingFormat, "Decoding format must not be null."); - this.startupMode = Preconditions.checkNotNull(startupMode, "Startup mode must not be null."); - this.specificStartupOffsets = Preconditions.checkNotNull( - specificStartupOffsets, "Specific offsets must not be null."); - this.startupTimestampMillis = startupTimestampMillis; - } - - @Override - public ChangelogMode getChangelogMode() { - return this.decodingFormat.getChangelogMode(); - } - - @Override - public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) { - DeserializationSchema<RowData> deserializationSchema = - this.decodingFormat.createRuntimeDecoder(runtimeProviderContext, this.outputDataType); - // Version-specific Kafka consumer - FlinkKafkaConsumerBase<RowData> kafkaConsumer = getKafkaConsumer(deserializationSchema); - return SourceFunctionProvider.of(kafkaConsumer, false); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - final KafkaDynamicSourceBase that = (KafkaDynamicSourceBase) o; - return Objects.equals(outputDataType, that.outputDataType) && - Objects.equals(topics, that.topics) && - Objects.equals(topicPattern, topicPattern) && - Objects.equals(properties, that.properties) && - Objects.equals(decodingFormat, that.decodingFormat) && - startupMode == that.startupMode && - Objects.equals(specificStartupOffsets, that.specificStartupOffsets) && - startupTimestampMillis == that.startupTimestampMillis; - } - - @Override - public int hashCode() { - return Objects.hash( - outputDataType, - topics, - topicPattern, - properties, - decodingFormat, - startupMode, - specificStartupOffsets, - startupTimestampMillis); - } - - // -------------------------------------------------------------------------------------------- - // Abstract methods for subclasses - // -------------------------------------------------------------------------------------------- - - /** - * Creates a version-specific Kafka consumer. - * - * @param topics Kafka topic to consume. - * @param properties Properties for the Kafka consumer. - * @param deserializationSchema Deserialization schema to use for Kafka records. 
- * @return The version-specific Kafka consumer - */ - protected abstract FlinkKafkaConsumerBase<RowData> createKafkaConsumer( - List<String> topics, - Properties properties, - DeserializationSchema<RowData> deserializationSchema); - - /** - * Creates a version-specific Kafka consumer. - * - * @param topicPattern afka topic to consume. - * @param properties Properties for the Kafka consumer. - * @param deserializationSchema Deserialization schema to use for Kafka records. - * @return The version-specific Kafka consumer - */ - protected abstract FlinkKafkaConsumerBase<RowData> createKafkaConsumer( - Pattern topicPattern, - Properties properties, - DeserializationSchema<RowData> deserializationSchema); - - // -------------------------------------------------------------------------------------------- - // Utilities - // -------------------------------------------------------------------------------------------- - - /** - * Returns a version-specific Kafka consumer with the start position configured. - - * @param deserializationSchema Deserialization schema to use for Kafka records. - * @return The version-specific Kafka consumer - */ - protected FlinkKafkaConsumerBase<RowData> getKafkaConsumer(DeserializationSchema<RowData> deserializationSchema) { - FlinkKafkaConsumerBase<RowData> kafkaConsumer = topics != null ? - createKafkaConsumer(topics, properties, deserializationSchema) : - createKafkaConsumer(topicPattern, properties, deserializationSchema); - - switch (startupMode) { - case EARLIEST: - kafkaConsumer.setStartFromEarliest(); - break; - case LATEST: - kafkaConsumer.setStartFromLatest(); - break; - case GROUP_OFFSETS: - kafkaConsumer.setStartFromGroupOffsets(); - break; - case SPECIFIC_OFFSETS: - kafkaConsumer.setStartFromSpecificOffsets(specificStartupOffsets); - break; - case TIMESTAMP: - kafkaConsumer.setStartFromTimestamp(startupTimestampMillis); - break; - } - kafkaConsumer.setCommitOffsetsOnCheckpoints(properties.getProperty("group.id") != null); - return kafkaConsumer; - } -} diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java index 2a74f30..b304228 100644 --- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java +++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java @@ -1,12 +1,13 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -17,32 +18,156 @@ package org.apache.flink.streaming.connectors.kafka.table; +import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase; import org.apache.flink.streaming.connectors.kafka.config.StartupMode; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; import org.apache.flink.table.connector.format.DecodingFormat; import org.apache.flink.table.connector.format.EncodingFormat; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.source.DynamicTableSource; import org.apache.flink.table.data.RowData; +import org.apache.flink.table.factories.DeserializationFormatFactory; +import org.apache.flink.table.factories.DynamicTableSinkFactory; +import org.apache.flink.table.factories.DynamicTableSourceFactory; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.factories.SerializationFormatFactory; import org.apache.flink.table.types.DataType; import javax.annotation.Nullable; +import java.time.Duration; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Properties; +import java.util.Set; import java.util.regex.Pattern; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.PROPERTIES_PREFIX; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.PROPS_BOOTSTRAP_SERVERS; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.PROPS_GROUP_ID; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.SCAN_STARTUP_MODE; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.SCAN_STARTUP_SPECIFIC_OFFSETS; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.SCAN_STARTUP_TIMESTAMP_MILLIS; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.SCAN_TOPIC_PARTITION_DISCOVERY; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.SINK_PARTITIONER; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.SINK_SEMANTIC; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.StartupOptions; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.TOPIC; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.TOPIC_PATTERN; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getFlinkKafkaPartitioner; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getKafkaProperties; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getSinkSemantic; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getStartupOptions; +import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.validateTableSinkOptions; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.validateTableSourceOptions; + /** - * Factory for creating configured instances of {@link KafkaDynamicSource}. + * Factory for creating configured instances of {@link KafkaDynamicSource} and {@link KafkaDynamicSink}. */ -public class KafkaDynamicTableFactory extends KafkaDynamicTableFactoryBase { +@Internal +public class KafkaDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory { + public static final String IDENTIFIER = "kafka"; @Override - protected KafkaDynamicSourceBase createKafkaTableSource( + public String factoryIdentifier() { + return IDENTIFIER; + } + + @Override + public Set<ConfigOption<?>> requiredOptions() { + final Set<ConfigOption<?>> options = new HashSet<>(); + options.add(FactoryUtil.FORMAT); + options.add(PROPS_BOOTSTRAP_SERVERS); + return options; + } + + @Override + public Set<ConfigOption<?>> optionalOptions() { + final Set<ConfigOption<?>> options = new HashSet<>(); + options.add(TOPIC); + options.add(TOPIC_PATTERN); + options.add(PROPS_GROUP_ID); + options.add(SCAN_STARTUP_MODE); + options.add(SCAN_STARTUP_SPECIFIC_OFFSETS); + options.add(SCAN_TOPIC_PARTITION_DISCOVERY); + options.add(SCAN_STARTUP_TIMESTAMP_MILLIS); + options.add(SINK_PARTITIONER); + options.add(SINK_SEMANTIC); + return options; + } + + @Override + public DynamicTableSource createDynamicTableSource(Context context) { + FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); + + ReadableConfig tableOptions = helper.getOptions(); + DecodingFormat<DeserializationSchema<RowData>> decodingFormat = helper.discoverDecodingFormat( + DeserializationFormatFactory.class, + FactoryUtil.FORMAT); + // Validate the option data type. + helper.validateExcept(PROPERTIES_PREFIX); + // Validate the option values. + validateTableSourceOptions(tableOptions); + + DataType producedDataType = context.getCatalogTable().getSchema().toPhysicalRowDataType(); + + final StartupOptions startupOptions = getStartupOptions(tableOptions); + final Properties properties = getKafkaProperties(context.getCatalogTable().getOptions()); + // add topic-partition discovery + properties.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, + String.valueOf(tableOptions + .getOptional(SCAN_TOPIC_PARTITION_DISCOVERY) + .map(Duration::toMillis) + .orElse(FlinkKafkaConsumerBase.PARTITION_DISCOVERY_DISABLED))); + + return createKafkaTableSource( + producedDataType, + KafkaOptions.getSourceTopics(tableOptions), + KafkaOptions.getSourceTopicPattern(tableOptions), + properties, + decodingFormat, + startupOptions.startupMode, + startupOptions.specificOffsets, + startupOptions.startupTimestampMillis); + } + + @Override + public DynamicTableSink createDynamicTableSink(Context context) { + FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); + + ReadableConfig tableOptions = helper.getOptions(); + + EncodingFormat<SerializationSchema<RowData>> encodingFormat = helper.discoverEncodingFormat( + SerializationFormatFactory.class, + FactoryUtil.FORMAT); + + // Validate the option values. + validateTableSinkOptions(tableOptions); + // Validate the option data type. 
+ helper.validateExcept(PROPERTIES_PREFIX); + + DataType consumedDataType = context.getCatalogTable().getSchema().toPhysicalRowDataType(); + return createKafkaTableSink( + consumedDataType, + tableOptions.get(TOPIC).get(0), + getKafkaProperties(context.getCatalogTable().getOptions()), + getFlinkKafkaPartitioner(tableOptions, context.getClassLoader()), + encodingFormat, + getSinkSemantic(tableOptions)); + } + + // -------------------------------------------------------------------------------------------- + + protected KafkaDynamicSource createKafkaTableSource( DataType producedDataType, @Nullable List<String> topics, @Nullable Pattern topicPattern, @@ -62,8 +187,7 @@ public class KafkaDynamicTableFactory extends KafkaDynamicTableFactoryBase { startupTimestampMillis); } - @Override - protected KafkaDynamicSinkBase createKafkaTableSink( + protected KafkaDynamicSink createKafkaTableSink( DataType consumedDataType, String topic, Properties properties, @@ -78,9 +202,4 @@ public class KafkaDynamicTableFactory extends KafkaDynamicTableFactoryBase { encodingFormat, semantic); } - - @Override - public String factoryIdentifier() { - return IDENTIFIER; - } } diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryBase.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryBase.java deleted file mode 100644 index 5136ba4..0000000 --- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryBase.java +++ /dev/null @@ -1,200 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.connectors.kafka.table; - -import org.apache.flink.api.common.serialization.DeserializationSchema; -import org.apache.flink.api.common.serialization.SerializationSchema; -import org.apache.flink.configuration.ConfigOption; -import org.apache.flink.configuration.ReadableConfig; -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase; -import org.apache.flink.streaming.connectors.kafka.config.StartupMode; -import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; -import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; -import org.apache.flink.table.connector.format.DecodingFormat; -import org.apache.flink.table.connector.format.EncodingFormat; -import org.apache.flink.table.connector.sink.DynamicTableSink; -import org.apache.flink.table.connector.source.DynamicTableSource; -import org.apache.flink.table.data.RowData; -import org.apache.flink.table.factories.DeserializationFormatFactory; -import org.apache.flink.table.factories.DynamicTableSinkFactory; -import org.apache.flink.table.factories.DynamicTableSourceFactory; -import org.apache.flink.table.factories.FactoryUtil; -import org.apache.flink.table.factories.SerializationFormatFactory; -import org.apache.flink.table.types.DataType; - -import javax.annotation.Nullable; - -import java.time.Duration; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Properties; -import java.util.Set; -import java.util.regex.Pattern; - -import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.PROPERTIES_PREFIX; -import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.PROPS_BOOTSTRAP_SERVERS; -import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.PROPS_GROUP_ID; -import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.SCAN_STARTUP_MODE; -import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.SCAN_STARTUP_SPECIFIC_OFFSETS; -import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.SCAN_STARTUP_TIMESTAMP_MILLIS; -import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.SCAN_TOPIC_PARTITION_DISCOVERY; -import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.SINK_PARTITIONER; -import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.SINK_SEMANTIC; -import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.StartupOptions; -import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.TOPIC; -import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.TOPIC_PATTERN; -import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getFlinkKafkaPartitioner; -import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getKafkaProperties; -import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getSinkSemantic; -import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getStartupOptions; -import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.validateTableSinkOptions; -import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.validateTableSourceOptions; - -/** - * Factory for creating configured instances of - * {@link KafkaDynamicSourceBase} and {@link KafkaDynamicSinkBase}. 
- */ -public abstract class KafkaDynamicTableFactoryBase implements - DynamicTableSourceFactory, - DynamicTableSinkFactory { - - @Override - public DynamicTableSource createDynamicTableSource(Context context) { - FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); - - ReadableConfig tableOptions = helper.getOptions(); - DecodingFormat<DeserializationSchema<RowData>> decodingFormat = helper.discoverDecodingFormat( - DeserializationFormatFactory.class, - FactoryUtil.FORMAT); - // Validate the option data type. - helper.validateExcept(PROPERTIES_PREFIX); - // Validate the option values. - validateTableSourceOptions(tableOptions); - - DataType producedDataType = context.getCatalogTable().getSchema().toPhysicalRowDataType(); - - final StartupOptions startupOptions = getStartupOptions(tableOptions); - final Properties properties = getKafkaProperties(context.getCatalogTable().getOptions()); - // add topic-partition discovery - properties.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, - String.valueOf(tableOptions - .getOptional(SCAN_TOPIC_PARTITION_DISCOVERY) - .map(Duration::toMillis) - .orElse(FlinkKafkaConsumerBase.PARTITION_DISCOVERY_DISABLED))); - - return createKafkaTableSource( - producedDataType, - KafkaOptions.getSourceTopics(tableOptions), - KafkaOptions.getSourceTopicPattern(tableOptions), - properties, - decodingFormat, - startupOptions.startupMode, - startupOptions.specificOffsets, - startupOptions.startupTimestampMillis); - } - - @Override - public DynamicTableSink createDynamicTableSink(Context context) { - FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); - - ReadableConfig tableOptions = helper.getOptions(); - - EncodingFormat<SerializationSchema<RowData>> encodingFormat = helper.discoverEncodingFormat( - SerializationFormatFactory.class, - FactoryUtil.FORMAT); - - // Validate the option values. - validateTableSinkOptions(tableOptions); - // Validate the option data type. - helper.validateExcept(PROPERTIES_PREFIX); - - DataType consumedDataType = context.getCatalogTable().getSchema().toPhysicalRowDataType(); - return createKafkaTableSink( - consumedDataType, - tableOptions.get(TOPIC).get(0), - getKafkaProperties(context.getCatalogTable().getOptions()), - getFlinkKafkaPartitioner(tableOptions, context.getClassLoader()), - encodingFormat, - getSinkSemantic(tableOptions)); - } - - /** - * Constructs the version-specific Kafka table source. - * - * @param producedDataType Source produced data type - * @param topics Kafka topics to consume - * @param topicPattern Kafka topic pattern to consume - * @param properties Properties for the Kafka consumer - * @param decodingFormat Decoding format for decoding records from Kafka - * @param startupMode Startup mode for the contained consumer - * @param specificStartupOffsets Specific startup offsets; only relevant when startup - * mode is {@link StartupMode#SPECIFIC_OFFSETS} - */ - protected abstract KafkaDynamicSourceBase createKafkaTableSource( - DataType producedDataType, - @Nullable List<String> topics, - @Nullable Pattern topicPattern, - Properties properties, - DecodingFormat<DeserializationSchema<RowData>> decodingFormat, - StartupMode startupMode, - Map<KafkaTopicPartition, Long> specificStartupOffsets, - long startupTimestampMillis); - - /** - * Constructs the version-specific Kafka table sink. 
- * - * @param consumedDataType Sink consumed data type - * @param topic Kafka topic to consume - * @param properties Properties for the Kafka consumer - * @param partitioner Partitioner to select Kafka partition for each item - * @param encodingFormat Encoding format for encoding records to Kafka - */ - protected abstract KafkaDynamicSinkBase createKafkaTableSink( - DataType consumedDataType, - String topic, - Properties properties, - Optional<FlinkKafkaPartitioner<RowData>> partitioner, - EncodingFormat<SerializationSchema<RowData>> encodingFormat, - KafkaSinkSemantic semantic); - - @Override - public Set<ConfigOption<?>> requiredOptions() { - final Set<ConfigOption<?>> options = new HashSet<>(); - options.add(FactoryUtil.FORMAT); - options.add(PROPS_BOOTSTRAP_SERVERS); - return options; - } - - @Override - public Set<ConfigOption<?>> optionalOptions() { - final Set<ConfigOption<?>> options = new HashSet<>(); - options.add(TOPIC); - options.add(TOPIC_PATTERN); - options.add(PROPS_GROUP_ID); - options.add(SCAN_STARTUP_MODE); - options.add(SCAN_STARTUP_SPECIFIC_OFFSETS); - options.add(SCAN_TOPIC_PARTITION_DISCOVERY); - options.add(SCAN_STARTUP_TIMESTAMP_MILLIS); - options.add(SINK_PARTITIONER); - options.add(SINK_SEMANTIC); - return options; - } -} diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java index 40c3712..c2b38e8 100644 --- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java +++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java index abf6f85..14da7a3 100644 --- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java @@ -1,12 +1,13 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,49 +20,484 @@ package org.apache.flink.streaming.connectors.kafka.table; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; -import org.apache.flink.streaming.connectors.kafka.KafkaTableSink; -import org.apache.flink.streaming.connectors.kafka.KafkaTableSource; -import org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory; import org.apache.flink.streaming.connectors.kafka.config.StartupMode; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.CatalogTableImpl; +import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.connector.format.DecodingFormat; import org.apache.flink.table.connector.format.EncodingFormat; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.sink.SinkFunctionProvider; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.ScanTableSource; +import org.apache.flink.table.connector.source.SourceFunctionProvider; import org.apache.flink.table.data.RowData; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.factories.TestFormatFactory; +import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext; +import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext; import org.apache.flink.table.types.DataType; +import org.apache.flink.util.TestLogger; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; import javax.annotation.Nullable; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Properties; +import java.util.function.Consumer; import java.util.regex.Pattern; +import static org.apache.flink.core.testutils.FlinkMatchers.containsCause; +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + /** - * Test for {@link KafkaTableSource} and {@link KafkaTableSink} created - * by {@link KafkaTableSourceSinkFactory}. 
+ * Abstract test base for {@link KafkaDynamicTableFactory}. */ -public class KafkaDynamicTableFactoryTest extends KafkaDynamicTableFactoryTestBase { - @Override - protected String factoryIdentifier() { - return KafkaDynamicTableFactory.IDENTIFIER; +public class KafkaDynamicTableFactoryTest extends TestLogger { + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + private static final String TOPIC = "myTopic"; + private static final String TOPICS = "myTopic-1;myTopic-2;myTopic-3"; + private static final String TOPIC_REGEX = "myTopic-\\d+"; + private static final List<String> TOPIC_LIST = Arrays.asList("myTopic-1", "myTopic-2", "myTopic-3"); + private static final int PARTITION_0 = 0; + private static final long OFFSET_0 = 100L; + private static final int PARTITION_1 = 1; + private static final long OFFSET_1 = 123L; + private static final String NAME = "name"; + private static final String COUNT = "count"; + private static final String TIME = "time"; + private static final String WATERMARK_EXPRESSION = TIME + " - INTERVAL '5' SECOND"; + private static final DataType WATERMARK_DATATYPE = DataTypes.TIMESTAMP(3); + private static final String COMPUTED_COLUMN_NAME = "computed-column"; + private static final String COMPUTED_COLUMN_EXPRESSION = COUNT + " + 1.0"; + private static final DataType COMPUTED_COLUMN_DATATYPE = DataTypes.DECIMAL(10, 3); + private static final String DISCOVERY_INTERVAL = "1000 ms"; + + private static final Properties KAFKA_SOURCE_PROPERTIES = new Properties(); + private static final Properties KAFKA_SINK_PROPERTIES = new Properties(); + static { + KAFKA_SOURCE_PROPERTIES.setProperty("group.id", "dummy"); + KAFKA_SOURCE_PROPERTIES.setProperty("bootstrap.servers", "dummy"); + KAFKA_SOURCE_PROPERTIES.setProperty("flink.partition-discovery.interval-millis", "1000"); + + KAFKA_SINK_PROPERTIES.setProperty("group.id", "dummy"); + KAFKA_SINK_PROPERTIES.setProperty("bootstrap.servers", "dummy"); + } + + private static final String PROPS_SCAN_OFFSETS = + String.format("partition:%d,offset:%d;partition:%d,offset:%d", + PARTITION_0, OFFSET_0, PARTITION_1, OFFSET_1); + + private static final TableSchema SOURCE_SCHEMA = TableSchema.builder() + .field(NAME, DataTypes.STRING()) + .field(COUNT, DataTypes.DECIMAL(38, 18)) + .field(TIME, DataTypes.TIMESTAMP(3)) + .field(COMPUTED_COLUMN_NAME, COMPUTED_COLUMN_DATATYPE, COMPUTED_COLUMN_EXPRESSION) + .watermark(TIME, WATERMARK_EXPRESSION, WATERMARK_DATATYPE) + .build(); + + private static final TableSchema SINK_SCHEMA = TableSchema.builder() + .field(NAME, DataTypes.STRING()) + .field(COUNT, DataTypes.DECIMAL(38, 18)) + .field(TIME, DataTypes.TIMESTAMP(3)) + .build(); + + @Test + public void testTableSource() { + // prepare parameters for Kafka table source + final DataType producedDataType = SOURCE_SCHEMA.toPhysicalRowDataType(); + + final Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>(); + specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_0), OFFSET_0); + specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_1), OFFSET_1); + + DecodingFormat<DeserializationSchema<RowData>> decodingFormat = + new TestFormatFactory.DecodingFormatMock(",", true); + + // Construct table source using options and table source factory + ObjectIdentifier objectIdentifier = ObjectIdentifier.of( + "default", + "default", + "scanTable"); + CatalogTable catalogTable = createKafkaSourceCatalogTable(); + final DynamicTableSource actualSource = FactoryUtil.createTableSource(null, + objectIdentifier, + catalogTable, + 
new Configuration(), + Thread.currentThread().getContextClassLoader(), + false); + + // Test scan source equals + final KafkaDynamicSource expectedKafkaSource = createExpectedScanSource( + producedDataType, + Collections.singletonList(TOPIC), + null, + KAFKA_SOURCE_PROPERTIES, + decodingFormat, + StartupMode.SPECIFIC_OFFSETS, + specificOffsets, + 0); + final KafkaDynamicSource actualKafkaSource = (KafkaDynamicSource) actualSource; + assertEquals(actualKafkaSource, expectedKafkaSource); + + // Test Kafka consumer + ScanTableSource.ScanRuntimeProvider provider = + actualKafkaSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE); + assertThat(provider, instanceOf(SourceFunctionProvider.class)); + final SourceFunctionProvider sourceFunctionProvider = (SourceFunctionProvider) provider; + final SourceFunction<RowData> sourceFunction = sourceFunctionProvider.createSourceFunction(); + assertThat(sourceFunction, instanceOf(FlinkKafkaConsumer.class)); + // Test commitOnCheckpoints flag should be true when set consumer group + assertTrue(((FlinkKafkaConsumer<?>) sourceFunction).getEnableCommitOnCheckpoints()); + } + + @Test + public void testTableSourceCommitOnCheckpointsDisabled() { + //Construct table source using options and table source factory + ObjectIdentifier objectIdentifier = ObjectIdentifier.of( + "default", + "default", + "scanTable"); + Map<String, String> tableOptions = getFullSourceOptions(); + tableOptions.remove("properties.group.id"); + CatalogTable catalogTable = createKafkaSourceCatalogTable(tableOptions); + final DynamicTableSource tableSource = FactoryUtil.createTableSource(null, + objectIdentifier, + catalogTable, + new Configuration(), + Thread.currentThread().getContextClassLoader(), + false); + + // Test commitOnCheckpoints flag should be false when do not set consumer group. 
+ assertThat(tableSource, instanceOf(KafkaDynamicSource.class)); + ScanTableSource.ScanRuntimeProvider providerWithoutGroupId = ((KafkaDynamicSource) tableSource) + .getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE); + assertThat(providerWithoutGroupId, instanceOf(SourceFunctionProvider.class)); + final SourceFunctionProvider functionProviderWithoutGroupId = (SourceFunctionProvider) providerWithoutGroupId; + final SourceFunction<RowData> function = functionProviderWithoutGroupId.createSourceFunction(); + assertFalse(((FlinkKafkaConsumer<?>) function).getEnableCommitOnCheckpoints()); + } + + @Test + public void testTableSourceWithPattern() { + // prepare parameters for Kafka table source + final DataType producedDataType = SOURCE_SCHEMA.toPhysicalRowDataType(); + + final Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>(); + + DecodingFormat<DeserializationSchema<RowData>> decodingFormat = + new TestFormatFactory.DecodingFormatMock(",", true); + + // Construct table source using options and table source factory + ObjectIdentifier objectIdentifier = ObjectIdentifier.of( + "default", + "default", + "scanTable"); + final Map<String, String> modifiedOptions = getModifiedOptions( + getFullSourceOptions(), + options -> { + options.remove("topic"); + options.put("topic-pattern", TOPIC_REGEX); + options.put("scan.startup.mode", KafkaOptions.SCAN_STARTUP_MODE_VALUE_EARLIEST); + options.remove("scan.startup.specific-offsets"); + }); + CatalogTable catalogTable = createKafkaSourceCatalogTable(modifiedOptions); + + final DynamicTableSource actualSource = FactoryUtil.createTableSource(null, + objectIdentifier, + catalogTable, + new Configuration(), + Thread.currentThread().getContextClassLoader(), + false); + + // Test scan source equals + final KafkaDynamicSource expectedKafkaSource = createExpectedScanSource( + producedDataType, + null, + Pattern.compile(TOPIC_REGEX), + KAFKA_SOURCE_PROPERTIES, + decodingFormat, + StartupMode.EARLIEST, + specificOffsets, + 0); + final KafkaDynamicSource actualKafkaSource = (KafkaDynamicSource) actualSource; + assertEquals(actualKafkaSource, expectedKafkaSource); + + // Test Kafka consumer + ScanTableSource.ScanRuntimeProvider provider = + actualKafkaSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE); + assertThat(provider, instanceOf(SourceFunctionProvider.class)); + final SourceFunctionProvider sourceFunctionProvider = (SourceFunctionProvider) provider; + final SourceFunction<RowData> sourceFunction = sourceFunctionProvider.createSourceFunction(); + assertThat(sourceFunction, instanceOf(FlinkKafkaConsumer.class)); + } + + @Test + public void testTableSink() { + final DataType consumedDataType = SINK_SCHEMA.toPhysicalRowDataType(); + EncodingFormat<SerializationSchema<RowData>> encodingFormat = + new TestFormatFactory.EncodingFormatMock(","); + + // Construct table sink using options and table sink factory. 
+ ObjectIdentifier objectIdentifier = ObjectIdentifier.of( + "default", + "default", + "sinkTable"); + final CatalogTable sinkTable = createKafkaSinkCatalogTable(); + final DynamicTableSink actualSink = FactoryUtil.createTableSink( + null, + objectIdentifier, + sinkTable, + new Configuration(), + Thread.currentThread().getContextClassLoader(), + false); + + final DynamicTableSink expectedSink = createExpectedSink( + consumedDataType, + TOPIC, + KAFKA_SINK_PROPERTIES, + Optional.of(new FlinkFixedPartitioner<>()), + encodingFormat, + KafkaSinkSemantic.EXACTLY_ONCE + ); + assertEquals(expectedSink, actualSink); + + // Test sink format. + final KafkaDynamicSink actualKafkaSink = (KafkaDynamicSink) actualSink; + assertEquals(encodingFormat, actualKafkaSink.encodingFormat); + + // Test kafka producer. + DynamicTableSink.SinkRuntimeProvider provider = + actualKafkaSink.getSinkRuntimeProvider(new SinkRuntimeProviderContext(false)); + assertThat(provider, instanceOf(SinkFunctionProvider.class)); + final SinkFunctionProvider sinkFunctionProvider = (SinkFunctionProvider) provider; + final SinkFunction<RowData> sinkFunction = sinkFunctionProvider.createSinkFunction(); + assertThat(sinkFunction, instanceOf(FlinkKafkaProducer.class)); } - @Override - protected Class<?> getExpectedConsumerClass() { - return FlinkKafkaConsumer.class; + // -------------------------------------------------------------------------------------------- + // Negative tests + // -------------------------------------------------------------------------------------------- + @Test + public void testInvalidScanStartupMode() { + // Construct table source using DDL and table source factory + ObjectIdentifier objectIdentifier = ObjectIdentifier.of( + "default", + "default", + "scanTable"); + final Map<String, String> modifiedOptions = getModifiedOptions( + getFullSourceOptions(), + options -> options.put("scan.startup.mode", "abc")); + CatalogTable catalogTable = createKafkaSourceCatalogTable(modifiedOptions); + + thrown.expect(ValidationException.class); + thrown.expect(containsCause(new ValidationException("Invalid value for option 'scan.startup.mode'. 
" + + "Supported values are [earliest-offset, latest-offset, group-offsets, specific-offsets, timestamp], " + + "but was: abc"))); + FactoryUtil.createTableSource(null, + objectIdentifier, + catalogTable, + new Configuration(), + Thread.currentThread().getContextClassLoader(), + false); + } + + @Test + public void testSourceTableWithTopicAndTopicPattern() { + // Construct table source using DDL and table source factory + ObjectIdentifier objectIdentifier = ObjectIdentifier.of( + "default", + "default", + "scanTable"); + final Map<String, String> modifiedOptions = getModifiedOptions( + getFullSourceOptions(), + options -> { + options.put("topic", TOPICS); + options.put("topic-pattern", TOPIC_REGEX); + }); + CatalogTable catalogTable = createKafkaSourceCatalogTable(modifiedOptions); + + thrown.expect(ValidationException.class); + thrown.expect(containsCause(new ValidationException("Option 'topic' and 'topic-pattern' shouldn't be set together."))); + FactoryUtil.createTableSource(null, + objectIdentifier, + catalogTable, + new Configuration(), + Thread.currentThread().getContextClassLoader(), + false); + } + + @Test + public void testMissingStartupTimestamp() { + // Construct table source using DDL and table source factory + ObjectIdentifier objectIdentifier = ObjectIdentifier.of( + "default", + "default", + "scanTable"); + final Map<String, String> modifiedOptions = getModifiedOptions( + getFullSourceOptions(), + options -> options.put("scan.startup.mode", "timestamp")); + CatalogTable catalogTable = createKafkaSourceCatalogTable(modifiedOptions); + + thrown.expect(ValidationException.class); + thrown.expect(containsCause(new ValidationException("'scan.startup.timestamp-millis' " + + "is required in 'timestamp' startup mode but missing."))); + FactoryUtil.createTableSource(null, + objectIdentifier, + catalogTable, + new Configuration(), + Thread.currentThread().getContextClassLoader(), + false); } - @Override - protected Class<?> getExpectedProducerClass() { - return FlinkKafkaProducer.class; + @Test + public void testMissingSpecificOffsets() { + // Construct table source using DDL and table source factory + ObjectIdentifier objectIdentifier = ObjectIdentifier.of( + "default", + "default", + "scanTable"); + final Map<String, String> modifiedOptions = getModifiedOptions( + getFullSourceOptions(), + options -> options.remove("scan.startup.specific-offsets")); + CatalogTable catalogTable = createKafkaSourceCatalogTable(modifiedOptions); + + thrown.expect(ValidationException.class); + thrown.expect(containsCause(new ValidationException("'scan.startup.specific-offsets' " + + "is required in 'specific-offsets' startup mode but missing."))); + FactoryUtil.createTableSource(null, + objectIdentifier, + catalogTable, + new Configuration(), + Thread.currentThread().getContextClassLoader(), + false); } - @Override - protected KafkaDynamicSourceBase getExpectedScanSource( + @Test + public void testInvalidSinkPartitioner() { + ObjectIdentifier objectIdentifier = ObjectIdentifier.of( + "default", + "default", + "sinkTable"); + + final Map<String, String> modifiedOptions = getModifiedOptions( + getFullSourceOptions(), + options -> options.put("sink.partitioner", "abc")); + final CatalogTable sinkTable = createKafkaSinkCatalogTable(modifiedOptions); + + thrown.expect(ValidationException.class); + thrown.expect(containsCause(new ValidationException("Could not find and instantiate partitioner class 'abc'"))); + FactoryUtil.createTableSink( + null, + objectIdentifier, + sinkTable, + new Configuration(), + 
Thread.currentThread().getContextClassLoader(), + false); + } + + @Test + public void testInvalidSinkSemantic(){ + ObjectIdentifier objectIdentifier = ObjectIdentifier.of( + "default", + "default", + "sinkTable"); + + final Map<String, String> modifiedOptions = getModifiedOptions( + getFullSourceOptions(), + options -> options.put("sink.semantic", "xyz")); + final CatalogTable sinkTable = createKafkaSinkCatalogTable(modifiedOptions); + + thrown.expect(ValidationException.class); + thrown.expect(containsCause(new ValidationException("Unsupported value 'xyz' for 'sink.semantic'. Supported values are ['at-least-once', 'exactly-once', 'none']."))); + FactoryUtil.createTableSink( + null, + objectIdentifier, + sinkTable, + new Configuration(), + Thread.currentThread().getContextClassLoader(), + false); + } + + @Test + public void testSinkWithTopicListOrTopicPattern(){ + ObjectIdentifier objectIdentifier = ObjectIdentifier.of( + "default", + "default", + "sinkTable"); + + Map<String, String> modifiedOptions = getModifiedOptions( + getFullSourceOptions(), + options -> { + options.put("topic", TOPICS); + options.put("scan.startup.mode", "earliest-offset"); + options.remove("specific-offsets"); + }); + CatalogTable sinkTable = createKafkaSinkCatalogTable(modifiedOptions); + String errorMessageTemp = "Flink Kafka sink currently only supports single topic, but got %s: %s."; + + try { + FactoryUtil.createTableSink( + null, + objectIdentifier, + sinkTable, + new Configuration(), + Thread.currentThread().getContextClassLoader(), + false); + } catch (Throwable t) { + assertEquals(String.format(errorMessageTemp, "'topic'", String.format("[%s]", String.join(", ", TOPIC_LIST))), + t.getCause().getMessage()); + } + + modifiedOptions = getModifiedOptions( + getFullSourceOptions(), + options -> options.put("topic-pattern", TOPIC_REGEX)); + sinkTable = createKafkaSinkCatalogTable(modifiedOptions); + + try { + FactoryUtil.createTableSink( + null, + objectIdentifier, + sinkTable, + new Configuration(), + Thread.currentThread().getContextClassLoader(), + false); + } catch (Throwable t) { + assertEquals(String.format(errorMessageTemp, "'topic-pattern'", TOPIC_REGEX), t.getCause().getMessage()); + } + } + + // -------------------------------------------------------------------------------------------- + // Utilities + // -------------------------------------------------------------------------------------------- + + private static KafkaDynamicSource createExpectedScanSource( DataType producedDataType, @Nullable List<String> topics, @Nullable Pattern topicPattern, @@ -81,8 +517,7 @@ public class KafkaDynamicTableFactoryTest extends KafkaDynamicTableFactoryTestBa startupTimestamp); } - @Override - protected KafkaDynamicSinkBase getExpectedSink( + private static KafkaDynamicSink createExpectedSink( DataType consumedDataType, String topic, Properties properties, @@ -97,4 +532,70 @@ public class KafkaDynamicTableFactoryTest extends KafkaDynamicTableFactoryTestBa encodingFormat, semantic); } + + private static CatalogTable createKafkaSourceCatalogTable() { + return createKafkaSourceCatalogTable(getFullSourceOptions()); + } + + private static CatalogTable createKafkaSinkCatalogTable() { + return createKafkaSinkCatalogTable(getFullSinkOptions()); + } + + private static CatalogTable createKafkaSourceCatalogTable(Map<String, String> options) { + return new CatalogTableImpl(SOURCE_SCHEMA, options, "scanTable"); + } + + private static CatalogTable createKafkaSinkCatalogTable(Map<String, String> options) { + return new 
CatalogTableImpl(SINK_SCHEMA, options, "sinkTable"); + } + + /** + * Returns the full options modified by the given consumer {@code optionModifier}. + * + * @param optionModifier Consumer to modify the options + */ + private static Map<String, String> getModifiedOptions( + Map<String, String> options, + Consumer<Map<String, String>> optionModifier) { + optionModifier.accept(options); + return options; + } + + private static Map<String, String> getFullSourceOptions() { + Map<String, String> tableOptions = new HashMap<>(); + // Kafka specific options. + tableOptions.put("connector", KafkaDynamicTableFactory.IDENTIFIER); + tableOptions.put("topic", TOPIC); + tableOptions.put("properties.group.id", "dummy"); + tableOptions.put("properties.bootstrap.servers", "dummy"); + tableOptions.put("scan.startup.mode", "specific-offsets"); + tableOptions.put("scan.startup.specific-offsets", PROPS_SCAN_OFFSETS); + tableOptions.put("scan.topic-partition-discovery.interval", DISCOVERY_INTERVAL); + // Format options. + tableOptions.put("format", TestFormatFactory.IDENTIFIER); + final String formatDelimiterKey = String.format("%s.%s", + TestFormatFactory.IDENTIFIER, TestFormatFactory.DELIMITER.key()); + final String failOnMissingKey = String.format("%s.%s", + TestFormatFactory.IDENTIFIER, TestFormatFactory.FAIL_ON_MISSING.key()); + tableOptions.put(formatDelimiterKey, ","); + tableOptions.put(failOnMissingKey, "true"); + return tableOptions; + } + + private static Map<String, String> getFullSinkOptions() { + Map<String, String> tableOptions = new HashMap<>(); + // Kafka specific options. + tableOptions.put("connector", KafkaDynamicTableFactory.IDENTIFIER); + tableOptions.put("topic", TOPIC); + tableOptions.put("properties.group.id", "dummy"); + tableOptions.put("properties.bootstrap.servers", "dummy"); + tableOptions.put("sink.partitioner", KafkaOptions.SINK_PARTITIONER_VALUE_FIXED); + tableOptions.put("sink.semantic", KafkaOptions.SINK_SEMANTIC_VALUE_EXACTLY_ONCE); + // Format options. + tableOptions.put("format", TestFormatFactory.IDENTIFIER); + final String formatDelimiterKey = String.format("%s.%s", + TestFormatFactory.IDENTIFIER, TestFormatFactory.DELIMITER.key()); + tableOptions.put(formatDelimiterKey, ","); + return tableOptions; + } } diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTestBase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTestBase.java deleted file mode 100644 index 5d1938f..0000000 --- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTestBase.java +++ /dev/null @@ -1,606 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka.table; - -import org.apache.flink.api.common.serialization.DeserializationSchema; -import org.apache.flink.api.common.serialization.SerializationSchema; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.functions.sink.SinkFunction; -import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase; -import org.apache.flink.streaming.connectors.kafka.config.StartupMode; -import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; -import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner; -import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; -import org.apache.flink.table.api.DataTypes; -import org.apache.flink.table.api.TableSchema; -import org.apache.flink.table.api.ValidationException; -import org.apache.flink.table.catalog.CatalogTable; -import org.apache.flink.table.catalog.CatalogTableImpl; -import org.apache.flink.table.catalog.ObjectIdentifier; -import org.apache.flink.table.connector.format.DecodingFormat; -import org.apache.flink.table.connector.format.EncodingFormat; -import org.apache.flink.table.connector.sink.DynamicTableSink; -import org.apache.flink.table.connector.sink.SinkFunctionProvider; -import org.apache.flink.table.connector.source.DynamicTableSource; -import org.apache.flink.table.connector.source.ScanTableSource; -import org.apache.flink.table.connector.source.SourceFunctionProvider; -import org.apache.flink.table.data.RowData; -import org.apache.flink.table.factories.FactoryUtil; -import org.apache.flink.table.factories.TestFormatFactory; -import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext; -import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext; -import org.apache.flink.table.types.DataType; -import org.apache.flink.util.TestLogger; - -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; - -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Properties; -import java.util.function.Consumer; -import java.util.regex.Pattern; - -import static org.apache.flink.core.testutils.FlinkMatchers.containsCause; -import static org.hamcrest.CoreMatchers.instanceOf; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; - -/** - * Abstract test base for {@link KafkaDynamicTableFactoryBase}. 
- */ -public abstract class KafkaDynamicTableFactoryTestBase extends TestLogger { - - @Rule - public ExpectedException thrown = ExpectedException.none(); - - private static final String TOPIC = "myTopic"; - private static final String TOPICS = "myTopic-1;myTopic-2;myTopic-3"; - private static final String TOPIC_REGEX = "myTopic-\\d+"; - private static final List<String> TOPIC_LIST = Arrays.asList("myTopic-1", "myTopic-2", "myTopic-3"); - private static final int PARTITION_0 = 0; - private static final long OFFSET_0 = 100L; - private static final int PARTITION_1 = 1; - private static final long OFFSET_1 = 123L; - private static final String NAME = "name"; - private static final String COUNT = "count"; - private static final String TIME = "time"; - private static final String WATERMARK_EXPRESSION = TIME + " - INTERVAL '5' SECOND"; - private static final DataType WATERMARK_DATATYPE = DataTypes.TIMESTAMP(3); - private static final String COMPUTED_COLUMN_NAME = "computed-column"; - private static final String COMPUTED_COLUMN_EXPRESSION = COUNT + " + 1.0"; - private static final DataType COMPUTED_COLUMN_DATATYPE = DataTypes.DECIMAL(10, 3); - private static final String SEMANTIC = "exactly-once"; - private static final String DISCOVERY_INTERVAL = "1000 ms"; - - private static final Properties KAFKA_SOURCE_PROPERTIES = new Properties(); - private static final Properties KAFKA_SINK_PROPERTIES = new Properties(); - static { - KAFKA_SOURCE_PROPERTIES.setProperty("group.id", "dummy"); - KAFKA_SOURCE_PROPERTIES.setProperty("bootstrap.servers", "dummy"); - KAFKA_SOURCE_PROPERTIES.setProperty("flink.partition-discovery.interval-millis", "1000"); - - KAFKA_SINK_PROPERTIES.setProperty("group.id", "dummy"); - KAFKA_SINK_PROPERTIES.setProperty("bootstrap.servers", "dummy"); - } - - private static final String PROPS_SCAN_OFFSETS = - String.format("partition:%d,offset:%d;partition:%d,offset:%d", - PARTITION_0, OFFSET_0, PARTITION_1, OFFSET_1); - - private static final TableSchema SOURCE_SCHEMA = TableSchema.builder() - .field(NAME, DataTypes.STRING()) - .field(COUNT, DataTypes.DECIMAL(38, 18)) - .field(TIME, DataTypes.TIMESTAMP(3)) - .field(COMPUTED_COLUMN_NAME, COMPUTED_COLUMN_DATATYPE, COMPUTED_COLUMN_EXPRESSION) - .watermark(TIME, WATERMARK_EXPRESSION, WATERMARK_DATATYPE) - .build(); - - private static final TableSchema SINK_SCHEMA = TableSchema.builder() - .field(NAME, DataTypes.STRING()) - .field(COUNT, DataTypes.DECIMAL(38, 18)) - .field(TIME, DataTypes.TIMESTAMP(3)) - .build(); - - @Test - @SuppressWarnings("unchecked") - public void testTableSource() { - // prepare parameters for Kafka table source - final DataType producedDataType = SOURCE_SCHEMA.toPhysicalRowDataType(); - - final Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>(); - specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_0), OFFSET_0); - specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_1), OFFSET_1); - - DecodingFormat<DeserializationSchema<RowData>> decodingFormat = - new TestFormatFactory.DecodingFormatMock(",", true); - - // Construct table source using options and table source factory - ObjectIdentifier objectIdentifier = ObjectIdentifier.of( - "default", - "default", - "scanTable"); - CatalogTable catalogTable = createKafkaSourceCatalogTable(); - final DynamicTableSource actualSource = FactoryUtil.createTableSource(null, - objectIdentifier, - catalogTable, - new Configuration(), - Thread.currentThread().getContextClassLoader(), - false); - - // Test scan source equals - final KafkaDynamicSourceBase 
expectedKafkaSource = getExpectedScanSource( - producedDataType, - Collections.singletonList(TOPIC), - null, - KAFKA_SOURCE_PROPERTIES, - decodingFormat, - StartupMode.SPECIFIC_OFFSETS, - specificOffsets, - 0); - final KafkaDynamicSourceBase actualKafkaSource = (KafkaDynamicSourceBase) actualSource; - assertEquals(actualKafkaSource, expectedKafkaSource); - - // Test Kafka consumer - ScanTableSource.ScanRuntimeProvider provider = - actualKafkaSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE); - assertThat(provider, instanceOf(SourceFunctionProvider.class)); - final SourceFunctionProvider sourceFunctionProvider = (SourceFunctionProvider) provider; - final SourceFunction<RowData> sourceFunction = sourceFunctionProvider.createSourceFunction(); - assertThat(sourceFunction, instanceOf(getExpectedConsumerClass())); - // Test commitOnCheckpoints flag should be true when set consumer group - assertTrue(((FlinkKafkaConsumerBase) sourceFunction).getEnableCommitOnCheckpoints()); - } - - @Test - public void testTableSourceCommitOnCheckpointsDisabled() { - //Construct table source using options and table source factory - ObjectIdentifier objectIdentifier = ObjectIdentifier.of( - "default", - "default", - "scanTable"); - Map<String, String> tableOptions = getFullSourceOptions(); - tableOptions.remove("properties.group.id"); - CatalogTable catalogTable = createKafkaSourceCatalogTable(tableOptions); - final DynamicTableSource tableSource = FactoryUtil.createTableSource(null, - objectIdentifier, - catalogTable, - new Configuration(), - Thread.currentThread().getContextClassLoader(), - false); - - // Test commitOnCheckpoints flag should be false when do not set consumer group. - assertThat(tableSource, instanceOf(KafkaDynamicSourceBase.class)); - ScanTableSource.ScanRuntimeProvider providerWithoutGroupId = ((KafkaDynamicSourceBase) tableSource) - .getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE); - assertThat(providerWithoutGroupId, instanceOf(SourceFunctionProvider.class)); - final SourceFunctionProvider functionProviderWithoutGroupId = (SourceFunctionProvider) providerWithoutGroupId; - final SourceFunction<RowData> function = functionProviderWithoutGroupId.createSourceFunction(); - assertFalse(((FlinkKafkaConsumerBase) function).getEnableCommitOnCheckpoints()); - } - - @Test - public void testTableSourceWithPattern() { - // prepare parameters for Kafka table source - final DataType producedDataType = SOURCE_SCHEMA.toPhysicalRowDataType(); - - final Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>(); - - DecodingFormat<DeserializationSchema<RowData>> decodingFormat = - new TestFormatFactory.DecodingFormatMock(",", true); - - // Construct table source using options and table source factory - ObjectIdentifier objectIdentifier = ObjectIdentifier.of( - "default", - "default", - "scanTable"); - final Map<String, String> modifiedOptions = getModifiedOptions( - getFullSourceOptions(), - options -> { - options.remove("topic"); - options.put("topic-pattern", TOPIC_REGEX); - options.put("scan.startup.mode", KafkaOptions.SCAN_STARTUP_MODE_VALUE_EARLIEST); - options.remove("scan.startup.specific-offsets"); - }); - CatalogTable catalogTable = createKafkaSourceCatalogTable(modifiedOptions); - - final DynamicTableSource actualSource = FactoryUtil.createTableSource(null, - objectIdentifier, - catalogTable, - new Configuration(), - Thread.currentThread().getContextClassLoader(), - false); - - // Test scan source equals - final KafkaDynamicSourceBase expectedKafkaSource = 
getExpectedScanSource( - producedDataType, - null, - Pattern.compile(TOPIC_REGEX), - KAFKA_SOURCE_PROPERTIES, - decodingFormat, - StartupMode.EARLIEST, - specificOffsets, - 0); - final KafkaDynamicSourceBase actualKafkaSource = (KafkaDynamicSourceBase) actualSource; - assertEquals(actualKafkaSource, expectedKafkaSource); - - // Test Kafka consumer - ScanTableSource.ScanRuntimeProvider provider = - actualKafkaSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE); - assertThat(provider, instanceOf(SourceFunctionProvider.class)); - final SourceFunctionProvider sourceFunctionProvider = (SourceFunctionProvider) provider; - final SourceFunction<RowData> sourceFunction = sourceFunctionProvider.createSourceFunction(); - assertThat(sourceFunction, instanceOf(getExpectedConsumerClass())); - } - - @Test - public void testTableSink() { - final DataType consumedDataType = SINK_SCHEMA.toPhysicalRowDataType(); - EncodingFormat<SerializationSchema<RowData>> encodingFormat = - new TestFormatFactory.EncodingFormatMock(","); - - // Construct table sink using options and table sink factory. - ObjectIdentifier objectIdentifier = ObjectIdentifier.of( - "default", - "default", - "sinkTable"); - final CatalogTable sinkTable = createKafkaSinkCatalogTable(); - final DynamicTableSink actualSink = FactoryUtil.createTableSink( - null, - objectIdentifier, - sinkTable, - new Configuration(), - Thread.currentThread().getContextClassLoader(), - false); - - final DynamicTableSink expectedSink = getExpectedSink( - consumedDataType, - TOPIC, - KAFKA_SINK_PROPERTIES, - Optional.of(new FlinkFixedPartitioner<>()), - encodingFormat, - KafkaSinkSemantic.EXACTLY_ONCE - ); - assertEquals(expectedSink, actualSink); - - // Test sink format. - final KafkaDynamicSinkBase actualKafkaSink = (KafkaDynamicSinkBase) actualSink; - assertEquals(encodingFormat, actualKafkaSink.encodingFormat); - - // Test kafka producer. - DynamicTableSink.SinkRuntimeProvider provider = - actualKafkaSink.getSinkRuntimeProvider(new SinkRuntimeProviderContext(false)); - assertThat(provider, instanceOf(SinkFunctionProvider.class)); - final SinkFunctionProvider sinkFunctionProvider = (SinkFunctionProvider) provider; - final SinkFunction<RowData> sinkFunction = sinkFunctionProvider.createSinkFunction(); - assertThat(sinkFunction, instanceOf(getExpectedProducerClass())); - } - - // -------------------------------------------------------------------------------------------- - // Negative tests - // -------------------------------------------------------------------------------------------- - @Test - public void testInvalidScanStartupMode() { - // Construct table source using DDL and table source factory - ObjectIdentifier objectIdentifier = ObjectIdentifier.of( - "default", - "default", - "scanTable"); - final Map<String, String> modifiedOptions = getModifiedOptions( - getFullSourceOptions(), - options -> { - options.put("scan.startup.mode", "abc"); - }); - CatalogTable catalogTable = createKafkaSourceCatalogTable(modifiedOptions); - - thrown.expect(ValidationException.class); - thrown.expect(containsCause(new ValidationException("Invalid value for option 'scan.startup.mode'. 
" - + "Supported values are [earliest-offset, latest-offset, group-offsets, specific-offsets, timestamp], " - + "but was: abc"))); - FactoryUtil.createTableSource(null, - objectIdentifier, - catalogTable, - new Configuration(), - Thread.currentThread().getContextClassLoader(), - false); - } - - @Test - public void testSourceTableWithTopicAndTopicPattern() { - // Construct table source using DDL and table source factory - ObjectIdentifier objectIdentifier = ObjectIdentifier.of( - "default", - "default", - "scanTable"); - final Map<String, String> modifiedOptions = getModifiedOptions( - getFullSourceOptions(), - options -> { - options.put("topic", TOPICS); - options.put("topic-pattern", TOPIC_REGEX); - }); - CatalogTable catalogTable = createKafkaSourceCatalogTable(modifiedOptions); - - thrown.expect(ValidationException.class); - thrown.expect(containsCause(new ValidationException("Option 'topic' and 'topic-pattern' shouldn't be set together."))); - FactoryUtil.createTableSource(null, - objectIdentifier, - catalogTable, - new Configuration(), - Thread.currentThread().getContextClassLoader(), - false); - } - - @Test - public void testMissingStartupTimestamp() { - // Construct table source using DDL and table source factory - ObjectIdentifier objectIdentifier = ObjectIdentifier.of( - "default", - "default", - "scanTable"); - final Map<String, String> modifiedOptions = getModifiedOptions( - getFullSourceOptions(), - options -> { - options.put("scan.startup.mode", "timestamp"); - }); - CatalogTable catalogTable = createKafkaSourceCatalogTable(modifiedOptions); - - thrown.expect(ValidationException.class); - thrown.expect(containsCause(new ValidationException("'scan.startup.timestamp-millis' " - + "is required in 'timestamp' startup mode but missing."))); - FactoryUtil.createTableSource(null, - objectIdentifier, - catalogTable, - new Configuration(), - Thread.currentThread().getContextClassLoader(), - false); - } - - @Test - public void testMissingSpecificOffsets() { - // Construct table source using DDL and table source factory - ObjectIdentifier objectIdentifier = ObjectIdentifier.of( - "default", - "default", - "scanTable"); - final Map<String, String> modifiedOptions = getModifiedOptions( - getFullSourceOptions(), - options -> { - options.remove("scan.startup.specific-offsets"); - }); - CatalogTable catalogTable = createKafkaSourceCatalogTable(modifiedOptions); - - thrown.expect(ValidationException.class); - thrown.expect(containsCause(new ValidationException("'scan.startup.specific-offsets' " - + "is required in 'specific-offsets' startup mode but missing."))); - FactoryUtil.createTableSource(null, - objectIdentifier, - catalogTable, - new Configuration(), - Thread.currentThread().getContextClassLoader(), - false); - } - - @Test - public void testInvalidSinkPartitioner() { - ObjectIdentifier objectIdentifier = ObjectIdentifier.of( - "default", - "default", - "sinkTable"); - - final Map<String, String> modifiedOptions = getModifiedOptions( - getFullSourceOptions(), - options -> { - options.put("sink.partitioner", "abc"); - }); - final CatalogTable sinkTable = createKafkaSinkCatalogTable(modifiedOptions); - - thrown.expect(ValidationException.class); - thrown.expect(containsCause(new ValidationException("Could not find and instantiate partitioner class 'abc'"))); - FactoryUtil.createTableSink( - null, - objectIdentifier, - sinkTable, - new Configuration(), - Thread.currentThread().getContextClassLoader(), - false); - } - - @Test - public void testInvalidSinkSemantic(){ - ObjectIdentifier 
objectIdentifier = ObjectIdentifier.of( - "default", - "default", - "sinkTable"); - - final Map<String, String> modifiedOptions = getModifiedOptions( - getFullSourceOptions(), - options -> { - options.put("sink.semantic", "xyz"); - }); - final CatalogTable sinkTable = createKafkaSinkCatalogTable(modifiedOptions); - - thrown.expect(ValidationException.class); - thrown.expect(containsCause(new ValidationException("Unsupported value 'xyz' for 'sink.semantic'. Supported values are ['at-least-once', 'exactly-once', 'none']."))); - FactoryUtil.createTableSink( - null, - objectIdentifier, - sinkTable, - new Configuration(), - Thread.currentThread().getContextClassLoader(), - false); - } - - @Test - public void testSinkWithTopicListOrTopicPattern(){ - ObjectIdentifier objectIdentifier = ObjectIdentifier.of( - "default", - "default", - "sinkTable"); - - Map<String, String> modifiedOptions = getModifiedOptions( - getFullSourceOptions(), - options -> { - options.put("topic", TOPICS); - options.put("scan.startup.mode", "earliest-offset"); - options.remove("specific-offsets"); - }); - CatalogTable sinkTable = createKafkaSinkCatalogTable(modifiedOptions); - String errorMessageTemp = "Flink Kafka sink currently only supports single topic, but got %s: %s."; - - try { - FactoryUtil.createTableSink( - null, - objectIdentifier, - sinkTable, - new Configuration(), - Thread.currentThread().getContextClassLoader(), - false); - } catch (Throwable t) { - assertEquals(String.format(errorMessageTemp, "'topic'", String.format("[%s]", String.join(", ", TOPIC_LIST))), - t.getCause().getMessage()); - } - - modifiedOptions = getModifiedOptions( - getFullSourceOptions(), - options -> { - options.put("topic-pattern", TOPIC_REGEX); - }); - sinkTable = createKafkaSinkCatalogTable(modifiedOptions); - - try { - FactoryUtil.createTableSink( - null, - objectIdentifier, - sinkTable, - new Configuration(), - Thread.currentThread().getContextClassLoader(), - false); - } catch (Throwable t) { - assertEquals(String.format(errorMessageTemp, "'topic-pattern'", TOPIC_REGEX), t.getCause().getMessage()); - } - } - - // -------------------------------------------------------------------------------------------- - // Utilities - // -------------------------------------------------------------------------------------------- - - private CatalogTable createKafkaSourceCatalogTable() { - return createKafkaSourceCatalogTable(getFullSourceOptions()); - } - - private CatalogTable createKafkaSinkCatalogTable() { - return createKafkaSinkCatalogTable(getFullSinkOptions()); - } - - private CatalogTable createKafkaSourceCatalogTable(Map<String, String> options) { - return new CatalogTableImpl(SOURCE_SCHEMA, options, "scanTable"); - } - - protected CatalogTable createKafkaSinkCatalogTable(Map<String, String> options) { - return new CatalogTableImpl(SINK_SCHEMA, options, "sinkTable"); - } - - /** - * Returns the full options modified by the given consumer {@code optionModifier}. - * - * @param optionModifier Consumer to modify the options - */ - protected static Map<String, String> getModifiedOptions( - Map<String, String> options, - Consumer<Map<String, String>> optionModifier) { - optionModifier.accept(options); - return options; - } - - protected Map<String, String> getFullSourceOptions() { - Map<String, String> tableOptions = new HashMap<>(); - // Kafka specific options. 
- tableOptions.put("connector", factoryIdentifier()); - tableOptions.put("topic", TOPIC); - tableOptions.put("properties.group.id", "dummy"); - tableOptions.put("properties.bootstrap.servers", "dummy"); - tableOptions.put("scan.startup.mode", "specific-offsets"); - tableOptions.put("scan.startup.specific-offsets", PROPS_SCAN_OFFSETS); - tableOptions.put("scan.topic-partition-discovery.interval", DISCOVERY_INTERVAL); - // Format options. - tableOptions.put("format", TestFormatFactory.IDENTIFIER); - final String formatDelimiterKey = String.format("%s.%s", - TestFormatFactory.IDENTIFIER, TestFormatFactory.DELIMITER.key()); - final String failOnMissingKey = String.format("%s.%s", - TestFormatFactory.IDENTIFIER, TestFormatFactory.FAIL_ON_MISSING.key()); - tableOptions.put(formatDelimiterKey, ","); - tableOptions.put(failOnMissingKey, "true"); - return tableOptions; - } - - protected Map<String, String> getFullSinkOptions() { - Map<String, String> tableOptions = new HashMap<>(); - // Kafka specific options. - tableOptions.put("connector", factoryIdentifier()); - tableOptions.put("topic", TOPIC); - tableOptions.put("properties.group.id", "dummy"); - tableOptions.put("properties.bootstrap.servers", "dummy"); - tableOptions.put("sink.partitioner", KafkaOptions.SINK_PARTITIONER_VALUE_FIXED); - tableOptions.put("sink.semantic", KafkaOptions.SINK_SEMANTIC_VALUE_EXACTLY_ONCE); - // Format options. - tableOptions.put("format", TestFormatFactory.IDENTIFIER); - final String formatDelimiterKey = String.format("%s.%s", - TestFormatFactory.IDENTIFIER, TestFormatFactory.DELIMITER.key()); - tableOptions.put(formatDelimiterKey, ","); - return tableOptions; - } - - // -------------------------------------------------------------------------------------------- - // For version-specific tests - // -------------------------------------------------------------------------------------------- - - protected abstract String factoryIdentifier(); - - protected abstract Class<?> getExpectedConsumerClass(); - - protected abstract Class<?> getExpectedProducerClass(); - - protected abstract KafkaDynamicSourceBase getExpectedScanSource( - DataType producedDataType, - List<String> topics, - Pattern topicPattern, - Properties properties, - DecodingFormat<DeserializationSchema<RowData>> decodingFormat, - StartupMode startupMode, - Map<KafkaTopicPartition, Long> specificStartupOffsets, - long startupTimestamp - ); - - protected abstract KafkaDynamicSinkBase getExpectedSink( - DataType consumedDataType, - String topic, - Properties properties, - Optional<FlinkKafkaPartitioner<RowData>> partitioner, - EncodingFormat<SerializationSchema<RowData>> encodingFormat, - KafkaSinkSemantic semantic - ); -}
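
The option keys that the deleted test base wired up via factoryIdentifier() and that the new KafkaDynamicTableFactoryTest declares directly ('connector', 'topic', 'properties.*', 'scan.startup.*', 'sink.partitioner', 'sink.semantic') are the same keys a user supplies in DDL. Below is a minimal sketch of how the refactored 'kafka' factory is reached from a TableEnvironment; the table names, fields, broker address, and the 'json' format are illustrative assumptions and are not part of this commit.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class KafkaDdlSketch {
        public static void main(String[] args) {
            // Streaming TableEnvironment; KafkaDynamicTableFactory is discovered
            // through the 'connector' = 'kafka' option in the DDL below.
            TableEnvironment tEnv = TableEnvironment.create(
                    EnvironmentSettings.newInstance().inStreamingMode().build());

            // Source table: uses the scan options validated by the new tests.
            // (Field names, topic, and broker address are placeholders.)
            tEnv.executeSql(
                    "CREATE TABLE orders_source (\n" +
                    "  name STRING,\n" +
                    "  amount DECIMAL(38, 18),\n" +
                    "  ts TIMESTAMP(3),\n" +
                    "  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND\n" +
                    ") WITH (\n" +
                    "  'connector' = 'kafka',\n" +
                    "  'topic' = 'myTopic',\n" +
                    "  'properties.bootstrap.servers' = 'localhost:9092',\n" +
                    "  'properties.group.id' = 'testGroup',\n" +
                    "  'scan.startup.mode' = 'earliest-offset',\n" +
                    "  'format' = 'json'\n" +
                    ")");

            // Sink table: uses the 'sink.semantic' option covered by the new
            // sink tests, plus the 'fixed' partitioner.
            tEnv.executeSql(
                    "CREATE TABLE orders_sink (\n" +
                    "  name STRING,\n" +
                    "  amount DECIMAL(38, 18),\n" +
                    "  ts TIMESTAMP(3)\n" +
                    ") WITH (\n" +
                    "  'connector' = 'kafka',\n" +
                    "  'topic' = 'myTopic',\n" +
                    "  'properties.bootstrap.servers' = 'localhost:9092',\n" +
                    "  'sink.partitioner' = 'fixed',\n" +
                    "  'sink.semantic' = 'at-least-once',\n" +
                    "  'format' = 'json'\n" +
                    ")");

            // Reading from the source and writing to the sink instantiates
            // KafkaDynamicSource and KafkaDynamicSink at planning time.
            tEnv.executeSql("INSERT INTO orders_sink SELECT name, amount, ts FROM orders_source");
        }
    }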