xuyang created FLINK-29221:
------------------------------

             Summary: Adding join hint in sql may cause incompatible state
                 Key: FLINK-29221
                 URL: https://issues.apache.org/jira/browse/FLINK-29221
             Project: Flink
          Issue Type: Improvement
            Reporter: xuyang


The cause of the possible incompatible state is that the optimized plan generated
from the SQL changes after a join hint is added.

Adding the following code in DagOptimizationTest.scala can reproduce this 
change.
{code:java}

@Test
def testMultiSinks6(): Unit = {
  val stmtSet = util.tableEnv.createStatementSet()
  util.tableEnv.getConfig.set(
    
RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED,
    Boolean.box(true))
  // test with non-deterministic udf
  util.tableEnv.registerFunction("random_udf", new NonDeterministicUdf())
  val table1 = util.tableEnv.sqlQuery(
    "SELECT random_udf(a) AS a, cast(b as int) as b, c FROM MyTable join 
MyTable1 on MyTable.c = MyTable1.f")
  util.tableEnv.registerTable("table1", table1)
  val table2 = util.tableEnv.sqlQuery("SELECT SUM(a) AS total_sum FROM table1")
  val table3 = util.tableEnv.sqlQuery("SELECT MIN(b) AS total_min FROM table1")

  val sink1 = util.createCollectTableSink(Array("total_sum"), Array(INT))
  
util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal("sink1",
 sink1)
  stmtSet.addInsert("sink1", table2)

  val sink2 = util.createCollectTableSink(Array("total_min"), Array(INT))
  
util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal("sink2",
 sink2)
  stmtSet.addInsert("sink2", table3)

  util.verifyExecPlan(stmtSet)
} {code}
The plan is :
{code:java}
// ast
LogicalLegacySink(name=[`default_catalog`.`default_database`.`sink1`], 
fields=[total_sum])
+- LogicalAggregate(group=[{}], total_sum=[SUM($0)])
   +- LogicalProject(a=[$0])
      +- LogicalProject(a=[random_udf($0)], b=[CAST($1):INTEGER], c=[$2])
         +- LogicalJoin(condition=[=($2, $5)], joinType=[inner])
            :- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable, source: [TestTableSource(a, b, c)]]])
            +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable1, source: [TestTableSource(d, e, f)]]])

LogicalLegacySink(name=[`default_catalog`.`default_database`.`sink2`], 
fields=[total_min])
+- LogicalAggregate(group=[{}], total_min=[MIN($0)])
   +- LogicalProject(b=[$1])
      +- LogicalProject(a=[random_udf($0)], b=[CAST($1):INTEGER], c=[$2])
         +- LogicalJoin(condition=[=($2, $5)], joinType=[inner])
            :- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable, source: [TestTableSource(a, b, c)]]])
            +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable1, source: [TestTableSource(d, e, f)]]])

// optimized exec
HashJoin(joinType=[InnerJoin], where=[(c = f)], select=[a, b, c, d, e, f], 
build=[right])(reuse_id=[1])
:- Exchange(distribution=[hash[c]])
:  +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+- Exchange(distribution=[hash[f]])
   +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
MyTable1, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])

LegacySink(name=[`default_catalog`.`default_database`.`sink1`], 
fields=[total_sum])
+- HashAggregate(isMerge=[true], select=[Final_SUM(sum$0) AS total_sum])
   +- Exchange(distribution=[single])
      +- LocalHashAggregate(select=[Partial_SUM(a) AS sum$0])
         +- Calc(select=[random_udf(a) AS a])
            +- Reused(reference_id=[1])

LegacySink(name=[`default_catalog`.`default_database`.`sink2`], 
fields=[total_min])
+- HashAggregate(isMerge=[true], select=[Final_MIN(min$0) AS total_min])
   +- Exchange(distribution=[single])
      +- LocalHashAggregate(select=[Partial_MIN(b) AS min$0])
         +- Calc(select=[CAST(b AS INTEGER) AS b])
            +- Reused(reference_id=[1]){code}
If a join hint is added, the `sqlToRelConverterConfig` will apply the setting 
'withBloat(-1)', which disables project merging when converting the SQL node to 
a RelNode (see FlinkPlannerImpl for details). As a result, the optimized exec 
plan is changed because of SubGraphBasedOptimizer:
{code:java}
// optimized exec
Calc(select=[random_udf(a) AS a, CAST(b AS INTEGER) AS b, c])(reuse_id=[1])
+- HashJoin(joinType=[InnerJoin], where=[(c = f)], select=[a, b, c, f], 
build=[right])
   :- Exchange(distribution=[hash[c]])
   :  +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
   +- Exchange(distribution=[hash[f]])
      +- Calc(select=[f])
         +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
MyTable1, source: [TestTableSource(d, e, f)]]], fields=[d, e, 
f])LegacySink(name=[`default_catalog`.`default_database`.`sink1`], 
fields=[total_sum])
+- HashAggregate(isMerge=[true], select=[Final_SUM(sum$0) AS total_sum])
   +- Exchange(distribution=[single])
      +- LocalHashAggregate(select=[Partial_SUM(a) AS sum$0])
         +- Calc(select=[a])
            +- 
Reused(reference_id=[1])LegacySink(name=[`default_catalog`.`default_database`.`sink2`],
 fields=[total_min])
+- HashAggregate(isMerge=[true], select=[Final_MIN(min$0) AS total_min])
   +- Exchange(distribution=[single])
      +- LocalHashAggregate(select=[Partial_MIN(b) AS min$0])
         +- Calc(select=[b])
            +- Reused(reference_id=[1]) {code}
 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to