[FLINK-8538] [table] Improve unified table sources

This closes #5564.
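
A note for readers of this diff: after this change, a table source is
discovered and configured purely from a flat map of string properties,
validated by the descriptor validators touched below. A rough sketch of the
property map that the reworked KafkaJsonTableSourceFactory consumes (key
names follow the validator constants in this diff; the topic, schema, and
format values are made up for illustration):

    // Hypothetical usage sketch, not part of this commit.
    Map<String, String> props = new HashMap<>();
    props.put("connector.type", "kafka");               // CONNECTOR_TYPE_VALUE_KAFKA
    props.put("connector.version", "0.10");             // CONNECTOR_VERSION_VALUE_010
    props.put("connector.topic", "test-topic");         // CONNECTOR_TOPIC
    props.put("format.type", "json");                   // FORMAT_TYPE_VALUE
    props.put("format.derive-schema", "true");          // FORMAT_DERIVE_SCHEMA
    props.put("format.fail-on-missing-field", "true");  // FORMAT_FAIL_ON_MISSING_FIELD
    props.put("schema.0.name", "user");                 // SCHEMA() + SCHEMA_NAME()
    props.put("schema.0.type", "VARCHAR");              // SCHEMA() + SCHEMA_TYPE()
    TableSource<Row> source = new Kafka010JsonTableSourceFactory().create(props);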


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/db2c510f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/db2c510f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/db2c510f

Branch: refs/heads/release-1.5
Commit: db2c510fb4f171c9e9940759e5fbaf466ec74474
Parents: 1d26062
Author: Timo Walther <twal...@apache.org>
Authored: Mon Feb 19 13:35:45 2018 +0100
Committer: Timo Walther <twal...@apache.org>
Committed: Tue Feb 27 20:23:00 2018 +0100

----------------------------------------------------------------------
 .../flink-connector-kafka-0.10/pom.xml          |   8 +
 .../kafka/Kafka010JsonTableSourceFactory.java   |   5 +-
 .../resources/tableSourceConverter.properties   |  29 -
 .../Kafka010JsonTableSourceFactoryTest.java     |  37 +
 .../kafka/Kafka010TableSourceFactoryTest.java   |  41 -
 .../flink-connector-kafka-0.11/pom.xml          |   8 +
 .../kafka/Kafka011JsonTableSourceFactory.java   |   5 +-
 .../resources/tableSourceConverter.properties   |  29 -
 .../Kafka011JsonTableSourceFactoryTest.java     |  37 +
 .../kafka/Kafka011TableSourceFactoryTest.java   |  41 -
 .../flink-connector-kafka-0.8/pom.xml           |   8 +
 .../kafka/Kafka08JsonTableSourceFactory.java    |   5 +-
 .../resources/tableSourceConverter.properties   |  29 -
 .../Kafka08JsonTableSourceFactoryTest.java      |  37 +
 .../kafka/Kafka08TableSourceFactoryTest.java    |  42 -
 .../flink-connector-kafka-0.9/pom.xml           |   8 +
 .../kafka/Kafka09JsonTableSourceFactory.java    |   5 +-
 .../resources/tableSourceConverter.properties   |  29 -
 .../Kafka09JsonTableSourceFactoryTest.java      |  37 +
 .../kafka/Kafka09TableSourceFactoryTest.java    |  41 -
 .../flink-connector-kafka-base/pom.xml          |  16 +-
 .../connectors/kafka/KafkaJsonTableSource.java  |  24 +-
 .../kafka/KafkaJsonTableSourceFactory.java      | 267 +++---
 .../connectors/kafka/KafkaTableSource.java      |  42 +-
 .../apache/flink/table/descriptors/Kafka.java   | 239 +++---
 .../flink/table/descriptors/KafkaValidator.java | 226 ++---
 .../KafkaJsonTableFromDescriptorTestBase.java   | 127 ---
 .../KafkaJsonTableSourceFactoryTestBase.java    | 145 ++++
 .../flink/table/descriptors/KafkaTest.java      | 113 +++
 .../src/test/resources/kafka-json-schema.json   |  35 -
 flink-dist/pom.xml                              |  16 -
 flink-formats/flink-json/pom.xml                |  29 +-
 .../apache/flink/table/descriptors/Json.java    | 129 +++
 .../flink/table/descriptors/JsonValidator.java  |  55 ++
 .../flink/table/descriptors/JsonTest.java       | 124 +++
 .../client/gateway/local/LocalExecutor.java     |   2 +-
 flink-libraries/flink-table/pom.xml             |  12 +
 .../resources/tableSourceConverter.properties   |   7 +
 .../apache/flink/table/api/TableSchema.scala    |   6 +-
 .../table/catalog/ExternalCatalogTable.scala    |  19 +-
 .../table/catalog/ExternalTableSourceUtil.scala |   2 +-
 .../BatchTableSourceDescriptor.scala            |   2 +-
 .../table/descriptors/ConnectorDescriptor.scala |   9 +-
 .../ConnectorDescriptorValidator.scala          |  18 +-
 .../apache/flink/table/descriptors/Csv.scala    |  13 +-
 .../descriptors/DescriptorProperties.scala      | 833 ++++++++++++++++---
 .../flink/table/descriptors/FileSystem.scala    |   5 +-
 .../table/descriptors/FormatDescriptor.scala    |   4 +-
 .../descriptors/FormatDescriptorValidator.scala |  23 +-
 .../apache/flink/table/descriptors/Json.scala   |  78 --
 .../flink/table/descriptors/JsonValidator.scala |  41 -
 .../table/descriptors/MetadataValidator.scala   |   6 +-
 .../flink/table/descriptors/Rowtime.scala       |   8 +-
 .../table/descriptors/RowtimeValidator.scala    | 179 ++--
 .../apache/flink/table/descriptors/Schema.scala |  14 +-
 .../table/descriptors/SchemaValidator.scala     | 171 +++-
 .../flink/table/descriptors/Statistics.scala    |   6 +-
 .../table/descriptors/StatisticsValidator.scala |  21 +-
 .../StreamTableSourceDescriptor.scala           |   2 +-
 .../descriptors/TableSourceDescriptor.scala     |   3 +-
 .../table/sources/CsvTableSourceFactory.scala   |  43 +-
 .../table/sources/TableSourceFactory.scala      |  11 +-
 .../sources/TableSourceFactoryService.scala     |  26 +-
 .../tsextractors/StreamRecordTimestamp.scala    |   5 +-
 .../flink/table/descriptors/CsvTest.scala       |  83 +-
 .../table/descriptors/DescriptorTestBase.scala  |  65 +-
 .../table/descriptors/FileSystemTest.scala      |  33 +-
 .../flink/table/descriptors/JsonTest.scala      |  77 --
 .../flink/table/descriptors/MetadataTest.scala  |  38 +-
 .../flink/table/descriptors/RowtimeTest.scala   |  59 +-
 .../flink/table/descriptors/SchemaTest.scala    |  84 +-
 .../table/descriptors/SchemaValidatorTest.scala |  76 ++
 .../table/descriptors/StatisticsTest.scala      |  72 +-
 .../sources/TableSourceFactoryServiceTest.scala |  20 +-
 .../table/sources/TestTableSourceFactory.scala  |   8 +-
 75 files changed, 2681 insertions(+), 1571 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-connectors/flink-connector-kafka-0.10/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/pom.xml b/flink-connectors/flink-connector-kafka-0.10/pom.xml
index 8b4ff38..2d273ae 100644
--- a/flink-connectors/flink-connector-kafka-0.10/pom.xml
+++ b/flink-connectors/flink-connector-kafka-0.10/pom.xml
@@ -192,6 +192,14 @@ under the License.
                        <scope>test</scope>
                </dependency>
 
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-table_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+
        </dependencies>
 
        <build>

http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceFactory.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceFactory.java
index 1d03f6c..c639a44 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceFactory.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceFactory.java
@@ -18,12 +18,13 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import static org.apache.flink.table.descriptors.KafkaValidator.KAFKA_VERSION_VALUE_010;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_010;
 
 /**
  * Factory for creating configured instances of {@link Kafka010JsonTableSource}.
  */
 public class Kafka010JsonTableSourceFactory extends KafkaJsonTableSourceFactory {
+
        @Override
        protected KafkaJsonTableSource.Builder createBuilder() {
                return new Kafka010JsonTableSource.Builder();
@@ -31,6 +32,6 @@ public class Kafka010JsonTableSourceFactory extends KafkaJsonTableSourceFactory
 
        @Override
        protected String kafkaVersion() {
-               return KAFKA_VERSION_VALUE_010;
+               return CONNECTOR_VERSION_VALUE_010;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-connectors/flink-connector-kafka-0.10/src/main/resources/tableSourceConverter.properties
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/resources/tableSourceConverter.properties b/flink-connectors/flink-connector-kafka-0.10/src/main/resources/tableSourceConverter.properties
deleted file mode 100644
index 5409b49..0000000
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/resources/tableSourceConverter.properties
+++ /dev/null
@@ -1,29 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-################################################################################
-# The config file is used to specify the packages of current module where
-# to find TableSourceConverter implementation class annotated with TableType.
-# If there are multiple packages to scan, put those packages together into a
-# string separated with ',', for example, org.package1,org.package2.
-# Please notice:
-# It's better to have a tableSourceConverter.properties in each connector Module
-# which offers converters instead of put all information into the
-# tableSourceConverter.properties of flink-table module.
-################################################################################
-scan.packages=org.apache.flink.streaming.connectors.kafka

http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceFactoryTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceFactoryTest.java
new file mode 100644
index 0000000..22cf659
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceFactoryTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_010;
+
+/**
+ * Tests for {@link Kafka010JsonTableSourceFactory}.
+ */
+public class Kafka010JsonTableSourceFactoryTest extends KafkaJsonTableSourceFactoryTestBase {
+
+       @Override
+       protected String version() {
+               return CONNECTOR_VERSION_VALUE_010;
+       }
+
+       @Override
+       protected KafkaJsonTableSource.Builder builder() {
+               return Kafka010JsonTableSource.builder();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceFactoryTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceFactoryTest.java
deleted file mode 100644
index 15b89e8..0000000
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceFactoryTest.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.table.descriptors.Kafka;
-
-import static org.apache.flink.table.descriptors.KafkaValidator.KAFKA_VERSION_VALUE_010;
-
-/**
- * Tests for {@link Kafka010JsonTableSourceFactory}.
- */
-public class Kafka010TableSourceFactoryTest extends KafkaJsonTableFromDescriptorTestBase {
-       protected String versionForTest() {
-               return KAFKA_VERSION_VALUE_010;
-       }
-
-       protected KafkaJsonTableSource.Builder builderForTest() {
-               return Kafka010JsonTableSource.builder();
-       }
-
-       @Override
-       protected void extraSettings(KafkaTableSource.Builder builder, Kafka kafka) {
-               // no extra settings
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-connectors/flink-connector-kafka-0.11/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/pom.xml b/flink-connectors/flink-connector-kafka-0.11/pom.xml
index 1e935f6..0fc1f13 100644
--- a/flink-connectors/flink-connector-kafka-0.11/pom.xml
+++ b/flink-connectors/flink-connector-kafka-0.11/pom.xml
@@ -201,6 +201,14 @@ under the License.
                        <scope>test</scope>
                </dependency>
 
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-table_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+
        </dependencies>
 
        <build>

http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceFactory.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceFactory.java
index ca4d6ce..6745bb2 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceFactory.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceFactory.java
@@ -18,12 +18,13 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import static org.apache.flink.table.descriptors.KafkaValidator.KAFKA_VERSION_VALUE_011;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_011;
 
 /**
  * Factory for creating configured instances of {@link Kafka011JsonTableSource}.
  */
 public class Kafka011JsonTableSourceFactory extends KafkaJsonTableSourceFactory {
+
        @Override
        protected KafkaJsonTableSource.Builder createBuilder() {
                return new Kafka011JsonTableSource.Builder();
@@ -31,6 +32,6 @@ public class Kafka011JsonTableSourceFactory extends KafkaJsonTableSourceFactory
 
        @Override
        protected String kafkaVersion() {
-               return KAFKA_VERSION_VALUE_011;
+               return CONNECTOR_VERSION_VALUE_011;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-connectors/flink-connector-kafka-0.11/src/main/resources/tableSourceConverter.properties
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/resources/tableSourceConverter.properties b/flink-connectors/flink-connector-kafka-0.11/src/main/resources/tableSourceConverter.properties
deleted file mode 100644
index 5409b49..0000000
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/resources/tableSourceConverter.properties
+++ /dev/null
@@ -1,29 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-################################################################################
-# The config file is used to specify the packages of current module where
-# to find TableSourceConverter implementation class annotated with TableType.
-# If there are multiple packages to scan, put those packages together into a
-# string separated with ',', for example, org.package1,org.package2.
-# Please notice:
-# It's better to have a tableSourceConverter.properties in each connector Module
-# which offers converters instead of put all information into the
-# tableSourceConverter.properties of flink-table module.
-################################################################################
-scan.packages=org.apache.flink.streaming.connectors.kafka

http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceFactoryTest.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceFactoryTest.java
new file mode 100644
index 0000000..ed92863
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceFactoryTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_011;
+
+/**
+ * Tests for {@link Kafka011JsonTableSourceFactory}.
+ */
+public class Kafka011JsonTableSourceFactoryTest extends KafkaJsonTableSourceFactoryTestBase {
+
+       @Override
+       protected String version() {
+               return CONNECTOR_VERSION_VALUE_011;
+       }
+
+       @Override
+       protected KafkaJsonTableSource.Builder builder() {
+               return Kafka011JsonTableSource.builder();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceFactoryTest.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceFactoryTest.java
deleted file mode 100644
index 84ac39b..0000000
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceFactoryTest.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.table.descriptors.Kafka;
-
-import static org.apache.flink.table.descriptors.KafkaValidator.KAFKA_VERSION_VALUE_011;
-
-/**
- * Tests for {@link Kafka011JsonTableSourceFactory}.
- */
-public class Kafka011TableSourceFactoryTest extends KafkaJsonTableFromDescriptorTestBase {
-       protected String versionForTest() {
-               return KAFKA_VERSION_VALUE_011;
-       }
-
-       protected KafkaJsonTableSource.Builder builderForTest() {
-               return Kafka011JsonTableSource.builder();
-       }
-
-       @Override
-       protected void extraSettings(KafkaTableSource.Builder builder, Kafka kafka) {
-               // no extra settings
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-connectors/flink-connector-kafka-0.8/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/pom.xml b/flink-connectors/flink-connector-kafka-0.8/pom.xml
index a58591e..43a9dac 100644
--- a/flink-connectors/flink-connector-kafka-0.8/pom.xml
+++ b/flink-connectors/flink-connector-kafka-0.8/pom.xml
@@ -204,6 +204,14 @@ under the License.
                        <scope>test</scope>
                </dependency>
 
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-table_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+
        </dependencies>
 
        <build>

http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceFactory.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceFactory.java
index e4e5096..2da805a 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceFactory.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceFactory.java
@@ -18,12 +18,13 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import static org.apache.flink.table.descriptors.KafkaValidator.KAFKA_VERSION_VALUE_08;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_08;
 
 /**
  * Factory for creating configured instances of {@link Kafka08JsonTableSource}.
  */
 public class Kafka08JsonTableSourceFactory extends KafkaJsonTableSourceFactory {
+
        @Override
        protected KafkaJsonTableSource.Builder createBuilder() {
                return new Kafka08JsonTableSource.Builder();
@@ -31,6 +32,6 @@ public class Kafka08JsonTableSourceFactory extends KafkaJsonTableSourceFactory {
 
        @Override
        protected String kafkaVersion() {
-               return KAFKA_VERSION_VALUE_08;
+               return CONNECTOR_VERSION_VALUE_08;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-connectors/flink-connector-kafka-0.8/src/main/resources/tableSourceConverter.properties
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/resources/tableSourceConverter.properties b/flink-connectors/flink-connector-kafka-0.8/src/main/resources/tableSourceConverter.properties
deleted file mode 100644
index 5409b49..0000000
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/resources/tableSourceConverter.properties
+++ /dev/null
@@ -1,29 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-################################################################################
-# The config file is used to specify the packages of current module where
-# to find TableSourceConverter implementation class annotated with TableType.
-# If there are multiple packages to scan, put those packages together into a
-# string separated with ',', for example, org.package1,org.package2.
-# Please notice:
-# It's better to have a tableSourceConverter.properties in each connector Module
-# which offers converters instead of put all information into the
-# tableSourceConverter.properties of flink-table module.
-################################################################################
-scan.packages=org.apache.flink.streaming.connectors.kafka

http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceFactoryTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceFactoryTest.java
new file mode 100644
index 0000000..0238b2b
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceFactoryTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_08;
+
+/**
+ * Tests for {@link Kafka08JsonTableSourceFactory}.
+ */
+public class Kafka08JsonTableSourceFactoryTest extends KafkaJsonTableSourceFactoryTestBase {
+
+       @Override
+       protected String version() {
+               return CONNECTOR_VERSION_VALUE_08;
+       }
+
+       @Override
+       protected KafkaJsonTableSource.Builder builder() {
+               return Kafka08JsonTableSource.builder();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceFactoryTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceFactoryTest.java
deleted file mode 100644
index a2edc09..0000000
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceFactoryTest.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.table.descriptors.Kafka;
-
-import static org.apache.flink.table.descriptors.KafkaValidator.KAFKA_VERSION_VALUE_08;
-
-/**
- * Tests for {@link Kafka08JsonTableSourceFactory}.
- */
-public class Kafka08TableSourceFactoryTest extends KafkaJsonTableFromDescriptorTestBase {
-       protected String versionForTest() {
-               return KAFKA_VERSION_VALUE_08;
-       }
-
-       protected KafkaJsonTableSource.Builder builderForTest() {
-               return Kafka08JsonTableSource.builder();
-       }
-
-       @Override
-       protected void extraSettings(KafkaTableSource.Builder builder, Kafka kafka) {
-               builder.getKafkaProps().put("zookeeper.connect", "localhost:1111");
-               kafka.zookeeperConnect("localhost:1111");
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-connectors/flink-connector-kafka-0.9/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/pom.xml b/flink-connectors/flink-connector-kafka-0.9/pom.xml
index d07cb5a..1999a8d 100644
--- a/flink-connectors/flink-connector-kafka-0.9/pom.xml
+++ b/flink-connectors/flink-connector-kafka-0.9/pom.xml
@@ -174,6 +174,14 @@ under the License.
                </dependency>
 
                <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-table_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
                        <groupId>org.apache.hadoop</groupId>
                        <artifactId>hadoop-minikdc</artifactId>
                        <version>${minikdc.version}</version>

http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceFactory.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceFactory.java
index bbda4ae..9207426 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceFactory.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceFactory.java
@@ -18,12 +18,13 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import static org.apache.flink.table.descriptors.KafkaValidator.KAFKA_VERSION_VALUE_09;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_09;
 
 /**
  * Factory for creating configured instances of {@link Kafka09JsonTableSource}.
  */
 public class Kafka09JsonTableSourceFactory extends KafkaJsonTableSourceFactory {
+
        @Override
        protected KafkaJsonTableSource.Builder createBuilder() {
                return new Kafka09JsonTableSource.Builder();
@@ -31,6 +32,6 @@ public class Kafka09JsonTableSourceFactory extends KafkaJsonTableSourceFactory {
 
        @Override
        protected String kafkaVersion() {
-               return KAFKA_VERSION_VALUE_09;
+               return CONNECTOR_VERSION_VALUE_09;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-connectors/flink-connector-kafka-0.9/src/main/resources/tableSourceConverter.properties
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/resources/tableSourceConverter.properties b/flink-connectors/flink-connector-kafka-0.9/src/main/resources/tableSourceConverter.properties
deleted file mode 100644
index 5409b49..0000000
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/resources/tableSourceConverter.properties
+++ /dev/null
@@ -1,29 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-################################################################################
-# The config file is used to specify the packages of current module where
-# to find TableSourceConverter implementation class annotated with TableType.
-# If there are multiple packages to scan, put those packages together into a
-# string separated with ',', for example, org.package1,org.package2.
-# Please notice:
-# It's better to have a tableSourceConverter.properties in each connector Module
-# which offers converters instead of put all information into the
-# tableSourceConverter.properties of flink-table module.
-################################################################################
-scan.packages=org.apache.flink.streaming.connectors.kafka

http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceFactoryTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceFactoryTest.java
new file mode 100644
index 0000000..dd545e9
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceFactoryTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_09;
+
+/**
+ * Tests for {@link Kafka09JsonTableSourceFactory}.
+ */
+public class Kafka09JsonTableSourceFactoryTest extends KafkaJsonTableSourceFactoryTestBase {
+
+       @Override
+       protected String version() {
+               return CONNECTOR_VERSION_VALUE_09;
+       }
+
+       @Override
+       protected KafkaJsonTableSource.Builder builder() {
+               return Kafka09JsonTableSource.builder();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceFactoryTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceFactoryTest.java
deleted file mode 100644
index fc85ea7..0000000
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceFactoryTest.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.table.descriptors.Kafka;
-
-import static org.apache.flink.table.descriptors.KafkaValidator.KAFKA_VERSION_VALUE_09;
-
-/**
- * Factory for creating configured instances of {@link Kafka09JsonTableSource}.
- */
-public class Kafka09TableSourceFactoryTest extends KafkaJsonTableFromDescriptorTestBase {
-       protected String versionForTest() {
-               return KAFKA_VERSION_VALUE_09;
-       }
-
-       protected KafkaJsonTableSource.Builder builderForTest() {
-               return Kafka09JsonTableSource.builder();
-       }
-
-       @Override
-       protected void extraSettings(KafkaTableSource.Builder builder, Kafka kafka) {
-               // no extra settings
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-connectors/flink-connector-kafka-base/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/pom.xml b/flink-connectors/flink-connector-kafka-base/pom.xml
index 2ccaa2e..cea3896 100644
--- a/flink-connectors/flink-connector-kafka-base/pom.xml
+++ b/flink-connectors/flink-connector-kafka-base/pom.xml
@@ -199,23 +199,17 @@ under the License.
                </dependency>
 
                <dependency>
-                       <groupId>org.apache.hadoop</groupId>
-                       <artifactId>hadoop-minikdc</artifactId>
-                       <version>${minikdc.version}</version>
-                       <scope>test</scope>
-               </dependency>
-
-               <dependency>
                        <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-scala_${scala.binary.version}</artifactId>
+                       <artifactId>flink-table_${scala.binary.version}</artifactId>
                        <version>${project.version}</version>
+                       <type>test-jar</type>
                        <scope>test</scope>
                </dependency>
 
                <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
-                       <version>${project.version}</version>
+                       <groupId>org.apache.hadoop</groupId>
+                       <artifactId>hadoop-minikdc</artifactId>
+                       <version>${minikdc.version}</version>
                        <scope>test</scope>
                </dependency>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
index d2dafe7..b2bb8ff 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
@@ -84,23 +84,29 @@ public abstract class KafkaJsonTableSource extends KafkaTableSource implements D
 
        @Override
        public String explainSource() {
-               return "KafkaJSONTableSource";
+               return "KafkaJsonTableSource";
        }
 
        @Override
-       public boolean equals(Object other) {
-               if (super.equals(other)) {
-                       KafkaJsonTableSource otherSource = (KafkaJsonTableSource) other;
-                       return Objects.equals(failOnMissingField, otherSource.failOnMissingField)
-                                       && Objects.equals(jsonSchema, otherSource.jsonSchema)
-                                       && Objects.equals(fieldMapping, otherSource.fieldMapping);
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
                }
-               return false;
+               if (!(o instanceof KafkaJsonTableSource)) {
+                       return false;
+               }
+               if (!super.equals(o)) {
+                       return false;
+               }
+               KafkaJsonTableSource that = (KafkaJsonTableSource) o;
+               return failOnMissingField == that.failOnMissingField &&
+                       Objects.equals(jsonSchema, that.jsonSchema) &&
+                       Objects.equals(fieldMapping, that.fieldMapping);
        }
 
        @Override
        public int hashCode() {
-               return 31 * super.hashCode() + Objects.hash(failOnMissingField, jsonSchema, fieldMapping);
+               return Objects.hash(super.hashCode(), jsonSchema, fieldMapping, failOnMissingField);
        }
 
        //////// SETTERS FOR OPTIONAL PARAMETERS

http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactory.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactory.java
index 918b833..2897314 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactory.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactory.java
@@ -21,65 +21,74 @@ package org.apache.flink.streaming.connectors.kafka;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.formats.json.JsonSchemaConverter;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.descriptors.DescriptorProperties;
 import org.apache.flink.table.descriptors.JsonValidator;
 import org.apache.flink.table.descriptors.KafkaValidator;
 import org.apache.flink.table.descriptors.SchemaValidator;
+import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
 import org.apache.flink.table.sources.TableSource;
 import org.apache.flink.table.sources.TableSourceFactory;
 import org.apache.flink.types.Row;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
 
+import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
 import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
 import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_VERSION;
+import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_DERIVE_SCHEMA;
+import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_PROPERTY_VERSION;
 import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_TYPE;
-import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_VERSION;
 import static org.apache.flink.table.descriptors.JsonValidator.FORMAT_FAIL_ON_MISSING_FIELD;
-import static org.apache.flink.table.descriptors.JsonValidator.FORMAT_SCHEMA_STRING;
+import static org.apache.flink.table.descriptors.JsonValidator.FORMAT_JSON_SCHEMA;
+import static org.apache.flink.table.descriptors.JsonValidator.FORMAT_SCHEMA;
 import static org.apache.flink.table.descriptors.JsonValidator.FORMAT_TYPE_VALUE;
-import static org.apache.flink.table.descriptors.KafkaValidator.BOOTSTRAP_SERVERS;
-import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_TYPE_VALUE;
-import static org.apache.flink.table.descriptors.KafkaValidator.GROUP_ID;
-import static org.apache.flink.table.descriptors.KafkaValidator.JSON_FIELD;
-import static org.apache.flink.table.descriptors.KafkaValidator.KAFKA_VERSION;
-import static org.apache.flink.table.descriptors.KafkaValidator.OFFSET;
-import static org.apache.flink.table.descriptors.KafkaValidator.PARTITION;
-import static org.apache.flink.table.descriptors.KafkaValidator.SPECIFIC_OFFSETS;
-import static org.apache.flink.table.descriptors.KafkaValidator.STARTUP_MODE;
-import static org.apache.flink.table.descriptors.KafkaValidator.STARTUP_MODE_VALUE_EARLIEST;
-import static org.apache.flink.table.descriptors.KafkaValidator.STARTUP_MODE_VALUE_GROUP_OFFSETS;
-import static org.apache.flink.table.descriptors.KafkaValidator.STARTUP_MODE_VALUE_LATEST;
-import static org.apache.flink.table.descriptors.KafkaValidator.STARTUP_MODE_VALUE_SPECIFIC_OFFSETS;
-import static org.apache.flink.table.descriptors.KafkaValidator.TABLE_FIELD;
-import static org.apache.flink.table.descriptors.KafkaValidator.TABLE_JSON_MAPPING;
-import static org.apache.flink.table.descriptors.KafkaValidator.TOPIC;
-import static org.apache.flink.table.descriptors.KafkaValidator.ZOOKEEPER_CONNECT;
-import static org.apache.flink.table.descriptors.SchemaValidator.PROCTIME;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES_KEY;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES_VALUE;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS_OFFSET;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS_PARTITION;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_STARTUP_MODE;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_TOPIC;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_TYPE_VALUE_KAFKA;
+import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_TIMESTAMPS_CLASS;
+import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_TIMESTAMPS_FROM;
+import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_TIMESTAMPS_SERIALIZED;
+import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_TIMESTAMPS_TYPE;
+import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_WATERMARKS_CLASS;
+import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_WATERMARKS_DELAY;
+import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_WATERMARKS_SERIALIZED;
+import static org.apache.flink.table.descriptors.RowtimeValidator.ROWTIME_WATERMARKS_TYPE;
 import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA;
-import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_VERSION;
-
-import scala.Option;
-import scala.collection.JavaConversions;
+import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_FROM;
+import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_NAME;
+import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_PROCTIME;
+import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_TYPE;
 
 /**
  * Factory for creating configured instances of {@link KafkaJsonTableSource}.
  */
 public abstract class KafkaJsonTableSourceFactory implements TableSourceFactory<Row> {
+
        @Override
        public Map<String, String> requiredContext() {
                Map<String, String> context = new HashMap<>();
-               context.put(CONNECTOR_TYPE(), CONNECTOR_TYPE_VALUE); // kafka connector
-               context.put(FORMAT_TYPE(), FORMAT_TYPE_VALUE()); // Json format
-               context.put(KAFKA_VERSION, kafkaVersion()); // for different implementations
-               context.put(CONNECTOR_VERSION(), "1");
-               context.put(FORMAT_VERSION(), "1");
-               context.put(SCHEMA_VERSION(), "1");
+               context.put(CONNECTOR_TYPE(), CONNECTOR_TYPE_VALUE_KAFKA); // kafka
+               context.put(CONNECTOR_VERSION(), kafkaVersion());
+
+               context.put(FORMAT_TYPE(), FORMAT_TYPE_VALUE); // json format
+
+               context.put(CONNECTOR_PROPERTY_VERSION(), "1"); // backwards compatibility
+               context.put(FORMAT_PROPERTY_VERSION(), "1");
+
                return context;
        }
 
@@ -88,124 +97,137 @@ public abstract class KafkaJsonTableSourceFactory implements TableSourceFactory<
                List<String> properties = new ArrayList<>();
 
                // kafka
-               properties.add(KAFKA_VERSION);
-               properties.add(BOOTSTRAP_SERVERS);
-               properties.add(GROUP_ID);
-               properties.add(ZOOKEEPER_CONNECT);
-               properties.add(TOPIC);
-               properties.add(STARTUP_MODE);
-               properties.add(SPECIFIC_OFFSETS + ".#." + PARTITION);
-               properties.add(SPECIFIC_OFFSETS + ".#." + OFFSET);
+               properties.add(CONNECTOR_TOPIC);
+               properties.add(CONNECTOR_PROPERTIES);
+               properties.add(CONNECTOR_PROPERTIES + ".#." + CONNECTOR_PROPERTIES_KEY);
+               properties.add(CONNECTOR_PROPERTIES + ".#." + CONNECTOR_PROPERTIES_VALUE);
+               properties.add(CONNECTOR_STARTUP_MODE);
+               properties.add(CONNECTOR_SPECIFIC_OFFSETS + ".#." + CONNECTOR_SPECIFIC_OFFSETS_PARTITION);
+               properties.add(CONNECTOR_SPECIFIC_OFFSETS + ".#." + CONNECTOR_SPECIFIC_OFFSETS_OFFSET);
 
                // json format
-               properties.add(FORMAT_SCHEMA_STRING());
-               properties.add(FORMAT_FAIL_ON_MISSING_FIELD());
-
-               // table json mapping
-               properties.add(TABLE_JSON_MAPPING + ".#." + TABLE_FIELD);
-               properties.add(TABLE_JSON_MAPPING + ".#." + JSON_FIELD);
+               properties.add(FORMAT_JSON_SCHEMA);
+               properties.add(FORMAT_SCHEMA);
+               properties.add(FORMAT_FAIL_ON_MISSING_FIELD);
+               properties.add(FORMAT_DERIVE_SCHEMA());
 
                // schema
-               properties.add(SCHEMA() + ".#." + DescriptorProperties.TYPE());
-               properties.add(SCHEMA() + ".#." + DescriptorProperties.NAME());
+               properties.add(SCHEMA() + ".#." + SCHEMA_TYPE());
+               properties.add(SCHEMA() + ".#." + SCHEMA_NAME());
+               properties.add(SCHEMA() + ".#." + SCHEMA_FROM());
 
                // time attributes
-               properties.add(SCHEMA() + ".#." + PROCTIME());
-//             properties.add(SCHEMA() + ".#." + ROWTIME() + ".#." + TIMESTAMPS_CLASS());
-//             properties.add(SCHEMA() + ".#." + ROWTIME() + ".#." + TIMESTAMPS_TYPE());
+               properties.add(SCHEMA() + ".#." + SCHEMA_PROCTIME());
+               properties.add(SCHEMA() + ".#." + ROWTIME_TIMESTAMPS_TYPE());
+               properties.add(SCHEMA() + ".#." + ROWTIME_TIMESTAMPS_FROM());
+               properties.add(SCHEMA() + ".#." + ROWTIME_TIMESTAMPS_CLASS());
+               properties.add(SCHEMA() + ".#." + ROWTIME_TIMESTAMPS_SERIALIZED());
+               properties.add(SCHEMA() + ".#." + ROWTIME_WATERMARKS_TYPE());
+               properties.add(SCHEMA() + ".#." + ROWTIME_WATERMARKS_CLASS());
+               properties.add(SCHEMA() + ".#." + ROWTIME_WATERMARKS_SERIALIZED());
+               properties.add(SCHEMA() + ".#." + ROWTIME_WATERMARKS_DELAY());
 
                return properties;
        }
 
        @Override
        public TableSource<Row> create(Map<String, String> properties) {
-               DescriptorProperties params = new DescriptorProperties(true);
+               final DescriptorProperties params = new DescriptorProperties(true);
                params.putProperties(properties);
 
                // validate
+               new SchemaValidator(true).validate(params);
                new KafkaValidator().validate(params);
                new JsonValidator().validate(params);
-               new SchemaValidator(true).validate(params);
 
                // build
-               KafkaJsonTableSource.Builder builder = createBuilder();
-               Properties kafkaProps = new Properties();
-
-               // Set the required parameters.
-               String topic = params.getString(TOPIC).get();
-               TableSchema tableSchema = params.getTableSchema(SCHEMA()).get();
-
-               kafkaProps.put(BOOTSTRAP_SERVERS, params.getString(BOOTSTRAP_SERVERS).get());
-               kafkaProps.put(GROUP_ID, params.getString(GROUP_ID).get());
-
-               // Set the zookeeper connect for kafka 0.8.
-               Option<String> zkConnect = params.getString(ZOOKEEPER_CONNECT);
-               if (zkConnect.isDefined()) {
-                       kafkaProps.put(ZOOKEEPER_CONNECT, zkConnect.get());
-               }
-
-               builder.withKafkaProperties(kafkaProps).forTopic(topic).withSchema(tableSchema);
-
-               // Set the startup mode.
-               String startupMode = params.getString(STARTUP_MODE).get();
-               if (null != startupMode) {
-                       switch (startupMode) {
-                               case STARTUP_MODE_VALUE_EARLIEST:
+               final KafkaJsonTableSource.Builder builder = createBuilder();
+
+               // topic
+               final String topic = params.getString(CONNECTOR_TOPIC);
+               builder.forTopic(topic);
+
+               // properties
+               final Properties props = new Properties();
+               final List<Map<String, String>> propsList = params.getFixedIndexedProperties(
+                       CONNECTOR_PROPERTIES,
+                       Arrays.asList(CONNECTOR_PROPERTIES_KEY, CONNECTOR_PROPERTIES_VALUE));
+               propsList.forEach(kv -> props.put(
+                       params.getString(kv.get(CONNECTOR_PROPERTIES_KEY)),
+                       params.getString(kv.get(CONNECTOR_PROPERTIES_VALUE))
+               ));
+               builder.withKafkaProperties(props);
+
+               // startup mode
+               params
+                       .getOptionalString(CONNECTOR_STARTUP_MODE)
+                       .ifPresent(startupMode -> {
+                               switch (startupMode) {
+
+                               case KafkaValidator.CONNECTOR_STARTUP_MODE_VALUE_EARLIEST:
                                        builder.fromEarliest();
                                        break;
-                               case STARTUP_MODE_VALUE_LATEST:
+
+                               case KafkaValidator.CONNECTOR_STARTUP_MODE_VALUE_LATEST:
                                        builder.fromLatest();
                                        break;
-                               case STARTUP_MODE_VALUE_GROUP_OFFSETS:
+
+                               case KafkaValidator.CONNECTOR_STARTUP_MODE_VALUE_GROUP_OFFSETS:
                                        builder.fromGroupOffsets();
                                        break;
-                               case STARTUP_MODE_VALUE_SPECIFIC_OFFSETS:
-                                       Map<String, String> partitions = JavaConversions.
-                                                       mapAsJavaMap(params.getIndexedProperty(SPECIFIC_OFFSETS, PARTITION));
-                                       Map<KafkaTopicPartition, Long> offsetMap = new HashMap<>();
-                                       for (int i = 0; i < partitions.size(); i++) {
-                                               offsetMap.put(
-                                                               new KafkaTopicPartition(
-                                                                               topic,
-                                                                               Integer.valueOf(params.getString(
-                                                                                               SPECIFIC_OFFSETS + "" + "." + i + "." + PARTITION).get())),
-                                                               Long.valueOf(params.getString(
-                                                                               SPECIFIC_OFFSETS + "" + "." + i + "." + OFFSET).get()));
-                                       }
+
+                               case KafkaValidator.CONNECTOR_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS:
+                                       final Map<KafkaTopicPartition, Long> offsetMap = new HashMap<>();
+
+                                       final List<Map<String, String>> offsetList = params.getFixedIndexedProperties(
+                                               CONNECTOR_SPECIFIC_OFFSETS,
+                                               Arrays.asList(CONNECTOR_SPECIFIC_OFFSETS_PARTITION, CONNECTOR_SPECIFIC_OFFSETS_OFFSET));
+                                       offsetList.forEach(kv -> {
+                                               final int partition = params.getInt(kv.get(CONNECTOR_SPECIFIC_OFFSETS_PARTITION));
+                                               final long offset = params.getLong(kv.get(CONNECTOR_SPECIFIC_OFFSETS_OFFSET));
+                                               final KafkaTopicPartition topicPartition = new KafkaTopicPartition(topic, partition);
+                                               offsetMap.put(topicPartition, offset);
+                                       });
                                        builder.fromSpecificOffsets(offsetMap);
                                        break;
-                       }
-               }
-
-               // Set whether fail on missing JSON field.
-               Option<String> failOnMissing = params.getString(FORMAT_FAIL_ON_MISSING_FIELD());
-               if (failOnMissing.isDefined()) {
-                       builder.failOnMissingField(Boolean.valueOf(failOnMissing.get()));
+                               }
+                       });
+
+               // missing field
+               params.getOptionalBoolean(FORMAT_FAIL_ON_MISSING_FIELD).ifPresent(builder::failOnMissingField);
+
+               // json schema
+               final TableSchema formatSchema;
+               if (params.containsKey(FORMAT_SCHEMA)) {
+                       final TypeInformation<?> info = params.getType(FORMAT_SCHEMA);
+                       formatSchema = TableSchema.fromTypeInfo(info);
+               } else if (params.containsKey(FORMAT_JSON_SCHEMA)) {
+                       final TypeInformation<?> info = JsonSchemaConverter.convert(params.getString(FORMAT_JSON_SCHEMA));
+                       formatSchema = TableSchema.fromTypeInfo(info);
+               } else {
+                       formatSchema = SchemaValidator.deriveFormatFields(params);
                }
+               builder.forJsonSchema(formatSchema);
 
-               // Set the JSON schema.
-               Option<String> jsonSchema = params.getString(FORMAT_SCHEMA_STRING());
-               if (jsonSchema.isDefined()) {
-                       TypeInformation jsonSchemaType = JsonSchemaConverter.convert(jsonSchema.get());
-                       builder.forJsonSchema(TableSchema.fromTypeInfo(jsonSchemaType));
-               }
-
-               // Set the table => JSON fields mapping.
-               Map<String, String>  mappingTableFields = JavaConversions.
-                               mapAsJavaMap(params.getIndexedProperty(TABLE_JSON_MAPPING, TABLE_FIELD));
-
-               if (!mappingTableFields.isEmpty()) {
-                       Map<String, String> tableJsonMapping = new HashMap<>();
-                       for (int i = 0; i < mappingTableFields.size(); i++) {
-                               tableJsonMapping.put(params.getString(TABLE_JSON_MAPPING + "." + i + "." + TABLE_FIELD).get(),
-                                               params.getString(TABLE_JSON_MAPPING + "." + i + "." + JSON_FIELD).get()
-                               );
-                       }
-                       builder.withTableToJsonMapping(tableJsonMapping);
+               // schema
+               final TableSchema schema = params.getTableSchema(SCHEMA());
+               builder.withSchema(schema);
+
+               // proctime
+               SchemaValidator.deriveProctimeAttribute(params).ifPresent(builder::withProctimeAttribute);
+
+               // rowtime
+               final List<RowtimeAttributeDescriptor> descriptors = SchemaValidator.deriveRowtimeAttributes(params);
+               if (descriptors.size() > 1) {
+                       throw new TableException("More than one rowtime attribute is not supported yet.");
+               } else if (descriptors.size() == 1) {
+                       final RowtimeAttributeDescriptor desc = descriptors.get(0);
+                       builder.withRowtimeAttribute(desc.getAttributeName(), desc.getTimestampExtractor(), desc.getWatermarkStrategy());
                }
 
-               // Set the time attributes.
-               setTimeAttributes(tableSchema, params, builder);
+               // field mapping
+               final Map<String, String> mapping = SchemaValidator.deriveFieldMapping(params, Optional.of(formatSchema));
+               builder.withTableToJsonMapping(mapping);
 
                return builder.build();
        }
@@ -213,15 +235,4 @@ public abstract class KafkaJsonTableSourceFactory implements TableSourceFactory<
        protected abstract KafkaJsonTableSource.Builder createBuilder();
 
        protected abstract String kafkaVersion();
-
-       private void setTimeAttributes(TableSchema schema, DescriptorProperties params, KafkaJsonTableSource.Builder builder) {
-               // TODO to deal with rowtime fields
-               Option<String> proctimeField;
-               for (int i = 0; i < schema.getColumnNum(); i++) {
-                       proctimeField = params.getString(SCHEMA() + "." + i + "." + PROCTIME());
-                       if (proctimeField.isDefined()) {
-                               builder.withProctimeAttribute(schema.getColumnName(i).get());
-                       }
-               }
-       }
 }
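
For orientation, a factory like the one above is selected purely by matching the flat property map it receives against requiredContext(). A minimal sketch of such a map follows; the literal key strings, topic name, and the "factory" instance are illustrative assumptions (the canonical keys live in KafkaValidator, JsonValidator, and SchemaValidator):

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical property map that would be routed to a Kafka JSON factory.
    // Key strings are assumed to follow the "connector.*" / "format.*" scheme
    // used by the validators; only the structure is guaranteed by this commit.
    Map<String, String> properties = new HashMap<>();
    properties.put("connector.type", "kafka");            // CONNECTOR_TYPE
    properties.put("connector.version", "0.11");          // must match kafkaVersion()
    properties.put("connector.property-version", "1");    // backwards compatibility
    properties.put("format.type", "json");                // FORMAT_TYPE
    properties.put("format.property-version", "1");
    properties.put("connector.topic", "test-topic");      // hypothetical topic
    // create() first runs the Schema/Kafka/Json validators, then builds the source
    TableSource<Row> source = factory.create(properties);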

http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
index 9ce3b8e..134c483 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
@@ -141,31 +141,27 @@ public abstract class KafkaTableSource
 
        @Override
        public boolean equals(Object o) {
-               if (!o.getClass().equals(this.getClass())) {
+               if (this == o) {
+                       return true;
+               }
+               if (!(o instanceof KafkaTableSource)) {
                        return false;
                }
-               KafkaTableSource other = (KafkaTableSource) o;
-               return Objects.equals(topic, other.topic)
-                               && Objects.equals(schema, other.schema)
-                               && Objects.equals(properties, other.properties)
-                               && Objects.equals(proctimeAttribute, other.proctimeAttribute)
-                               && Objects.equals(returnType, other.returnType)
-                               && Objects.equals(rowtimeAttributeDescriptors, other.rowtimeAttributeDescriptors)
-                               && Objects.equals(specificStartupOffsets, other.specificStartupOffsets)
-                               && Objects.equals(startupMode, other.startupMode);
+               KafkaTableSource that = (KafkaTableSource) o;
+               return Objects.equals(schema, that.schema) &&
+                       Objects.equals(topic, that.topic) &&
+                       Objects.equals(properties, that.properties) &&
+                       Objects.equals(returnType, that.returnType) &&
+                       Objects.equals(proctimeAttribute, that.proctimeAttribute) &&
+                       Objects.equals(rowtimeAttributeDescriptors, that.rowtimeAttributeDescriptors) &&
+                       startupMode == that.startupMode &&
+                       Objects.equals(specificStartupOffsets, that.specificStartupOffsets);
        }
 
        @Override
        public int hashCode() {
-               return Objects.hash(
-                               topic,
-                               schema,
-                               properties,
-                               proctimeAttribute,
-                               returnType,
-                               rowtimeAttributeDescriptors,
-                               specificStartupOffsets,
-                               startupMode);
+               return Objects.hash(schema, topic, properties, returnType,
+                       proctimeAttribute, rowtimeAttributeDescriptors, startupMode, specificStartupOffsets);
        }
 
        /**
@@ -211,9 +207,9 @@ public abstract class KafkaTableSource
                        // validate that field exists and is of correct type
                        Option<TypeInformation<?>> tpe = schema.getType(proctimeAttribute);
                        if (tpe.isEmpty()) {
-                               throw new ValidationException("Processing time attribute " + proctimeAttribute + " is not present in TableSchema.");
+                               throw new ValidationException("Processing time attribute '" + proctimeAttribute + "' is not present in TableSchema.");
                        } else if (tpe.get() != Types.SQL_TIMESTAMP()) {
-                               throw new ValidationException("Processing time attribute " + proctimeAttribute + " is not of type SQL_TIMESTAMP.");
+                               throw new ValidationException("Processing time attribute '" + proctimeAttribute + "' is not of type SQL_TIMESTAMP.");
                        }
                }
                this.proctimeAttribute = proctimeAttribute;
@@ -230,9 +226,9 @@ public abstract class KafkaTableSource
                        String rowtimeAttribute = desc.getAttributeName();
                        Option<TypeInformation<?>> tpe = schema.getType(rowtimeAttribute);
                        if (tpe.isEmpty()) {
-                               throw new ValidationException("Rowtime attribute " + rowtimeAttribute + " is not present in TableSchema.");
+                               throw new ValidationException("Rowtime attribute '" + rowtimeAttribute + "' is not present in TableSchema.");
                        } else if (tpe.get() != Types.SQL_TIMESTAMP()) {
-                               throw new ValidationException("Rowtime attribute " + rowtimeAttribute + " is not of type SQL_TIMESTAMP.");
+                               throw new ValidationException("Rowtime attribute '" + rowtimeAttribute + "' is not of type SQL_TIMESTAMP.");
                        }
                }
                this.rowtimeAttributeDescriptors = rowtimeAttributeDescriptors;
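
Besides the quoting fix, the hunks above document the validation contract: a declared time attribute must exist in the schema and must be of type SQL_TIMESTAMP. A minimal sketch, assuming the two-argument TableSchema constructor and invented field names:

    // Hypothetical schema whose 'proctime' column satisfies the checks above.
    TableSchema schema = new TableSchema(
            new String[]{"user", "clicks", "proctime"},
            new TypeInformation<?>[]{Types.STRING(), Types.LONG(), Types.SQL_TIMESTAMP()});
    // "proctime" -> accepted as time attribute
    // "clicks"   -> ValidationException: not of type SQL_TIMESTAMP
    // "missing"  -> ValidationException: not present in TableSchema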

http://git-wip-us.apache.org/repos/asf/flink/blob/db2c510f/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/Kafka.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/Kafka.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/Kafka.java
index 4733f6e..4535958 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/Kafka.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/Kafka.java
@@ -18,182 +18,199 @@
 
 package org.apache.flink.table.descriptors;
 
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
 import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
-
-import static org.apache.flink.table.descriptors.KafkaValidator.BOOTSTRAP_SERVERS;
-import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_TYPE_VALUE;
-import static org.apache.flink.table.descriptors.KafkaValidator.GROUP_ID;
-import static org.apache.flink.table.descriptors.KafkaValidator.JSON_FIELD;
-import static org.apache.flink.table.descriptors.KafkaValidator.KAFKA_VERSION;
-import static org.apache.flink.table.descriptors.KafkaValidator.OFFSET;
-import static org.apache.flink.table.descriptors.KafkaValidator.PARTITION;
-import static org.apache.flink.table.descriptors.KafkaValidator.SPECIFIC_OFFSETS;
-import static org.apache.flink.table.descriptors.KafkaValidator.TABLE_FIELD;
-import static org.apache.flink.table.descriptors.KafkaValidator.TABLE_JSON_MAPPING;
-import static org.apache.flink.table.descriptors.KafkaValidator.TOPIC;
-import static org.apache.flink.table.descriptors.KafkaValidator.ZOOKEEPER_CONNECT;
+import org.apache.flink.util.Preconditions;
 
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
-
-import scala.collection.JavaConversions;
-import scala.collection.Seq;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_VERSION;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES_KEY;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES_VALUE;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS_OFFSET;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS_PARTITION;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_STARTUP_MODE;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_TOPIC;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_TYPE_VALUE_KAFKA;
 
 /**
- * Connector descriptor for the kafka message queue.
+ * Connector descriptor for the Apache Kafka message queue.
  */
 public class Kafka extends ConnectorDescriptor {
 
-       private Optional<String> version = Optional.empty();
-       private Optional<String> bootstrapServers = Optional.empty();
-       private Optional<String> groupId = Optional.empty();
-       private Optional<String> topic = Optional.empty();
-       private Optional<String> zookeeperConnect = Optional.empty();
-       private Optional<Map<String, String>> tableJsonMapping = Optional.empty();
-
-       private Optional<StartupMode> startupMode = Optional.empty();
-       private Optional<Map<Integer, Long>> specificOffsets = Optional.empty();
+       private String version;
+       private String topic;
+       private StartupMode startupMode;
+       private Map<Integer, Long> specificOffsets;
+       private Map<String, String> kafkaProperties;
 
+       /**
+        * Connector descriptor for the Apache Kafka message queue.
+        */
        public Kafka() {
-               super(CONNECTOR_TYPE_VALUE, 1);
+               super(CONNECTOR_TYPE_VALUE_KAFKA, 1, true);
        }
 
        /**
-        * Sets the kafka version.
+        * Sets the Kafka version to be used.
         *
-        * @param version
-        * Could be {@link KafkaValidator#KAFKA_VERSION_VALUE_011},
-        * {@link KafkaValidator#KAFKA_VERSION_VALUE_010},
-        * {@link KafkaValidator#KAFKA_VERSION_VALUE_09},
-        * or {@link KafkaValidator#KAFKA_VERSION_VALUE_08}.
+        * @param version Kafka version. E.g., "0.8", "0.11", etc.
         */
        public Kafka version(String version) {
-               this.version = Optional.of(version);
+               Preconditions.checkNotNull(version);
+               this.version = version;
                return this;
        }
 
        /**
-        * Sets the bootstrap servers for kafka.
+        * Sets the topic from which the table is read.
+        *
+        * @param topic The topic from which the table is read.
         */
-       public Kafka bootstrapServers(String bootstrapServers) {
-               this.bootstrapServers = Optional.of(bootstrapServers);
+       public Kafka topic(String topic) {
+               Preconditions.checkNotNull(topic);
+               this.topic = topic;
                return this;
        }
 
        /**
-        * Sets the consumer group id.
+        * Sets the configuration properties for the Kafka consumer. Resets previously set properties.
+        *
+        * @param properties The configuration properties for the Kafka consumer.
         */
-       public Kafka groupId(String groupId) {
-               this.groupId = Optional.of(groupId);
+       public Kafka properties(Properties properties) {
+               Preconditions.checkNotNull(properties);
+               if (this.kafkaProperties == null) {
+                       this.kafkaProperties = new HashMap<>();
+               }
+               this.kafkaProperties.clear();
+               properties.forEach((k, v) -> this.kafkaProperties.put((String) k, (String) v));
                return this;
        }
 
        /**
-        * Sets the topic to consume.
+        * Adds a configuration property for the Kafka consumer.
+        *
+        * @param key property key for the Kafka consumer
+        * @param value property value for the Kafka consumer
         */
-       public Kafka topic(String topic) {
-               this.topic = Optional.of(topic);
+       public Kafka property(String key, String value) {
+               Preconditions.checkNotNull(key);
+               Preconditions.checkNotNull(value);
+               if (this.kafkaProperties == null) {
+                       this.kafkaProperties = new HashMap<>();
+               }
+               kafkaProperties.put(key, value);
                return this;
        }
 
        /**
-        * Sets the startup mode.
+        * Configures to start reading from the earliest offset for all partitions.
+        *
+        * @see FlinkKafkaConsumerBase#setStartFromEarliest()
         */
-       public Kafka startupMode(StartupMode startupMode) {
-               this.startupMode = Optional.of(startupMode);
+       public Kafka startFromEarliest() {
+               this.startupMode = StartupMode.EARLIEST;
+               this.specificOffsets = null;
                return this;
        }
 
        /**
-        * Sets the zookeeper hosts. Only required by kafka 0.8.
+        * Configures to start reading from the latest offset for all partitions.
+        *
+        * @see FlinkKafkaConsumerBase#setStartFromLatest()
         */
-       public Kafka zookeeperConnect(String zookeeperConnect) {
-               this.zookeeperConnect = Optional.of(zookeeperConnect);
+       public Kafka startFromLatest() {
+               this.startupMode = StartupMode.LATEST;
+               this.specificOffsets = null;
                return this;
        }
 
        /**
-        * Sets the consume offsets for the topic set with {@link Kafka#topic(String)}.
-        * Only works in {@link StartupMode#SPECIFIC_OFFSETS} mode.
+        * Configures to start reading from any committed group offsets found in Zookeeper / Kafka brokers.
+        *
+        * @see FlinkKafkaConsumerBase#setStartFromGroupOffsets()
         */
-       public Kafka specificOffsets(Map<Integer, Long> specificOffsets) {
-               this.specificOffsets = Optional.of(specificOffsets);
+       public Kafka startFromGroupOffsets() {
+               this.startupMode = StartupMode.GROUP_OFFSETS;
+               this.specificOffsets = null;
                return this;
        }
 
        /**
-        * Sets the mapping from logical table schema to json schema.
+        * Configures to start reading partitions from specific offsets, set independently for each partition.
+        * Resets previously set offsets.
+        *
+        * @param specificOffsets the specified offsets for partitions
+        * @see FlinkKafkaConsumerBase#setStartFromSpecificOffsets(Map)
         */
-       public Kafka tableJsonMapping(Map<String, String> jsonTableMapping) {
-               this.tableJsonMapping = Optional.of(jsonTableMapping);
+       public Kafka startFromSpecificOffsets(Map<Integer, Long> specificOffsets) {
+               this.startupMode = StartupMode.SPECIFIC_OFFSETS;
+               this.specificOffsets = Preconditions.checkNotNull(specificOffsets);
                return this;
        }
 
+       /**
+        * Configures to start reading partitions from specific offsets and specifies the given offset for
+        * the given partition.
+        *
+        * @param partition partition index
+        * @param specificOffset partition offset to start reading from
+        * @see FlinkKafkaConsumerBase#setStartFromSpecificOffsets(Map)
+        */
+       public Kafka startFromSpecificOffset(int partition, long specificOffset) {
+               this.startupMode = StartupMode.SPECIFIC_OFFSETS;
+               if (this.specificOffsets == null) {
+                       this.specificOffsets = new HashMap<>();
+               }
+               this.specificOffsets.put(partition, specificOffset);
+               return this;
+       }
+
+       /**
+        * Internal method for connector properties conversion.
+        */
        @Override
        public void addConnectorProperties(DescriptorProperties properties) {
-               if (version.isPresent()) {
-                       properties.putString(KAFKA_VERSION, version.get());
-               }
-               if (bootstrapServers.isPresent()) {
-                       properties.putString(BOOTSTRAP_SERVERS, bootstrapServers.get());
-               }
-               if (groupId.isPresent()) {
-                       properties.putString(GROUP_ID, groupId.get());
-               }
-               if (topic.isPresent()) {
-                       properties.putString(TOPIC, topic.get());
+               if (version != null) {
+                       properties.putString(CONNECTOR_VERSION(), version);
                }
-               if (zookeeperConnect.isPresent()) {
-                       properties.putString(ZOOKEEPER_CONNECT, zookeeperConnect.get());
+
+               if (topic != null) {
+                       properties.putString(CONNECTOR_TOPIC, topic);
                }
-               if (startupMode.isPresent()) {
-                       Map<String, String> map = KafkaValidator.normalizeStartupMode(startupMode.get());
-                       for (Map.Entry<String, String> entry : map.entrySet()) {
-                               properties.putString(entry.getKey(), entry.getValue());
-                       }
+
+               if (startupMode != null) {
+                       properties.putString(CONNECTOR_STARTUP_MODE, KafkaValidator.normalizeStartupMode(startupMode));
                }
-               if (specificOffsets.isPresent()) {
-                       List<String> propertyKeys = new ArrayList<>();
-                       propertyKeys.add(PARTITION);
-                       propertyKeys.add(OFFSET);
-
-                       List<Seq<String>> propertyValues = new ArrayList<>(specificOffsets.get().size());
-                       for (Map.Entry<Integer, Long> entry : specificOffsets.get().entrySet()) {
-                               List<String> partitionOffset = new ArrayList<>(2);
-                               partitionOffset.add(entry.getKey().toString());
-                               partitionOffset.add(entry.getValue().toString());
-                               propertyValues.add(JavaConversions.asScalaBuffer(partitionOffset).toSeq());
+
+               if (specificOffsets != null) {
+                       final List<List<String>> values = new ArrayList<>();
+                       for (Map.Entry<Integer, Long> specificOffset : specificOffsets.entrySet()) {
+                               values.add(Arrays.asList(specificOffset.getKey().toString(), specificOffset.getValue().toString()));
                        }
                        properties.putIndexedFixedProperties(
-                                       SPECIFIC_OFFSETS,
-                                       JavaConversions.asScalaBuffer(propertyKeys).toSeq(),
-                                       JavaConversions.asScalaBuffer(propertyValues).toSeq()
-                       );
+                               CONNECTOR_SPECIFIC_OFFSETS,
+                               Arrays.asList(CONNECTOR_SPECIFIC_OFFSETS_PARTITION, CONNECTOR_SPECIFIC_OFFSETS_OFFSET),
+                               values);
                }
-               if (tableJsonMapping.isPresent()) {
-                       List<String> propertyKeys = new ArrayList<>();
-                       propertyKeys.add(TABLE_FIELD);
-                       propertyKeys.add(JSON_FIELD);
-
-                       List<Seq<String>> mappingFields = new ArrayList<>(tableJsonMapping.get().size());
-                       for (Map.Entry<String, String> entry : tableJsonMapping.get().entrySet()) {
-                               List<String> singleMapping = new ArrayList<>(2);
-                               singleMapping.add(entry.getKey());
-                               singleMapping.add(entry.getValue());
-                               mappingFields.add(JavaConversions.asScalaBuffer(singleMapping).toSeq());
-                       }
+
+               if (kafkaProperties != null) {
                        properties.putIndexedFixedProperties(
-                                       TABLE_JSON_MAPPING,
-                                       JavaConversions.asScalaBuffer(propertyKeys).toSeq(),
-                                       JavaConversions.asScalaBuffer(mappingFields).toSeq()
-                       );
+                               CONNECTOR_PROPERTIES,
+                               Arrays.asList(CONNECTOR_PROPERTIES_KEY, CONNECTOR_PROPERTIES_VALUE),
+                               this.kafkaProperties.entrySet().stream()
+                                       .map(e -> Arrays.asList(e.getKey(), e.getValue()))
+                                       .collect(Collectors.toList())
+                               );
                }
        }
-
-       @Override
-       public boolean needsFormat() {
-               return true;
-       }
 }
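
The rewritten descriptor reads as a fluent builder. A short usage sketch, with version string, topic, and consumer properties invented for illustration:

    // Hypothetical usage of the fluent Kafka descriptor defined above.
    Kafka kafka = new Kafka()
            .version("0.11")                                 // Kafka version
            .topic("test-topic")                             // topic to read from
            .property("bootstrap.servers", "localhost:9092") // consumer property
            .property("group.id", "test-group")
            .startFromSpecificOffset(0, 42L);                // partition 0 -> offset 42
    // addConnectorProperties(...) then serializes these settings into the
    // normalized connector.* keys that the table source factories consume.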
