This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ec9427cea8c3f0d1d88ab5a077eafe6b39761bb7
Author:     Sergey Nuyanzin <[email protected]>
AuthorDate: Sat Oct 25 22:35:51 2025 +0200

    [FLINK-20539][table] Type mismatch when using computed ROW column

    Co-authored-by: xuyang <[email protected]>
---
 .../parser/type/ExtendedSqlRowTypeNameSpec.java    |   2 +
 .../java/org/apache/flink/sql/parser/Fixture.java  |   6 ++
 .../flink/sql/parser/FlinkDDLDataTypeTest.java     |  14 ++-
 .../table/planner/calcite/FlinkRexBuilder.java     |  82 +++++++++++----
 .../planner/functions/CastFunctionMiscITCase.java  |   7 +-
 .../operations/SqlDdlToOperationConverterTest.java |   6 +-
 .../PushProjectIntoTableSourceScanRuleTest.xml     |   4 +-
 .../table/planner/plan/stream/sql/CalcTest.xml     | 114 +++++++++++++++++++++
 .../table/planner/plan/stream/sql/CalcTest.scala   |  73 +++++++++++++
 .../planner/runtime/stream/sql/CalcITCase.scala    |  30 ++++++
 10 files changed, 309 insertions(+), 29 deletions(-)

diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/ExtendedSqlRowTypeNameSpec.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/ExtendedSqlRowTypeNameSpec.java
index a75d523caed..9605a6e5aef 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/ExtendedSqlRowTypeNameSpec.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/ExtendedSqlRowTypeNameSpec.java
@@ -20,6 +20,7 @@ package org.apache.flink.sql.parser.type;
 
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.StructKind;
 import org.apache.calcite.sql.SqlCharStringLiteral;
 import org.apache.calcite.sql.SqlDataTypeSpec;
 import org.apache.calcite.sql.SqlIdentifier;
