Repository: spark Updated Branches: refs/heads/master 1c5739f68 -> fc7edc9e7
SPARK-2519. Eliminate pattern-matching on Tuple2 in performance-critical... ... aggregation code Author: Sandy Ryza <sa...@cloudera.com> Closes #1435 from sryza/sandy-spark-2519 and squashes the following commits: 640706a [Sandy Ryza] SPARK-2519. Eliminate pattern-matching on Tuple2 in performance-critical aggregation code Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fc7edc9e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fc7edc9e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fc7edc9e Branch: refs/heads/master Commit: fc7edc9e76f97b25e456ae7b72ef8636656f4f1a Parents: 1c5739f Author: Sandy Ryza <sa...@cloudera.com> Authored: Wed Jul 16 11:07:16 2014 -0700 Committer: Reynold Xin <r...@apache.org> Committed: Wed Jul 16 11:07:16 2014 -0700 ---------------------------------------------------------------------- core/src/main/scala/org/apache/spark/Aggregator.scala | 8 ++++---- .../spark/util/collection/ExternalAppendOnlyMap.scala | 12 +++++++----- 2 files changed, 11 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/fc7edc9e/core/src/main/scala/org/apache/spark/Aggregator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala index 59fdf65..1d64057 100644 --- a/core/src/main/scala/org/apache/spark/Aggregator.scala +++ b/core/src/main/scala/org/apache/spark/Aggregator.scala @@ -56,8 +56,8 @@ case class Aggregator[K, V, C] ( } else { val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners) while (iter.hasNext) { - val (k, v) = iter.next() - combiners.insert(k, v) + val pair = iter.next() + combiners.insert(pair._1, pair._2) } // TODO: Make this non optional in a future release Option(context).foreach(c => c.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled) @@ -85,8 +85,8 @@ case class Aggregator[K, V, C] ( } else { val combiners = new ExternalAppendOnlyMap[K, C, C](identity, mergeCombiners, mergeCombiners) while (iter.hasNext) { - val (k, c) = iter.next() - combiners.insert(k, c) + val pair = iter.next() + combiners.insert(pair._1, pair._2) } // TODO: Make this non optional in a future release Option(context).foreach(c => c.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled) http://git-wip-us.apache.org/repos/asf/spark/blob/fc7edc9e/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index 292d096..765254b 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -268,10 +268,10 @@ class ExternalAppendOnlyMap[K, V, C]( private def mergeIfKeyExists(key: K, baseCombiner: C, buffer: StreamBuffer): C = { var i = 0 while (i < buffer.pairs.length) { - val (k, c) = buffer.pairs(i) - if (k == key) { + val pair = buffer.pairs(i) + if (pair._1 == key) { buffer.pairs.remove(i) - return mergeCombiners(baseCombiner, c) + return mergeCombiners(baseCombiner, pair._2) } i += 1 } @@ -293,9 +293,11 @@ class ExternalAppendOnlyMap[K, V, C]( } // Select a key from the StreamBuffer that holds the lowest key hash val minBuffer = mergeHeap.dequeue() - val (minPairs, minHash) = (minBuffer.pairs, minBuffer.minKeyHash) + val minPairs = minBuffer.pairs + val minHash = minBuffer.minKeyHash val minPair = minPairs.remove(0) - var (minKey, minCombiner) = minPair + val minKey = minPair._1 + var minCombiner = minPair._2 assert(getKeyHashCode(minPair) == minHash) // For all other streams that may have this key (i.e. have the same minimum key hash),