http://git-wip-us.apache.org/repos/asf/flink/blob/d9f2f2f8/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java index a3ca22f..3adc7c5 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java @@ -19,175 +19,97 @@ package org.apache.flink.table.descriptors; import org.apache.flink.streaming.connectors.kafka.config.StartupMode; -import org.apache.flink.table.api.ValidationException; +import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; - -import scala.Function0; -import scala.Tuple2; -import scala.collection.JavaConversions; -import scala.runtime.AbstractFunction0; -import scala.runtime.BoxedUnit; - +import java.util.function.Consumer; /** * The validator for {@link Kafka}. */ public class KafkaValidator extends ConnectorDescriptorValidator { - // fields - public static final String CONNECTOR_TYPE_VALUE = "kafka"; - public static final String KAFKA_VERSION = "kafka.version"; - public static final String BOOTSTRAP_SERVERS = "bootstrap.servers"; - public static final String GROUP_ID = "group.id"; - public static final String TOPIC = "topic"; - public static final String STARTUP_MODE = "startup.mode"; - public static final String SPECIFIC_OFFSETS = "specific.offsets"; - public static final String TABLE_JSON_MAPPING = "table.json.mapping"; - - public static final String PARTITION = "partition"; - public static final String OFFSET = "offset"; - - public static final String TABLE_FIELD = "table.field"; - public static final String JSON_FIELD = "json.field"; - - public static final String ZOOKEEPER_CONNECT = "zookeeper.connect"; // only required for 0.8 - // values - public static final String KAFKA_VERSION_VALUE_08 = "0.8"; - public static final String KAFKA_VERSION_VALUE_09 = "0.9"; - public static final String KAFKA_VERSION_VALUE_010 = "0.10"; - public static final String KAFKA_VERSION_VALUE_011 = "0.11"; - - public static final String STARTUP_MODE_VALUE_EARLIEST = "earliest-offset"; - public static final String STARTUP_MODE_VALUE_LATEST = "latest-offset"; - public static final String STARTUP_MODE_VALUE_GROUP_OFFSETS = "group-offsets"; - public static final String STARTUP_MODE_VALUE_SPECIFIC_OFFSETS = "specific-offsets"; - - // utils - public static Map<String, String> normalizeStartupMode(StartupMode startupMode) { - Map<String, String> mapPair = new HashMap<>(); - switch (startupMode) { - case EARLIEST: - mapPair.put(STARTUP_MODE, STARTUP_MODE_VALUE_EARLIEST); - break; - case LATEST: - mapPair.put(STARTUP_MODE, STARTUP_MODE_VALUE_LATEST); - break; - case GROUP_OFFSETS: - mapPair.put(STARTUP_MODE, STARTUP_MODE_VALUE_GROUP_OFFSETS); - break; - case SPECIFIC_OFFSETS: - mapPair.put(STARTUP_MODE, STARTUP_MODE_VALUE_SPECIFIC_OFFSETS); - break; - } - return mapPair; - } + public static final String CONNECTOR_TYPE_VALUE_KAFKA = "kafka"; + public static final String CONNECTOR_VERSION_VALUE_08 = "0.8"; + public static final String CONNECTOR_VERSION_VALUE_09 = "0.9"; + public static final String CONNECTOR_VERSION_VALUE_010 = "0.10"; + public static final String CONNECTOR_VERSION_VALUE_011 = "0.11"; + public static final String CONNECTOR_TOPIC = "connector.topic"; + public static final String CONNECTOR_STARTUP_MODE = "connector.startup-mode"; + public static final String CONNECTOR_STARTUP_MODE_VALUE_EARLIEST = "earliest-offset"; + public static final String CONNECTOR_STARTUP_MODE_VALUE_LATEST = "latest-offset"; + public static final String CONNECTOR_STARTUP_MODE_VALUE_GROUP_OFFSETS = "group-offsets"; + public static final String CONNECTOR_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS = "specific-offsets"; + public static final String CONNECTOR_SPECIFIC_OFFSETS = "connector.specific-offsets"; + public static final String CONNECTOR_SPECIFIC_OFFSETS_PARTITION = "partition"; + public static final String CONNECTOR_SPECIFIC_OFFSETS_OFFSET = "offset"; + public static final String CONNECTOR_PROPERTIES = "connector.properties"; + public static final String CONNECTOR_PROPERTIES_KEY = "key"; + public static final String CONNECTOR_PROPERTIES_VALUE = "value"; @Override public void validate(DescriptorProperties properties) { super.validate(properties); - - AbstractFunction0<BoxedUnit> emptyValidator = new AbstractFunction0<BoxedUnit>() { - @Override - public BoxedUnit apply() { - return BoxedUnit.UNIT; - } - }; - - properties.validateValue(CONNECTOR_TYPE(), CONNECTOR_TYPE_VALUE, false); - - AbstractFunction0<BoxedUnit> version08Validator = new AbstractFunction0<BoxedUnit>() { - @Override - public BoxedUnit apply() { - properties.validateString(ZOOKEEPER_CONNECT, false, 0, Integer.MAX_VALUE); - return BoxedUnit.UNIT; - } - }; - - Map<String, Function0<BoxedUnit>> versionValidatorMap = new HashMap<>(); - versionValidatorMap.put(KAFKA_VERSION_VALUE_08, version08Validator); - versionValidatorMap.put(KAFKA_VERSION_VALUE_09, emptyValidator); - versionValidatorMap.put(KAFKA_VERSION_VALUE_010, emptyValidator); - versionValidatorMap.put(KAFKA_VERSION_VALUE_011, emptyValidator); - properties.validateEnum( - KAFKA_VERSION, + properties.validateValue(CONNECTOR_TYPE(), CONNECTOR_TYPE_VALUE_KAFKA, false); + + final List<String> versions = Arrays.asList( + CONNECTOR_VERSION_VALUE_08, + CONNECTOR_VERSION_VALUE_09, + CONNECTOR_VERSION_VALUE_010, + CONNECTOR_VERSION_VALUE_011); + properties.validateEnumValues(CONNECTOR_VERSION(), false, versions); + properties.validateString(CONNECTOR_TOPIC, false, 1, Integer.MAX_VALUE); + + final Map<String, Consumer<String>> specificOffsetValidators = new HashMap<>(); + specificOffsetValidators.put( + CONNECTOR_SPECIFIC_OFFSETS_PARTITION, + (prefix) -> properties.validateInt( + prefix + CONNECTOR_SPECIFIC_OFFSETS_PARTITION, false, - toScalaImmutableMap(versionValidatorMap) - ); - - properties.validateString(BOOTSTRAP_SERVERS, false, 1, Integer.MAX_VALUE); - properties.validateString(GROUP_ID, false, 1, Integer.MAX_VALUE); - properties.validateString(TOPIC, false, 1, Integer.MAX_VALUE); - - AbstractFunction0<BoxedUnit> specificOffsetsValidator = new AbstractFunction0<BoxedUnit>() { - @Override - public BoxedUnit apply() { - Map<String, String> partitions = JavaConversions.mapAsJavaMap( - properties.getIndexedProperty(SPECIFIC_OFFSETS, PARTITION)); - - Map<String, String> offsets = JavaConversions.mapAsJavaMap( - properties.getIndexedProperty(SPECIFIC_OFFSETS, OFFSET)); - if (partitions.isEmpty() || offsets.isEmpty()) { - throw new ValidationException("Offsets must be set for SPECIFIC_OFFSETS mode."); - } - for (int i = 0; i < partitions.size(); ++i) { - properties.validateInt( - SPECIFIC_OFFSETS + "." + i + "." + PARTITION, - false, - 0, - Integer.MAX_VALUE); - properties.validateLong( - SPECIFIC_OFFSETS + "." + i + "." + OFFSET, - false, - 0, - Long.MAX_VALUE); - } - return BoxedUnit.UNIT; - } - }; - Map<String, Function0<BoxedUnit>> startupModeValidatorMap = new HashMap<>(); - startupModeValidatorMap.put(STARTUP_MODE_VALUE_GROUP_OFFSETS, emptyValidator); - startupModeValidatorMap.put(STARTUP_MODE_VALUE_EARLIEST, emptyValidator); - startupModeValidatorMap.put(STARTUP_MODE_VALUE_LATEST, emptyValidator); - startupModeValidatorMap.put(STARTUP_MODE_VALUE_SPECIFIC_OFFSETS, specificOffsetsValidator); - - properties.validateEnum(STARTUP_MODE, true, toScalaImmutableMap(startupModeValidatorMap)); - validateTableJsonMapping(properties); + 0, + Integer.MAX_VALUE)); + specificOffsetValidators.put( + CONNECTOR_SPECIFIC_OFFSETS_OFFSET, + (prefix) -> properties.validateLong( + prefix + CONNECTOR_SPECIFIC_OFFSETS_OFFSET, + false, + 0, + Long.MAX_VALUE)); + + final Map<String, Consumer<String>> startupModeValidation = new HashMap<>(); + startupModeValidation.put(CONNECTOR_STARTUP_MODE_VALUE_GROUP_OFFSETS, properties.noValidation()); + startupModeValidation.put(CONNECTOR_STARTUP_MODE_VALUE_EARLIEST, properties.noValidation()); + startupModeValidation.put(CONNECTOR_STARTUP_MODE_VALUE_LATEST, properties.noValidation()); + startupModeValidation.put( + CONNECTOR_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS, + prefix -> properties.validateFixedIndexedProperties(CONNECTOR_SPECIFIC_OFFSETS, false, specificOffsetValidators)); + properties.validateEnum(CONNECTOR_STARTUP_MODE, true, startupModeValidation); + + final Map<String, Consumer<String>> propertyValidators = new HashMap<>(); + propertyValidators.put( + CONNECTOR_PROPERTIES_KEY, + prefix -> properties.validateString(prefix + CONNECTOR_PROPERTIES_KEY, false, 1, Integer.MAX_VALUE)); + propertyValidators.put( + CONNECTOR_PROPERTIES_VALUE, + prefix -> properties.validateString(prefix + CONNECTOR_PROPERTIES_VALUE, false, 0, Integer.MAX_VALUE)); + properties.validateFixedIndexedProperties(CONNECTOR_PROPERTIES, true, propertyValidators); } - private void validateTableJsonMapping(DescriptorProperties properties) { - Map<String, String> mappingTableField = JavaConversions.mapAsJavaMap( - properties.getIndexedProperty(TABLE_JSON_MAPPING, TABLE_FIELD)); - Map<String, String> mappingJsonField = JavaConversions.mapAsJavaMap( - properties.getIndexedProperty(TABLE_JSON_MAPPING, JSON_FIELD)); - - if (mappingJsonField.size() != mappingJsonField.size()) { - throw new ValidationException("Table JSON mapping must be one to one."); - } - - for (int i = 0; i < mappingTableField.size(); i++) { - properties.validateString( - TABLE_JSON_MAPPING + "." + i + "." + TABLE_FIELD, - false, - 1, - Integer.MAX_VALUE); - properties.validateString( - TABLE_JSON_MAPPING + "." + i + "." + JSON_FIELD, - false, - 1, - Integer.MAX_VALUE); - } - } + // utilities - @SuppressWarnings("unchecked") - private <K, V> scala.collection.immutable.Map<K, V> toScalaImmutableMap(Map<K, V> javaMap) { - final java.util.List<scala.Tuple2<K, V>> list = new java.util.ArrayList<>(javaMap.size()); - for (final java.util.Map.Entry<K, V> entry : javaMap.entrySet()) { - list.add(scala.Tuple2.apply(entry.getKey(), entry.getValue())); + public static String normalizeStartupMode(StartupMode startupMode) { + switch (startupMode) { + case EARLIEST: + return CONNECTOR_STARTUP_MODE_VALUE_EARLIEST; + case LATEST: + return CONNECTOR_STARTUP_MODE_VALUE_LATEST; + case GROUP_OFFSETS: + return CONNECTOR_STARTUP_MODE_VALUE_GROUP_OFFSETS; + case SPECIFIC_OFFSETS: + return CONNECTOR_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS; } - final scala.collection.Seq<Tuple2<K, V>> seq = - scala.collection.JavaConverters.asScalaBufferConverter(list).asScala().toSeq(); - return (scala.collection.immutable.Map<K, V>) scala.collection.immutable.Map$.MODULE$.apply(seq); + throw new IllegalArgumentException("Invalid startup mode."); } }
http://git-wip-us.apache.org/repos/asf/flink/blob/d9f2f2f8/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableFromDescriptorTestBase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableFromDescriptorTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableFromDescriptorTestBase.java deleted file mode 100644 index 964a624..0000000 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableFromDescriptorTestBase.java +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka; - -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.table.api.TableEnvironment; -import org.apache.flink.table.api.java.StreamTableEnvironment; -import org.apache.flink.table.descriptors.Kafka; - -import org.mockito.Mockito; - -/** - * Tests for {@link KafkaJsonTableSourceFactory}. - */ -public abstract class KafkaJsonTableFromDescriptorTestBase { - private static final String GROUP_ID = "test-group"; - private static final String BOOTSTRAP_SERVERS = "localhost:1234"; - private static final String TOPIC = "test-topic"; - - protected abstract String versionForTest(); - - protected abstract KafkaJsonTableSource.Builder builderForTest(); - - protected abstract void extraSettings(KafkaTableSource.Builder builder, Kafka kafka); - - private static StreamExecutionEnvironment env = Mockito.mock(StreamExecutionEnvironment.class); - private static StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env); - -// @Test -// public void buildJsonTableSourceTest() throws Exception { -// final URL url = getClass().getClassLoader().getResource("kafka-json-schema.json"); -// Objects.requireNonNull(url); -// final String schema = FileUtils.readFileUtf8(new File(url.getFile())); -// -// Map<String, String> tableJsonMapping = new HashMap<>(); -// tableJsonMapping.put("fruit-name", "name"); -// tableJsonMapping.put("fruit-count", "count"); -// tableJsonMapping.put("event-time", "time"); -// -// // Construct with the builder. -// Properties props = new Properties(); -// props.put("group.id", GROUP_ID); -// props.put("bootstrap.servers", BOOTSTRAP_SERVERS); -// -// Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>(); -// specificOffsets.put(new KafkaTopicPartition(TOPIC, 0), 100L); -// specificOffsets.put(new KafkaTopicPartition(TOPIC, 1), 123L); -// -// KafkaTableSource.Builder builder = builderForTest() -// .forJsonSchema(TableSchema.fromTypeInfo(JsonSchemaConverter.convert(schema))) -// .failOnMissingField(true) -// .withTableToJsonMapping(tableJsonMapping) -// .withKafkaProperties(props) -// .forTopic(TOPIC) -// .fromSpecificOffsets(specificOffsets) -// .withSchema( -// TableSchema.builder() -// .field("fruit-name", Types.STRING) -// .field("fruit-count", Types.INT) -// .field("event-time", Types.LONG) -// .field("proc-time", Types.SQL_TIMESTAMP) -// .build()) -// .withProctimeAttribute("proc-time"); -// -// // Construct with the descriptor. -// Map<Integer, Long> offsets = new HashMap<>(); -// offsets.put(0, 100L); -// offsets.put(1, 123L); -// Kafka kafka = new Kafka() -// .version(versionForTest()) -// .groupId(GROUP_ID) -// .bootstrapServers(BOOTSTRAP_SERVERS) -// .topic(TOPIC) -// .startupMode(StartupMode.SPECIFIC_OFFSETS) -// .specificOffsets(offsets) -// .tableJsonMapping(tableJsonMapping); -// extraSettings(builder, kafka); -// -// TableSource source = tEnv -// .from(kafka) -// .withFormat( -// new Json() -// .schema(schema) -// .failOnMissingField(true)) -// .withSchema(new Schema() -// .field("fruit-name", Types.STRING) -// .field("fruit-count", Types.INT) -// .field("event-time", Types.LONG) -// .field("proc-time", Types.SQL_TIMESTAMP).proctime()) -// .toTableSource(); -// -// Assert.assertEquals(builder.build(), source); -// } - -// @Test(expected = TableException.class) -// public void buildJsonTableSourceFailTest() { -// tEnv.from( -// new Kafka() -// .version(versionForTest()) -// .groupId(GROUP_ID) -// .bootstrapServers(BOOTSTRAP_SERVERS) -// .topic(TOPIC) -// .startupMode(StartupMode.SPECIFIC_OFFSETS) -// .specificOffsets(new HashMap<>())) -// .withFormat( -// new Json() -// .schema("") -// .failOnMissingField(true)) -// .toTableSource(); -// } -} http://git-wip-us.apache.org/repos/asf/flink/blob/d9f2f2f8/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java new file mode 100644 index 0000000..2b081a9 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.formats.json.JsonSchemaConverter; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.descriptors.FormatDescriptor; +import org.apache.flink.table.descriptors.Json; +import org.apache.flink.table.descriptors.Kafka; +import org.apache.flink.table.descriptors.Schema; +import org.apache.flink.table.descriptors.TestTableSourceDescriptor; +import org.apache.flink.table.sources.TableSource; +import org.apache.flink.table.sources.TableSourceFactoryService; + +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for {@link KafkaJsonTableSourceFactory}. + */ +public abstract class KafkaJsonTableSourceFactoryTestBase { + + private static final String JSON_SCHEMA = + "{" + + " 'title': 'Fruit'," + + " 'type': 'object'," + + " 'properties': {" + + " 'name': {" + + " 'type': 'string'" + + " }," + + " 'count': {" + + " 'type': 'integer'" + + " }," + + " 'time': {" + + " 'description': 'Age in years'," + + " 'type': 'number'" + + " }" + " }," + + " 'required': ['name', 'count', 'time']" + + "}"; + + private static final String TOPIC = "test-topic"; + + protected abstract String version(); + + protected abstract KafkaJsonTableSource.Builder builder(); + + @Test + public void testTableSourceFromJsonSchema() { + testTableSource( + new Json() + .jsonSchema(JSON_SCHEMA) + .failOnMissingField(true) + ); + } + + @Test + public void testTableSourceDerivedSchema() { + testTableSource( + new Json() + .deriveSchema() + .failOnMissingField(true) + ); + } + + private void testTableSource(FormatDescriptor format) { + // construct table source using a builder + + final Map<String, String> tableJsonMapping = new HashMap<>(); + tableJsonMapping.put("fruit-name", "name"); + tableJsonMapping.put("count", "count"); + tableJsonMapping.put("event-time", "time"); + + final Properties props = new Properties(); + props.put("group.id", "test-group"); + props.put("bootstrap.servers", "localhost:1234"); + + final Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>(); + specificOffsets.put(new KafkaTopicPartition(TOPIC, 0), 100L); + specificOffsets.put(new KafkaTopicPartition(TOPIC, 1), 123L); + + final KafkaTableSource builderSource = builder() + .forJsonSchema(TableSchema.fromTypeInfo(JsonSchemaConverter.convert(JSON_SCHEMA))) + .failOnMissingField(true) + .withTableToJsonMapping(tableJsonMapping) + .withKafkaProperties(props) + .forTopic(TOPIC) + .fromSpecificOffsets(specificOffsets) + .withSchema( + TableSchema.builder() + .field("fruit-name", Types.STRING) + .field("count", Types.BIG_INT) + .field("event-time", Types.BIG_DEC) + .field("proc-time", Types.SQL_TIMESTAMP) + .build()) + .withProctimeAttribute("proc-time") + .build(); + + // construct table source using descriptors and table source factory + + final Map<Integer, Long> offsets = new HashMap<>(); + offsets.put(0, 100L); + offsets.put(1, 123L); + + final TestTableSourceDescriptor testDesc = new TestTableSourceDescriptor( + new Kafka() + .version(version()) + .topic(TOPIC) + .properties(props) + .startFromSpecificOffsets(offsets)) + .addFormat(format) + .addSchema( + new Schema() + .field("fruit-name", Types.STRING).from("name") + .field("count", Types.BIG_INT) // no from so it must match with the input + .field("event-time", Types.BIG_DEC).from("time") + .field("proc-time", Types.SQL_TIMESTAMP).proctime()); + + final TableSource<?> factorySource = TableSourceFactoryService.findAndCreateTableSource(testDesc); + + assertEquals(builderSource, factorySource); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d9f2f2f8/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/table/descriptors/KafkaTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/table/descriptors/KafkaTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/table/descriptors/KafkaTest.java new file mode 100644 index 0000000..f3d96f1 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/table/descriptors/KafkaTest.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +/** + * Tests for the {@link Kafka} descriptor. + */ +public class KafkaTest extends DescriptorTestBase { + + @Override + public List<Descriptor> descriptors() { + final Descriptor earliestDesc = + new Kafka() + .version("0.8") + .startFromEarliest() + .topic("WhateverTopic"); + + final Descriptor specificOffsetsDesc = + new Kafka() + .version("0.11") + .topic("MyTable") + .startFromSpecificOffset(0, 42L) + .startFromSpecificOffset(1, 300L) + .property("zookeeper.stuff", "12") + .property("kafka.stuff", "42"); + + final Map<Integer, Long> offsets = new HashMap<>(); + offsets.put(0, 42L); + offsets.put(1, 300L); + + final Properties properties = new Properties(); + properties.put("zookeeper.stuff", "12"); + properties.put("kafka.stuff", "42"); + + final Descriptor specificOffsetsMapDesc = + new Kafka() + .version("0.11") + .topic("MyTable") + .startFromSpecificOffsets(offsets) + .properties(properties); + + return Arrays.asList(earliestDesc, specificOffsetsDesc, specificOffsetsMapDesc); + } + + @Override + public List<Map<String, String>> properties() { + final Map<String, String> props1 = new HashMap<>(); + props1.put("connector.property-version", "1"); + props1.put("connector.type", "kafka"); + props1.put("connector.version", "0.8"); + props1.put("connector.topic", "WhateverTopic"); + props1.put("connector.startup-mode", "earliest-offset"); + + final Map<String, String> props2 = new HashMap<>(); + props2.put("connector.property-version", "1"); + props2.put("connector.type", "kafka"); + props2.put("connector.version", "0.11"); + props2.put("connector.topic", "MyTable"); + props2.put("connector.startup-mode", "specific-offsets"); + props2.put("connector.specific-offsets.0.partition", "0"); + props2.put("connector.specific-offsets.0.offset", "42"); + props2.put("connector.specific-offsets.1.partition", "1"); + props2.put("connector.specific-offsets.1.offset", "300"); + props2.put("connector.properties.0.key", "zookeeper.stuff"); + props2.put("connector.properties.0.value", "12"); + props2.put("connector.properties.1.key", "kafka.stuff"); + props2.put("connector.properties.1.value", "42"); + + final Map<String, String> props3 = new HashMap<>(); + props3.put("connector.property-version", "1"); + props3.put("connector.type", "kafka"); + props3.put("connector.version", "0.11"); + props3.put("connector.topic", "MyTable"); + props3.put("connector.startup-mode", "specific-offsets"); + props3.put("connector.specific-offsets.0.partition", "0"); + props3.put("connector.specific-offsets.0.offset", "42"); + props3.put("connector.specific-offsets.1.partition", "1"); + props3.put("connector.specific-offsets.1.offset", "300"); + props3.put("connector.properties.0.key", "zookeeper.stuff"); + props3.put("connector.properties.0.value", "12"); + props3.put("connector.properties.1.key", "kafka.stuff"); + props3.put("connector.properties.1.value", "42"); + + return Arrays.asList(props1, props2, props3); + } + + @Override + public DescriptorValidator validator() { + return new KafkaValidator(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d9f2f2f8/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-json-schema.json ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-json-schema.json b/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-json-schema.json deleted file mode 100644 index 5167e5e..0000000 --- a/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-json-schema.json +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -{ - "title": "Fruit", - "type": "object", - "properties": { - "name": { - "type": "string" - }, - "count": { - "type": "integer" - }, - "time": { - "description": "Age in years", - "type": "number" - } - }, - "required": ["name", "count", "time"] -} http://git-wip-us.apache.org/repos/asf/flink/blob/d9f2f2f8/flink-dist/pom.xml ---------------------------------------------------------------------- diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml index 019047c..d8477fe 100644 --- a/flink-dist/pom.xml +++ b/flink-dist/pom.xml @@ -337,22 +337,6 @@ under the License. </dependency> <!-- end optional Flink libraries --> - <!-- start optional Flink formats --> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-avro</artifactId> - <version>${project.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-json</artifactId> - <version>${project.version}</version> - <scope>provided</scope> - </dependency> - <!-- end optional Flink formats --> - <!-- test dependencies --> <dependency> http://git-wip-us.apache.org/repos/asf/flink/blob/d9f2f2f8/flink-formats/flink-json/pom.xml ---------------------------------------------------------------------- diff --git a/flink-formats/flink-json/pom.xml b/flink-formats/flink-json/pom.xml index d0f55ab..3a80b0e 100644 --- a/flink-formats/flink-json/pom.xml +++ b/flink-formats/flink-json/pom.xml @@ -50,6 +50,33 @@ under the License. <version>${project.version}</version> <scope>provided</scope> </dependency> - </dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <!-- use a dedicated Scala version to not depend on it --> + <artifactId>flink-table_2.11</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + <!-- Projects depending on this project, won't depend on flink-table. --> + <optional>true</optional> + </dependency> + + <!-- test dependencies --> + + <dependency> + <groupId>org.apache.flink</groupId> + <!-- use a dedicated Scala version to not depend on it --> + <artifactId>flink-table_2.11</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <!-- flink-table needs Scala --> + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scala-compiler</artifactId> + <scope>test</scope> + </dependency> + </dependencies> </project> http://git-wip-us.apache.org/repos/asf/flink/blob/d9f2f2f8/flink-formats/flink-json/src/main/java/org/apache/flink/table/descriptors/Json.java ---------------------------------------------------------------------- diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/table/descriptors/Json.java b/flink-formats/flink-json/src/main/java/org/apache/flink/table/descriptors/Json.java new file mode 100644 index 0000000..9c12191 --- /dev/null +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/table/descriptors/Json.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.typeutils.TypeStringUtils; +import org.apache.flink.util.Preconditions; + +import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_DERIVE_SCHEMA; +import static org.apache.flink.table.descriptors.JsonValidator.FORMAT_FAIL_ON_MISSING_FIELD; +import static org.apache.flink.table.descriptors.JsonValidator.FORMAT_JSON_SCHEMA; +import static org.apache.flink.table.descriptors.JsonValidator.FORMAT_SCHEMA; +import static org.apache.flink.table.descriptors.JsonValidator.FORMAT_TYPE_VALUE; + +/** + * Format descriptor for JSON. + */ +public class Json extends FormatDescriptor { + + private Boolean failOnMissingField; + private Boolean deriveSchema; + private String jsonSchema; + private String schema; + + /** + * Format descriptor for JSON. + */ + public Json() { + super(FORMAT_TYPE_VALUE, 1); + } + + /** + * Sets flag whether to fail if a field is missing or not. + * + * @param failOnMissingField If set to true, the operation fails if there is a missing field. + * If set to false, a missing field is set to null. + */ + public Json failOnMissingField(boolean failOnMissingField) { + this.failOnMissingField = failOnMissingField; + return this; + } + + /** + * Sets the JSON schema string with field names and the types according to the JSON schema + * specification [[http://json-schema.org/specification.html]]. + * + * <p>The schema might be nested. + * + * @param jsonSchema JSON schema + */ + public Json jsonSchema(String jsonSchema) { + Preconditions.checkNotNull(jsonSchema); + this.jsonSchema = jsonSchema; + this.schema = null; + this.deriveSchema = null; + return this; + } + + /** + * Sets the schema using type information. + * + * <p>JSON objects are represented as ROW types. + * + * <p>The schema might be nested. + * + * @param schemaType type information that describes the schema + */ + public Json schema(TypeInformation<?> schemaType) { + Preconditions.checkNotNull(schemaType); + this.schema = TypeStringUtils.writeTypeInfo(schemaType); + this.jsonSchema = null; + this.deriveSchema = null; + return this; + } + + /** + * Derives the format schema from the table's schema described using {@link Schema}. + * + * <p>This allows for defining schema information only once. + * + * <p>The names, types, and field order of the format are determined by the table's + * schema. Time attributes are ignored. A "from" definition is interpreted as a field renaming + * in the format. + */ + public Json deriveSchema() { + this.deriveSchema = true; + this.schema = null; + this.jsonSchema = null; + return this; + } + + /** + * Internal method for format properties conversion. + */ + @Override + public void addFormatProperties(DescriptorProperties properties) { + if (deriveSchema != null) { + properties.putBoolean(FORMAT_DERIVE_SCHEMA(), deriveSchema); + } + + if (jsonSchema != null) { + properties.putString(FORMAT_JSON_SCHEMA, jsonSchema); + } + + if (schema != null) { + properties.putString(FORMAT_SCHEMA, schema); + } + + if (failOnMissingField != null) { + properties.putBoolean(FORMAT_FAIL_ON_MISSING_FIELD, failOnMissingField); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d9f2f2f8/flink-formats/flink-json/src/main/java/org/apache/flink/table/descriptors/JsonValidator.java ---------------------------------------------------------------------- diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/table/descriptors/JsonValidator.java b/flink-formats/flink-json/src/main/java/org/apache/flink/table/descriptors/JsonValidator.java new file mode 100644 index 0000000..fea7cf5 --- /dev/null +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/table/descriptors/JsonValidator.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors; + +import org.apache.flink.table.api.ValidationException; + +/** + * Validator for {@link Json}. + */ +public class JsonValidator extends FormatDescriptorValidator { + + public static final String FORMAT_TYPE_VALUE = "json"; + public static final String FORMAT_SCHEMA = "format.schema"; + public static final String FORMAT_JSON_SCHEMA = "format.json-schema"; + public static final String FORMAT_FAIL_ON_MISSING_FIELD = "format.fail-on-missing-field"; + + @Override + public void validate(DescriptorProperties properties) { + super.validate(properties); + properties.validateBoolean(FORMAT_DERIVE_SCHEMA(), true); + final boolean deriveSchema = properties.getOptionalBoolean(FORMAT_DERIVE_SCHEMA()).orElse(false); + final boolean hasSchema = properties.containsKey(FORMAT_SCHEMA); + final boolean hasSchemaString = properties.containsKey(FORMAT_JSON_SCHEMA); + if (deriveSchema && (hasSchema || hasSchemaString)) { + throw new ValidationException( + "Format cannot define a schema and derive from the table's schema at the same time."); + } else if (!deriveSchema && hasSchema && hasSchemaString) { + throw new ValidationException("A definition of both a schema and JSON schema is not allowed."); + } else if (!deriveSchema && !hasSchema && !hasSchemaString) { + throw new ValidationException("A definition of a schema or JSON schema is required."); + } else if (hasSchema) { + properties.validateType(FORMAT_SCHEMA, false); + } else if (hasSchemaString) { + properties.validateString(FORMAT_JSON_SCHEMA, false, 1); + } + + properties.validateBoolean(FORMAT_FAIL_ON_MISSING_FIELD, true); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d9f2f2f8/flink-formats/flink-json/src/test/java/org/apache/flink/table/descriptors/JsonTest.java ---------------------------------------------------------------------- diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/table/descriptors/JsonTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/table/descriptors/JsonTest.java new file mode 100644 index 0000000..6e370a0 --- /dev/null +++ b/flink-formats/flink-json/src/test/java/org/apache/flink/table/descriptors/JsonTest.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.api.Types; +import org.apache.flink.table.api.ValidationException; + +import org.junit.Test; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Tests for the {@link Json} descriptor. + */ +public class JsonTest extends DescriptorTestBase { + + private static final String JSON_SCHEMA = + "{" + + " 'title': 'Person'," + + " 'type': 'object'," + + " 'properties': {" + + " 'firstName': {" + + " 'type': 'string'" + + " }," + + " 'lastName': {" + + " 'type': 'string'" + + " }," + + " 'age': {" + + " 'description': 'Age in years'," + + " 'type': 'integer'," + + " 'minimum': 0" + + " }" + + " }," + + " 'required': ['firstName', 'lastName']" + + "}"; + + @Test(expected = ValidationException.class) + public void testInvalidMissingField() { + addPropertyAndVerify(descriptors().get(0), "format.fail-on-missing-field", "DDD"); + } + + @Test(expected = ValidationException.class) + public void testMissingSchema() { + removePropertyAndVerify(descriptors().get(0), "format.json-schema"); + } + + @Test(expected = ValidationException.class) + public void testDuplicateSchema() { + // we add an additional non-json schema + addPropertyAndVerify(descriptors().get(0), "format.schema", "DDD"); + } + + // -------------------------------------------------------------------------------------------- + + @Override + public List<Descriptor> descriptors() { + final Descriptor desc1 = new Json().jsonSchema("test"); + + final Descriptor desc2 = new Json().jsonSchema(JSON_SCHEMA).failOnMissingField(true); + + final Descriptor desc3 = new Json() + .schema( + Types.ROW( + new String[]{"test1", "test2"}, + new TypeInformation[]{Types.STRING(), Types.SQL_TIMESTAMP()})) + .failOnMissingField(true); + + final Descriptor desc4 = new Json().deriveSchema(); + + return Arrays.asList(desc1, desc2, desc3, desc4); + } + + @Override + public List<Map<String, String>> properties() { + final Map<String, String> props1 = new HashMap<>(); + props1.put("format.type", "json"); + props1.put("format.property-version", "1"); + props1.put("format.json-schema", "test"); + + final Map<String, String> props2 = new HashMap<>(); + props2.put("format.type", "json"); + props2.put("format.property-version", "1"); + props2.put("format.json-schema", JSON_SCHEMA); + props2.put("format.fail-on-missing-field", "true"); + + final Map<String, String> props3 = new HashMap<>(); + props3.put("format.type", "json"); + props3.put("format.property-version", "1"); + props3.put("format.schema", "ROW(test1 VARCHAR, test2 TIMESTAMP)"); + props3.put("format.fail-on-missing-field", "true"); + + final Map<String, String> props4 = new HashMap<>(); + props4.put("format.type", "json"); + props4.put("format.property-version", "1"); + props4.put("format.derive-schema", "true"); + + return Arrays.asList(props1, props2, props3, props4); + } + + @Override + public DescriptorValidator validator() { + return new JsonValidator(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d9f2f2f8/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java index 9e7413c..8c40885 100644 --- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java +++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java @@ -476,7 +476,7 @@ public class LocalExecutor implements Executor { } env.getSources().forEach((name, source) -> { - TableSource<?> tableSource = TableSourceFactoryService.findTableSourceFactory(source); + TableSource<?> tableSource = TableSourceFactoryService.findAndCreateTableSource(source); tableEnv.registerTableSource(name, tableSource); }); http://git-wip-us.apache.org/repos/asf/flink/blob/d9f2f2f8/flink-libraries/flink-table/pom.xml ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/pom.xml b/flink-libraries/flink-table/pom.xml index b4b0eee..53a3233 100644 --- a/flink-libraries/flink-table/pom.xml +++ b/flink-libraries/flink-table/pom.xml @@ -317,6 +317,18 @@ under the License. </executions> </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> + <!-- Scala Code Style, most of the configuration done via plugin management --> <plugin> <groupId>org.scalastyle</groupId> http://git-wip-us.apache.org/repos/asf/flink/blob/d9f2f2f8/flink-libraries/flink-table/src/main/resources/tableSourceConverter.properties ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/resources/tableSourceConverter.properties b/flink-libraries/flink-table/src/main/resources/tableSourceConverter.properties index 86a48a8..ec46579 100644 --- a/flink-libraries/flink-table/src/main/resources/tableSourceConverter.properties +++ b/flink-libraries/flink-table/src/main/resources/tableSourceConverter.properties @@ -17,6 +17,13 @@ ################################################################################ ################################################################################ +# NOTE: THIS APPROACH IS DEPRECATED AND WILL BE REMOVED IN FUTURE VERSIONS! +# +# We recommend to use a org.apache.flink.table.sources.TableSourceFactory +# instead. They allow to define new factories by using Java Service Providers. +################################################################################ + +################################################################################ # The config file is used to specify the packages of current module where # to find TableSourceConverter implementation class annotated with TableType. # If there are multiple packages to scan, put those packages together into a http://git-wip-us.apache.org/repos/asf/flink/blob/d9f2f2f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala index 1e88d93..6958b3d 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala @@ -21,6 +21,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.common.typeutils.CompositeType import _root_.scala.collection.mutable.ArrayBuffer +import _root_.java.util.Objects /** * A TableSchema represents a Table's structure. @@ -94,7 +95,7 @@ class TableSchema( /** * Returns the number of columns. */ - def getColumnNum: Int = columnNames.length + def getColumnCount: Int = columnNames.length /** * Returns all column names as an array. @@ -134,6 +135,9 @@ class TableSchema( def canEqual(other: Any): Boolean = other.isInstanceOf[TableSchema] + override def hashCode(): Int = { + Objects.hash(columnNames, columnTypes) + } } object TableSchema { http://git-wip-us.apache.org/repos/asf/flink/blob/d9f2f2f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala index fc7f7a3..ef14b8a 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala @@ -23,6 +23,7 @@ import java.util.{HashMap => JHashMap, Map => JMap} import org.apache.flink.table.api.{TableException, TableSchema} import org.apache.flink.table.catalog.ExternalCatalogTable._ +import org.apache.flink.table.descriptors.DescriptorProperties.toScala import org.apache.flink.table.descriptors.MetadataValidator.{METADATA_COMMENT, METADATA_CREATION_TIME, METADATA_LAST_ACCESS_TIME} import org.apache.flink.table.descriptors._ import org.apache.flink.table.plan.stats.TableStats @@ -73,8 +74,7 @@ class ExternalCatalogTable( lazy val tableType: String = { val props = new DescriptorProperties() connectorDesc.addProperties(props) - props - .getString(CONNECTOR_LEGACY_TYPE) + toScala(props.getOptionalString(CONNECTOR_LEGACY_TYPE)) .getOrElse(throw new TableException("Could not find a legacy table type to return.")) } @@ -88,8 +88,7 @@ class ExternalCatalogTable( lazy val schema: TableSchema = { val props = new DescriptorProperties() connectorDesc.addProperties(props) - props - .getTableSchema(CONNECTOR_LEGACY_SCHEMA) + toScala(props.getOptionalTableSchema(CONNECTOR_LEGACY_SCHEMA)) .getOrElse(throw new TableException("Could not find a legacy schema to return.")) } @@ -105,7 +104,7 @@ class ExternalCatalogTable( val props = new DescriptorProperties(normalizeKeys = false) val legacyProps = new JHashMap[String, String]() connectorDesc.addProperties(props) - props.asMap.flatMap { case (k, v) => + props.asMap.asScala.flatMap { case (k, v) => if (k.startsWith(CONNECTOR_LEGACY_PROPERTY)) { // remove "connector.legacy-property-" Some(legacyProps.put(k.substring(CONNECTOR_LEGACY_PROPERTY.length + 1), v)) @@ -138,7 +137,7 @@ class ExternalCatalogTable( metadataDesc match { case Some(meta) => meta.addProperties(normalizedProps) - normalizedProps.getString(METADATA_COMMENT).orNull + normalizedProps.getOptionalString(METADATA_COMMENT).orElse(null) case None => null } @@ -157,7 +156,7 @@ class ExternalCatalogTable( metadataDesc match { case Some(meta) => meta.addProperties(normalizedProps) - normalizedProps.getLong(METADATA_CREATION_TIME).map(v => Long.box(v)).orNull + normalizedProps.getOptionalLong(METADATA_CREATION_TIME).orElse(null) case None => null } @@ -176,7 +175,7 @@ class ExternalCatalogTable( metadataDesc match { case Some(meta) => meta.addProperties(normalizedProps) - normalizedProps.getLong(METADATA_LAST_ACCESS_TIME).map(v => Long.box(v)).orNull + normalizedProps.getOptionalLong(METADATA_LAST_ACCESS_TIME).orElse(null) case None => null } @@ -267,7 +266,7 @@ object ExternalCatalogTable { tableType: String, schema: TableSchema, legacyProperties: JMap[String, String]) - extends ConnectorDescriptor(CONNECTOR_TYPE_VALUE, version = 1) { + extends ConnectorDescriptor(CONNECTOR_TYPE_VALUE, version = 1, formatNeeded = false) { override protected def addConnectorProperties(properties: DescriptorProperties): Unit = { properties.putString(CONNECTOR_LEGACY_TYPE, tableType) @@ -276,8 +275,6 @@ object ExternalCatalogTable { properties.putString(s"$CONNECTOR_LEGACY_PROPERTY-$k", v) } } - - override private[flink] def needsFormat() = false } def toConnectorDescriptor( http://git-wip-us.apache.org/repos/asf/flink/blob/d9f2f2f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala index 3bc5dc0..2288522 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala @@ -58,7 +58,7 @@ object ExternalTableSourceUtil extends Logging { } // use the factory approach else { - val source = TableSourceFactoryService.findTableSourceFactory(externalCatalogTable) + val source = TableSourceFactoryService.findAndCreateTableSource(externalCatalogTable) tableEnv match { // check for a batch table source in this batch environment case _: BatchTableEnvironment => http://git-wip-us.apache.org/repos/asf/flink/blob/d9f2f2f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableSourceDescriptor.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableSourceDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableSourceDescriptor.scala index ed9ee7d..afdd84c 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableSourceDescriptor.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/BatchTableSourceDescriptor.scala @@ -43,7 +43,7 @@ class BatchTableSourceDescriptor(tableEnv: BatchTableEnvironment, connector: Con * Searches for the specified table source, configures it accordingly, and returns it. */ def toTableSource: TableSource[_] = { - val source = TableSourceFactoryService.findTableSourceFactory(this) + val source = TableSourceFactoryService.findAndCreateTableSource(this) source match { case _: BatchTableSource[_] => source case _ => throw new TableException( http://git-wip-us.apache.org/repos/asf/flink/blob/d9f2f2f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptor.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptor.scala index f691b4f..dc344f3 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptor.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptor.scala @@ -18,7 +18,7 @@ package org.apache.flink.table.descriptors -import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_TYPE, CONNECTOR_VERSION} +import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_TYPE, CONNECTOR_PROPERTY_VERSION} /** * Describes a connector to an other system. @@ -27,7 +27,8 @@ import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTO */ abstract class ConnectorDescriptor( private val tpe: String, - private val version: Int) + private val version: Int, + private val formatNeeded: Boolean) extends Descriptor { override def toString: String = this.getClass.getSimpleName @@ -37,7 +38,7 @@ abstract class ConnectorDescriptor( */ final private[flink] def addProperties(properties: DescriptorProperties): Unit = { properties.putString(CONNECTOR_TYPE, tpe) - properties.putLong(CONNECTOR_VERSION, version) + properties.putLong(CONNECTOR_PROPERTY_VERSION, version) addConnectorProperties(properties) } @@ -49,6 +50,6 @@ abstract class ConnectorDescriptor( /** * Internal method that defines if this connector requires a format descriptor. */ - private[flink] def needsFormat(): Boolean + private[flink] def needsFormat(): Boolean = formatNeeded } http://git-wip-us.apache.org/repos/asf/flink/blob/d9f2f2f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptorValidator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptorValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptorValidator.scala index 8ab0f45..211d374 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptorValidator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectorDescriptorValidator.scala @@ -18,7 +18,7 @@ package org.apache.flink.table.descriptors -import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_TYPE, CONNECTOR_VERSION} +import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.{CONNECTOR_TYPE, CONNECTOR_PROPERTY_VERSION} /** * Validator for [[ConnectorDescriptor]]. @@ -27,13 +27,27 @@ class ConnectorDescriptorValidator extends DescriptorValidator { override def validate(properties: DescriptorProperties): Unit = { properties.validateString(CONNECTOR_TYPE, isOptional = false, minLen = 1) - properties.validateInt(CONNECTOR_VERSION, isOptional = true, 0, Integer.MAX_VALUE) + properties.validateInt(CONNECTOR_PROPERTY_VERSION, isOptional = true, 0, Integer.MAX_VALUE) } } object ConnectorDescriptorValidator { + /** + * Key for describing the type of the connector. Usually used for factory discovery. + */ val CONNECTOR_TYPE = "connector.type" + + /** + * Key for describing the property version. This property can be used for backwards + * compatibility in case the property format changes. + */ + val CONNECTOR_PROPERTY_VERSION = "connector.property-version" + + /** + * Key for describing the version of the connector. This property can be used for different + * connector versions (e.g. Kafka 0.8 or Kafka 0.11). + */ val CONNECTOR_VERSION = "connector.version" } http://git-wip-us.apache.org/repos/asf/flink/blob/d9f2f2f8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Csv.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Csv.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Csv.scala index 0493d99..7e69fea 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Csv.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Csv.scala @@ -23,6 +23,7 @@ import org.apache.flink.table.api.{TableSchema, ValidationException} import org.apache.flink.table.descriptors.CsvValidator._ import scala.collection.mutable +import scala.collection.JavaConverters._ /** * Format descriptor for comma-separated values (CSV). @@ -31,7 +32,7 @@ class Csv extends FormatDescriptor(FORMAT_TYPE_VALUE, version = 1) { private var fieldDelim: Option[String] = None private var lineDelim: Option[String] = None - private val formatSchema: mutable.LinkedHashMap[String, String] = + private val schema: mutable.LinkedHashMap[String, String] = mutable.LinkedHashMap[String, String]() private var quoteCharacter: Option[Character] = None private var commentPrefix: Option[String] = None @@ -67,7 +68,7 @@ class Csv extends FormatDescriptor(FORMAT_TYPE_VALUE, version = 1) { * @param schema the table schema */ def schema(schema: TableSchema): Csv = { - this.formatSchema.clear() + this.schema.clear() DescriptorProperties.normalizeTableSchema(schema).foreach { case (n, t) => field(n, t) } @@ -96,10 +97,10 @@ class Csv extends FormatDescriptor(FORMAT_TYPE_VALUE, version = 1) { * @param fieldType the type string of the field */ def field(fieldName: String, fieldType: String): Csv = { - if (formatSchema.contains(fieldName)) { + if (schema.contains(fieldName)) { throw new ValidationException(s"Duplicate field name $fieldName.") } - formatSchema += (fieldName -> fieldType) + schema += (fieldName -> fieldType) this } @@ -145,7 +146,9 @@ class Csv extends FormatDescriptor(FORMAT_TYPE_VALUE, version = 1) { override protected def addFormatProperties(properties: DescriptorProperties): Unit = { fieldDelim.foreach(properties.putString(FORMAT_FIELD_DELIMITER, _)) lineDelim.foreach(properties.putString(FORMAT_LINE_DELIMITER, _)) - properties.putTableSchema(FORMAT_FIELDS, formatSchema.toIndexedSeq) + properties.putTableSchema( + FORMAT_FIELDS, + schema.toIndexedSeq.map(DescriptorProperties.toJava[String, String]).asJava) quoteCharacter.foreach(properties.putCharacter(FORMAT_QUOTE_CHARACTER, _)) commentPrefix.foreach(properties.putString(FORMAT_COMMENT_PREFIX, _)) isIgnoreFirstLine.foreach(properties.putBoolean(FORMAT_IGNORE_FIRST_LINE, _))