[ https://issues.apache.org/jira/browse/SPARK-24293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16478406#comment-16478406 ]
Li Yuanjian commented on SPARK-24293:
-------------------------------------

{quote}
doing the map side combine manually with SQL operators, to get SQL optimizations like whole-stage-codegen.
{quote}
Thanks to [~cloud_fan] for the detailed explanation; this resolved our confusion about why map-side combine is not supported in SerializedShuffle.

> Serialized shuffle supports mapSideCombine
> ------------------------------------------
>
>                 Key: SPARK-24293
>                 URL: https://issues.apache.org/jira/browse/SPARK-24293
>             Project: Spark
>          Issue Type: Brainstorming
>          Components: Shuffle
>    Affects Versions: 2.3.0
>            Reporter: Xianjin YE
>            Priority: Major
>
> While doing research on integrating my company's internal Shuffle Service with Spark, I found it is possible to support mapSideCombine with serialized shuffle.
> The simple idea is that the `UnsafeShuffleWriter` uses a `Combiner` to accumulate records when mapSideCombine is required, before inserting them into the `ShuffleExternalSorter`. The `Combiner` tracks its memory usage or the number of elements accumulated and is never spilled. When the `Combiner` has accumulated enough records (the threshold can vary by strategy), the accumulated (K, C) pairs are inserted into the `ShuffleExternalSorter`. After that, the `Combiner` is reset to an empty state.
> After this change, combined values are sent to the sorter segment by segment, and the `BlockStoreShuffleReader` already handles this case.
> I did a local POC, and it looks like I get the same result as with normal SortShuffle. The performance is not optimized yet. The most significant part of the code is shown below:
> {code:java}
> // Accumulate into the combiner when map-side combine is required;
> // otherwise write records straight into the sorter.
> while (records.hasNext()) {
>   Product2<K, V> record = records.next();
>   if (this.mapSideCombine) {
>     this.aggregator.accumulateRecord(record);
>     if (this.aggregator.accumulatedKeyNum() >= 160_000) { // flush threshold, hard-coded for the POC
>       scala.collection.Iterator<Tuple2<K, C>> combinedIterator = this.aggregator.accumulatedIterator();
>       while (combinedIterator.hasNext()) {
>         insertRecordIntoSorter(combinedIterator.next());
>       }
>       this.aggregator.resetAccumulation();
>     }
>   } else {
>     insertRecordIntoSorter(record);
>   }
> }
> // Flush whatever is still buffered once the input is exhausted.
> if (this.mapSideCombine && this.aggregator.accumulatedKeyNum() > 0) {
>   scala.collection.Iterator<Tuple2<K, C>> combinedIterator = this.aggregator.accumulatedIterator();
>   while (combinedIterator.hasNext()) {
>     insertRecordIntoSorter(combinedIterator.next());
>   }
>   this.aggregator.resetAccumulation(1);
> }
> {code}
>
> Is there something I am missing? cc [~joshrosen] [~cloud_fan] [~XuanYuan]
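For readers who want something runnable outside of Spark, below is a minimal, self-contained sketch of the bounded `Combiner` described above. Everything in it (`BoundedCombiner`, `insert`, `flush`, the sink callback) is made up for illustration; it is not the POC code and not part of Spark. It only shows the accumulate-until-threshold, flush-as-one-segment, reset cycle:

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;

/**
 * Illustrative bounded map-side combiner: buffers (K, C) pairs in memory,
 * never spills, and flushes everything to a downstream sink once the number
 * of distinct keys reaches a threshold.
 */
final class BoundedCombiner<K, V, C> {
  private final Function<V, C> createCombiner;   // first value for a key -> combiner
  private final BiFunction<C, V, C> mergeValue;  // merge a value into an existing combiner
  private final int maxKeys;                     // flush threshold, e.g. 160_000 in the POC
  private final Consumer<Map.Entry<K, C>> sink;  // e.g. insertRecordIntoSorter in the writer
  private final Map<K, C> buffer = new HashMap<>();

  BoundedCombiner(Function<V, C> createCombiner, BiFunction<C, V, C> mergeValue,
                  int maxKeys, Consumer<Map.Entry<K, C>> sink) {
    this.createCombiner = createCombiner;
    this.mergeValue = mergeValue;
    this.maxKeys = maxKeys;
    this.sink = sink;
  }

  /** Accumulate one record; flush the whole buffer when the threshold is hit. */
  void insert(K key, V value) {
    C existing = buffer.get(key);
    buffer.put(key, existing == null ? createCombiner.apply(value)
                                     : mergeValue.apply(existing, value));
    if (buffer.size() >= maxKeys) {
      flush();
    }
  }

  /** Emit the accumulated (K, C) pairs as one segment and reset to an empty state. */
  void flush() {
    buffer.entrySet().forEach(sink);
    buffer.clear();
  }
}
{code}

A word-count style use would be `new BoundedCombiner<String, Integer, Integer>(v -> v, (sum, v) -> sum + v, 160_000, e -> insertRecordIntoSorter(e))`, with one extra `flush()` after the input iterator is exhausted, mirroring the trailing `if` block in the POC.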
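To make the quoted point at the top concrete, here is a hedged, illustrative sketch of what "doing the map side combine manually with SQL operators" looks like from the API side; the class and column names are made up, only the Spark SQL calls are standard API. A `groupBy(...).agg(...)` is planned as a partial HashAggregate before the exchange and a final HashAggregate after it, both eligible for whole-stage codegen, so the combine happens in SQL operators and the serialized shuffle path never needs mapSideCombine:

{code:java}
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.sum;

public final class SqlSideCombineExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("sql-side-combine")
        .master("local[*]")
        .getOrCreate();

    // 1M synthetic (key, value) rows with 100 distinct keys.
    Dataset<Row> events = spark.range(0, 1_000_000)
        .selectExpr("id % 100 AS key", "id AS value");

    // groupBy + agg is planned as HashAggregate (partial, before the shuffle)
    // -> Exchange -> HashAggregate (final, after the shuffle), so the
    // map-side combine is done by the SQL operator, not the shuffle writer.
    Dataset<Row> sums = events.groupBy(col("key")).agg(sum(col("value")));

    // The physical plan shows the partial/final HashAggregate pair and the
    // WholeStageCodegen stages wrapping them.
    sums.explain();

    spark.stop();
  }
}
{code}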