This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ec9427cea8c3f0d1d88ab5a077eafe6b39761bb7
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Sat Oct 25 22:35:51 2025 +0200

    [FLINK-20539][table] Type mismatch when using computed ROW column
    
    Co-authored-by: xuyang <[email protected]>
---
 .../parser/type/ExtendedSqlRowTypeNameSpec.java    |   2 +
 .../java/org/apache/flink/sql/parser/Fixture.java  |   6 ++
 .../flink/sql/parser/FlinkDDLDataTypeTest.java     |  14 ++-
 .../table/planner/calcite/FlinkRexBuilder.java     |  82 +++++++++++----
 .../planner/functions/CastFunctionMiscITCase.java  |   7 +-
 .../operations/SqlDdlToOperationConverterTest.java |   6 +-
 .../PushProjectIntoTableSourceScanRuleTest.xml     |   4 +-
 .../table/planner/plan/stream/sql/CalcTest.xml     | 114 +++++++++++++++++++++
 .../table/planner/plan/stream/sql/CalcTest.scala   |  73 +++++++++++++
 .../planner/runtime/stream/sql/CalcITCase.scala    |  30 ++++++
 10 files changed, 309 insertions(+), 29 deletions(-)

diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/ExtendedSqlRowTypeNameSpec.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/ExtendedSqlRowTypeNameSpec.java
index a75d523caed..9605a6e5aef 100644
--- 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/ExtendedSqlRowTypeNameSpec.java
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/ExtendedSqlRowTypeNameSpec.java
@@ -20,6 +20,7 @@ package org.apache.flink.sql.parser.type;
 
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.StructKind;
 import org.apache.calcite.sql.SqlCharStringLiteral;
 import org.apache.calcite.sql.SqlDataTypeSpec;
 import org.apache.calcite.sql.SqlIdentifier;
