[ https://issues.apache.org/jira/browse/SPARK-25203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16589356#comment-16589356 ]
Eyal Farago commented on SPARK-25203:
-------------------------------------

seems I was wrong regarding the resulting distribution, I modified the query a bit:
{code:java}
select *, spark_partition_id() as P from (select * from t1DU distribute by c1)

-- !query 15 schema
struct<c1:int,c2:string,P:int>
-- !query 15 output
1   a   43
2   a   174
2   b   174
3   b   51
{code}
it's now obvious that the records are properly clustered (I guess the original query computed the partition id BEFORE the repartitioning).


> spark sql, union all does not propagate child partitioning (when possible)
> ---------------------------------------------------------------------------
>
>                 Key: SPARK-25203
>                 URL: https://issues.apache.org/jira/browse/SPARK-25203
>             Project: Spark
>          Issue Type: Bug
>          Components: Optimizer, SQL
>    Affects Versions: 2.2.0, 2.3.0, 2.4.0
>            Reporter: Eyal Farago
>            Priority: Major
>
> In Spark SQL, UNION ALL does not propagate partitioning when all child plans
> share the same partitioning; this introduces unnecessary Exchange nodes when
> the parent operator requires a distribution that is already satisfied by that
> partitioning.
>
> {code:java}
> CREATE OR REPLACE TEMPORARY VIEW t1 AS VALUES (1, 'a'), (2, 'b') tbl(c1, c2);
> CREATE OR REPLACE TEMPORARY VIEW t1D1 AS select c1, c2 from t1 distribute by c1;
> CREATE OR REPLACE TEMPORARY VIEW t1D2 AS select c1 + 1 as c11, c2 from t1 distribute by c11;
> create or REPLACE TEMPORARY VIEW t1DU as
> select * from t1D1
> UNION ALL
> select * from t1D2;
>
> EXPLAIN select * from t1DU distribute by c1;
>
> == Physical Plan ==
> Exchange hashpartitioning(c1#x, 200)
> +- Union
>    :- Exchange hashpartitioning(c1#x, 200)
>    :  +- LocalTableScan [c1#x, c2#x]
>    +- Exchange hashpartitioning(c11#x, 200)
>       +- LocalTableScan [c11#x, c2#x]
> {code}
> The Exchange introduced in the last query is unnecessary, since the unioned
> data is already partitioned by column _c1_; in fact, the equivalent RDD
> operation identifies this scenario and introduces a PartitionerAwareUnionRDD
> which maintains the children's shared partitioner.
>
> I suggest modifying org.apache.spark.sql.execution.UnionExec by overriding
> _outputPartitioning_ so that it identifies a common partitioning among the
> child plans and uses it, falling back to the default implementation otherwise
> (a rough sketch of this follows below).
>
> Furthermore, it seems the current implementation does not properly cluster
> the data:
> {code:java}
> select *, spark_partition_id() as P from t1DU distribute by c1
>
> -- !query 15 schema
> struct<c1:int,c2:string,P:int>
> -- !query 15 output
> 1   a   43
> 2   a   374
> 2   b   174
> 3   b   251
> {code}
> notice _c1=2_ lands in partitions 174 and 374.
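For illustration only, here is a minimal sketch (not Spark's actual code) of what such an outputPartitioning override could compute. The helper name unionOutputPartitioning and the positional mapping of each child's output attributes onto the union's output attributes are assumptions made for this example; a real implementation would likely need semantic expression comparison rather than plain case-class equality.
{code:java}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
import org.apache.spark.sql.execution.SparkPlan

// Hypothetical helper: given the union's children and its own output attributes,
// return the children's shared hash partitioning if there is one, otherwise
// fall back to SparkPlan's default (UnknownPartitioning).
def unionOutputPartitioning(children: Seq[SparkPlan], output: Seq[Attribute]): Partitioning = {

  // Rewrite a child's hash partitioning so it references the union's output
  // attributes instead of the child's own (same position, different exprIds).
  def rebased(child: SparkPlan): Option[HashPartitioning] = child.outputPartitioning match {
    case HashPartitioning(exprs, numPartitions) =>
      val mapping = child.output.zip(output).toMap
      val rewritten = exprs.map(_.transform {
        case a: AttributeReference => mapping.getOrElse(a, a)
      })
      Some(HashPartitioning(rewritten, numPartitions))
    case _ => None
  }

  children.map(rebased) match {
    // every child is hash-partitioned, and all of them agree once rebased
    // onto the union's output attributes
    case all if all.forall(_.isDefined) && all.flatten.distinct.size == 1 =>
      all.head.get
    case _ =>
      // fall back to SparkPlan's default behaviour
      UnknownPartitioning(0)
  }
}
{code}
With something along these lines, the EXPLAIN above should report the Union's output as hash-partitioned on c1, letting the outer "distribute by c1" reuse it instead of inserting another Exchange.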