Caizhi Weng created FLINK-19959: ----------------------------------- Summary: Multiple input creation algorithm will deduce an incorrect input order if the inputs are related under PIPELINED shuffle mode Key: FLINK-19959 URL: https://issues.apache.org/jira/browse/FLINK-19959 Project: Flink Issue Type: Bug Components: Table SQL / Planner Reporter: Caizhi Weng Fix For: 1.12.0
Consider the following SQL {code:sql} WITH T1 AS (SELECT x.a AS a, y.d AS b FROM y LEFT JOIN x ON y.d = x.a), T2 AS (SELECT a, b FROM (SELECT a, b FROM T1) UNION ALL (SELECT x.a AS a, x.b AS b FROM x)) SELECT * FROM T2 LEFT JOIN t ON T2.a = t.a {code} The multiple input creation algorithm will currently deduce the following plan under the PIPELINED shuffle mode: {code} MultipleInputNode(readOrder=[0,1,1,0], members=[\nNestedLoopJoin(joinType=[LeftOuterJoin], where=[=(a, a0)], select=[a, b, a0, b0, c], build=[right])\n:- Union(all=[true], union=[a, b])\n: :- Calc(select=[a, CAST(d) AS b])\n: : +- NestedLoopJoin(joinType=[LeftOuterJoin], where=[=(d, a)], select=[d, a], build=[right])\n: : :- [#3] Calc(select=[d])\n: : +- [#4] Exchange(distribution=[broadcast])\n: +- [#2] Calc(select=[a, b])\n+- [#1] Exchange(distribution=[broadcast])\n]) :- Exchange(distribution=[broadcast]) : +- BoundedStreamScan(table=[[default_catalog, default_database, t]], fields=[a, b, c]) :- Calc(select=[a, b]) : +- LegacyTableSourceScan(table=[[default_catalog, default_database, x, source: [TestTableSource(a, b, c, nx)]]], fields=[a, b, c, nx], reuse_id=[1]) :- Calc(select=[d]) : +- LegacyTableSourceScan(table=[[default_catalog, default_database, y, source: [TestTableSource(d, e, f, ny)]]], fields=[d, e, f, ny]) +- Exchange(distribution=[broadcast]) +- Calc(select=[a]) +- Reused(reference_id=[1]) {code} It's obvious that the 2nd and the 4th input for the multiple input node should have the same input priority, otherwise a deadlock will occur. This is because the current algorithm fails to consider the case when the inputs are related out of the multiple input node. -- This message was sent by Atlassian Jira (v8.3.4#803005)