[FLINK-8538] [table] Improve unified table sources

This closes #5564.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/db2c510f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/db2c510f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/db2c510f Branch: refs/heads/release-1.5 Commit: db2c510fb4f171c9e9940759e5fbaf466ec74474 Parents: 1d26062 Author: Timo Walther <twal...@apache.org> Authored: Mon Feb 19 13:35:45 2018 +0100 Committer: Timo Walther <twal...@apache.org> Committed: Tue Feb 27 20:23:00 2018 +0100 ---------------------------------------------------------------------- .../flink-connector-kafka-0.10/pom.xml | 8 + .../kafka/Kafka010JsonTableSourceFactory.java | 5 +- .../resources/tableSourceConverter.properties | 29 - .../Kafka010JsonTableSourceFactoryTest.java | 37 + .../kafka/Kafka010TableSourceFactoryTest.java | 41 - .../flink-connector-kafka-0.11/pom.xml | 8 + .../kafka/Kafka011JsonTableSourceFactory.java | 5 +- .../resources/tableSourceConverter.properties | 29 - .../Kafka011JsonTableSourceFactoryTest.java | 37 + .../kafka/Kafka011TableSourceFactoryTest.java | 41 - .../flink-connector-kafka-0.8/pom.xml | 8 + .../kafka/Kafka08JsonTableSourceFactory.java | 5 +- .../resources/tableSourceConverter.properties | 29 - .../Kafka08JsonTableSourceFactoryTest.java | 37 + .../kafka/Kafka08TableSourceFactoryTest.java | 42 - .../flink-connector-kafka-0.9/pom.xml | 8 + .../kafka/Kafka09JsonTableSourceFactory.java | 5 +- .../resources/tableSourceConverter.properties | 29 - .../Kafka09JsonTableSourceFactoryTest.java | 37 + .../kafka/Kafka09TableSourceFactoryTest.java | 41 - .../flink-connector-kafka-base/pom.xml | 16 +- .../connectors/kafka/KafkaJsonTableSource.java | 24 +- .../kafka/KafkaJsonTableSourceFactory.java | 267 +++--- .../connectors/kafka/KafkaTableSource.java | 42 +- .../apache/flink/table/descriptors/Kafka.java | 239 +++--- .../flink/table/descriptors/KafkaValidator.java | 226 ++--- .../KafkaJsonTableFromDescriptorTestBase.java | 127 --- 
.../KafkaJsonTableSourceFactoryTestBase.java | 145 ++++ .../flink/table/descriptors/KafkaTest.java | 113 +++ .../src/test/resources/kafka-json-schema.json | 35 - flink-dist/pom.xml | 16 - flink-formats/flink-json/pom.xml | 29 +- .../apache/flink/table/descriptors/Json.java | 129 +++ .../flink/table/descriptors/JsonValidator.java | 55 ++ .../flink/table/descriptors/JsonTest.java | 124 +++ .../client/gateway/local/LocalExecutor.java | 2 +- flink-libraries/flink-table/pom.xml | 12 + .../resources/tableSourceConverter.properties | 7 + .../apache/flink/table/api/TableSchema.scala | 6 +- .../table/catalog/ExternalCatalogTable.scala | 19 +- .../table/catalog/ExternalTableSourceUtil.scala | 2 +- .../BatchTableSourceDescriptor.scala | 2 +- .../table/descriptors/ConnectorDescriptor.scala | 9 +- .../ConnectorDescriptorValidator.scala | 18 +- .../apache/flink/table/descriptors/Csv.scala | 13 +- .../descriptors/DescriptorProperties.scala | 833 ++++++++++++++++--- .../flink/table/descriptors/FileSystem.scala | 5 +- .../table/descriptors/FormatDescriptor.scala | 4 +- .../descriptors/FormatDescriptorValidator.scala | 23 +- .../apache/flink/table/descriptors/Json.scala | 78 -- .../flink/table/descriptors/JsonValidator.scala | 41 - .../table/descriptors/MetadataValidator.scala | 6 +- .../flink/table/descriptors/Rowtime.scala | 8 +- .../table/descriptors/RowtimeValidator.scala | 179 ++-- .../apache/flink/table/descriptors/Schema.scala | 14 +- .../table/descriptors/SchemaValidator.scala | 171 +++- .../flink/table/descriptors/Statistics.scala | 6 +- .../table/descriptors/StatisticsValidator.scala | 21 +- .../StreamTableSourceDescriptor.scala | 2 +- .../descriptors/TableSourceDescriptor.scala | 3 +- .../table/sources/CsvTableSourceFactory.scala | 43 +- .../table/sources/TableSourceFactory.scala | 11 +- .../sources/TableSourceFactoryService.scala | 26 +- .../tsextractors/StreamRecordTimestamp.scala | 5 +- .../flink/table/descriptors/CsvTest.scala | 83 +- 
.../table/descriptors/DescriptorTestBase.scala | 65 +- .../table/descriptors/FileSystemTest.scala | 33 +- .../flink/table/descriptors/JsonTest.scala | 77 -- .../flink/table/descriptors/MetadataTest.scala | 38 +- .../flink/table/descriptors/RowtimeTest.scala | 59 +- .../flink/table/descriptors/SchemaTest.scala | 84 +- .../table/descriptors/SchemaValidatorTest.scala | 76 ++ .../table/descriptors/StatisticsTest.scala | 72 +- .../sources/TableSourceFactoryServiceTest.scala | 20 +- .../table/sources/TestTableSourceFactory.scala | 8 +- 75 files changed, 2681 insertions(+), 1571 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-connectors/flink-connector-kafka-0.10/pom.xml ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.10/pom.xml b/flink-connectors/flink-connector-kafka-0.10/pom.xml index 8b4ff38..2d273ae 100644 --- a/flink-connectors/flink-connector-kafka-0.10/pom.xml +++ b/flink-connectors/flink-connector-kafka-0.10/pom.xml @@ -192,6 +192,14 @@ under the License. 
<scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + </dependencies> <build> http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceFactory.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceFactory.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceFactory.java index 1d03f6c..c639a44 100644 --- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceFactory.java +++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceFactory.java @@ -18,12 +18,13 @@ package org.apache.flink.streaming.connectors.kafka; -import static org.apache.flink.table.descriptors.KafkaValidator.KAFKA_VERSION_VALUE_010; +import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_010; /** * Factory for creating configured instances of {@link Kafka010JsonTableSource}. 
*/ public class Kafka010JsonTableSourceFactory extends KafkaJsonTableSourceFactory { + @Override protected KafkaJsonTableSource.Builder createBuilder() { return new Kafka010JsonTableSource.Builder(); @@ -31,6 +32,6 @@ public class Kafka010JsonTableSourceFactory extends KafkaJsonTableSourceFactory @Override protected String kafkaVersion() { - return KAFKA_VERSION_VALUE_010; + return CONNECTOR_VERSION_VALUE_010; } } http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-connectors/flink-connector-kafka-0.10/src/main/resources/tableSourceConverter.properties ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/resources/tableSourceConverter.properties b/flink-connectors/flink-connector-kafka-0.10/src/main/resources/tableSourceConverter.properties deleted file mode 100644 index 5409b49..0000000 --- a/flink-connectors/flink-connector-kafka-0.10/src/main/resources/tableSourceConverter.properties +++ /dev/null @@ -1,29 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# - -################################################################################ -# The config file is used to specify the packages of current module where -# to find TableSourceConverter implementation class annotated with TableType. -# If there are multiple packages to scan, put those packages together into a -# string separated with ',', for example, org.package1,org.package2. -# Please notice: -# It's better to have a tableSourceConverter.properties in each connector Module -# which offers converters instead of put all information into the -# tableSourceConverter.properties of flink-table module. -################################################################################ -scan.packages=org.apache.flink.streaming.connectors.kafka http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceFactoryTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceFactoryTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceFactoryTest.java new file mode 100644 index 0000000..22cf659 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceFactoryTest.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_010; + +/** + * Tests for {@link Kafka010JsonTableSourceFactory}. + */ +public class Kafka010JsonTableSourceFactoryTest extends KafkaJsonTableSourceFactoryTestBase { + + @Override + protected String version() { + return CONNECTOR_VERSION_VALUE_010; + } + + @Override + protected KafkaJsonTableSource.Builder builder() { + return Kafka010JsonTableSource.builder(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceFactoryTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceFactoryTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceFactoryTest.java deleted file mode 100644 index 15b89e8..0000000 --- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceFactoryTest.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka; - -import org.apache.flink.table.descriptors.Kafka; - -import static org.apache.flink.table.descriptors.KafkaValidator.KAFKA_VERSION_VALUE_010; - -/** - * Tests for {@link Kafka010JsonTableSourceFactory}. - */ -public class Kafka010TableSourceFactoryTest extends KafkaJsonTableFromDescriptorTestBase { - protected String versionForTest() { - return KAFKA_VERSION_VALUE_010; - } - - protected KafkaJsonTableSource.Builder builderForTest() { - return Kafka010JsonTableSource.builder(); - } - - @Override - protected void extraSettings(KafkaTableSource.Builder builder, Kafka kafka) { - // no extra settings - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-connectors/flink-connector-kafka-0.11/pom.xml ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.11/pom.xml b/flink-connectors/flink-connector-kafka-0.11/pom.xml index 1e935f6..0fc1f13 100644 --- a/flink-connectors/flink-connector-kafka-0.11/pom.xml +++ b/flink-connectors/flink-connector-kafka-0.11/pom.xml @@ -201,6 +201,14 @@ under the License. 
<scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + </dependencies> <build> http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceFactory.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceFactory.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceFactory.java index ca4d6ce..6745bb2 100644 --- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceFactory.java +++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceFactory.java @@ -18,12 +18,13 @@ package org.apache.flink.streaming.connectors.kafka; -import static org.apache.flink.table.descriptors.KafkaValidator.KAFKA_VERSION_VALUE_011; +import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_011; /** * Factory for creating configured instances of {@link Kafka011JsonTableSource}. 
*/ public class Kafka011JsonTableSourceFactory extends KafkaJsonTableSourceFactory { + @Override protected KafkaJsonTableSource.Builder createBuilder() { return new Kafka011JsonTableSource.Builder(); @@ -31,6 +32,6 @@ public class Kafka011JsonTableSourceFactory extends KafkaJsonTableSourceFactory @Override protected String kafkaVersion() { - return KAFKA_VERSION_VALUE_011; + return CONNECTOR_VERSION_VALUE_011; } } http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-connectors/flink-connector-kafka-0.11/src/main/resources/tableSourceConverter.properties ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/resources/tableSourceConverter.properties b/flink-connectors/flink-connector-kafka-0.11/src/main/resources/tableSourceConverter.properties deleted file mode 100644 index 5409b49..0000000 --- a/flink-connectors/flink-connector-kafka-0.11/src/main/resources/tableSourceConverter.properties +++ /dev/null @@ -1,29 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# - -################################################################################ -# The config file is used to specify the packages of current module where -# to find TableSourceConverter implementation class annotated with TableType. -# If there are multiple packages to scan, put those packages together into a -# string separated with ',', for example, org.package1,org.package2. -# Please notice: -# It's better to have a tableSourceConverter.properties in each connector Module -# which offers converters instead of put all information into the -# tableSourceConverter.properties of flink-table module. -################################################################################ -scan.packages=org.apache.flink.streaming.connectors.kafka http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceFactoryTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceFactoryTest.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceFactoryTest.java new file mode 100644 index 0000000..ed92863 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceFactoryTest.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_011; + +/** + * Tests for {@link Kafka011JsonTableSourceFactory}. + */ +public class Kafka011JsonTableSourceFactoryTest extends KafkaJsonTableSourceFactoryTestBase { + + @Override + protected String version() { + return CONNECTOR_VERSION_VALUE_011; + } + + @Override + protected KafkaJsonTableSource.Builder builder() { + return Kafka011JsonTableSource.builder(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceFactoryTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceFactoryTest.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceFactoryTest.java deleted file mode 100644 index 84ac39b..0000000 --- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceFactoryTest.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka; - -import org.apache.flink.table.descriptors.Kafka; - -import static org.apache.flink.table.descriptors.KafkaValidator.KAFKA_VERSION_VALUE_011; - -/** - * Tests for {@link Kafka011JsonTableSourceFactory}. - */ -public class Kafka011TableSourceFactoryTest extends KafkaJsonTableFromDescriptorTestBase { - protected String versionForTest() { - return KAFKA_VERSION_VALUE_011; - } - - protected KafkaJsonTableSource.Builder builderForTest() { - return Kafka011JsonTableSource.builder(); - } - - @Override - protected void extraSettings(KafkaTableSource.Builder builder, Kafka kafka) { - // no extra settings - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-connectors/flink-connector-kafka-0.8/pom.xml ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.8/pom.xml b/flink-connectors/flink-connector-kafka-0.8/pom.xml index a58591e..43a9dac 100644 --- a/flink-connectors/flink-connector-kafka-0.8/pom.xml +++ b/flink-connectors/flink-connector-kafka-0.8/pom.xml @@ -204,6 +204,14 @@ under the License. 
<scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + </dependencies> <build> http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceFactory.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceFactory.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceFactory.java index e4e5096..2da805a 100644 --- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceFactory.java +++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceFactory.java @@ -18,12 +18,13 @@ package org.apache.flink.streaming.connectors.kafka; -import static org.apache.flink.table.descriptors.KafkaValidator.KAFKA_VERSION_VALUE_08; +import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_08; /** * Factory for creating configured instances of {@link Kafka08JsonTableSource}. 
*/ public class Kafka08JsonTableSourceFactory extends KafkaJsonTableSourceFactory { + @Override protected KafkaJsonTableSource.Builder createBuilder() { return new Kafka08JsonTableSource.Builder(); @@ -31,6 +32,6 @@ public class Kafka08JsonTableSourceFactory extends KafkaJsonTableSourceFactory { @Override protected String kafkaVersion() { - return KAFKA_VERSION_VALUE_08; + return CONNECTOR_VERSION_VALUE_08; } } http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-connectors/flink-connector-kafka-0.8/src/main/resources/tableSourceConverter.properties ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/resources/tableSourceConverter.properties b/flink-connectors/flink-connector-kafka-0.8/src/main/resources/tableSourceConverter.properties deleted file mode 100644 index 5409b49..0000000 --- a/flink-connectors/flink-connector-kafka-0.8/src/main/resources/tableSourceConverter.properties +++ /dev/null @@ -1,29 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# - -################################################################################ -# The config file is used to specify the packages of current module where -# to find TableSourceConverter implementation class annotated with TableType. -# If there are multiple packages to scan, put those packages together into a -# string separated with ',', for example, org.package1,org.package2. -# Please notice: -# It's better to have a tableSourceConverter.properties in each connector Module -# which offers converters instead of put all information into the -# tableSourceConverter.properties of flink-table module. -################################################################################ -scan.packages=org.apache.flink.streaming.connectors.kafka http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceFactoryTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceFactoryTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceFactoryTest.java new file mode 100644 index 0000000..0238b2b --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceFactoryTest.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_08; + +/** + * Tests for {@link Kafka08JsonTableSourceFactory}. + */ +public class Kafka08JsonTableSourceFactoryTest extends KafkaJsonTableSourceFactoryTestBase { + + @Override + protected String version() { + return CONNECTOR_VERSION_VALUE_08; + } + + @Override + protected KafkaJsonTableSource.Builder builder() { + return Kafka08JsonTableSource.builder(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceFactoryTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceFactoryTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceFactoryTest.java deleted file mode 100644 index a2edc09..0000000 --- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceFactoryTest.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka; - -import org.apache.flink.table.descriptors.Kafka; - -import static org.apache.flink.table.descriptors.KafkaValidator.KAFKA_VERSION_VALUE_08; - -/** - * Tests for {@link Kafka08JsonTableSourceFactory}. - */ -public class Kafka08TableSourceFactoryTest extends KafkaJsonTableFromDescriptorTestBase { - protected String versionForTest() { - return KAFKA_VERSION_VALUE_08; - } - - protected KafkaJsonTableSource.Builder builderForTest() { - return Kafka08JsonTableSource.builder(); - } - - @Override - protected void extraSettings(KafkaTableSource.Builder builder, Kafka kafka) { - builder.getKafkaProps().put("zookeeper.connect", "localhost:1111"); - kafka.zookeeperConnect("localhost:1111"); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-connectors/flink-connector-kafka-0.9/pom.xml ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.9/pom.xml b/flink-connectors/flink-connector-kafka-0.9/pom.xml index d07cb5a..1999a8d 100644 --- a/flink-connectors/flink-connector-kafka-0.9/pom.xml +++ b/flink-connectors/flink-connector-kafka-0.9/pom.xml @@ -174,6 +174,14 @@ under the License. 
</dependency> <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-minikdc</artifactId> <version>${minikdc.version}</version> http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceFactory.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceFactory.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceFactory.java index bbda4ae..9207426 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceFactory.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceFactory.java @@ -18,12 +18,13 @@ package org.apache.flink.streaming.connectors.kafka; -import static org.apache.flink.table.descriptors.KafkaValidator.KAFKA_VERSION_VALUE_09; +import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_09; /** * Factory for creating configured instances of {@link Kafka09JsonTableSource}. 
*/ public class Kafka09JsonTableSourceFactory extends KafkaJsonTableSourceFactory { + @Override protected KafkaJsonTableSource.Builder createBuilder() { return new Kafka09JsonTableSource.Builder(); @@ -31,6 +32,6 @@ public class Kafka09JsonTableSourceFactory extends KafkaJsonTableSourceFactory { @Override protected String kafkaVersion() { - return KAFKA_VERSION_VALUE_09; + return CONNECTOR_VERSION_VALUE_09; } } http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-connectors/flink-connector-kafka-0.9/src/main/resources/tableSourceConverter.properties ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/resources/tableSourceConverter.properties b/flink-connectors/flink-connector-kafka-0.9/src/main/resources/tableSourceConverter.properties deleted file mode 100644 index 5409b49..0000000 --- a/flink-connectors/flink-connector-kafka-0.9/src/main/resources/tableSourceConverter.properties +++ /dev/null @@ -1,29 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# - -################################################################################ -# The config file is used to specify the packages of current module where -# to find TableSourceConverter implementation class annotated with TableType. -# If there are multiple packages to scan, put those packages together into a -# string separated with ',', for example, org.package1,org.package2. -# Please notice: -# It's better to have a tableSourceConverter.properties in each connector Module -# which offers converters instead of put all information into the -# tableSourceConverter.properties of flink-table module. -################################################################################ -scan.packages=org.apache.flink.streaming.connectors.kafka http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceFactoryTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceFactoryTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceFactoryTest.java new file mode 100644 index 0000000..dd545e9 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceFactoryTest.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_09; + +/** + * Tests for {@link Kafka09JsonTableSourceFactory}. + */ +public class Kafka09JsonTableSourceFactoryTest extends KafkaJsonTableSourceFactoryTestBase { + + @Override + protected String version() { + return CONNECTOR_VERSION_VALUE_09; + } + + @Override + protected KafkaJsonTableSource.Builder builder() { + return Kafka09JsonTableSource.builder(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceFactoryTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceFactoryTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceFactoryTest.java deleted file mode 100644 index fc85ea7..0000000 --- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceFactoryTest.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership.
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka; - -import org.apache.flink.table.descriptors.Kafka; - -import static org.apache.flink.table.descriptors.KafkaValidator.KAFKA_VERSION_VALUE_09; - -/** - * Factory for creating configured instances of {@link Kafka09JsonTableSource}. - */ -public class Kafka09TableSourceFactoryTest extends KafkaJsonTableFromDescriptorTestBase { - protected String versionForTest() { - return KAFKA_VERSION_VALUE_09; - } - - protected KafkaJsonTableSource.Builder builderForTest() { - return Kafka09JsonTableSource.builder(); - } - - @Override - protected void extraSettings(KafkaTableSource.Builder builder, Kafka kafka) { - // no extra settings - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-connectors/flink-connector-kafka-base/pom.xml ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/pom.xml b/flink-connectors/flink-connector-kafka-base/pom.xml index 2ccaa2e..cea3896 100644 --- a/flink-connectors/flink-connector-kafka-base/pom.xml +++ b/flink-connectors/flink-connector-kafka-base/pom.xml @@ -199,23 +199,17 @@ under the License. 
</dependency> <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-minikdc</artifactId> - <version>${minikdc.version}</version> - <scope>test</scope> - </dependency> - - <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-scala_${scala.binary.version}</artifactId> + <artifactId>flink-table_${scala.binary.version}</artifactId> <version>${project.version}</version> + <type>test-jar</type> <scope>test</scope> </dependency> <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId> - <version>${project.version}</version> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-minikdc</artifactId> + <version>${minikdc.version}</version> <scope>test</scope> </dependency> http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java index d2dafe7..b2bb8ff 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java @@ -84,23 +84,29 @@ public abstract class KafkaJsonTableSource extends KafkaTableSource implements D @Override public String explainSource() { - return "KafkaJSONTableSource"; + return "KafkaJsonTableSource"; } @Override - public boolean equals(Object other) { - if (super.equals(other)) { - KafkaJsonTableSource otherSource = (KafkaJsonTableSource) other; - return 
Objects.equals(failOnMissingField, otherSource.failOnMissingField) - && Objects.equals(jsonSchema, otherSource.jsonSchema) - && Objects.equals(fieldMapping, otherSource.fieldMapping); + public boolean equals(Object o) { + if (this == o) { + return true; } - return false; + if (!(o instanceof KafkaJsonTableSource)) { + return false; + } + if (!super.equals(o)) { + return false; + } + KafkaJsonTableSource that = (KafkaJsonTableSource) o; + return failOnMissingField == that.failOnMissingField && + Objects.equals(jsonSchema, that.jsonSchema) && + Objects.equals(fieldMapping, that.fieldMapping); } @Override public int hashCode() { - return 31 * super.hashCode() + Objects.hash(failOnMissingField, jsonSchema, fieldMapping); + return Objects.hash(super.hashCode(), jsonSchema, fieldMapping, failOnMissingField); } //////// SETTERS FOR OPTIONAL PARAMETERS http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactory.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactory.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactory.java index 918b833..2897314 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactory.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactory.java @@ -21,65 +21,74 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.formats.json.JsonSchemaConverter; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; 
+import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.descriptors.DescriptorProperties; import org.apache.flink.table.descriptors.JsonValidator; import org.apache.flink.table.descriptors.KafkaValidator; import org.apache.flink.table.descriptors.SchemaValidator; +import org.apache.flink.table.sources.RowtimeAttributeDescriptor; import org.apache.flink.table.sources.TableSource; import org.apache.flink.table.sources.TableSourceFactory; import org.apache.flink.types.Row; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Properties; +import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION; import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE; import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_VERSION; +import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_DERIVE_SCHEMA; +import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_PROPERTY_VERSION; import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_TYPE; -import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_VERSION; import static org.apache.flink.table.descriptors.JsonValidator.FORMAT_FAIL_ON_MISSING_FIELD; -import static org.apache.flink.table.descriptors.JsonValidator.FORMAT_SCHEMA_STRING; +import static org.apache.flink.table.descriptors.JsonValidator.FORMAT_JSON_SCHEMA; +import static org.apache.flink.table.descriptors.JsonValidator.FORMAT_SCHEMA; import static org.apache.flink.table.descriptors.JsonValidator.FORMAT_TYPE_VALUE; -import static org.apache.flink.table.descriptors.KafkaValidator.BOOTSTRAP_SERVERS; -import static 
org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_TYPE_VALUE; -import static org.apache.flink.table.descriptors.KafkaValidator.GROUP_ID; -import static org.apache.flink.table.descriptors.KafkaValidator.JSON_FIELD; -import static org.apache.flink.table.descriptors.KafkaValidator.KAFKA_VERSION; -import static org.apache.flink.table.descriptors.KafkaValidator.OFFSET; -import static org.apache.flink.table.descriptors.KafkaValidator.PARTITION; -import static org.apache.flink.table.descriptors.KafkaValidator.SPECIFIC_OFFSETS; -import static org.apache.flink.table.descriptors.KafkaValidator.STARTUP_MODE; -import static org.apache.flink.table.descriptors.KafkaValidator.STARTUP_MODE_VALUE_EARLIEST; -import static org.apache.flink.table.descriptors.KafkaValidator.STARTUP_MODE_VALUE_GROUP_OFFSETS; -import static org.apache.flink.table.descriptors.KafkaValidator.STARTUP_MODE_VALUE_LATEST; -import static org.apache.flink.table.descriptors.KafkaValidator.STARTUP_MODE_VALUE_SPECIFIC_OFFSETS; -import static org.apache.flink.table.descriptors.KafkaValidator.TABLE_FIELD; -import static org.apache.flink.table.descriptors.KafkaValidator.TABLE_JSON_MAPPING; -import static org.apache.flink.table.descriptors.KafkaValidator.TOPIC; -import static org.apache.flink.table.descriptors.KafkaValidator.ZOOKEEPER_CONNECT; -import static org.apache.flink.table.descriptors.SchemaValidator.PROCTIME; +import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES; +import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES_KEY; +import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES_VALUE; +import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS; +import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS_OFFSET; +import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS_PARTITION; +import static 
org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_STARTUP_MODE; +import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_TOPIC; +import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_TYPE_VALUE_KAFKA; +import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_TIMESTAMPS_CLASS; +import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_TIMESTAMPS_FROM; +import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_TIMESTAMPS_SERIALIZED; +import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_TIMESTAMPS_TYPE; +import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_WATERMARKS_CLASS; +import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_WATERMARKS_DELAY; +import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_WATERMARKS_SERIALIZED; +import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_WATERMARKS_TYPE; import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA; -import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_VERSION; - -import scala.Option; -import scala.collection.JavaConversions; +import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_FROM; +import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_NAME; +import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_PROCTIME; +import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_TYPE; /** * Factory for creating configured instances of {@link KafkaJsonTableSource}. 
*/ public abstract class KafkaJsonTableSourceFactory implements TableSourceFactory<Row> { + @Override public Map<String, String> requiredContext() { Map<String, String> context = new HashMap<>(); - context.put(CONNECTOR_TYPE(), CONNECTOR_TYPE_VALUE); // kafka connector - context.put(FORMAT_TYPE(), FORMAT_TYPE_VALUE()); // Json format - context.put(KAFKA_VERSION, kafkaVersion()); // for different implementations - context.put(CONNECTOR_VERSION(), "1"); - context.put(FORMAT_VERSION(), "1"); - context.put(SCHEMA_VERSION(), "1"); + context.put(CONNECTOR_TYPE(), CONNECTOR_TYPE_VALUE_KAFKA); // kafka + context.put(CONNECTOR_VERSION(), kafkaVersion()); + + context.put(FORMAT_TYPE(), FORMAT_TYPE_VALUE); // json format + + context.put(CONNECTOR_PROPERTY_VERSION(), "1"); // backwards compatibility + context.put(FORMAT_PROPERTY_VERSION(), "1"); + return context; } @@ -88,124 +97,137 @@ public abstract class KafkaJsonTableSourceFactory implements TableSourceFactory< List<String> properties = new ArrayList<>(); // kafka - properties.add(KAFKA_VERSION); - properties.add(BOOTSTRAP_SERVERS); - properties.add(GROUP_ID); - properties.add(ZOOKEEPER_CONNECT); - properties.add(TOPIC); - properties.add(STARTUP_MODE); - properties.add(SPECIFIC_OFFSETS + ".#." + PARTITION); - properties.add(SPECIFIC_OFFSETS + ".#." + OFFSET); + properties.add(CONNECTOR_TOPIC); + properties.add(CONNECTOR_PROPERTIES); + properties.add(CONNECTOR_PROPERTIES + ".#." + CONNECTOR_PROPERTIES_KEY); + properties.add(CONNECTOR_PROPERTIES + ".#." + CONNECTOR_PROPERTIES_VALUE); + properties.add(CONNECTOR_STARTUP_MODE); + properties.add(CONNECTOR_SPECIFIC_OFFSETS + ".#." + CONNECTOR_SPECIFIC_OFFSETS_PARTITION); + properties.add(CONNECTOR_SPECIFIC_OFFSETS + ".#." + CONNECTOR_SPECIFIC_OFFSETS_OFFSET); // json format - properties.add(FORMAT_SCHEMA_STRING()); - properties.add(FORMAT_FAIL_ON_MISSING_FIELD()); - - // table json mapping - properties.add(TABLE_JSON_MAPPING + ".#." 
+ TABLE_FIELD); - properties.add(TABLE_JSON_MAPPING + ".#." + JSON_FIELD); + properties.add(FORMAT_JSON_SCHEMA); + properties.add(FORMAT_SCHEMA); + properties.add(FORMAT_FAIL_ON_MISSING_FIELD); + properties.add(FORMAT_DERIVE_SCHEMA()); // schema - properties.add(SCHEMA() + ".#." + DescriptorProperties.TYPE()); - properties.add(SCHEMA() + ".#." + DescriptorProperties.NAME()); + properties.add(SCHEMA() + ".#." + SCHEMA_TYPE()); + properties.add(SCHEMA() + ".#." + SCHEMA_NAME()); + properties.add(SCHEMA() + ".#." + SCHEMA_FROM()); // time attributes - properties.add(SCHEMA() + ".#." + PROCTIME()); -// properties.add(SCHEMA() + ".#." + ROWTIME() + ".#." + TIMESTAMPS_CLASS()); -// properties.add(SCHEMA() + ".#." + ROWTIME() + ".#." + TIMESTAMPS_TYPE()); + properties.add(SCHEMA() + ".#." + SCHEMA_PROCTIME()); + properties.add(SCHEMA() + ".#." + ROWTIME_TIMESTAMPS_TYPE()); + properties.add(SCHEMA() + ".#." + ROWTIME_TIMESTAMPS_FROM()); + properties.add(SCHEMA() + ".#." + ROWTIME_TIMESTAMPS_CLASS()); + properties.add(SCHEMA() + ".#." + ROWTIME_TIMESTAMPS_SERIALIZED()); + properties.add(SCHEMA() + ".#." + ROWTIME_WATERMARKS_TYPE()); + properties.add(SCHEMA() + ".#." + ROWTIME_WATERMARKS_CLASS()); + properties.add(SCHEMA() + ".#." + ROWTIME_WATERMARKS_SERIALIZED()); + properties.add(SCHEMA() + ".#." + ROWTIME_WATERMARKS_DELAY()); return properties; } @Override public TableSource<Row> create(Map<String, String> properties) { - DescriptorProperties params = new DescriptorProperties(true); + final DescriptorProperties params = new DescriptorProperties(true); params.putProperties(properties); // validate + new SchemaValidator(true).validate(params); new KafkaValidator().validate(params); new JsonValidator().validate(params); - new SchemaValidator(true).validate(params); // build - KafkaJsonTableSource.Builder builder = createBuilder(); - Properties kafkaProps = new Properties(); - - // Set the required parameters. 
- String topic = params.getString(TOPIC).get(); - TableSchema tableSchema = params.getTableSchema(SCHEMA()).get(); - - kafkaProps.put(BOOTSTRAP_SERVERS, params.getString(BOOTSTRAP_SERVERS).get()); - kafkaProps.put(GROUP_ID, params.getString(GROUP_ID).get()); - - // Set the zookeeper connect for kafka 0.8. - Option<String> zkConnect = params.getString(ZOOKEEPER_CONNECT); - if (zkConnect.isDefined()) { - kafkaProps.put(ZOOKEEPER_CONNECT, zkConnect.get()); - } - - builder.withKafkaProperties(kafkaProps).forTopic(topic).withSchema(tableSchema); - - // Set the startup mode. - String startupMode = params.getString(STARTUP_MODE).get(); - if (null != startupMode) { - switch (startupMode) { - case STARTUP_MODE_VALUE_EARLIEST: + final KafkaJsonTableSource.Builder builder = createBuilder(); + + // topic + final String topic = params.getString(CONNECTOR_TOPIC); + builder.forTopic(topic); + + // properties + final Properties props = new Properties(); + final List<Map<String, String>> propsList = params.getFixedIndexedProperties( + CONNECTOR_PROPERTIES, + Arrays.asList(CONNECTOR_PROPERTIES_KEY, CONNECTOR_PROPERTIES_VALUE)); + propsList.forEach(kv -> props.put( + params.getString(kv.get(CONNECTOR_PROPERTIES_KEY)), + params.getString(kv.get(CONNECTOR_PROPERTIES_VALUE)) + )); + builder.withKafkaProperties(props); + + // startup mode + params + .getOptionalString(CONNECTOR_STARTUP_MODE) + .ifPresent(startupMode -> { + switch (startupMode) { + + case KafkaValidator.CONNECTOR_STARTUP_MODE_VALUE_EARLIEST: builder.fromEarliest(); break; - case STARTUP_MODE_VALUE_LATEST: + + case KafkaValidator.CONNECTOR_STARTUP_MODE_VALUE_LATEST: builder.fromLatest(); break; - case STARTUP_MODE_VALUE_GROUP_OFFSETS: + + case KafkaValidator.CONNECTOR_STARTUP_MODE_VALUE_GROUP_OFFSETS: builder.fromGroupOffsets(); break; - case STARTUP_MODE_VALUE_SPECIFIC_OFFSETS: - Map<String, String> partitions = JavaConversions. 
- mapAsJavaMap(params.getIndexedProperty(SPECIFIC_OFFSETS, PARTITION)); - Map<KafkaTopicPartition, Long> offsetMap = new HashMap<>(); - for (int i = 0; i < partitions.size(); i++) { - offsetMap.put( - new KafkaTopicPartition( - topic, - Integer.valueOf(params.getString( - SPECIFIC_OFFSETS + "" + "." + i + "." + PARTITION).get())), - Long.valueOf(params.getString( - SPECIFIC_OFFSETS + "" + "." + i + "." + OFFSET).get())); - } + + case KafkaValidator.CONNECTOR_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS: + final Map<KafkaTopicPartition, Long> offsetMap = new HashMap<>(); + + final List<Map<String, String>> offsetList = params.getFixedIndexedProperties( + CONNECTOR_SPECIFIC_OFFSETS, + Arrays.asList(CONNECTOR_SPECIFIC_OFFSETS_PARTITION, CONNECTOR_SPECIFIC_OFFSETS_OFFSET)); + offsetList.forEach(kv -> { + final int partition = params.getInt(kv.get(CONNECTOR_SPECIFIC_OFFSETS_PARTITION)); + final long offset = params.getLong(kv.get(CONNECTOR_SPECIFIC_OFFSETS_OFFSET)); + final KafkaTopicPartition topicPartition = new KafkaTopicPartition(topic, partition); + offsetMap.put(topicPartition, offset); + }); builder.fromSpecificOffsets(offsetMap); break; - } - } - - // Set whether fail on missing JSON field. 
- Option<String> failOnMissing = params.getString(FORMAT_FAIL_ON_MISSING_FIELD()); - if (failOnMissing.isDefined()) { - builder.failOnMissingField(Boolean.valueOf(failOnMissing.get())); + } + }); + + // missing field + params.getOptionalBoolean(FORMAT_FAIL_ON_MISSING_FIELD).ifPresent(builder::failOnMissingField); + + // json schema + final TableSchema formatSchema; + if (params.containsKey(FORMAT_SCHEMA)) { + final TypeInformation<?> info = params.getType(FORMAT_SCHEMA); + formatSchema = TableSchema.fromTypeInfo(info); + } else if (params.containsKey(FORMAT_JSON_SCHEMA)) { + final TypeInformation<?> info = JsonSchemaConverter.convert(params.getString(FORMAT_JSON_SCHEMA)); + formatSchema = TableSchema.fromTypeInfo(info); + } else { + formatSchema = SchemaValidator.deriveFormatFields(params); } + builder.forJsonSchema(formatSchema); - // Set the JSON schema. - Option<String> jsonSchema = params.getString(FORMAT_SCHEMA_STRING()); - if (jsonSchema.isDefined()) { - TypeInformation jsonSchemaType = JsonSchemaConverter.convert(jsonSchema.get()); - builder.forJsonSchema(TableSchema.fromTypeInfo(jsonSchemaType)); - } - - // Set the table => JSON fields mapping. - Map<String, String> mappingTableFields = JavaConversions. - mapAsJavaMap(params.getIndexedProperty(TABLE_JSON_MAPPING, TABLE_FIELD)); - - if (!mappingTableFields.isEmpty()) { - Map<String, String> tableJsonMapping = new HashMap<>(); - for (int i = 0; i < mappingTableFields.size(); i++) { - tableJsonMapping.put(params.getString(TABLE_JSON_MAPPING + "." + i + "." + TABLE_FIELD).get(), - params.getString(TABLE_JSON_MAPPING + "." + i + "." 
+ JSON_FIELD).get() - ); - } - builder.withTableToJsonMapping(tableJsonMapping); + // schema + final TableSchema schema = params.getTableSchema(SCHEMA()); + builder.withSchema(schema); + + // proctime + SchemaValidator.deriveProctimeAttribute(params).ifPresent(builder::withProctimeAttribute); + + // rowtime + final List<RowtimeAttributeDescriptor> descriptors = SchemaValidator.deriveRowtimeAttributes(params); + if (descriptors.size() > 1) { + throw new TableException("More than one rowtime attribute is not supported yet."); + } else if (descriptors.size() == 1) { + final RowtimeAttributeDescriptor desc = descriptors.get(0); + builder.withRowtimeAttribute(desc.getAttributeName(), desc.getTimestampExtractor(), desc.getWatermarkStrategy()); } - // Set the time attributes. - setTimeAttributes(tableSchema, params, builder); + // field mapping + final Map<String, String> mapping = SchemaValidator.deriveFieldMapping(params, Optional.of(formatSchema)); + builder.withTableToJsonMapping(mapping); return builder.build(); } @@ -213,15 +235,4 @@ public abstract class KafkaJsonTableSourceFactory implements TableSourceFactory< protected abstract KafkaJsonTableSource.Builder createBuilder(); protected abstract String kafkaVersion(); - - private void setTimeAttributes(TableSchema schema, DescriptorProperties params, KafkaJsonTableSource.Builder builder) { - // TODO to deal with rowtime fields - Option<String> proctimeField; - for (int i = 0; i < schema.getColumnNum(); i++) { - proctimeField = params.getString(SCHEMA() + "." + i + "." 
+ PROCTIME()); - if (proctimeField.isDefined()) { - builder.withProctimeAttribute(schema.getColumnName(i).get()); - } - } - } } http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java index 9ce3b8e..134c483 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java @@ -141,31 +141,27 @@ public abstract class KafkaTableSource @Override public boolean equals(Object o) { - if (!o.getClass().equals(this.getClass())) { + if (this == o) { + return true; + } + if (!(o instanceof KafkaTableSource)) { return false; } - KafkaTableSource other = (KafkaTableSource) o; - return Objects.equals(topic, other.topic) - && Objects.equals(schema, other.schema) - && Objects.equals(properties, other.properties) - && Objects.equals(proctimeAttribute, other.proctimeAttribute) - && Objects.equals(returnType, other.returnType) - && Objects.equals(rowtimeAttributeDescriptors, other.rowtimeAttributeDescriptors) - && Objects.equals(specificStartupOffsets, other.specificStartupOffsets) - && Objects.equals(startupMode, other.startupMode); + KafkaTableSource that = (KafkaTableSource) o; + return Objects.equals(schema, that.schema) && + Objects.equals(topic, that.topic) && + Objects.equals(properties, that.properties) && + Objects.equals(returnType, that.returnType) && + Objects.equals(proctimeAttribute, that.proctimeAttribute) && + 
Objects.equals(rowtimeAttributeDescriptors, that.rowtimeAttributeDescriptors) && + startupMode == that.startupMode && + Objects.equals(specificStartupOffsets, that.specificStartupOffsets); } @Override public int hashCode() { - return Objects.hash( - topic, - schema, - properties, - proctimeAttribute, - returnType, - rowtimeAttributeDescriptors, - specificStartupOffsets, - startupMode); + return Objects.hash(schema, topic, properties, returnType, + proctimeAttribute, rowtimeAttributeDescriptors, startupMode, specificStartupOffsets); } /** @@ -211,9 +207,9 @@ public abstract class KafkaTableSource // validate that field exists and is of correct type Option<TypeInformation<?>> tpe = schema.getType(proctimeAttribute); if (tpe.isEmpty()) { - throw new ValidationException("Processing time attribute " + proctimeAttribute + " is not present in TableSchema."); + throw new ValidationException("Processing time attribute '" + proctimeAttribute + "' is not present in TableSchema."); } else if (tpe.get() != Types.SQL_TIMESTAMP()) { - throw new ValidationException("Processing time attribute " + proctimeAttribute + " is not of type SQL_TIMESTAMP."); + throw new ValidationException("Processing time attribute '" + proctimeAttribute + "' is not of type SQL_TIMESTAMP."); } } this.proctimeAttribute = proctimeAttribute; @@ -230,9 +226,9 @@ public abstract class KafkaTableSource String rowtimeAttribute = desc.getAttributeName(); Option<TypeInformation<?>> tpe = schema.getType(rowtimeAttribute); if (tpe.isEmpty()) { - throw new ValidationException("Rowtime attribute " + rowtimeAttribute + " is not present in TableSchema."); + throw new ValidationException("Rowtime attribute '" + rowtimeAttribute + "' is not present in TableSchema."); } else if (tpe.get() != Types.SQL_TIMESTAMP()) { - throw new ValidationException("Rowtime attribute " + rowtimeAttribute + " is not of type SQL_TIMESTAMP."); + throw new ValidationException("Rowtime attribute '" + rowtimeAttribute + "' is not of type 
SQL_TIMESTAMP."); } } this.rowtimeAttributeDescriptors = rowtimeAttributeDescriptors; http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/Kafka.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/Kafka.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/Kafka.java index 4733f6e..4535958 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/Kafka.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/Kafka.java @@ -18,182 +18,199 @@ package org.apache.flink.table.descriptors; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase; import org.apache.flink.streaming.connectors.kafka.config.StartupMode; - -import static org.apache.flink.table.descriptors.KafkaValidator.BOOTSTRAP_SERVERS; -import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_TYPE_VALUE; -import static org.apache.flink.table.descriptors.KafkaValidator.GROUP_ID; -import static org.apache.flink.table.descriptors.KafkaValidator.JSON_FIELD; -import static org.apache.flink.table.descriptors.KafkaValidator.KAFKA_VERSION; -import static org.apache.flink.table.descriptors.KafkaValidator.OFFSET; -import static org.apache.flink.table.descriptors.KafkaValidator.PARTITION; -import static org.apache.flink.table.descriptors.KafkaValidator.SPECIFIC_OFFSETS; -import static org.apache.flink.table.descriptors.KafkaValidator.TABLE_FIELD; -import static org.apache.flink.table.descriptors.KafkaValidator.TABLE_JSON_MAPPING; -import static org.apache.flink.table.descriptors.KafkaValidator.TOPIC; -import static org.apache.flink.table.descriptors.KafkaValidator.ZOOKEEPER_CONNECT; +import 
org.apache.flink.util.Preconditions; import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Optional; - -import scala.collection.JavaConversions; -import scala.collection.Seq; +import java.util.Properties; +import java.util.stream.Collectors; + +import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_VERSION; +import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES; +import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES_KEY; +import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES_VALUE; +import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS; +import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS_OFFSET; +import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS_PARTITION; +import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_STARTUP_MODE; +import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_TOPIC; +import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_TYPE_VALUE_KAFKA; /** - * Connector descriptor for the kafka message queue. + * Connector descriptor for the Apache Kafka message queue. 
*/ public class Kafka extends ConnectorDescriptor { - private Optional<String> version = Optional.empty(); - private Optional<String> bootstrapServers = Optional.empty(); - private Optional<String> groupId = Optional.empty(); - private Optional<String> topic = Optional.empty(); - private Optional<String> zookeeperConnect = Optional.empty(); - private Optional<Map<String, String>> tableJsonMapping = Optional.empty(); - - private Optional<StartupMode> startupMode = Optional.empty(); - private Optional<Map<Integer, Long>> specificOffsets = Optional.empty(); + private String version; + private String topic; + private StartupMode startupMode; + private Map<Integer, Long> specificOffsets; + private Map<String, String> kafkaProperties; + /** + * Connector descriptor for the Apache Kafka message queue. + */ public Kafka() { - super(CONNECTOR_TYPE_VALUE, 1); + super(CONNECTOR_TYPE_VALUE_KAFKA, 1, true); } /** - * Sets the kafka version. + * Sets the Kafka version to be used. * - * @param version - * Could be {@link KafkaValidator#KAFKA_VERSION_VALUE_011}, - * {@link KafkaValidator#KAFKA_VERSION_VALUE_010}, - * {@link KafkaValidator#KAFKA_VERSION_VALUE_09}, - * or {@link KafkaValidator#KAFKA_VERSION_VALUE_08}. + * @param version Kafka version. E.g., "0.8", "0.11", etc. */ public Kafka version(String version) { - this.version = Optional.of(version); + Preconditions.checkNotNull(version); + this.version = version; return this; } /** - * Sets the bootstrap servers for kafka. + * Sets the topic from which the table is read. + * + * @param topic The topic from which the table is read. */ - public Kafka bootstrapServers(String bootstrapServers) { - this.bootstrapServers = Optional.of(bootstrapServers); + public Kafka topic(String topic) { + Preconditions.checkNotNull(topic); + this.topic = topic; return this; } /** - * Sets the consumer group id. + * Sets the configuration properties for the Kafka consumer. Resets previously set properties. 
+ * + * @param properties The configuration properties for the Kafka consumer. */ - public Kafka groupId(String groupId) { - this.groupId = Optional.of(groupId); + public Kafka properties(Properties properties) { + Preconditions.checkNotNull(properties); + if (this.kafkaProperties == null) { + this.kafkaProperties = new HashMap<>(); + } + this.kafkaProperties.clear(); + properties.forEach((k, v) -> this.kafkaProperties.put((String) k, (String) v)); return this; } /** - * Sets the topic to consume. + * Adds a configuration properties for the Kafka consumer. + * + * @param key property key for the Kafka consumer + * @param value property value for the Kafka consumer */ - public Kafka topic(String topic) { - this.topic = Optional.of(topic); + public Kafka property(String key, String value) { + Preconditions.checkNotNull(key); + Preconditions.checkNotNull(value); + if (this.kafkaProperties == null) { + this.kafkaProperties = new HashMap<>(); + } + kafkaProperties.put(key, value); return this; } /** - * Sets the startup mode. + * Configures to start reading from the earliest offset for all partitions. + * + * @see FlinkKafkaConsumerBase#setStartFromEarliest() */ - public Kafka startupMode(StartupMode startupMode) { - this.startupMode = Optional.of(startupMode); + public Kafka startFromEarliest() { + this.startupMode = StartupMode.EARLIEST; + this.specificOffsets = null; return this; } /** - * Sets the zookeeper hosts. Only required by kafka 0.8. + * Configures to start reading from the latest offset for all partitions. + * + * @see FlinkKafkaConsumerBase#setStartFromLatest() */ - public Kafka zookeeperConnect(String zookeeperConnect) { - this.zookeeperConnect = Optional.of(zookeeperConnect); + public Kafka startFromLatest() { + this.startupMode = StartupMode.LATEST; + this.specificOffsets = null; return this; } /** - * Sets the consume offsets for the topic set with {@link Kafka#topic(String)}. - * Only works in {@link StartupMode#SPECIFIC_OFFSETS} mode. 
+ * Configures to start reading from any committed group offsets found in Zookeeper / Kafka brokers. + * + * @see FlinkKafkaConsumerBase#setStartFromGroupOffsets() */ - public Kafka specificOffsets(Map<Integer, Long> specificOffsets) { - this.specificOffsets = Optional.of(specificOffsets); + public Kafka startFromGroupOffsets() { + this.startupMode = StartupMode.GROUP_OFFSETS; + this.specificOffsets = null; return this; } /** - * Sets the mapping from logical table schema to json schema. + * Configures to start reading partitions from specific offsets, set independently for each partition. + * Resets previously set offsets. + * + * @param specificOffsets the specified offsets for partitions + * @see FlinkKafkaConsumerBase#setStartFromSpecificOffsets(Map) */ - public Kafka tableJsonMapping(Map<String, String> jsonTableMapping) { - this.tableJsonMapping = Optional.of(jsonTableMapping); + public Kafka startFromSpecificOffsets(Map<Integer, Long> specificOffsets) { + this.startupMode = StartupMode.SPECIFIC_OFFSETS; + this.specificOffsets = Preconditions.checkNotNull(specificOffsets); return this; } + /** + * Configures to start reading partitions from specific offsets and specifies the given offset for + * the given partition. + * + * @param partition partition index + * @param specificOffset partition offset to start reading from + * @see FlinkKafkaConsumerBase#setStartFromSpecificOffsets(Map) + */ + public Kafka startFromSpecificOffset(int partition, long specificOffset) { + this.startupMode = StartupMode.SPECIFIC_OFFSETS; + if (this.specificOffsets == null) { + this.specificOffsets = new HashMap<>(); + } + this.specificOffsets.put(partition, specificOffset); + return this; + } + + /** + * Internal method for connector properties conversion. 
+ */ @Override public void addConnectorProperties(DescriptorProperties properties) { - if (version.isPresent()) { - properties.putString(KAFKA_VERSION, version.get()); - } - if (bootstrapServers.isPresent()) { - properties.putString(BOOTSTRAP_SERVERS, bootstrapServers.get()); - } - if (groupId.isPresent()) { - properties.putString(GROUP_ID, groupId.get()); - } - if (topic.isPresent()) { - properties.putString(TOPIC, topic.get()); + if (version != null) { + properties.putString(CONNECTOR_VERSION(), version); } - if (zookeeperConnect.isPresent()) { - properties.putString(ZOOKEEPER_CONNECT, zookeeperConnect.get()); + + if (topic != null) { + properties.putString(CONNECTOR_TOPIC, topic); } - if (startupMode.isPresent()) { - Map<String, String> map = KafkaValidator.normalizeStartupMode(startupMode.get()); - for (Map.Entry<String, String> entry : map.entrySet()) { - properties.putString(entry.getKey(), entry.getValue()); - } + + if (startupMode != null) { + properties.putString(CONNECTOR_STARTUP_MODE, KafkaValidator.normalizeStartupMode(startupMode)); } - if (specificOffsets.isPresent()) { - List<String> propertyKeys = new ArrayList<>(); - propertyKeys.add(PARTITION); - propertyKeys.add(OFFSET); - - List<Seq<String>> propertyValues = new ArrayList<>(specificOffsets.get().size()); - for (Map.Entry<Integer, Long> entry : specificOffsets.get().entrySet()) { - List<String> partitionOffset = new ArrayList<>(2); - partitionOffset.add(entry.getKey().toString()); - partitionOffset.add(entry.getValue().toString()); - propertyValues.add(JavaConversions.asScalaBuffer(partitionOffset).toSeq()); + + if (specificOffsets != null) { + final List<List<String>> values = new ArrayList<>(); + for (Map.Entry<Integer, Long> specificOffset : specificOffsets.entrySet()) { + values.add(Arrays.asList(specificOffset.getKey().toString(), specificOffset.getValue().toString())); } properties.putIndexedFixedProperties( - SPECIFIC_OFFSETS, - JavaConversions.asScalaBuffer(propertyKeys).toSeq(), - 
JavaConversions.asScalaBuffer(propertyValues).toSeq() - ); + CONNECTOR_SPECIFIC_OFFSETS, + Arrays.asList(CONNECTOR_SPECIFIC_OFFSETS_PARTITION, CONNECTOR_SPECIFIC_OFFSETS_OFFSET), + values); } - if (tableJsonMapping.isPresent()) { - List<String> propertyKeys = new ArrayList<>(); - propertyKeys.add(TABLE_FIELD); - propertyKeys.add(JSON_FIELD); - - List<Seq<String>> mappingFields = new ArrayList<>(tableJsonMapping.get().size()); - for (Map.Entry<String, String> entry : tableJsonMapping.get().entrySet()) { - List<String> singleMapping = new ArrayList<>(2); - singleMapping.add(entry.getKey()); - singleMapping.add(entry.getValue()); - mappingFields.add(JavaConversions.asScalaBuffer(singleMapping).toSeq()); - } + + if (kafkaProperties != null) { properties.putIndexedFixedProperties( - TABLE_JSON_MAPPING, - JavaConversions.asScalaBuffer(propertyKeys).toSeq(), - JavaConversions.asScalaBuffer(mappingFields).toSeq() - ); + CONNECTOR_PROPERTIES, + Arrays.asList(CONNECTOR_PROPERTIES_KEY, CONNECTOR_PROPERTIES_VALUE), + this.kafkaProperties.entrySet().stream() + .map(e -> Arrays.asList(e.getKey(), e.getValue())) + .collect(Collectors.toList()) + ); } } - - @Override - public boolean needsFormat() { - return true; - } }