This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 0625b31  [FLINK-18673][table] Improve support for ROW constructor
0625b31 is described below

commit 0625b31521247d03b1e2b5280120c6546865da49
Author: Timo Walther <twal...@apache.org>
AuthorDate: Fri Nov 13 15:52:18 2020 +0100

    [FLINK-18673][table] Improve support for ROW constructor
    
    This closes #14067.
---
 .../types/logical/utils/LogicalTypeCasts.java      | 22 +++++++++++-----------
 .../flink/table/types/LogicalTypeCastsTest.java    | 12 ++++++++++++
 .../planner/calcite/FlinkCalciteSqlValidator.java  | 10 ++++++++++
 .../table/planner/calcite/FlinkTypeFactory.scala   |  5 +++++
 4 files changed, 38 insertions(+), 11 deletions(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java
index ea63e0a..56d5b27 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java
@@ -289,14 +289,9 @@ public final class LogicalTypeCasts {
                final LogicalTypeRoot sourceRoot = sourceType.getTypeRoot();
                final LogicalTypeRoot targetRoot = targetType.getTypeRoot();
 
-               if (hasFamily(sourceType, INTERVAL) && hasFamily(targetType, EXACT_NUMERIC)) {
-                       // cast between interval and exact numeric is only supported if interval has a single field
-                       return isSingleFieldInterval(sourceType);
-               } else if (hasFamily(sourceType, EXACT_NUMERIC) && hasFamily(targetType, INTERVAL)) {
-                       // cast between interval and exact numeric is only supported if interval has a single field
-                       return isSingleFieldInterval(targetType);
-               } else if (hasFamily(sourceType, CONSTRUCTED) || hasFamily(targetType, CONSTRUCTED)) {
-                       return supportsConstructedCasting(sourceType, targetType, allowExplicit);
+               if (sourceRoot == NULL) {
+                       // null can be cast to an arbitrary type
+                       return true;
                } else if (sourceRoot == DISTINCT_TYPE && targetRoot == DISTINCT_TYPE) {
                        // the two distinct types are not equal (from initial invariant), casting is not possible
                        return false;
@@ -304,12 +299,17 @@ public final class LogicalTypeCasts {
                        return supportsCasting(((DistinctType) sourceType).getSourceType(), targetType, allowExplicit);
                } else if (targetRoot == DISTINCT_TYPE) {
                        return supportsCasting(sourceType, ((DistinctType) targetType).getSourceType(), allowExplicit);
+               } else if (hasFamily(sourceType, INTERVAL) && hasFamily(targetType, EXACT_NUMERIC)) {
+                       // cast between interval and exact numeric is only supported if interval has a single field
+                       return isSingleFieldInterval(sourceType);
+               } else if (hasFamily(sourceType, EXACT_NUMERIC) && hasFamily(targetType, INTERVAL)) {
+                       // cast between interval and exact numeric is only supported if interval has a single field
+                       return isSingleFieldInterval(targetType);
+               }  else if (hasFamily(sourceType, CONSTRUCTED) || hasFamily(targetType, CONSTRUCTED)) {
+                       return supportsConstructedCasting(sourceType, targetType, allowExplicit);
                } else if (sourceRoot == STRUCTURED_TYPE || targetRoot == STRUCTURED_TYPE) {
                        // inheritance is not supported yet, so structured type must be fully equal
                        return false;
-               } else if (sourceRoot == NULL) {
-                       // null can be cast to an arbitrary type
-                       return true;
                } else if (sourceRoot == RAW || targetRoot == RAW) {
                        // the two raw types are not equal (from initial invariant), casting is not possible
                        return false;
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeCastsTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeCastsTest.java
index 03fb1b7..b817754 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeCastsTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeCastsTest.java
@@ -106,6 +106,18 @@ public class LogicalTypeCastsTest {
 
                                {new NullType(), new IntType(), true, true},
 
+                               {
+                                       new NullType(),
+                                       new RowType(
+                                               Arrays.asList(
+                                                       new RowField("f1", new IntType()),
+                                                       new RowField("f2", new IntType())
+                                               )
+                                       ),
+                                       true,
+                                       true
+                               },
+
                                {new ArrayType(new IntType()), new ArrayType(new BigIntType()), true, true},
 
                                {new ArrayType(new IntType()), new ArrayType(new VarCharType(Integer.MAX_VALUE)), false, true},
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java
index cfffc62..50f0d32 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java
@@ -22,8 +22,10 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.types.logical.DecimalType;
 
+import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.sql.JoinType;
+import org.apache.calcite.sql.SqlFunction;
 import org.apache.calcite.sql.SqlJoin;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlLiteral;
@@ -38,6 +40,7 @@ import org.apache.calcite.sql.validate.SqlValidatorScope;
 import org.apache.calcite.util.Static;
 
 import java.math.BigDecimal;
+import java.util.List;
 
 import static org.apache.calcite.sql.type.SqlTypeName.DECIMAL;
 
@@ -85,4 +88,11 @@ public final class FlinkCalciteSqlValidator extends SqlValidatorImpl {
                }
                super.validateJoin(join, scope);
        }
+
+       @Override
+       public void validateColumnListParams(SqlFunction function, List<RelDataType> argTypes, List<SqlNode> operands) {
+               // we don't support column lists and translate them into the unknown type in the type factory,
+               // this makes it possible to ignore them in the validator and fall back to regular row types
+               // see also SqlFunction#deriveType
+       }
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala
index 57d64f8..acc09da 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala
@@ -311,6 +311,11 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem)
       // keep precision/scale in sync with our type system's default value,
       // see DecimalType.USER_DEFAULT.
       createSqlType(typeName, DecimalType.DEFAULT_PRECISION, DecimalType.DEFAULT_SCALE)
+    } else if (typeName == COLUMN_LIST) {
+      // we don't support column lists and translate them into the unknown type,
+      // this makes it possible to ignore them in the validator and fall back to regular row types
+      // see also SqlFunction#deriveType
+      createUnknownType()
     } else {
       super.createSqlType(typeName)
     }

Reply via email to