[ 
https://issues.apache.org/jira/browse/SPARK-24293?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-24293.
----------------------------------
    Resolution: Incomplete

> Serialized shuffle supports mapSideCombine
> ------------------------------------------
>
>                 Key: SPARK-24293
>                 URL: https://issues.apache.org/jira/browse/SPARK-24293
>             Project: Spark
>          Issue Type: Brainstorming
>          Components: Shuffle
>    Affects Versions: 2.3.0
>            Reporter: Xianjin YE
>            Priority: Major
>              Labels: bulk-closed
>
> While doing research on integrating my company's internal Shuffle Service 
> with Spark, I found it is possible to support mapSideCombine with serialized 
> shuffle.
> The basic idea is that the `UnsafeShuffleWriter` uses a `Combiner` to 
> accumulate records when mapSideCombine is required, before inserting them 
> into the `ShuffleExternalSorter`. The `Combiner` tracks its memory usage or 
> the number of elements accumulated and is never spilled. When the `Combiner` 
> has accumulated enough records (the threshold can vary by strategy), the 
> accumulated (K, C) pairs are inserted into the `ShuffleExternalSorter`, and 
> the `Combiner` is then reset to the empty state.
> With this change, combined values are sent to the sorter segment by segment, 
> and the `BlockStoreShuffleReader` already handles this case.
> I did a local POC, and it looks like I get the same result as with normal 
> SortShuffle. The performance is not optimized yet. The most significant part 
> of the code is shown below: 
> {code:java}
> while (records.hasNext()) {
>   Product2<K, V> record = records.next();
>   if (this.mapSideCombine) {
>     this.aggregator.accumulateRecord(record);
>     if (this.aggregator.accumulatedKeyNum() >= 160_000) { // threshold hard-coded for the POC
>       scala.collection.Iterator<Tuple2<K, C>> combinedIterator =
>           this.aggregator.accumulatedIterator();
>       while (combinedIterator.hasNext()) {
>         insertRecordIntoSorter(combinedIterator.next());
>       }
>       this.aggregator.resetAccumulation();
>     }
>   } else {
>     insertRecordIntoSorter(record);
>   }
> }
> // Flush any remaining accumulated pairs once the input is exhausted.
> if (this.mapSideCombine && this.aggregator.accumulatedKeyNum() > 0) {
>   scala.collection.Iterator<Tuple2<K, C>> combinedIterator =
>       this.aggregator.accumulatedIterator();
>   while (combinedIterator.hasNext()) {
>     insertRecordIntoSorter(combinedIterator.next());
>   }
>   this.aggregator.resetAccumulation();
> }
> {code}
>  
>  Is there something I am missing? cc [~joshrosen] [~cloud_fan] [~XuanYuan]
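For reference, the accumulate-and-flush pattern the description proposes can be sketched as a small standalone combiner. This is an illustrative sketch only, not Spark code: the class name `SegmentedCombiner` and its methods are hypothetical, standing in for the `Combiner`/`Aggregator` role described above, with a plain `HashMap` buffer and a fixed flush threshold in place of memory tracking.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

// Illustrative map-side combiner: merges (K, V) records into (K, C) pairs
// in an in-memory buffer, and emits the buffer as a segment whenever it
// reaches a fixed size threshold (standing in for memory-based tracking).
public class SegmentedCombiner<K, V, C> {
    private final Function<V, C> createCombiner;      // V -> C for a key's first value
    private final BiFunction<C, V, C> mergeValue;     // fold a new V into an existing C
    private final int flushThreshold;                 // max distinct keys before a flush
    private final Map<K, C> buffer = new HashMap<>();

    public SegmentedCombiner(Function<V, C> createCombiner,
                             BiFunction<C, V, C> mergeValue,
                             int flushThreshold) {
        this.createCombiner = createCombiner;
        this.mergeValue = mergeValue;
        this.flushThreshold = flushThreshold;
    }

    // Accumulates one record; returns a flushed segment of (K, C) pairs
    // when the buffer reaches the threshold, or null otherwise. In the
    // proposal, the returned segment would be fed into the serialized sorter.
    public List<Map.Entry<K, C>> insert(K key, V value) {
        C combined = buffer.containsKey(key)
            ? mergeValue.apply(buffer.get(key), value)
            : createCombiner.apply(value);
        buffer.put(key, combined);
        return buffer.size() >= flushThreshold ? flush() : null;
    }

    // Drains the buffer as one segment and resets it to the empty state,
    // mirroring accumulatedIterator() followed by resetAccumulation().
    public List<Map.Entry<K, C>> flush() {
        List<Map.Entry<K, C>> segment = new ArrayList<>(buffer.entrySet());
        buffer.clear();
        return segment;
    }
}
```

Because each segment is combined independently, the same key can appear in multiple segments; as noted above, that is acceptable because the reduce side (`BlockStoreShuffleReader`) already merges combined values across map outputs.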



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
