[GitHub] spark pull request: [SPARK-2873] [SQL] using ExternalAppendOnlyMap...

2014-09-03 Thread marmbrus
Github user marmbrus commented on the pull request:

https://github.com/apache/spark/pull/2029#issuecomment-54396074
  
What were the actual results of the benchmark?  It is acceptable for there 
to be some performance hit here.  In cases where there are too many keys, it's 
much better to spill to disk than to OOM, though you have a good point about 
just adding more partitions.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2873] [SQL] using ExternalAppendOnlyMap...

2014-09-05 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2029#issuecomment-54694443
  
Can one of the admins verify this patch?





[GitHub] spark pull request: [SPARK-2873] [SQL] using ExternalAppendOnlyMap...

2014-09-17 Thread guowei2
Github user guowei2 commented on the pull request:

https://github.com/apache/spark/pull/2029#issuecomment-55858585
  
I've run a micro benchmark locally with 5 records, 1500 keys.

Type | OnHeapAggregate | ExternalAggregate (10 spills)
--- | --- | ---
First run | 876ms | 16.9s
Stabilized runs | 150ms | 15.0s






[GitHub] spark pull request: [SPARK-2873] [SQL] using ExternalAppendOnlyMap...

2014-08-08 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/1822#discussion_r16007101
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/AggregatesSuite.scala
 ---
@@ -0,0 +1,70 @@
+package org.apache.spark.sql.catalyst.expressions
+
+import org.scalatest.FunSuite
+import org.apache.spark.sql.catalyst.types._
+import org.apache.spark.sql.catalyst.dsl.expressions._
+
+
+class AggregatesSuite extends FunSuite {
+
+  val testRows = Seq(1,1,2,2,3,3,4,4).map(x => {
+val row = new GenericMutableRow(1)
+row(0) = x
+row
+  })
+
+  val dataType: DataType = IntegerType
+
+  val exp = BoundReference(0,dataType,true)
+
+  def checkMethod(f:AggregateExpression) = {
--- End diff --

Add a space after `:`.  Can you also add some Scaladoc about what this is checking?






[GitHub] spark pull request: [SPARK-2873] [SQL] using ExternalAppendOnlyMap...

2014-08-08 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/1822#discussion_r16007142
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/AggregatesSuite.scala
 ---
@@ -0,0 +1,70 @@
+package org.apache.spark.sql.catalyst.expressions
+
+import org.scalatest.FunSuite
+import org.apache.spark.sql.catalyst.types._
+import org.apache.spark.sql.catalyst.dsl.expressions._
+
+
+class AggregatesSuite extends FunSuite {
--- End diff --

Add some scala doc about what sorts of things this suite is testing.





[GitHub] spark pull request: [SPARK-2873] [SQL] using ExternalAppendOnlyMap...

2014-08-08 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/1822#discussion_r16007110
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/AggregatesSuite.scala
 ---
@@ -0,0 +1,70 @@
+package org.apache.spark.sql.catalyst.expressions
+
+import org.scalatest.FunSuite
+import org.apache.spark.sql.catalyst.types._
+import org.apache.spark.sql.catalyst.dsl.expressions._
+
+
+class AggregatesSuite extends FunSuite {
+
+  val testRows = Seq(1,1,2,2,3,3,4,4).map(x => {
+val row = new GenericMutableRow(1)
+row(0) = x
+row
+  })
+
+  val dataType: DataType = IntegerType
+
+  val exp = BoundReference(0,dataType,true)
--- End diff --

Space after `,`s





[GitHub] spark pull request: [SPARK-2873] [SQL] using ExternalAppendOnlyMap...

2014-08-08 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/1822#discussion_r16007186
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala ---
@@ -45,6 +46,8 @@ case class Aggregate(
 child: SparkPlan)
   extends UnaryNode {
 
+  private val externalSorting = 
SparkEnv.get.conf.getBoolean("spark.shuffle.spill", false)
--- End diff --

We should be using SQLConf and create a special setting for external 
aggregation.
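
As a rough illustration of the suggestion above, a dedicated setting could be read through a SQL-level configuration object instead of the shuffle-wide `spark.shuffle.spill` flag. The sketch below is self-contained and does not use Spark's actual `SQLConf` API; the class `SketchSqlConf` and the key `sketch.sql.externalAggregate` are hypothetical names chosen only for this example.

```scala
// Hypothetical stand-in for a SQL-level configuration object; this is not Spark's SQLConf API.
final class SketchSqlConf(settings: Map[String, String]) {
  // Returns the boolean stored under `key`, or `default` when the key is absent.
  def getBoolean(key: String, default: Boolean): Boolean =
    settings.get(key).map(_.toBoolean).getOrElse(default)
}

object SketchSqlConf {
  // Assumed key name, chosen only for illustration.
  val ExternalAggregateKey = "sketch.sql.externalAggregate"
}
```

Keeping the key separate from the shuffle setting would let a user turn external aggregation on or off without changing how shuffles spill.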





[GitHub] spark pull request: [SPARK-2873] [SQL] using ExternalAppendOnlyMap...

2014-08-08 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/1822#discussion_r16007279
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala ---
@@ -148,53 +151,121 @@ case class Aggregate(
 Iterator(resultProjection(aggregateResults))
   }
 } else {
-  child.execute().mapPartitions { iter =>
-val hashTable = new HashMap[Row, Array[AggregateFunction]]
-val groupingProjection = new 
InterpretedMutableProjection(groupingExpressions, childOutput)
+  if (!externalSorting) {
--- End diff --

Instead of doing the if check here, what do you think about this:
 - Create a trait called `Aggregate` that contains shared code.
 - Create two operators `OnHeapAggregation` and `ExternalAggregation`.
 - Choose which one to use in the planner based on the above configuration.
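
The three steps above might look roughly like the following. This is a minimal sketch under simplifying assumptions, not the code from the pull request: a "row" is just a `(String, Int)` pair, the only aggregate is a per-key sum, spilling is imitated by aggregating fixed-size batches and merging them, and all names ending in `Sketch` are hypothetical.

```scala
// Shared code lives in the trait; none of these names are Spark's real classes.
trait AggregateSketch {
  def input: Seq[(String, Int)]

  // Folds one row into a running map of per-key sums.
  protected def update(acc: Map[String, Int], row: (String, Int)): Map[String, Int] =
    acc.updated(row._1, acc.getOrElse(row._1, 0) + row._2)

  def execute(): Map[String, Int]
}

// Keeps the whole hash table in memory.
case class OnHeapAggregateSketch(input: Seq[(String, Int)]) extends AggregateSketch {
  def execute(): Map[String, Int] = input.foldLeft(Map.empty[String, Int])(update)
}

// Imitates spilling: aggregates fixed-size batches separately, then merges the partial results.
case class ExternalAggregateSketch(input: Seq[(String, Int)], batchSize: Int)
    extends AggregateSketch {
  def execute(): Map[String, Int] =
    input.grouped(batchSize)
      .map(_.foldLeft(Map.empty[String, Int])(update))
      .foldLeft(Map.empty[String, Int]) { (merged, partial) =>
        partial.foldLeft(merged)(update)
      }
}

object PlannerSketch {
  // Planner-side choice, driven by a single (hypothetical) boolean setting.
  def plan(input: Seq[(String, Int)], externalEnabled: Boolean): AggregateSketch =
    if (externalEnabled) ExternalAggregateSketch(input, batchSize = 2)
    else OnHeapAggregateSketch(input)
}
```

With this split, the `if` check moves out of `execute()` and into the planner, and both operators must produce the same result for the same input.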





[GitHub] spark pull request: [SPARK-2873] [SQL] using ExternalAppendOnlyMap...

2014-08-08 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/1822#discussion_r16007364
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala 
---
@@ -340,4 +340,9 @@ private[hive] case class HiveUdafFunction(
 val inputs = inputProjection(input).asInstanceOf[Seq[AnyRef]].toArray
 function.iterate(buffer, inputs)
   }
+
+  override def merge(input: AggregateFunction): Unit = {
--- End diff --

Does this mean that we can't do external aggregation if there are Hive 
UDAFs?  If so, we should be throwing an exception here.  It is very bad to 
silently return wrong results.
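
To make the concern concrete, here is a small self-contained sketch of failing loudly rather than merging incorrectly. It assumes a simplified `merge` contract; `MergeableSketch`, `CountSketch`, and `OpaqueUdafSketch` are hypothetical names and not part of Spark or Hive.

```scala
// Simplified merge contract: combine another partial aggregate into this one.
trait MergeableSketch {
  def merge(other: MergeableSketch): Unit
}

// A function whose partial state can be merged safely.
final class CountSketch extends MergeableSketch {
  private var count = 0L
  def update(): Unit = count += 1
  def result: Long = count
  def merge(other: MergeableSketch): Unit = other match {
    case c: CountSketch => count += c.count
    case _ => throw new IllegalArgumentException("cannot merge a different function type")
  }
}

// A function whose state cannot be merged: refuse instead of silently doing nothing.
final class OpaqueUdafSketch extends MergeableSketch {
  def merge(other: MergeableSketch): Unit =
    throw new UnsupportedOperationException(
      "external aggregation is not supported for this function")
}
```

An empty `merge` body would let a spilled aggregation finish and report a wrong answer; throwing makes the unsupported combination visible at run time.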





[GitHub] spark pull request: [SPARK-2873] [SQL] using ExternalAppendOnlyMap...

2014-08-08 Thread marmbrus
Github user marmbrus commented on the pull request:

https://github.com/apache/spark/pull/1822#issuecomment-51637601
  
Thanks for working on this!  This will be a great addition for 1.2 :)  
Minor feedback on structure and testing.





[GitHub] spark pull request: [SPARK-2873] [SQL] using ExternalAppendOnlyMap...

2014-08-11 Thread guowei2
Github user guowei2 commented on the pull request:

https://github.com/apache/spark/pull/1822#issuecomment-51764097
  
Thank you for your suggestions, they truly encourage me. I'll do my best to 
fix it up.





[GitHub] spark pull request: [SPARK-2873] [SQL] using ExternalAppendOnlyMap...

2014-08-13 Thread guowei2
Github user guowei2 commented on the pull request:

https://github.com/apache/spark/pull/1822#issuecomment-52032345
  
I've improved the structure and testing. Is it better now?





[GitHub] spark pull request: [SPARK-2873] [SQL] using ExternalAppendOnlyMap...

2014-08-13 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/1822#discussion_r16196160
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---
@@ -316,12 +316,12 @@ class SQLContext(@transient val sparkContext: 
SparkContext)
 val strategies: Seq[Strategy] =
   CommandStrategy(self) ::
   TakeOrdered ::
-  HashAggregation ::
+  HashAggregation(self) ::
--- End diff --

Can't the strategies already access `self` through `sqlContext`?





[GitHub] spark pull request: [SPARK-2873] [SQL] using ExternalAppendOnlyMap...

2014-08-13 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/1822#discussion_r16196360
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala ---
@@ -20,30 +20,31 @@ package org.apache.spark.sql.execution
 import java.util.HashMap
 
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.SparkContext
+import org.apache.spark.{SparkEnv, SparkContext}
 import org.apache.spark.sql.catalyst.errors._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.SQLContext
+import org.apache.spark.util.collection.ExternalAppendOnlyMap
 
 /**
- * :: DeveloperApi ::
  * Groups input data by `groupingExpressions` and computes the 
`aggregateExpressions` for each
  * group.
  *
- * @param partial if true then aggregation is done partially on local data 
without shuffling to
+ * partial if true then aggregation is done partially on local data 
without shuffling to
--- End diff --

How about we add something like `Concrete subclasses of this trait must 
implement the following abstract members:`  and then use [wiki 
syntax](https://wiki.scala-lang.org/display/SW/Syntax) to make the items into a 
list.
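
For example, a Scaladoc comment in that style could look like the sketch below. The trait `DocumentedAggregate` and its two members are hypothetical and only illustrate the list syntax; they are not the members of the trait in this pull request.

```scala
/**
 * Groups input rows by key and computes an aggregate for each group.
 *
 * Concrete subclasses of this trait must implement the following abstract members:
 *  - `groupingKey`: extracts the key that a row is grouped by.
 *  - `execute`: runs the aggregation and returns one result per group.
 */
trait DocumentedAggregate[Row, Key, Result] {
  def groupingKey(row: Row): Key
  def execute(rows: Seq[Row]): Map[Key, Result]
}
```

In Scaladoc wiki syntax, an unordered list item is written as an indented ` - ` after the leading `*`.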





[GitHub] spark pull request: [SPARK-2873] [SQL] using ExternalAppendOnlyMap...

2014-08-13 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/1822#discussion_r16196452
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala ---
@@ -90,7 +91,7 @@ case class Aggregate(
   private[this] val computedSchema = 
computedAggregates.map(_.resultAttribute)
 
   /** Creates a new aggregate buffer for a group. */
-  private[this] def newAggregateBuffer(): Array[AggregateFunction] = {
+  def newAggregateBuffer(): Array[AggregateFunction] = {
--- End diff --

`protected`





[GitHub] spark pull request: [SPARK-2873] [SQL] using ExternalAppendOnlyMap...

2014-08-13 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/1822#discussion_r16196431
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala ---
@@ -58,7 +59,7 @@ case class Aggregate(
 
   // HACK: Generators don't correctly preserve their output through 
serializations so we grab
   // out child's output attributes statically here.
-  private[this] val childOutput = child.output
+  val childOutput = child.output
--- End diff --

`protected` instead of making this public.





[GitHub] spark pull request: [SPARK-2873] [SQL] using ExternalAppendOnlyMap...

2014-08-13 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/1822#discussion_r16196620
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala 
---
@@ -340,4 +340,10 @@ private[hive] case class HiveUdafFunction(
 val inputs = inputProjection(input).asInstanceOf[Seq[AnyRef]].toArray
 function.iterate(buffer, inputs)
   }
+
+  //hiveUdaf does not support external aggregate, for HiveUdafFunction 
need to spill to disk,
+  //and all the vals above need Serializable
+  override def merge(input: AggregateFunction): Unit = {
+throw new NotImplementedError(s"HiveUdaf does not support external 
aggregate")
--- End diff --

I think we are going to have to figure out something that works here.  
Perhaps we can use `SerializableWritable` or something?





[GitHub] spark pull request: [SPARK-2873] [SQL] using ExternalAppendOnlyMap...

2014-08-13 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/1822#discussion_r16196674
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala ---
@@ -123,34 +124,73 @@ case class Aggregate(
 }
   }
 
-  override def execute() = attachTree(this, "execute") {
-if (groupingExpressions.isEmpty) {
-  child.execute().mapPartitions { iter =>
-val buffer = newAggregateBuffer()
-var currentRow: Row = null
-while (iter.hasNext) {
-  currentRow = iter.next()
-  var i = 0
-  while (i < buffer.length) {
-buffer(i).update(currentRow)
-i += 1
-  }
-}
-val resultProjection = new 
InterpretedProjection(resultExpressions, computedSchema)
-val aggregateResults = new 
GenericMutableRow(computedAggregates.length)
-
+  def aggregateNoGrouping() = {
--- End diff --

Probably could be `protected`.





[GitHub] spark pull request: [SPARK-2873] [SQL] using ExternalAppendOnlyMap...

2014-08-13 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/1822#discussion_r16196737
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala ---
@@ -123,34 +124,73 @@ case class Aggregate(
 }
   }
 
-  override def execute() = attachTree(this, "execute") {
-if (groupingExpressions.isEmpty) {
-  child.execute().mapPartitions { iter =>
-val buffer = newAggregateBuffer()
-var currentRow: Row = null
-while (iter.hasNext) {
-  currentRow = iter.next()
-  var i = 0
-  while (i < buffer.length) {
-buffer(i).update(currentRow)
-i += 1
-  }
-}
-val resultProjection = new 
InterpretedProjection(resultExpressions, computedSchema)
-val aggregateResults = new 
GenericMutableRow(computedAggregates.length)
-
+  def aggregateNoGrouping() = {
+child.execute().mapPartitions { iter =>
+  val buffer = newAggregateBuffer()
+  var currentRow: Row = null
+  while (iter.hasNext) {
+currentRow = iter.next()
 var i = 0
 while (i < buffer.length) {
-  aggregateResults(i) = buffer(i).eval(EmptyRow)
+  buffer(i).update(currentRow)
   i += 1
 }
+  }
+  val resultProjection = new InterpretedProjection(resultExpressions, 
computedSchema)
+  val aggregateResults = new 
GenericMutableRow(computedAggregates.length)
 
-Iterator(resultProjection(aggregateResults))
+  var i = 0
+  while (i < buffer.length) {
+aggregateResults(i) = buffer(i).eval(EmptyRow)
+i += 1
   }
+
+  Iterator(resultProjection(aggregateResults))
+}
+  }
+
+  def resultRow(iter: Iterator[(Row,Array[AggregateFunction])]) = {
--- End diff --

Can you add some Scaladoc to explain what's going on here?





[GitHub] spark pull request: [SPARK-2873] [SQL] using ExternalAppendOnlyMap...

2014-08-13 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/1822#discussion_r16196975
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala ---
@@ -168,32 +208,68 @@ case class Aggregate(
 i += 1
   }
 }
-
-new Iterator[Row] {
+val iterPair = new Iterator[(Row, Array[AggregateFunction])] {
   private[this] val hashTableIter = hashTable.entrySet().iterator()
-  private[this] val aggregateResults = new 
GenericMutableRow(computedAggregates.length)
-  private[this] val resultProjection =
-new InterpretedMutableProjection(
-  resultExpressions, computedSchema ++ namedGroups.map(_._2))
-  private[this] val joinedRow = new JoinedRow
-
   override final def hasNext: Boolean = hashTableIter.hasNext
 
-  override final def next(): Row = {
+  override final def next(): (Row, Array[AggregateFunction]) = {
--- End diff --

I am a little concerned about the addition of an extra tuple object 
allocation here.  This may not be a problem, but we'll want to run some 
benchmarks and make sure that we are not slowing down the on-heap version here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2873] [SQL] using ExternalAppendOnlyMap...

2014-08-13 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/1822#discussion_r16197056
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
@@ -134,15 +134,26 @@ private[sql] abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
  groupingExpressions,
  partialComputation,
  child) =>
-execution.Aggregate(
-  partial = false,
-  namedGroupingAttributes,
-  rewrittenAggregateExpressions,
-  execution.Aggregate(
-partial = true,
-groupingExpressions,
-partialComputation,
-planLater(child))) :: Nil
+
+val preAggregate = execution.OnHeapAggregate(
--- End diff --

Why is the preAggregate always on heap?





[GitHub] spark pull request: [SPARK-2873] [SQL] using ExternalAppendOnlyMap...

2014-08-13 Thread marmbrus
Github user marmbrus commented on the pull request:

https://github.com/apache/spark/pull/1822#issuecomment-52095270
  
This is looking better! Thanks again for working on it :)

A few more comments. Also, it would be really good if we could do some (at 
least micro) benchmarks before and after for on-heap and then a comparison with 
the external version.  Let me know if you need some help coming up with that.

Also, one note.  Since this is a pretty major change, I'll want to wait 
until after the 1.1 release to merge it in.  That said, really excited about it 
for 1.2.  You may want to rebase / merge to master to avoid getting too far 
behind though.





[GitHub] spark pull request: [SPARK-2873] [SQL] using ExternalAppendOnlyMap...

2014-08-13 Thread chenghao-intel
Github user chenghao-intel commented on a diff in the pull request:

https://github.com/apache/spark/pull/1822#discussion_r16222998
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
 ---
@@ -368,6 +411,8 @@ case class SumDistinctFunction(expr: Expression, base: 
AggregateExpression)
 
   private val seen = new scala.collection.mutable.HashSet[Any]()
 
+  def getSeen: scala.collection.mutable.HashSet[Any] = seen
--- End diff --

This is not necessary.





[GitHub] spark pull request: [SPARK-2873] [SQL] using ExternalAppendOnlyMap...

2014-08-13 Thread chenghao-intel
Github user chenghao-intel commented on a diff in the pull request:

https://github.com/apache/spark/pull/1822#discussion_r16223044
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
 ---
@@ -375,6 +420,10 @@ case class SumDistinctFunction(expr: Expression, base: 
AggregateExpression)
 }
   }
 
+  override def merge(input: AggregateFunction): Unit = {
+input.asInstanceOf[SumDistinctFunction].getSeen.map(seen += _)
--- End diff --

use `seen ++= input.asInstanceOf[SumDistinctFunction].seen` instead.
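
A minimal self-contained sketch of that form, assuming integer inputs and a hypothetical class `SumDistinctSketch` rather than the real `SumDistinctFunction`:

```scala
import scala.collection.mutable

// Tracks the distinct values seen so far and sums them on demand.
final class SumDistinctSketch {
  private val seen = mutable.HashSet[Int]()

  def update(value: Int): Unit = seen += value

  // `++=` adds every element of the other set in place; no getter and no `map` are needed,
  // because private members of another instance of the same class are accessible.
  def merge(other: SumDistinctSketch): Unit = seen ++= other.seen

  def result: Int = seen.sum
}
```

Besides being shorter, `++=` avoids building the throwaway collection that `map(seen += _)` creates only for its side effect.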





[GitHub] spark pull request: [SPARK-2873] [SQL] using ExternalAppendOnlyMap...

2014-08-13 Thread chenghao-intel
Github user chenghao-intel commented on a diff in the pull request:

https://github.com/apache/spark/pull/1822#discussion_r16223080
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
 ---
@@ -114,6 +115,14 @@ case class MinFunction(expr: Expression, base: 
AggregateExpression) extends Aggr
 }
   }
 
+  override def merge(input: AggregateFunction): Unit = {
+if (currentMin == null) {
+  currentMin = input.eval(EmptyRow)
+} else if(GreaterThan(this, input).eval(EmptyRow) == true) {
--- End diff --

Space after `if`.





[GitHub] spark pull request: [SPARK-2873] [SQL] using ExternalAppendOnlyMap...

2014-08-13 Thread chenghao-intel
Github user chenghao-intel commented on a diff in the pull request:

https://github.com/apache/spark/pull/1822#discussion_r16223599
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
 ---
@@ -292,6 +312,11 @@ case class AverageFunction(expr: Expression, base: 
AggregateExpression)
   sum.update(addFunction(evaluatedExpr), input)
 }
   }
+
+  override def merge(input: AggregateFunction): Unit = {
+count += input.asInstanceOf[AverageFunction].getCount
--- End diff --

You can access the private member directly here, as they are the same type. 
So the `getXXX` methods are not necessary.
e.g.
```
scala> class A(private var a: Int) {
 |   def add(v: A): A = {
 | a = a + v.a
 | this
 |   }
 |   override def toString = {a.toString}
 | }
defined class A

scala> val a = new A(11)
a: A = 11

scala> val b = new A(12)
b: A = 12

scala> val c = a.add(b)
c: A = 23
```





[GitHub] spark pull request: [SPARK-2873] [SQL] using ExternalAppendOnlyMap...

2014-08-14 Thread guowei2
Github user guowei2 commented on a diff in the pull request:

https://github.com/apache/spark/pull/1822#discussion_r16235116
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
@@ -134,15 +134,26 @@ private[sql] abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
  groupingExpressions,
  partialComputation,
  child) =>
-execution.Aggregate(
-  partial = false,
-  namedGroupingAttributes,
-  rewrittenAggregateExpressions,
-  execution.Aggregate(
-partial = true,
-groupingExpressions,
-partialComputation,
-planLater(child))) :: Nil
+
+val preAggregate = execution.OnHeapAggregate(
--- End diff --

I think map-side aggregation does not need the external path, because the 
data in one partition (one task) is usually not large relative to the JVM heap 
size. Users can also easily control the partition data size.
In most cases, OOM occurs on the reduce side.
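
A rough sketch of the two phases, assuming a simple per-key sum. `TwoPhaseSum`, `partial` and `finalMerge` are hypothetical names and this is not the planner code from the diff. It only shows that the map side holds the keys of one partition, while the reduce side must hold every key routed to it from all partitions.

```scala
object TwoPhaseSum {
  // Map side: combine values per key within a single partition.
  def partial(partition: Seq[(Int, Long)]): Map[Int, Long] =
    partition.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).sum }

  // Reduce side: merge the partial results coming from all partitions.
  def finalMerge(partials: Seq[Map[Int, Long]]): Map[Int, Long] =
    partials.flatten.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).sum }
}
```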





[GitHub] spark pull request: [SPARK-2873] [SQL] using ExternalAppendOnlyMap...

2014-08-14 Thread guowei2
Github user guowei2 commented on a diff in the pull request:

https://github.com/apache/spark/pull/1822#discussion_r16235328
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
 ---
@@ -292,6 +312,11 @@ case class AverageFunction(expr: Expression, base: 
AggregateExpression)
   sum.update(addFunction(evaluatedExpr), input)
 }
   }
+
+  override def merge(input: AggregateFunction): Unit = {
+count += input.asInstanceOf[AverageFunction].getCount
--- End diff --

@chenghao-intel 
Aha, it's a difference between Java and Scala. Thanks a lot.





[GitHub] spark pull request: [SPARK-2873] [SQL] using ExternalAppendOnlyMap...

2014-08-18 Thread guowei2
Github user guowei2 commented on the pull request:

https://github.com/apache/spark/pull/1822#issuecomment-52489381
  
I'm very sorry. I just rebased my branch onto spark/master. What should I do 
to fix this?





[GitHub] spark pull request: [SPARK-2873] [SQL] using ExternalAppendOnlyMap...

2014-08-18 Thread guowei2
Github user guowei2 commented on the pull request:

https://github.com/apache/spark/pull/1822#issuecomment-52490972
  
May I close this PR and create a new PR?





[GitHub] spark pull request: [SPARK-2873] [SQL] using ExternalAppendOnlyMap...

2014-08-18 Thread witgo
Github user witgo commented on the pull request:

https://github.com/apache/spark/pull/1822#issuecomment-52491562
  
Try this: `git commit -m "Big-ass commit" --allow-empty`, 
`git rebase -i master`, `git push origin sql-memory-patch -f`





[GitHub] spark pull request: [SPARK-2873] [SQL] using ExternalAppendOnlyMap...

2014-08-18 Thread guowei2
Github user guowei2 closed the pull request at:

https://github.com/apache/spark/pull/1822





[GitHub] spark pull request: [SPARK-2873] [SQL] using ExternalAppendOnlyMap...

2014-08-18 Thread guowei2
Github user guowei2 commented on the pull request:

https://github.com/apache/spark/pull/1822#issuecomment-52581172
  
I have to close this PR and open a new one.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2873] [SQL] using ExternalAppendOnlyMap...

2014-08-18 Thread guowei2
GitHub user guowei2 opened a pull request:

https://github.com/apache/spark/pull/2029

[SPARK-2873] [SQL] using ExternalAppendOnlyMap to resolve OOM when 
aggregating

A new PR cloned from PR 1822.
Fixes a number of problems.
Reuses the CompactBuffer from Spark Core to save memory and pointer 
dereferences, as in PR 1993.
Hive UDAFs do not support external aggregation, because the Hive 
AggregationBuffer needs to be serializable and the Hive GenericUDAFEvaluator 
has no method implemented to merge two evaluators.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/guowei2/spark sql-memory-patch

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/2029.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2029


commit 5cbaaac088346077bcc27229073a5fd3de8eb3a3
Author: guowei2 
Date:   2014-08-19T02:51:29Z

merge PR 1822







[GitHub] spark pull request: [SPARK-2873] [SQL] using ExternalAppendOnlyMap...

2014-08-18 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/2029#issuecomment-52594557
  
Can one of the admins verify this patch?





[GitHub] spark pull request: [SPARK-2873] [SQL] using ExternalAppendOnlyMap...

2014-08-19 Thread guowei2
Github user guowei2 commented on the pull request:

https://github.com/apache/spark/pull/2029#issuecomment-52606844
  
@marmbrus 
How should I provide the outputs of the benchmarks?





[GitHub] spark pull request: [SPARK-2873] [SQL] using ExternalAppendOnlyMap...

2014-08-20 Thread marmbrus
Github user marmbrus commented on the pull request:

https://github.com/apache/spark/pull/2029#issuecomment-52830430
  
People usually just summarize the benchmark itself and the results in the 
description of the PR.  For example: https://github.com/apache/spark/pull/1439





[GitHub] spark pull request: [SPARK-2873] [SQL] using ExternalAppendOnlyMap...

2014-08-26 Thread guowei2
Github user guowei2 commented on the pull request:

https://github.com/apache/spark/pull/2029#issuecomment-53426380
  
```
import org.apache.spark.sql.catalyst.types.{IntegerType, DataType}
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.execution._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.SparkContext._
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.execution.OnHeapAggregate
import org.apache.spark.sql.catalyst.expressions.Alias
import org.apache.spark.sql.catalyst.expressions.BoundReference


object AggregateBenchMark extends App {

  val sc = new SparkContext(
    new SparkConf().setMaster("local").setAppName("agg-benchmark"))

  val dataType: DataType = IntegerType
  val aggExps = Seq(Alias(sum(BoundReference(1, dataType, true)), "sum")())
  val groupExps = Seq(BoundReference(0, dataType, true))
  val attributes = aggExps.map(_.toAttribute)
  val childPlan = rowsPlan(sc, attributes)

  def benchmarkOnHeap = {
    val begin = System.currentTimeMillis()
    OnHeapAggregate(false, groupExps, aggExps, childPlan).execute().foreach(_ => {})
    val end = System.currentTimeMillis()
    end - begin
  }

  def benchmarkExternal = {
    val begin = System.currentTimeMillis()
    ExternalAggregate(false, groupExps, aggExps, childPlan).execute().foreach(_ => {})
    val end = System.currentTimeMillis()
    end - begin
  }

  (1 to 5).map(_ => println("OnHeapAggregate time: " + benchmarkOnHeap))
  (1 to 5).map(_ => println("ExternalAggregate time: " + benchmarkExternal))

}

private[spark] class TestRDD(
    sc: SparkContext,
    numPartitions: Int) extends RDD[Row](sc, Nil) with Serializable {

  override def compute(split: Partition, context: TaskContext): Iterator[Row] = {
    new Iterator[Row] {
      var lines = 0
      override final def hasNext: Boolean = lines < 300
      override final def next(): Row = {
        lines += 1
        val row = new GenericMutableRow(2)
        row(0) = (math.random * 2000).toInt
        row(1) = (math.random * 50).toInt
        row.asInstanceOf[Row]
      }
    }
  }

  override def getPartitions = (0 until numPartitions).map(i => new Partition {
    override def index = i
  }).toArray

  override def getPreferredLocations(split: Partition): Seq[String] = Nil

  override def toString: String = "TestRDD " + id
}


case class rowsPlan(@transient val sc: SparkContext, attributes: Seq[Attribute])
  extends LeafNode {

  override def output = attributes

  override def execute() = {
    new TestRDD(sc, 1).asInstanceOf[RDD[Row]]
  }
}
```





[GitHub] spark pull request: [SPARK-2873] [SQL] using ExternalAppendOnlyMap...

2014-08-26 Thread guowei2
Github user guowei2 commented on the pull request:

https://github.com/apache/spark/pull/2029#issuecomment-53433705
  
@marmbrus 

The result of the benchmark above is disappointing.
Once one spill happens, a batch of spills usually follows one after another.

The size of the AppendOnlyMap depends on the number of keys, because values 
with the same key are merged.

I don't think using ExternalAppendOnlyMap is a good approach, for it is too 
expensive when records with the same key spill to disk over and over again.

Alternatively, users can easily avoid OOM by raising 
spark.sql.shuffle.partitions to reduce the number of keys in each partition.

I think the logic of ExternalAppendOnlyMap should be optimized.

Join seems to have similar problems. Putting both the left and right tables 
into ExternalAppendOnlyMap is expensive too.
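
To illustrate the point about partitions, here is a small self-contained sketch. It is not Spark code: `KeysPerPartition` and its methods are hypothetical, and the non-negative modulo of `hashCode` is only an assumed stand-in for hash partitioning. It computes how many distinct keys the busiest partition has to hold for a given partition count.

```scala
object KeysPerPartition {
  // Non-negative modulo so that negative hash codes still map to a valid partition.
  def partitionOf(key: Int, numPartitions: Int): Int = {
    val m = key.hashCode % numPartitions
    if (m < 0) m + numPartitions else m
  }

  // Largest number of distinct keys that any single partition has to hold.
  def maxKeysPerPartition(keys: Seq[Int], numPartitions: Int): Int =
    keys.distinct.groupBy(partitionOf(_, numPartitions)).values.map(_.size).max
}
```

With 2000 evenly spread integer keys, `KeysPerPartition.maxKeysPerPartition(0 until 2000, 10)` is a tenth of the single-partition figure, which is the effect raising the partition count is meant to have on the in-memory map. Skewed keys would not split this evenly.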





[GitHub] spark pull request: [SPARK-2873] [SQL] using ExternalAppendOnlyMap...

2014-12-01 Thread marmbrus
Github user marmbrus commented on the pull request:

https://github.com/apache/spark/pull/2029#issuecomment-65164171
  
Thanks for working on this, but we are trying to clean up the PR queue (in 
order to make it easier for us to review).  Thus, I think we should close this 
issue for now and reopen it when it's ready for review.





[GitHub] spark pull request: [SPARK-2873] [SQL] using ExternalAppendOnlyMap...

2014-12-01 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/2029

