Repository: flink
Updated Branches:
  refs/heads/release-1.6 d180d599a -> d7b80b0aa


[FLINK-9934] [table] Fix invalid field mapping by Kafka table source factory

According to the DefinedFieldMapping interface, the field mapping can also
contain the input fields. However, the Kafka table source factory was calling
SchemaValidator#deriveFieldMapping with its own schema instead of the input
type.

This closes #6403.
This closes #3124.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d7b80b0a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d7b80b0a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d7b80b0a

Branch: refs/heads/release-1.6
Commit: d7b80b0aa0ae6451da46b910b07b17415cb2530a
Parents: d180d59
Author: Timo Walther <twal...@apache.org>
Authored: Tue Jul 24 11:40:36 2018 +0200
Committer: Timo Walther <twal...@apache.org>
Committed: Wed Jul 25 08:03:28 2018 +0200

----------------------------------------------------------------------
 .../kafka/KafkaTableSourceSinkFactoryBase.java  | 14 ++++++++-----
 .../KafkaJsonTableSourceFactoryTestBase.java    |  5 +++++
 .../KafkaTableSourceSinkFactoryTestBase.java    |  5 +++++
 .../table/descriptors/SchemaValidator.scala     | 21 +++++++++++++-------
 .../table/descriptors/SchemaValidatorTest.scala |  4 ++--
 5 files changed, 35 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d7b80b0a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java
index 3307994..27b2e67 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java
@@ -132,18 +132,20 @@ public abstract class KafkaTableSourceSinkFactoryBase 
implements
        public StreamTableSource<Row> createStreamTableSource(Map<String, 
String> properties) {
                final DescriptorProperties descriptorProperties = 
getValidatedProperties(properties);
 
-               final TableSchema schema = 
descriptorProperties.getTableSchema(SCHEMA());
                final String topic = 
descriptorProperties.getString(CONNECTOR_TOPIC);
+               final DeserializationSchema<Row> deserializationSchema = 
getDeserializationSchema(properties);
                final StartupOptions startupOptions = 
getStartupOptions(descriptorProperties, topic);
 
                return createKafkaTableSource(
-                       schema,
+                       descriptorProperties.getTableSchema(SCHEMA()),
                        
SchemaValidator.deriveProctimeAttribute(descriptorProperties),
                        
SchemaValidator.deriveRowtimeAttributes(descriptorProperties),
-                       
SchemaValidator.deriveFieldMapping(descriptorProperties, Optional.of(schema)),
+                       SchemaValidator.deriveFieldMapping(
+                               descriptorProperties,
+                               
Optional.of(deserializationSchema.getProducedType())),
                        topic,
                        getKafkaProperties(descriptorProperties),
-                       getDeserializationSchema(properties),
+                       deserializationSchema,
                        startupOptions.startupMode,
                        startupOptions.specificOffsets);
        }
