[ 
https://issues.apache.org/jira/browse/SPARK-24293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16477669#comment-16477669
 ] 

Wenchen Fan commented on SPARK-24293:
-------------------------------------

The unsafe shuffle is mostly created for Spark SQL. For Spark SQL, we don't 
want to rely on the mapSideCombine from ShuffleManager, but doing the map side 
combine manually with SQL operators, to get SQL optimizations like 
whole-stage-codegen.

> Serialized shuffle supports mapSideCombine
> ------------------------------------------
>
>                 Key: SPARK-24293
>                 URL: https://issues.apache.org/jira/browse/SPARK-24293
>             Project: Spark
>          Issue Type: Brainstorming
>          Components: Shuffle
>    Affects Versions: 2.3.0
>            Reporter: Xianjin YE
>            Priority: Major
>
> While doing research on integrating my company's internal Shuffle Service 
> with Spark, I found it is possible to support mapSideCombine with serialized 
> shuffle.
> The simple idea is that the `UnsafeShuffleWriter` uses a `Combiner` to 
> accumulate records when mapSideCombine is required before inserting into 
> `ShuffleExternalSorter`. The `Combiner` will tracking it's memory usage or 
> elements accumulated and is never spilled. When the `Combiner` accumulates 
> enough records(varied by different strategies), the accumulated (K, C) pairs 
> are then inserted into the `ShuffleExternalSorter`.  After that, the 
> `Combiner` is reset to empty state.
> After this change, combinedValues are sent to sorter segment by segment, and 
> the `BlockStoreShuffleReader` already handles this case.
> I did a local POC, and looks like that I can get the same result with normal 
> SortShuffle. The performance is not optimized yet. The most significant part 
> of code is shown as below: 
> {code:java}
> // code placeholder
> while (records.hasNext()) {
>   Product2<K, V> record = records.next();
>   if (this.mapSideCombine) {
>     this.aggregator.accumulateRecord(record);
>     if (this.aggregator.accumulatedKeyNum() >= 160_000) { // for poc
>       scala.collection.Iterator<Tuple2<K, C>> combinedIterator = 
> this.aggregator.accumulatedIterator();
>       while (combinedIterator.hasNext()) {
>         insertRecordIntoSorter(combinedIterator.next());
>       }
>       this.aggregator.resetAccumulation();
>     }
>   } else {
>     insertRecordIntoSorter(record);
>   }
> }
> if (this.mapSideCombine && this.aggregator.accumulatedKeyNum() > 0) {
>   scala.collection.Iterator<Tuple2<K, C>> combinedIterator = 
> this.aggregator.accumulatedIterator();
>   while (combinedIterator.hasNext()) {
>     insertRecordIntoSorter(combinedIterator.next());
>   }
>   this.aggregator.resetAccumulation(1);
> }
> {code}
>  
>  Is there something I am missing? cc [~joshrosen] [~cloud_fan] [~XuanYuan]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to