Repository: flink Updated Branches: refs/heads/release-1.6 d180d599a -> d7b80b0aa
[FLINK-9934] [table] Fix invalid field mapping by Kafka table source factory. According to the DefinedFieldMapping interface, the field mapping can also contain the input fields. However, the Kafka table source factory was calling SchemaValidator#deriveFieldMapping with its own schema instead of the input type. This closes #6403. This closes #3124. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d7b80b0a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d7b80b0a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d7b80b0a Branch: refs/heads/release-1.6 Commit: d7b80b0aa0ae6451da46b910b07b17415cb2530a Parents: d180d59 Author: Timo Walther <twal...@apache.org> Authored: Tue Jul 24 11:40:36 2018 +0200 Committer: Timo Walther <twal...@apache.org> Committed: Wed Jul 25 08:03:28 2018 +0200 ---------------------------------------------------------------------- .../kafka/KafkaTableSourceSinkFactoryBase.java | 14 ++++++++----- .../KafkaJsonTableSourceFactoryTestBase.java | 5 +++++ .../KafkaTableSourceSinkFactoryTestBase.java | 5 +++++ .../table/descriptors/SchemaValidator.scala | 21 +++++++++++++------- .../table/descriptors/SchemaValidatorTest.scala | 4 ++-- 5 files changed, 35 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/d7b80b0a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java index 3307994..27b2e67 100644 --- 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java @@ -132,18 +132,20 @@ public abstract class KafkaTableSourceSinkFactoryBase implements public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) { final DescriptorProperties descriptorProperties = getValidatedProperties(properties); - final TableSchema schema = descriptorProperties.getTableSchema(SCHEMA()); final String topic = descriptorProperties.getString(CONNECTOR_TOPIC); + final DeserializationSchema<Row> deserializationSchema = getDeserializationSchema(properties); final StartupOptions startupOptions = getStartupOptions(descriptorProperties, topic); return createKafkaTableSource( - schema, + descriptorProperties.getTableSchema(SCHEMA()), SchemaValidator.deriveProctimeAttribute(descriptorProperties), SchemaValidator.deriveRowtimeAttributes(descriptorProperties), - SchemaValidator.deriveFieldMapping(descriptorProperties, Optional.of(schema)), + SchemaValidator.deriveFieldMapping( + descriptorProperties, + Optional.of(deserializationSchema.getProducedType())), topic, getKafkaProperties(descriptorProperties), - getDeserializationSchema(properties), + deserializationSchema, startupOptions.startupMode, startupOptions.specificOffsets); } @@ -318,7 +320,9 @@ public abstract class KafkaTableSourceSinkFactoryBase implements } private boolean checkForCustomFieldMapping(DescriptorProperties descriptorProperties, TableSchema schema) { - final Map<String, String> fieldMapping = SchemaValidator.deriveFieldMapping(descriptorProperties, Optional.of(schema)); + final Map<String, String> fieldMapping = SchemaValidator.deriveFieldMapping( + descriptorProperties, + Optional.of(schema.toRowType())); // until FLINK-9870 is fixed we assume that the table schema is the output 
type return fieldMapping.size() != schema.getColumnNames().length || !fieldMapping.entrySet().stream().allMatch(mapping -> mapping.getKey().equals(mapping.getValue())); } http://git-wip-us.apache.org/repos/asf/flink/blob/d7b80b0a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java index 20da156..51c0e7b 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java @@ -32,6 +32,7 @@ import org.apache.flink.table.descriptors.TestTableDescriptor; import org.apache.flink.table.factories.StreamTableSourceFactory; import org.apache.flink.table.factories.TableFactoryService; import org.apache.flink.table.sources.TableSource; +import org.apache.flink.table.sources.TableSourceUtil; import org.apache.flink.table.sources.tsextractors.ExistingField; import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps; @@ -101,7 +102,9 @@ public abstract class KafkaJsonTableSourceFactoryTestBase { final Map<String, String> tableJsonMapping = new HashMap<>(); tableJsonMapping.put("fruit-name", "name"); + tableJsonMapping.put("name", "name"); tableJsonMapping.put("count", "count"); + tableJsonMapping.put("time", "time"); final Properties props = new Properties(); props.put("group.id", "test-group"); @@ -129,6 +132,8 @@ public abstract class KafkaJsonTableSourceFactoryTestBase { 
.withRowtimeAttribute("event-time", new ExistingField("time"), new AscendingTimestamps()) .build(); + TableSourceUtil.validateTableSource(builderSource); + // construct table source using descriptors and table source factory final Map<Integer, Long> offsets = new HashMap<>(); http://git-wip-us.apache.org/repos/asf/flink/blob/d7b80b0a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java index d8e8f7d..504bed1 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java @@ -50,6 +50,7 @@ import org.apache.flink.table.factories.utils.TestTableFormat; import org.apache.flink.table.sinks.TableSink; import org.apache.flink.table.sources.RowtimeAttributeDescriptor; import org.apache.flink.table.sources.TableSource; +import org.apache.flink.table.sources.TableSourceUtil; import org.apache.flink.table.sources.tsextractors.ExistingField; import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps; import org.apache.flink.types.Row; @@ -115,7 +116,9 @@ public abstract class KafkaTableSourceSinkFactoryTestBase extends TestLogger { final Map<String, String> fieldMapping = new HashMap<>(); fieldMapping.put(FRUIT_NAME, NAME); + fieldMapping.put(NAME, NAME); fieldMapping.put(COUNT, COUNT); + fieldMapping.put(TIME, TIME); final Map<KafkaTopicPartition, Long> specificOffsets = 
new HashMap<>(); specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_0), OFFSET_0); @@ -141,6 +144,8 @@ public abstract class KafkaTableSourceSinkFactoryTestBase extends TestLogger { StartupMode.SPECIFIC_OFFSETS, specificOffsets); + TableSourceUtil.validateTableSource(expected); + // construct table source using descriptors and table source factory final TestTableDescriptor testDesc = new TestTableDescriptor( http://git-wip-us.apache.org/repos/asf/flink/blob/d7b80b0a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala index af2baba..f6cbb2b 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala @@ -21,6 +21,8 @@ package org.apache.flink.table.descriptors import java.util import java.util.Optional +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.common.typeutils.CompositeType import org.apache.flink.table.api.{TableException, TableSchema, ValidationException} import org.apache.flink.table.descriptors.DescriptorProperties.{toJava, toScala} import org.apache.flink.table.descriptors.RowtimeValidator._ @@ -222,23 +224,28 @@ object SchemaValidator { /** * Finds a table source field mapping. + * + * @param properties The properties describing a schema. + * @param inputType The input type that a connector and/or format produces. This parameter + * can be used to resolve a rowtime field against an input field. 
*/ def deriveFieldMapping( properties: DescriptorProperties, - sourceSchema: Optional[TableSchema]) + inputType: Optional[TypeInformation[_]]) : util.Map[String, String] = { val mapping = mutable.Map[String, String]() val schema = properties.getTableSchema(SCHEMA) - // add all source fields first because rowtime might reference one of them - toScala(sourceSchema).map(_.getColumnNames).foreach { names => - names.foreach { name => - mapping.put(name, name) - } + val columnNames = toScala(inputType) match { + case Some(composite: CompositeType[_]) => composite.getFieldNames.toSeq + case _ => Seq[String]() } + // add all source fields first because rowtime might reference one of them + columnNames.foreach(name => mapping.put(name, name)) + // add all schema fields first for implicit mappings schema.getColumnNames.foreach { name => mapping.put(name, name) @@ -266,7 +273,7 @@ object SchemaValidator { mapping.remove(name) } // check for invalid fields - else if (toScala(sourceSchema).forall(s => !s.getColumnNames.contains(name))) { + else if (!columnNames.contains(name)) { throw new ValidationException(s"Could not map the schema field '$name' to a field " + s"from source. 
Please specify the source field from which it can be derived.") } http://git-wip-us.apache.org/repos/asf/flink/blob/d7b80b0a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaValidatorTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaValidatorTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaValidatorTest.scala index c558057..a2eec4c 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaValidatorTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaValidatorTest.scala @@ -67,7 +67,7 @@ class SchemaValidatorTest { "myField" -> "myField").asJava assertEquals( expectedMapping, - SchemaValidator.deriveFieldMapping(props, Optional.of(inputSchema))) + SchemaValidator.deriveFieldMapping(props, Optional.of(inputSchema.toRowType))) // test field format val formatSchema = SchemaValidator.deriveFormatFields(props) @@ -148,7 +148,7 @@ class SchemaValidatorTest { "myTime" -> "myTime").asJava assertEquals( expectedMapping, - SchemaValidator.deriveFieldMapping(props, Optional.of(inputSchema))) + SchemaValidator.deriveFieldMapping(props, Optional.of(inputSchema.toRowType))) // test field format val formatSchema = SchemaValidator.deriveFormatFields(props)