Shengkai Fang created FLINK-33446:
-------------------------------------

             Summary: SubQueryAntiJoinTest#testMultiNotExistsWithCorrelatedOnWhere_NestedCorrelation doesn't produce the correct plan
                 Key: FLINK-33446
                 URL: https://issues.apache.org/jira/browse/FLINK-33446
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 1.17.2, 1.19.0, 1.18.1
            Reporter: Shengkai Fang


Although this test doesn't throw an exception, the optimized plan produces 3 columns rather than 2: the root LogicalProject forwards fields 0 and 1 and additionally emits $4.

{code:java}
LogicalProject(inputs=[0..1], exprs=[[$4]])
+- LogicalFilter(condition=[IS NULL($5)])
   +- LogicalJoin(condition=[AND(=($0, $2), =($1, $3))], joinType=[left])
      :- LogicalProject(exprs=[[+(2, $0), +(3, $1)]])
      :  +- LogicalTableScan(table=[[default_catalog, default_database, l, source: [TestTableSource(a, b, c)]]])
      +- LogicalProject(inputs=[0..2], exprs=[[true]])
         +- LogicalAggregate(group=[{0, 1, 2}])
            +- LogicalProject(inputs=[0..2])
               +- LogicalFilter(condition=[IS NULL($3)])
                  +- LogicalJoin(condition=[true], joinType=[left])
                     :- LogicalFilter(condition=[IS NOT NULL($0)])
                     :  +- LogicalProject(exprs=[[+($0, 1)]])
                     :     +- LogicalTableScan(table=[[default_catalog, default_database, r, source: [TestTableSource(d, e, f)]]])
                     +- LogicalProject(inputs=[0..1], exprs=[[true]])
                        +- LogicalAggregate(group=[{0, 1}])
                           +- LogicalProject(exprs=[[$3, $0]])
                              +- LogicalFilter(condition=[AND(=($1, $0), =(CAST($2):BIGINT, $3))])
                                 +- LogicalProject(exprs=[[+($0, 4), +($0, 5), +($0, 6), CAST(+($0, 6)):BIGINT]])
                                    +- LogicalTableScan(table=[[default_catalog, default_database, t, source: [TestTableSource(i, j, k)]]])
{code}
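To make the 3-vs-2 mismatch easy to assert on, here is a minimal sketch of an arity check. It assumes the compact LogicalProject notation means "forwarded input fields" (inputs=[0..1]) followed by "extra expressions" (exprs=[[$4]]); PlanArityCheck, projectWidth and assertSameArity are hypothetical helpers, not part of Flink's test utilities.

{code:java}
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper: compares an optimized plan's output width with the query's. */
public final class PlanArityCheck {

    private PlanArityCheck() {}

    /**
     * Width of a compact LogicalProject such as
     * "LogicalProject(inputs=[0..1], exprs=[[$4]])": the number of forwarded
     * input fields plus the number of extra expressions.
     */
    static int projectWidth(int forwardedInputs, List<String> extraExprs) {
        return forwardedInputs + extraExprs.size();
    }

    /** Fails when the optimized root does not expose the same number of columns as the query. */
    static void assertSameArity(int expectedColumns, int actualColumns) {
        if (expectedColumns != actualColumns) {
            throw new IllegalStateException(
                    "Optimized plan produces " + actualColumns
                            + " columns, but the query selects " + expectedColumns);
        }
    }

    public static void main(String[] args) {
        // Root of the plan above: inputs=[0..1] forwards 2 fields, exprs=[[$4]] adds 1 more.
        int actual = projectWidth(2, Arrays.asList("$4"));
        // The test query only selects 2 columns.
        int expected = 2;
        try {
            assertSameArity(expected, actual);
        } catch (IllegalStateException e) {
            // prints "Optimized plan produces 3 columns, but the query selects 2"
            System.out.println(e.getMessage());
        }
    }
}
{code}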

After digging into it, I think the cause is that SubQueryRemoveRule generates a Join node rather than a Correlate node, which makes the decorrelation fail. As a quick fix, I think we should throw an exception to notify users that this is not a supported feature in Flink.
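A minimal sketch of such a guard, assuming we can see, for each subquery, which correlation variables it references. It treats any reference to a variable not defined by the direct parent query as nested correlation. QueryBlock and rejectNestedCorrelation are hypothetical names over a simplified model, not Flink or Calcite classes; the query shape ($cor7 defined by the outer query, $cor4 by the middle one) is read off the Calcite plan for option 1.

{code:java}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical pre-check that rejects nested correlation before subquery removal. */
public final class NestedCorrelationCheck {

    private NestedCorrelationCheck() {}

    /** Simplified stand-in for a (sub)query block; not a Flink or Calcite class. */
    static final class QueryBlock {
        final String name;
        final String definedCorrelation; // variable exposed to direct subqueries, or null
        final List<String> referencedCorrelations;
        final List<QueryBlock> subQueries = new ArrayList<>();

        QueryBlock(String name, String definedCorrelation, String... referencedCorrelations) {
            this.name = name;
            this.definedCorrelation = definedCorrelation;
            this.referencedCorrelations = Arrays.asList(referencedCorrelations);
        }

        QueryBlock add(QueryBlock subQuery) {
            subQueries.add(subQuery);
            return this;
        }
    }

    /** Throws if a block references a correlation variable not defined by its direct parent. */
    static void rejectNestedCorrelation(QueryBlock block, String parentCorrelation) {
        for (String ref : block.referencedCorrelations) {
            if (!ref.equals(parentCorrelation)) {
                throw new UnsupportedOperationException(
                        "Subquery over '" + block.name + "' references " + ref
                                + " from a query more than one level up."
                                + " Nested correlation is not supported yet.");
            }
        }
        for (QueryBlock subQuery : block.subQueries) {
            rejectNestedCorrelation(subQuery, block.definedCorrelation);
        }
    }

    public static void main(String[] args) {
        // Innermost NOT EXISTS over t uses $cor4.d1 (its parent) and $cor7.d3 (its grandparent).
        QueryBlock inner = new QueryBlock("t", null, "$cor4", "$cor7");
        QueryBlock middle = new QueryBlock("r", "$cor4", "$cor7").add(inner);
        QueryBlock outer = new QueryBlock("l", "$cor7").add(middle);
        try {
            rejectNestedCorrelation(outer, null);
        } catch (UnsupportedOperationException e) {
            System.out.println(e.getMessage());
        }
    }
}
{code}

Rejecting up front avoids silently returning an extra column; a real check inside the planner would likely have to inspect Calcite's RexSubQuery and RexCorrelVariable instead of this toy model.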

There are two possible ways to fix this issue:
1. Expand subqueries when converting SQL to rel. After experimenting with Calcite, I found that SqlToRelConverter generates the correct plan:

{code:java}
LogicalProject(inputs=[0..1])
+- LogicalFilter(condition=[IS NULL($2)])
   +- LogicalCorrelate(correlation=[$cor7], joinType=[left], requiredColumns=[{0, 1}])
      :- LogicalProject(exprs=[[+(2, $0), +(3, $1)]])
      :  +- LogicalTableScan(table=[[default_catalog, default_database, l, source: [TestTableSource(a, b, c)]]])
      +- LogicalAggregate(group=[{}], agg#0=[MIN($0)])
         +- LogicalProject(exprs=[[true]])
            +- LogicalFilter(condition=[AND(=($0, $cor7.d2), IS NULL($1))])
               +- LogicalCorrelate(correlation=[$cor4], joinType=[left], requiredColumns=[{0}])
                  :- LogicalProject(inputs=[0])
                  :  +- LogicalTableScan(table=[[default_catalog, default_database, r, source: [TestTableSource(d1, e, f)]]])
                  +- LogicalAggregate(group=[{}], agg#0=[MIN($0)])
                     +- LogicalProject(exprs=[[true]])
                        +- LogicalFilter(condition=[AND(=($0, $cor4.d1), =($1, $cor4.d1), =(CAST($2):BIGINT, $cor7.d3))])
                           +- LogicalProject(exprs=[[+($0, 4), +($0, 5), +($0, 6)]])
                              +- LogicalTableScan(table=[[default_catalog, default_database, t, source: [TestTableSource(i, j, k)]]])
{code}

Note that the new plan uses a Correlate node rather than a Join node.

2. CALCITE-4686 might fix this problem by removing the nested correlation node.
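To illustrate what the Correlate-based plan from option 1 computes, here is a minimal sketch that evaluates it row by row. Each LogicalCorrelate(joinType=[left]) followed by MIN(true) and an IS NULL filter amounts to a NOT EXISTS whose right side is re-run with the correlation variable bound to the current left row. The predicates are copied from that plan; CorrelateSemantics and the sample data are hypothetical. The result keeps only the 2 columns of the outer projection.

{code:java}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical row-by-row evaluation of the Correlate-based plan.
 * Each "LogicalCorrelate(joinType=[left]) + MIN(true) + IS NULL" pattern is a
 * NOT EXISTS whose right side is re-run for every left row.
 */
public final class CorrelateSemantics {

    private CorrelateSemantics() {}

    /** Innermost block over t: (i + 4, i + 5, i + 6) filtered with $cor4.d1 and $cor7.d3. */
    static boolean notExistsInT(int[] t, int cor4D1, int cor7D3) {
        for (int i : t) {
            boolean matches = i + 4 == cor4D1 && i + 5 == cor4D1 && (long) (i + 6) == cor7D3;
            if (matches) {
                return false; // MIN(true) would be TRUE, so IS NULL fails
            }
        }
        return true; // no row survived the filter: MIN(true) is NULL
    }

    /** Middle block over r: keeps d1 when d1 = $cor7.d2 and the inner NOT EXISTS holds. */
    static boolean notExistsInR(int[] r, int[] t, int cor7D2, int cor7D3) {
        for (int d1 : r) {
            if (d1 == cor7D2 && notExistsInT(t, d1, cor7D3)) {
                return false;
            }
        }
        return true;
    }

    /** Outer block over l: projects (a + 2, b + 3) and returns only those 2 columns. */
    static List<int[]> evaluate(int[][] l, int[] r, int[] t) {
        List<int[]> result = new ArrayList<>();
        for (int[] row : l) {
            int d2 = row[0] + 2;
            int d3 = row[1] + 3;
            // $cor7 is bound to (d2, d3) for both nested subqueries.
            if (notExistsInR(r, t, d2, d3)) {
                result.add(new int[] {d2, d3});
            }
        }
        return result;
    }

    public static void main(String[] args) {
        int[][] l = {{1, 10}, {5, 20}}; // hypothetical (a, b) rows
        int[] r = {3};                  // hypothetical d1 values
        int[] t = {0};                  // hypothetical i values
        for (int[] row : evaluate(l, r, t)) {
            System.out.println(Arrays.toString(row)); // prints [7, 23]
        }
    }
}
{code}

The first row is dropped because r contains d1 = 3 = a + 2 and the innermost subquery is empty for it; the second row survives with exactly two columns.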

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
