[ https://issues.apache.org/jira/browse/FLINK-19939?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Caizhi Weng updated FLINK-19939:
--------------------------------
    Comment: was deleted

(was: cc [~pnowojski] [~AHeise])

> Remove redundant union from multiple input node
> -----------------------------------------------
>
>                 Key: FLINK-19939
>                 URL: https://issues.apache.org/jira/browse/FLINK-19939
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Planner
>            Reporter: Caizhi Weng
>            Priority: Major
>             Fix For: 1.12.0
>
>
> Consider the following SQL and the execution plan.
> {code:sql}
> WITH
>   T1 AS (SELECT COUNT(*) AS cnt FROM x GROUP BY a),
>   T2 AS (SELECT COUNT(*) AS cnt FROM y GROUP BY d),
>   T3 AS (SELECT b AS cnt FROM x INNER JOIN y ON x.b = y.e)
> SELECT cnt FROM
>   (SELECT cnt FROM T1)
>   UNION ALL
>   (SELECT cnt FROM T2)
>   UNION ALL
>   (SELECT cnt FROM T3)
> {code}
> {code}
> MultipleInputNode(readOrder=[1,0,0,0], members=[\nUnion(all=[true], union=[cnt])\n:- Union(all=[true], union=[cnt])\n:  :- Calc(select=[CAST(cnt) AS cnt])\n:  :  +- HashAggregate(isMerge=[true], groupBy=[a], select=[a, Final_COUNT(count1$0) AS cnt])\n:  :     +- [#3] Exchange(distribution=[hash[a]])\n:  +- Calc(select=[CAST(cnt) AS cnt])\n:     +- HashAggregate(isMerge=[true], groupBy=[d], select=[d, Final_COUNT(count1$0) AS cnt])\n:        +- [#4] Exchange(distribution=[hash[d]])\n+- Calc(select=[b AS cnt])\n   +- HashJoin(joinType=[InnerJoin], where=[=(b, e)], select=[b, e], build=[right])\n      :- [#1] Exchange(distribution=[hash[b]])\n      +- [#2] Exchange(distribution=[hash[e]])\n])
> :- Exchange(distribution=[hash[b]])
> :  +- Calc(select=[b])
> :     +- LegacyTableSourceScan(table=[[default_catalog, default_database, x, source: [TestTableSource(a, b, c, nx)]]], fields=[a, b, c, nx])
> :- Exchange(distribution=[hash[e]])
> :  +- Calc(select=[e])
> :     +- LegacyTableSourceScan(table=[[default_catalog, default_database, y, source: [TestTableSource(d, e, f, ny)]]], fields=[d, e, f, ny])
> :- Exchange(distribution=[hash[a]])
> :  +- LocalHashAggregate(groupBy=[a], select=[a, Partial_COUNT(*) AS count1$0])
> :     +- Calc(select=[a])
> :        +- LegacyTableSourceScan(table=[[default_catalog, default_database, x, source: [TestTableSource(a, b, c, nx)]]], fields=[a, b, c, nx])
> +- Exchange(distribution=[hash[d]])
>    +- LocalHashAggregate(groupBy=[d], select=[d, Partial_COUNT(*) AS count1$0])
>       +- Calc(select=[d])
>          +- LegacyTableSourceScan(table=[[default_catalog, default_database, y, source: [TestTableSource(d, e, f, ny)]]], fields=[d, e, f, ny])
> {code}
> The two unions inside the multiple input node here are actually redundant, 
> as the amount of data shuffled will not change even if they are moved out of 
> the multiple input node. We should remove such redundant unions from multiple 
> input nodes.
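
The idea in the description can be illustrated with a small sketch. It is a toy model only: the `Node` class and the `lift_root_unions` helper are hypothetical names introduced here and are not part of the Flink planner, and the issue does not say how the planner should implement the change. The sketch rebuilds the member tree from the plan above and peels off every Union that sits at the root of that tree, leaving the sub-trees that would feed a Union placed outside the multiple input node.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Node:
    """Toy plan node (hypothetical model, not Flink's planner classes)."""
    name: str
    inputs: List["Node"] = field(default_factory=list)


def lift_root_unions(member_root: Node) -> List[Node]:
    """Peel off Unions at the root of a member tree and return what is left.

    Each returned sub-tree could feed a Union placed outside the
    multiple input node (an assumption of this sketch).
    """
    if member_root.name != "Union":
        return [member_root]
    branches: List[Node] = []
    for child in member_root.inputs:
        # Nested root-level Unions are flattened recursively.
        branches.extend(lift_root_unions(child))
    return branches


# Member tree of the MultipleInputNode from the plan above (Exchanges omitted).
members = Node("Union", [
    Node("Union", [
        Node("Calc", [Node("HashAggregate")]),  # T1: group by a
        Node("Calc", [Node("HashAggregate")]),  # T2: group by d
    ]),
    Node("Calc", [Node("HashJoin")]),           # T3: x join y
])

branches = lift_root_unions(members)
print([b.inputs[0].name for b in branches])
# prints ['HashAggregate', 'HashAggregate', 'HashJoin']
```

In this toy model both Unions disappear from the member tree and three independent branches remain. The Exchange inputs feeding each branch are unchanged, which matches the observation that the amount of shuffled data stays the same.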



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
