http://git-wip-us.apache.org/repos/asf/flink/blob/d9f2f2f8/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java
index a3ca22f..3adc7c5 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java
@@ -19,175 +19,97 @@
 package org.apache.flink.table.descriptors;
 
 import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
-import org.apache.flink.table.api.ValidationException;
 
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
-
-import scala.Function0;
-import scala.Tuple2;
-import scala.collection.JavaConversions;
-import scala.runtime.AbstractFunction0;
-import scala.runtime.BoxedUnit;
-
+import java.util.function.Consumer;
 
 /**
  * The validator for {@link Kafka}.
  */
 public class KafkaValidator extends ConnectorDescriptorValidator {
-       // fields
-       public static final String CONNECTOR_TYPE_VALUE = "kafka";
-       public static final String KAFKA_VERSION = "kafka.version";
-       public static final String BOOTSTRAP_SERVERS = "bootstrap.servers";
-       public static final String GROUP_ID = "group.id";
-       public static final String TOPIC = "topic";
-       public static final String STARTUP_MODE = "startup.mode";
-       public static final String SPECIFIC_OFFSETS = "specific.offsets";
-       public static final String TABLE_JSON_MAPPING = "table.json.mapping";
-
-       public static final String PARTITION = "partition";
-       public static final String OFFSET = "offset";
-
-       public static final String TABLE_FIELD = "table.field";
-       public static final String JSON_FIELD = "json.field";
-
-       public static final String ZOOKEEPER_CONNECT = "zookeeper.connect"; // only required for 0.8
 
-       // values
-       public static final String KAFKA_VERSION_VALUE_08 = "0.8";
-       public static final String KAFKA_VERSION_VALUE_09 = "0.9";
-       public static final String KAFKA_VERSION_VALUE_010 = "0.10";
-       public static final String KAFKA_VERSION_VALUE_011 = "0.11";
-
-       public static final String STARTUP_MODE_VALUE_EARLIEST = "earliest-offset";
-       public static final String STARTUP_MODE_VALUE_LATEST = "latest-offset";
-       public static final String STARTUP_MODE_VALUE_GROUP_OFFSETS = "group-offsets";
-       public static final String STARTUP_MODE_VALUE_SPECIFIC_OFFSETS = "specific-offsets";
-
-       // utils
-       public static Map<String, String> normalizeStartupMode(StartupMode startupMode) {
-               Map<String, String> mapPair = new HashMap<>();
-               switch (startupMode) {
-                       case EARLIEST:
-                               mapPair.put(STARTUP_MODE, STARTUP_MODE_VALUE_EARLIEST);
-                               break;
-                       case LATEST:
-                               mapPair.put(STARTUP_MODE, STARTUP_MODE_VALUE_LATEST);
-                               break;
-                       case GROUP_OFFSETS:
-                               mapPair.put(STARTUP_MODE, STARTUP_MODE_VALUE_GROUP_OFFSETS);
-                               break;
-                       case SPECIFIC_OFFSETS:
-                               mapPair.put(STARTUP_MODE, STARTUP_MODE_VALUE_SPECIFIC_OFFSETS);
-                               break;
-               }
-               return mapPair;
-       }
+       public static final String CONNECTOR_TYPE_VALUE_KAFKA = "kafka";
+       public static final String CONNECTOR_VERSION_VALUE_08 = "0.8";
+       public static final String CONNECTOR_VERSION_VALUE_09 = "0.9";
+       public static final String CONNECTOR_VERSION_VALUE_010 = "0.10";
+       public static final String CONNECTOR_VERSION_VALUE_011 = "0.11";
+       public static final String CONNECTOR_TOPIC = "connector.topic";
+       public static final String CONNECTOR_STARTUP_MODE = "connector.startup-mode";
+       public static final String CONNECTOR_STARTUP_MODE_VALUE_EARLIEST = "earliest-offset";
+       public static final String CONNECTOR_STARTUP_MODE_VALUE_LATEST = "latest-offset";
+       public static final String CONNECTOR_STARTUP_MODE_VALUE_GROUP_OFFSETS = "group-offsets";
+       public static final String CONNECTOR_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS = "specific-offsets";
+       public static final String CONNECTOR_SPECIFIC_OFFSETS = "connector.specific-offsets";
+       public static final String CONNECTOR_SPECIFIC_OFFSETS_PARTITION = "partition";
+       public static final String CONNECTOR_SPECIFIC_OFFSETS_OFFSET = "offset";
+       public static final String CONNECTOR_PROPERTIES = "connector.properties";
+       public static final String CONNECTOR_PROPERTIES_KEY = "key";
+       public static final String CONNECTOR_PROPERTIES_VALUE = "value";
 
        @Override
        public void validate(DescriptorProperties properties) {
                super.validate(properties);
-
-               AbstractFunction0<BoxedUnit> emptyValidator = new AbstractFunction0<BoxedUnit>() {
-                       @Override
-                       public BoxedUnit apply() {
-                               return BoxedUnit.UNIT;
-                       }
-               };
-
-               properties.validateValue(CONNECTOR_TYPE(), CONNECTOR_TYPE_VALUE, false);
-
-               AbstractFunction0<BoxedUnit> version08Validator = new AbstractFunction0<BoxedUnit>() {
-                       @Override
-                       public BoxedUnit apply() {
-                               properties.validateString(ZOOKEEPER_CONNECT, false, 0, Integer.MAX_VALUE);
-                               return BoxedUnit.UNIT;
-                       }
-               };
-
-               Map<String, Function0<BoxedUnit>> versionValidatorMap = new HashMap<>();
-               versionValidatorMap.put(KAFKA_VERSION_VALUE_08, version08Validator);
-               versionValidatorMap.put(KAFKA_VERSION_VALUE_09, emptyValidator);
-               versionValidatorMap.put(KAFKA_VERSION_VALUE_010, emptyValidator);
-               versionValidatorMap.put(KAFKA_VERSION_VALUE_011, emptyValidator);
-               properties.validateEnum(
-                               KAFKA_VERSION,
+               properties.validateValue(CONNECTOR_TYPE(), CONNECTOR_TYPE_VALUE_KAFKA, false);
+
+               final List<String> versions = Arrays.asList(
+                       CONNECTOR_VERSION_VALUE_08,
+                       CONNECTOR_VERSION_VALUE_09,
+                       CONNECTOR_VERSION_VALUE_010,
+                       CONNECTOR_VERSION_VALUE_011);
+               properties.validateEnumValues(CONNECTOR_VERSION(), false, versions);
+               properties.validateString(CONNECTOR_TOPIC, false, 1, Integer.MAX_VALUE);
+
+               final Map<String, Consumer<String>> specificOffsetValidators = new HashMap<>();
+               specificOffsetValidators.put(
+                       CONNECTOR_SPECIFIC_OFFSETS_PARTITION,
+                       (prefix) -> properties.validateInt(
+                               prefix + CONNECTOR_SPECIFIC_OFFSETS_PARTITION,
                                false,
-                               toScalaImmutableMap(versionValidatorMap)
-               );
-
-               properties.validateString(BOOTSTRAP_SERVERS, false, 1, Integer.MAX_VALUE);
-               properties.validateString(GROUP_ID, false, 1, Integer.MAX_VALUE);
-               properties.validateString(TOPIC, false, 1, Integer.MAX_VALUE);
-
-               AbstractFunction0<BoxedUnit> specificOffsetsValidator = new AbstractFunction0<BoxedUnit>() {
-                       @Override
-                       public BoxedUnit apply() {
-                               Map<String, String> partitions = JavaConversions.mapAsJavaMap(
-                                               properties.getIndexedProperty(SPECIFIC_OFFSETS, PARTITION));
-
-                               Map<String, String> offsets = JavaConversions.mapAsJavaMap(
-                                               properties.getIndexedProperty(SPECIFIC_OFFSETS, OFFSET));
-                               if (partitions.isEmpty() || offsets.isEmpty()) {
-                                       throw new ValidationException("Offsets must be set for SPECIFIC_OFFSETS mode.");
-                               }
-                               for (int i = 0; i < partitions.size(); ++i) {
-                                       properties.validateInt(
-                                                       SPECIFIC_OFFSETS + "." + i + "." + PARTITION,
-                                                       false,
-                                                       0,
-                                                       Integer.MAX_VALUE);
-                                       properties.validateLong(
-                                                       SPECIFIC_OFFSETS + "." + i + "." + OFFSET,
-                                                       false,
-                                                       0,
-                                                       Long.MAX_VALUE);
-                               }
-                               return BoxedUnit.UNIT;
-                       }
-               };
-               Map<String, Function0<BoxedUnit>> startupModeValidatorMap = new HashMap<>();
-               startupModeValidatorMap.put(STARTUP_MODE_VALUE_GROUP_OFFSETS, emptyValidator);
-               startupModeValidatorMap.put(STARTUP_MODE_VALUE_EARLIEST, emptyValidator);
-               startupModeValidatorMap.put(STARTUP_MODE_VALUE_LATEST, emptyValidator);
-               startupModeValidatorMap.put(STARTUP_MODE_VALUE_SPECIFIC_OFFSETS, specificOffsetsValidator);
-
-               properties.validateEnum(STARTUP_MODE, true, toScalaImmutableMap(startupModeValidatorMap));
-               validateTableJsonMapping(properties);
+                               0,
+                               Integer.MAX_VALUE));
+               specificOffsetValidators.put(
+                       CONNECTOR_SPECIFIC_OFFSETS_OFFSET,
+                       (prefix) -> properties.validateLong(
+                               prefix + CONNECTOR_SPECIFIC_OFFSETS_OFFSET,
+                               false,
+                               0,
+                               Long.MAX_VALUE));
+
+               final Map<String, Consumer<String>> startupModeValidation = new HashMap<>();
+               startupModeValidation.put(CONNECTOR_STARTUP_MODE_VALUE_GROUP_OFFSETS, properties.noValidation());
+               startupModeValidation.put(CONNECTOR_STARTUP_MODE_VALUE_EARLIEST, properties.noValidation());
+               startupModeValidation.put(CONNECTOR_STARTUP_MODE_VALUE_LATEST, properties.noValidation());
+               startupModeValidation.put(
+                       CONNECTOR_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS,
+                       prefix -> properties.validateFixedIndexedProperties(CONNECTOR_SPECIFIC_OFFSETS, false, specificOffsetValidators));
+               properties.validateEnum(CONNECTOR_STARTUP_MODE, true, startupModeValidation);
+
+               final Map<String, Consumer<String>> propertyValidators = new HashMap<>();
+               propertyValidators.put(
+                       CONNECTOR_PROPERTIES_KEY,
+                       prefix -> properties.validateString(prefix + CONNECTOR_PROPERTIES_KEY, false, 1, Integer.MAX_VALUE));
+               propertyValidators.put(
+                       CONNECTOR_PROPERTIES_VALUE,
+                       prefix -> properties.validateString(prefix + CONNECTOR_PROPERTIES_VALUE, false, 0, Integer.MAX_VALUE));
+               properties.validateFixedIndexedProperties(CONNECTOR_PROPERTIES, true, propertyValidators);
        }
 
-       private void validateTableJsonMapping(DescriptorProperties properties) {
-               Map<String, String> mappingTableField = JavaConversions.mapAsJavaMap(
-                               properties.getIndexedProperty(TABLE_JSON_MAPPING, TABLE_FIELD));
-               Map<String, String> mappingJsonField = JavaConversions.mapAsJavaMap(
-                               properties.getIndexedProperty(TABLE_JSON_MAPPING, JSON_FIELD));
-
-               if (mappingJsonField.size() != mappingJsonField.size()) {
-                       throw new ValidationException("Table JSON mapping must be one to one.");
-               }
-
-               for (int i = 0; i < mappingTableField.size(); i++) {
-                       properties.validateString(
-                                       TABLE_JSON_MAPPING + "." + i + "." + TABLE_FIELD,
-                                       false,
-                                       1,
-                                       Integer.MAX_VALUE);
-                       properties.validateString(
-                                       TABLE_JSON_MAPPING + "." + i + "." + JSON_FIELD,
-                                       false,
-                                       1,
-                                       Integer.MAX_VALUE);
-               }
-       }
+       // utilities
 
-       @SuppressWarnings("unchecked")
-       private <K, V> scala.collection.immutable.Map<K, V> toScalaImmutableMap(Map<K, V> javaMap) {
-               final java.util.List<scala.Tuple2<K, V>> list = new java.util.ArrayList<>(javaMap.size());
-               for (final java.util.Map.Entry<K, V> entry : javaMap.entrySet()) {
-                       list.add(scala.Tuple2.apply(entry.getKey(), entry.getValue()));
+       public static String normalizeStartupMode(StartupMode startupMode) {
+               switch (startupMode) {
+                       case EARLIEST:
+                               return CONNECTOR_STARTUP_MODE_VALUE_EARLIEST;
+                       case LATEST:
+                               return CONNECTOR_STARTUP_MODE_VALUE_LATEST;
+                       case GROUP_OFFSETS:
+                               return CONNECTOR_STARTUP_MODE_VALUE_GROUP_OFFSETS;
+                       case SPECIFIC_OFFSETS:
+                               return CONNECTOR_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS;
                }
-               final scala.collection.Seq<Tuple2<K, V>> seq =
-                               scala.collection.JavaConverters.asScalaBufferConverter(list).asScala().toSeq();
-               return (scala.collection.immutable.Map<K, V>) scala.collection.immutable.Map$.MODULE$.apply(seq);
+               throw new IllegalArgumentException("Invalid startup mode.");
        }
 }
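
For reference, a minimal usage sketch (not part of this commit; the wrapper class name is illustrative) of the flat property keys the reworked validator accepts, mirroring the KafkaTest properties further below, together with the normalizeStartupMode() helper:

    import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
    import org.apache.flink.table.descriptors.KafkaValidator;

    import java.util.HashMap;
    import java.util.Map;

    public class KafkaDescriptorPropertiesSketch {
            public static void main(String[] args) {
                    // Flat key space validated by KafkaValidator (keys match the CONNECTOR_* constants above).
                    Map<String, String> kafka = new HashMap<>();
                    kafka.put("connector.property-version", "1");
                    kafka.put("connector.type", "kafka");
                    kafka.put("connector.version", "0.11");
                    kafka.put("connector.topic", "test-topic");
                    kafka.put("connector.startup-mode", "specific-offsets");
                    kafka.put("connector.specific-offsets.0.partition", "0");
                    kafka.put("connector.specific-offsets.0.offset", "42");
                    kafka.put("connector.properties.0.key", "bootstrap.servers");
                    kafka.put("connector.properties.0.value", "localhost:1234");

                    // The startup mode enum maps to the string stored under "connector.startup-mode".
                    String mode = KafkaValidator.normalizeStartupMode(StartupMode.SPECIFIC_OFFSETS);
                    System.out.println(mode); // prints "specific-offsets"
            }
    }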

http://git-wip-us.apache.org/repos/asf/flink/blob/d9f2f2f8/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableFromDescriptorTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableFromDescriptorTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableFromDescriptorTestBase.java
deleted file mode 100644
index 964a624..0000000
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableFromDescriptorTestBase.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.java.StreamTableEnvironment;
-import org.apache.flink.table.descriptors.Kafka;
-
-import org.mockito.Mockito;
-
-/**
- * Tests for {@link KafkaJsonTableSourceFactory}.
- */
-public abstract class KafkaJsonTableFromDescriptorTestBase {
-       private static final String GROUP_ID = "test-group";
-       private static final String BOOTSTRAP_SERVERS = "localhost:1234";
-       private static final String TOPIC = "test-topic";
-
-       protected abstract String versionForTest();
-
-       protected abstract KafkaJsonTableSource.Builder builderForTest();
-
-       protected abstract void extraSettings(KafkaTableSource.Builder builder, Kafka kafka);
-
-       private static StreamExecutionEnvironment env = Mockito.mock(StreamExecutionEnvironment.class);
-       private static StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
-
-//     @Test
-//     public void buildJsonTableSourceTest() throws Exception {
-//             final URL url = getClass().getClassLoader().getResource("kafka-json-schema.json");
-//             Objects.requireNonNull(url);
-//             final String schema = FileUtils.readFileUtf8(new File(url.getFile()));
-//
-//             Map<String, String> tableJsonMapping = new HashMap<>();
-//             tableJsonMapping.put("fruit-name", "name");
-//             tableJsonMapping.put("fruit-count", "count");
-//             tableJsonMapping.put("event-time", "time");
-//
-//             // Construct with the builder.
-//             Properties props = new Properties();
-//             props.put("group.id", GROUP_ID);
-//             props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
-//
-//             Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
-//             specificOffsets.put(new KafkaTopicPartition(TOPIC, 0), 100L);
-//             specificOffsets.put(new KafkaTopicPartition(TOPIC, 1), 123L);
-//
-//             KafkaTableSource.Builder builder = builderForTest()
-//                             .forJsonSchema(TableSchema.fromTypeInfo(JsonSchemaConverter.convert(schema)))
-//                             .failOnMissingField(true)
-//                             .withTableToJsonMapping(tableJsonMapping)
-//                             .withKafkaProperties(props)
-//                             .forTopic(TOPIC)
-//                             .fromSpecificOffsets(specificOffsets)
-//                             .withSchema(
-//                                             TableSchema.builder()
-//                                                             .field("fruit-name", Types.STRING)
-//                                                             .field("fruit-count", Types.INT)
-//                                                             .field("event-time", Types.LONG)
-//                                                             .field("proc-time", Types.SQL_TIMESTAMP)
-//                                                             .build())
-//                             .withProctimeAttribute("proc-time");
-//
-//             // Construct with the descriptor.
-//             Map<Integer, Long> offsets = new HashMap<>();
-//             offsets.put(0, 100L);
-//             offsets.put(1, 123L);
-//             Kafka kafka = new Kafka()
-//                             .version(versionForTest())
-//                             .groupId(GROUP_ID)
-//                             .bootstrapServers(BOOTSTRAP_SERVERS)
-//                             .topic(TOPIC)
-//                             .startupMode(StartupMode.SPECIFIC_OFFSETS)
-//                             .specificOffsets(offsets)
-//                             .tableJsonMapping(tableJsonMapping);
-//             extraSettings(builder, kafka);
-//
-//             TableSource source = tEnv
-//                             .from(kafka)
-//                             .withFormat(
-//                                             new Json()
-//                                                             .schema(schema)
-//                                                             .failOnMissingField(true))
-//                             .withSchema(new Schema()
-//                                             .field("fruit-name", Types.STRING)
-//                                             .field("fruit-count", Types.INT)
-//                                             .field("event-time", Types.LONG)
-//                                             .field("proc-time", Types.SQL_TIMESTAMP).proctime())
-//                             .toTableSource();
-//
-//             Assert.assertEquals(builder.build(), source);
-//     }
-
-//     @Test(expected = TableException.class)
-//     public void buildJsonTableSourceFailTest() {
-//             tEnv.from(
-//                             new Kafka()
-//                                             .version(versionForTest())
-//                                             .groupId(GROUP_ID)
-//                                             .bootstrapServers(BOOTSTRAP_SERVERS)
-//                                             .topic(TOPIC)
-//                                             .startupMode(StartupMode.SPECIFIC_OFFSETS)
-//                                             .specificOffsets(new HashMap<>()))
-//                             .withFormat(
-//                                             new Json()
-//                                                             .schema("")
-//                                                             .failOnMissingField(true))
-//                             .toTableSource();
-//     }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d9f2f2f8/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java
new file mode 100644
index 0000000..2b081a9
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.formats.json.JsonSchemaConverter;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.FormatDescriptor;
+import org.apache.flink.table.descriptors.Json;
+import org.apache.flink.table.descriptors.Kafka;
+import org.apache.flink.table.descriptors.Schema;
+import org.apache.flink.table.descriptors.TestTableSourceDescriptor;
+import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.table.sources.TableSourceFactoryService;
+
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link KafkaJsonTableSourceFactory}.
+ */
+public abstract class KafkaJsonTableSourceFactoryTestBase {
+
+       private static final String JSON_SCHEMA =
+               "{" +
+               "  'title': 'Fruit'," +
+               "  'type': 'object'," +
+               "  'properties': {" +
+               "    'name': {" +
+               "      'type': 'string'" +
+               "    }," +
+               "    'count': {" +
+               "      'type': 'integer'" +
+               "    }," +
+               "    'time': {" +
+               "      'description': 'Age in years'," +
+               "      'type': 'number'" +
+               "    }" + "  }," +
+               "  'required': ['name', 'count', 'time']" +
+               "}";
+
+       private static final String TOPIC = "test-topic";
+
+       protected abstract String version();
+
+       protected abstract KafkaJsonTableSource.Builder builder();
+
+       @Test
+       public void testTableSourceFromJsonSchema() {
+               testTableSource(
+                       new Json()
+                               .jsonSchema(JSON_SCHEMA)
+                               .failOnMissingField(true)
+               );
+       }
+
+       @Test
+       public void testTableSourceDerivedSchema() {
+               testTableSource(
+                       new Json()
+                               .deriveSchema()
+                               .failOnMissingField(true)
+               );
+       }
+
+       private void testTableSource(FormatDescriptor format) {
+               // construct table source using a builder
+
+               final Map<String, String> tableJsonMapping = new HashMap<>();
+               tableJsonMapping.put("fruit-name", "name");
+               tableJsonMapping.put("count", "count");
+               tableJsonMapping.put("event-time", "time");
+
+               final Properties props = new Properties();
+               props.put("group.id", "test-group");
+               props.put("bootstrap.servers", "localhost:1234");
+
+               final Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
+               specificOffsets.put(new KafkaTopicPartition(TOPIC, 0), 100L);
+               specificOffsets.put(new KafkaTopicPartition(TOPIC, 1), 123L);
+
+               final KafkaTableSource builderSource = builder()
+                               .forJsonSchema(TableSchema.fromTypeInfo(JsonSchemaConverter.convert(JSON_SCHEMA)))
+                               .failOnMissingField(true)
+                               .withTableToJsonMapping(tableJsonMapping)
+                               .withKafkaProperties(props)
+                               .forTopic(TOPIC)
+                               .fromSpecificOffsets(specificOffsets)
+                               .withSchema(
+                                       TableSchema.builder()
+                                               .field("fruit-name", Types.STRING)
+                                               .field("count", Types.BIG_INT)
+                                               .field("event-time", Types.BIG_DEC)
+                                               .field("proc-time", Types.SQL_TIMESTAMP)
+                                               .build())
+                               .withProctimeAttribute("proc-time")
+                               .build();
+
+               // construct table source using descriptors and table source factory
+
+               final Map<Integer, Long> offsets = new HashMap<>();
+               offsets.put(0, 100L);
+               offsets.put(1, 123L);
+
+               final TestTableSourceDescriptor testDesc = new TestTableSourceDescriptor(
+                               new Kafka()
+                                       .version(version())
+                                       .topic(TOPIC)
+                                       .properties(props)
+                                       .startFromSpecificOffsets(offsets))
+                       .addFormat(format)
+                       .addSchema(
+                               new Schema()
+                                               .field("fruit-name", Types.STRING).from("name")
+                                               .field("count", Types.BIG_INT) // no from so it must match with the input
+                                               .field("event-time", Types.BIG_DEC).from("time")
+                                               .field("proc-time", Types.SQL_TIMESTAMP).proctime());
+
+               final TableSource<?> factorySource = TableSourceFactoryService.findAndCreateTableSource(testDesc);
+
+               assertEquals(builderSource, factorySource);
+       }
+}
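
The base class above is abstract; each version-specific Kafka connector module supplies the two hooks. A minimal sketch of such a subclass (illustrative only; the concrete class name and the Kafka010JsonTableSource.builder() entry point are assumptions, not part of this diff):

    public class Kafka010JsonTableSourceFactoryTest extends KafkaJsonTableSourceFactoryTestBase {

            @Override
            protected String version() {
                    // must match one of the version values known to KafkaValidator
                    return "0.10";
            }

            @Override
            protected KafkaJsonTableSource.Builder builder() {
                    // assumes a static builder() entry point on the 0.10 table source
                    return Kafka010JsonTableSource.builder();
            }
    }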

http://git-wip-us.apache.org/repos/asf/flink/blob/d9f2f2f8/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/table/descriptors/KafkaTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/table/descriptors/KafkaTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/table/descriptors/KafkaTest.java
new file mode 100644
index 0000000..f3d96f1
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/table/descriptors/KafkaTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Tests for the {@link Kafka} descriptor.
+ */
+public class KafkaTest extends DescriptorTestBase {
+
+       @Override
+       public List<Descriptor> descriptors() {
+               final Descriptor earliestDesc =
+                       new Kafka()
+                               .version("0.8")
+                               .startFromEarliest()
+                               .topic("WhateverTopic");
+
+               final Descriptor specificOffsetsDesc =
+                       new Kafka()
+                               .version("0.11")
+                               .topic("MyTable")
+                               .startFromSpecificOffset(0, 42L)
+                               .startFromSpecificOffset(1, 300L)
+                               .property("zookeeper.stuff", "12")
+                               .property("kafka.stuff", "42");
+
+               final Map<Integer, Long> offsets = new HashMap<>();
+               offsets.put(0, 42L);
+               offsets.put(1, 300L);
+
+               final Properties properties = new Properties();
+               properties.put("zookeeper.stuff", "12");
+               properties.put("kafka.stuff", "42");
+
+               final Descriptor specificOffsetsMapDesc =
+                       new Kafka()
+                               .version("0.11")
+                               .topic("MyTable")
+                               .startFromSpecificOffsets(offsets)
+                               .properties(properties);
+
+               return Arrays.asList(earliestDesc, specificOffsetsDesc, specificOffsetsMapDesc);
+       }
+
+       @Override
+       public List<Map<String, String>> properties() {
+               final Map<String, String> props1 = new HashMap<>();
+               props1.put("connector.property-version", "1");
+               props1.put("connector.type", "kafka");
+               props1.put("connector.version", "0.8");
+               props1.put("connector.topic", "WhateverTopic");
+               props1.put("connector.startup-mode", "earliest-offset");
+
+               final Map<String, String> props2 = new HashMap<>();
+               props2.put("connector.property-version", "1");
+               props2.put("connector.type", "kafka");
+               props2.put("connector.version", "0.11");
+               props2.put("connector.topic", "MyTable");
+               props2.put("connector.startup-mode", "specific-offsets");
+               props2.put("connector.specific-offsets.0.partition", "0");
+               props2.put("connector.specific-offsets.0.offset", "42");
+               props2.put("connector.specific-offsets.1.partition", "1");
+               props2.put("connector.specific-offsets.1.offset", "300");
+               props2.put("connector.properties.0.key", "zookeeper.stuff");
+               props2.put("connector.properties.0.value", "12");
+               props2.put("connector.properties.1.key", "kafka.stuff");
+               props2.put("connector.properties.1.value", "42");
+
+               final Map<String, String> props3 = new HashMap<>();
+               props3.put("connector.property-version", "1");
+               props3.put("connector.type", "kafka");
+               props3.put("connector.version", "0.11");
+               props3.put("connector.topic", "MyTable");
+               props3.put("connector.startup-mode", "specific-offsets");
+               props3.put("connector.specific-offsets.0.partition", "0");
+               props3.put("connector.specific-offsets.0.offset", "42");
+               props3.put("connector.specific-offsets.1.partition", "1");
+               props3.put("connector.specific-offsets.1.offset", "300");
+               props3.put("connector.properties.0.key", "zookeeper.stuff");
+               props3.put("connector.properties.0.value", "12");
+               props3.put("connector.properties.1.key", "kafka.stuff");
+               props3.put("connector.properties.1.value", "42");
+
+               return Arrays.asList(props1, props2, props3);
+       }
+
+       @Override
+       public DescriptorValidator validator() {
+               return new KafkaValidator();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d9f2f2f8/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-json-schema.json
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-json-schema.json b/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-json-schema.json
deleted file mode 100644
index 5167e5e..0000000
--- a/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-json-schema.json
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-{
-  "title": "Fruit",
-  "type": "object",
-  "properties": {
-    "name": {
-      "type": "string"
-    },
-    "count": {
-      "type": "integer"
-    },
-    "time": {
-      "description": "Age in years",
-      "type": "number"
-    }
-  },
-  "required": ["name", "count", "time"]
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d9f2f2f8/flink-dist/pom.xml
----------------------------------------------------------------------
diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml
index 019047c..d8477fe 100644
--- a/flink-dist/pom.xml
+++ b/flink-dist/pom.xml
@@ -337,22 +337,6 @@ under the License.
                </dependency>
                <!-- end optional Flink libraries -->
 
-               <!-- start optional Flink formats -->
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-avro</artifactId>
-                       <version>${project.version}</version>
-                       <scope>provided</scope>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-json</artifactId>
-                       <version>${project.version}</version>
-                       <scope>provided</scope>
-               </dependency>
-               <!-- end optional Flink formats -->
-
                <!-- test dependencies -->
 
                <dependency>

http://git-wip-us.apache.org/repos/asf/flink/blob/d9f2f2f8/flink-formats/flink-json/pom.xml
----------------------------------------------------------------------
diff --git a/flink-formats/flink-json/pom.xml b/flink-formats/flink-json/pom.xml
index d0f55ab..3a80b0e 100644
--- a/flink-formats/flink-json/pom.xml
+++ b/flink-formats/flink-json/pom.xml
@@ -50,6 +50,33 @@ under the License.
                        <version>${project.version}</version>
                        <scope>provided</scope>
                </dependency>
-       </dependencies>
 
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <!-- use a dedicated Scala version to not depend on it -->
+                       <artifactId>flink-table_2.11</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+                       <!-- Projects depending on this project, won't depend on flink-table. -->
+                       <optional>true</optional>
+               </dependency>
+
+               <!-- test dependencies -->
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <!-- use a dedicated Scala version to not depend on it -->
+                       <artifactId>flink-table_2.11</artifactId>
+                       <version>${project.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+
+               <!-- flink-table needs Scala -->
+               <dependency>
+                       <groupId>org.scala-lang</groupId>
+                       <artifactId>scala-compiler</artifactId>
+                       <scope>test</scope>
+               </dependency>
+       </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/d9f2f2f8/flink-formats/flink-json/src/main/java/org/apache/flink/table/descriptors/Json.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/table/descriptors/Json.java b/flink-formats/flink-json/src/main/java/org/apache/flink/table/descriptors/Json.java
new file mode 100644
index 0000000..9c12191
--- /dev/null
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/table/descriptors/Json.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.typeutils.TypeStringUtils;
+import org.apache.flink.util.Preconditions;
+
+import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_DERIVE_SCHEMA;
+import static org.apache.flink.table.descriptors.JsonValidator.FORMAT_FAIL_ON_MISSING_FIELD;
+import static org.apache.flink.table.descriptors.JsonValidator.FORMAT_JSON_SCHEMA;
+import static org.apache.flink.table.descriptors.JsonValidator.FORMAT_SCHEMA;
+import static org.apache.flink.table.descriptors.JsonValidator.FORMAT_TYPE_VALUE;
+
+/**
+  * Format descriptor for JSON.
+  */
+public class Json extends FormatDescriptor {
+
+       private Boolean failOnMissingField;
+       private Boolean deriveSchema;
+       private String jsonSchema;
+       private String schema;
+
+       /**
+         * Format descriptor for JSON.
+         */
+       public Json() {
+               super(FORMAT_TYPE_VALUE, 1);
+       }
+
+       /**
+        * Sets flag whether to fail if a field is missing or not.
+        *
+        * @param failOnMissingField If set to true, the operation fails if there is a missing field.
+        *                           If set to false, a missing field is set to null.
+        */
+       public Json failOnMissingField(boolean failOnMissingField) {
+               this.failOnMissingField = failOnMissingField;
+               return this;
+       }
+
+       /**
+        * Sets the JSON schema string with field names and the types according to the JSON schema
+        * specification [[http://json-schema.org/specification.html]].
+        *
+        * <p>The schema might be nested.
+        *
+        * @param jsonSchema JSON schema
+        */
+       public Json jsonSchema(String jsonSchema) {
+               Preconditions.checkNotNull(jsonSchema);
+               this.jsonSchema = jsonSchema;
+               this.schema = null;
+               this.deriveSchema = null;
+               return this;
+       }
+
+       /**
+        * Sets the schema using type information.
+        *
+        * <p>JSON objects are represented as ROW types.
+        *
+        * <p>The schema might be nested.
+        *
+        * @param schemaType type information that describes the schema
+        */
+       public Json schema(TypeInformation<?> schemaType) {
+               Preconditions.checkNotNull(schemaType);
+               this.schema = TypeStringUtils.writeTypeInfo(schemaType);
+               this.jsonSchema = null;
+               this.deriveSchema = null;
+               return this;
+       }
+
+       /**
+        * Derives the format schema from the table's schema described using {@link Schema}.
+        *
+        * <p>This allows for defining schema information only once.
+        *
+        * <p>The names, types, and field order of the format are determined by the table's
+        * schema. Time attributes are ignored. A "from" definition is interpreted as a field renaming
+        * in the format.
+        */
+       public Json deriveSchema() {
+               this.deriveSchema = true;
+               this.schema = null;
+               this.jsonSchema = null;
+               return this;
+       }
+
+       /**
+        * Internal method for format properties conversion.
+        */
+       @Override
+       public void addFormatProperties(DescriptorProperties properties) {
+               if (deriveSchema != null) {
+                       properties.putBoolean(FORMAT_DERIVE_SCHEMA(), deriveSchema);
+               }
+
+               if (jsonSchema != null) {
+                       properties.putString(FORMAT_JSON_SCHEMA, jsonSchema);
+               }
+
+               if (schema != null) {
+                       properties.putString(FORMAT_SCHEMA, schema);
+               }
+
+                       properties.putBoolean(FORMAT_FAIL_ON_MISSING_FIELD, failOnMissingField);
failOnMissingField);
+               }
+       }
+}
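
As a usage sketch (not part of this commit; the wrapper class name is illustrative), the three mutually exclusive ways of defining the format schema with this descriptor; the property keys they produce ("format.json-schema", "format.schema", "format.derive-schema") are exactly the ones checked by the JsonValidator below:

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.table.api.Types;
    import org.apache.flink.table.descriptors.Json;

    public class JsonDescriptorSketch {
            public static void main(String[] args) {
                    // 1) Explicit JSON schema string -> "format.json-schema".
                    Json fromJsonSchema = new Json()
                            .jsonSchema("{ 'type': 'object', 'properties': { 'name': { 'type': 'string' } } }")
                            .failOnMissingField(true);

                    // 2) Flink type information -> "format.schema" (serialized as a type string).
                    Json fromTypeInfo = new Json()
                            .schema(Types.ROW(
                                    new String[]{"name", "ts"},
                                    new TypeInformation[]{Types.STRING(), Types.SQL_TIMESTAMP()}));

                    // 3) Derive names and types from the table's schema -> "format.derive-schema".
                    Json derived = new Json().deriveSchema();
            }
    }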

http://git-wip-us.apache.org/repos/asf/flink/blob/d9f2f2f8/flink-formats/flink-json/src/main/java/org/apache/flink/table/descriptors/JsonValidator.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/table/descriptors/JsonValidator.java b/flink-formats/flink-json/src/main/java/org/apache/flink/table/descriptors/JsonValidator.java
new file mode 100644
index 0000000..fea7cf5
--- /dev/null
+++ b/flink-formats/flink-json/src/main/java/org/apache/flink/table/descriptors/JsonValidator.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.table.api.ValidationException;
+
+/**
+  * Validator for {@link Json}.
+  */
+public class JsonValidator extends FormatDescriptorValidator {
+
+       public static final String FORMAT_TYPE_VALUE = "json";
+       public static final String FORMAT_SCHEMA = "format.schema";
+       public static final String FORMAT_JSON_SCHEMA = "format.json-schema";
+       public static final String FORMAT_FAIL_ON_MISSING_FIELD = "format.fail-on-missing-field";
+
+       @Override
+       public void validate(DescriptorProperties properties) {
+               super.validate(properties);
+               properties.validateBoolean(FORMAT_DERIVE_SCHEMA(), true);
+               final boolean deriveSchema = properties.getOptionalBoolean(FORMAT_DERIVE_SCHEMA()).orElse(false);
+               final boolean hasSchema = properties.containsKey(FORMAT_SCHEMA);
+               final boolean hasSchemaString = properties.containsKey(FORMAT_JSON_SCHEMA);
+               if (deriveSchema && (hasSchema || hasSchemaString)) {
+                       throw new ValidationException(
+                               "Format cannot define a schema and derive from the table's schema at the same time.");
+               } else if (!deriveSchema && hasSchema && hasSchemaString) {
+                       throw new ValidationException("A definition of both a schema and JSON schema is not allowed.");
+               } else if (!deriveSchema && !hasSchema && !hasSchemaString) {
+                       throw new ValidationException("A definition of a schema or JSON schema is required.");
+               } else if (hasSchema) {
+                       properties.validateType(FORMAT_SCHEMA, false);
+               } else if (hasSchemaString) {
+                       properties.validateString(FORMAT_JSON_SCHEMA, false, 1);
+               }
+
+               properties.validateBoolean(FORMAT_FAIL_ON_MISSING_FIELD, true);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d9f2f2f8/flink-formats/flink-json/src/test/java/org/apache/flink/table/descriptors/JsonTest.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/table/descriptors/JsonTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/table/descriptors/JsonTest.java
new file mode 100644
index 0000000..6e370a0
--- /dev/null
+++ b/flink-formats/flink-json/src/test/java/org/apache/flink/table/descriptors/JsonTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.Types;
+import org.apache.flink.table.api.ValidationException;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Tests for the {@link Json} descriptor.
+ */
+public class JsonTest extends DescriptorTestBase {
+
+       private static final String JSON_SCHEMA =
+               "{" +
+               "    'title': 'Person'," +
+               "    'type': 'object'," +
+               "    'properties': {" +
+               "        'firstName': {" +
+               "            'type': 'string'" +
+               "        }," +
+               "        'lastName': {" +
+               "            'type': 'string'" +
+               "        }," +
+               "        'age': {" +
+               "            'description': 'Age in years'," +
+               "            'type': 'integer'," +
+               "            'minimum': 0" +
+               "        }" +
+               "    }," +
+               "    'required': ['firstName', 'lastName']" +
+               "}";
+
+       @Test(expected = ValidationException.class)
+       public void testInvalidMissingField() {
+               addPropertyAndVerify(descriptors().get(0), "format.fail-on-missing-field", "DDD");
+       }
+
+       @Test(expected = ValidationException.class)
+       public void testMissingSchema() {
+               removePropertyAndVerify(descriptors().get(0), "format.json-schema");
+       }
+
+       @Test(expected = ValidationException.class)
+       public void testDuplicateSchema() {
+               // we add an additional non-json schema
+               addPropertyAndVerify(descriptors().get(0), "format.schema", "DDD");
+       }
+
+       // --------------------------------------------------------------------------------------------
+
+       @Override
+       public List<Descriptor> descriptors() {
+               final Descriptor desc1 = new Json().jsonSchema("test");
+
+               final Descriptor desc2 = new Json().jsonSchema(JSON_SCHEMA).failOnMissingField(true);
+
+               final Descriptor desc3 = new Json()
+                       .schema(
+                               Types.ROW(
+                                       new String[]{"test1", "test2"},
+                                       new TypeInformation[]{Types.STRING(), Types.SQL_TIMESTAMP()}))
+                       .failOnMissingField(true);
+
+               final Descriptor desc4 = new Json().deriveSchema();
+
+               return Arrays.asList(desc1, desc2, desc3, desc4);
+       }
+
+       @Override
+       public List<Map<String, String>> properties() {
+               final Map<String, String> props1 = new HashMap<>();
+               props1.put("format.type", "json");
+               props1.put("format.property-version", "1");
+               props1.put("format.json-schema", "test");
+
+               final Map<String, String> props2 = new HashMap<>();
+               props2.put("format.type", "json");
+               props2.put("format.property-version", "1");
+               props2.put("format.json-schema", JSON_SCHEMA);
+               props2.put("format.fail-on-missing-field", "true");
+
+               final Map<String, String> props3 = new HashMap<>();
+               props3.put("format.type", "json");
+               props3.put("format.property-version", "1");
+               props3.put("format.schema", "ROW(test1 VARCHAR, test2 TIMESTAMP)");
+               props3.put("format.fail-on-missing-field", "true");
+
+               final Map<String, String> props4 = new HashMap<>();
+               props4.put("format.type", "json");
+               props4.put("format.property-version", "1");
+               props4.put("format.derive-schema", "true");
+
+               return Arrays.asList(props1, props2, props3, props4);
+       }
+
+       @Override
+       public DescriptorValidator validator() {
+               return new JsonValidator();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d9f2f2f8/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
index 9e7413c..8c40885 100644
--- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
+++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
@@ -476,7 +476,7 @@ public class LocalExecutor implements Executor {
                        }
 
                        env.getSources().forEach((name, source) -> {
-                               TableSource<?> tableSource = TableSourceFactoryService.findTableSourceFactory(source);
+                               TableSource<?> tableSource = TableSourceFactoryService.findAndCreateTableSource(source);
                                tableEnv.registerTableSource(name, tableSource);
                        });
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d9f2f2f8/flink-libraries/flink-table/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/pom.xml b/flink-libraries/flink-table/pom.xml
index b4b0eee..53a3233 100644
--- a/flink-libraries/flink-table/pom.xml
+++ b/flink-libraries/flink-table/pom.xml
@@ -317,6 +317,18 @@ under the License.
                                </executions>
                        </plugin>
 
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-jar-plugin</artifactId>
+                               <executions>
+                                       <execution>
+                                               <goals>
+                                                       <goal>test-jar</goal>
+                                               </goals>
+                                       </execution>
+                               </executions>
+                       </plugin>
+
                        <!-- Scala Code Style, most of the configuration done 
via plugin management -->
                        <plugin>
                                <groupId>org.scalastyle</groupId>

http://git-wip-us.apache.org/repos/asf/flink/blob/d9f2f2f8/flink-libraries/flink-table/src/main/resources/tableSourceConverter.properties
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/resources/tableSourceConverter.properties
 
b/flink-libraries/flink-table/src/main/resources/tableSourceConverter.properties
index 86a48a8..ec46579 100644
--- 
a/flink-libraries/flink-table/src/main/resources/tableSourceConverter.properties
+++ 
b/flink-libraries/flink-table/src/main/resources/tableSourceConverter.properties
@@ -17,6 +17,13 @@
 
################################################################################
 
 
################################################################################
+# NOTE: THIS APPROACH IS DEPRECATED AND WILL BE REMOVED IN FUTURE VERSIONS!
+#
+# We recommend using an org.apache.flink.table.sources.TableSourceFactory
+# instead. It allows defining new factories via Java Service Providers.
+################################################################################
+
+################################################################################
 # The config file is used to specify the packages of current module where
 # to find TableSourceConverter implementation class annotated with TableType.
 # If there are multiple packages to scan, put those packages together into a
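
The note added above recommends the TableSourceFactory approach, where factories are discovered through Java Service Providers (java.util.ServiceLoader) instead of package scanning. A rough, self-contained Scala sketch of that discovery mechanism follows; the trait below is a simplified stand-in for org.apache.flink.table.sources.TableSourceFactory, whose real interface has additional methods.

import java.util.ServiceLoader

import scala.collection.JavaConverters._

// Simplified stand-in for the real TableSourceFactory interface; only the
// matching-and-creating part of the mechanism is sketched here.
trait SimpleTableSourceFactory {
  /** Properties a descriptor must contain for this factory to match. */
  def requiredContext: Map[String, String]
  /** Creates a (placeholder) table source from the normalized properties. */
  def create(properties: Map[String, String]): AnyRef
}

object FactoryDiscoverySketch {

  /** Finds the first factory on the classpath whose required context matches. */
  def findAndCreate(properties: Map[String, String]): AnyRef = {
    val factories = ServiceLoader.load(classOf[SimpleTableSourceFactory]).asScala
    factories
      .find(_.requiredContext.forall { case (k, v) => properties.get(k).contains(v) })
      .map(_.create(properties))
      .getOrElse(throw new RuntimeException("No matching table source factory found."))
  }
}

A concrete factory is registered by listing its fully qualified class name in a file under META-INF/services/ named after the factory interface; ServiceLoader then picks it up at runtime.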

http://git-wip-us.apache.org/repos/asf/flink/blob/d9f2f2f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala
index 1e88d93..6958b3d 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala
@@ -21,6 +21,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.CompositeType
 
 import _root_.scala.collection.mutable.ArrayBuffer
+import _root_.java.util.Objects
 
 /**
   * A TableSchema represents a Table's structure.
@@ -94,7 +95,7 @@ class TableSchema(
   /**
     * Returns the number of columns.
     */
-  def getColumnNum: Int = columnNames.length
+  def getColumnCount: Int = columnNames.length
 
   /**
     * Returns all column names as an array.
@@ -134,6 +135,9 @@ class TableSchema(
 
   def canEqual(other: Any): Boolean = other.isInstanceOf[TableSchema]
 
+  override def hashCode(): Int = {
+    Objects.hash(columnNames, columnTypes)
+  }
 }
 
 object TableSchema {

http://git-wip-us.apache.org/repos/asf/flink/blob/d9f2f2f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
index fc7f7a3..ef14b8a 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
@@ -23,6 +23,7 @@ import java.util.{HashMap => JHashMap, Map => JMap}
 
 import org.apache.flink.table.api.{TableException, TableSchema}
 import org.apache.flink.table.catalog.ExternalCatalogTable._
+import org.apache.flink.table.descriptors.DescriptorProperties.toScala
 import org.apache.flink.table.descriptors.MetadataValidator.{METADATA_COMMENT, 
METADATA_CREATION_TIME, METADATA_LAST_ACCESS_TIME}
 import org.apache.flink.table.descriptors._
 import org.apache.flink.table.plan.stats.TableStats
@@ -73,8 +74,7 @@ class ExternalCatalogTable(
   lazy val tableType: String = {
     val props = new DescriptorProperties()
     connectorDesc.addProperties(props)
-    props
-      .getString(CONNECTOR_LEGACY_TYPE)
+    toScala(props.getOptionalString(CONNECTOR_LEGACY_TYPE))
       .getOrElse(throw new TableException("Could not find a legacy table type 
to return."))
   }
 
@@ -88,8 +88,7 @@ class ExternalCatalogTable(
   lazy val schema: TableSchema = {
     val props = new DescriptorProperties()
     connectorDesc.addProperties(props)
-    props
-      .getTableSchema(CONNECTOR_LEGACY_SCHEMA)
+    toScala(props.getOptionalTableSchema(CONNECTOR_LEGACY_SCHEMA))
       .getOrElse(throw new TableException("Could not find a legacy schema to 
return."))
   }
 
@@ -105,7 +104,7 @@ class ExternalCatalogTable(
     val props = new DescriptorProperties(normalizeKeys = false)
     val legacyProps = new JHashMap[String, String]()
     connectorDesc.addProperties(props)
-    props.asMap.flatMap { case (k, v) =>
+    props.asMap.asScala.flatMap { case (k, v) =>
       if (k.startsWith(CONNECTOR_LEGACY_PROPERTY)) {
         // remove "connector.legacy-property-"
         Some(legacyProps.put(k.substring(CONNECTOR_LEGACY_PROPERTY.length + 
1), v))
@@ -138,7 +137,7 @@ class ExternalCatalogTable(
     metadataDesc match {
       case Some(meta) =>
         meta.addProperties(normalizedProps)
-        normalizedProps.getString(METADATA_COMMENT).orNull
+        normalizedProps.getOptionalString(METADATA_COMMENT).orElse(null)
       case None =>
         null
     }
@@ -157,7 +156,7 @@ class ExternalCatalogTable(
     metadataDesc match {
       case Some(meta) =>
         meta.addProperties(normalizedProps)
-        normalizedProps.getLong(METADATA_CREATION_TIME).map(v => 
Long.box(v)).orNull
+        normalizedProps.getOptionalLong(METADATA_CREATION_TIME).orElse(null)
       case None =>
         null
     }
@@ -176,7 +175,7 @@ class ExternalCatalogTable(
     metadataDesc match {
       case Some(meta) =>
         meta.addProperties(normalizedProps)
-        normalizedProps.getLong(METADATA_LAST_ACCESS_TIME).map(v => 
Long.box(v)).orNull
+        normalizedProps.getOptionalLong(METADATA_LAST_ACCESS_TIME).orElse(null)
       case None =>
         null
     }
@@ -267,7 +266,7 @@ object ExternalCatalogTable {
       tableType: String,
       schema: TableSchema,
       legacyProperties: JMap[String, String])
-    extends ConnectorDescriptor(CONNECTOR_TYPE_VALUE, version = 1) {
+    extends ConnectorDescriptor(CONNECTOR_TYPE_VALUE, version = 1, 
formatNeeded = false) {
 
     override protected def addConnectorProperties(properties: 
DescriptorProperties): Unit = {
       properties.putString(CONNECTOR_LEGACY_TYPE, tableType)
@@ -276,8 +275,6 @@ object ExternalCatalogTable {
           properties.putString(s"$CONNECTOR_LEGACY_PROPERTY-$k", v)
       }
     }
-
-    override private[flink] def needsFormat() = false
   }
 
   def toConnectorDescriptor(
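
The getters above now return java.util.Optional and are converted to Scala Option via DescriptorProperties.toScala. The standalone sketch below shows what such a conversion amounts to; the real helper lives in DescriptorProperties, this version is only illustrative.

import java.util.Optional

object OptionalConversionSketch {

  // Standalone equivalent of the toScala conversion used above: an absent
  // Optional becomes None, a present one becomes Some(value).
  def toScala[T](optional: Optional[T]): Option[T] =
    if (optional.isPresent) Some(optional.get()) else None

  def main(args: Array[String]): Unit = {
    val present: Optional[String] = Optional.of("legacy-type")
    val absent: Optional[String] = Optional.empty()

    // Mirrors the pattern above: fail if the expected key is missing.
    println(toScala(present).getOrElse(sys.error("missing")))  // legacy-type
    println(toScala(absent).isEmpty)                           // true
  }
}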

http://git-wip-us.apache.org/repos/asf/flink/blob/d9f2f2f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala
index 3bc5dc0..2288522 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala
@@ -58,7 +58,7 @@ object ExternalTableSourceUtil extends Logging {
     }
     // use the factory approach
     else {
-      val source = 
TableSourceFactoryService.findTableSourceFactory(externalCatalogTable)
+      val source = 
TableSourceFactoryService.findAndCreateTableSource(externalCatalogTable)
       tableEnv match {
         // check for a batch table source in this batch environment
         case _: BatchTableEnvironment =>

http://git-wip-us.apache.org/repos/asf/flink/blob/d9f2f2f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableSourceDescriptor.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableSourceDescriptor.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableSourceDescriptor.scala
index ed9ee7d..afdd84c 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableSourceDescriptor.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableSourceDescriptor.scala
@@ -43,7 +43,7 @@ class BatchTableSourceDescriptor(tableEnv: 
BatchTableEnvironment, connector: Con
     * Searches for the specified table source, configures it accordingly, and 
returns it.
     */
   def toTableSource: TableSource[_] = {
-    val source = TableSourceFactoryService.findTableSourceFactory(this)
+    val source = TableSourceFactoryService.findAndCreateTableSource(this)
     source match {
       case _: BatchTableSource[_] => source
       case _ => throw new TableException(

http://git-wip-us.apache.org/repos/asf/flink/blob/d9f2f2f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptor.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptor.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptor.scala
index f691b4f..dc344f3 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptor.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptor.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.descriptors
 
-import 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_TYPE,
 CONNECTOR_VERSION}
+import 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_TYPE,
 CONNECTOR_PROPERTY_VERSION}
 
 /**
   * Describes a connector to another system.
@@ -27,7 +27,8 @@ import 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTO
   */
 abstract class ConnectorDescriptor(
     private val tpe: String,
-    private val version: Int)
+    private val version: Int,
+    private val formatNeeded: Boolean)
   extends Descriptor {
 
   override def toString: String = this.getClass.getSimpleName
@@ -37,7 +38,7 @@ abstract class ConnectorDescriptor(
     */
   final private[flink] def addProperties(properties: DescriptorProperties): 
Unit = {
     properties.putString(CONNECTOR_TYPE, tpe)
-    properties.putLong(CONNECTOR_VERSION, version)
+    properties.putLong(CONNECTOR_PROPERTY_VERSION, version)
     addConnectorProperties(properties)
   }
 
@@ -49,6 +50,6 @@ abstract class ConnectorDescriptor(
   /**
     * Internal method that defines if this connector requires a format 
descriptor.
     */
-  private[flink] def needsFormat(): Boolean
+  private[flink] def needsFormat(): Boolean = formatNeeded
 
 }
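
After this change, subclasses no longer override needsFormat(); they pass the flag through the constructor, as ExternalCatalogTable's internal descriptor does above with formatNeeded = false. The sketch below mirrors that shape with simplified, hypothetical classes; it is not the real descriptor hierarchy.

import scala.collection.mutable

// Simplified stand-in for the ConnectorDescriptor shape shown in the diff:
// type, version, and a constructor flag instead of an abstract needsFormat().
abstract class SketchConnectorDescriptor(
    tpe: String,
    version: Int,
    formatNeeded: Boolean) {

  def needsFormat(): Boolean = formatNeeded

  protected def addConnectorProperties(properties: mutable.Map[String, String]): Unit
}

// Hypothetical connector that needs no format descriptor.
class SketchFileSystemConnector(path: String)
  extends SketchConnectorDescriptor("filesystem", version = 1, formatNeeded = false) {

  override protected def addConnectorProperties(
      properties: mutable.Map[String, String]): Unit = {
    properties += ("connector.path" -> path)
  }
}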

http://git-wip-us.apache.org/repos/asf/flink/blob/d9f2f2f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptorValidator.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptorValidator.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptorValidator.scala
index 8ab0f45..211d374 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptorValidator.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptorValidator.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.descriptors
 
-import 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_TYPE,
 CONNECTOR_VERSION}
+import 
org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_TYPE,
 CONNECTOR_PROPERTY_VERSION}
 
 /**
   * Validator for [[ConnectorDescriptor]].
@@ -27,13 +27,27 @@ class ConnectorDescriptorValidator extends 
DescriptorValidator {
 
   override def validate(properties: DescriptorProperties): Unit = {
     properties.validateString(CONNECTOR_TYPE, isOptional = false, minLen = 1)
-    properties.validateInt(CONNECTOR_VERSION, isOptional = true, 0, 
Integer.MAX_VALUE)
+    properties.validateInt(CONNECTOR_PROPERTY_VERSION, isOptional = true, 0, 
Integer.MAX_VALUE)
   }
 }
 
 object ConnectorDescriptorValidator {
 
+  /**
+    * Key for describing the type of the connector. Usually used for factory 
discovery.
+    */
   val CONNECTOR_TYPE = "connector.type"
+
+  /**
+    * Key for describing the property version. This property can be used for 
backwards
+    * compatibility in case the property format changes.
+    */
+  val CONNECTOR_PROPERTY_VERSION = "connector.property-version"
+
+  /**
+    * Key for describing the version of the connector. This property can be 
used for different
+    * connector versions (e.g. Kafka 0.8 or Kafka 0.11).
+    */
   val CONNECTOR_VERSION = "connector.version"
 
 }
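
The keys documented above separate two concerns: connector.property-version describes the layout of the property keys themselves (for backwards compatibility), while connector.version describes the version of the external system, e.g. a Kafka broker. A small self-contained sketch with purely illustrative values:

object ConnectorPropertyKeysSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical normalized properties for a Kafka connector descriptor.
    val properties = Map(
      "connector.type" -> "kafka",          // used for factory discovery
      "connector.property-version" -> "1",  // version of the key/value layout
      "connector.version" -> "0.11"         // version of the external system
    )

    // The validator above requires a non-empty connector.type and an optional,
    // non-negative integer connector.property-version.
    require(properties.get("connector.type").exists(_.nonEmpty))
    require(properties.get("connector.property-version").forall(v => v.nonEmpty && v.forall(_.isDigit)))

    println(properties)
  }
}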

http://git-wip-us.apache.org/repos/asf/flink/blob/d9f2f2f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Csv.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Csv.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Csv.scala
index 0493d99..7e69fea 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Csv.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Csv.scala
@@ -23,6 +23,7 @@ import org.apache.flink.table.api.{TableSchema, 
ValidationException}
 import org.apache.flink.table.descriptors.CsvValidator._
 
 import scala.collection.mutable
+import scala.collection.JavaConverters._
 
 /**
   * Format descriptor for comma-separated values (CSV).
@@ -31,7 +32,7 @@ class Csv extends FormatDescriptor(FORMAT_TYPE_VALUE, version 
= 1) {
 
   private var fieldDelim: Option[String] = None
   private var lineDelim: Option[String] = None
-  private val formatSchema: mutable.LinkedHashMap[String, String] =
+  private val schema: mutable.LinkedHashMap[String, String] =
       mutable.LinkedHashMap[String, String]()
   private var quoteCharacter: Option[Character] = None
   private var commentPrefix: Option[String] = None
@@ -67,7 +68,7 @@ class Csv extends FormatDescriptor(FORMAT_TYPE_VALUE, version 
= 1) {
     * @param schema the table schema
     */
   def schema(schema: TableSchema): Csv = {
-    this.formatSchema.clear()
+    this.schema.clear()
     DescriptorProperties.normalizeTableSchema(schema).foreach {
       case (n, t) => field(n, t)
     }
@@ -96,10 +97,10 @@ class Csv extends FormatDescriptor(FORMAT_TYPE_VALUE, 
version = 1) {
     * @param fieldType the type string of the field
     */
   def field(fieldName: String, fieldType: String): Csv = {
-    if (formatSchema.contains(fieldName)) {
+    if (schema.contains(fieldName)) {
       throw new ValidationException(s"Duplicate field name $fieldName.")
     }
-    formatSchema += (fieldName -> fieldType)
+    schema += (fieldName -> fieldType)
     this
   }
 
@@ -145,7 +146,9 @@ class Csv extends FormatDescriptor(FORMAT_TYPE_VALUE, 
version = 1) {
   override protected def addFormatProperties(properties: 
DescriptorProperties): Unit = {
     fieldDelim.foreach(properties.putString(FORMAT_FIELD_DELIMITER, _))
     lineDelim.foreach(properties.putString(FORMAT_LINE_DELIMITER, _))
-    properties.putTableSchema(FORMAT_FIELDS, formatSchema.toIndexedSeq)
+    properties.putTableSchema(
+      FORMAT_FIELDS,
+      schema.toIndexedSeq.map(DescriptorProperties.toJava[String, 
String]).asJava)
     quoteCharacter.foreach(properties.putCharacter(FORMAT_QUOTE_CHARACTER, _))
     commentPrefix.foreach(properties.putString(FORMAT_COMMENT_PREFIX, _))
     isIgnoreFirstLine.foreach(properties.putBoolean(FORMAT_IGNORE_FIRST_LINE, 
_))
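
For reference, the Csv descriptor above is used through its fluent methods; the stripped-down sketch below imitates that usage. Method and key names (fieldDelimiter, format.field-delimiter, the indexed format.fields.* layout) are assumptions made for illustration, not necessarily the exact descriptor API or key layout.

import scala.collection.mutable

// Stripped-down stand-in for the Csv format descriptor: it only demonstrates
// the duplicate-field check and the insertion-ordered schema seen above.
class CsvSketch {
  private val schema = mutable.LinkedHashMap[String, String]()
  private var fieldDelim: Option[String] = None

  def fieldDelimiter(delim: String): CsvSketch = {
    fieldDelim = Some(delim)
    this
  }

  def field(name: String, tpe: String): CsvSketch = {
    if (schema.contains(name)) {
      throw new IllegalArgumentException(s"Duplicate field name $name.")
    }
    schema += (name -> tpe)
    this
  }

  def toProperties: Map[String, String] = {
    val delimProps = fieldDelim.map(d => Map("format.field-delimiter" -> d)).getOrElse(Map.empty)
    val fieldProps = schema.toSeq.zipWithIndex.map {
      case ((n, t), i) => s"format.fields.$i" -> s"$n: $t"
    }.toMap
    delimProps ++ fieldProps
  }
}

object CsvSketchExample {
  def main(args: Array[String]): Unit = {
    val props = new CsvSketch()
      .fieldDelimiter(";")
      .field("name", "VARCHAR")
      .field("count", "INT")
      .toProperties
    println(props)
  }
}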
