[ 
https://issues.apache.org/jira/browse/SPARK-22276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16205655#comment-16205655
 ] 

Fernando Pereira commented on SPARK-22276:
------------------------------------------

I added a simple example that shows what I mean.
The plan contains two exchanges, even though the data is already partitioned on the 
same field. I wonder whether the Exchange hashpartitioning is really needed, since 
in my case it just slows things down.
Thanks

> Unnecessary repartitioning
> --------------------------
>
>                 Key: SPARK-22276
>                 URL: https://issues.apache.org/jira/browse/SPARK-22276
>             Project: Spark
>          Issue Type: Bug
>          Components: Optimizer
>    Affects Versions: 2.2.0
>            Reporter: Fernando Pereira
>
> When a dataframe is sorted, it is partitioned with a RangePartitioner.
> If we later aggregate by the exact same fields over which the sort was applied, 
> there is a new (apparently useless) Exchange repartitioning by a 
> HashPartitioner.
> In my use case the groupBy exchange is still very costly as the aggregate 
> function won't reduce the data volume.
> Is there any reason why groupBy always shuffles data, or could this be 
> improved? 
> Is there currently a way to work around this for the moment, without resorting to 
> mapPartitions?
> Example
> {code}
> nrn_vals.printSchema()
> (nrn_vals
>  .sort("post_gid")
>  .groupBy("post_gid")
>  .agg(F.collect_list("pre_gid").alias("pre_gids"))
>  ).explain()
> {code}
> Outputs the following
> {code}
> root
>  |-- pre_gid: integer (nullable = true)
>  |-- post_gid: integer (nullable = true)
>  |-- floatvec: array (nullable = false)
>  |    |-- element: float (containsNull = true)
> == Physical Plan ==
> ObjectHashAggregate(keys=[post_gid#1386], functions=[collect_list(pre_gid#1385, 0, 0)])
> +- Exchange hashpartitioning(post_gid#1386, 1)
>    +- ObjectHashAggregate(keys=[post_gid#1386], functions=[partial_collect_list(pre_gid#1385, 0, 0)])
>       +- *Sort [post_gid#1386 ASC NULLS FIRST], true, 0
>          +- Exchange rangepartitioning(post_gid#1386 ASC NULLS FIRST, 1)
>             +- *FileScan parquet [pre_gid#1385,post_gid#1386] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/media/psf/Home/dev/Functionalizer/pyspark/spykfunc_output/extended_touche..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<pre_gid:int,post_gid:int>
> {code}
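
To make the reasoning in the report concrete, here is a toy model in plain Python. It is not Spark code and says nothing about how Spark's planner actually decides to insert an Exchange; the function names (range_partition, keys_colocated, local_collect_list), the sample rows, and the partition bounds are all hypothetical. It only illustrates the claim that once rows are range-partitioned on post_gid, every row with a given post_gid already sits in a single partition, so a group-by on that same field could in principle be computed partition by partition.

```python
import bisect
from collections import defaultdict


def range_partition(rows, key, bounds):
    """Toy range partitioner: bounds is a sorted list of upper bounds.

    A row goes to partition i, where i is the index of the first bound
    that is >= the row's key (or len(bounds) if no bound is large enough).
    """
    parts = defaultdict(list)
    for row in rows:
        parts[bisect.bisect_left(bounds, row[key])].append(row)
    return parts


def keys_colocated(parts, key):
    """Return True if no key value appears in more than one partition."""
    owner = {}
    for pid, rows in parts.items():
        for row in rows:
            if owner.setdefault(row[key], pid) != pid:
                return False
    return True


def local_collect_list(parts, key, value):
    """Group each partition on its own, with no data movement between them."""
    out = {}
    for rows in parts.values():
        for row in rows:
            out.setdefault(row[key], []).append(row[value])
    return out


# Hypothetical sample data using the column names from the report.
rows = [
    {"pre_gid": 1, "post_gid": 10},
    {"pre_gid": 2, "post_gid": 20},
    {"pre_gid": 3, "post_gid": 10},
    {"pre_gid": 4, "post_gid": 30},
    {"pre_gid": 5, "post_gid": 20},
]

parts = range_partition(rows, "post_gid", bounds=[15, 25])
print(keys_colocated(parts, "post_gid"))
print(local_collect_list(parts, "post_gid", "pre_gid"))
```

In this model the per-partition grouping gives the same result as a global one because equal keys never straddle a partition boundary. Whether Spark 2.2.0 can exploit this property for a RangePartitioner is exactly the open question of the issue.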