@@ -156,6 +157,7 @@ public class ExtendedSqlRowTypeNameSpec extends SqlTypeNameSpec {
     public RelDataType deriveType(SqlValidator sqlValidator) {
         final RelDataTypeFactory typeFactory = sqlValidator.getTypeFactory();
         return typeFactory.createStructType(
+                StructKind.PEEK_FIELDS_NO_EXPAND,
                 fieldTypes.stream()
                         .map(dt -> dt.deriveType(sqlValidator))
                         .collect(Collectors.toList()),
diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/Fixture.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/Fixture.java
index 81d13f70437..620df32d849 100644
--- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/Fixture.java
+++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/Fixture.java
@@ -19,6 +19,7 @@ package org.apache.flink.sql.parser;
 
 import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.StructKind;
 import org.apache.calcite.sql.type.SqlTypeName;
 
 import java.util.List;
@@ -113,6 +114,11 @@ public class Fixture {
         return typeFactory.createStructType(keyTypes, names);
     }
 
+    public RelDataType createStructType(
+            StructKind structKind, List<RelDataType> keyTypes, List<String> names) {
+        return typeFactory.createStructType(structKind, keyTypes, names);
+    }
+
     public RelDataType createStructuredType(
             String className, List<RelDataType> typeList, List<String> fieldNameList) {
         return typeFactory.createStructuredType(className, typeList, fieldNameList);
diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkDDLDataTypeTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkDDLDataTypeTest.java
index 8596a505d70..7ba04b59315 100644
--- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkDDLDataTypeTest.java
+++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkDDLDataTypeTest.java
@@ -27,6 +27,7 @@ import org.apache.calcite.avatica.util.Quoting;
 import org.apache.calcite.rel.type.DelegatingTypeSystem;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.rel.type.StructKind;
 import org.apache.calcite.sql.SqlDataTypeSpec;
 import org.apache.calcite.sql.SqlDialect;
 import org.apache.calcite.sql.SqlNode;
@@ -211,6 +212,7 @@ class FlinkDDLDataTypeTest {
                         "ROW<f0 INT NOT NULL, f1 BOOLEAN>",
                         nullable(
                                 FIXTURE.createStructType(
+                                        StructKind.PEEK_FIELDS_NO_EXPAND,
                                         Arrays.asList(
                                                 FIXTURE.intType, nullable(FIXTURE.booleanType)),
                                         Arrays.asList("f0", "f1"))),
@@ -219,6 +221,7 @@
                         "ROW(f0 INT NOT NULL, f1 BOOLEAN)",
                         nullable(
                                 FIXTURE.createStructType(
+                                        StructKind.PEEK_FIELDS_NO_EXPAND,
                                         Arrays.asList(
                                                 FIXTURE.intType, nullable(FIXTURE.booleanType)),
                                         Arrays.asList("f0", "f1"))),
@@ -227,6 +230,7 @@
                         "ROW<`f0` INT>",
                         nullable(
                                 FIXTURE.createStructType(
+                                        StructKind.PEEK_FIELDS_NO_EXPAND,
                                         Collections.singletonList(nullable(FIXTURE.intType)),
                                         Collections.singletonList("f0"))),
                         "ROW< `f0` INTEGER >"),
@@ -234,6 +238,7 @@
                         "ROW(`f0` INT)",
                         nullable(
                                 FIXTURE.createStructType(
+                                        StructKind.PEEK_FIELDS_NO_EXPAND,
                                         Collections.singletonList(nullable(FIXTURE.intType)),
                                         Collections.singletonList("f0"))),
                         "ROW(`f0` INTEGER)"),
@@ -241,19 +246,24 @@
                         "ROW<>",
                         nullable(
                                 FIXTURE.createStructType(
-                                        Collections.emptyList(), Collections.emptyList())),
+                                        StructKind.PEEK_FIELDS_NO_EXPAND,
+                                        Collections.emptyList(),
+                                        Collections.emptyList())),
                         "ROW<>"),
                 createArgumentsTestItem(
                         "ROW()",
                         nullable(
                                 FIXTURE.createStructType(
-                                        Collections.emptyList(), Collections.emptyList())),
+                                        StructKind.PEEK_FIELDS_NO_EXPAND,
+                                        Collections.emptyList(),
+                                        Collections.emptyList())),
                         "ROW()"),
                 createArgumentsTestItem(
                         "ROW<f0 INT NOT NULL 'This is a comment.', "
                                 + "f1 BOOLEAN 'This as well.'>",
                         nullable(
                                 FIXTURE.createStructType(
+                                        StructKind.PEEK_FIELDS_NO_EXPAND,
                                         Arrays.asList(
                                                 FIXTURE.intType, nullable(FIXTURE.booleanType)),
                                         Arrays.asList("f0", "f1"))),
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkRexBuilder.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkRexBuilder.java
index 265e1fdcbce..739579d28e6 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkRexBuilder.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkRexBuilder.java
@@ -21,8 +21,13 @@ package org.apache.flink.table.planner.calcite;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexFieldAccess;
 import org.apache.calcite.rex.RexLiteral;
 import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.util.TimestampString;
 
 /** A slim extension over a {@link RexBuilder}. See the overridden methods for more explanation. */
@@ -40,16 +45,8 @@ public final class FlinkRexBuilder extends RexBuilder {
      */
     @Override
     public RexNode makeFieldAccess(RexNode expr, String fieldName, boolean caseSensitive) {
-        RexNode field = super.makeFieldAccess(expr, fieldName, caseSensitive);
-        if (expr.getType().isNullable() && !field.getType().isNullable()) {
-            return makeCast(
-                    typeFactory.createTypeWithNullability(field.getType(), true),
-                    field,
-                    true,
-                    false);
-        }
-
-        return field;
+        final RexNode field = super.makeFieldAccess(expr, fieldName, caseSensitive);
+        return makeFieldAccess(expr, field);
     }
 
     /**
@@ -61,16 +58,8 @@ public final class FlinkRexBuilder extends RexBuilder {
      */
     @Override
     public RexNode makeFieldAccess(RexNode expr, int i) {
-        RexNode field = super.makeFieldAccess(expr, i);
-        if (expr.getType().isNullable() && !field.getType().isNullable()) {
-            return makeCast(
-                    typeFactory.createTypeWithNullability(field.getType(), true),
-                    field,
-                    true,
-                    false);
-        }
-
-        return field;
+        final RexNode field = super.makeFieldAccess(expr, i);
+        return makeFieldAccess(expr, field);
     }
 
     /**
@@ -102,4 +91,57 @@ public final class FlinkRexBuilder extends RexBuilder {
             return super.makeZeroLiteral(type);
         }
     }
+
+    private RexNode makeFieldAccess(RexNode expr, RexNode field) {
+        final RexNode fieldWithRemovedCast = removeCastNullableFromFieldAccess(field);
+        if (field.getType().isNullable() != fieldWithRemovedCast.getType().isNullable()
+                || expr.getType().isNullable() && !field.getType().isNullable()) {
+            return makeCast(
+                    typeFactory.createTypeWithNullability(field.getType(), true),
+                    fieldWithRemovedCast,
+                    true,
+                    false);
+        }
+
+        return expr.getType().isNullable() && fieldWithRemovedCast.getType().isNullable()
+                ? fieldWithRemovedCast
+                : field;
+    }
+
+    /**
+     * {@link FlinkRexBuilder#makeFieldAccess} will adjust nullability based on nullability of the
+     * enclosing type. However, it might be a deeply nested column and for every step {@link
+     * FlinkRexBuilder#makeFieldAccess} will try to insert a cast. This method will remove previous
+     * cast in order to keep only one.
+     */
+    private RexNode removeCastNullableFromFieldAccess(RexNode rexFieldAccess) {
+        if (!(rexFieldAccess instanceof RexFieldAccess)) {
+            return rexFieldAccess;
+        }
+        RexNode rexNode = rexFieldAccess;
+        while (rexNode instanceof RexFieldAccess) {
+            rexNode = ((RexFieldAccess) rexNode).getReferenceExpr();
+        }
+        if (rexNode.getKind() != SqlKind.CAST) {
+            return rexFieldAccess;
+        }
+        RexShuttle visitor =
+                new RexShuttle() {
+                    @Override
+                    public RexNode visitCall(final RexCall call) {
+                        if (call.getKind() == SqlKind.CAST
+                                && !call.operands.get(0).getType().isNullable()
+                                && call.getType().isNullable()
+                                && call.getOperands()
+                                        .get(0)
+                                        .getType()
+                                        .getFieldList()
+                                        .equals(call.getType().getFieldList())) {
+                            return RexUtil.removeCast(call);
+                        }
+                        return call;
+                    }
+                };
+        return RexUtil.apply(visitor, new RexNode[] {rexFieldAccess})[0];
+    }
 }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionMiscITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionMiscITCase.java
index 36288559bc3..224bf9b7cac 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionMiscITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionMiscITCase.java
@@ -94,7 +94,9 @@ class CastFunctionMiscITCase extends BuiltInFunctionTestBase {
                                         FIELD(
                                                 "r",
                                                 ROW(
-                                                        FIELD("s", STRING()),
+                                                        FIELD(
+                                                                "s",
+                                                                STRING().notNull()),
                                                         FIELD("b", BOOLEAN()),
                                                         FIELD("i", INT()))),
                                         FIELD("s", STRING()))),
