xuyang created FLINK-29221:
------------------------------

             Summary: Adding join hint in SQL may cause incompatible state
                 Key: FLINK-29221
                 URL: https://issues.apache.org/jira/browse/FLINK-29221
             Project: Flink
          Issue Type: Improvement
            Reporter: xuyang
The cause of the possible incompatible state is that the plan generated for the SQL changes after a join hint is added. Adding the following code to DagOptimizationTest.scala can reproduce this change.

{code:java}
@Test
def testMultiSinks6(): Unit = {
  val stmtSet = util.tableEnv.createStatementSet()
  util.tableEnv.getConfig.set(
    RelNodeBlockPlanBuilder.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED,
    Boolean.box(true))
  // test with non-deterministic udf
  util.tableEnv.registerFunction("random_udf", new NonDeterministicUdf())

  val table1 = util.tableEnv.sqlQuery(
    "SELECT random_udf(a) AS a, cast(b as int) as b, c FROM MyTable join MyTable1 on MyTable.c = MyTable1.f")
  util.tableEnv.registerTable("table1", table1)
  val table2 = util.tableEnv.sqlQuery("SELECT SUM(a) AS total_sum FROM table1")
  val table3 = util.tableEnv.sqlQuery("SELECT MIN(b) AS total_min FROM table1")

  val sink1 = util.createCollectTableSink(Array("total_sum"), Array(INT))
  util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal("sink1", sink1)
  stmtSet.addInsert("sink1", table2)

  val sink2 = util.createCollectTableSink(Array("total_min"), Array(INT))
  util.tableEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal("sink2", sink2)
  stmtSet.addInsert("sink2", table3)

  util.verifyExecPlan(stmtSet)
}
{code}

The plan is:

{code:java}
// ast
LogicalLegacySink(name=[`default_catalog`.`default_database`.`sink1`], fields=[total_sum])
+- LogicalAggregate(group=[{}], total_sum=[SUM($0)])
   +- LogicalProject(a=[$0])
      +- LogicalProject(a=[random_udf($0)], b=[CAST($1):INTEGER], c=[$2])
         +- LogicalJoin(condition=[=($2, $5)], joinType=[inner])
            :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
            +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(d, e, f)]]])

LogicalLegacySink(name=[`default_catalog`.`default_database`.`sink2`], fields=[total_min])
+- LogicalAggregate(group=[{}], total_min=[MIN($0)])
   +- LogicalProject(b=[$1])
      +- LogicalProject(a=[random_udf($0)], b=[CAST($1):INTEGER], c=[$2])
         +- LogicalJoin(condition=[=($2, $5)], joinType=[inner])
            :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]])
            +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(d, e, f)]]])

// optimized exec
HashJoin(joinType=[InnerJoin], where=[(c = f)], select=[a, b, c, d, e, f], build=[right])(reuse_id=[1])
:- Exchange(distribution=[hash[c]])
:  +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+- Exchange(distribution=[hash[f]])
   +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])

LegacySink(name=[`default_catalog`.`default_database`.`sink1`], fields=[total_sum])
+- HashAggregate(isMerge=[true], select=[Final_SUM(sum$0) AS total_sum])
   +- Exchange(distribution=[single])
      +- LocalHashAggregate(select=[Partial_SUM(a) AS sum$0])
         +- Calc(select=[random_udf(a) AS a])
            +- Reused(reference_id=[1])

LegacySink(name=[`default_catalog`.`default_database`.`sink2`], fields=[total_min])
+- HashAggregate(isMerge=[true], select=[Final_MIN(min$0) AS total_min])
   +- Exchange(distribution=[single])
      +- LocalHashAggregate(select=[Partial_MIN(b) AS min$0])
         +- Calc(select=[CAST(b AS INTEGER) AS b])
            +- Reused(reference_id=[1])
{code}

If a join hint is added, the `sqlToRelConverterConfig` will additionally be configured with 'withBloat(-1)', which disables merging of projects when converting SQL nodes to RelNodes (see FlinkPlannerImpl for details).
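For illustration only, such a join hint applied to the first query of the test above could look like the sketch below. The SHUFFLE_HASH hint and the hintedTable1 name are examples chosen here and are not part of the original test; any join hint should trigger this code path.

{code:java}
// Sketch only: the same query as in testMultiSinks6, but with a join hint added.
// SHUFFLE_HASH(MyTable1) is just one possible hint; the point is that the presence
// of a join hint switches the converter to the withBloat(-1) configuration.
val hintedTable1 = util.tableEnv.sqlQuery(
  "SELECT /*+ SHUFFLE_HASH(MyTable1) */ random_udf(a) AS a, cast(b as int) as b, c " +
    "FROM MyTable join MyTable1 on MyTable.c = MyTable1.f")
util.tableEnv.registerTable("hintedTable1", hintedTable1)
{code}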
Because of the SubGraphBasedOptimizer, the optimized exec plan then changes to:

{code:java}
// optimized exec
Calc(select=[random_udf(a) AS a, CAST(b AS INTEGER) AS b, c])(reuse_id=[1])
+- HashJoin(joinType=[InnerJoin], where=[(c = f)], select=[a, b, c, f], build=[right])
   :- Exchange(distribution=[hash[c]])
   :  +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
   +- Exchange(distribution=[hash[f]])
      +- Calc(select=[f])
         +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])

LegacySink(name=[`default_catalog`.`default_database`.`sink1`], fields=[total_sum])
+- HashAggregate(isMerge=[true], select=[Final_SUM(sum$0) AS total_sum])
   +- Exchange(distribution=[single])
      +- LocalHashAggregate(select=[Partial_SUM(a) AS sum$0])
         +- Calc(select=[a])
            +- Reused(reference_id=[1])

LegacySink(name=[`default_catalog`.`default_database`.`sink2`], fields=[total_min])
+- HashAggregate(isMerge=[true], select=[Final_MIN(min$0) AS total_min])
   +- Exchange(distribution=[single])
      +- LocalHashAggregate(select=[Partial_MIN(b) AS min$0])
         +- Calc(select=[b])
            +- Reused(reference_id=[1])
{code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)