@@ -156,6 +157,7 @@ public class ExtendedSqlRowTypeNameSpec extends 
SqlTypeNameSpec {
     public RelDataType deriveType(SqlValidator sqlValidator) {
         final RelDataTypeFactory typeFactory = sqlValidator.getTypeFactory();
         return typeFactory.createStructType(
+                StructKind.PEEK_FIELDS_NO_EXPAND,
                 fieldTypes.stream()
                         .map(dt -> dt.deriveType(sqlValidator))
                         .collect(Collectors.toList()),
diff --git 
a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/Fixture.java
 
b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/Fixture.java
index 81d13f70437..620df32d849 100644
--- 
a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/Fixture.java
+++ 
b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/Fixture.java
@@ -19,6 +19,7 @@
 package org.apache.flink.sql.parser;
 
 import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.StructKind;
 import org.apache.calcite.sql.type.SqlTypeName;
 
 import java.util.List;
@@ -113,6 +114,11 @@ public class Fixture {
         return typeFactory.createStructType(keyTypes, names);
     }
 
+    public RelDataType createStructType(
+            StructKind structKind, List<RelDataType> keyTypes, List<String> 
names) {
+        return typeFactory.createStructType(structKind, keyTypes, names);
+    }
+
     public RelDataType createStructuredType(
             String className, List<RelDataType> typeList, List<String> 
fieldNameList) {
         return typeFactory.createStructuredType(className, typeList, 
fieldNameList);
diff --git 
a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkDDLDataTypeTest.java
 
b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkDDLDataTypeTest.java
index 8596a505d70..7ba04b59315 100644
--- 
a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkDDLDataTypeTest.java
+++ 
b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkDDLDataTypeTest.java
@@ -27,6 +27,7 @@ import org.apache.calcite.avatica.util.Quoting;
 import org.apache.calcite.rel.type.DelegatingTypeSystem;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.rel.type.StructKind;
 import org.apache.calcite.sql.SqlDataTypeSpec;
 import org.apache.calcite.sql.SqlDialect;
 import org.apache.calcite.sql.SqlNode;
@@ -211,6 +212,7 @@ class FlinkDDLDataTypeTest {
                         "ROW<f0 INT NOT NULL, f1 BOOLEAN>",
                         nullable(
                                 FIXTURE.createStructType(
+                                        StructKind.PEEK_FIELDS_NO_EXPAND,
                                         Arrays.asList(
                                                 FIXTURE.intType, 
nullable(FIXTURE.booleanType)),
                                         Arrays.asList("f0", "f1"))),
@@ -219,6 +221,7 @@ class FlinkDDLDataTypeTest {
                         "ROW(f0 INT NOT NULL, f1 BOOLEAN)",
                         nullable(
                                 FIXTURE.createStructType(
+                                        StructKind.PEEK_FIELDS_NO_EXPAND,
                                         Arrays.asList(
                                                 FIXTURE.intType, 
nullable(FIXTURE.booleanType)),
                                         Arrays.asList("f0", "f1"))),
@@ -227,6 +230,7 @@ class FlinkDDLDataTypeTest {
                         "ROW<`f0` INT>",
                         nullable(
                                 FIXTURE.createStructType(
+                                        StructKind.PEEK_FIELDS_NO_EXPAND,
                                         
Collections.singletonList(nullable(FIXTURE.intType)),
                                         Collections.singletonList("f0"))),
                         "ROW< `f0` INTEGER >"),
@@ -234,6 +238,7 @@ class FlinkDDLDataTypeTest {
                         "ROW(`f0` INT)",
                         nullable(
                                 FIXTURE.createStructType(
+                                        StructKind.PEEK_FIELDS_NO_EXPAND,
                                         
Collections.singletonList(nullable(FIXTURE.intType)),
                                         Collections.singletonList("f0"))),
                         "ROW(`f0` INTEGER)"),
@@ -241,19 +246,24 @@ class FlinkDDLDataTypeTest {
                         "ROW<>",
                         nullable(
                                 FIXTURE.createStructType(
-                                        Collections.emptyList(), 
Collections.emptyList())),
+                                        StructKind.PEEK_FIELDS_NO_EXPAND,
+                                        Collections.emptyList(),
+                                        Collections.emptyList())),
                         "ROW<>"),
                 createArgumentsTestItem(
                         "ROW()",
                         nullable(
                                 FIXTURE.createStructType(
-                                        Collections.emptyList(), 
Collections.emptyList())),
+                                        StructKind.PEEK_FIELDS_NO_EXPAND,
+                                        Collections.emptyList(),
+                                        Collections.emptyList())),
                         "ROW()"),
                 createArgumentsTestItem(
                         "ROW<f0 INT NOT NULL 'This is a comment.', "
                                 + "f1 BOOLEAN 'This as well.'>",
                         nullable(
                                 FIXTURE.createStructType(
+                                        StructKind.PEEK_FIELDS_NO_EXPAND,
                                         Arrays.asList(
                                                 FIXTURE.intType, 
nullable(FIXTURE.booleanType)),
                                         Arrays.asList("f0", "f1"))),
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkRexBuilder.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkRexBuilder.java
index 265e1fdcbce..739579d28e6 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkRexBuilder.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkRexBuilder.java
@@ -21,8 +21,13 @@ package org.apache.flink.table.planner.calcite;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexFieldAccess;
 import org.apache.calcite.rex.RexLiteral;
 import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.util.TimestampString;
 
 /** A slim extension over a {@link RexBuilder}. See the overridden methods for 
more explanation. */
@@ -40,16 +45,8 @@ public final class FlinkRexBuilder extends RexBuilder {
      */
     @Override
     public RexNode makeFieldAccess(RexNode expr, String fieldName, boolean 
caseSensitive) {
-        RexNode field = super.makeFieldAccess(expr, fieldName, caseSensitive);
-        if (expr.getType().isNullable() && !field.getType().isNullable()) {
-            return makeCast(
-                    typeFactory.createTypeWithNullability(field.getType(), 
true),
-                    field,
-                    true,
-                    false);
-        }
-
-        return field;
+        final RexNode field = super.makeFieldAccess(expr, fieldName, 
caseSensitive);
+        return makeFieldAccess(expr, field);
     }
 
     /**
@@ -61,16 +58,8 @@ public final class FlinkRexBuilder extends RexBuilder {
      */
     @Override
     public RexNode makeFieldAccess(RexNode expr, int i) {
-        RexNode field = super.makeFieldAccess(expr, i);
-        if (expr.getType().isNullable() && !field.getType().isNullable()) {
-            return makeCast(
-                    typeFactory.createTypeWithNullability(field.getType(), 
true),
-                    field,
-                    true,
-                    false);
-        }
-
-        return field;
+        final RexNode field = super.makeFieldAccess(expr, i);
+        return makeFieldAccess(expr, field);
     }
 
     /**
@@ -102,4 +91,57 @@ public final class FlinkRexBuilder extends RexBuilder {
                 return super.makeZeroLiteral(type);
         }
     }
+
+    private RexNode makeFieldAccess(RexNode expr, RexNode field) {
+        final RexNode fieldWithRemovedCast = 
removeCastNullableFromFieldAccess(field);
+        if (field.getType().isNullable() != 
fieldWithRemovedCast.getType().isNullable()
+                || expr.getType().isNullable() && 
!field.getType().isNullable()) {
+            return makeCast(
+                    typeFactory.createTypeWithNullability(field.getType(), 
true),
+                    fieldWithRemovedCast,
+                    true,
+                    false);
+        }
+
+        return expr.getType().isNullable() && 
fieldWithRemovedCast.getType().isNullable()
+                ? fieldWithRemovedCast
+                : field;
+    }
+
+    /**
+     * {@link FlinkRexBuilder#makeFieldAccess} will adjust nullability based 
on nullability of the
+     * enclosing type. However, it might be a deeply nested column and for 
every step {@link
+     * FlinkRexBuilder#makeFieldAccess} will try to insert a cast. This method 
will remove previous
+     * cast in order to keep only one.
+     */
+    private RexNode removeCastNullableFromFieldAccess(RexNode rexFieldAccess) {
+        if (!(rexFieldAccess instanceof RexFieldAccess)) {
+            return rexFieldAccess;
+        }
+        RexNode rexNode = rexFieldAccess;
+        while (rexNode instanceof RexFieldAccess) {
+            rexNode = ((RexFieldAccess) rexNode).getReferenceExpr();
+        }
+        if (rexNode.getKind() != SqlKind.CAST) {
+            return rexFieldAccess;
+        }
+        RexShuttle visitor =
+                new RexShuttle() {
+                    @Override
+                    public RexNode visitCall(final RexCall call) {
+                        if (call.getKind() == SqlKind.CAST
+                                && !call.operands.get(0).getType().isNullable()
+                                && call.getType().isNullable()
+                                && call.getOperands()
+                                        .get(0)
+                                        .getType()
+                                        .getFieldList()
+                                        
.equals(call.getType().getFieldList())) {
+                            return RexUtil.removeCast(call);
+                        }
+                        return call;
+                    }
+                };
+        return RexUtil.apply(visitor, new RexNode[] {rexFieldAccess})[0];
+    }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionMiscITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionMiscITCase.java
index 36288559bc3..224bf9b7cac 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionMiscITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionMiscITCase.java
@@ -94,7 +94,9 @@ class CastFunctionMiscITCase extends BuiltInFunctionTestBase {
                                                         FIELD(
                                                                 "r",
                                                                 ROW(
-                                                                        
FIELD("s", STRING()),
+                                                                        FIELD(
+                                                                               
 "s",
+                                                                               
 STRING().notNull()),
                                                                         
FIELD("b", BOOLEAN()),
                                                                         
FIELD("i", INT()))),
                                                         FIELD("s", STRING()))),
@@ -103,7 +105,8 @@ class CastFunctionMiscITCase extends 
BuiltInFunctionTestBase {
                                 // the inner NOT NULL is ignored in SQL 
because the outer ROW is
                                 // nullable and the cast does not allow 
setting the outer
                                 // nullability but derives it from the source 
operand
-                                DataTypes.of("ROW<r ROW<s STRING, b BOOLEAN, i 
INT>, s STRING>")),
+                                DataTypes.of(
+                                        "ROW<r ROW<s STRING NOT NULL, b 
BOOLEAN, i INT>, s STRING>")),
                 TestSetSpec.forFunction(
                                 BuiltInFunctionDefinitions.CAST,
                                 "explicit with nested rows and explicit 
nullability change")
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
index 13835f16d61..26aa8a7f047 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java
@@ -885,13 +885,13 @@ class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversionTestBas
                         createTestItem(
                                 "ROW<f0 INT NOT NULL, f1 BOOLEAN>",
                                 DataTypes.ROW(
-                                        DataTypes.FIELD("f0", DataTypes.INT()),
+                                        DataTypes.FIELD("f0", 
DataTypes.INT().notNull()),
                                         DataTypes.FIELD("f1", 
DataTypes.BOOLEAN()))),
                         // Expect to be ROW<`f0` INT NOT NULL, `f1` BOOLEAN>.
                         createTestItem(
                                 "ROW(f0 INT NOT NULL, f1 BOOLEAN)",
                                 DataTypes.ROW(
-                                        DataTypes.FIELD("f0", DataTypes.INT()),
+                                        DataTypes.FIELD("f0", 
DataTypes.INT().notNull()),
                                         DataTypes.FIELD("f1", 
DataTypes.BOOLEAN()))),
                         createTestItem(
                                 "ROW<`f0` INT>",
@@ -906,7 +906,7 @@ class SqlDdlToOperationConverterTest extends 
SqlNodeToOperationConversionTestBas
                                 "ROW<f0 INT NOT NULL 'This is a comment.',"
                                         + " f1 BOOLEAN 'This as well.'>",
                                 DataTypes.ROW(
-                                        DataTypes.FIELD("f0", DataTypes.INT()),
+                                        DataTypes.FIELD("f0", 
DataTypes.INT().notNull()),
                                         DataTypes.FIELD("f1", 
DataTypes.BOOLEAN()))),
                         createTestItem(
                                 "ARRAY<ROW<f0 INT, f1 BOOLEAN>>",
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.xml
index 87fd6e17821..92ec23fe38c 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.xml
@@ -268,13 +268,13 @@ LogicalProject(EXPR$0=[ITEM($0, 2).value], data_arr=[$0])
     </Resource>
     <Resource name="ast">
       <![CDATA[
-LogicalProject(EXPR$0=[CAST(CAST(ITEM($5.result, 
1).meta):RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" 
NOT NULL symbol).symbol):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"])
+LogicalProject(EXPR$0=[CAST(ITEM(CAST($5.result):RecordType:peek_no_expand(RecordType:peek_no_expand(VARCHAR(2147483647)
 CHARACTER SET "UTF-16LE" NOT NULL symbol) NOT NULL meta) NOT NULL ARRAY, 
1).meta.symbol):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"])
 +- LogicalTableScan(table=[[default_catalog, default_database, ItemTable]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-LogicalProject(EXPR$0=[CAST(CAST(ITEM($0.result, 
1).meta):RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" 
NOT NULL symbol).symbol):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"])
+LogicalProject(EXPR$0=[CAST(ITEM(CAST($0.result):RecordType:peek_no_expand(RecordType:peek_no_expand(VARCHAR(2147483647)
 CHARACTER SET "UTF-16LE" NOT NULL symbol) NOT NULL meta) NOT NULL ARRAY, 
1).meta.symbol):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"])
 +- LogicalTableScan(table=[[default_catalog, default_database, ItemTable, 
project=[chart], metadata=[]]])
 ]]>
     </Resource>
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcTest.xml
index 0e1a1942201..0e7049343d1 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcTest.xml
@@ -100,6 +100,60 @@ LogicalProject(a=[$0])
       <![CDATA[
 Calc(select=[a], where=[>(random_udf(b), 10)])
 +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCastOfTestToSameType">
+    <Resource name="sql">
+      <![CDATA[SELECT `field2`, 
COALESCE(TRY_CAST(`field1`.`data`.`nested`.`trId` AS STRING)) AS transactionId 
FROM testCastOfTestToSameType]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(field2=[$1], transactionId=[COALESCE($0.data.nested.trId)])
++- LogicalProject(field1=[$0], field2=[$0])
+   +- LogicalTableScan(table=[[default_catalog, default_database, 
testCastOfTestToSameType]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[field1 AS field2, CAST(field1.data.nested.trId AS 
VARCHAR(2147483647)) AS transactionId])
++- TableSourceScan(table=[[default_catalog, default_database, 
testCastOfTestToSameType]], fields=[field1])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCastOfTestToSameTypeWithArray">
+    <Resource name="sql">
+      <![CDATA[SELECT `field2`, 
COALESCE(TRY_CAST(`field1`.`data`[0].`nested`.`trId` AS STRING)) AS 
transactionId FROM testCastOfTestToSameTypeWithArray]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(field2=[$1], transactionId=[COALESCE(ITEM($0.data, 
0).nested.trId)])
++- LogicalProject(field1=[$0], field2=[$0])
+   +- LogicalTableScan(table=[[default_catalog, default_database, 
testCastOfTestToSameTypeWithArray]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[field1 AS field2, CAST(ITEM(field1.data, 0).nested.trId AS 
VARCHAR(2147483647)) AS transactionId])
++- TableSourceScan(table=[[default_catalog, default_database, 
testCastOfTestToSameTypeWithArray]], fields=[field1])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCastOfTestToSameTypeWithNullableNestedType">
+    <Resource name="sql">
+      <![CDATA[SELECT `field2`, 
COALESCE(TRY_CAST(`field1`.`data`.`nested`.`trId` AS STRING)) AS transactionId 
FROM testCastOfTestToSameTypeWithNullableNestedType]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(field2=[$1], transactionId=[COALESCE($0.data.nested.trId)])
++- LogicalProject(field1=[$0], field2=[$0])
+   +- LogicalTableScan(table=[[default_catalog, default_database, 
testCastOfTestToSameTypeWithNullableNestedType]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[field1 AS field2, COALESCE(field1.data.nested.trId) AS 
transactionId])
++- TableSourceScan(table=[[default_catalog, default_database, 
testCastOfTestToSameTypeWithNullableNestedType]], fields=[field1])
 ]]>
     </Resource>
   </TestCase>
@@ -227,6 +281,66 @@ LogicalProject(a=[$0], b=[$1], c=[$2])
       <![CDATA[
 Calc(select=[a, b, CAST('xx' AS VARCHAR(2147483647)) AS c], where=[(SEARCH(b, 
Sarg[1, 3, 4, 5, 6]) AND (c = 'xx'))])
 +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinAndAggregateOnNested">
+    <Resource name="sql">
+      <![CDATA[SELECT COUNT(t.data[0].nested[1].trId)
+FROM testJoinAndAggregateOnNested t, testJoinAndAggregateOnNested2 t2
+WHERE t.data[0].nested[0].trId = t2.data1.nested.trId
+GROUP BY t.data[1].nested[0].trId]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(EXPR$0=[$1])
++- LogicalAggregate(group=[{0}], EXPR$0=[COUNT($1)])
+   +- LogicalProject($f0=[CAST(ITEM(ITEM($0.data, 1).nested, 
0).trId):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], 
$f1=[CAST(ITEM(ITEM($0.data, 0).nested, 1).trId):VARCHAR(2147483647) CHARACTER 
SET "UTF-16LE"])
+      +- LogicalFilter(condition=[=(CAST(ITEM(ITEM($0.data, 0).nested, 
0).trId):VARCHAR(2147483647) CHARACTER SET "UTF-16LE", 
CAST($1.data1.nested.trId):VARCHAR(2147483647) CHARACTER SET "UTF-16LE")])
+         +- LogicalJoin(condition=[true], joinType=[inner])
+            :- LogicalTableScan(table=[[default_catalog, default_database, 
testJoinAndAggregateOnNested]])
+            +- LogicalTableScan(table=[[default_catalog, default_database, 
testJoinAndAggregateOnNested2]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[EXPR$0])
++- GroupAggregate(groupBy=[$f0], select=[$f0, COUNT($f1) AS EXPR$0])
+   +- Exchange(distribution=[hash[$f0]])
+      +- Calc(select=[CAST(ITEM(ITEM(field1.data, 1).nested, 0).trId AS 
VARCHAR(2147483647)) AS $f0, CAST(ITEM(ITEM(field1.data, 0).nested, 1).trId AS 
VARCHAR(2147483647)) AS $f1])
+         +- Join(joinType=[InnerJoin], where=[($f1 = $f10)], select=[field1, 
$f1, $f10], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+            :- Exchange(distribution=[hash[$f1]])
+            :  +- Calc(select=[field1, ITEM(ITEM(field1.data, 0).nested, 
0).trId AS $f1])
+            :     +- TableSourceScan(table=[[default_catalog, 
default_database, testJoinAndAggregateOnNested]], fields=[field1])
+            +- Exchange(distribution=[hash[$f1]])
+               +- Calc(select=[field1.data1.nested.trId AS $f1])
+                  +- TableSourceScan(table=[[default_catalog, 
default_database, testJoinAndAggregateOnNested2]], fields=[field1])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinOnNested">
+    <Resource name="sql">
+      <![CDATA[SELECT t.field1 as dt FROM testJoinOnNested t, 
testJoinOnNested2 t2 WHERE t.data[0].nested[0].trId = t2.data1.nested.trId]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(dt=[$0])
++- LogicalFilter(condition=[=(CAST(ITEM(ITEM($0.data, 0).nested, 
0).trId):VARCHAR(2147483647) CHARACTER SET "UTF-16LE", 
CAST($1.data1.nested.trId):VARCHAR(2147483647) CHARACTER SET "UTF-16LE")])
+   +- LogicalJoin(condition=[true], joinType=[inner])
+      :- LogicalTableScan(table=[[default_catalog, default_database, 
testJoinOnNested]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
testJoinOnNested2]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[field1 AS dt])
++- Join(joinType=[InnerJoin], where=[($f1 = $f10)], select=[field1, $f1, 
$f10], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+   :- Exchange(distribution=[hash[$f1]])
+   :  +- Calc(select=[field1, ITEM(ITEM(field1.data, 0).nested, 0).trId AS 
$f1])
+   :     +- TableSourceScan(table=[[default_catalog, default_database, 
testJoinOnNested]], fields=[field1])
+   +- Exchange(distribution=[hash[$f1]])
+      +- Calc(select=[field1.data1.nested.trId AS $f1])
+         +- TableSourceScan(table=[[default_catalog, default_database, 
testJoinOnNested2]], fields=[field1])
 ]]>
     </Resource>
   </TestCase>
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/CalcTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/CalcTest.scala
index 896d2d24e60..45ef3e380ee 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/CalcTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/CalcTest.scala
@@ -254,4 +254,77 @@ class CalcTest extends TableTestBase {
     val sqlQuery = "SELECT TRY_CAST(TRY_CAST(a AS STRING) AS INTEGER) FROM 
MyTable"
     util.verifyExecPlan(sqlQuery)
   }
+
+  @Test
+  def testCastOfTestToSameType(): Unit = {
+    val rowDataType = "ROW<`data` ROW<`nested` ROW<`trId` STRING NOT NULL>>NOT 
NULL>"
+    util.tableEnv.executeSql(
+      "CREATE TABLE testCastOfTestToSameType (`field1` "
+        + rowDataType + ", `field2` AS CAST(`field1` AS "
+        + rowDataType + ")) WITH ('connector' = 'datagen')");
+    val sql = "SELECT `field2`, " +
+      "COALESCE(TRY_CAST(`field1`.`data`.`nested`.`trId` AS STRING)) AS 
transactionId " +
+      "FROM testCastOfTestToSameType";
+    util.verifyExecPlan(sql)
+  }
+
+  @Test
+  def testCastOfTestToSameTypeWithArray(): Unit = {
+    val rowDataType = "ROW<`data` ARRAY<ROW<`nested` ROW<`trId` STRING NOT 
NULL>>NOT NULL>>"
+    util.tableEnv.executeSql(
+      "CREATE TABLE testCastOfTestToSameTypeWithArray (`field1` "
+        + rowDataType + ", `field2` AS CAST(`field1` AS "
+        + rowDataType + ")) WITH ('connector' = 'datagen')");
+    val sql = "SELECT `field2`, " +
+      "COALESCE(TRY_CAST(`field1`.`data`[0].`nested`.`trId` AS STRING)) AS 
transactionId " +
+      "FROM testCastOfTestToSameTypeWithArray";
+    util.verifyExecPlan(sql)
+  }
+
+  @Test
+  def testJoinOnNested(): Unit = {
+    val rowDataType =
+      "ROW<`data` ARRAY<ROW<`nested` ARRAY<ROW<`trId` STRING NOT NULL>NOT 
NULL>>NOT NULL>>"
+    util.tableEnv.executeSql(
+      "CREATE TABLE testJoinOnNested (`field1` "
+        + rowDataType + ") WITH ('connector' = 'datagen')");
+    val rowDataType2 = "ROW<`data1` ROW<`nested` ROW<`trId` STRING NOT 
NULL>NOT NULL>>"
+    util.tableEnv.executeSql(
+      "CREATE TABLE testJoinOnNested2 (`field1` "
+        + rowDataType2 + ") WITH ('connector' = 'datagen')");
+    val sql = "SELECT t.field1 as dt " +
+      "FROM testJoinOnNested t, testJoinOnNested2 t2 WHERE 
t.data[0].nested[0].trId = t2.data1.nested.trId";
+    util.verifyExecPlan(sql)
+  }
+
+  @Test
+  def testCastOfTestToSameTypeWithNullableNestedType(): Unit = {
+    val rowDataType = "ROW<`data` ROW<`nested` ROW<`trId` STRING>>NOT NULL>"
+    util.tableEnv.executeSql(
+      "CREATE TABLE testCastOfTestToSameTypeWithNullableNestedType (`field1` "
+        + rowDataType + ", `field2` AS CAST(`field1` AS "
+        + rowDataType + ")) WITH ('connector' = 'datagen')");
+    val sql = "SELECT `field2`, " +
+      "COALESCE(TRY_CAST(`field1`.`data`.`nested`.`trId` AS STRING)) AS 
transactionId " +
+      "FROM testCastOfTestToSameTypeWithNullableNestedType";
+    util.verifyExecPlan(sql)
+  }
+
+  @Test
+  def testJoinAndAggregateOnNested(): Unit = {
+    val rowDataType =
+      "ROW<`data` ARRAY<ROW<`nested` ARRAY<ROW<`trId` STRING NOT NULL>NOT 
NULL>>NOT NULL>>"
+    util.tableEnv.executeSql(
+      "CREATE TABLE testJoinAndAggregateOnNested (`field1` "
+        + rowDataType + ") WITH ('connector' = 'datagen')");
+    val rowDataType2 = "ROW<`data1` ROW<`nested` ROW<`trId` STRING NOT 
NULL>NOT NULL>>"
+    util.tableEnv.executeSql(
+      "CREATE TABLE testJoinAndAggregateOnNested2 (`field1` "
+        + rowDataType2 + ") WITH ('connector' = 'datagen')");
+    val sql = "SELECT COUNT(t.data[0].nested[1].trId)\n" +
+      "FROM testJoinAndAggregateOnNested t, testJoinAndAggregateOnNested2 
t2\n" +
+      "WHERE t.data[0].nested[0].trId = t2.data1.nested.trId\n" +
+      "GROUP BY t.data[1].nested[0].trId";
+    util.verifyExecPlan(sql)
+  }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala
index 3cd45066b5b..4f556d7d36f 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala
@@ -825,4 +825,34 @@ class CalcITCase extends StreamingTestBase {
     val expected = List("16")
     assertThat(sink.getAppendResults.sorted).isEqualTo(expected.sorted)
   }
+
+  @Test
+  def testCastRow(): Unit = {
+    val testDataId = TestValuesTableFactory.registerData(
+      Seq(
+        row(row(row(row("1")))),
+        row(row(row(row("2"))))
+      ))
+    tEnv.executeSql(
+      s"""
+         |CREATE TABLE t (
+         |  a ROW<`data` ROW<`nested` ROW<`trId` STRING NOT NULL>>NOT NULL>,
+         |  b AS CAST(a as ROW<`data` ROW<`nested` ROW<`trId` STRING NOT 
NULL>>NOT NULL>)
+         |) WITH (
+         |  'connector' = 'values',
+         |  'data-id' = '$testDataId',
+         |  'bounded' = 'true'
+         |)
+         |""".stripMargin)
+    val expected = List(
+      row("1", "1", row(row(row("1")))),
+      row("2", "2", row(row(row("2"))))
+    )
+    val actual = tEnv
+      .executeSql("select a.data.nested.trId, b.data.nested.trId, b AS col 
from t")
+      .collect()
+      .map(r => r)
+      .toList
+    assertThat(actual).isEqualTo(expected)
+  }
 }

Reply via email to