[ https://issues.apache.org/jira/browse/FLINK-31829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17713352#comment-17713352 ]
Jane Chan commented on FLINK-31829: ----------------------------------- Sure. I'd like to take this ticket. I reproduced this issue in my local environment and found that FlinkTypeFactory#createTypeWithNullability only changes the outer type's nullability for the row type, which is very similar to FLINK-30282. > Conversion to relational algebra failed to preserve datatypes' nullabilities > ----------------------------------------------------------------------------- > > Key: FLINK-31829 > URL: https://issues.apache.org/jira/browse/FLINK-31829 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner > Affects Versions: 1.17.0 > Reporter: lincoln lee > Priority: Major > Fix For: 1.18.0 > > > An AssertionError is thrown when running the following case: > {code} > @Test > def testCoalesceOnNestedColumns(): Unit = { > val tEnv = util.tableEnv > val tableDescriptor = TableDescriptor > .forConnector("datagen") > .schema( > Schema.newBuilder > .column("id", DataTypes.INT.notNull) > .column("a", DataTypes.ROW(DataTypes.FIELD("np", > DataTypes.INT)).nullable) > .build) > .build > tEnv.createTemporaryTable("t1", tableDescriptor) > tEnv.createTemporaryTable("t2", tableDescriptor) > val res = tEnv.executeSql( > "EXPLAIN SELECT a.id, COALESCE(a.a.np, b.a.np) c1, IFNULL(a.a.np, > b.a.np) c2 FROM t1 a left JOIN t2 b ON a.id=b.id where a.a is null or a.a.np > is null") > res.print() > } > {code} > stack: > {code} > java.lang.AssertionError: Conversion to relational algebra failed to preserve > datatypes: > validated type: > RecordType(INTEGER B1, INTEGER NOT NULL B2, INTEGER BenchmarkId1, INTEGER NOT > NULL BenchmarkIdWithIfNull, INTEGER NOT NULL BenchmarkId2) NOT NULL > converted type: > RecordType(INTEGER NOT NULL B1, INTEGER NOT NULL B2, INTEGER BenchmarkId1, > INTEGER NOT NULL BenchmarkIdWithIfNull, INTEGER NOT NULL BenchmarkId2) NOT > NULL > rel: > LogicalProject(B1=[$4.BenchmarkId], B2=[$2.BenchmarkId], BenchmarkId1=[IF(IS > NOT NULL($4), $4.BenchmarkId, IF(true, $2.BenchmarkId, 
null:INTEGER))], > BenchmarkIdWithIfNull=[IFNULL($4.BenchmarkId, $2.BenchmarkId)], > BenchmarkId2=[COALESCE($4.BenchmarkId, $2.BenchmarkId)]) > LogicalFilter(condition=[OR(IS NULL($4), IS NULL($4.BenchmarkId))]) > LogicalJoin(condition=[=($3, $0)], joinType=[left]) > LogicalJoin(condition=[=($1, $0)], joinType=[inner]) > LogicalTableScan(table=[[default_catalog, default_database, > dbo_book]]) > LogicalTableScan(table=[[default_catalog, default_database, > static_book]]) > LogicalTableScan(table=[[default_catalog, default_database, > onebook_book_benchmark]]) > at > org.apache.calcite.sql2rel.SqlToRelConverter.checkConvertedType(SqlToRelConverter.java:500) > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:611) > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:216) > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:192) > at > org.apache.flink.table.planner.operations.SqlNodeConvertContext.toRelRoot(SqlNodeConvertContext.java:56) > at > org.apache.flink.table.planner.operations.converters.SqlQueryConverter.convertSqlNode(SqlQueryConverter.java:48) > at > org.apache.flink.table.planner.operations.converters.SqlNodeConverters.convertSqlNode(SqlNodeConverters.java:65) > at > org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convertValidatedSqlNode(SqlNodeToOperationConversion.java:281) > at > org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convert(SqlNodeToOperationConversion.java:271) > at > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:665) > {code} > but the equivalent test, which uses SQL DDL to create the tables, works fine: > {code} > @Test > def testCoalesceOnNestedColumns2(): Unit = { > val tEnv = util.tableEnv > 
tEnv.executeSql(s""" > |create temporary table t1 ( > | id int not null, > | a row<np int> > |) with ( > | 'connector' = 'datagen' > |) > |""".stripMargin) > tEnv.executeSql(s""" > |create temporary table t2 ( > | id int not null, > | a row<np int> > |) with ( > | 'connector' = 'datagen' > |) > |""".stripMargin) > val res = tEnv.executeSql( > "EXPLAIN SELECT a.id, COALESCE(a.a.np, b.a.np) c1, IFNULL(a.a.np, > b.a.np) c2 FROM t1 a left JOIN t2 b ON a.id=b.id where a.a is null or a.a.np > is null") > res.print() > } > == Abstract Syntax Tree == > LogicalProject(id=[$0], c1=[COALESCE($1.np, $3.np)], c2=[IFNULL($1.np, > $3.np)]) > +- LogicalFilter(condition=[OR(IS NULL($1), IS NULL($1.np))]) > +- LogicalJoin(condition=[=($0, $2)], joinType=[left]) > :- LogicalTableScan(table=[[default_catalog, default_database, t1]]) > +- LogicalTableScan(table=[[default_catalog, default_database, t2]]) > == Optimized Physical Plan == > Calc(select=[id, COALESCE(a.np, a0.np) AS c1, IFNULL(a.np, a0.np) AS c2]) > +- Join(joinType=[LeftOuterJoin], where=[=(id, id0)], select=[id, a, id0, > a0], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) > :- Exchange(distribution=[hash[id]]) > : +- Calc(select=[id, a], where=[OR(IS NULL(a), IS NULL(a.np))]) > : +- TableSourceScan(table=[[default_catalog, default_database, t1]], > fields=[id, a]) > +- Exchange(distribution=[hash[id]]) > +- TableSourceScan(table=[[default_catalog, default_database, t2]], > fields=[id, a]) > == Optimized Execution Plan == > Calc(select=[id, COALESCE(a.np, a0.np) AS c1, IFNULL(a.np, a0.np) AS c2]) > +- Join(joinType=[LeftOuterJoin], where=[(id = id0)], select=[id, a, id0, > a0], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) > :- Exchange(distribution=[hash[id]]) > : +- Calc(select=[id, a], where=[(a IS NULL OR a.np IS NULL)]) > : +- TableSourceScan(table=[[default_catalog, default_database, t1]], > fields=[id, a]) > +- Exchange(distribution=[hash[id]]) > +- TableSourceScan(table=[[default_catalog, 
default_database, t2]], > fields=[id, a]) > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)