Repository: spark
Updated Branches:
  refs/heads/branch-1.5 675e22494 -> 5be517584


[SPARK-10100] [SQL] Eliminate hash table lookup if there is no grouping key in 
aggregation.

This improves performance by ~ 20 - 30% in one of my local tests and should fix 
the performance regression from 1.4 to 1.5 on ss_max.

Author: Reynold Xin <r...@databricks.com>

Closes #8332 from rxin/SPARK-10100.

(cherry picked from commit b4f4e91c395cb69ced61d9ff1492d1b814f96828)
Signed-off-by: Yin Huai <yh...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5be51758
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5be51758
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5be51758

Branch: refs/heads/branch-1.5
Commit: 5be517584be0c78dc4641a4aa14ea9da05ed344d
Parents: 675e224
Author: Reynold Xin <r...@databricks.com>
Authored: Thu Aug 20 07:53:27 2015 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Thu Aug 20 07:53:40 2015 -0700

----------------------------------------------------------------------
 .../execution/aggregate/TungstenAggregate.scala |  2 +-
 .../aggregate/TungstenAggregationIterator.scala | 30 ++++++++++++++------
 2 files changed, 22 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5be51758/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
index 99f51ba..ba379d3 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
@@ -104,7 +104,7 @@ case class TungstenAggregate(
         } else {
           // This is a grouped aggregate and the input iterator is empty,
           // so return an empty iterator.
-          Iterator[UnsafeRow]()
+          Iterator.empty
         }
       } else {
         aggregationIterator.start(parentIterator)

http://git-wip-us.apache.org/repos/asf/spark/blob/5be51758/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
index af7e0fc..26fdbc8 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
@@ -357,18 +357,30 @@ class TungstenAggregationIterator(
   // sort-based aggregation (by calling switchToSortBasedAggregation).
   private def processInputs(): Unit = {
     assert(inputIter != null, "attempted to process input when iterator was 
null")
-    while (!sortBased && inputIter.hasNext) {
-      val newInput = inputIter.next()
-      numInputRows += 1
-      val groupingKey = groupProjection.apply(newInput)
+    if (groupingExpressions.isEmpty) {
+      // If there is no grouping expressions, we can just reuse the same 
buffer over and over again.
+      // Note that it would be better to eliminate the hash map entirely in 
the future.
+      val groupingKey = groupProjection.apply(null)
       val buffer: UnsafeRow = 
hashMap.getAggregationBufferFromUnsafeRow(groupingKey)
-      if (buffer == null) {
-        // buffer == null means that we could not allocate more memory.
-        // Now, we need to spill the map and switch to sort-based aggregation.
-        switchToSortBasedAggregation(groupingKey, newInput)
-      } else {
+      while (inputIter.hasNext) {
+        val newInput = inputIter.next()
+        numInputRows += 1
         processRow(buffer, newInput)
       }
+    } else {
+      while (!sortBased && inputIter.hasNext) {
+        val newInput = inputIter.next()
+        numInputRows += 1
+        val groupingKey = groupProjection.apply(newInput)
+        val buffer: UnsafeRow = 
hashMap.getAggregationBufferFromUnsafeRow(groupingKey)
+        if (buffer == null) {
+          // buffer == null means that we could not allocate more memory.
+          // Now, we need to spill the map and switch to sort-based 
aggregation.
+          switchToSortBasedAggregation(groupingKey, newInput)
+        } else {
+          processRow(buffer, newInput)
+        }
+      }
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to