@@ -103,7 +105,8 @@
                                 // the inner NOT NULL is ignored in SQL because the outer ROW is
                                 // nullable and the cast does not allow setting the outer
                                 // nullability but derives it from the source operand
-                                DataTypes.of("ROW<r ROW<s STRING, b BOOLEAN, i INT>, s STRING>")),
+                                DataTypes.of(
+                                        "ROW<r ROW<s STRING NOT NULL, b BOOLEAN, i INT>, s STRING>")),
                 TestSetSpec.forFunction(
                                 BuiltInFunctionDefinitions.CAST,
                                 "explicit with nested rows and explicit nullability change")
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
index 13835f16d61..26aa8a7f047 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
@@ -885,13 +885,13 @@ class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversionTestBas
                 createTestItem(
                         "ROW<f0 INT NOT NULL, f1 BOOLEAN>",
                         DataTypes.ROW(
-                                DataTypes.FIELD("f0", DataTypes.INT()),
+                                DataTypes.FIELD("f0", DataTypes.INT().notNull()),
                                 DataTypes.FIELD("f1", DataTypes.BOOLEAN()))),
                 // Expect to be ROW<`f0` INT NOT NULL, `f1` BOOLEAN>.
createTestItem( "ROW(f0 INT NOT NULL, f1 BOOLEAN)", DataTypes.ROW( - DataTypes.FIELD("f0", DataTypes.INT()), + DataTypes.FIELD("f0", DataTypes.INT().notNull()), DataTypes.FIELD("f1", DataTypes.BOOLEAN()))), createTestItem( "ROW<`f0` INT>", @@ -906,7 +906,7 @@ class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversionTestBas "ROW<f0 INT NOT NULL 'This is a comment.'," + " f1 BOOLEAN 'This as well.'>", DataTypes.ROW( - DataTypes.FIELD("f0", DataTypes.INT()), + DataTypes.FIELD("f0", DataTypes.INT().notNull()), DataTypes.FIELD("f1", DataTypes.BOOLEAN()))), createTestItem( "ARRAY<ROW<f0 INT, f1 BOOLEAN>>", diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.xml index 87fd6e17821..92ec23fe38c 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.xml @@ -268,13 +268,13 @@ LogicalProject(EXPR$0=[ITEM($0, 2).value], data_arr=[$0]) </Resource> <Resource name="ast"> <![CDATA[ -LogicalProject(EXPR$0=[CAST(CAST(ITEM($5.result, 1).meta):RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL symbol).symbol):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"]) +LogicalProject(EXPR$0=[CAST(ITEM(CAST($5.result):RecordType:peek_no_expand(RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL symbol) NOT NULL meta) NOT NULL ARRAY, 1).meta.symbol):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"]) +- LogicalTableScan(table=[[default_catalog, default_database, ItemTable]]) ]]> </Resource> <Resource name="optimized rel plan"> <![CDATA[ -LogicalProject(EXPR$0=[CAST(CAST(ITEM($0.result, 1).meta):RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL symbol).symbol):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"]) +LogicalProject(EXPR$0=[CAST(ITEM(CAST($0.result):RecordType:peek_no_expand(RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" NOT NULL symbol) NOT NULL meta) NOT NULL ARRAY, 1).meta.symbol):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"]) +- LogicalTableScan(table=[[default_catalog, default_database, ItemTable, project=[chart], metadata=[]]]) ]]> </Resource> diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcTest.xml index 0e1a1942201..0e7049343d1 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcTest.xml @@ -100,6 +100,60 @@ LogicalProject(a=[$0]) <![CDATA[ Calc(select=[a], where=[>(random_udf(b), 10)]) +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c]) +]]> + </Resource> + </TestCase> + <TestCase name="testCastOfTestToSameType"> + <Resource name="sql"> + <![CDATA[SELECT `field2`, COALESCE(TRY_CAST(`field1`.`data`.`nested`.`trId` AS STRING)) AS transactionId FROM testCastOfTestToSameType]]> + </Resource> + <Resource name="ast"> + 
+      <![CDATA[
+LogicalProject(field2=[$1], transactionId=[COALESCE($0.data.nested.trId)])
++- LogicalProject(field1=[$0], field2=[$0])
+   +- LogicalTableScan(table=[[default_catalog, default_database, testCastOfTestToSameType]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[field1 AS field2, CAST(field1.data.nested.trId AS VARCHAR(2147483647)) AS transactionId])
++- TableSourceScan(table=[[default_catalog, default_database, testCastOfTestToSameType]], fields=[field1])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCastOfTestToSameTypeWithArray">
+    <Resource name="sql">
+      <![CDATA[SELECT `field2`, COALESCE(TRY_CAST(`field1`.`data`[0].`nested`.`trId` AS STRING)) AS transactionId FROM testCastOfTestToSameTypeWithArray]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(field2=[$1], transactionId=[COALESCE(ITEM($0.data, 0).nested.trId)])
++- LogicalProject(field1=[$0], field2=[$0])
+   +- LogicalTableScan(table=[[default_catalog, default_database, testCastOfTestToSameTypeWithArray]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[field1 AS field2, CAST(ITEM(field1.data, 0).nested.trId AS VARCHAR(2147483647)) AS transactionId])
++- TableSourceScan(table=[[default_catalog, default_database, testCastOfTestToSameTypeWithArray]], fields=[field1])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCastOfTestToSameTypeWithNullableNestedType">
+    <Resource name="sql">
+      <![CDATA[SELECT `field2`, COALESCE(TRY_CAST(`field1`.`data`.`nested`.`trId` AS STRING)) AS transactionId FROM testCastOfTestToSameTypeWithNullableNestedType]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(field2=[$1], transactionId=[COALESCE($0.data.nested.trId)])
++- LogicalProject(field1=[$0], field2=[$0])
+   +- LogicalTableScan(table=[[default_catalog, default_database, testCastOfTestToSameTypeWithNullableNestedType]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[field1 AS field2, COALESCE(field1.data.nested.trId) AS transactionId])
++- TableSourceScan(table=[[default_catalog, default_database, testCastOfTestToSameTypeWithNullableNestedType]], fields=[field1])
 ]]>
     </Resource>
   </TestCase>
@@ -227,6 +281,66 @@
     <![CDATA[
 Calc(select=[a, b, CAST('xx' AS VARCHAR(2147483647)) AS c], where=[(SEARCH(b, Sarg[1, 3, 4, 5, 6]) AND (c = 'xx'))])
 +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinAndAggregateOnNested">
+    <Resource name="sql">
+      <![CDATA[SELECT COUNT(t.data[0].nested[1].trId)
+FROM testJoinAndAggregateOnNested t, testJoinAndAggregateOnNested2 t2
+WHERE t.data[0].nested[0].trId = t2.data1.nested.trId
+GROUP BY t.data[1].nested[0].trId]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(EXPR$0=[$1])
++- LogicalAggregate(group=[{0}], EXPR$0=[COUNT($1)])
+   +- LogicalProject($f0=[CAST(ITEM(ITEM($0.data, 1).nested, 0).trId):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], $f1=[CAST(ITEM(ITEM($0.data, 0).nested, 1).trId):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"])
+      +- LogicalFilter(condition=[=(CAST(ITEM(ITEM($0.data, 0).nested, 0).trId):VARCHAR(2147483647) CHARACTER SET "UTF-16LE", CAST($1.data1.nested.trId):VARCHAR(2147483647) CHARACTER SET "UTF-16LE")])
+         +- LogicalJoin(condition=[true], joinType=[inner])
+            :- LogicalTableScan(table=[[default_catalog, default_database, testJoinAndAggregateOnNested]])
+            +- LogicalTableScan(table=[[default_catalog, default_database, testJoinAndAggregateOnNested2]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[EXPR$0])
++- GroupAggregate(groupBy=[$f0], select=[$f0, COUNT($f1) AS EXPR$0])
+   +- Exchange(distribution=[hash[$f0]])
+      +- Calc(select=[CAST(ITEM(ITEM(field1.data, 1).nested, 0).trId AS VARCHAR(2147483647)) AS $f0, CAST(ITEM(ITEM(field1.data, 0).nested, 1).trId AS VARCHAR(2147483647)) AS $f1])
+         +- Join(joinType=[InnerJoin], where=[($f1 = $f10)], select=[field1, $f1, $f10], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+            :- Exchange(distribution=[hash[$f1]])
+            :  +- Calc(select=[field1, ITEM(ITEM(field1.data, 0).nested, 0).trId AS $f1])
+            :     +- TableSourceScan(table=[[default_catalog, default_database, testJoinAndAggregateOnNested]], fields=[field1])
+            +- Exchange(distribution=[hash[$f1]])
+               +- Calc(select=[field1.data1.nested.trId AS $f1])
+                  +- TableSourceScan(table=[[default_catalog, default_database, testJoinAndAggregateOnNested2]], fields=[field1])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinOnNested">
+    <Resource name="sql">
+      <![CDATA[SELECT t.field1 as dt FROM testJoinOnNested t, testJoinOnNested2 t2 WHERE t.data[0].nested[0].trId = t2.data1.nested.trId]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(dt=[$0])
++- LogicalFilter(condition=[=(CAST(ITEM(ITEM($0.data, 0).nested, 0).trId):VARCHAR(2147483647) CHARACTER SET "UTF-16LE", CAST($1.data1.nested.trId):VARCHAR(2147483647) CHARACTER SET "UTF-16LE")])
+   +- LogicalJoin(condition=[true], joinType=[inner])
+      :- LogicalTableScan(table=[[default_catalog, default_database, testJoinOnNested]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, testJoinOnNested2]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[field1 AS dt])
++- Join(joinType=[InnerJoin], where=[($f1 = $f10)], select=[field1, $f1, $f10], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+   :- Exchange(distribution=[hash[$f1]])
+   :  +- Calc(select=[field1, ITEM(ITEM(field1.data, 0).nested, 0).trId AS $f1])
+   :     +- TableSourceScan(table=[[default_catalog, default_database, testJoinOnNested]], fields=[field1])
+   +- Exchange(distribution=[hash[$f1]])
+      +- Calc(select=[field1.data1.nested.trId AS $f1])
+         +- TableSourceScan(table=[[default_catalog, default_database, testJoinOnNested2]], fields=[field1])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/CalcTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/CalcTest.scala
index 896d2d24e60..45ef3e380ee 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/CalcTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/CalcTest.scala
@@ -254,4 +254,77 @@ class CalcTest extends TableTestBase {
     val sqlQuery = "SELECT TRY_CAST(TRY_CAST(a AS STRING) AS INTEGER) FROM MyTable"
     util.verifyExecPlan(sqlQuery)
   }
+
+  @Test
+  def testCastOfTestToSameType(): Unit = {
+    val rowDataType = "ROW<`data` ROW<`nested` ROW<`trId` STRING NOT NULL>>NOT NULL>"
+    util.tableEnv.executeSql(
+      "CREATE TABLE testCastOfTestToSameType (`field1` " +
+        rowDataType + ", `field2` AS CAST(`field1` AS " +
+        rowDataType + ")) WITH ('connector' = 'datagen')");
+    val sql = "SELECT `field2`, " +
+      "COALESCE(TRY_CAST(`field1`.`data`.`nested`.`trId` AS STRING)) AS transactionId " +
+      "FROM testCastOfTestToSameType";
+    util.verifyExecPlan(sql)
+  }
+
+  @Test
+  def testCastOfTestToSameTypeWithArray(): Unit = {
+    val rowDataType = "ROW<`data` ARRAY<ROW<`nested` ROW<`trId` STRING NOT NULL>>NOT NULL>>"
+    util.tableEnv.executeSql(
+      "CREATE TABLE testCastOfTestToSameTypeWithArray (`field1` " +
+        rowDataType + ", `field2` AS CAST(`field1` AS " +
+        rowDataType + ")) WITH ('connector' = 'datagen')");
+    val sql = "SELECT `field2`, " +
+      "COALESCE(TRY_CAST(`field1`.`data`[0].`nested`.`trId` AS STRING)) AS transactionId " +
+      "FROM testCastOfTestToSameTypeWithArray";
+    util.verifyExecPlan(sql)
+  }
+
+  @Test
+  def testJoinOnNested(): Unit = {
+    val rowDataType =
+      "ROW<`data` ARRAY<ROW<`nested` ARRAY<ROW<`trId` STRING NOT NULL>NOT NULL>>NOT NULL>>"
+    util.tableEnv.executeSql(
+      "CREATE TABLE testJoinOnNested (`field1` " +
+        rowDataType + ") WITH ('connector' = 'datagen')");
+    val rowDataType2 = "ROW<`data1` ROW<`nested` ROW<`trId` STRING NOT NULL>NOT NULL>>"
+    util.tableEnv.executeSql(
+      "CREATE TABLE testJoinOnNested2 (`field1` " +
+        rowDataType2 + ") WITH ('connector' = 'datagen')");
+    val sql = "SELECT t.field1 as dt " +
+      "FROM testJoinOnNested t, testJoinOnNested2 t2 WHERE t.data[0].nested[0].trId = t2.data1.nested.trId";
+    util.verifyExecPlan(sql)
+  }
+
+  @Test
+  def testCastOfTestToSameTypeWithNullableNestedType(): Unit = {
+    val rowDataType = "ROW<`data` ROW<`nested` ROW<`trId` STRING>>NOT NULL>"
+    util.tableEnv.executeSql(
+      "CREATE TABLE testCastOfTestToSameTypeWithNullableNestedType (`field1` " +
+        rowDataType + ", `field2` AS CAST(`field1` AS " +
+        rowDataType + ")) WITH ('connector' = 'datagen')");
+    val sql = "SELECT `field2`, " +
+      "COALESCE(TRY_CAST(`field1`.`data`.`nested`.`trId` AS STRING)) AS transactionId " +
+      "FROM testCastOfTestToSameTypeWithNullableNestedType";
+    util.verifyExecPlan(sql)
+  }
+
+  @Test
+  def testJoinAndAggregateOnNested(): Unit = {
+    val rowDataType =
+      "ROW<`data` ARRAY<ROW<`nested` ARRAY<ROW<`trId` STRING NOT NULL>NOT NULL>>NOT NULL>>"
+    util.tableEnv.executeSql(
+      "CREATE TABLE testJoinAndAggregateOnNested (`field1` " +
+        rowDataType + ") WITH ('connector' = 'datagen')");
+    val rowDataType2 = "ROW<`data1` ROW<`nested` ROW<`trId` STRING NOT NULL>NOT NULL>>"
+    util.tableEnv.executeSql(
+      "CREATE TABLE testJoinAndAggregateOnNested2 (`field1` " +
+        rowDataType2 + ") WITH ('connector' = 'datagen')");
+    val sql = "SELECT COUNT(t.data[0].nested[1].trId)\n" +
+      "FROM testJoinAndAggregateOnNested t, testJoinAndAggregateOnNested2 t2\n" +
+      "WHERE t.data[0].nested[0].trId = t2.data1.nested.trId\n" +
+      "GROUP BY t.data[1].nested[0].trId";
+    util.verifyExecPlan(sql)
+  }
 }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala
index 3cd45066b5b..4f556d7d36f 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala
@@ -825,4 +825,34 @@ class CalcITCase extends StreamingTestBase {
     val expected = List("16")
     assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted)
   }
+
+  @Test
+  def testCastRow(): Unit = {
+    val testDataId = TestValuesTableFactory.registerData(
+      Seq(
+        row(row(row(row("1")))),
+        row(row(row(row("2"))))
+      ))
+    tEnv.executeSql(
s""" + |CREATE TABLE t ( + | a ROW<`data` ROW<`nested` ROW<`trId` STRING NOT NULL>>NOT NULL>, + | b AS CAST(a as ROW<`data` ROW<`nested` ROW<`trId` STRING NOT NULL>>NOT NULL>) + |) WITH ( + | 'connector' = 'values', + | 'data-id' = '$testDataId', + | 'bounded' = 'true' + |) + |""".stripMargin) + val expected = List( + row("1", "1", row(row(row("1")))), + row("2", "2", row(row(row("2")))) + ) + val actual = tEnv + .executeSql("select a.data.nested.trId, b.data.nested.trId, b AS col from t") + .collect() + .map(r => r) + .toList + assertThat(actual).isEqualTo(expected) + } }
