This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 833b46b51826e05ddec0eacea2f6919a7c0aae8e
Author: Dawid Wysakowicz <dwysakow...@apache.org>
AuthorDate: Thu Dec 19 14:12:45 2019 +0100

    [hotfix][table] Remove duplicated code for physical to logical mapping
---
 .../kafka/KafkaTableSourceSinkFactoryTestBase.java |  28 +--
 .../io/jdbc/JDBCTableSourceSinkFactoryTest.java    |   4 +-
 .../table/api/internal/TableEnvironmentImpl.java   |   2 +-
 .../flink/table/sources/TableSourceValidation.java | 230 ++++-----------------
 .../table/api/internal/BatchTableEnvImpl.scala     |   2 +-
 .../flink/table/plan/schema/TableSourceTable.scala |   2 +-
 6 files changed, 56 insertions(+), 212 deletions(-)
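The patch makes the schema an explicit input to validation: TableSourceValidation.validateTableSource now takes the TableSchema to validate against as a second argument instead of re-deriving it from the source, and the hand-rolled physical-to-logical mapping checks are delegated to TypeMappingUtils. As a caller-side sketch of the new signature (assuming Flink 1.10 on the classpath; the helper class and method name are illustrative, not part of the patch):

    import org.apache.flink.table.api.TableSchema;
    import org.apache.flink.table.sources.TableSource;
    import org.apache.flink.table.sources.TableSourceValidation;

    // Illustrative helper: every call site updated below follows this shape,
    // passing the source's own schema, which preserves the previous behaviour.
    final class ValidationSketch {
        static void validate(TableSource<?> source) {
            TableSchema schema = source.getTableSchema();
            // before this patch: TableSourceValidation.validateTableSource(source);
            TableSourceValidation.validateTableSource(source, schema);
        }
    }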

diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
index 8d30736..755630a 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
@@ -128,13 +128,13 @@ public abstract class KafkaTableSourceSinkFactoryTestBase extends TestLogger {
                specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_0), OFFSET_0);
                specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_1), OFFSET_1);
 
+               TableSchema tableSchema = TableSchema.builder()
+                       .field(NAME, DataTypes.STRING())
+                       .field(COUNT, DataTypes.DECIMAL(10, 3))
+                       .field(TIME, DataTypes.TIMESTAMP(3))
+                       .build();
                final TestDeserializationSchema deserializationSchema = new TestDeserializationSchema(
-                       TableSchema.builder()
-                               .field(NAME, DataTypes.STRING())
-                               .field(COUNT, DataTypes.DECIMAL(10, 3))
-                               .field(TIME, DataTypes.TIMESTAMP(3))
-                               .build()
-                               .toRowType()
+                       tableSchema.toRowType()
                );
 
                final KafkaTableSourceBase expected = getExpectedKafkaTableSource(
@@ -148,7 +148,7 @@ public abstract class KafkaTableSourceSinkFactoryTestBase extends TestLogger {
                        StartupMode.SPECIFIC_OFFSETS,
                        specificOffsets);
 
-               TableSourceValidation.validateTableSource(expected);
+               TableSourceValidation.validateTableSource(expected, tableSchema);
 
                // construct table source using descriptors and table source factory
                final Map<String, String> propertiesMap = new HashMap<>();
@@ -196,13 +196,13 @@ public abstract class KafkaTableSourceSinkFactoryTestBase extends TestLogger {
                specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_0), OFFSET_0);
                specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_1), OFFSET_1);
 
+               TableSchema tableSchema = TableSchema.builder()
+                       .field(NAME, DataTypes.STRING())
+                       .field(COUNT, DataTypes.DECIMAL(10, 3))
+                       .field(TIME, DataTypes.TIMESTAMP(3))
+                       .build();
                final TestDeserializationSchema deserializationSchema = new TestDeserializationSchema(
-                       TableSchema.builder()
-                               .field(NAME, DataTypes.STRING())
-                               .field(COUNT, DataTypes.DECIMAL(10, 3))
-                               .field(TIME, DataTypes.TIMESTAMP(3))
-                               .build()
-                               .toRowType()
+                       tableSchema.toRowType()
                );
 
                final KafkaTableSourceBase expected = getExpectedKafkaTableSource(
@@ -216,7 +216,7 @@ public abstract class KafkaTableSourceSinkFactoryTestBase extends TestLogger {
                        StartupMode.SPECIFIC_OFFSETS,
                        specificOffsets);
 
-               TableSourceValidation.validateTableSource(expected);
+               TableSourceValidation.validateTableSource(expected, tableSchema);
 
                // construct table source using descriptors and table source factory
                final Map<String, String> legacyPropertiesMap = new HashMap<>();
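Both Kafka tests above apply the same extraction: the duplicated builder chain becomes a single tableSchema local that feeds two consumers. A condensed, statement-level sketch of that shape, assuming Flink 1.10 APIs, with literal names standing in for the test's NAME/COUNT/TIME constants:

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.TableSchema;
    import org.apache.flink.types.Row;

    TableSchema tableSchema = TableSchema.builder()
            .field("name", DataTypes.STRING())
            .field("count", DataTypes.DECIMAL(10, 3))
            .field("time", DataTypes.TIMESTAMP(3))
            .build();
    // consumer 1: the deserialization schema needs the legacy row type
    TypeInformation<Row> rowType = tableSchema.toRowType();
    // consumer 2: the validation call takes the schema directly, e.g.
    // TableSourceValidation.validateTableSource(expected, tableSchema);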
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTableSourceSinkFactoryTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTableSourceSinkFactoryTest.java
index 995597d..f9cf971 100644
--- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTableSourceSinkFactoryTest.java
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTableSourceSinkFactoryTest.java
@@ -76,8 +76,8 @@ public class JDBCTableSourceSinkFactoryTest {
                        .setSchema(schema)
                        .build();
 
-               TableSourceValidation.validateTableSource(expected);
-               TableSourceValidation.validateTableSource(actual);
+               TableSourceValidation.validateTableSource(expected, schema);
+               TableSourceValidation.validateTableSource(actual, schema);
                assertEquals(expected, actual);
        }
 
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 5429509..748c339 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -675,7 +675,7 @@ public class TableEnvironmentImpl implements TableEnvironment {
         * @param tableSource tableSource to validate
         */
        protected void validateTableSource(TableSource<?> tableSource) {
-               TableSourceValidation.validateTableSource(tableSource);
+               TableSourceValidation.validateTableSource(tableSource, tableSource.getTableSchema());
        }
 
        private void translate(List<ModifyOperation> modifyOperations) {
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/TableSourceValidation.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/TableSourceValidation.java
index 3aa2eaf..6bc84d2 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/TableSourceValidation.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/TableSourceValidation.java
@@ -20,27 +20,20 @@ package org.apache.flink.table.sources;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.types.AtomicDataType;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.FieldsDataType;
-import org.apache.flink.table.types.logical.LegacyTypeInformationType;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.LogicalTypeFamily;
-import org.apache.flink.table.types.logical.LogicalTypeRoot;
-import org.apache.flink.table.types.utils.DataTypeDefaultVisitor;
-import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.flink.table.expressions.ResolvedFieldReference;
+import org.apache.flink.table.sources.tsextractors.TimestampExtractor;
+import org.apache.flink.table.sources.tsextractors.TimestampExtractorUtils;
 import org.apache.flink.table.utils.TableSchemaUtils;
+import org.apache.flink.table.utils.TypeMappingUtils;
 
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-
-import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasFamily;
+import java.util.function.Function;
 
 /**
  * Logic to validate {@link TableSource} types.
@@ -59,9 +52,7 @@ public class TableSourceValidation {
         *
         * @param tableSource The {@link TableSource} for which the time attributes are checked.
         */
-       public static void validateTableSource(TableSource<?> tableSource){
-               TableSchema schema = tableSource.getTableSchema();
-
+       public static void validateTableSource(TableSource<?> tableSource, TableSchema schema){
                List<RowtimeAttributeDescriptor> rowtimeAttributes = getRowtimeAttributes(tableSource);
                Optional<String> proctimeAttribute = getProctimeAttribute(tableSource);
 
@@ -69,7 +60,7 @@ public class TableSourceValidation {
                validateSingleRowtimeAttribute(rowtimeAttributes);
                validateRowtimeAttributesExistInSchema(rowtimeAttributes, schema);
                validateProctimeAttributesExistInSchema(proctimeAttribute, schema);
-               validateLogicalToPhysicalMapping(tableSource, schema, rowtimeAttributes, proctimeAttribute);
+               validateLogicalToPhysicalMapping(tableSource, schema);
                validateTimestampExtractorArguments(rowtimeAttributes, tableSource);
                validateNotOverlapping(rowtimeAttributes, proctimeAttribute);
        }
@@ -135,86 +126,46 @@ public class TableSourceValidation {
 
        private static void validateLogicalToPhysicalMapping(
                        TableSource<?> tableSource,
-                       TableSchema schema,
-                       List<RowtimeAttributeDescriptor> rowtimeAttributes,
-                       Optional<String> proctimeAttribute) {
-               // validate that schema fields can be resolved to a return type field of correct type
-               int mappedFieldCnt = 0;
-               for (int i = 0; i < schema.getFieldCount(); i++) {
-                       DataType fieldType = schema.getFieldDataType(i).get();
-                       LogicalType logicalFieldType = fieldType.getLogicalType();
-                       String fieldName = schema.getFieldName(i).get();
-
-                       if (proctimeAttribute.map(p -> p.equals(fieldName)).orElse(false)) {
-                               if (!(hasFamily(logicalFieldType, LogicalTypeFamily.TIMESTAMP))) {
-                                       throw new ValidationException(String.format("Processing time field '%s' has invalid type %s. " +
-                                               "Processing time attributes must be of type SQL_TIMESTAMP.", fieldName, logicalFieldType));
-                               }
-                       } else if (rowtimeAttributes.stream().anyMatch(p -> p.getAttributeName().equals(fieldName))) {
-                               if (!(hasFamily(logicalFieldType, LogicalTypeFamily.TIMESTAMP))) {
-                                       throw new ValidationException(String.format("Rowtime time field '%s' has invalid type %s. " +
-                                               "Rowtime time attributes must be of type SQL_TIMESTAMP.", fieldName, logicalFieldType));
-                               }
-                       } else {
-                               validateLogicalTypeEqualsPhysical(fieldName, fieldType, tableSource);
-                               mappedFieldCnt += 1;
-                       }
-               }
-
-               // ensure that only one field is mapped to an atomic type
-               DataType producedDataType = tableSource.getProducedDataType();
-               if (!isCompositeType(producedDataType) && mappedFieldCnt > 1) {
-                       throw new ValidationException(
-                               String.format(
-                                       "More than one table field matched to atomic input type %s.",
-                                       producedDataType));
-               }
-       }
-
-       private static boolean isCompositeType(DataType producedDataType) {
-               LogicalType logicalType = producedDataType.getLogicalType();
-               return producedDataType instanceof FieldsDataType ||
-                       (logicalType instanceof LegacyTypeInformationType &&
-                               ((LegacyTypeInformationType) logicalType).getTypeInformation() instanceof CompositeType);
+                       TableSchema schema) {
+               final Function<String, String> fieldMapping = getNameMappingFunction(tableSource);
+
+               // if we can compute the physical indices, the mapping is valid
+               TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(
+                       tableSource,
+                       schema.getTableColumns(),
+                       true, // this makes no difference for validation, we don't care about the returned indices
+                       fieldMapping
+               );
        }
 
-       private static void validateLogicalTypeEqualsPhysical(
-                       String fieldName,
-                       DataType logicalType,
-                       TableSource<?> tableSource) {
-               ResolvedField resolvedField = resolveField(fieldName, tableSource);
-               if (!resolvedField.getType().getLogicalType().equals(logicalType.getLogicalType())) {
-
-                       if (resolvedField.getType().getLogicalType() instanceof LegacyTypeInformationType &&
-                               logicalType.getLogicalType().getTypeRoot() == LogicalTypeRoot.DECIMAL &&
-                               logicalType.getLogicalType().getTypeRoot() == resolvedField.getType().getLogicalType().getTypeRoot()) {
-                               // the resolvedField DataType may be derived from TypeInformation,
-                               // then it might be LegacyTypeInformationType(BigDecimal) which doesn't equal to logical type DECIMAL(p,s)
-                               // however, this doesn't break type equality for a source
-                               return;
+       private static Function<String, String> getNameMappingFunction(TableSource<?> tableSource) {
+               final Function<String, String> fieldMapping;
+               if (tableSource instanceof DefinedFieldMapping &&
+                       ((DefinedFieldMapping) tableSource).getFieldMapping() != null) {
+                       Map<String, String> fieldsMap = ((DefinedFieldMapping) tableSource).getFieldMapping();
+                       if (fieldsMap != null) {
+                               fieldMapping = fieldsMap::get;
+                       } else {
+                               fieldMapping = Function.identity();
                        }
-                       throw new ValidationException(String.format(
-                               "Type '%s' of table field '%s' does not match with type '%s' of field '%s' of the TableSource.",
-                               logicalType,
-                               fieldName,
-                               resolvedField.getType(),
-                               resolvedField.getName()));
+               } else {
+                       fieldMapping = Function.identity();
                }
+               return fieldMapping;
        }
 
        private static void validateTimestampExtractorArguments(
                        List<RowtimeAttributeDescriptor> descriptors,
                        TableSource<?> tableSource) {
                if (descriptors.size() == 1) {
-                       RowtimeAttributeDescriptor descriptor = descriptors.get(0);
-                       // look up extractor input fields in return type
-                       String[] extractorInputFields = descriptor.getTimestampExtractor().getArgumentFields();
-                       TypeInformation[] physicalTypes = Arrays.stream(extractorInputFields)
-                               .map(fieldName -> resolveField(fieldName, tableSource))
-                               .map(resolvedField -> TypeConversions.fromDataTypeToLegacyInfo(resolvedField.getType()))
-                               .toArray(TypeInformation[]::new);
-                       // validate timestamp extractor
-                       descriptor.getTimestampExtractor().validateArgumentFields(physicalTypes);
+                       TimestampExtractor extractor = descriptors.get(0).getTimestampExtractor();
+                       TypeInformation<?>[] types = Arrays.stream(TimestampExtractorUtils.getAccessedFields(
+                               extractor,
+                               tableSource.getProducedDataType(),
+                               getNameMappingFunction(tableSource)
+                       )).map(ResolvedFieldReference::resultType)
+                               .toArray(TypeInformation<?>[]::new);
+                       extractor.validateArgumentFields(types);
                }
        }
 
@@ -225,113 +176,6 @@ public class TableSourceValidation {
                }
        }
 
-       private static class ResolvedField {
-               private final String name;
-               private final DataType type;
-
-               private ResolvedField(String name, DataType type) {
-                       this.type = type;
-                       this.name = name;
-               }
-
-               public DataType getType() {
-                       return type;
-               }
-
-               public String getName() {
-                       return name;
-               }
-       }
-
-       /**
-        * Identifies for a field name of the logical schema, the corresponding physical field in the
-        * return type of a {@link TableSource}.
-        *
-        * @param fieldName The logical field to look up.
-        * @param tableSource The table source in which to look for the field.
-        * @return The name, index, and type information of the physical field.
-        */
-       private static ResolvedField resolveField(String fieldName, TableSource<?> tableSource) {
-
-               DataType producedDataType = tableSource.getProducedDataType();
-
-               if (tableSource instanceof DefinedFieldMapping) {
-                       Map<String, String> fieldMapping = ((DefinedFieldMapping) tableSource).getFieldMapping();
-                       if (fieldMapping != null) {
-                               String resolvedFieldName = fieldMapping.get(fieldName);
-                               if (resolvedFieldName == null) {
-                                       throw new ValidationException(String.format(
-                                               "Field '%s' could not be resolved by the field mapping.",
-                                               fieldName));
-                               }
-
-                               return new ResolvedField(
-                                       resolvedFieldName,
-                                       lookupFieldType(
-                                               producedDataType,
-                                               resolvedFieldName,
-                                               String.format(
-                                                       "Table field '%s' was resolved to TableSource return type field " +
-                                                               "'%s', but field '%s' was not found in the return " +
-                                                               "type %s of the TableSource. " +
-                                                               "Please verify the field mapping of the TableSource.",
-                                                       fieldName,
-                                                       resolvedFieldName,
-                                                       resolvedFieldName,
-                                                       producedDataType)));
-                       }
-               }
-
-               return new ResolvedField(
-                       fieldName,
-                       lookupFieldType(
-                               producedDataType,
-                               fieldName,
-                               String.format(
-                                       "Table field '%s' was not found in the return type %s of the TableSource.",
-                                       fieldName,
-                                       producedDataType)));
-       }
-
-       /** Look up a field by name in a {@link DataType}. */
-       private static DataType lookupFieldType(DataType inputType, String fieldName, String failMsg) {
-               return inputType.accept(new TypeExtractor(fieldName)).orElseThrow(() -> new ValidationException(failMsg));
-       }
-
-       private static class TypeExtractor extends DataTypeDefaultVisitor<Optional<DataType>> {
-               private final String fieldName;
-
-               TypeExtractor(String fieldName) {
-                       this.fieldName = fieldName;
-               }
-
-               @Override
-               public Optional<DataType> visit(AtomicDataType atomicDataType) {
-                       //  This is check for backwards compatibility. We should also support legacy type with composite type info
-                       LogicalType logicalType = atomicDataType.getLogicalType();
-                       if (logicalType instanceof LegacyTypeInformationType) {
-                               LegacyTypeInformationType<?> legacyTypeInformationType = (LegacyTypeInformationType<?>) logicalType;
-                               TypeInformation<?> typeInformation = legacyTypeInformationType.getTypeInformation();
-                               if (typeInformation instanceof CompositeType<?>) {
-                                       CompositeType<?> compositeType = (CompositeType<?>) typeInformation;
-                                       return Optional.of(TypeConversions.fromLegacyInfoToDataType(compositeType.getTypeAt(fieldName)));
-                               }
-                       }
-
-                       return Optional.of(atomicDataType);
-               }
-
-               @Override
-               public Optional<DataType> visit(FieldsDataType fieldsDataType) {
-                       return Optional.ofNullable(fieldsDataType.getFieldDataTypes().get(fieldName));
-               }
-
-               @Override
-               protected Optional<DataType> defaultMethod(DataType dataType) {
-                       return Optional.of(dataType);
-               }
-       }
-
        /** Returns a list with all rowtime attribute descriptors of the {@link TableSource}. */
        private static List<RowtimeAttributeDescriptor> getRowtimeAttributes(TableSource<?> tableSource) {
                if (tableSource instanceof DefinedRowtimeAttributes) {
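The new getNameMappingFunction reproduces the lookup rule that the deleted resolveField performed inline: when the source defines a non-null field mapping, logical names are translated through it (unmapped names yield null, which the TypeMappingUtils call then rejects); in every other case names map to themselves. A standalone sketch of just that rule, with a plain Map standing in for the DefinedFieldMapping hierarchy:

    import java.util.Map;
    import java.util.function.Function;

    final class NameMappingSketch {
        // same fallback rule as getNameMappingFunction above, minus the instanceof check
        static Function<String, String> nameMapping(Map<String, String> fieldMapping) {
            return fieldMapping != null ? fieldMapping::get : Function.identity();
        }

        public static void main(String[] args) {
            Function<String, String> mapping = nameMapping(Map.of("name", "physical_name"));
            System.out.println(mapping.apply("name"));            // physical_name
            System.out.println(mapping.apply("missing"));         // null -> rejected by validation
            System.out.println(nameMapping(null).apply("count")); // count (identity fallback)
        }
    }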
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
index 815cbb4..53026ce 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
@@ -95,7 +95,7 @@ abstract class BatchTableEnvImpl(
     * @param tableSource The [[TableSource]] to register.
     */
   override protected def validateTableSource(tableSource: TableSource[_]): Unit = {
-    TableSourceValidation.validateTableSource(tableSource)
+    TableSourceValidation.validateTableSource(tableSource, tableSource.getTableSchema)
 
     if (!tableSource.isInstanceOf[BatchTableSource[_]] &&
         !tableSource.isInstanceOf[InputFormatTableSource[_]]) {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
index f14fe03..a134968 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
@@ -39,7 +39,7 @@ class TableSourceTable[T](
     val statistic: FlinkStatistic)
   extends AbstractTable {
 
-  TableSourceValidation.validateTableSource(tableSource)
+  TableSourceValidation.validateTableSource(tableSource, tableSource.getTableSchema)
 
   /**
     * Returns statistics of current table
