[FLINK-8538] [table] Add a Kafka table source factory with JSON format support


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1d26062d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1d26062d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1d26062d

Branch: refs/heads/release-1.5
Commit: 1d26062de130c05fdbe7701b55766b4a8d433418
Parents: a269f85
Author: Xingcan Cui <xingc...@gmail.com>
Authored: Mon Feb 12 18:11:36 2018 +0800
Committer: Timo Walther <twal...@apache.org>
Committed: Tue Feb 27 20:23:00 2018 +0100

----------------------------------------------------------------------
 .../kafka/Kafka010JsonTableSourceFactory.java   |  36 +++
 ...pache.flink.table.sources.TableSourceFactory |  16 ++
 .../resources/tableSourceConverter.properties   |  29 +++
 .../kafka/Kafka010TableSourceFactoryTest.java   |  41 ++++
 .../kafka/Kafka011JsonTableSourceFactory.java   |  36 +++
 ...pache.flink.table.sources.TableSourceFactory |  16 ++
 .../resources/tableSourceConverter.properties   |  29 +++
 .../kafka/Kafka011TableSourceFactoryTest.java   |  41 ++++
 .../kafka/Kafka08JsonTableSourceFactory.java    |  36 +++
 ...pache.flink.table.sources.TableSourceFactory |  16 ++
 .../resources/tableSourceConverter.properties   |  29 +++
 .../kafka/Kafka08TableSourceFactoryTest.java    |  42 ++++
 .../kafka/Kafka09JsonTableSourceFactory.java    |  36 +++
 ...pache.flink.table.sources.TableSourceFactory |  16 ++
 .../resources/tableSourceConverter.properties   |  29 +++
 .../kafka/Kafka09TableSourceFactoryTest.java    |  41 ++++
 .../flink-connector-kafka-base/pom.xml          |  14 ++
 .../connectors/kafka/KafkaJsonTableSource.java  |  17 ++
 .../kafka/KafkaJsonTableSourceFactory.java      | 227 +++++++++++++++++++
 .../connectors/kafka/KafkaTableSource.java      |  30 +++
 .../apache/flink/table/descriptors/Kafka.java   | 199 ++++++++++++++++
 .../flink/table/descriptors/KafkaValidator.java | 193 ++++++++++++++++
 .../KafkaJsonTableFromDescriptorTestBase.java   | 127 +++++++++++
 .../src/test/resources/kafka-json-schema.json   |  35 +++
 .../apache/flink/table/api/TableSchema.scala    |   5 +
 25 files changed, 1336 insertions(+)
----------------------------------------------------------------------
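
For illustration, a minimal sketch (not part of this commit) of how the new
Kafka connector descriptor introduced below might be configured. The broker
address, group id, and topic name are hypothetical, and the call that hands
the descriptor to the table environment is assumed:

import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.KafkaValidator;

public class KafkaDescriptorSketch {
        public static void main(String[] args) {
                // Build the connector descriptor added by this commit.
                Kafka kafka = new Kafka()
                                .version(KafkaValidator.KAFKA_VERSION_VALUE_010)
                                .bootstrapServers("localhost:9092") // hypothetical broker
                                .groupId("test-group")              // hypothetical group id
                                .topic("test-topic")                // hypothetical topic
                                .startupMode(StartupMode.EARLIEST);
                // A matching factory (e.g. Kafka010JsonTableSourceFactory) is then
                // discovered via META-INF/services and turns the descriptor's
                // properties into a configured KafkaJsonTableSource.
        }
}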


http://git-wip-us.apache.org/repos/asf/flink/blob/1d26062d/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceFactory.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceFactory.java
new file mode 100644
index 0000000..1d03f6c
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import static org.apache.flink.table.descriptors.KafkaValidator.KAFKA_VERSION_VALUE_010;
+
+/**
+ * Factory for creating configured instances of {@link Kafka010JsonTableSource}.
+ */
+public class Kafka010JsonTableSourceFactory extends KafkaJsonTableSourceFactory {
+       @Override
+       protected KafkaJsonTableSource.Builder createBuilder() {
+               return new Kafka010JsonTableSource.Builder();
+       }
+
+       @Override
+       protected String kafkaVersion() {
+               return KAFKA_VERSION_VALUE_010;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1d26062d/flink-connectors/flink-connector-kafka-0.10/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory b/flink-connectors/flink-connector-kafka-0.10/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
new file mode 100644
index 0000000..9ef54fc
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.streaming.connectors.kafka.Kafka010JsonTableSourceFactory
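
The service file above registers the factory with Java's standard
ServiceLoader mechanism, which is how factories become discoverable on the
classpath. A minimal sketch of that lookup, assuming nothing beyond
java.util.ServiceLoader (the planner's actual lookup code is not part of
this diff):

import org.apache.flink.table.sources.TableSourceFactory;

import java.util.ServiceLoader;

public class FactoryDiscoverySketch {
        public static void main(String[] args) {
                // Iterates every TableSourceFactory registered under
                // META-INF/services/org.apache.flink.table.sources.TableSourceFactory.
                for (TableSourceFactory<?> factory : ServiceLoader.load(TableSourceFactory.class)) {
                        System.out.println(factory.getClass().getName());
                }
        }
}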

http://git-wip-us.apache.org/repos/asf/flink/blob/1d26062d/flink-connectors/flink-connector-kafka-0.10/src/main/resources/tableSourceConverter.properties
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/resources/tableSourceConverter.properties b/flink-connectors/flink-connector-kafka-0.10/src/main/resources/tableSourceConverter.properties
new file mode 100644
index 0000000..5409b49
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/resources/tableSourceConverter.properties
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+################################################################################
+# This config file specifies the packages of the current module in which to
+# look for TableSourceConverter implementations annotated with TableType.
+# If there are multiple packages to scan, join them into a single string
+# separated with ',', for example: org.package1,org.package2.
+# Please note:
+# It is better to have a tableSourceConverter.properties file in each
+# connector module that offers converters instead of putting all the
+# information into the tableSourceConverter.properties of the flink-table
+# module.
+################################################################################
+scan.packages=org.apache.flink.streaming.connectors.kafka

http://git-wip-us.apache.org/repos/asf/flink/blob/1d26062d/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceFactoryTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceFactoryTest.java
new file mode 100644
index 0000000..15b89e8
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceFactoryTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.table.descriptors.Kafka;
+
+import static org.apache.flink.table.descriptors.KafkaValidator.KAFKA_VERSION_VALUE_010;
+
+/**
+ * Tests for {@link Kafka010JsonTableSourceFactory}.
+ */
+public class Kafka010TableSourceFactoryTest extends KafkaJsonTableFromDescriptorTestBase {
+       protected String versionForTest() {
+               return KAFKA_VERSION_VALUE_010;
+       }
+
+       protected KafkaJsonTableSource.Builder builderForTest() {
+               return Kafka010JsonTableSource.builder();
+       }
+
+       @Override
+       protected void extraSettings(KafkaTableSource.Builder builder, Kafka kafka) {
+               // no extra settings
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1d26062d/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceFactory.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceFactory.java
new file mode 100644
index 0000000..ca4d6ce
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import static org.apache.flink.table.descriptors.KafkaValidator.KAFKA_VERSION_VALUE_011;
+
+/**
+ * Factory for creating configured instances of {@link Kafka011JsonTableSource}.
+ */
+public class Kafka011JsonTableSourceFactory extends KafkaJsonTableSourceFactory {
+       @Override
+       protected KafkaJsonTableSource.Builder createBuilder() {
+               return new Kafka011JsonTableSource.Builder();
+       }
+
+       @Override
+       protected String kafkaVersion() {
+               return KAFKA_VERSION_VALUE_011;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1d26062d/flink-connectors/flink-connector-kafka-0.11/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory b/flink-connectors/flink-connector-kafka-0.11/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
new file mode 100644
index 0000000..75135e5
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.streaming.connectors.kafka.Kafka011JsonTableSourceFactory

http://git-wip-us.apache.org/repos/asf/flink/blob/1d26062d/flink-connectors/flink-connector-kafka-0.11/src/main/resources/tableSourceConverter.properties
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/resources/tableSourceConverter.properties b/flink-connectors/flink-connector-kafka-0.11/src/main/resources/tableSourceConverter.properties
new file mode 100644
index 0000000..5409b49
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/resources/tableSourceConverter.properties
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+################################################################################
+# This config file specifies the packages of the current module in which to
+# look for TableSourceConverter implementations annotated with TableType.
+# If there are multiple packages to scan, join them into a single string
+# separated with ',', for example: org.package1,org.package2.
+# Please note:
+# It is better to have a tableSourceConverter.properties file in each
+# connector module that offers converters instead of putting all the
+# information into the tableSourceConverter.properties of the flink-table
+# module.
+################################################################################
+scan.packages=org.apache.flink.streaming.connectors.kafka

http://git-wip-us.apache.org/repos/asf/flink/blob/1d26062d/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceFactoryTest.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceFactoryTest.java
new file mode 100644
index 0000000..84ac39b
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceFactoryTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.table.descriptors.Kafka;
+
+import static org.apache.flink.table.descriptors.KafkaValidator.KAFKA_VERSION_VALUE_011;
+
+/**
+ * Tests for {@link Kafka011JsonTableSourceFactory}.
+ */
+public class Kafka011TableSourceFactoryTest extends KafkaJsonTableFromDescriptorTestBase {
+       protected String versionForTest() {
+               return KAFKA_VERSION_VALUE_011;
+       }
+
+       protected KafkaJsonTableSource.Builder builderForTest() {
+               return Kafka011JsonTableSource.builder();
+       }
+
+       @Override
+       protected void extraSettings(KafkaTableSource.Builder builder, Kafka kafka) {
+               // no extra settings
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1d26062d/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceFactory.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceFactory.java
new file mode 100644
index 0000000..e4e5096
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import static org.apache.flink.table.descriptors.KafkaValidator.KAFKA_VERSION_VALUE_08;
+
+/**
+ * Factory for creating configured instances of {@link Kafka08JsonTableSource}.
+ */
+public class Kafka08JsonTableSourceFactory extends KafkaJsonTableSourceFactory {
+       @Override
+       protected KafkaJsonTableSource.Builder createBuilder() {
+               return new Kafka08JsonTableSource.Builder();
+       }
+
+       @Override
+       protected String kafkaVersion() {
+               return KAFKA_VERSION_VALUE_08;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1d26062d/flink-connectors/flink-connector-kafka-0.8/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory b/flink-connectors/flink-connector-kafka-0.8/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
new file mode 100644
index 0000000..9092955
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.streaming.connectors.kafka.Kafka08JsonTableSourceFactory

http://git-wip-us.apache.org/repos/asf/flink/blob/1d26062d/flink-connectors/flink-connector-kafka-0.8/src/main/resources/tableSourceConverter.properties
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/resources/tableSourceConverter.properties b/flink-connectors/flink-connector-kafka-0.8/src/main/resources/tableSourceConverter.properties
new file mode 100644
index 0000000..5409b49
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/resources/tableSourceConverter.properties
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+################################################################################
+# This config file specifies the packages of the current module in which to
+# look for TableSourceConverter implementations annotated with TableType.
+# If there are multiple packages to scan, join them into a single string
+# separated with ',', for example: org.package1,org.package2.
+# Please note:
+# It is better to have a tableSourceConverter.properties file in each
+# connector module that offers converters instead of putting all the
+# information into the tableSourceConverter.properties of the flink-table
+# module.
+################################################################################
+scan.packages=org.apache.flink.streaming.connectors.kafka

http://git-wip-us.apache.org/repos/asf/flink/blob/1d26062d/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceFactoryTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceFactoryTest.java
new file mode 100644
index 0000000..a2edc09
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceFactoryTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.table.descriptors.Kafka;
+
+import static org.apache.flink.table.descriptors.KafkaValidator.KAFKA_VERSION_VALUE_08;
+
+/**
+ * Tests for {@link Kafka08JsonTableSourceFactory}.
+ */
+public class Kafka08TableSourceFactoryTest extends KafkaJsonTableFromDescriptorTestBase {
+       protected String versionForTest() {
+               return KAFKA_VERSION_VALUE_08;
+       }
+
+       protected KafkaJsonTableSource.Builder builderForTest() {
+               return Kafka08JsonTableSource.builder();
+       }
+
+       @Override
+       protected void extraSettings(KafkaTableSource.Builder builder, Kafka kafka) {
+               builder.getKafkaProps().put("zookeeper.connect", "localhost:1111");
+               kafka.zookeeperConnect("localhost:1111");
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1d26062d/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceFactory.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceFactory.java
new file mode 100644
index 0000000..bbda4ae
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import static org.apache.flink.table.descriptors.KafkaValidator.KAFKA_VERSION_VALUE_09;
+
+/**
+ * Factory for creating configured instances of {@link Kafka09JsonTableSource}.
+ */
+public class Kafka09JsonTableSourceFactory extends KafkaJsonTableSourceFactory {
+       @Override
+       protected KafkaJsonTableSource.Builder createBuilder() {
+               return new Kafka09JsonTableSource.Builder();
+       }
+
+       @Override
+       protected String kafkaVersion() {
+               return KAFKA_VERSION_VALUE_09;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1d26062d/flink-connectors/flink-connector-kafka-0.9/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory b/flink-connectors/flink-connector-kafka-0.9/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
new file mode 100644
index 0000000..2f38bd0
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.streaming.connectors.kafka.Kafka09JsonTableSourceFactory

http://git-wip-us.apache.org/repos/asf/flink/blob/1d26062d/flink-connectors/flink-connector-kafka-0.9/src/main/resources/tableSourceConverter.properties
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/resources/tableSourceConverter.properties b/flink-connectors/flink-connector-kafka-0.9/src/main/resources/tableSourceConverter.properties
new file mode 100644
index 0000000..5409b49
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/resources/tableSourceConverter.properties
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+################################################################################
+# This config file specifies the packages of the current module in which to
+# look for TableSourceConverter implementations annotated with TableType.
+# If there are multiple packages to scan, join them into a single string
+# separated with ',', for example: org.package1,org.package2.
+# Please note:
+# It is better to have a tableSourceConverter.properties file in each
+# connector module that offers converters instead of putting all the
+# information into the tableSourceConverter.properties of the flink-table
+# module.
+################################################################################
+scan.packages=org.apache.flink.streaming.connectors.kafka

http://git-wip-us.apache.org/repos/asf/flink/blob/1d26062d/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceFactoryTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceFactoryTest.java
new file mode 100644
index 0000000..fc85ea7
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceFactoryTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.table.descriptors.Kafka;
+
+import static org.apache.flink.table.descriptors.KafkaValidator.KAFKA_VERSION_VALUE_09;
+
+/**
+ * Tests for {@link Kafka09JsonTableSourceFactory}.
+ */
+public class Kafka09TableSourceFactoryTest extends KafkaJsonTableFromDescriptorTestBase {
+       protected String versionForTest() {
+               return KAFKA_VERSION_VALUE_09;
+       }
+
+       protected KafkaJsonTableSource.Builder builderForTest() {
+               return Kafka09JsonTableSource.builder();
+       }
+
+       @Override
+       protected void extraSettings(KafkaTableSource.Builder builder, Kafka kafka) {
+               // no extra settings
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1d26062d/flink-connectors/flink-connector-kafka-base/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/pom.xml b/flink-connectors/flink-connector-kafka-base/pom.xml
index e7412cf..2ccaa2e 100644
--- a/flink-connectors/flink-connector-kafka-base/pom.xml
+++ b/flink-connectors/flink-connector-kafka-base/pom.xml
@@ -205,6 +205,20 @@ under the License.
                        <scope>test</scope>
                </dependency>
 
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-scala_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
        </dependencies>
 
        <dependencyManagement>

http://git-wip-us.apache.org/repos/asf/flink/blob/1d26062d/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
index f581e89..d2dafe7 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
@@ -27,6 +27,7 @@ import org.apache.flink.table.sources.DefinedFieldMapping;
 import org.apache.flink.table.sources.StreamTableSource;
 
 import java.util.Map;
+import java.util.Objects;
 import java.util.Properties;
 
 /**
@@ -86,6 +87,22 @@ public abstract class KafkaJsonTableSource extends KafkaTableSource implements D
                return "KafkaJSONTableSource";
        }
 
+       @Override
+       public boolean equals(Object other) {
+               if (super.equals(other)) {
+                       KafkaJsonTableSource otherSource = (KafkaJsonTableSource) other;
+                       return Objects.equals(failOnMissingField, otherSource.failOnMissingField)
+                                       && Objects.equals(jsonSchema, otherSource.jsonSchema)
+                                       && Objects.equals(fieldMapping, otherSource.fieldMapping);
+               }
+               return false;
+       }
+
+       @Override
+       public int hashCode() {
+               return 31 * super.hashCode() + Objects.hash(failOnMissingField, jsonSchema, fieldMapping);
+       }
+
        //////// SETTERS FOR OPTIONAL PARAMETERS
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/1d26062d/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactory.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactory.java
new file mode 100644
index 0000000..918b833
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactory.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.json.JsonSchemaConverter;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.JsonValidator;
+import org.apache.flink.table.descriptors.KafkaValidator;
+import org.apache.flink.table.descriptors.SchemaValidator;
+import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.table.sources.TableSourceFactory;
+import org.apache.flink.types.Row;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
+import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_VERSION;
+import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_TYPE;
+import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_VERSION;
+import static org.apache.flink.table.descriptors.JsonValidator.FORMAT_FAIL_ON_MISSING_FIELD;
+import static org.apache.flink.table.descriptors.JsonValidator.FORMAT_SCHEMA_STRING;
+import static org.apache.flink.table.descriptors.JsonValidator.FORMAT_TYPE_VALUE;
+import static org.apache.flink.table.descriptors.KafkaValidator.BOOTSTRAP_SERVERS;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_TYPE_VALUE;
+import static org.apache.flink.table.descriptors.KafkaValidator.GROUP_ID;
+import static org.apache.flink.table.descriptors.KafkaValidator.JSON_FIELD;
+import static org.apache.flink.table.descriptors.KafkaValidator.KAFKA_VERSION;
+import static org.apache.flink.table.descriptors.KafkaValidator.OFFSET;
+import static org.apache.flink.table.descriptors.KafkaValidator.PARTITION;
+import static org.apache.flink.table.descriptors.KafkaValidator.SPECIFIC_OFFSETS;
+import static org.apache.flink.table.descriptors.KafkaValidator.STARTUP_MODE;
+import static org.apache.flink.table.descriptors.KafkaValidator.STARTUP_MODE_VALUE_EARLIEST;
+import static org.apache.flink.table.descriptors.KafkaValidator.STARTUP_MODE_VALUE_GROUP_OFFSETS;
+import static org.apache.flink.table.descriptors.KafkaValidator.STARTUP_MODE_VALUE_LATEST;
+import static org.apache.flink.table.descriptors.KafkaValidator.STARTUP_MODE_VALUE_SPECIFIC_OFFSETS;
+import static org.apache.flink.table.descriptors.KafkaValidator.TABLE_FIELD;
+import static org.apache.flink.table.descriptors.KafkaValidator.TABLE_JSON_MAPPING;
+import static org.apache.flink.table.descriptors.KafkaValidator.TOPIC;
+import static org.apache.flink.table.descriptors.KafkaValidator.ZOOKEEPER_CONNECT;
+import static org.apache.flink.table.descriptors.SchemaValidator.PROCTIME;
+import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA;
+import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_VERSION;
+
+import scala.Option;
+import scala.collection.JavaConversions;
+
+/**
+ * Factory for creating configured instances of {@link KafkaJsonTableSource}.
+ */
+public abstract class KafkaJsonTableSourceFactory implements TableSourceFactory<Row> {
+       @Override
+       public Map<String, String> requiredContext() {
+               Map<String, String> context = new HashMap<>();
+               context.put(CONNECTOR_TYPE(), CONNECTOR_TYPE_VALUE); // Kafka connector
+               context.put(FORMAT_TYPE(), FORMAT_TYPE_VALUE()); // JSON format
+               context.put(KAFKA_VERSION, kafkaVersion()); // for version-specific implementations
+               context.put(CONNECTOR_VERSION(), "1");
+               context.put(FORMAT_VERSION(), "1");
+               context.put(SCHEMA_VERSION(), "1");
+               return context;
+       }
+
+       @Override
+       public List<String> supportedProperties() {
+               List<String> properties = new ArrayList<>();
+
+               // kafka
+               properties.add(KAFKA_VERSION);
+               properties.add(BOOTSTRAP_SERVERS);
+               properties.add(GROUP_ID);
+               properties.add(ZOOKEEPER_CONNECT);
+               properties.add(TOPIC);
+               properties.add(STARTUP_MODE);
+               properties.add(SPECIFIC_OFFSETS + ".#." + PARTITION);
+               properties.add(SPECIFIC_OFFSETS + ".#." + OFFSET);
+
+               // json format
+               properties.add(FORMAT_SCHEMA_STRING());
+               properties.add(FORMAT_FAIL_ON_MISSING_FIELD());
+
+               // table json mapping
+               properties.add(TABLE_JSON_MAPPING + ".#." + TABLE_FIELD);
+               properties.add(TABLE_JSON_MAPPING + ".#." + JSON_FIELD);
+
+               // schema
+               properties.add(SCHEMA() + ".#." + DescriptorProperties.TYPE());
+               properties.add(SCHEMA() + ".#." + DescriptorProperties.NAME());
+
+               // time attributes
+               properties.add(SCHEMA() + ".#." + PROCTIME());
+//             properties.add(SCHEMA() + ".#." + ROWTIME() + ".#." + TIMESTAMPS_CLASS());
+//             properties.add(SCHEMA() + ".#." + ROWTIME() + ".#." + TIMESTAMPS_TYPE());
+
+               return properties;
+       }
+
+       @Override
+       public TableSource<Row> create(Map<String, String> properties) {
+               DescriptorProperties params = new DescriptorProperties(true);
+               params.putProperties(properties);
+
+               // validate
+               new KafkaValidator().validate(params);
+               new JsonValidator().validate(params);
+               new SchemaValidator(true).validate(params);
+
+               // build
+               KafkaJsonTableSource.Builder builder = createBuilder();
+               Properties kafkaProps = new Properties();
+
+               // Set the required parameters.
+               String topic = params.getString(TOPIC).get();
+               TableSchema tableSchema = params.getTableSchema(SCHEMA()).get();
+
+               kafkaProps.put(BOOTSTRAP_SERVERS, params.getString(BOOTSTRAP_SERVERS).get());
+               kafkaProps.put(GROUP_ID, params.getString(GROUP_ID).get());
+
+               // Set the zookeeper connect for kafka 0.8.
+               Option<String> zkConnect = params.getString(ZOOKEEPER_CONNECT);
+               if (zkConnect.isDefined()) {
+                       kafkaProps.put(ZOOKEEPER_CONNECT, zkConnect.get());
+               }
+
+               builder.withKafkaProperties(kafkaProps).forTopic(topic).withSchema(tableSchema);
+
+               // Set the startup mode.
+               Option<String> startupMode = params.getString(STARTUP_MODE);
+               if (startupMode.isDefined()) {
+                       switch (startupMode.get()) {
+                               case STARTUP_MODE_VALUE_EARLIEST:
+                                       builder.fromEarliest();
+                                       break;
+                               case STARTUP_MODE_VALUE_LATEST:
+                                       builder.fromLatest();
+                                       break;
+                               case STARTUP_MODE_VALUE_GROUP_OFFSETS:
+                                       builder.fromGroupOffsets();
+                                       break;
+                               case STARTUP_MODE_VALUE_SPECIFIC_OFFSETS:
+                                       Map<String, String> partitions = JavaConversions.
+                                                       mapAsJavaMap(params.getIndexedProperty(SPECIFIC_OFFSETS, PARTITION));
+                                       Map<KafkaTopicPartition, Long> offsetMap = new HashMap<>();
+                                       for (int i = 0; i < partitions.size(); i++) {
+                                               offsetMap.put(
+                                                               new KafkaTopicPartition(
+                                                                               topic,
+                                                                               Integer.valueOf(params.getString(
+                                                                                               SPECIFIC_OFFSETS + "." + i + "." + PARTITION).get())),
+                                                               Long.valueOf(params.getString(
+                                                                               SPECIFIC_OFFSETS + "." + i + "." + OFFSET).get()));
+                                       }
+                                       builder.fromSpecificOffsets(offsetMap);
+                                       break;
+                       }
+               }
+
+               // Set whether to fail on a missing JSON field.
+               Option<String> failOnMissing = params.getString(FORMAT_FAIL_ON_MISSING_FIELD());
+               if (failOnMissing.isDefined()) {
+                       builder.failOnMissingField(Boolean.valueOf(failOnMissing.get()));
+               }
+
+               // Set the JSON schema.
+               Option<String> jsonSchema = params.getString(FORMAT_SCHEMA_STRING());
+               if (jsonSchema.isDefined()) {
+                       TypeInformation jsonSchemaType = JsonSchemaConverter.convert(jsonSchema.get());
+                       builder.forJsonSchema(TableSchema.fromTypeInfo(jsonSchemaType));
+               }
+
+               // Set the table => JSON fields mapping.
+               Map<String, String> mappingTableFields = JavaConversions.
+                               mapAsJavaMap(params.getIndexedProperty(TABLE_JSON_MAPPING, TABLE_FIELD));
+
+               if (!mappingTableFields.isEmpty()) {
+                       Map<String, String> tableJsonMapping = new HashMap<>();
+                       for (int i = 0; i < mappingTableFields.size(); i++) {
+                               tableJsonMapping.put(
+                                               params.getString(TABLE_JSON_MAPPING + "." + i + "." + TABLE_FIELD).get(),
+                                               params.getString(TABLE_JSON_MAPPING + "." + i + "." + JSON_FIELD).get());
+                       }
+                       builder.withTableToJsonMapping(tableJsonMapping);
+               }
+
+               // Set the time attributes.
+               setTimeAttributes(tableSchema, params, builder);
+
+               return builder.build();
+       }
+
+       protected abstract KafkaJsonTableSource.Builder createBuilder();
+
+       protected abstract String kafkaVersion();
+
+       private void setTimeAttributes(TableSchema schema, DescriptorProperties params, KafkaJsonTableSource.Builder builder) {
+               // TODO: deal with rowtime fields
+               Option<String> proctimeField;
+               for (int i = 0; i < schema.getColumnNum(); i++) {
+                       proctimeField = params.getString(SCHEMA() + "." + i + "." + PROCTIME());
+                       if (proctimeField.isDefined()) {
+                               builder.withProctimeAttribute(schema.getColumnName(i).get());
+                       }
+               }
+       }
+}
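
Note how create() above reads list-like settings through indexed keys (the
".#." placeholder in supportedProperties() stands for a zero-based index). A
hedged sketch of the property layout expected in SPECIFIC_OFFSETS mode,
reusing the validator constants from the factory; the concrete key strings
are defined in KafkaValidator and are not spelled out here:

import java.util.HashMap;
import java.util.Map;

import static org.apache.flink.table.descriptors.KafkaValidator.OFFSET;
import static org.apache.flink.table.descriptors.KafkaValidator.PARTITION;
import static org.apache.flink.table.descriptors.KafkaValidator.SPECIFIC_OFFSETS;

public class SpecificOffsetsLayoutSketch {
        public static Map<String, String> specificOffsetProperties() {
                // Two partition/offset pairs, indexed 0 and 1, shaped as consumed by
                // KafkaJsonTableSourceFactory#create() in SPECIFIC_OFFSETS mode.
                Map<String, String> props = new HashMap<>();
                props.put(SPECIFIC_OFFSETS + ".0." + PARTITION, "0");  // partition 0 ...
                props.put(SPECIFIC_OFFSETS + ".0." + OFFSET, "42");    // ... starts at offset 42
                props.put(SPECIFIC_OFFSETS + ".1." + PARTITION, "1");  // partition 1 ...
                props.put(SPECIFIC_OFFSETS + ".1." + OFFSET, "300");   // ... starts at offset 300
                return props;
        }
}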

http://git-wip-us.apache.org/repos/asf/flink/blob/1d26062d/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
index d5cda4a..9ce3b8e 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
@@ -42,6 +42,7 @@ import org.apache.flink.util.Preconditions;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Properties;
 
 import scala.Option;
@@ -138,6 +139,35 @@ public abstract class KafkaTableSource
                return TableConnectorUtil.generateRuntimeName(this.getClass(), 
schema.getColumnNames());
        }
 
+       @Override
+       public boolean equals(Object o) {
+               if (o == null || !o.getClass().equals(this.getClass())) {
+                       return false;
+               }
+               KafkaTableSource other = (KafkaTableSource) o;
+               return Objects.equals(topic, other.topic)
+                               && Objects.equals(schema, other.schema)
+                               && Objects.equals(properties, other.properties)
+                               && Objects.equals(proctimeAttribute, other.proctimeAttribute)
+                               && Objects.equals(returnType, other.returnType)
+                               && Objects.equals(rowtimeAttributeDescriptors, other.rowtimeAttributeDescriptors)
+                               && Objects.equals(specificStartupOffsets, other.specificStartupOffsets)
+                               && Objects.equals(startupMode, other.startupMode);
+       }
+
+       @Override
+       public int hashCode() {
+               return Objects.hash(
+                               topic,
+                               schema,
+                               properties,
+                               proctimeAttribute,
+                               returnType,
+                               rowtimeAttributeDescriptors,
+                               specificStartupOffsets,
+                               startupMode);
+       }
+
        /**
         * Returns a version-specific Kafka consumer with the start position 
configured.
         *

http://git-wip-us.apache.org/repos/asf/flink/blob/1d26062d/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/Kafka.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/Kafka.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/Kafka.java
new file mode 100644
index 0000000..4733f6e
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/Kafka.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+
+import static org.apache.flink.table.descriptors.KafkaValidator.BOOTSTRAP_SERVERS;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_TYPE_VALUE;
+import static org.apache.flink.table.descriptors.KafkaValidator.GROUP_ID;
+import static org.apache.flink.table.descriptors.KafkaValidator.JSON_FIELD;
+import static org.apache.flink.table.descriptors.KafkaValidator.KAFKA_VERSION;
+import static org.apache.flink.table.descriptors.KafkaValidator.OFFSET;
+import static org.apache.flink.table.descriptors.KafkaValidator.PARTITION;
+import static org.apache.flink.table.descriptors.KafkaValidator.SPECIFIC_OFFSETS;
+import static org.apache.flink.table.descriptors.KafkaValidator.TABLE_FIELD;
+import static org.apache.flink.table.descriptors.KafkaValidator.TABLE_JSON_MAPPING;
+import static org.apache.flink.table.descriptors.KafkaValidator.TOPIC;
+import static org.apache.flink.table.descriptors.KafkaValidator.ZOOKEEPER_CONNECT;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import scala.collection.JavaConversions;
+import scala.collection.Seq;
+
+/**
+ * Connector descriptor for the Kafka message queue.
+ */
+public class Kafka extends ConnectorDescriptor {
+
+       private Optional<String> version = Optional.empty();
+       private Optional<String> bootstrapServers = Optional.empty();
+       private Optional<String> groupId = Optional.empty();
+       private Optional<String> topic = Optional.empty();
+       private Optional<String> zookeeperConnect = Optional.empty();
+       private Optional<Map<String, String>> tableJsonMapping = Optional.empty();
+
+       private Optional<StartupMode> startupMode = Optional.empty();
+       private Optional<Map<Integer, Long>> specificOffsets = Optional.empty();
+
+       public Kafka() {
+               super(CONNECTOR_TYPE_VALUE, 1);
+       }
+
+       /**
+        * Sets the Kafka version.
+        *
+        * @param version one of {@link KafkaValidator#KAFKA_VERSION_VALUE_011},
+        *                {@link KafkaValidator#KAFKA_VERSION_VALUE_010},
+        *                {@link KafkaValidator#KAFKA_VERSION_VALUE_09},
+        *                or {@link KafkaValidator#KAFKA_VERSION_VALUE_08}.
+        */
+       public Kafka version(String version) {
+               this.version = Optional.of(version);
+               return this;
+       }
+
+       /**
+        * Sets the bootstrap servers for Kafka.
+        */
+       public Kafka bootstrapServers(String bootstrapServers) {
+               this.bootstrapServers = Optional.of(bootstrapServers);
+               return this;
+       }
+
+       /**
+        * Sets the consumer group id.
+        */
+       public Kafka groupId(String groupId) {
+               this.groupId = Optional.of(groupId);
+               return this;
+       }
+
+       /**
+        * Sets the topic to consume.
+        */
+       public Kafka topic(String topic) {
+               this.topic = Optional.of(topic);
+               return this;
+       }
+
+       /**
+        * Sets the startup mode.
+        */
+       public Kafka startupMode(StartupMode startupMode) {
+               this.startupMode = Optional.of(startupMode);
+               return this;
+       }
+
+       /**
+        * Sets the ZooKeeper hosts. Only required by Kafka 0.8.
+        */
+       public Kafka zookeeperConnect(String zookeeperConnect) {
+               this.zookeeperConnect = Optional.of(zookeeperConnect);
+               return this;
+       }
+
+       /**
+        * Sets the offsets to consume from for the topic set with {@link Kafka#topic(String)}.
+        * Only effective in {@link StartupMode#SPECIFIC_OFFSETS} mode.
+        */
+       public Kafka specificOffsets(Map<Integer, Long> specificOffsets) {
+               this.specificOffsets = Optional.of(specificOffsets);
+               return this;
+       }
+
+       /**
+        * Sets the mapping from the logical table schema to the JSON schema.
+        */
+       public Kafka tableJsonMapping(Map<String, String> tableJsonMapping) {
+               this.tableJsonMapping = Optional.of(tableJsonMapping);
+               return this;
+       }
+
+       @Override
+       public void addConnectorProperties(DescriptorProperties properties) {
+               if (version.isPresent()) {
+                       properties.putString(KAFKA_VERSION, version.get());
+               }
+               if (bootstrapServers.isPresent()) {
+                       properties.putString(BOOTSTRAP_SERVERS, bootstrapServers.get());
+               }
+               if (groupId.isPresent()) {
+                       properties.putString(GROUP_ID, groupId.get());
+               }
+               if (topic.isPresent()) {
+                       properties.putString(TOPIC, topic.get());
+               }
+               if (zookeeperConnect.isPresent()) {
+                       properties.putString(ZOOKEEPER_CONNECT, zookeeperConnect.get());
+               }
+               if (startupMode.isPresent()) {
+                       Map<String, String> map = KafkaValidator.normalizeStartupMode(startupMode.get());
+                       for (Map.Entry<String, String> entry : map.entrySet()) {
+                               properties.putString(entry.getKey(), entry.getValue());
+                       }
+               }
+               if (specificOffsets.isPresent()) {
+                       List<String> propertyKeys = new ArrayList<>();
+                       propertyKeys.add(PARTITION);
+                       propertyKeys.add(OFFSET);
+
+                       List<Seq<String>> propertyValues = new ArrayList<>(specificOffsets.get().size());
+                       for (Map.Entry<Integer, Long> entry : specificOffsets.get().entrySet()) {
+                               List<String> partitionOffset = new ArrayList<>(2);
+                               partitionOffset.add(entry.getKey().toString());
+                               partitionOffset.add(entry.getValue().toString());
+                               propertyValues.add(JavaConversions.asScalaBuffer(partitionOffset).toSeq());
+                       }
+                       properties.putIndexedFixedProperties(
+                                       SPECIFIC_OFFSETS,
+                                       JavaConversions.asScalaBuffer(propertyKeys).toSeq(),
+                                       JavaConversions.asScalaBuffer(propertyValues).toSeq()
+                       );
+               }
+               if (tableJsonMapping.isPresent()) {
+                       List<String> propertyKeys = new ArrayList<>();
+                       propertyKeys.add(TABLE_FIELD);
+                       propertyKeys.add(JSON_FIELD);
+
+                       List<Seq<String>> mappingFields = new ArrayList<>(tableJsonMapping.get().size());
+                       for (Map.Entry<String, String> entry : tableJsonMapping.get().entrySet()) {
+                               List<String> singleMapping = new ArrayList<>(2);
+                               singleMapping.add(entry.getKey());
+                               singleMapping.add(entry.getValue());
+                               mappingFields.add(JavaConversions.asScalaBuffer(singleMapping).toSeq());
+                       }
+                       properties.putIndexedFixedProperties(
+                                       TABLE_JSON_MAPPING,
+                                       JavaConversions.asScalaBuffer(propertyKeys).toSeq(),
+                                       JavaConversions.asScalaBuffer(mappingFields).toSeq()
+                       );
+               }
+       }
+
+       @Override
+       public boolean needsFormat() {
+               return true;
+       }
+}
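
A minimal usage sketch of the descriptor above (a sketch only: broker address,
group, topic, offsets, and the 0.10 version choice are placeholder values; it
assumes java.util.{Collections, HashMap, Map} plus the imports at the top of
this file):

    Map<Integer, Long> offsets = new HashMap<>();
    offsets.put(0, 100L);
    offsets.put(1, 123L);

    Kafka kafka = new Kafka()
            .version(KafkaValidator.KAFKA_VERSION_VALUE_010)
            .bootstrapServers("localhost:9092")
            .groupId("test-group")
            .topic("test-topic")
            .startupMode(StartupMode.SPECIFIC_OFFSETS)
            .specificOffsets(offsets)
            .tableJsonMapping(Collections.singletonMap("fruit-name", "name"));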

http://git-wip-us.apache.org/repos/asf/flink/blob/1d26062d/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java
new file mode 100644
index 0000000..a3ca22f
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.table.api.ValidationException;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import scala.Function0;
+import scala.Tuple2;
+import scala.collection.JavaConversions;
+import scala.runtime.AbstractFunction0;
+import scala.runtime.BoxedUnit;
+
+
+/**
+ * The validator for {@link Kafka}.
+ */
+public class KafkaValidator extends ConnectorDescriptorValidator {
+       // fields
+       public static final String CONNECTOR_TYPE_VALUE = "kafka";
+       public static final String KAFKA_VERSION = "kafka.version";
+       public static final String BOOTSTRAP_SERVERS = "bootstrap.servers";
+       public static final String GROUP_ID = "group.id";
+       public static final String TOPIC = "topic";
+       public static final String STARTUP_MODE = "startup.mode";
+       public static final String SPECIFIC_OFFSETS = "specific.offsets";
+       public static final String TABLE_JSON_MAPPING = "table.json.mapping";
+
+       public static final String PARTITION = "partition";
+       public static final String OFFSET = "offset";
+
+       public static final String TABLE_FIELD = "table.field";
+       public static final String JSON_FIELD = "json.field";
+
+       public static final String ZOOKEEPER_CONNECT = "zookeeper.connect"; // only required for 0.8
+
+       // values
+       public static final String KAFKA_VERSION_VALUE_08 = "0.8";
+       public static final String KAFKA_VERSION_VALUE_09 = "0.9";
+       public static final String KAFKA_VERSION_VALUE_010 = "0.10";
+       public static final String KAFKA_VERSION_VALUE_011 = "0.11";
+
+       public static final String STARTUP_MODE_VALUE_EARLIEST = "earliest-offset";
+       public static final String STARTUP_MODE_VALUE_LATEST = "latest-offset";
+       public static final String STARTUP_MODE_VALUE_GROUP_OFFSETS = "group-offsets";
+       public static final String STARTUP_MODE_VALUE_SPECIFIC_OFFSETS = "specific-offsets";
+
+       // utils
+       public static Map<String, String> normalizeStartupMode(StartupMode startupMode) {
+               Map<String, String> mapPair = new HashMap<>();
+               switch (startupMode) {
+                       case EARLIEST:
+                               mapPair.put(STARTUP_MODE, STARTUP_MODE_VALUE_EARLIEST);
+                               break;
+                       case LATEST:
+                               mapPair.put(STARTUP_MODE, STARTUP_MODE_VALUE_LATEST);
+                               break;
+                       case GROUP_OFFSETS:
+                               mapPair.put(STARTUP_MODE, STARTUP_MODE_VALUE_GROUP_OFFSETS);
+                               break;
+                       case SPECIFIC_OFFSETS:
+                               mapPair.put(STARTUP_MODE, STARTUP_MODE_VALUE_SPECIFIC_OFFSETS);
+                               break;
+               }
+               return mapPair;
+       }
+
+       @Override
+       public void validate(DescriptorProperties properties) {
+               super.validate(properties);
+
+               AbstractFunction0<BoxedUnit> emptyValidator = new AbstractFunction0<BoxedUnit>() {
+                       @Override
+                       public BoxedUnit apply() {
+                               return BoxedUnit.UNIT;
+                       }
+               };
+
+               properties.validateValue(CONNECTOR_TYPE(), CONNECTOR_TYPE_VALUE, false);
+
+               AbstractFunction0<BoxedUnit> version08Validator = new AbstractFunction0<BoxedUnit>() {
+                       @Override
+                       public BoxedUnit apply() {
+                               properties.validateString(ZOOKEEPER_CONNECT, false, 0, Integer.MAX_VALUE);
+                               return BoxedUnit.UNIT;
+                       }
+               };
+
+               Map<String, Function0<BoxedUnit>> versionValidatorMap = new HashMap<>();
+               versionValidatorMap.put(KAFKA_VERSION_VALUE_08, version08Validator);
+               versionValidatorMap.put(KAFKA_VERSION_VALUE_09, emptyValidator);
+               versionValidatorMap.put(KAFKA_VERSION_VALUE_010, emptyValidator);
+               versionValidatorMap.put(KAFKA_VERSION_VALUE_011, emptyValidator);
+               properties.validateEnum(
+                               KAFKA_VERSION,
+                               false,
+                               toScalaImmutableMap(versionValidatorMap)
+               );
+
+               properties.validateString(BOOTSTRAP_SERVERS, false, 1, Integer.MAX_VALUE);
+               properties.validateString(GROUP_ID, false, 1, Integer.MAX_VALUE);
+               properties.validateString(TOPIC, false, 1, Integer.MAX_VALUE);
+
+               AbstractFunction0<BoxedUnit> specificOffsetsValidator = new AbstractFunction0<BoxedUnit>() {
+                       @Override
+                       public BoxedUnit apply() {
+                               Map<String, String> partitions = JavaConversions.mapAsJavaMap(
+                                               properties.getIndexedProperty(SPECIFIC_OFFSETS, PARTITION));
+
+                               Map<String, String> offsets = JavaConversions.mapAsJavaMap(
+                                               properties.getIndexedProperty(SPECIFIC_OFFSETS, OFFSET));
+                               if (partitions.isEmpty() || offsets.isEmpty()) {
+                                       throw new ValidationException("Offsets must be set for SPECIFIC_OFFSETS mode.");
+                               }
+                               for (int i = 0; i < partitions.size(); ++i) {
+                                       properties.validateInt(
+                                                       SPECIFIC_OFFSETS + "." + i + "." + PARTITION,
+                                                       false,
+                                                       0,
+                                                       Integer.MAX_VALUE);
+                                       properties.validateLong(
+                                                       SPECIFIC_OFFSETS + "." + i + "." + OFFSET,
+                                                       false,
+                                                       0,
+                                                       Long.MAX_VALUE);
+                               }
+                               return BoxedUnit.UNIT;
+                       }
+               };
+               Map<String, Function0<BoxedUnit>> startupModeValidatorMap = new HashMap<>();
+               startupModeValidatorMap.put(STARTUP_MODE_VALUE_GROUP_OFFSETS, emptyValidator);
+               startupModeValidatorMap.put(STARTUP_MODE_VALUE_EARLIEST, emptyValidator);
+               startupModeValidatorMap.put(STARTUP_MODE_VALUE_LATEST, emptyValidator);
+               startupModeValidatorMap.put(STARTUP_MODE_VALUE_SPECIFIC_OFFSETS, specificOffsetsValidator);
+
+               properties.validateEnum(STARTUP_MODE, true, toScalaImmutableMap(startupModeValidatorMap));
+               validateTableJsonMapping(properties);
+       }
+
+       private void validateTableJsonMapping(DescriptorProperties properties) {
+               Map<String, String> mappingTableField = JavaConversions.mapAsJavaMap(
+                               properties.getIndexedProperty(TABLE_JSON_MAPPING, TABLE_FIELD));
+               Map<String, String> mappingJsonField = JavaConversions.mapAsJavaMap(
+                               properties.getIndexedProperty(TABLE_JSON_MAPPING, JSON_FIELD));
+
+               if (mappingTableField.size() != mappingJsonField.size()) {
+                       throw new ValidationException("Table JSON mapping must be one to one.");
+               }
+
+               for (int i = 0; i < mappingTableField.size(); i++) {
+                       properties.validateString(
+                                       TABLE_JSON_MAPPING + "." + i + "." + TABLE_FIELD,
+                                       false,
+                                       1,
+                                       Integer.MAX_VALUE);
+                       properties.validateString(
+                                       TABLE_JSON_MAPPING + "." + i + "." + JSON_FIELD,
+                                       false,
+                                       1,
+                                       Integer.MAX_VALUE);
+               }
+       }
+
+       @SuppressWarnings("unchecked")
+       private <K, V> scala.collection.immutable.Map<K, V> toScalaImmutableMap(Map<K, V> javaMap) {
+               final java.util.List<Tuple2<K, V>> list = new java.util.ArrayList<>(javaMap.size());
+               for (final Map.Entry<K, V> entry : javaMap.entrySet()) {
+                       list.add(Tuple2.apply(entry.getKey(), entry.getValue()));
+               }
+               final scala.collection.Seq<Tuple2<K, V>> seq =
+                               scala.collection.JavaConverters.asScalaBufferConverter(list).asScala().toSeq();
+               return (scala.collection.immutable.Map<K, V>) scala.collection.immutable.Map$.MODULE$.apply(seq);
+       }
+}
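
For orientation, a sketch of the flattened key/value layout that the Kafka
descriptor produces and this validator consumes (derived from the key
constants above; values are placeholders, and "connector.type" assumes the
key defined by ConnectorDescriptorValidator#CONNECTOR_TYPE):

    connector.type                   = kafka
    kafka.version                    = 0.10
    bootstrap.servers                = localhost:9092
    group.id                         = test-group
    topic                            = test-topic
    startup.mode                     = specific-offsets
    specific.offsets.0.partition     = 0
    specific.offsets.0.offset        = 100
    specific.offsets.1.partition     = 1
    specific.offsets.1.offset        = 123
    table.json.mapping.0.table.field = fruit-name
    table.json.mapping.0.json.field  = name

For Kafka 0.8, "zookeeper.connect" is required in addition.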

http://git-wip-us.apache.org/repos/asf/flink/blob/1d26062d/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableFromDescriptorTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableFromDescriptorTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableFromDescriptorTestBase.java
new file mode 100644
index 0000000..964a624
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableFromDescriptorTestBase.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.descriptors.Kafka;
+
+import org.mockito.Mockito;
+
+/**
+ * Tests for {@link KafkaJsonTableSourceFactory}.
+ */
+public abstract class KafkaJsonTableFromDescriptorTestBase {
+       private static final String GROUP_ID = "test-group";
+       private static final String BOOTSTRAP_SERVERS = "localhost:1234";
+       private static final String TOPIC = "test-topic";
+
+       protected abstract String versionForTest();
+
+       protected abstract KafkaJsonTableSource.Builder builderForTest();
+
+       protected abstract void extraSettings(KafkaTableSource.Builder builder, Kafka kafka);
+
+       private static StreamExecutionEnvironment env = Mockito.mock(StreamExecutionEnvironment.class);
+       private static StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
+
+//     @Test
+//     public void buildJsonTableSourceTest() throws Exception {
+//             final URL url = getClass().getClassLoader().getResource("kafka-json-schema.json");
+//             Objects.requireNonNull(url);
+//             final String schema = FileUtils.readFileUtf8(new File(url.getFile()));
+//
+//             Map<String, String> tableJsonMapping = new HashMap<>();
+//             tableJsonMapping.put("fruit-name", "name");
+//             tableJsonMapping.put("fruit-count", "count");
+//             tableJsonMapping.put("event-time", "time");
+//
+//             // Construct with the builder.
+//             Properties props = new Properties();
+//             props.put("group.id", GROUP_ID);
+//             props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
+//
+//             Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
+//             specificOffsets.put(new KafkaTopicPartition(TOPIC, 0), 100L);
+//             specificOffsets.put(new KafkaTopicPartition(TOPIC, 1), 123L);
+//
+//             KafkaTableSource.Builder builder = builderForTest()
+//                             .forJsonSchema(TableSchema.fromTypeInfo(JsonSchemaConverter.convert(schema)))
+//                             .failOnMissingField(true)
+//                             .withTableToJsonMapping(tableJsonMapping)
+//                             .withKafkaProperties(props)
+//                             .forTopic(TOPIC)
+//                             .fromSpecificOffsets(specificOffsets)
+//                             .withSchema(
+//                                             TableSchema.builder()
+//                                                             .field("fruit-name", Types.STRING)
+//                                                             .field("fruit-count", Types.INT)
+//                                                             .field("event-time", Types.LONG)
+//                                                             .field("proc-time", Types.SQL_TIMESTAMP)
+//                                                             .build())
+//                             .withProctimeAttribute("proc-time");
+//
+//             // Construct with the descriptor.
+//             Map<Integer, Long> offsets = new HashMap<>();
+//             offsets.put(0, 100L);
+//             offsets.put(1, 123L);
+//             Kafka kafka = new Kafka()
+//                             .version(versionForTest())
+//                             .groupId(GROUP_ID)
+//                             .bootstrapServers(BOOTSTRAP_SERVERS)
+//                             .topic(TOPIC)
+//                             .startupMode(StartupMode.SPECIFIC_OFFSETS)
+//                             .specificOffsets(offsets)
+//                             .tableJsonMapping(tableJsonMapping);
+//             extraSettings(builder, kafka);
+//
+//             TableSource source = tEnv
+//                             .from(kafka)
+//                             .withFormat(
+//                                             new Json()
+//                                                             .schema(schema)
+//                                                             .failOnMissingField(true))
+//                             .withSchema(new Schema()
+//                                             .field("fruit-name", Types.STRING)
+//                                             .field("fruit-count", Types.INT)
+//                                             .field("event-time", Types.LONG)
+//                                             .field("proc-time", Types.SQL_TIMESTAMP).proctime())
+//                             .toTableSource();
+//
+//             Assert.assertEquals(builder.build(), source);
+//     }
+
+//     @Test(expected = TableException.class)
+//     public void buildJsonTableSourceFailTest() {
+//             tEnv.from(
+//                             new Kafka()
+//                                             .version(versionForTest())
+//                                             .groupId(GROUP_ID)
+//                                             .bootstrapServers(BOOTSTRAP_SERVERS)
+//                                             .topic(TOPIC)
+//                                             .startupMode(StartupMode.SPECIFIC_OFFSETS)
+//                                             .specificOffsets(new HashMap<>()))
+//                             .withFormat(
+//                                             new Json()
+//                                                             .schema("")
+//                                                             .failOnMissingField(true))
+//                             .toTableSource();
+//     }
+}
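
A version-specific subclass of the test base only needs to bind the three
abstract methods. A hypothetical 0.10 binding (class name and the
Kafka010JsonTableSource.builder() entry point are assumptions for
illustration, not part of this commit):

    public class Kafka010JsonTableFromDescriptorTest extends KafkaJsonTableFromDescriptorTestBase {

            @Override
            protected String versionForTest() {
                    return KafkaValidator.KAFKA_VERSION_VALUE_010;
            }

            @Override
            protected KafkaJsonTableSource.Builder builderForTest() {
                    return Kafka010JsonTableSource.builder(); // assumed builder entry point
            }

            @Override
            protected void extraSettings(KafkaTableSource.Builder builder, Kafka kafka) {
                    // nothing version-specific for 0.10; a 0.8 variant would set zookeeperConnect here
            }
    }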

http://git-wip-us.apache.org/repos/asf/flink/blob/1d26062d/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-json-schema.json
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-json-schema.json b/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-json-schema.json
new file mode 100644
index 0000000..5167e5e
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-json-schema.json
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+  "title": "Fruit",
+  "type": "object",
+  "properties": {
+    "name": {
+      "type": "string"
+    },
+    "count": {
+      "type": "integer"
+    },
+    "time": {
+      "description": "Age in years",
+      "type": "number"
+    }
+  },
+  "required": ["name", "count", "time"]
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1d26062d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala
index 534ef39..1e88d93 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala
@@ -92,6 +92,11 @@ class TableSchema(
   }
 
   /**
+    * Returns the number of columns.
+    */
+  def getColumnNum: Int = columnNames.length
+
+  /**
     * Returns all column names as an array.
     */
   def getColumnNames: Array[String] = columnNames
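
A quick usage sketch for the new accessor (Java; TableSchema.builder() and
Types as used in the disabled test earlier in this diff):

    TableSchema schema = TableSchema.builder()
            .field("fruit-name", Types.STRING)
            .field("fruit-count", Types.INT)
            .build();

    int columnCount = schema.getColumnNum(); // 2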
