GitHub user docete commented on the issue:
https://github.com/apache/flink/pull/3111
@fhueske Yes, I have checked the execution plan, and it is very similar to your
description.
Take, for example, the SQL query "select sum(distinct a), sum(distinct b), sum(c) from
expr", where expr is a table with three fields: a, b, c.
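To make the shape of the rewrite concrete, here is a minimal plain-Java sketch. It is not Flink code, and the class, record, and method names are hypothetical. It mimics what the optimized logical plan for this query computes. Each DISTINCT aggregate becomes a group-by on its own column (DataSetAggregate(groupBy=[x], select=[x])) followed by a global SUM. Every global aggregate input is unioned with a single all-null row (DataSetValues + DataSetUnion), presumably so that the branch still emits exactly one row on an empty input. The three one-row results are then combined by DataSetSingleRowJoin(where=[true]).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;

// Plain-Java illustration of the optimized plan's shape; NOT Flink code (names are hypothetical).
public class DistinctAggRewriteSketch {

    // One row of the hypothetical table expr(a, b, c); a null field models SQL NULL.
    static final class Row {
        final Integer a, b, c;
        Row(Integer a, Integer b, Integer c) { this.a = a; this.b = b; this.c = c; }
    }

    // SQL SUM semantics: NULL inputs are skipped; with no non-NULL input the result is NULL.
    static Integer sum(List<Integer> values) {
        Integer acc = null;
        for (Integer v : values) {
            if (v != null) {
                acc = (acc == null) ? v : acc + v;
            }
        }
        return acc;
    }

    // Mirrors DataSetUnion(DataSetValues({ null }), input): prepend one all-NULL row
    // so the global aggregate that follows always sees at least one row.
    static List<Integer> unionWithNullRow(List<Integer> input) {
        List<Integer> out = new ArrayList<>();
        out.add(null);
        out.addAll(input);
        return out;
    }

    // Mirrors DataSetAggregate(groupBy=[x], select=[x]): grouping by x and keeping only x is DISTINCT x.
    static List<Integer> distinct(List<Integer> column) {
        return new ArrayList<>(new LinkedHashSet<>(column));
    }

    // Evaluates "select sum(distinct a), sum(distinct b), sum(c) from expr" branch by branch.
    static Integer[] evaluate(List<Row> expr) {
        List<Integer> as = new ArrayList<>();
        List<Integer> bs = new ArrayList<>();
        List<Integer> cs = new ArrayList<>();
        for (Row r : expr) {
            as.add(r.a);
            bs.add(r.b);
            cs.add(r.c);
        }
        Integer expr2 = sum(unionWithNullRow(cs));           // SUM(c) AS EXPR$2
        Integer expr0 = sum(unionWithNullRow(distinct(as))); // SUM(a) over DISTINCT a AS EXPR$0
        Integer expr1 = sum(unionWithNullRow(distinct(bs))); // SUM(b) over DISTINCT b AS EXPR$1
        // Each branch yields exactly one row, so the single-row joins with where=[true]
        // reduce to placing the scalars side by side; the final DataSetCalc reorders them.
        return new Integer[] {expr0, expr1, expr2};
    }

    public static void main(String[] args) {
        List<Row> expr = List.of(
                new Row(1, 10, 100),
                new Row(1, 20, 200),
                new Row(2, 20, null));
        System.out.println(Arrays.toString(evaluate(expr)));      // prints [3, 30, 300]
        System.out.println(Arrays.toString(evaluate(List.of()))); // prints [null, null, null]
    }
}
```

The duplicate a = 1 and b = 20 values are counted once by the DISTINCT branches, while the NULL in c is skipped by SUM(c). An empty table still produces one all-NULL result row, which is the behavior the null-row unions in the plan appear to preserve.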
The explain output for the query is:
```
== Abstract Syntax Tree ==
LogicalAggregate(group=[{}], EXPR$0=[SUM(DISTINCT $0)], EXPR$1=[SUM(DISTINCT $1)], EXPR$2=[SUM($2)])
  LogicalTableScan(table=[[expr]])

== Optimized Logical Plan ==
DataSetCalc(select=[EXPR$0, EXPR$1, EXPR$2])
  DataSetSingleRowJoin(where=[true], join=[EXPR$2, EXPR$0, EXPR$1], joinType=[NestedLoopJoin])
    DataSetSingleRowJoin(where=[true], join=[EXPR$2, EXPR$0], joinType=[NestedLoopJoin])
      DataSetAggregate(select=[SUM(c) AS EXPR$2])
        DataSetUnion(union=[a, b, c])
          DataSetValues(tuples=[[{ null, null, null }]], values=[a, b, c])
          DataSetScan(table=[[_DataSetTable_0]])
      DataSetAggregate(select=[SUM(a) AS EXPR$0])
        DataSetUnion(union=[a])
          DataSetValues(tuples=[[{ null }]], values=[a])
          DataSetAggregate(groupBy=[a], select=[a])
            DataSetCalc(select=[a])
              DataSetScan(table=[[_DataSetTable_0]])
    DataSetAggregate(select=[SUM(b) AS EXPR$1])
      DataSetUnion(union=[b])
        DataSetValues(tuples=[[{ null }]], values=[b])
        DataSetAggregate(groupBy=[b], select=[b])
          DataSetCalc(select=[b])
            DataSetScan(table=[[_DataSetTable_0]])

== Physical Execution Plan ==
Stage 8 : Data Source
  content : collect elements with CollectionInputFormat
  Partitioning : RANDOM_PARTITIONED
Stage 14 : Data Source
  content : collect elements with CollectionInputFormat
  Partitioning : RANDOM_PARTITIONED
Stage 13 : Map
  content : from: (a, b, c)
  ship_strategy : Forward
  exchange_mode : BATCH
  driver_strategy : Map
  Partitioning : RANDOM_PARTITIONED
Stage 12 : FlatMap
  content : select: (a)
  ship_strategy : Forward
  exchange_mode : PIPELINED
  driver_strategy : FlatMap
  Partitioning : RANDOM_PARTITIONED
Stage 11 : Map
  content : prepare select: (a)
  ship_strategy : Forward
  exchange_mode : PIPELINED
  driver_strategy : Map
  Partitioning : RANDOM_PARTITIONED
Stage 10 : GroupCombine
  content : groupBy: (a), select: (a)
  ship_strategy : Forward
  exchange_mode : PIPELINED
  driver_strategy : Sorted Combine
  Partitioning : RANDOM_PARTITIONED
Stage 9 : GroupReduce
  content : groupBy: (a), select: (a)
  ship_strategy : Hash Partition on [0]
  exchange_mode : PIPELINED
  driver_strategy : Sorted Group Reduce
  Partitioning : RANDOM_PARTITIONED
Stage 7 : Union
  content :
  ship_strategy : Redistribute
  exchange_mode : PIPELINED
  Partitioning : RANDOM_PARTITIONED
Stage 6 : Map
  content : prepare select: (SUM(a) AS EXPR$0)
  ship_strategy : Forward
  exchange_mode : PIPELINED
  driver_strategy : Map
  Partitioning : RANDOM_PARTITIONED
Stage 5 : GroupCombine
  content : select:(SUM(a) AS EXPR$0)
  ship_strategy : Forward
  exchange_mode : PIPELINED
  driver_strategy : Group Reduce All
  Partitioning : RANDOM_PARTITIONED
Stage 4 : GroupReduce
  content : select:(SUM(a) AS EXPR$0)
  ship_strategy : Redistribute
  exchange_mode : PIPELINED
  driver_strategy : Group Reduce All
  Partitioning : RANDOM_PARTITIONED
Stage 19 : Data Source
  content : collect elements with CollectionInputFormat
  Partitioning : RANDOM_PARTITIONED
Stage 20 : Map
  content : from: (a, b, c)
  ship_strategy : Forward
  exchange_mode : BATCH
  driver_strategy : Map
  Partitioning : RANDOM_PARTITIONED
Stage 18 : Union
  content :
  ship_strategy : Redistribute
  exchange_mode : PIPELINED
  Partitioning : RANDOM_PARTITIONED
Stage 17 : Map
  content : prepare select: (SUM(c) AS EXPR$2)
  ship_strategy : Forward
  exchange_mode : PIPELINED
  driver_strategy : Map
  Partitioning : RANDOM_PARTITIONED
Stage 16 : GroupCombine
  content : select:(SUM(c) AS EXPR$2)
  ship_strategy : Forward
  exchange_mode : PIPELINED
  driver_strategy : Group Reduce All
  Partitioning : RANDOM_PARTITIONED
Stage 15 : GroupReduce
  content : select:(SUM(c) AS EXPR$2)
  ship_strategy : Redistribute
  exchange_mode : PIPELINED
  driver_strategy : Group Reduce All
  Partitioning : RANDOM_PARTITIONED
Stage 3 : FlatMap
  content : where: (true), join: (EXPR$2, EXPR$0)
  ship_strategy : Forward
  exchange_mode : PIPELINED
  driver_strategy : FlatMap
  Partitioning : RANDOM_PARTITIONED
Stage 25 : Data Source
  content : collect elements with CollectionInputFormat
  Partitioning : RANDOM_PARTITIONED
Stage 30 : Map
  content : from: (a, b, c)
  ship_strategy : Forward
  exchange_mode : BATCH
  driver_strategy : Map
  Partitioning : RANDOM_PARTITIONED
Stage 29 : FlatMap
  content : select: (b)
  ship_strategy : Forward
  exchange_mode : PIPELINED
  driver_strategy : FlatMap
  Partitioning : RANDOM_PARTITIONED
Stage 28 : Map
  content : prepare select: (b)
  ship_strategy : Forward
  exchange_mode : PIPELINED
  driver_strategy : Map
  Partitioning : RANDOM_PARTITIONED
Stage 27 : GroupCombine
  content : groupBy: (b), select: (b)
  ship_strategy : Forward
  exchange_mode : PIPELINED
  driver_strategy : Sorted Combine
  Partitioning : RANDOM_PARTITIONED
Stage 26 : GroupReduce
  content : groupBy: (b), select: (b)
  ship_strategy : Hash Partition on [0]
  exchange_mode : PIPELINED
  driver_strategy : Sorted Group Reduce
  Partitioning : RANDOM_PARTITIONED
Stage 24 : Union
  content :
  ship_strategy : Redistribute
  exchange_mode : PIPELINED
  Partitioning : RANDOM_PARTITIONED
Stage 23 : Map
  content : prepare select: (SUM(b) AS EXPR$1)
  ship_strategy : Forward
  exchange_mode : PIPELINED
  driver_strategy : Map
  Partitioning : RANDOM_PARTITIONED
Stage 22 : GroupCombine
  content : select:(SUM(b) AS EXPR$1)
  ship_strategy : Forward
  exchange_mode : PIPELINED
  driver_strategy : Group Reduce All
  Partitioning : RANDOM_PARTITIONED
Stage 21 : GroupReduce
  content : select:(SUM(b) AS EXPR$1)
  ship_strategy : Redistribute
  exchange_mode : PIPELINED
  driver_strategy : Group Reduce All
  Partitioning : RANDOM_PARTITIONED
Stage 2 : FlatMap
  content : where: (true), join: (EXPR$2, EXPR$0, EXPR$1)
  ship_strategy : Forward
  exchange_mode : PIPELINED
  driver_strategy : FlatMap
  Partitioning : RANDOM_PARTITIONED
Stage 1 : FlatMap
  content : select: (EXPR$0, EXPR$1, EXPR$2)
  ship_strategy : Forward
  exchange_mode : PIPELINED
  driver_strategy : FlatMap
  Partitioning : RANDOM_PARTITIONED
Stage 0 : Data Sink
  content : org.apache.flink.api.java.io.DiscardingOutputFormat
  ship_strategy : Forward
  exchange_mode : PIPELINED
  Partitioning : RANDOM_PARTITIONED
```
