This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push: new 0625b31 [FLINK-18673][table] Improve support for ROW constructor 0625b31 is described below commit 0625b31521247d03b1e2b5280120c6546865da49 Author: Timo Walther <twal...@apache.org> AuthorDate: Fri Nov 13 15:52:18 2020 +0100 [FLINK-18673][table] Improve support for ROW constructor This closes #14067. --- .../types/logical/utils/LogicalTypeCasts.java | 22 +++++++++++----------- .../flink/table/types/LogicalTypeCastsTest.java | 12 ++++++++++++ .../planner/calcite/FlinkCalciteSqlValidator.java | 10 ++++++++++ .../table/planner/calcite/FlinkTypeFactory.scala | 5 +++++ 4 files changed, 38 insertions(+), 11 deletions(-) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java index ea63e0a..56d5b27 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java @@ -289,14 +289,9 @@ public final class LogicalTypeCasts { final LogicalTypeRoot sourceRoot = sourceType.getTypeRoot(); final LogicalTypeRoot targetRoot = targetType.getTypeRoot(); - if (hasFamily(sourceType, INTERVAL) && hasFamily(targetType, EXACT_NUMERIC)) { - // cast between interval and exact numeric is only supported if interval has a single field - return isSingleFieldInterval(sourceType); - } else if (hasFamily(sourceType, EXACT_NUMERIC) && hasFamily(targetType, INTERVAL)) { - // cast between interval and exact numeric is only supported if interval has a single field - return isSingleFieldInterval(targetType); - } else if (hasFamily(sourceType, CONSTRUCTED) || hasFamily(targetType, CONSTRUCTED)) { - return supportsConstructedCasting(sourceType, targetType, allowExplicit); + if (sourceRoot == NULL) { + // null can be cast to an arbitrary type + return true; } else if (sourceRoot == DISTINCT_TYPE && targetRoot == DISTINCT_TYPE) { // the two distinct types are not equal (from initial invariant), casting is not possible return false; @@ -304,12 +299,17 @@ public final class LogicalTypeCasts { return supportsCasting(((DistinctType) sourceType).getSourceType(), targetType, allowExplicit); } else if (targetRoot == DISTINCT_TYPE) { return supportsCasting(sourceType, ((DistinctType) targetType).getSourceType(), allowExplicit); + } else if (hasFamily(sourceType, INTERVAL) && hasFamily(targetType, EXACT_NUMERIC)) { + // cast between interval and exact numeric is only supported if interval has a single field + return isSingleFieldInterval(sourceType); + } else if (hasFamily(sourceType, EXACT_NUMERIC) && hasFamily(targetType, INTERVAL)) { + // cast between interval and exact numeric is only supported if interval has a single field + return isSingleFieldInterval(targetType); + } else if (hasFamily(sourceType, CONSTRUCTED) || hasFamily(targetType, CONSTRUCTED)) { + return supportsConstructedCasting(sourceType, targetType, allowExplicit); } else if (sourceRoot == STRUCTURED_TYPE || targetRoot == STRUCTURED_TYPE) { // inheritance is not supported yet, so structured type must be fully equal return false; - } else if (sourceRoot == NULL) { - // null can be cast to an arbitrary type - return true; } else if (sourceRoot == RAW || targetRoot == RAW) { // the two raw types are not equal (from initial invariant), casting is not possible return false; diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeCastsTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeCastsTest.java index 03fb1b7..b817754 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeCastsTest.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeCastsTest.java @@ -106,6 +106,18 @@ public class LogicalTypeCastsTest { {new NullType(), new IntType(), true, true}, + { + new NullType(), + new RowType( + Arrays.asList( + new RowField("f1", new IntType()), + new RowField("f2", new IntType()) + ) + ), + true, + true + }, + {new ArrayType(new IntType()), new ArrayType(new BigIntType()), true, true}, {new ArrayType(new IntType()), new ArrayType(new VarCharType(Integer.MAX_VALUE)), false, true}, diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java index cfffc62..50f0d32 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java @@ -22,8 +22,10 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.types.logical.DecimalType; +import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.sql.JoinType; +import org.apache.calcite.sql.SqlFunction; import org.apache.calcite.sql.SqlJoin; import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.SqlLiteral; @@ -38,6 +40,7 @@ import org.apache.calcite.sql.validate.SqlValidatorScope; import org.apache.calcite.util.Static; import java.math.BigDecimal; +import java.util.List; import static org.apache.calcite.sql.type.SqlTypeName.DECIMAL; @@ -85,4 +88,11 @@ public final class FlinkCalciteSqlValidator extends SqlValidatorImpl { } super.validateJoin(join, scope); } + + @Override + public void validateColumnListParams(SqlFunction function, List<RelDataType> argTypes, List<SqlNode> operands) { + // we don't support column lists and translate them into the unknown type in the type factory, + // this makes it possible to ignore them in the validator and fall back to regular row types + // see also SqlFunction#deriveType + } } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala index 57d64f8..acc09da 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala @@ -311,6 +311,11 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) // keep precision/scale in sync with our type system's default value, // see DecimalType.USER_DEFAULT. createSqlType(typeName, DecimalType.DEFAULT_PRECISION, DecimalType.DEFAULT_SCALE) + } else if (typeName == COLUMN_LIST) { + // we don't support column lists and translate them into the unknown type, + // this makes it possible to ignore them in the validator and fall back to regular row types + // see also SqlFunction#deriveType + createUnknownType() } else { super.createSqlType(typeName) }