[ 
https://issues.apache.org/jira/browse/FLINK-31830?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17730846#comment-17730846
 ] 

Jane Chan edited comment on FLINK-31830 at 6/9/23 7:26 AM:
-----------------------------------------------------------

It took some time to work out the cause; sorry for the late update. 
h4. 1. Identify the problem

The issue has been reproduced using Flink release-1.14.6 (which depends on 
calcite-1.26).
h4. 2. Why SQL gets a correct plan, while API doesn't

First, the resolved schemas differ. You can verify this with the following code 
snippet.
{code:scala}
  @Test
  def testSchema(): Unit = {
    // create temporary table t1 through the Table API
    val tableDescriptor = TableDescriptor.forConnector("datagen")
      .schema(Schema.newBuilder
        .column("id", DataTypes.INT.notNull)
        .column("a", DataTypes.ROW(DataTypes.FIELD("np", DataTypes.INT.notNull())).nullable)
        .build)
      .build
    tableEnv.createTemporaryTable("t1", tableDescriptor)

    // create temporary table t2 through SQL DDL
    tableEnv.executeSql(
      s"""
         |create temporary table t2 (
         |  id int not null,
         |  a row<np int not null>
         |) with (
         | 'connector' = 'datagen'
         |)
         |""".stripMargin)

    // look up both tables and print their resolved schemas
    val catalogManager = tableEnv.asInstanceOf[StreamTableEnvironmentImpl].getCatalogManager
    val result1 =
      catalogManager.getTable(ObjectIdentifier.of("default_catalog", "default_database", "t1"))
    val result2 =
      catalogManager.getTable(ObjectIdentifier.of("default_catalog", "default_database", "t2"))
    println(result1.get().getResolvedSchema)
    println(result2.get().getResolvedSchema)
  }
{code}
The result will be
{code:sql}
--result1
(
  `id` INT NOT NULL,
  `a` ROW<`np` INT NOT NULL>
)

--result2  
(
  `id` INT NOT NULL,
  `a` ROW<`np` INT> -- changed by org.apache.calcite.sql.SqlDataTypeSpec#fixNullability
)
{code}
You can tell from the printed result that the nullability specified by the user 
is not respected for t2. However, this behavior is by design in Calcite. The 
community had an in-depth discussion in CALCITE-2464 on the semantics of setting 
nullability for structured types.

TL;DR

!image-2023-06-09-15-06-01-322.png|width=844,height=219!

This feature was introduced in Calcite 1.19. As a result, the DDL `a` ROW<`np` 
INT NOT NULL> is rewritten to `a` ROW<`np` INT> during the 
SQL-to-operation conversion (see 
org.apache.calcite.sql.SqlDataTypeSpec#fixNullability for details).
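
To make the rewrite concrete, here is a minimal sketch of the rule as I understand it: a nullable ROW cannot keep NOT NULL fields, so they are relaxed to nullable. The types and the fixNullability function below are a hypothetical stand-alone model, not the Calcite or Flink implementation.
{code:scala}
// Hypothetical model of SQL types; these are not Flink or Calcite classes.
sealed trait SqlType { def nullable: Boolean }
final case class IntType(nullable: Boolean) extends SqlType
final case class RowType(fields: List[(String, SqlType)], nullable: Boolean) extends SqlType

object NullabilityModel {
  // Make a type nullable, including every field nested inside a row.
  def makeNullable(t: SqlType): SqlType = t match {
    case IntType(_) => IntType(nullable = true)
    case RowType(fs, _) =>
      RowType(fs.map { case (n, ft) => n -> makeNullable(ft) }, nullable = true)
  }

  // Assumed rule: a nullable row forces all of its fields to be nullable,
  // while a NOT NULL row keeps the declared nullability of its fields.
  def fixNullability(t: SqlType): SqlType = t match {
    case r @ RowType(_, true) => makeNullable(r)
    case RowType(fs, false) =>
      RowType(fs.map { case (n, ft) => n -> fixNullability(ft) }, nullable = false)
    case other => other
  }

  // Render a type in the same style as the printed schemas.
  def render(t: SqlType): String = t match {
    case IntType(n) => if (n) "INT" else "INT NOT NULL"
    case RowType(fs, n) =>
      val inner = fs.map { case (name, ft) => s"`$name` ${render(ft)}" }.mkString(", ")
      s"ROW<$inner>" + (if (n) "" else " NOT NULL")
  }

  def main(args: Array[String]): Unit = {
    // Column `a` as declared: a nullable row with a NOT NULL field.
    val declared = RowType(List("np" -> IntType(nullable = false)), nullable = true)
    println(render(declared))                 // type as written by the user
    println(render(fixNullability(declared))) // type after the assumed rewrite
  }
}
{code}
In this model the first line printed corresponds to the type kept for t1 (API) and the second to the type resolved for t2 (SQL).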

 

As for the plan, I believe that, given the schema produced by the API
{code:java}
( `id` INT NOT NULL, `a` ROW<`np` INT NOT NULL> ) {code}
the optimization rules work as expected.

 

The filter condition after the left outer join is
{code:java}
where a.a is null or a.a.np is null {code}

and can be reduced and pushed down as 
{code:java}
where a.a is null {code}
since np is declared NOT NULL, which makes the second condition always false for 
the planner.
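
As a rough check of this reduction, the sketch below models the nullable row as an Option and compares the original and reduced predicates. The names A, np, original and reduced are hypothetical and only illustrate the logic; this is not planner code.
{code:scala}
object FilterReduction {
  // Hypothetical model: a nullable ROW value whose field np is NOT NULL.
  final case class A(np: Int)

  // a.a.np evaluates to NULL whenever the enclosing row a.a is NULL.
  def np(a: Option[A]): Option[Int] = a.map(_.np)

  // where a.a is null or a.a.np is null
  def original(a: Option[A]): Boolean = a.isEmpty || np(a).isEmpty

  // where a.a is null
  def reduced(a: Option[A]): Boolean = a.isEmpty

  def main(args: Array[String]): Unit = {
    val samples: List[Option[A]] = List(None, Some(A(1)))
    samples.foreach { a =>
      println(s"$a: original=${original(a)}, reduced=${reduced(a)}")
    }
  }
}
{code}
Under this model both predicates select the same rows, so the filter reduction by itself does not change the result.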

 

And RemoveUnreachableCoalesceArgumentsRule matches the following case
{code:java}
COALESCE(a.a.np, b.a.np) c1 {code}

because a.a.np is treated as never null, so the COALESCE call is reduced to its 
first argument. 
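
The sketch below shows, under a simplified model, why this reduction matters for the reported query: the enclosing row a.a is nullable, so a.a.np can still evaluate to NULL even though np itself is NOT NULL. The names A, np, coalesce and reduced are hypothetical and stand in for the planner's expressions.
{code:scala}
object CoalesceReduction {
  // Hypothetical model: a nullable ROW value whose field np is NOT NULL.
  final case class A(np: Int)

  // Field access on a NULL row yields NULL.
  def np(row: Option[A]): Option[Int] = row.map(_.np)

  // COALESCE(x, y): the first non-NULL argument.
  def coalesce(x: Option[Int], y: Option[Int]): Option[Int] = x.orElse(y)

  // What remains once COALESCE is reduced to its first argument.
  def reduced(x: Option[Int], y: Option[Int]): Option[Int] = x

  def main(args: Array[String]): Unit = {
    val left: Option[A] = None        // a.a is NULL, which the WHERE clause selects
    val right: Option[A] = Some(A(7)) // b.a is present
    println(coalesce(np(left), np(right))) // Some(7)
    println(reduced(np(left), np(right)))  // None
  }
}
{code}
The two results differ exactly for the rows where a.a is null, which are the rows the filter keeps.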

!image-2023-06-09-15-21-13-720.png|width=892,height=575!

 
h4. 3. Conclusion

In a nutshell, we should align the nullability of structured types created 
through the API with that of SQL. We should also improve the documentation and 
describe the nullability semantics of structured types; otherwise, the behavior 
may not be straightforward for users to understand.



> Coalesce on nested fields with different nullabilities will get wrong plan
> --------------------------------------------------------------------------
>
>                 Key: FLINK-31830
>                 URL: https://issues.apache.org/jira/browse/FLINK-31830
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.14.6
>            Reporter: lincoln lee
>            Assignee: Jane Chan
>            Priority: Major
>         Attachments: image-2023-06-09-15-06-01-322.png, 
> image-2023-06-09-15-21-13-720.png
>
>
> A test case similar to FLINK-31829, which only changes the nullable field `a.np` to 
> not null, gets a wrong plan in 1.14.x (reported by a community user):
> {code}
>   @Test
>   def testCoalesceOnNestedColumns(): Unit = {
>     val tEnv = util.tableEnv
>     val tableDescriptor = TableDescriptor.forConnector("datagen")
>         .schema(Schema.newBuilder
>             .column("id", DataTypes.INT.notNull)
>             .column("a", DataTypes.ROW(DataTypes.FIELD("np", DataTypes.INT.notNull())).nullable)
>             .build)
>         .build
>     tEnv.createTemporaryTable("t1", tableDescriptor)
>     tEnv.createTemporaryTable("t2", tableDescriptor)
>     val res = tEnv.executeSql(
>       "EXPLAIN SELECT a.id, COALESCE(a.a.np, b.a.np) c1, IFNULL(a.a.np, b.a.np) c2 " +
>         "FROM t1 a left JOIN t2 b ON a.id=b.id where a.a is null or a.a.np is null")
>     res.print()
>   }
> == Abstract Syntax Tree ==
> LogicalProject(id=[$0], c1=[CAST($1.np):INTEGER], c2=[IFNULL($1.np, $3.np)])
> +- LogicalFilter(condition=[OR(IS NULL($1), IS NULL(CAST($1.np):INTEGER))])
>    +- LogicalJoin(condition=[=($0, $2)], joinType=[left])
>       :- LogicalTableScan(table=[[default_catalog, default_database, t1]])
>       +- LogicalTableScan(table=[[default_catalog, default_database, t2]])
> {code}
> The top project in the AST is wrong: `LogicalProject(id=[$0], 
> c1=[CAST($1.np):INTEGER], c2=[IFNULL($1.np, $3.np)])`. The 
> `c1=[CAST($1.np):INTEGER]` corresponding to `COALESCE(a.a.np, b.a.np) c1` is 
> incorrect,
> but this works fine when using SQL DDL to create the tables:
> {code}
>   @Test
>   def testCoalesceOnNestedColumns2(): Unit = {
>     val tEnv = util.tableEnv
>     tEnv.executeSql(
>       s"""
>          |create temporary table t1 (
>          |  id int not null,
>          |  a row<np int not null>
>          |) with (
>          | 'connector' = 'datagen'
>          |)
>          |""".stripMargin)
>     tEnv.executeSql(
>       s"""
>          |create temporary table t2 (
>          |  id int not null,
>          |  a row<np int not null>
>          |) with (
>          | 'connector' = 'datagen'
>          |)
>          |""".stripMargin)
>     val res = tEnv.executeSql(
>       "EXPLAIN SELECT a.id, COALESCE(a.a.np, b.a.np) c1, IFNULL(a.a.np, b.a.np) c2 " +
>         "FROM t1 a left JOIN t2 b ON a.id=b.id where a.a is null or a.a.np is null")
>     res.print()
>   }
> {code}
> From 1.15, COALESCE is a new built-in function and the AST looks 
> correct in version 1.15+, while before 1.15 it was rewritten as `case when`.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
