[FLINK-8538][table]Add a Kafka table source factory with JSON format support
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1d26062d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1d26062d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1d26062d Branch: refs/heads/release-1.5 Commit: 1d26062de130c05fdbe7701b55766b4a8d433418 Parents: a269f85 Author: Xingcan Cui <xingc...@gmail.com> Authored: Mon Feb 12 18:11:36 2018 +0800 Committer: Timo Walther <twal...@apache.org> Committed: Tue Feb 27 20:23:00 2018 +0100 ---------------------------------------------------------------------- .../kafka/Kafka010JsonTableSourceFactory.java | 36 +++ ...pache.flink.table.sources.TableSourceFactory | 16 ++ .../resources/tableSourceConverter.properties | 29 +++ .../kafka/Kafka010TableSourceFactoryTest.java | 41 ++++ .../kafka/Kafka011JsonTableSourceFactory.java | 36 +++ ...pache.flink.table.sources.TableSourceFactory | 16 ++ .../resources/tableSourceConverter.properties | 29 +++ .../kafka/Kafka011TableSourceFactoryTest.java | 41 ++++ .../kafka/Kafka08JsonTableSourceFactory.java | 36 +++ ...pache.flink.table.sources.TableSourceFactory | 16 ++ .../resources/tableSourceConverter.properties | 29 +++ .../kafka/Kafka08TableSourceFactoryTest.java | 42 ++++ .../kafka/Kafka09JsonTableSourceFactory.java | 36 +++ ...pache.flink.table.sources.TableSourceFactory | 16 ++ .../resources/tableSourceConverter.properties | 29 +++ .../kafka/Kafka09TableSourceFactoryTest.java | 41 ++++ .../flink-connector-kafka-base/pom.xml | 14 ++ .../connectors/kafka/KafkaJsonTableSource.java | 17 ++ .../kafka/KafkaJsonTableSourceFactory.java | 227 +++++++++++++++++++ .../connectors/kafka/KafkaTableSource.java | 30 +++ .../apache/flink/table/descriptors/Kafka.java | 199 ++++++++++++++++ .../flink/table/descriptors/KafkaValidator.java | 193 ++++++++++++++++ .../KafkaJsonTableFromDescriptorTestBase.java | 127 +++++++++++ .../src/test/resources/kafka-json-schema.json | 35 +++ .../apache/flink/table/api/TableSchema.scala | 5 + 25 files changed, 1336 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/1d26062d/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceFactory.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceFactory.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceFactory.java new file mode 100644 index 0000000..1d03f6c --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceFactory.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import static org.apache.flink.table.descriptors.KafkaValidator.KAFKA_VERSION_VALUE_010; + +/** + * Factory for creating configured instances of {@link Kafka010JsonTableSource}. + */ +public class Kafka010JsonTableSourceFactory extends KafkaJsonTableSourceFactory { + @Override + protected KafkaJsonTableSource.Builder createBuilder() { + return new Kafka010JsonTableSource.Builder(); + } + + @Override + protected String kafkaVersion() { + return KAFKA_VERSION_VALUE_010; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/1d26062d/flink-connectors/flink-connector-kafka-0.10/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory b/flink-connectors/flink-connector-kafka-0.10/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory new file mode 100644 index 0000000..9ef54fc --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.10/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.flink.streaming.connectors.kafka.Kafka010JsonTableSourceFactory http://git-wip-us.apache.org/repos/asf/flink/blob/1d26062d/flink-connectors/flink-connector-kafka-0.10/src/main/resources/tableSourceConverter.properties ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/resources/tableSourceConverter.properties b/flink-connectors/flink-connector-kafka-0.10/src/main/resources/tableSourceConverter.properties new file mode 100644 index 0000000..5409b49 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.10/src/main/resources/tableSourceConverter.properties @@ -0,0 +1,29 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +################################################################################ +# The config file is used to specify the packages of current module where +# to find TableSourceConverter implementation class annotated with TableType. +# If there are multiple packages to scan, put those packages together into a +# string separated with ',', for example, org.package1,org.package2. +# Please notice: +# It's better to have a tableSourceConverter.properties in each connector Module +# which offers converters instead of put all information into the +# tableSourceConverter.properties of flink-table module. +################################################################################ +scan.packages=org.apache.flink.streaming.connectors.kafka http://git-wip-us.apache.org/repos/asf/flink/blob/1d26062d/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceFactoryTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceFactoryTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceFactoryTest.java new file mode 100644 index 0000000..15b89e8 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceFactoryTest.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.table.descriptors.Kafka; + +import static org.apache.flink.table.descriptors.KafkaValidator.KAFKA_VERSION_VALUE_010; + +/** + * Tests for {@link Kafka010JsonTableSourceFactory}. + */ +public class Kafka010TableSourceFactoryTest extends KafkaJsonTableFromDescriptorTestBase { + protected String versionForTest() { + return KAFKA_VERSION_VALUE_010; + } + + protected KafkaJsonTableSource.Builder builderForTest() { + return Kafka010JsonTableSource.builder(); + } + + @Override + protected void extraSettings(KafkaTableSource.Builder builder, Kafka kafka) { + // no extra settings + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/1d26062d/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceFactory.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceFactory.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceFactory.java new file mode 100644 index 0000000..ca4d6ce --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceFactory.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import static org.apache.flink.table.descriptors.KafkaValidator.KAFKA_VERSION_VALUE_011; + +/** + * Factory for creating configured instances of {@link Kafka011JsonTableSource}. + */ +public class Kafka011JsonTableSourceFactory extends KafkaJsonTableSourceFactory { + @Override + protected KafkaJsonTableSource.Builder createBuilder() { + return new Kafka011JsonTableSource.Builder(); + } + + @Override + protected String kafkaVersion() { + return KAFKA_VERSION_VALUE_011; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/1d26062d/flink-connectors/flink-connector-kafka-0.11/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory b/flink-connectors/flink-connector-kafka-0.11/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory new file mode 100644 index 0000000..75135e5 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.11/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.flink.streaming.connectors.kafka.Kafka011JsonTableSourceFactory http://git-wip-us.apache.org/repos/asf/flink/blob/1d26062d/flink-connectors/flink-connector-kafka-0.11/src/main/resources/tableSourceConverter.properties ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/resources/tableSourceConverter.properties b/flink-connectors/flink-connector-kafka-0.11/src/main/resources/tableSourceConverter.properties new file mode 100644 index 0000000..5409b49 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.11/src/main/resources/tableSourceConverter.properties @@ -0,0 +1,29 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +################################################################################ +# The config file is used to specify the packages of current module where +# to find TableSourceConverter implementation class annotated with TableType. +# If there are multiple packages to scan, put those packages together into a +# string separated with ',', for example, org.package1,org.package2. +# Please notice: +# It's better to have a tableSourceConverter.properties in each connector Module +# which offers converters instead of put all information into the +# tableSourceConverter.properties of flink-table module. +################################################################################ +scan.packages=org.apache.flink.streaming.connectors.kafka http://git-wip-us.apache.org/repos/asf/flink/blob/1d26062d/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceFactoryTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceFactoryTest.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceFactoryTest.java new file mode 100644 index 0000000..84ac39b --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceFactoryTest.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.table.descriptors.Kafka; + +import static org.apache.flink.table.descriptors.KafkaValidator.KAFKA_VERSION_VALUE_011; + +/** + * Tests for {@link Kafka011JsonTableSourceFactory}. + */ +public class Kafka011TableSourceFactoryTest extends KafkaJsonTableFromDescriptorTestBase { + protected String versionForTest() { + return KAFKA_VERSION_VALUE_011; + } + + protected KafkaJsonTableSource.Builder builderForTest() { + return Kafka011JsonTableSource.builder(); + } + + @Override + protected void extraSettings(KafkaTableSource.Builder builder, Kafka kafka) { + // no extra settings + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/1d26062d/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceFactory.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceFactory.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceFactory.java new file mode 100644 index 0000000..e4e5096 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceFactory.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import static org.apache.flink.table.descriptors.KafkaValidator.KAFKA_VERSION_VALUE_08; + +/** + * Factory for creating configured instances of {@link Kafka08JsonTableSource}. + */ +public class Kafka08JsonTableSourceFactory extends KafkaJsonTableSourceFactory { + @Override + protected KafkaJsonTableSource.Builder createBuilder() { + return new Kafka08JsonTableSource.Builder(); + } + + @Override + protected String kafkaVersion() { + return KAFKA_VERSION_VALUE_08; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/1d26062d/flink-connectors/flink-connector-kafka-0.8/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory b/flink-connectors/flink-connector-kafka-0.8/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory new file mode 100644 index 0000000..9092955 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.8/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.flink.streaming.connectors.kafka.Kafka08JsonTableSourceFactory http://git-wip-us.apache.org/repos/asf/flink/blob/1d26062d/flink-connectors/flink-connector-kafka-0.8/src/main/resources/tableSourceConverter.properties ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/resources/tableSourceConverter.properties b/flink-connectors/flink-connector-kafka-0.8/src/main/resources/tableSourceConverter.properties new file mode 100644 index 0000000..5409b49 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.8/src/main/resources/tableSourceConverter.properties @@ -0,0 +1,29 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +################################################################################ +# The config file is used to specify the packages of current module where +# to find TableSourceConverter implementation class annotated with TableType. +# If there are multiple packages to scan, put those packages together into a +# string separated with ',', for example, org.package1,org.package2. +# Please notice: +# It's better to have a tableSourceConverter.properties in each connector Module +# which offers converters instead of put all information into the +# tableSourceConverter.properties of flink-table module. +################################################################################ +scan.packages=org.apache.flink.streaming.connectors.kafka http://git-wip-us.apache.org/repos/asf/flink/blob/1d26062d/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceFactoryTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceFactoryTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceFactoryTest.java new file mode 100644 index 0000000..a2edc09 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceFactoryTest.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.table.descriptors.Kafka; + +import static org.apache.flink.table.descriptors.KafkaValidator.KAFKA_VERSION_VALUE_08; + +/** + * Tests for {@link Kafka08JsonTableSourceFactory}. + */ +public class Kafka08TableSourceFactoryTest extends KafkaJsonTableFromDescriptorTestBase { + protected String versionForTest() { + return KAFKA_VERSION_VALUE_08; + } + + protected KafkaJsonTableSource.Builder builderForTest() { + return Kafka08JsonTableSource.builder(); + } + + @Override + protected void extraSettings(KafkaTableSource.Builder builder, Kafka kafka) { + builder.getKafkaProps().put("zookeeper.connect", "localhost:1111"); + kafka.zookeeperConnect("localhost:1111"); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/1d26062d/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceFactory.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceFactory.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceFactory.java new file mode 100644 index 0000000..bbda4ae --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceFactory.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import static org.apache.flink.table.descriptors.KafkaValidator.KAFKA_VERSION_VALUE_09; + +/** + * Factory for creating configured instances of {@link Kafka09JsonTableSource}. + */ +public class Kafka09JsonTableSourceFactory extends KafkaJsonTableSourceFactory { + @Override + protected KafkaJsonTableSource.Builder createBuilder() { + return new Kafka09JsonTableSource.Builder(); + } + + @Override + protected String kafkaVersion() { + return KAFKA_VERSION_VALUE_09; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/1d26062d/flink-connectors/flink-connector-kafka-0.9/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory b/flink-connectors/flink-connector-kafka-0.9/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory new file mode 100644 index 0000000..2f38bd0 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.9/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.flink.streaming.connectors.kafka.Kafka09JsonTableSourceFactory http://git-wip-us.apache.org/repos/asf/flink/blob/1d26062d/flink-connectors/flink-connector-kafka-0.9/src/main/resources/tableSourceConverter.properties ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/resources/tableSourceConverter.properties b/flink-connectors/flink-connector-kafka-0.9/src/main/resources/tableSourceConverter.properties new file mode 100644 index 0000000..5409b49 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.9/src/main/resources/tableSourceConverter.properties @@ -0,0 +1,29 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +################################################################################ +# The config file is used to specify the packages of current module where +# to find TableSourceConverter implementation class annotated with TableType. +# If there are multiple packages to scan, put those packages together into a +# string separated with ',', for example, org.package1,org.package2. +# Please notice: +# It's better to have a tableSourceConverter.properties in each connector Module +# which offers converters instead of put all information into the +# tableSourceConverter.properties of flink-table module. +################################################################################ +scan.packages=org.apache.flink.streaming.connectors.kafka http://git-wip-us.apache.org/repos/asf/flink/blob/1d26062d/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceFactoryTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceFactoryTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceFactoryTest.java new file mode 100644 index 0000000..fc85ea7 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceFactoryTest.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.table.descriptors.Kafka; + +import static org.apache.flink.table.descriptors.KafkaValidator.KAFKA_VERSION_VALUE_09; + +/** + * Factory for creating configured instances of {@link Kafka09JsonTableSource}. + */ +public class Kafka09TableSourceFactoryTest extends KafkaJsonTableFromDescriptorTestBase { + protected String versionForTest() { + return KAFKA_VERSION_VALUE_09; + } + + protected KafkaJsonTableSource.Builder builderForTest() { + return Kafka09JsonTableSource.builder(); + } + + @Override + protected void extraSettings(KafkaTableSource.Builder builder, Kafka kafka) { + // no extra settings + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/1d26062d/flink-connectors/flink-connector-kafka-base/pom.xml ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/pom.xml b/flink-connectors/flink-connector-kafka-base/pom.xml index e7412cf..2ccaa2e 100644 --- a/flink-connectors/flink-connector-kafka-base/pom.xml +++ b/flink-connectors/flink-connector-kafka-base/pom.xml @@ -205,6 +205,20 @@ under the License. <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-scala_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + </dependencies> <dependencyManagement> http://git-wip-us.apache.org/repos/asf/flink/blob/1d26062d/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java index f581e89..d2dafe7 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java @@ -27,6 +27,7 @@ import org.apache.flink.table.sources.DefinedFieldMapping; import org.apache.flink.table.sources.StreamTableSource; import java.util.Map; +import java.util.Objects; import java.util.Properties; /** @@ -86,6 +87,22 @@ public abstract class KafkaJsonTableSource extends KafkaTableSource implements D return "KafkaJSONTableSource"; } + @Override + public boolean equals(Object other) { + if (super.equals(other)) { + KafkaJsonTableSource otherSource = (KafkaJsonTableSource) other; + return Objects.equals(failOnMissingField, otherSource.failOnMissingField) + && Objects.equals(jsonSchema, otherSource.jsonSchema) + && Objects.equals(fieldMapping, otherSource.fieldMapping); + } + return false; + } + + @Override + public int hashCode() { + return 31 * super.hashCode() + Objects.hash(failOnMissingField, jsonSchema, fieldMapping); + } + //////// SETTERS FOR OPTIONAL PARAMETERS /** http://git-wip-us.apache.org/repos/asf/flink/blob/1d26062d/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactory.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactory.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactory.java new file mode 100644 index 0000000..918b833 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactory.java @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.formats.json.JsonSchemaConverter; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.descriptors.DescriptorProperties; +import org.apache.flink.table.descriptors.JsonValidator; +import org.apache.flink.table.descriptors.KafkaValidator; +import org.apache.flink.table.descriptors.SchemaValidator; +import org.apache.flink.table.sources.TableSource; +import org.apache.flink.table.sources.TableSourceFactory; +import org.apache.flink.types.Row; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE; +import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_VERSION; +import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_TYPE; +import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_VERSION; +import static org.apache.flink.table.descriptors.JsonValidator.FORMAT_FAIL_ON_MISSING_FIELD; +import static org.apache.flink.table.descriptors.JsonValidator.FORMAT_SCHEMA_STRING; +import static org.apache.flink.table.descriptors.JsonValidator.FORMAT_TYPE_VALUE; +import static org.apache.flink.table.descriptors.KafkaValidator.BOOTSTRAP_SERVERS; +import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_TYPE_VALUE; +import static org.apache.flink.table.descriptors.KafkaValidator.GROUP_ID; +import static org.apache.flink.table.descriptors.KafkaValidator.JSON_FIELD; +import static org.apache.flink.table.descriptors.KafkaValidator.KAFKA_VERSION; +import static org.apache.flink.table.descriptors.KafkaValidator.OFFSET; +import static org.apache.flink.table.descriptors.KafkaValidator.PARTITION; +import static org.apache.flink.table.descriptors.KafkaValidator.SPECIFIC_OFFSETS; +import static org.apache.flink.table.descriptors.KafkaValidator.STARTUP_MODE; +import static org.apache.flink.table.descriptors.KafkaValidator.STARTUP_MODE_VALUE_EARLIEST; +import static org.apache.flink.table.descriptors.KafkaValidator.STARTUP_MODE_VALUE_GROUP_OFFSETS; +import static org.apache.flink.table.descriptors.KafkaValidator.STARTUP_MODE_VALUE_LATEST; +import static org.apache.flink.table.descriptors.KafkaValidator.STARTUP_MODE_VALUE_SPECIFIC_OFFSETS; +import static org.apache.flink.table.descriptors.KafkaValidator.TABLE_FIELD; +import static org.apache.flink.table.descriptors.KafkaValidator.TABLE_JSON_MAPPING; +import static org.apache.flink.table.descriptors.KafkaValidator.TOPIC; +import static org.apache.flink.table.descriptors.KafkaValidator.ZOOKEEPER_CONNECT; +import static org.apache.flink.table.descriptors.SchemaValidator.PROCTIME; +import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA; +import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_VERSION; + +import scala.Option; +import scala.collection.JavaConversions; + +/** + * Factory for creating configured instances of {@link KafkaJsonTableSource}. + */ +public abstract class KafkaJsonTableSourceFactory implements TableSourceFactory<Row> { + @Override + public Map<String, String> requiredContext() { + Map<String, String> context = new HashMap<>(); + context.put(CONNECTOR_TYPE(), CONNECTOR_TYPE_VALUE); // kafka connector + context.put(FORMAT_TYPE(), FORMAT_TYPE_VALUE()); // Json format + context.put(KAFKA_VERSION, kafkaVersion()); // for different implementations + context.put(CONNECTOR_VERSION(), "1"); + context.put(FORMAT_VERSION(), "1"); + context.put(SCHEMA_VERSION(), "1"); + return context; + } + + @Override + public List<String> supportedProperties() { + List<String> properties = new ArrayList<>(); + + // kafka + properties.add(KAFKA_VERSION); + properties.add(BOOTSTRAP_SERVERS); + properties.add(GROUP_ID); + properties.add(ZOOKEEPER_CONNECT); + properties.add(TOPIC); + properties.add(STARTUP_MODE); + properties.add(SPECIFIC_OFFSETS + ".#." + PARTITION); + properties.add(SPECIFIC_OFFSETS + ".#." + OFFSET); + + // json format + properties.add(FORMAT_SCHEMA_STRING()); + properties.add(FORMAT_FAIL_ON_MISSING_FIELD()); + + // table json mapping + properties.add(TABLE_JSON_MAPPING + ".#." + TABLE_FIELD); + properties.add(TABLE_JSON_MAPPING + ".#." + JSON_FIELD); + + // schema + properties.add(SCHEMA() + ".#." + DescriptorProperties.TYPE()); + properties.add(SCHEMA() + ".#." + DescriptorProperties.NAME()); + + // time attributes + properties.add(SCHEMA() + ".#." + PROCTIME()); +// properties.add(SCHEMA() + ".#." + ROWTIME() + ".#." + TIMESTAMPS_CLASS()); +// properties.add(SCHEMA() + ".#." + ROWTIME() + ".#." + TIMESTAMPS_TYPE()); + + return properties; + } + + @Override + public TableSource<Row> create(Map<String, String> properties) { + DescriptorProperties params = new DescriptorProperties(true); + params.putProperties(properties); + + // validate + new KafkaValidator().validate(params); + new JsonValidator().validate(params); + new SchemaValidator(true).validate(params); + + // build + KafkaJsonTableSource.Builder builder = createBuilder(); + Properties kafkaProps = new Properties(); + + // Set the required parameters. + String topic = params.getString(TOPIC).get(); + TableSchema tableSchema = params.getTableSchema(SCHEMA()).get(); + + kafkaProps.put(BOOTSTRAP_SERVERS, params.getString(BOOTSTRAP_SERVERS).get()); + kafkaProps.put(GROUP_ID, params.getString(GROUP_ID).get()); + + // Set the zookeeper connect for kafka 0.8. + Option<String> zkConnect = params.getString(ZOOKEEPER_CONNECT); + if (zkConnect.isDefined()) { + kafkaProps.put(ZOOKEEPER_CONNECT, zkConnect.get()); + } + + builder.withKafkaProperties(kafkaProps).forTopic(topic).withSchema(tableSchema); + + // Set the startup mode. + String startupMode = params.getString(STARTUP_MODE).get(); + if (null != startupMode) { + switch (startupMode) { + case STARTUP_MODE_VALUE_EARLIEST: + builder.fromEarliest(); + break; + case STARTUP_MODE_VALUE_LATEST: + builder.fromLatest(); + break; + case STARTUP_MODE_VALUE_GROUP_OFFSETS: + builder.fromGroupOffsets(); + break; + case STARTUP_MODE_VALUE_SPECIFIC_OFFSETS: + Map<String, String> partitions = JavaConversions. + mapAsJavaMap(params.getIndexedProperty(SPECIFIC_OFFSETS, PARTITION)); + Map<KafkaTopicPartition, Long> offsetMap = new HashMap<>(); + for (int i = 0; i < partitions.size(); i++) { + offsetMap.put( + new KafkaTopicPartition( + topic, + Integer.valueOf(params.getString( + SPECIFIC_OFFSETS + "" + "." + i + "." + PARTITION).get())), + Long.valueOf(params.getString( + SPECIFIC_OFFSETS + "" + "." + i + "." + OFFSET).get())); + } + builder.fromSpecificOffsets(offsetMap); + break; + } + } + + // Set whether fail on missing JSON field. + Option<String> failOnMissing = params.getString(FORMAT_FAIL_ON_MISSING_FIELD()); + if (failOnMissing.isDefined()) { + builder.failOnMissingField(Boolean.valueOf(failOnMissing.get())); + } + + // Set the JSON schema. + Option<String> jsonSchema = params.getString(FORMAT_SCHEMA_STRING()); + if (jsonSchema.isDefined()) { + TypeInformation jsonSchemaType = JsonSchemaConverter.convert(jsonSchema.get()); + builder.forJsonSchema(TableSchema.fromTypeInfo(jsonSchemaType)); + } + + // Set the table => JSON fields mapping. + Map<String, String> mappingTableFields = JavaConversions. + mapAsJavaMap(params.getIndexedProperty(TABLE_JSON_MAPPING, TABLE_FIELD)); + + if (!mappingTableFields.isEmpty()) { + Map<String, String> tableJsonMapping = new HashMap<>(); + for (int i = 0; i < mappingTableFields.size(); i++) { + tableJsonMapping.put(params.getString(TABLE_JSON_MAPPING + "." + i + "." + TABLE_FIELD).get(), + params.getString(TABLE_JSON_MAPPING + "." + i + "." + JSON_FIELD).get() + ); + } + builder.withTableToJsonMapping(tableJsonMapping); + } + + // Set the time attributes. + setTimeAttributes(tableSchema, params, builder); + + return builder.build(); + } + + protected abstract KafkaJsonTableSource.Builder createBuilder(); + + protected abstract String kafkaVersion(); + + private void setTimeAttributes(TableSchema schema, DescriptorProperties params, KafkaJsonTableSource.Builder builder) { + // TODO to deal with rowtime fields + Option<String> proctimeField; + for (int i = 0; i < schema.getColumnNum(); i++) { + proctimeField = params.getString(SCHEMA() + "." + i + "." + PROCTIME()); + if (proctimeField.isDefined()) { + builder.withProctimeAttribute(schema.getColumnName(i).get()); + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/1d26062d/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java index d5cda4a..9ce3b8e 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java @@ -42,6 +42,7 @@ import org.apache.flink.util.Preconditions; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Properties; import scala.Option; @@ -138,6 +139,35 @@ public abstract class KafkaTableSource return TableConnectorUtil.generateRuntimeName(this.getClass(), schema.getColumnNames()); } + @Override + public boolean equals(Object o) { + if (!o.getClass().equals(this.getClass())) { + return false; + } + KafkaTableSource other = (KafkaTableSource) o; + return Objects.equals(topic, other.topic) + && Objects.equals(schema, other.schema) + && Objects.equals(properties, other.properties) + && Objects.equals(proctimeAttribute, other.proctimeAttribute) + && Objects.equals(returnType, other.returnType) + && Objects.equals(rowtimeAttributeDescriptors, other.rowtimeAttributeDescriptors) + && Objects.equals(specificStartupOffsets, other.specificStartupOffsets) + && Objects.equals(startupMode, other.startupMode); + } + + @Override + public int hashCode() { + return Objects.hash( + topic, + schema, + properties, + proctimeAttribute, + returnType, + rowtimeAttributeDescriptors, + specificStartupOffsets, + startupMode); + } + /** * Returns a version-specific Kafka consumer with the start position configured. * http://git-wip-us.apache.org/repos/asf/flink/blob/1d26062d/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/Kafka.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/Kafka.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/Kafka.java new file mode 100644 index 0000000..4733f6e --- /dev/null +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/Kafka.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors; + +import org.apache.flink.streaming.connectors.kafka.config.StartupMode; + +import static org.apache.flink.table.descriptors.KafkaValidator.BOOTSTRAP_SERVERS; +import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_TYPE_VALUE; +import static org.apache.flink.table.descriptors.KafkaValidator.GROUP_ID; +import static org.apache.flink.table.descriptors.KafkaValidator.JSON_FIELD; +import static org.apache.flink.table.descriptors.KafkaValidator.KAFKA_VERSION; +import static org.apache.flink.table.descriptors.KafkaValidator.OFFSET; +import static org.apache.flink.table.descriptors.KafkaValidator.PARTITION; +import static org.apache.flink.table.descriptors.KafkaValidator.SPECIFIC_OFFSETS; +import static org.apache.flink.table.descriptors.KafkaValidator.TABLE_FIELD; +import static org.apache.flink.table.descriptors.KafkaValidator.TABLE_JSON_MAPPING; +import static org.apache.flink.table.descriptors.KafkaValidator.TOPIC; +import static org.apache.flink.table.descriptors.KafkaValidator.ZOOKEEPER_CONNECT; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import scala.collection.JavaConversions; +import scala.collection.Seq; + +/** + * Connector descriptor for the kafka message queue. + */ +public class Kafka extends ConnectorDescriptor { + + private Optional<String> version = Optional.empty(); + private Optional<String> bootstrapServers = Optional.empty(); + private Optional<String> groupId = Optional.empty(); + private Optional<String> topic = Optional.empty(); + private Optional<String> zookeeperConnect = Optional.empty(); + private Optional<Map<String, String>> tableJsonMapping = Optional.empty(); + + private Optional<StartupMode> startupMode = Optional.empty(); + private Optional<Map<Integer, Long>> specificOffsets = Optional.empty(); + + public Kafka() { + super(CONNECTOR_TYPE_VALUE, 1); + } + + /** + * Sets the kafka version. + * + * @param version + * Could be {@link KafkaValidator#KAFKA_VERSION_VALUE_011}, + * {@link KafkaValidator#KAFKA_VERSION_VALUE_010}, + * {@link KafkaValidator#KAFKA_VERSION_VALUE_09}, + * or {@link KafkaValidator#KAFKA_VERSION_VALUE_08}. + */ + public Kafka version(String version) { + this.version = Optional.of(version); + return this; + } + + /** + * Sets the bootstrap servers for kafka. + */ + public Kafka bootstrapServers(String bootstrapServers) { + this.bootstrapServers = Optional.of(bootstrapServers); + return this; + } + + /** + * Sets the consumer group id. + */ + public Kafka groupId(String groupId) { + this.groupId = Optional.of(groupId); + return this; + } + + /** + * Sets the topic to consume. + */ + public Kafka topic(String topic) { + this.topic = Optional.of(topic); + return this; + } + + /** + * Sets the startup mode. + */ + public Kafka startupMode(StartupMode startupMode) { + this.startupMode = Optional.of(startupMode); + return this; + } + + /** + * Sets the zookeeper hosts. Only required by kafka 0.8. + */ + public Kafka zookeeperConnect(String zookeeperConnect) { + this.zookeeperConnect = Optional.of(zookeeperConnect); + return this; + } + + /** + * Sets the consume offsets for the topic set with {@link Kafka#topic(String)}. + * Only works in {@link StartupMode#SPECIFIC_OFFSETS} mode. + */ + public Kafka specificOffsets(Map<Integer, Long> specificOffsets) { + this.specificOffsets = Optional.of(specificOffsets); + return this; + } + + /** + * Sets the mapping from logical table schema to json schema. + */ + public Kafka tableJsonMapping(Map<String, String> jsonTableMapping) { + this.tableJsonMapping = Optional.of(jsonTableMapping); + return this; + } + + @Override + public void addConnectorProperties(DescriptorProperties properties) { + if (version.isPresent()) { + properties.putString(KAFKA_VERSION, version.get()); + } + if (bootstrapServers.isPresent()) { + properties.putString(BOOTSTRAP_SERVERS, bootstrapServers.get()); + } + if (groupId.isPresent()) { + properties.putString(GROUP_ID, groupId.get()); + } + if (topic.isPresent()) { + properties.putString(TOPIC, topic.get()); + } + if (zookeeperConnect.isPresent()) { + properties.putString(ZOOKEEPER_CONNECT, zookeeperConnect.get()); + } + if (startupMode.isPresent()) { + Map<String, String> map = KafkaValidator.normalizeStartupMode(startupMode.get()); + for (Map.Entry<String, String> entry : map.entrySet()) { + properties.putString(entry.getKey(), entry.getValue()); + } + } + if (specificOffsets.isPresent()) { + List<String> propertyKeys = new ArrayList<>(); + propertyKeys.add(PARTITION); + propertyKeys.add(OFFSET); + + List<Seq<String>> propertyValues = new ArrayList<>(specificOffsets.get().size()); + for (Map.Entry<Integer, Long> entry : specificOffsets.get().entrySet()) { + List<String> partitionOffset = new ArrayList<>(2); + partitionOffset.add(entry.getKey().toString()); + partitionOffset.add(entry.getValue().toString()); + propertyValues.add(JavaConversions.asScalaBuffer(partitionOffset).toSeq()); + } + properties.putIndexedFixedProperties( + SPECIFIC_OFFSETS, + JavaConversions.asScalaBuffer(propertyKeys).toSeq(), + JavaConversions.asScalaBuffer(propertyValues).toSeq() + ); + } + if (tableJsonMapping.isPresent()) { + List<String> propertyKeys = new ArrayList<>(); + propertyKeys.add(TABLE_FIELD); + propertyKeys.add(JSON_FIELD); + + List<Seq<String>> mappingFields = new ArrayList<>(tableJsonMapping.get().size()); + for (Map.Entry<String, String> entry : tableJsonMapping.get().entrySet()) { + List<String> singleMapping = new ArrayList<>(2); + singleMapping.add(entry.getKey()); + singleMapping.add(entry.getValue()); + mappingFields.add(JavaConversions.asScalaBuffer(singleMapping).toSeq()); + } + properties.putIndexedFixedProperties( + TABLE_JSON_MAPPING, + JavaConversions.asScalaBuffer(propertyKeys).toSeq(), + JavaConversions.asScalaBuffer(mappingFields).toSeq() + ); + } + } + + @Override + public boolean needsFormat() { + return true; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/1d26062d/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java new file mode 100644 index 0000000..a3ca22f --- /dev/null +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors; + +import org.apache.flink.streaming.connectors.kafka.config.StartupMode; +import org.apache.flink.table.api.ValidationException; + +import java.util.HashMap; +import java.util.Map; + +import scala.Function0; +import scala.Tuple2; +import scala.collection.JavaConversions; +import scala.runtime.AbstractFunction0; +import scala.runtime.BoxedUnit; + + +/** + * The validator for {@link Kafka}. + */ +public class KafkaValidator extends ConnectorDescriptorValidator { + // fields + public static final String CONNECTOR_TYPE_VALUE = "kafka"; + public static final String KAFKA_VERSION = "kafka.version"; + public static final String BOOTSTRAP_SERVERS = "bootstrap.servers"; + public static final String GROUP_ID = "group.id"; + public static final String TOPIC = "topic"; + public static final String STARTUP_MODE = "startup.mode"; + public static final String SPECIFIC_OFFSETS = "specific.offsets"; + public static final String TABLE_JSON_MAPPING = "table.json.mapping"; + + public static final String PARTITION = "partition"; + public static final String OFFSET = "offset"; + + public static final String TABLE_FIELD = "table.field"; + public static final String JSON_FIELD = "json.field"; + + public static final String ZOOKEEPER_CONNECT = "zookeeper.connect"; // only required for 0.8 + + // values + public static final String KAFKA_VERSION_VALUE_08 = "0.8"; + public static final String KAFKA_VERSION_VALUE_09 = "0.9"; + public static final String KAFKA_VERSION_VALUE_010 = "0.10"; + public static final String KAFKA_VERSION_VALUE_011 = "0.11"; + + public static final String STARTUP_MODE_VALUE_EARLIEST = "earliest-offset"; + public static final String STARTUP_MODE_VALUE_LATEST = "latest-offset"; + public static final String STARTUP_MODE_VALUE_GROUP_OFFSETS = "group-offsets"; + public static final String STARTUP_MODE_VALUE_SPECIFIC_OFFSETS = "specific-offsets"; + + // utils + public static Map<String, String> normalizeStartupMode(StartupMode startupMode) { + Map<String, String> mapPair = new HashMap<>(); + switch (startupMode) { + case EARLIEST: + mapPair.put(STARTUP_MODE, STARTUP_MODE_VALUE_EARLIEST); + break; + case LATEST: + mapPair.put(STARTUP_MODE, STARTUP_MODE_VALUE_LATEST); + break; + case GROUP_OFFSETS: + mapPair.put(STARTUP_MODE, STARTUP_MODE_VALUE_GROUP_OFFSETS); + break; + case SPECIFIC_OFFSETS: + mapPair.put(STARTUP_MODE, STARTUP_MODE_VALUE_SPECIFIC_OFFSETS); + break; + } + return mapPair; + } + + @Override + public void validate(DescriptorProperties properties) { + super.validate(properties); + + AbstractFunction0<BoxedUnit> emptyValidator = new AbstractFunction0<BoxedUnit>() { + @Override + public BoxedUnit apply() { + return BoxedUnit.UNIT; + } + }; + + properties.validateValue(CONNECTOR_TYPE(), CONNECTOR_TYPE_VALUE, false); + + AbstractFunction0<BoxedUnit> version08Validator = new AbstractFunction0<BoxedUnit>() { + @Override + public BoxedUnit apply() { + properties.validateString(ZOOKEEPER_CONNECT, false, 0, Integer.MAX_VALUE); + return BoxedUnit.UNIT; + } + }; + + Map<String, Function0<BoxedUnit>> versionValidatorMap = new HashMap<>(); + versionValidatorMap.put(KAFKA_VERSION_VALUE_08, version08Validator); + versionValidatorMap.put(KAFKA_VERSION_VALUE_09, emptyValidator); + versionValidatorMap.put(KAFKA_VERSION_VALUE_010, emptyValidator); + versionValidatorMap.put(KAFKA_VERSION_VALUE_011, emptyValidator); + properties.validateEnum( + KAFKA_VERSION, + false, + toScalaImmutableMap(versionValidatorMap) + ); + + properties.validateString(BOOTSTRAP_SERVERS, false, 1, Integer.MAX_VALUE); + properties.validateString(GROUP_ID, false, 1, Integer.MAX_VALUE); + properties.validateString(TOPIC, false, 1, Integer.MAX_VALUE); + + AbstractFunction0<BoxedUnit> specificOffsetsValidator = new AbstractFunction0<BoxedUnit>() { + @Override + public BoxedUnit apply() { + Map<String, String> partitions = JavaConversions.mapAsJavaMap( + properties.getIndexedProperty(SPECIFIC_OFFSETS, PARTITION)); + + Map<String, String> offsets = JavaConversions.mapAsJavaMap( + properties.getIndexedProperty(SPECIFIC_OFFSETS, OFFSET)); + if (partitions.isEmpty() || offsets.isEmpty()) { + throw new ValidationException("Offsets must be set for SPECIFIC_OFFSETS mode."); + } + for (int i = 0; i < partitions.size(); ++i) { + properties.validateInt( + SPECIFIC_OFFSETS + "." + i + "." + PARTITION, + false, + 0, + Integer.MAX_VALUE); + properties.validateLong( + SPECIFIC_OFFSETS + "." + i + "." + OFFSET, + false, + 0, + Long.MAX_VALUE); + } + return BoxedUnit.UNIT; + } + }; + Map<String, Function0<BoxedUnit>> startupModeValidatorMap = new HashMap<>(); + startupModeValidatorMap.put(STARTUP_MODE_VALUE_GROUP_OFFSETS, emptyValidator); + startupModeValidatorMap.put(STARTUP_MODE_VALUE_EARLIEST, emptyValidator); + startupModeValidatorMap.put(STARTUP_MODE_VALUE_LATEST, emptyValidator); + startupModeValidatorMap.put(STARTUP_MODE_VALUE_SPECIFIC_OFFSETS, specificOffsetsValidator); + + properties.validateEnum(STARTUP_MODE, true, toScalaImmutableMap(startupModeValidatorMap)); + validateTableJsonMapping(properties); + } + + private void validateTableJsonMapping(DescriptorProperties properties) { + Map<String, String> mappingTableField = JavaConversions.mapAsJavaMap( + properties.getIndexedProperty(TABLE_JSON_MAPPING, TABLE_FIELD)); + Map<String, String> mappingJsonField = JavaConversions.mapAsJavaMap( + properties.getIndexedProperty(TABLE_JSON_MAPPING, JSON_FIELD)); + + if (mappingJsonField.size() != mappingJsonField.size()) { + throw new ValidationException("Table JSON mapping must be one to one."); + } + + for (int i = 0; i < mappingTableField.size(); i++) { + properties.validateString( + TABLE_JSON_MAPPING + "." + i + "." + TABLE_FIELD, + false, + 1, + Integer.MAX_VALUE); + properties.validateString( + TABLE_JSON_MAPPING + "." + i + "." + JSON_FIELD, + false, + 1, + Integer.MAX_VALUE); + } + } + + @SuppressWarnings("unchecked") + private <K, V> scala.collection.immutable.Map<K, V> toScalaImmutableMap(Map<K, V> javaMap) { + final java.util.List<scala.Tuple2<K, V>> list = new java.util.ArrayList<>(javaMap.size()); + for (final java.util.Map.Entry<K, V> entry : javaMap.entrySet()) { + list.add(scala.Tuple2.apply(entry.getKey(), entry.getValue())); + } + final scala.collection.Seq<Tuple2<K, V>> seq = + scala.collection.JavaConverters.asScalaBufferConverter(list).asScala().toSeq(); + return (scala.collection.immutable.Map<K, V>) scala.collection.immutable.Map$.MODULE$.apply(seq); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/1d26062d/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableFromDescriptorTestBase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableFromDescriptorTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableFromDescriptorTestBase.java new file mode 100644 index 0000000..964a624 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableFromDescriptorTestBase.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.java.StreamTableEnvironment; +import org.apache.flink.table.descriptors.Kafka; + +import org.mockito.Mockito; + +/** + * Tests for {@link KafkaJsonTableSourceFactory}. + */ +public abstract class KafkaJsonTableFromDescriptorTestBase { + private static final String GROUP_ID = "test-group"; + private static final String BOOTSTRAP_SERVERS = "localhost:1234"; + private static final String TOPIC = "test-topic"; + + protected abstract String versionForTest(); + + protected abstract KafkaJsonTableSource.Builder builderForTest(); + + protected abstract void extraSettings(KafkaTableSource.Builder builder, Kafka kafka); + + private static StreamExecutionEnvironment env = Mockito.mock(StreamExecutionEnvironment.class); + private static StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env); + +// @Test +// public void buildJsonTableSourceTest() throws Exception { +// final URL url = getClass().getClassLoader().getResource("kafka-json-schema.json"); +// Objects.requireNonNull(url); +// final String schema = FileUtils.readFileUtf8(new File(url.getFile())); +// +// Map<String, String> tableJsonMapping = new HashMap<>(); +// tableJsonMapping.put("fruit-name", "name"); +// tableJsonMapping.put("fruit-count", "count"); +// tableJsonMapping.put("event-time", "time"); +// +// // Construct with the builder. +// Properties props = new Properties(); +// props.put("group.id", GROUP_ID); +// props.put("bootstrap.servers", BOOTSTRAP_SERVERS); +// +// Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>(); +// specificOffsets.put(new KafkaTopicPartition(TOPIC, 0), 100L); +// specificOffsets.put(new KafkaTopicPartition(TOPIC, 1), 123L); +// +// KafkaTableSource.Builder builder = builderForTest() +// .forJsonSchema(TableSchema.fromTypeInfo(JsonSchemaConverter.convert(schema))) +// .failOnMissingField(true) +// .withTableToJsonMapping(tableJsonMapping) +// .withKafkaProperties(props) +// .forTopic(TOPIC) +// .fromSpecificOffsets(specificOffsets) +// .withSchema( +// TableSchema.builder() +// .field("fruit-name", Types.STRING) +// .field("fruit-count", Types.INT) +// .field("event-time", Types.LONG) +// .field("proc-time", Types.SQL_TIMESTAMP) +// .build()) +// .withProctimeAttribute("proc-time"); +// +// // Construct with the descriptor. +// Map<Integer, Long> offsets = new HashMap<>(); +// offsets.put(0, 100L); +// offsets.put(1, 123L); +// Kafka kafka = new Kafka() +// .version(versionForTest()) +// .groupId(GROUP_ID) +// .bootstrapServers(BOOTSTRAP_SERVERS) +// .topic(TOPIC) +// .startupMode(StartupMode.SPECIFIC_OFFSETS) +// .specificOffsets(offsets) +// .tableJsonMapping(tableJsonMapping); +// extraSettings(builder, kafka); +// +// TableSource source = tEnv +// .from(kafka) +// .withFormat( +// new Json() +// .schema(schema) +// .failOnMissingField(true)) +// .withSchema(new Schema() +// .field("fruit-name", Types.STRING) +// .field("fruit-count", Types.INT) +// .field("event-time", Types.LONG) +// .field("proc-time", Types.SQL_TIMESTAMP).proctime()) +// .toTableSource(); +// +// Assert.assertEquals(builder.build(), source); +// } + +// @Test(expected = TableException.class) +// public void buildJsonTableSourceFailTest() { +// tEnv.from( +// new Kafka() +// .version(versionForTest()) +// .groupId(GROUP_ID) +// .bootstrapServers(BOOTSTRAP_SERVERS) +// .topic(TOPIC) +// .startupMode(StartupMode.SPECIFIC_OFFSETS) +// .specificOffsets(new HashMap<>())) +// .withFormat( +// new Json() +// .schema("") +// .failOnMissingField(true)) +// .toTableSource(); +// } +} http://git-wip-us.apache.org/repos/asf/flink/blob/1d26062d/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-json-schema.json ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-json-schema.json b/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-json-schema.json new file mode 100644 index 0000000..5167e5e --- /dev/null +++ b/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-json-schema.json @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "title": "Fruit", + "type": "object", + "properties": { + "name": { + "type": "string" + }, + "count": { + "type": "integer" + }, + "time": { + "description": "Age in years", + "type": "number" + } + }, + "required": ["name", "count", "time"] +} http://git-wip-us.apache.org/repos/asf/flink/blob/1d26062d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala index 534ef39..1e88d93 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala @@ -92,6 +92,11 @@ class TableSchema( } /** + * Returns the number of columns. + */ + def getColumnNum: Int = columnNames.length + + /** * Returns all column names as an array. */ def getColumnNames: Array[String] = columnNames