This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 833b46b51826e05ddec0eacea2f6919a7c0aae8e
Author: Dawid Wysakowicz <dwysakow...@apache.org>
AuthorDate: Thu Dec 19 14:12:45 2019 +0100

    [hotfix][table] Remove duplicated code for physical to logical mapping
---
 .../kafka/KafkaTableSourceSinkFactoryTestBase.java |  28 +--
 .../io/jdbc/JDBCTableSourceSinkFactoryTest.java    |   4 +-
 .../table/api/internal/TableEnvironmentImpl.java   |   2 +-
 .../flink/table/sources/TableSourceValidation.java | 230 ++++-----------------
 .../table/api/internal/BatchTableEnvImpl.scala     |   2 +-
 .../flink/table/plan/schema/TableSourceTable.scala |   2 +-
 6 files changed, 56 insertions(+), 212 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
index 8d30736..755630a 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
@@ -128,13 +128,13 @@ public abstract class KafkaTableSourceSinkFactoryTestBase extends TestLogger {
 		specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_0), OFFSET_0);
 		specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_1), OFFSET_1);
 
+		TableSchema tableSchema = TableSchema.builder()
+			.field(NAME, DataTypes.STRING())
+			.field(COUNT, DataTypes.DECIMAL(10, 3))
+			.field(TIME, DataTypes.TIMESTAMP(3))
+			.build();
 		final TestDeserializationSchema deserializationSchema = new TestDeserializationSchema(
-			TableSchema.builder()
-				.field(NAME, DataTypes.STRING())
-				.field(COUNT, DataTypes.DECIMAL(10, 3))
-				.field(TIME, DataTypes.TIMESTAMP(3))
-				.build()
-				.toRowType()
+			tableSchema.toRowType()
 		);
 
 		final KafkaTableSourceBase expected = getExpectedKafkaTableSource(
@@ -148,7 +148,7 @@ public abstract class KafkaTableSourceSinkFactoryTestBase extends TestLogger {
 			StartupMode.SPECIFIC_OFFSETS,
 			specificOffsets);
 
-		TableSourceValidation.validateTableSource(expected);
+		TableSourceValidation.validateTableSource(expected, tableSchema);
 
 		// construct table source using descriptors and table source factory
 		final Map<String, String> propertiesMap = new HashMap<>();
@@ -196,13 +196,13 @@ public abstract class KafkaTableSourceSinkFactoryTestBase extends TestLogger {
 		specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_0), OFFSET_0);
 		specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_1), OFFSET_1);
 
+		TableSchema tableSchema = TableSchema.builder()
+			.field(NAME, DataTypes.STRING())
+			.field(COUNT, DataTypes.DECIMAL(10, 3))
+			.field(TIME, DataTypes.TIMESTAMP(3))
+			.build();
 		final TestDeserializationSchema deserializationSchema = new TestDeserializationSchema(
-			TableSchema.builder()
-				.field(NAME, DataTypes.STRING())
-				.field(COUNT, DataTypes.DECIMAL(10, 3))
-				.field(TIME, DataTypes.TIMESTAMP(3))
-				.build()
-				.toRowType()
+			tableSchema.toRowType()
 		);
 
 		final KafkaTableSourceBase expected = getExpectedKafkaTableSource(
@@ -216,7 +216,7 @@ public abstract class KafkaTableSourceSinkFactoryTestBase extends TestLogger {
 			StartupMode.SPECIFIC_OFFSETS,
 			specificOffsets);
 
-		TableSourceValidation.validateTableSource(expected);
+		TableSourceValidation.validateTableSource(expected, tableSchema);
 
 		// construct table source using descriptors and table source factory
 		final Map<String, String> legacyPropertiesMap = new HashMap<>();
diff --git a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTableSourceSinkFactoryTest.java b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTableSourceSinkFactoryTest.java
index 995597d..f9cf971 100644
--- a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTableSourceSinkFactoryTest.java
+++ b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTableSourceSinkFactoryTest.java
@@ -76,8 +76,8 @@ public class JDBCTableSourceSinkFactoryTest {
 			.setSchema(schema)
 			.build();
 
-		TableSourceValidation.validateTableSource(expected);
-		TableSourceValidation.validateTableSource(actual);
+		TableSourceValidation.validateTableSource(expected, schema);
+		TableSourceValidation.validateTableSource(actual, schema);
 		assertEquals(expected, actual);
 	}
 
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 5429509..748c339 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -675,7 +675,7 @@ public class TableEnvironmentImpl implements TableEnvironment {
 	 * @param tableSource tableSource to validate
 	 */
 	protected void validateTableSource(TableSource<?> tableSource) {
-		TableSourceValidation.validateTableSource(tableSource);
+		TableSourceValidation.validateTableSource(tableSource, tableSource.getTableSchema());
 	}
 
 	private void translate(List<ModifyOperation> modifyOperations) {
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/TableSourceValidation.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/TableSourceValidation.java
index 3aa2eaf..6bc84d2 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/TableSourceValidation.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/TableSourceValidation.java
@@ -20,27 +20,20 @@ package org.apache.flink.table.sources;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.types.AtomicDataType;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.FieldsDataType;
-import org.apache.flink.table.types.logical.LegacyTypeInformationType;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.LogicalTypeFamily;
-import org.apache.flink.table.types.logical.LogicalTypeRoot;
-import org.apache.flink.table.types.utils.DataTypeDefaultVisitor;
-import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.flink.table.expressions.ResolvedFieldReference;
+import org.apache.flink.table.sources.tsextractors.TimestampExtractor;
+import org.apache.flink.table.sources.tsextractors.TimestampExtractorUtils;
 import org.apache.flink.table.utils.TableSchemaUtils;
+import org.apache.flink.table.utils.TypeMappingUtils;
 
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-
-import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasFamily;
+import java.util.function.Function;
 
 /**
  * Logic to validate {@link TableSource} types.
@@ -59,9 +52,7 @@ public class TableSourceValidation {
 	 *
 	 * @param tableSource The {@link TableSource} for which the time attributes are checked.
 	 */
-	public static void validateTableSource(TableSource<?> tableSource){
-		TableSchema schema = tableSource.getTableSchema();
-
+	public static void validateTableSource(TableSource<?> tableSource, TableSchema schema){
 		List<RowtimeAttributeDescriptor> rowtimeAttributes = getRowtimeAttributes(tableSource);
 		Optional<String> proctimeAttribute = getProctimeAttribute(tableSource);
 
@@ -69,7 +60,7 @@ public class TableSourceValidation {
 		validateSingleRowtimeAttribute(rowtimeAttributes);
 		validateRowtimeAttributesExistInSchema(rowtimeAttributes, schema);
 		validateProctimeAttributesExistInSchema(proctimeAttribute, schema);
-		validateLogicalToPhysicalMapping(tableSource, schema, rowtimeAttributes, proctimeAttribute);
+		validateLogicalToPhysicalMapping(tableSource, schema);
 		validateTimestampExtractorArguments(rowtimeAttributes, tableSource);
 		validateNotOverlapping(rowtimeAttributes, proctimeAttribute);
 	}
@@ -135,86 +126,46 @@ public class TableSourceValidation {
 
 	private static void validateLogicalToPhysicalMapping(
 			TableSource<?> tableSource,
-			TableSchema schema,
-			List<RowtimeAttributeDescriptor> rowtimeAttributes,
-			Optional<String> proctimeAttribute) {
-		// validate that schema fields can be resolved to a return type field of correct type
-		int mappedFieldCnt = 0;
-		for (int i = 0; i < schema.getFieldCount(); i++) {
-			DataType fieldType = schema.getFieldDataType(i).get();
-			LogicalType logicalFieldType = fieldType.getLogicalType();
-			String fieldName = schema.getFieldName(i).get();
-
-			if (proctimeAttribute.map(p -> p.equals(fieldName)).orElse(false)) {
-				if (!(hasFamily(logicalFieldType, LogicalTypeFamily.TIMESTAMP))) {
-					throw new ValidationException(String.format("Processing time field '%s' has invalid type %s. " +
-						"Processing time attributes must be of type SQL_TIMESTAMP.", fieldName, logicalFieldType));
-				}
-			} else if (rowtimeAttributes.stream().anyMatch(p -> p.getAttributeName().equals(fieldName))) {
-				if (!(hasFamily(logicalFieldType, LogicalTypeFamily.TIMESTAMP))) {
-					throw new ValidationException(String.format("Rowtime time field '%s' has invalid type %s. " +
-						"Rowtime time attributes must be of type SQL_TIMESTAMP.", fieldName, logicalFieldType));
-				}
-			} else {
-				validateLogicalTypeEqualsPhysical(fieldName, fieldType, tableSource);
-				mappedFieldCnt += 1;
-			}
-		}
-
-		// ensure that only one field is mapped to an atomic type
-		DataType producedDataType = tableSource.getProducedDataType();
-		if (!isCompositeType(producedDataType) && mappedFieldCnt > 1) {
-			throw new ValidationException(
-				String.format(
-					"More than one table field matched to atomic input type %s.",
-					producedDataType));
-		}
-	}
-
-	private static boolean isCompositeType(DataType producedDataType) {
-		LogicalType logicalType = producedDataType.getLogicalType();
-		return producedDataType instanceof FieldsDataType ||
-			(logicalType instanceof LegacyTypeInformationType &&
-				((LegacyTypeInformationType) logicalType).getTypeInformation() instanceof CompositeType);
+			TableSchema schema) {
+		final Function<String, String> fieldMapping = getNameMappingFunction(tableSource);
+
+		// if we can compute the physical indices, the mapping is valid; otherwise this throws a ValidationException
+		TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(
+			tableSource,
+			schema.getTableColumns(),
+			true, // this makes no difference for validation, we don't care about the returned indices
+			fieldMapping
+		);
 	}
 
-	private static void validateLogicalTypeEqualsPhysical(
-			String fieldName,
-			DataType logicalType,
-			TableSource<?> tableSource) {
-		ResolvedField resolvedField = resolveField(fieldName, tableSource);
-		if (!resolvedField.getType().getLogicalType().equals(logicalType.getLogicalType())) {
-
-			if (resolvedField.getType().getLogicalType() instanceof LegacyTypeInformationType &&
-					logicalType.getLogicalType().getTypeRoot() == LogicalTypeRoot.DECIMAL &&
-					logicalType.getLogicalType().getTypeRoot() == resolvedField.getType().getLogicalType().getTypeRoot()) {
-				// the resolvedField DataType may be derived from TypeInformation,
-				// then it might be LegacyTypeInformationType(BigDecimal) which doesn't equal to logical type DECIMAL(p,s)
-				// however, this doesn't break type equality for a source
-				return;
+	private static Function<String, String> getNameMappingFunction(TableSource<?> tableSource) {
+		final Function<String, String> fieldMapping;
+		if (tableSource instanceof DefinedFieldMapping &&
+				((DefinedFieldMapping) tableSource).getFieldMapping() != null) {
+			Map<String, String> fieldsMap = ((DefinedFieldMapping) tableSource).getFieldMapping();
+			if (fieldsMap != null) {
+				fieldMapping = fieldsMap::get;
+			} else {
+				fieldMapping = Function.identity();
 			}
-			throw new ValidationException(String.format(
-				"Type '%s' of table field '%s' does not match with type '%s' of field '%s' of the TableSource.",
-				logicalType,
-				fieldName,
-				resolvedField.getType(),
-				resolvedField.getName()));
+		} else {
+			fieldMapping = Function.identity();
 		}
+		return fieldMapping;
 	}
 
 	private static void validateTimestampExtractorArguments(
 			List<RowtimeAttributeDescriptor> descriptors,
 			TableSource<?> tableSource) {
 		if (descriptors.size() == 1) {
-			RowtimeAttributeDescriptor descriptor = descriptors.get(0);
-			// look up extractor input fields in return type
-			String[] extractorInputFields = descriptor.getTimestampExtractor().getArgumentFields();
-			TypeInformation[] physicalTypes = Arrays.stream(extractorInputFields)
-				.map(fieldName -> resolveField(fieldName, tableSource))
-				.map(resolvedField -> TypeConversions.fromDataTypeToLegacyInfo(resolvedField.getType()))
-				.toArray(TypeInformation[]::new);
-			// validate timestamp extractor
-			descriptor.getTimestampExtractor().validateArgumentFields(physicalTypes);
+			TimestampExtractor extractor = descriptors.get(0).getTimestampExtractor();
+			TypeInformation<?>[] types = Arrays.stream(TimestampExtractorUtils.getAccessedFields(
+					extractor,
+					tableSource.getProducedDataType(),
+					getNameMappingFunction(tableSource)
+				)).map(ResolvedFieldReference::resultType)
+				.toArray(TypeInformation<?>[]::new);
+			extractor.validateArgumentFields(types);
 		}
 	}
 
@@ -225,113 +176,6 @@ public class TableSourceValidation {
 		}
 	}
 
-	private static class ResolvedField {
-		private final String name;
-		private final DataType type;
-
-		private ResolvedField(String name, DataType type) {
-			this.type = type;
-			this.name = name;
-		}
-
-		public DataType getType() {
-			return type;
-		}
-
-		public String getName() {
-			return name;
-		}
-	}
-
-	/**
-	 * Identifies for a field name of the logical schema, the corresponding physical field in the
-	 * return type of a {@link TableSource}.
-	 *
-	 * @param fieldName The logical field to look up.
-	 * @param tableSource The table source in which to look for the field.
-	 * @return The name, index, and type information of the physical field.
-	 */
-	private static ResolvedField resolveField(String fieldName, TableSource<?> tableSource) {
-
-		DataType producedDataType = tableSource.getProducedDataType();
-
-		if (tableSource instanceof DefinedFieldMapping) {
-			Map<String, String> fieldMapping = ((DefinedFieldMapping) tableSource).getFieldMapping();
-			if (fieldMapping != null) {
-				String resolvedFieldName = fieldMapping.get(fieldName);
-				if (resolvedFieldName == null) {
-					throw new ValidationException(String.format(
-						"Field '%s' could not be resolved by the field mapping.",
-						fieldName));
-				}
-
-				return new ResolvedField(
-					resolvedFieldName,
-					lookupFieldType(
-						producedDataType,
-						resolvedFieldName,
-						String.format(
-							"Table field '%s' was resolved to TableSource return type field " +
-								"'%s', but field '%s' was not found in the return " +
-								"type %s of the TableSource. " +
-								"Please verify the field mapping of the TableSource.",
-							fieldName,
-							resolvedFieldName,
-							resolvedFieldName,
-							producedDataType)));
-			}
-		}
-
-		return new ResolvedField(
-			fieldName,
-			lookupFieldType(
-				producedDataType,
-				fieldName,
-				String.format(
-					"Table field '%s' was not found in the return type %s of the TableSource.",
-					fieldName,
-					producedDataType)));
-	}
-
-	/** Look up a field by name in a {@link DataType}. */
-	private static DataType lookupFieldType(DataType inputType, String fieldName, String failMsg) {
-		return inputType.accept(new TypeExtractor(fieldName)).orElseThrow(() -> new ValidationException(failMsg));
-	}
-
-	private static class TypeExtractor extends DataTypeDefaultVisitor<Optional<DataType>> {
-		private final String fieldName;
-
-		TypeExtractor(String fieldName) {
-			this.fieldName = fieldName;
-		}
-
-		@Override
-		public Optional<DataType> visit(AtomicDataType atomicDataType) {
-			// This is check for backwards compatibility. We should also support legacy type with composite type info
-			LogicalType logicalType = atomicDataType.getLogicalType();
-			if (logicalType instanceof LegacyTypeInformationType) {
-				LegacyTypeInformationType<?> legacyTypeInformationType = (LegacyTypeInformationType<?>) logicalType;
-				TypeInformation<?> typeInformation = legacyTypeInformationType.getTypeInformation();
-				if (typeInformation instanceof CompositeType<?>) {
-					CompositeType<?> compositeType = (CompositeType<?>) typeInformation;
-					return Optional.of(TypeConversions.fromLegacyInfoToDataType(compositeType.getTypeAt(fieldName)));
-				}
-			}
-
-			return Optional.of(atomicDataType);
-		}
-
-		@Override
-		public Optional<DataType> visit(FieldsDataType fieldsDataType) {
-			return Optional.ofNullable(fieldsDataType.getFieldDataTypes().get(fieldName));
-		}
-
-		@Override
-		protected Optional<DataType> defaultMethod(DataType dataType) {
-			return Optional.of(dataType);
-		}
-	}
-
 	/** Returns a list with all rowtime attribute descriptors of the {@link TableSource}. */
 	private static List<RowtimeAttributeDescriptor> getRowtimeAttributes(TableSource<?> tableSource) {
 		if (tableSource instanceof DefinedRowtimeAttributes) {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
index 815cbb4..53026ce 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/BatchTableEnvImpl.scala
@@ -95,7 +95,7 @@ abstract class BatchTableEnvImpl(
    * @param tableSource The [[TableSource]] to register.
    */
  override protected def validateTableSource(tableSource: TableSource[_]): Unit = {
-    TableSourceValidation.validateTableSource(tableSource)
+    TableSourceValidation.validateTableSource(tableSource, tableSource.getTableSchema)
 
     if (!tableSource.isInstanceOf[BatchTableSource[_]] &&
       !tableSource.isInstanceOf[InputFormatTableSource[_]]) {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
index f14fe03..a134968 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
@@ -39,7 +39,7 @@ class TableSourceTable[T](
     val statistic: FlinkStatistic)
   extends AbstractTable {
 
-  TableSourceValidation.validateTableSource(tableSource)
+  TableSourceValidation.validateTableSource(tableSource, tableSource.getTableSchema)
 
   /**
     * Returns statistics of current table
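
For downstream implementers, the net API change above is that TableSourceValidation#validateTableSource now takes the schema to validate against as an explicit second argument, instead of always deriving it from TableSource#getTableSchema(). A minimal sketch of the new call pattern follows; it is illustrative only and not part of this commit (the anonymous in-place source and the field names are made up for the example):

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.TableSchema;
    import org.apache.flink.table.sources.TableSource;
    import org.apache.flink.table.sources.TableSourceValidation;
    import org.apache.flink.table.types.DataType;
    import org.apache.flink.types.Row;

    public class ValidateTableSourceSketch {

        public static void main(String[] args) {
            // Logical schema the source should satisfy.
            TableSchema schema = TableSchema.builder()
                .field("name", DataTypes.STRING())
                .field("cnt", DataTypes.DECIMAL(10, 3))
                .field("ts", DataTypes.TIMESTAMP(3))
                .build();

            // Minimal source whose produced (physical) type matches the
            // logical schema one-to-one.
            TableSource<Row> source = new TableSource<Row>() {
                @Override
                public DataType getProducedDataType() {
                    return schema.toRowDataType();
                }

                @Override
                public TableSchema getTableSchema() {
                    return schema;
                }
            };

            // New two-argument signature introduced by this commit; throws
            // ValidationException if the logical fields cannot be mapped to
            // fields of the produced type.
            TableSourceValidation.validateTableSource(source, schema);
        }
    }

Every call site in the patch still passes tableSource.getTableSchema() (or, in the tests, an equal hand-built schema), so behaviour is unchanged; the wider signature simply allows a caller to validate a source against a schema obtained elsewhere, for example from a catalog entry.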