@@ -318,7 +320,9 @@ public abstract class KafkaTableSourceSinkFactoryBase 
implements
        }
 
        private boolean checkForCustomFieldMapping(DescriptorProperties 
descriptorProperties, TableSchema schema) {
-               final Map<String, String> fieldMapping = 
SchemaValidator.deriveFieldMapping(descriptorProperties, Optional.of(schema));
+               final Map<String, String> fieldMapping = 
SchemaValidator.deriveFieldMapping(
+                       descriptorProperties,
+                       Optional.of(schema.toRowType())); // until FLINK-9870 
is fixed we assume that the table schema is the output type
                return fieldMapping.size() != schema.getColumnNames().length ||
                        !fieldMapping.entrySet().stream().allMatch(mapping -> 
mapping.getKey().equals(mapping.getValue()));
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/d7b80b0a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java
index 20da156..51c0e7b 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java
@@ -32,6 +32,7 @@ import org.apache.flink.table.descriptors.TestTableDescriptor;
 import org.apache.flink.table.factories.StreamTableSourceFactory;
 import org.apache.flink.table.factories.TableFactoryService;
 import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.table.sources.TableSourceUtil;
 import org.apache.flink.table.sources.tsextractors.ExistingField;
 import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps;
 
@@ -101,7 +102,9 @@ public abstract class KafkaJsonTableSourceFactoryTestBase {
 
                final Map<String, String> tableJsonMapping = new HashMap<>();
                tableJsonMapping.put("fruit-name", "name");
+               tableJsonMapping.put("name", "name");
                tableJsonMapping.put("count", "count");
+               tableJsonMapping.put("time", "time");
 
                final Properties props = new Properties();
                props.put("group.id", "test-group");
@@ -129,6 +132,8 @@ public abstract class KafkaJsonTableSourceFactoryTestBase {
                                .withRowtimeAttribute("event-time", new 
ExistingField("time"), new AscendingTimestamps())
                                .build();
 
+               TableSourceUtil.validateTableSource(builderSource);
+
                // construct table source using descriptors and table source 
factory
 
                final Map<Integer, Long> offsets = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/flink/blob/d7b80b0a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
index d8e8f7d..504bed1 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
@@ -50,6 +50,7 @@ import org.apache.flink.table.factories.utils.TestTableFormat;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
 import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.table.sources.TableSourceUtil;
 import org.apache.flink.table.sources.tsextractors.ExistingField;
 import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps;
 import org.apache.flink.types.Row;
@@ -115,7 +116,9 @@ public abstract class KafkaTableSourceSinkFactoryTestBase 
extends TestLogger {
 
                final Map<String, String> fieldMapping = new HashMap<>();
                fieldMapping.put(FRUIT_NAME, NAME);
+               fieldMapping.put(NAME, NAME);
                fieldMapping.put(COUNT, COUNT);
+               fieldMapping.put(TIME, TIME);
 
                final Map<KafkaTopicPartition, Long> specificOffsets = new 
HashMap<>();
                specificOffsets.put(new KafkaTopicPartition(TOPIC, 
PARTITION_0), OFFSET_0);
@@ -141,6 +144,8 @@ public abstract class KafkaTableSourceSinkFactoryTestBase 
extends TestLogger {
                        StartupMode.SPECIFIC_OFFSETS,
                        specificOffsets);
 
+               TableSourceUtil.validateTableSource(expected);
+
                // construct table source using descriptors and table source 
factory
 
                final TestTableDescriptor testDesc = new TestTableDescriptor(

http://git-wip-us.apache.org/repos/asf/flink/blob/d7b80b0a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala
index af2baba..f6cbb2b 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala
@@ -21,6 +21,8 @@ package org.apache.flink.table.descriptors
 import java.util
 import java.util.Optional
 
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.table.api.{TableException, TableSchema, 
ValidationException}
 import org.apache.flink.table.descriptors.DescriptorProperties.{toJava, 
toScala}
 import org.apache.flink.table.descriptors.RowtimeValidator._
@@ -222,23 +224,28 @@ object SchemaValidator {
 
   /**
     * Finds a table source field mapping.
+    *
+    * @param properties The properties describing a schema.
+    * @param inputType  The input type that a connector and/or format 
produces. This parameter
+    *                   can be used to resolve a rowtime field against an 
input field.
     */
   def deriveFieldMapping(
       properties: DescriptorProperties,
-      sourceSchema: Optional[TableSchema])
+      inputType: Optional[TypeInformation[_]])
     : util.Map[String, String] = {
 
     val mapping = mutable.Map[String, String]()
 
     val schema = properties.getTableSchema(SCHEMA)
 
-    // add all source fields first because rowtime might reference one of them
-    toScala(sourceSchema).map(_.getColumnNames).foreach { names =>
-      names.foreach { name =>
-        mapping.put(name, name)
-      }
+    val columnNames = toScala(inputType) match {
+      case Some(composite: CompositeType[_]) => composite.getFieldNames.toSeq
+      case _ => Seq[String]()
     }
 
+    // add all source fields first because rowtime might reference one of them
+    columnNames.foreach(name => mapping.put(name, name))
+
     // add all schema fields first for implicit mappings
     schema.getColumnNames.foreach { name =>
       mapping.put(name, name)
@@ -266,7 +273,7 @@ object SchemaValidator {
             mapping.remove(name)
           }
           // check for invalid fields
-          else if (toScala(sourceSchema).forall(s => 
!s.getColumnNames.contains(name))) {
+          else if (!columnNames.contains(name)) {
             throw new ValidationException(s"Could not map the schema field 
'$name' to a field " +
               s"from source. Please specify the source field from which it can 
be derived.")
           }

http://git-wip-us.apache.org/repos/asf/flink/blob/d7b80b0a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaValidatorTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaValidatorTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaValidatorTest.scala
index c558057..a2eec4c 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaValidatorTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaValidatorTest.scala
@@ -67,7 +67,7 @@ class SchemaValidatorTest {
       "myField" -> "myField").asJava
     assertEquals(
       expectedMapping,
-      SchemaValidator.deriveFieldMapping(props, Optional.of(inputSchema)))
+      SchemaValidator.deriveFieldMapping(props, 
Optional.of(inputSchema.toRowType)))
 
     // test field format
     val formatSchema = SchemaValidator.deriveFormatFields(props)
@@ -148,7 +148,7 @@ class SchemaValidatorTest {
       "myTime" -> "myTime").asJava
     assertEquals(
       expectedMapping,
-      SchemaValidator.deriveFieldMapping(props, Optional.of(inputSchema)))
+      SchemaValidator.deriveFieldMapping(props, 
Optional.of(inputSchema.toRowType)))
 
     // test field format
     val formatSchema = SchemaValidator.deriveFormatFields(props)

Reply via email to