Caizhi Weng created FLINK-19939:
-----------------------------------

             Summary: Remove redundant union from multiple input node
                 Key: FLINK-19939
                 URL: https://issues.apache.org/jira/browse/FLINK-19939
             Project: Flink
          Issue Type: Improvement
          Components: Table SQL / Planner
            Reporter: Caizhi Weng
             Fix For: 1.12.0


Consider the following SQL query and its execution plan.
{code:sql}
WITH
  T1 AS (SELECT COUNT(*) AS cnt FROM x GROUP BY a),
  T2 AS (SELECT COUNT(*) AS cnt FROM y GROUP BY d),
  T3 AS (SELECT b AS cnt FROM x INNER JOIN y ON x.b = y.e)
SELECT cnt FROM
  (SELECT cnt FROM T1)
  UNION ALL
  (SELECT cnt FROM T2)
  UNION ALL
  (SELECT cnt FROM T3)
{code}

{code}

MultipleInputNode(readOrder=[1,0,0,0], members=[
    Union(all=[true], union=[cnt])
    :- Union(all=[true], union=[cnt])
    :  :- Calc(select=[CAST(cnt) AS cnt])
    :  :  +- HashAggregate(isMerge=[true], groupBy=[a], select=[a, Final_COUNT(count1$0) AS cnt])
    :  :     +- [#3] Exchange(distribution=[hash[a]])
    :  +- Calc(select=[CAST(cnt) AS cnt])
    :     +- HashAggregate(isMerge=[true], groupBy=[d], select=[d, Final_COUNT(count1$0) AS cnt])
    :        +- [#4] Exchange(distribution=[hash[d]])
    +- Calc(select=[b AS cnt])
       +- HashJoin(joinType=[InnerJoin], where=[=(b, e)], select=[b, e], build=[right])
          :- [#1] Exchange(distribution=[hash[b]])
          +- [#2] Exchange(distribution=[hash[e]])
])
:- Exchange(distribution=[hash[b]])
:  +- Calc(select=[b])
:     +- LegacyTableSourceScan(table=[[default_catalog, default_database, x, source: [TestTableSource(a, b, c, nx)]]], fields=[a, b, c, nx])
:- Exchange(distribution=[hash[e]])
:  +- Calc(select=[e])
:     +- LegacyTableSourceScan(table=[[default_catalog, default_database, y, source: [TestTableSource(d, e, f, ny)]]], fields=[d, e, f, ny])
:- Exchange(distribution=[hash[a]])
:  +- LocalHashAggregate(groupBy=[a], select=[a, Partial_COUNT(*) AS count1$0])
:     +- Calc(select=[a])
:        +- LegacyTableSourceScan(table=[[default_catalog, default_database, x, source: [TestTableSource(a, b, c, nx)]]], fields=[a, b, c, nx])
+- Exchange(distribution=[hash[d]])
   +- LocalHashAggregate(groupBy=[d], select=[d, Partial_COUNT(*) AS count1$0])
      +- Calc(select=[d])
         +- LegacyTableSourceScan(table=[[default_catalog, default_database, y, source: [TestTableSource(d, e, f, ny)]]], fields=[d, e, f, ny])
{code}

The two unions in the multiple input node are actually redundant: the amount
of data shuffled will not change even if they are moved out of the multiple
input node. We should remove such redundant unions from multiple input nodes.
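
To make the idea concrete, here is a minimal sketch in Python of peeling the
unions off the top of the member tree. It is only a toy model: `PlanNode` and
`peel_root_unions` are hypothetical names, not part of the Flink planner, and
an actual fix would work on the planner's own node classes.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class PlanNode:
    """Hypothetical stand-in for a physical plan node (not a Flink class)."""
    name: str
    inputs: List["PlanNode"] = field(default_factory=list)


def peel_root_unions(member_root: PlanNode) -> List[PlanNode]:
    """Return the sub-plans left after removing every Union at the top of the tree.

    Only unions reachable from the root through other unions are removed;
    a union buried below another operator is left alone.
    """
    if member_root.name != "Union":
        return [member_root]
    remaining: List[PlanNode] = []
    for child in member_root.inputs:
        remaining.extend(peel_root_unions(child))
    return remaining


# Member tree of the MultipleInputNode from the plan above (attributes omitted).
members = PlanNode("Union", [
    PlanNode("Union", [
        PlanNode("Calc", [PlanNode("HashAggregate", [PlanNode("Exchange#3")])]),
        PlanNode("Calc", [PlanNode("HashAggregate", [PlanNode("Exchange#4")])]),
    ]),
    PlanNode("Calc", [
        PlanNode("HashJoin", [PlanNode("Exchange#1"), PlanNode("Exchange#2")]),
    ]),
])

sub_plans = peel_root_unions(members)
print([p.name for p in sub_plans])  # ['Calc', 'Calc', 'Calc']
```

The three remaining sub-plans still read from the same four exchanges (#1 to
#4) as before, which is consistent with the observation that moving the unions
out does not change the amount of data shuffled.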



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
