spark git commit: [SPARK-10100] [SQL] Eliminate hash table lookup if there is no grouping key in aggregation.

2015-08-20 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 675e22494 -> 5be517584


[SPARK-10100] [SQL] Eliminate hash table lookup if there is no grouping key in 
aggregation.

This improves performance by ~ 20 - 30% in one of my local test and should fix 
the performance regression from 1.4 to 1.5 on ss_max.

Author: Reynold Xin r...@databricks.com

Closes #8332 from rxin/SPARK-10100.

(cherry picked from commit b4f4e91c395cb69ced61d9ff1492d1b814f96828)
Signed-off-by: Yin Huai yh...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5be51758
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5be51758
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5be51758

Branch: refs/heads/branch-1.5
Commit: 5be517584be0c78dc4641a4aa14ea9da05ed344d
Parents: 675e224
Author: Reynold Xin r...@databricks.com
Authored: Thu Aug 20 07:53:27 2015 -0700
Committer: Yin Huai yh...@databricks.com
Committed: Thu Aug 20 07:53:40 2015 -0700

--
 .../execution/aggregate/TungstenAggregate.scala |  2 +-
 .../aggregate/TungstenAggregationIterator.scala | 30 ++--
 2 files changed, 22 insertions(+), 10 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/5be51758/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
index 99f51ba..ba379d3 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
@@ -104,7 +104,7 @@ case class TungstenAggregate(
 } else {
   // This is a grouped aggregate and the input iterator is empty,
   // so return an empty iterator.
-  Iterator[UnsafeRow]()
+  Iterator.empty
 }
   } else {
 aggregationIterator.start(parentIterator)

http://git-wip-us.apache.org/repos/asf/spark/blob/5be51758/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
index af7e0fc..26fdbc8 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
@@ -357,18 +357,30 @@ class TungstenAggregationIterator(
   // sort-based aggregation (by calling switchToSortBasedAggregation).
   private def processInputs(): Unit = {
assert(inputIter != null, "attempted to process input when iterator was null")
-while (!sortBased && inputIter.hasNext) {
-  val newInput = inputIter.next()
-  numInputRows += 1
-  val groupingKey = groupProjection.apply(newInput)
+if (groupingExpressions.isEmpty) {
+  // If there is no grouping expressions, we can just reuse the same 
buffer over and over again.
+  // Note that it would be better to eliminate the hash map entirely in 
the future.
+  val groupingKey = groupProjection.apply(null)
   val buffer: UnsafeRow = 
hashMap.getAggregationBufferFromUnsafeRow(groupingKey)
-  if (buffer == null) {
-// buffer == null means that we could not allocate more memory.
-// Now, we need to spill the map and switch to sort-based aggregation.
-switchToSortBasedAggregation(groupingKey, newInput)
-  } else {
+  while (inputIter.hasNext) {
+val newInput = inputIter.next()
+numInputRows += 1
 processRow(buffer, newInput)
   }
+} else {
+  while (!sortBased && inputIter.hasNext) {
+val newInput = inputIter.next()
+numInputRows += 1
+val groupingKey = groupProjection.apply(newInput)
+val buffer: UnsafeRow = 
hashMap.getAggregationBufferFromUnsafeRow(groupingKey)
+if (buffer == null) {
+  // buffer == null means that we could not allocate more memory.
+  // Now, we need to spill the map and switch to sort-based 
aggregation.
+  switchToSortBasedAggregation(groupingKey, newInput)
+} else {
+  processRow(buffer, newInput)
+}
+  }
 }
   }
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-10100] [SQL] Eliminate hash table lookup if there is no grouping key in aggregation.

2015-08-20 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/master 43e013542 -> b4f4e91c3


[SPARK-10100] [SQL] Eliminate hash table lookup if there is no grouping key in 
aggregation.

This improves performance by ~ 20 - 30% in one of my local test and should fix 
the performance regression from 1.4 to 1.5 on ss_max.

Author: Reynold Xin r...@databricks.com

Closes #8332 from rxin/SPARK-10100.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b4f4e91c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b4f4e91c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b4f4e91c

Branch: refs/heads/master
Commit: b4f4e91c395cb69ced61d9ff1492d1b814f96828
Parents: 43e0135
Author: Reynold Xin r...@databricks.com
Authored: Thu Aug 20 07:53:27 2015 -0700
Committer: Yin Huai yh...@databricks.com
Committed: Thu Aug 20 07:53:27 2015 -0700

--
 .../execution/aggregate/TungstenAggregate.scala |  2 +-
 .../aggregate/TungstenAggregationIterator.scala | 30 ++--
 2 files changed, 22 insertions(+), 10 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/b4f4e91c/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
index 99f51ba..ba379d3 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
@@ -104,7 +104,7 @@ case class TungstenAggregate(
 } else {
   // This is a grouped aggregate and the input iterator is empty,
   // so return an empty iterator.
-  Iterator[UnsafeRow]()
+  Iterator.empty
 }
   } else {
 aggregationIterator.start(parentIterator)

http://git-wip-us.apache.org/repos/asf/spark/blob/b4f4e91c/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
index af7e0fc..26fdbc8 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
@@ -357,18 +357,30 @@ class TungstenAggregationIterator(
   // sort-based aggregation (by calling switchToSortBasedAggregation).
   private def processInputs(): Unit = {
assert(inputIter != null, "attempted to process input when iterator was null")
-while (!sortBased && inputIter.hasNext) {
-  val newInput = inputIter.next()
-  numInputRows += 1
-  val groupingKey = groupProjection.apply(newInput)
+if (groupingExpressions.isEmpty) {
+  // If there is no grouping expressions, we can just reuse the same 
buffer over and over again.
+  // Note that it would be better to eliminate the hash map entirely in 
the future.
+  val groupingKey = groupProjection.apply(null)
   val buffer: UnsafeRow = 
hashMap.getAggregationBufferFromUnsafeRow(groupingKey)
-  if (buffer == null) {
-// buffer == null means that we could not allocate more memory.
-// Now, we need to spill the map and switch to sort-based aggregation.
-switchToSortBasedAggregation(groupingKey, newInput)
-  } else {
+  while (inputIter.hasNext) {
+val newInput = inputIter.next()
+numInputRows += 1
 processRow(buffer, newInput)
   }
+} else {
+  while (!sortBased && inputIter.hasNext) {
+val newInput = inputIter.next()
+numInputRows += 1
+val groupingKey = groupProjection.apply(newInput)
+val buffer: UnsafeRow = 
hashMap.getAggregationBufferFromUnsafeRow(groupingKey)
+if (buffer == null) {
+  // buffer == null means that we could not allocate more memory.
+  // Now, we need to spill the map and switch to sort-based 
aggregation.
+  switchToSortBasedAggregation(groupingKey, newInput)
+} else {
+  processRow(buffer, newInput)
+}
+  }
 }
   }
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org