[ https://issues.apache.org/jira/browse/FLINK-31829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17713352#comment-17713352 ]

Jane Chan commented on FLINK-31829:
-----------------------------------

Sure, I'd like to take this ticket. I reproduced the issue in a local environment 
and found that FlinkTypeFactory#createTypeWithNullability changes only the outer 
nullability of a row type, leaving the nullability of its nested fields untouched, 
which is very similar to FLINK-30282.
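
The behavior can be probed directly against the type factory. Here is a minimal 
sketch (my own snippet, not code from the ticket; it assumes access to the 
planner's FlinkTypeFactory through the plain Calcite RelDataTypeFactory interface):
{code}
import org.apache.calcite.rel.`type`.RelDataTypeFactory
import org.apache.calcite.sql.`type`.SqlTypeName

// Hypothetical probe: `typeFactory` is assumed to be the planner's
// FlinkTypeFactory, seen through Calcite's RelDataTypeFactory interface.
def probeRowNullability(typeFactory: RelDataTypeFactory): Unit = {
  // Build ROW<np INT> with a nullable field.
  val nullableInt = typeFactory.createTypeWithNullability(
    typeFactory.createSqlType(SqlTypeName.INTEGER), true)
  val rowType = typeFactory.builder().add("np", nullableInt).build()

  // Flip the outer nullability in both directions.
  val nullableRow = typeFactory.createTypeWithNullability(rowType, true)
  val notNullRow = typeFactory.createTypeWithNullability(nullableRow, false)

  // Per the observation above, only the outer flag changes; the nested
  // field keeps its own nullability either way.
  println(s"${nullableRow.isNullable} / ${nullableRow.getFieldList.get(0).getType.isNullable}")
  println(s"${notNullRow.isNullable} / ${notNullRow.getFieldList.get(0).getType.isNullable}")
}
{code}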

>  Conversion to relational algebra failed to preserve datatypes' nullabilities
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-31829
>                 URL: https://issues.apache.org/jira/browse/FLINK-31829
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.17.0
>            Reporter: lincoln lee
>            Priority: Major
>             Fix For: 1.18.0
>
>
> An AssertionError is thrown when running the following case:
> {code}
>   @Test
>   def testCoalesceOnNestedColumns(): Unit = {
>     val tEnv = util.tableEnv
>     val tableDescriptor = TableDescriptor
>       .forConnector("datagen")
>       .schema(
>         Schema.newBuilder
>           .column("id", DataTypes.INT.notNull)
>           .column("a", DataTypes.ROW(DataTypes.FIELD("np", DataTypes.INT)).nullable)
>           .build)
>       .build
>     tEnv.createTemporaryTable("t1", tableDescriptor)
>     tEnv.createTemporaryTable("t2", tableDescriptor)
>     val res = tEnv.executeSql(
>       "EXPLAIN SELECT a.id, COALESCE(a.a.np, b.a.np) c1, IFNULL(a.a.np, b.a.np) c2 FROM t1 a left JOIN t2 b ON a.id=b.id where a.a is null or a.a.np is null")
>     res.print()
>   }
> {code}
> Stack trace (note that the validated and converted types differ only in the nullability of the first field, {{B1}}):
> {code}
> java.lang.AssertionError: Conversion to relational algebra failed to preserve datatypes:
> validated type:
> RecordType(INTEGER B1, INTEGER NOT NULL B2, INTEGER BenchmarkId1, INTEGER NOT NULL BenchmarkIdWithIfNull, INTEGER NOT NULL BenchmarkId2) NOT NULL
> converted type:
> RecordType(INTEGER NOT NULL B1, INTEGER NOT NULL B2, INTEGER BenchmarkId1, INTEGER NOT NULL BenchmarkIdWithIfNull, INTEGER NOT NULL BenchmarkId2) NOT NULL
> rel:
> LogicalProject(B1=[$4.BenchmarkId], B2=[$2.BenchmarkId], BenchmarkId1=[IF(IS NOT NULL($4), $4.BenchmarkId, IF(true, $2.BenchmarkId, null:INTEGER))], BenchmarkIdWithIfNull=[IFNULL($4.BenchmarkId, $2.BenchmarkId)], BenchmarkId2=[COALESCE($4.BenchmarkId, $2.BenchmarkId)])
>   LogicalFilter(condition=[OR(IS NULL($4), IS NULL($4.BenchmarkId))])
>     LogicalJoin(condition=[=($3, $0)], joinType=[left])
>       LogicalJoin(condition=[=($1, $0)], joinType=[inner])
>         LogicalTableScan(table=[[default_catalog, default_database, dbo_book]])
>         LogicalTableScan(table=[[default_catalog, default_database, static_book]])
>       LogicalTableScan(table=[[default_catalog, default_database, onebook_book_benchmark]])
>       at org.apache.calcite.sql2rel.SqlToRelConverter.checkConvertedType(SqlToRelConverter.java:500)
>       at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:611)
>       at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:216)
>       at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:192)
>       at org.apache.flink.table.planner.operations.SqlNodeConvertContext.toRelRoot(SqlNodeConvertContext.java:56)
>       at org.apache.flink.table.planner.operations.converters.SqlQueryConverter.convertSqlNode(SqlQueryConverter.java:48)
>       at org.apache.flink.table.planner.operations.converters.SqlNodeConverters.convertSqlNode(SqlNodeConverters.java:65)
>       at org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convertValidatedSqlNode(SqlNodeToOperationConversion.java:281)
>       at org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convert(SqlNodeToOperationConversion.java:271)
>       at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
>       at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:665)
> {code}
> However, an equivalent test that creates the tables via SQL DDL works fine:
> {code}
>   @Test
>   def testCoalesceOnNestedColumns2(): Unit = {
>     val tEnv = util.tableEnv
>     tEnv.executeSql(s"""
>                        |create temporary table t1 (
>                        |  id int not null,
>                        |  a row<np int>
>                        |) with (
>                        | 'connector' = 'datagen'
>                        |)
>                        |""".stripMargin)
>     tEnv.executeSql(s"""
>                        |create temporary table t2 (
>                        |  id int not null,
>                        |  a row<np int>
>                        |) with (
>                        | 'connector' = 'datagen'
>                        |)
>                        |""".stripMargin)
>     val res = tEnv.executeSql(
>       "EXPLAIN SELECT a.id, COALESCE(a.a.np, b.a.np) c1, IFNULL(a.a.np, b.a.np) c2 FROM t1 a left JOIN t2 b ON a.id=b.id where a.a is null or a.a.np is null")
>     res.print()
>   }
> {code}
> which prints:
> {code}
> == Abstract Syntax Tree ==
> LogicalProject(id=[$0], c1=[COALESCE($1.np, $3.np)], c2=[IFNULL($1.np, $3.np)])
> +- LogicalFilter(condition=[OR(IS NULL($1), IS NULL($1.np))])
>    +- LogicalJoin(condition=[=($0, $2)], joinType=[left])
>       :- LogicalTableScan(table=[[default_catalog, default_database, t1]])
>       +- LogicalTableScan(table=[[default_catalog, default_database, t2]])
>
> == Optimized Physical Plan ==
> Calc(select=[id, COALESCE(a.np, a0.np) AS c1, IFNULL(a.np, a0.np) AS c2])
> +- Join(joinType=[LeftOuterJoin], where=[=(id, id0)], select=[id, a, id0, a0], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
>    :- Exchange(distribution=[hash[id]])
>    :  +- Calc(select=[id, a], where=[OR(IS NULL(a), IS NULL(a.np))])
>    :     +- TableSourceScan(table=[[default_catalog, default_database, t1]], fields=[id, a])
>    +- Exchange(distribution=[hash[id]])
>       +- TableSourceScan(table=[[default_catalog, default_database, t2]], fields=[id, a])
>
> == Optimized Execution Plan ==
> Calc(select=[id, COALESCE(a.np, a0.np) AS c1, IFNULL(a.np, a0.np) AS c2])
> +- Join(joinType=[LeftOuterJoin], where=[(id = id0)], select=[id, a, id0, a0], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
>    :- Exchange(distribution=[hash[id]])
>    :  +- Calc(select=[id, a], where=[(a IS NULL OR a.np IS NULL)])
>    :     +- TableSourceScan(table=[[default_catalog, default_database, t1]], fields=[id, a])
>    +- Exchange(distribution=[hash[id]])
>       +- TableSourceScan(table=[[default_catalog, default_database, t2]], fields=[id, a])
> {code}
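
For completeness, a quick way to confirm that the two declaration styles resolve 
to the same schema (a hypothetical snippet reusing `util.tableEnv` and the 
`tableDescriptor` from the first test; the table names `t_descriptor` and `t_ddl` 
are mine, not from the ticket):
{code}
// Hypothetical cross-check: both registrations should report column `a`
// as a nullable ROW<`np` INT>.
val tEnv = util.tableEnv
tEnv.createTemporaryTable("t_descriptor", tableDescriptor)
tEnv.executeSql("""
                  |create temporary table t_ddl (
                  |  id int not null,
                  |  a row<np int>
                  |) with (
                  | 'connector' = 'datagen'
                  |)
                  |""".stripMargin)
println(tEnv.from("t_descriptor").getResolvedSchema)
println(tEnv.from("t_ddl").getResolvedSchema)
{code}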


