[GitHub] spark pull request #14723: [SQL][WIP][Test] Supports object-based aggregatio...

2016-08-22 Thread clockfly
Github user clockfly closed the pull request at:

https://github.com/apache/spark/pull/14723





[GitHub] spark pull request #14723: [SQL][WIP][Test] Supports object-based aggregatio...

2016-08-22 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/14723#discussion_r75700385
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/AggregateWithObjectAggregateBufferSuite.scala
 ---
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.AggregateWithObjectAggregateBufferSuite.MaxWithObjectAggregateBuffer
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, GenericMutableRow, MutableRow, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.aggregate.{ImperativeAggregate, WithObjectAggregateBuffer}
+import org.apache.spark.sql.execution.aggregate.SortAggregateExec
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.{AbstractDataType, DataType, IntegerType, StructType}
+
+class AggregateWithObjectAggregateBufferSuite extends QueryTest with SharedSQLContext {
--- End diff --

oh right, I misread the code.





[GitHub] spark pull request #14723: [SQL][WIP][Test] Supports object-based aggregatio...

2016-08-22 Thread clockfly
Github user clockfly commented on a diff in the pull request:

https://github.com/apache/spark/pull/14723#discussion_r75688342
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/AggregateWithObjectAggregateBufferSuite.scala
 ---
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.AggregateWithObjectAggregateBufferSuite.MaxWithObjectAggregateBuffer
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, GenericMutableRow, MutableRow, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.aggregate.{ImperativeAggregate, WithObjectAggregateBuffer}
+import org.apache.spark.sql.execution.aggregate.SortAggregateExec
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.{AbstractDataType, DataType, IntegerType, StructType}
+
+class AggregateWithObjectAggregateBufferSuite extends QueryTest with SharedSQLContext {
--- End diff --

We will not use HashAggregateExec, so there is no point in falling back from HashAggregateExec?





[GitHub] spark pull request #14723: [SQL][WIP][Test] Supports object-based aggregatio...

2016-08-22 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/14723#discussion_r75641493
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
 ---
@@ -90,6 +98,21 @@ class SortBasedAggregationIterator(
   // compared to MutableRow (aggregation buffer) directly.
   private[this] val safeProj: Projection = FromUnsafeProjection(valueAttributes.map(_.dataType))
 
+  // Aggregate functions which store a generic object in the aggregation buffer.
+  // @see [[WithObjectAggregateBuffer]] for more information
+  private val aggFunctionsWithObjectAggregationBuffer = aggregateFunctions.collect {
+    case (ag: ImperativeAggregate with WithObjectAggregateBuffer) => ag
--- End diff --

Heavy? In what sense?





[GitHub] spark pull request #14723: [SQL][WIP][Test] Supports object-based aggregatio...

2016-08-22 Thread liancheng
Github user liancheng commented on a diff in the pull request:

https://github.com/apache/spark/pull/14723#discussion_r75632667
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
 ---
@@ -389,3 +389,126 @@ abstract class DeclarativeAggregate
 def right: AttributeReference = 
inputAggBufferAttributes(aggBufferAttributes.indexOf(a))
   }
 }
+
+/**
+ * This traits allows an AggregateFunction to store **arbitrary** Java 
objects in internal
--- End diff --

Nit: traits => trait





[GitHub] spark pull request #14723: [SQL][WIP][Test] Supports object-based aggregatio...

2016-08-22 Thread clockfly
Github user clockfly commented on a diff in the pull request:

https://github.com/apache/spark/pull/14723#discussion_r75622311
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
 ---
@@ -90,6 +98,21 @@ class SortBasedAggregationIterator(
   // compared to MutableRow (aggregation buffer) directly.
   private[this] val safeProj: Projection = FromUnsafeProjection(valueAttributes.map(_.dataType))
 
+  // Aggregate functions which store a generic object in the aggregation buffer.
+  // @see [[WithObjectAggregateBuffer]] for more information
+  private val aggFunctionsWithObjectAggregationBuffer = aggregateFunctions.collect {
+    case (ag: ImperativeAggregate with WithObjectAggregateBuffer) => ag
--- End diff --

ImperativeAggregate is an abstract class; extending it would make `WithObjectAggregateBuffer` quite heavy.
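
For illustration, a minimal sketch of the two shapes under discussion, using only the signatures quoted in the diffs (bodies elided):

    // Self-type mix-in, as in the PR: the trait stays lightweight and can only
    // be mixed into ImperativeAggregate subclasses.
    trait WithObjectAggregateBuffer { this: ImperativeAggregate =>
      def serializeObjectAggregationBufferInPlace(aggregationBuffer: MutableRow): Unit
    }

    // The alternative suggested earlier in the thread: the trait extends the
    // abstract class, inheriting all of ImperativeAggregate's members -- the
    // "heaviness" referred to here.
    // trait WithObjectAggregateBuffer extends ImperativeAggregate { ... }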





[GitHub] spark pull request #14723: [SQL][WIP][Test] Supports object-based aggregatio...

2016-08-22 Thread clockfly
Github user clockfly commented on a diff in the pull request:

https://github.com/apache/spark/pull/14723#discussion_r75622250
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
 ---
@@ -389,3 +389,89 @@ abstract class DeclarativeAggregate
 def right: AttributeReference = 
inputAggBufferAttributes(aggBufferAttributes.indexOf(a))
   }
 }
+
+/**
+ * This traits allow user to define an AggregateFunction which can store 
**arbitrary** Java objects
+ * in Aggregation buffer during aggregation of each key group. This trait 
must be mixed with
+ * class ImperativeAggregate.
+ *
+ * Here is how it works in a typical aggregation flow (Partial mode 
aggregate at Mapper side, and
+ * Final mode aggregate at Reducer side).
+ *
+ * Stage 1: Partial aggregate at Mapper side:
+ *
+ *  1. Upon calling method `initialize(aggBuffer: MutableRow)`, user 
stores an arbitrary empty
+ *object, object A for example, in aggBuffer. The object A will be 
used to store the
+ *accumulated aggregation result.
+ *  1. Upon calling method `update(mutableAggBuffer: MutableRow, inputRow: 
InternalRow)` in
+ *current group (group by key), user extracts object A from 
mutableAggBuffer, and then updates
+ *object A with current inputRow. After updating, object A is stored 
back to mutableAggBuffer.
+ *  1. After processing all rows of current group, the framework will call 
method
+ *`serializeObjectAggregationBufferInPlace(aggregationBuffer: 
MutableRow)` to serialize object A
+ *to a serializable format in place.
+ *  1. The framework may spill the aggregationBuffer to disk if there is 
not enough memory.
+ *It is safe since we have already convert aggregationBuffer to 
serializable format.
+ *  1. Spark framework moves on to next group, until all groups have been
+ *processed.
+ *
+ * Shuffling exchange data to Reducer tasks...
+ *
+ * Stage 2: Final mode aggregate at Reducer side:
+ *
+ *  1. Upon calling method `initialize(aggBuffer: MutableRow)`, user 
stores a new empty object A1
+ *in aggBuffer. The object A1 will be used to store the accumulated 
aggregation result.
+ *  1. Upon calling method `merge(mutableAggBuffer: MutableRow, 
inputAggBuffer: InternalRow)`, user
+ *extracts object A1 from mutableAggBuffer, and extracts object A2 
from inputAggBuffer. then
--- End diff --

It is 'merge'.





[GitHub] spark pull request #14723: [SQL][WIP][Test] Supports object-based aggregatio...

2016-08-22 Thread clockfly
Github user clockfly commented on a diff in the pull request:

https://github.com/apache/spark/pull/14723#discussion_r75622002
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/AggregateWithObjectAggregateBufferSuite.scala
 ---
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.AggregateWithObjectAggregateBufferSuite.MaxWithObjectAggregateBuffer
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, GenericMutableRow, MutableRow, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.aggregate.{ImperativeAggregate, WithObjectAggregateBuffer}
+import org.apache.spark.sql.execution.aggregate.SortAggregateExec
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.{AbstractDataType, DataType, IntegerType, StructType}
+
+class AggregateWithObjectAggregateBufferSuite extends QueryTest with SharedSQLContext {
+
+  import testImplicits._
+
+  private val data = Seq((1, 0), (3, 1), (2, 0), (6, 3), (3, 1), (4, 1), (5, 0))
+
+  test("aggregate with object aggregate buffer, should not use HashAggregate") {
+    val df = data.toDF("a", "b")
+    val max = new MaxWithObjectAggregateBuffer($"a".expr)
+
+    // Always use SortAggregateExec instead of HashAggregateExec for planning, even if the
+    // aggregate buffer attributes are mutable fields (every field can be mutated inline,
+    // like int, long...)
+    val allFieldsMutable = max.aggBufferSchema.map(_.dataType).forall(UnsafeRow.isMutable)
+    val sparkPlan = df.select(Column(max.toAggregateExpression())).queryExecution.sparkPlan
+    assert(allFieldsMutable == true && sparkPlan.isInstanceOf[SortAggregateExec])
+  }
+
+  test("aggregate with object aggregate buffer, no group by") {
+    val df = data.toDF("a", "b").coalesce(2)
+    checkAnswer(
+      df.select(objectAggregateMax($"a"), count($"a"), objectAggregateMax($"b"), count($"b")),
+      Seq(Row(6, 7, 3, 7))
+    )
+  }
+
+  test("aggregate with object aggregate buffer, with group by") {
+    val df = data.toDF("a", "b").coalesce(2)
+    checkAnswer(
+      df.groupBy($"b").agg(objectAggregateMax($"a"), count($"a"), objectAggregateMax($"a")),
+      Seq(
+        Row(0, 5, 3, 5),
+        Row(1, 4, 3, 4),
+        Row(3, 6, 1, 6)
+      )
+    )
+  }
+
+  test("aggregate with object aggregate buffer, empty inputs, no group by") {
+    val empty = Seq.empty[(Int, Int)].toDF("a", "b")
+    checkAnswer(
+      empty.select(objectAggregateMax($"a"), count($"a"), objectAggregateMax($"b"), count($"b")),
+      Seq(Row(Int.MinValue, 0, Int.MinValue, 0)))
+  }
+
+  test("aggregate with object aggregate buffer, empty inputs, with group by") {
+    val empty = Seq.empty[(Int, Int)].toDF("a", "b")
+    checkAnswer(
+      empty.groupBy($"b").agg(objectAggregateMax($"a"), count($"a"), objectAggregateMax($"a")),
+      Seq.empty[Row])
+  }
+
+  private def objectAggregateMax(column: Column): Column = {
+    val max = MaxWithObjectAggregateBuffer(column.expr)
+    Column(max.toAggregateExpression())
+  }
+}
+
+object AggregateWithObjectAggregateBufferSuite {
--- End diff --

I use the companion object to define a private scope.
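
As a sketch of the pattern (names follow the quoted test file; the helper's body is elided):

    class AggregateWithObjectAggregateBufferSuite extends QueryTest with SharedSQLContext {
      // Pulled in from the companion object rather than the package namespace.
      import AggregateWithObjectAggregateBufferSuite.MaxWithObjectAggregateBuffer
      // ... tests ...
    }

    object AggregateWithObjectAggregateBufferSuite {
      // Defined here so it is effectively scoped to this suite only.
      case class MaxWithObjectAggregateBuffer(child: Expression) // ...
    }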





[GitHub] spark pull request #14723: [SQL][WIP][Test] Supports object-based aggregatio...

2016-08-21 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/14723#discussion_r75613875
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/AggregateWithObjectAggregateBufferSuite.scala
 ---
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.AggregateWithObjectAggregateBufferSuite.MaxWithObjectAggregateBuffer
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, GenericMutableRow, MutableRow, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.aggregate.{ImperativeAggregate, WithObjectAggregateBuffer}
+import org.apache.spark.sql.execution.aggregate.SortAggregateExec
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.{AbstractDataType, DataType, IntegerType, StructType}
+
+class AggregateWithObjectAggregateBufferSuite extends QueryTest with SharedSQLContext {
--- End diff --

We should also put a basic test in `HashAggregationQueryWithControlledFallbackSuite` to test the fallback.
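
A hedged sketch of such a test, assuming the controlled-fallback config key used by the existing fallback suite and the data/helpers from this PR's test file:

    test("object aggregate buffer under hash-aggregate fallback") {
      // Config key assumed from TungstenAggregationQueryWithControlledFallbackSuite;
      // it forces the aggregate path to fall back after two input rows.
      withSQLConf("spark.sql.TungstenAggregate.testFallbackStartsAt" -> "2") {
        val df = data.toDF("a", "b")
        checkAnswer(df.select(objectAggregateMax($"a"), count($"b")), Row(6, 7))
      }
    }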





[GitHub] spark pull request #14723: [SQL][WIP][Test] Supports object-based aggregatio...

2016-08-21 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/14723#discussion_r75613743
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
 ---
@@ -90,6 +98,21 @@ class SortBasedAggregationIterator(
   // compared to MutableRow (aggregation buffer) directly.
   private[this] val safeProj: Projection = FromUnsafeProjection(valueAttributes.map(_.dataType))
 
+  // Aggregate functions which store a generic object in the aggregation buffer.
+  // @see [[WithObjectAggregateBuffer]] for more information
+  private val aggFunctionsWithObjectAggregationBuffer = aggregateFunctions.collect {
+    case (ag: ImperativeAggregate with WithObjectAggregateBuffer) => ag
--- End diff --

How about we make `WithObjectAggregateBuffer` extend `ImperativeAggregate`?





[GitHub] spark pull request #14723: [SQL][WIP][Test] Supports object-based aggregatio...

2016-08-21 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/14723#discussion_r75613492
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
 ---
@@ -389,3 +389,89 @@ abstract class DeclarativeAggregate
 def right: AttributeReference = 
inputAggBufferAttributes(aggBufferAttributes.indexOf(a))
   }
 }
+
+/**
+ * This traits allow user to define an AggregateFunction which can store 
**arbitrary** Java objects
+ * in Aggregation buffer during aggregation of each key group. This trait 
must be mixed with
+ * class ImperativeAggregate.
+ *
+ * Here is how it works in a typical aggregation flow (Partial mode 
aggregate at Mapper side, and
+ * Final mode aggregate at Reducer side).
+ *
+ * Stage 1: Partial aggregate at Mapper side:
+ *
+ *  1. Upon calling method `initialize(aggBuffer: MutableRow)`, user 
stores an arbitrary empty
+ *object, object A for example, in aggBuffer. The object A will be 
used to store the
+ *accumulated aggregation result.
+ *  1. Upon calling method `update(mutableAggBuffer: MutableRow, inputRow: 
InternalRow)` in
+ *current group (group by key), user extracts object A from 
mutableAggBuffer, and then updates
+ *object A with current inputRow. After updating, object A is stored 
back to mutableAggBuffer.
+ *  1. After processing all rows of current group, the framework will call 
method
+ *`serializeObjectAggregationBufferInPlace(aggregationBuffer: 
MutableRow)` to serialize object A
+ *to a serializable format in place.
+ *  1. The framework may spill the aggregationBuffer to disk if there is 
not enough memory.
+ *It is safe since we have already convert aggregationBuffer to 
serializable format.
+ *  1. Spark framework moves on to next group, until all groups have been
+ *processed.
+ *
+ * Shuffling exchange data to Reducer tasks...
+ *
+ * Stage 2: Final mode aggregate at Reducer side:
+ *
+ *  1. Upon calling method `initialize(aggBuffer: MutableRow)`, user 
stores a new empty object A1
+ *in aggBuffer. The object A1 will be used to store the accumulated 
aggregation result.
+ *  1. Upon calling method `merge(mutableAggBuffer: MutableRow, 
inputAggBuffer: InternalRow)`, user
+ *extracts object A1 from mutableAggBuffer, and extracts object A2 
from inputAggBuffer. then
+ *user needs to merge A1, and A2, and stores the merged result back to 
mutableAggBuffer.
+ *  1. After processing all inputAggBuffer of current group (group by 
key), the Spark framework will
+ *call method 
`serializeObjectAggregationBufferInPlace(aggregationBuffer: MutableRow)` to
+ *serialize object A1 to a serializable format in place.
+ *  1. The Spark framework may spill the aggregationBuffer to disk if 
there is not enough memory.
+ *It is safe since we have already convert aggregationBuffer to 
serializable format.
+ *  1. Spark framework moves on to next group, until all groups have been 
processed.
+ */
+trait WithObjectAggregateBuffer {
+  this: ImperativeAggregate =>
+
+  /**
+   * Serializes and in-place replaces the object stored in Aggregation 
buffer. The framework
+   * calls this method every time after finishing updating/merging one 
group (group by key).
+   *
+   * aggregationBuffer before serialization:
+   *
+   * The object stored in aggregationBuffer can be **arbitrary** Java 
objects defined by user.
+   *
+   * aggregationBuffer after serialization:
+   *
+   * The object's type must be one of:
--- End diff --

How about `The serialized object must be in Spark SQL internal format.`?





[GitHub] spark pull request #14723: [SQL][WIP][Test] Supports object-based aggregatio...

2016-08-21 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/14723#discussion_r75613439
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
 ---
@@ -389,3 +389,89 @@ abstract class DeclarativeAggregate
 def right: AttributeReference = 
inputAggBufferAttributes(aggBufferAttributes.indexOf(a))
   }
 }
+
+/**
+ * This traits allow user to define an AggregateFunction which can store 
**arbitrary** Java objects
+ * in Aggregation buffer during aggregation of each key group. This trait 
must be mixed with
+ * class ImperativeAggregate.
+ *
+ * Here is how it works in a typical aggregation flow (Partial mode 
aggregate at Mapper side, and
+ * Final mode aggregate at Reducer side).
+ *
+ * Stage 1: Partial aggregate at Mapper side:
+ *
+ *  1. Upon calling method `initialize(aggBuffer: MutableRow)`, user 
stores an arbitrary empty
+ *object, object A for example, in aggBuffer. The object A will be 
used to store the
+ *accumulated aggregation result.
+ *  1. Upon calling method `update(mutableAggBuffer: MutableRow, inputRow: 
InternalRow)` in
+ *current group (group by key), user extracts object A from 
mutableAggBuffer, and then updates
+ *object A with current inputRow. After updating, object A is stored 
back to mutableAggBuffer.
+ *  1. After processing all rows of current group, the framework will call 
method
+ *`serializeObjectAggregationBufferInPlace(aggregationBuffer: 
MutableRow)` to serialize object A
+ *to a serializable format in place.
+ *  1. The framework may spill the aggregationBuffer to disk if there is 
not enough memory.
+ *It is safe since we have already convert aggregationBuffer to 
serializable format.
+ *  1. Spark framework moves on to next group, until all groups have been
+ *processed.
+ *
+ * Shuffling exchange data to Reducer tasks...
+ *
+ * Stage 2: Final mode aggregate at Reducer side:
+ *
+ *  1. Upon calling method `initialize(aggBuffer: MutableRow)`, user 
stores a new empty object A1
+ *in aggBuffer. The object A1 will be used to store the accumulated 
aggregation result.
+ *  1. Upon calling method `merge(mutableAggBuffer: MutableRow, 
inputAggBuffer: InternalRow)`, user
+ *extracts object A1 from mutableAggBuffer, and extracts object A2 
from inputAggBuffer. then
--- End diff --

I think we should extract the Spark SQL format from inputAggBuffer and deserialize it to object A2.
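
A sketch of what that would mean inside `merge`, assuming the partial side already serialized its slot to bytes; `A`, `fromBytes`, and `mergeObjects` are placeholders for the user's accumulator class and helpers:

    override def merge(mutableAggBuffer: MutableRow, inputAggBuffer: InternalRow): Unit = {
      // A1 is still a live object in this task's buffer.
      val a1 = mutableAggBuffer.get(mutableAggBufferOffset, ObjectType(classOf[A])).asInstanceOf[A]
      // A2 arrives from the shuffle in serialized form, so read it as binary
      // and deserialize it before merging.
      val a2 = fromBytes(inputAggBuffer.getBinary(inputAggBufferOffset))
      mutableAggBuffer(mutableAggBufferOffset) = mergeObjects(a1, a2)
    }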





[GitHub] spark pull request #14723: [SQL][WIP][Test] Supports object-based aggregatio...

2016-08-21 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/14723#discussion_r75613381
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
 ---
@@ -389,3 +389,89 @@ abstract class DeclarativeAggregate
 def right: AttributeReference = 
inputAggBufferAttributes(aggBufferAttributes.indexOf(a))
   }
 }
+
+/**
+ * This traits allow user to define an AggregateFunction which can store 
**arbitrary** Java objects
+ * in Aggregation buffer during aggregation of each key group. This trait 
must be mixed with
+ * class ImperativeAggregate.
+ *
+ * Here is how it works in a typical aggregation flow (Partial mode 
aggregate at Mapper side, and
+ * Final mode aggregate at Reducer side).
+ *
+ * Stage 1: Partial aggregate at Mapper side:
+ *
+ *  1. Upon calling method `initialize(aggBuffer: MutableRow)`, user 
stores an arbitrary empty
+ *object, object A for example, in aggBuffer. The object A will be 
used to store the
+ *accumulated aggregation result.
+ *  1. Upon calling method `update(mutableAggBuffer: MutableRow, inputRow: 
InternalRow)` in
+ *current group (group by key), user extracts object A from 
mutableAggBuffer, and then updates
+ *object A with current inputRow. After updating, object A is stored 
back to mutableAggBuffer.
+ *  1. After processing all rows of current group, the framework will call 
method
+ *`serializeObjectAggregationBufferInPlace(aggregationBuffer: 
MutableRow)` to serialize object A
+ *to a serializable format in place.
+ *  1. The framework may spill the aggregationBuffer to disk if there is 
not enough memory.
+ *It is safe since we have already convert aggregationBuffer to 
serializable format.
+ *  1. Spark framework moves on to next group, until all groups have been
+ *processed.
+ *
+ * Shuffling exchange data to Reducer tasks...
+ *
+ * Stage 2: Final mode aggregate at Reducer side:
+ *
+ *  1. Upon calling method `initialize(aggBuffer: MutableRow)`, user 
stores a new empty object A1
+ *in aggBuffer. The object A1 will be used to store the accumulated 
aggregation result.
--- End diff --

`accumulated aggregation result`? It's still a buffer, right?





[GitHub] spark pull request #14723: [SQL][WIP][Test] Supports object-based aggregatio...

2016-08-21 Thread clockfly
Github user clockfly commented on a diff in the pull request:

https://github.com/apache/spark/pull/14723#discussion_r75612665
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
 ---
@@ -389,3 +389,89 @@ abstract class DeclarativeAggregate
 def right: AttributeReference = 
inputAggBufferAttributes(aggBufferAttributes.indexOf(a))
   }
 }
+
+/**
+ * This traits allow user to define an AggregateFunction which can store 
**arbitrary** Java objects
+ * in Aggregation buffer during aggregation of each key group. This trait 
must be mixed with
+ * class ImperativeAggregate.
+ *
+ * Here is how it works in a typical aggregation flow (Partial mode 
aggregate at Mapper side, and
+ * Final mode aggregate at Reducer side).
+ *
+ * Stage 1: Partial aggregate at Mapper side:
+ *
+ *  1. Upon calling method `initialize(aggBuffer: MutableRow)`, user 
stores an arbitrary empty
+ *object, object A for example, in aggBuffer. The object A will be 
used to store the
+ *accumulated aggregation result.
+ *  1. Upon calling method `update(mutableAggBuffer: MutableRow, inputRow: 
InternalRow)` in
+ *current group (group by key), user extracts object A from 
mutableAggBuffer, and then updates
+ *object A with current inputRow. After updating, object A is stored 
back to mutableAggBuffer.
+ *  1. After processing all rows of current group, the framework will call 
method
+ *`serializeObjectAggregationBufferInPlace(aggregationBuffer: 
MutableRow)` to serialize object A
+ *to a serializable format in place.
+ *  1. The framework may spill the aggregationBuffer to disk if there is 
not enough memory.
+ *It is safe since we have already convert aggregationBuffer to 
serializable format.
+ *  1. Spark framework moves on to next group, until all groups have been
+ *processed.
+ *
+ * Shuffling exchange data to Reducer tasks...
+ *
+ * Stage 2: Final mode aggregate at Reducer side:
+ *
+ *  1. Upon calling method `initialize(aggBuffer: MutableRow)`, user 
stores a new empty object A1
+ *in aggBuffer. The object A1 will be used to store the accumulated 
aggregation result.
+ *  1. Upon calling method `merge(mutableAggBuffer: MutableRow, 
inputAggBuffer: InternalRow)`, user
+ *extracts object A1 from mutableAggBuffer, and extracts object A2 
from inputAggBuffer. then
+ *user needs to merge A1, and A2, and stores the merged result back to 
mutableAggBuffer.
+ *  1. After processing all inputAggBuffer of current group (group by 
key), the Spark framework will
+ *call method 
`serializeObjectAggregationBufferInPlace(aggregationBuffer: MutableRow)` to
+ *serialize object A1 to a serializable format in place.
+ *  1. The Spark framework may spill the aggregationBuffer to disk if 
there is not enough memory.
+ *It is safe since we have already convert aggregationBuffer to 
serializable format.
+ *  1. Spark framework moves on to next group, until all groups have been 
processed.
+ */
+trait WithObjectAggregateBuffer {
+  this: ImperativeAggregate =>
+
+  /**
+   * Serializes and in-place replaces the object stored in Aggregation 
buffer. The framework
+   * calls this method every time after finishing updating/merging one 
group (group by key).
+   *
+   * aggregationBuffer before serialization:
+   *
+   * The object stored in aggregationBuffer can be **arbitrary** Java 
objects defined by user.
+   *
+   * aggregationBuffer after serialization:
+   *
+   * The object's type must be one of:
--- End diff --

OK, I will rephrase this part. I meant the object type after serialization.
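
In other words, mirroring the code example in the Scaladoc (`A` and `toBytes` stand in for the user's accumulator class and serializer):

    override def serializeObjectAggregationBufferInPlace(aggregationBuffer: MutableRow): Unit = {
      // Before: the slot holds the arbitrary object accumulated for this group.
      val obj = aggregationBuffer.get(mutableAggBufferOffset, ObjectType(classOf[A])).asInstanceOf[A]
      // After: the same slot holds one of the allowed serialized types (bytes here),
      // so the buffer can be spilled or shuffled safely.
      aggregationBuffer(mutableAggBufferOffset) = toBytes(obj)
    }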





[GitHub] spark pull request #14723: [SQL][WIP][Test] Supports object-based aggregatio...

2016-08-21 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/14723#discussion_r75612644
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
 ---
@@ -389,3 +389,89 @@ abstract class DeclarativeAggregate
 def right: AttributeReference = 
inputAggBufferAttributes(aggBufferAttributes.indexOf(a))
   }
 }
+
+/**
+ * This traits allow user to define an AggregateFunction which can store 
**arbitrary** Java objects
+ * in Aggregation buffer during aggregation of each key group. This trait 
must be mixed with
+ * class ImperativeAggregate.
+ *
+ * Here is how it works in a typical aggregation flow (Partial mode 
aggregate at Mapper side, and
+ * Final mode aggregate at Reducer side).
+ *
+ * Stage 1: Partial aggregate at Mapper side:
+ *
+ *  1. Upon calling method `initialize(aggBuffer: MutableRow)`, user 
stores an arbitrary empty
+ *object, object A for example, in aggBuffer. The object A will be 
used to store the
+ *accumulated aggregation result.
+ *  1. Upon calling method `update(mutableAggBuffer: MutableRow, inputRow: 
InternalRow)` in
+ *current group (group by key), user extracts object A from 
mutableAggBuffer, and then updates
+ *object A with current inputRow. After updating, object A is stored 
back to mutableAggBuffer.
+ *  1. After processing all rows of current group, the framework will call 
method
+ *`serializeObjectAggregationBufferInPlace(aggregationBuffer: 
MutableRow)` to serialize object A
+ *to a serializable format in place.
+ *  1. The framework may spill the aggregationBuffer to disk if there is 
not enough memory.
+ *It is safe since we have already convert aggregationBuffer to 
serializable format.
+ *  1. Spark framework moves on to next group, until all groups have been
--- End diff --

So many 1. : )





[GitHub] spark pull request #14723: [SQL][WIP][Test] Supports object-based aggregatio...

2016-08-21 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/14723#discussion_r75612521
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
 ---
@@ -389,3 +389,89 @@ abstract class DeclarativeAggregate
 def right: AttributeReference = 
inputAggBufferAttributes(aggBufferAttributes.indexOf(a))
   }
 }
+
+/**
+ * This traits allow user to define an AggregateFunction which can store 
**arbitrary** Java objects
+ * in Aggregation buffer during aggregation of each key group. This trait 
must be mixed with
+ * class ImperativeAggregate.
+ *
+ * Here is how it works in a typical aggregation flow (Partial mode 
aggregate at Mapper side, and
+ * Final mode aggregate at Reducer side).
+ *
+ * Stage 1: Partial aggregate at Mapper side:
+ *
+ *  1. Upon calling method `initialize(aggBuffer: MutableRow)`, user 
stores an arbitrary empty
+ *object, object A for example, in aggBuffer. The object A will be 
used to store the
+ *accumulated aggregation result.
+ *  1. Upon calling method `update(mutableAggBuffer: MutableRow, inputRow: 
InternalRow)` in
+ *current group (group by key), user extracts object A from 
mutableAggBuffer, and then updates
+ *object A with current inputRow. After updating, object A is stored 
back to mutableAggBuffer.
+ *  1. After processing all rows of current group, the framework will call 
method
+ *`serializeObjectAggregationBufferInPlace(aggregationBuffer: 
MutableRow)` to serialize object A
+ *to a serializable format in place.
--- End diff --

`to a Spark SQL internal format (mostly BinaryType) in place`
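
Under that reading, the declared buffer schema would already use the serialized type, for example (a sketch, not necessarily what the PR declares):

    // The attribute advertises BinaryType -- the post-serialization type -- even
    // though update()/merge() temporarily keep a plain object in the slot.
    override def aggBufferAttributes: Seq[AttributeReference] =
      Seq(AttributeReference("buf", BinaryType)())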





[GitHub] spark pull request #14723: [SQL][WIP][Test] Supports object-based aggregatio...

2016-08-20 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/14723#discussion_r75586776
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/AggregateWithObjectAggregateBufferSuite.scala
 ---
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.AggregateWithObjectAggregateBufferSuite.MaxWithObjectAggregateBuffer
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, GenericMutableRow, MutableRow, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.aggregate.{ImperativeAggregate, WithObjectAggregateBuffer}
+import org.apache.spark.sql.execution.aggregate.SortAggregateExec
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.{AbstractDataType, DataType, IntegerType, StructType}
+
+class AggregateWithObjectAggregateBufferSuite extends QueryTest with SharedSQLContext {
+
+  import testImplicits._
+
+  private val data = Seq((1, 0), (3, 1), (2, 0), (6, 3), (3, 1), (4, 1), (5, 0))
+
+  test("aggregate with object aggregate buffer, should not use HashAggregate") {
+    val df = data.toDF("a", "b")
+    val max = new MaxWithObjectAggregateBuffer($"a".expr)
+
+    // Always use SortAggregateExec instead of HashAggregateExec for planning, even if the
+    // aggregate buffer attributes are mutable fields (every field can be mutated inline,
+    // like int, long...)
+    val allFieldsMutable = max.aggBufferSchema.map(_.dataType).forall(UnsafeRow.isMutable)
+    val sparkPlan = df.select(Column(max.toAggregateExpression())).queryExecution.sparkPlan
+    assert(allFieldsMutable == true && sparkPlan.isInstanceOf[SortAggregateExec])
+  }
+
+  test("aggregate with object aggregate buffer, no group by") {
+    val df = data.toDF("a", "b").coalesce(2)
+    checkAnswer(
+      df.select(objectAggregateMax($"a"), count($"a"), objectAggregateMax($"b"), count($"b")),
+      Seq(Row(6, 7, 3, 7))
+    )
+  }
+
+  test("aggregate with object aggregate buffer, with group by") {
+    val df = data.toDF("a", "b").coalesce(2)
+    checkAnswer(
+      df.groupBy($"b").agg(objectAggregateMax($"a"), count($"a"), objectAggregateMax($"a")),
+      Seq(
+        Row(0, 5, 3, 5),
+        Row(1, 4, 3, 4),
+        Row(3, 6, 1, 6)
+      )
+    )
+  }
+
+  test("aggregate with object aggregate buffer, empty inputs, no group by") {
+    val empty = Seq.empty[(Int, Int)].toDF("a", "b")
+    checkAnswer(
+      empty.select(objectAggregateMax($"a"), count($"a"), objectAggregateMax($"b"), count($"b")),
+      Seq(Row(Int.MinValue, 0, Int.MinValue, 0)))
+  }
+
+  test("aggregate with object aggregate buffer, empty inputs, with group by") {
+    val empty = Seq.empty[(Int, Int)].toDF("a", "b")
+    checkAnswer(
+      empty.groupBy($"b").agg(objectAggregateMax($"a"), count($"a"), objectAggregateMax($"a")),
+      Seq.empty[Row])
+  }
+
+  private def objectAggregateMax(column: Column): Column = {
+    val max = MaxWithObjectAggregateBuffer(column.expr)
+    Column(max.toAggregateExpression())
+  }
+}
+
+object AggregateWithObjectAggregateBufferSuite {
--- End diff --

(we do not need to put the example class inside this object.)





[GitHub] spark pull request #14723: [SQL][WIP][Test] Supports object-based aggregatio...

2016-08-20 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/14723#discussion_r75586764
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
 ---
@@ -389,3 +389,89 @@ abstract class DeclarativeAggregate
 def right: AttributeReference = 
inputAggBufferAttributes(aggBufferAttributes.indexOf(a))
   }
 }
+
+/**
+ * This traits allow user to define an AggregateFunction which can store 
**arbitrary** Java objects
+ * in Aggregation buffer during aggregation of each key group. This trait 
must be mixed with
+ * class ImperativeAggregate.
+ *
+ * Here is how it works in a typical aggregation flow (Partial mode 
aggregate at Mapper side, and
+ * Final mode aggregate at Reducer side).
+ *
+ * Stage 1: Partial aggregate at Mapper side:
+ *
+ *  1. Upon calling method `initialize(aggBuffer: MutableRow)`, user 
stores an arbitrary empty
+ *object, object A for example, in aggBuffer. The object A will be 
used to store the
+ *accumulated aggregation result.
+ *  1. Upon calling method `update(mutableAggBuffer: MutableRow, inputRow: 
InternalRow)` in
+ *current group (group by key), user extracts object A from 
mutableAggBuffer, and then updates
+ *object A with current inputRow. After updating, object A is stored 
back to mutableAggBuffer.
+ *  1. After processing all rows of current group, the framework will call 
method
+ *`serializeObjectAggregationBufferInPlace(aggregationBuffer: 
MutableRow)` to serialize object A
+ *to a serializable format in place.
+ *  1. The framework may spill the aggregationBuffer to disk if there is 
not enough memory.
+ *It is safe since we have already convert aggregationBuffer to 
serializable format.
+ *  1. Spark framework moves on to next group, until all groups have been
+ *processed.
+ *
+ * Shuffling exchange data to Reducer tasks...
+ *
+ * Stage 2: Final mode aggregate at Reducer side:
+ *
+ *  1. Upon calling method `initialize(aggBuffer: MutableRow)`, user 
stores a new empty object A1
+ *in aggBuffer. The object A1 will be used to store the accumulated 
aggregation result.
+ *  1. Upon calling method `merge(mutableAggBuffer: MutableRow, 
inputAggBuffer: InternalRow)`, user
+ *extracts object A1 from mutableAggBuffer, and extracts object A2 
from inputAggBuffer. then
+ *user needs to merge A1, and A2, and stores the merged result back to 
mutableAggBuffer.
+ *  1. After processing all inputAggBuffer of current group (group by 
key), the Spark framework will
+ *call method 
`serializeObjectAggregationBufferInPlace(aggregationBuffer: MutableRow)` to
+ *serialize object A1 to a serializable format in place.
+ *  1. The Spark framework may spill the aggregationBuffer to disk if 
there is not enough memory.
+ *It is safe since we have already convert aggregationBuffer to 
serializable format.
+ *  1. Spark framework moves on to next group, until all groups have been 
processed.
+ */
+trait WithObjectAggregateBuffer {
+  this: ImperativeAggregate =>
+
+  /**
+   * Serializes and in-place replaces the object stored in Aggregation 
buffer. The framework
+   * calls this method every time after finishing updating/merging one 
group (group by key).
+   *
+   * aggregationBuffer before serialization:
+   *
+   * The object stored in aggregationBuffer can be **arbitrary** Java 
objects defined by user.
--- End diff --

Seems we want to mention that the data type is `ObjectType`?
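
That is, during a group the slot is written and read with an `ObjectType` hint, roughly as follows (the accumulator class name is a placeholder):

    override def initialize(buffer: MutableRow): Unit = {
      // Store the arbitrary accumulator (object A in the flow above).
      buffer.update(mutableAggBufferOffset, new MaxAccumulator())
    }

    // Reads pass ObjectType so the generic row hands back the raw object:
    // val a = buffer.get(mutableAggBufferOffset, ObjectType(classOf[MaxAccumulator]))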





[GitHub] spark pull request #14723: [SQL][WIP][Test] Supports object-based aggregatio...

2016-08-20 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/14723#discussion_r75586760
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
 ---
@@ -389,3 +389,89 @@ abstract class DeclarativeAggregate
 def right: AttributeReference = 
inputAggBufferAttributes(aggBufferAttributes.indexOf(a))
   }
 }
+
+/**
+ * This traits allow user to define an AggregateFunction which can store 
**arbitrary** Java objects
+ * in Aggregation buffer during aggregation of each key group. This trait 
must be mixed with
+ * class ImperativeAggregate.
+ *
+ * Here is how it works in a typical aggregation flow (Partial mode 
aggregate at Mapper side, and
+ * Final mode aggregate at Reducer side).
+ *
+ * Stage 1: Partial aggregate at Mapper side:
+ *
+ *  1. Upon calling method `initialize(aggBuffer: MutableRow)`, user 
stores an arbitrary empty
+ *object, object A for example, in aggBuffer. The object A will be 
used to store the
+ *accumulated aggregation result.
+ *  1. Upon calling method `update(mutableAggBuffer: MutableRow, inputRow: 
InternalRow)` in
+ *current group (group by key), user extracts object A from 
mutableAggBuffer, and then updates
+ *object A with current inputRow. After updating, object A is stored 
back to mutableAggBuffer.
+ *  1. After processing all rows of current group, the framework will call 
method
+ *`serializeObjectAggregationBufferInPlace(aggregationBuffer: 
MutableRow)` to serialize object A
+ *to a serializable format in place.
+ *  1. The framework may spill the aggregationBuffer to disk if there is 
not enough memory.
+ *It is safe since we have already convert aggregationBuffer to 
serializable format.
+ *  1. Spark framework moves on to next group, until all groups have been
+ *processed.
+ *
+ * Shuffling exchange data to Reducer tasks...
+ *
+ * Stage 2: Final mode aggregate at Reducer side:
+ *
+ *  1. Upon calling method `initialize(aggBuffer: MutableRow)`, user 
stores a new empty object A1
+ *in aggBuffer. The object A1 will be used to store the accumulated 
aggregation result.
+ *  1. Upon calling method `merge(mutableAggBuffer: MutableRow, 
inputAggBuffer: InternalRow)`, user
+ *extracts object A1 from mutableAggBuffer, and extracts object A2 
from inputAggBuffer. then
+ *user needs to merge A1, and A2, and stores the merged result back to 
mutableAggBuffer.
+ *  1. After processing all inputAggBuffer of current group (group by 
key), the Spark framework will
+ *call method 
`serializeObjectAggregationBufferInPlace(aggregationBuffer: MutableRow)` to
+ *serialize object A1 to a serializable format in place.
+ *  1. The Spark framework may spill the aggregationBuffer to disk if 
there is not enough memory.
+ *It is safe since we have already convert aggregationBuffer to 
serializable format.
+ *  1. Spark framework moves on to next group, until all groups have been 
processed.
+ */
+trait WithObjectAggregateBuffer {
+  this: ImperativeAggregate =>
+
+  /**
+   * Serializes and in-place replaces the object stored in Aggregation 
buffer. The framework
+   * calls this method every time after finishing updating/merging one 
group (group by key).
+   *
+   * aggregationBuffer before serialization:
+   *
+   * The object stored in aggregationBuffer can be **arbitrary** Java 
objects defined by user.
+   *
+   * aggregationBuffer after serialization:
+   *
+   * The object's type must be one of:
+   *
+   *  - Null
+   *  - Boolean
+   *  - Byte
+   *  - Short
+   *  - Int
+   *  - Long
+   *  - Float
+   *  - Double
+   *  - Array[Byte]
+   *  - org.apache.spark.sql.types.Decimal
+   *  - org.apache.spark.unsafe.types.UTF8String
+   *  - org.apache.spark.unsafe.types.CalendarInterval
+   *  - org.apache.spark.sql.catalyst.util.MapData
+   *  - org.apache.spark.sql.catalyst.util.ArrayData
+   *  - org.apache.spark.sql.catalyst.InternalRow
+   *
+   * Code example:
+   *
+   * {{{
+   *   override def 
serializeObjectAggregationBufferInPlace(aggregationBuffer: MutableRow): Unit = {
+   * val obj = buffer.get(mutableAggBufferOffset, 
ObjectType(classOf[A])).asInstanceOf[A]
+   * // Convert the obj to bytes, which is a serializable format.
+   * buffer(mutableAggBufferOffset) = toBytes(obj)
--- End diff --

I am not sure it is the best example. Here, we are showing that the value of a field can be a Java object or a byte array.

I guess a more general question for this method is whether this approach works for all "supported" serialized types (e.g. the serialized

[GitHub] spark pull request #14723: [SQL][WIP][Test] Supports object-based aggregatio...

2016-08-20 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/14723#discussion_r75586661
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
 ---
@@ -389,3 +389,89 @@ abstract class DeclarativeAggregate
 def right: AttributeReference = 
inputAggBufferAttributes(aggBufferAttributes.indexOf(a))
   }
 }
+
+/**
+ * This traits allow user to define an AggregateFunction which can store 
**arbitrary** Java objects
+ * in Aggregation buffer during aggregation of each key group. This trait 
must be mixed with
+ * class ImperativeAggregate.
+ *
+ * Here is how it works in a typical aggregation flow (Partial mode 
aggregate at Mapper side, and
+ * Final mode aggregate at Reducer side).
+ *
+ * Stage 1: Partial aggregate at Mapper side:
+ *
+ *  1. Upon calling method `initialize(aggBuffer: MutableRow)`, user 
stores an arbitrary empty
+ *object, object A for example, in aggBuffer. The object A will be 
used to store the
+ *accumulated aggregation result.
+ *  1. Upon calling method `update(mutableAggBuffer: MutableRow, inputRow: 
InternalRow)` in
+ *current group (group by key), user extracts object A from 
mutableAggBuffer, and then updates
+ *object A with current inputRow. After updating, object A is stored 
back to mutableAggBuffer.
+ *  1. After processing all rows of current group, the framework will call 
method
+ *`serializeObjectAggregationBufferInPlace(aggregationBuffer: 
MutableRow)` to serialize object A
+ *to a serializable format in place.
+ *  1. The framework may spill the aggregationBuffer to disk if there is 
not enough memory.
+ *It is safe since we have already convert aggregationBuffer to 
serializable format.
+ *  1. Spark framework moves on to next group, until all groups have been
+ *processed.
+ *
+ * Shuffling exchange data to Reducer tasks...
+ *
+ * Stage 2: Final mode aggregate at Reducer side:
+ *
+ *  1. Upon calling method `initialize(aggBuffer: MutableRow)`, user 
stores a new empty object A1
+ *in aggBuffer. The object A1 will be used to store the accumulated 
aggregation result.
+ *  1. Upon calling method `merge(mutableAggBuffer: MutableRow, 
inputAggBuffer: InternalRow)`, user
+ *extracts object A1 from mutableAggBuffer, and extracts object A2 
from inputAggBuffer. then
+ *user needs to merge A1, and A2, and stores the merged result back to 
mutableAggBuffer.
+ *  1. After processing all inputAggBuffer of current group (group by 
key), the Spark framework will
+ *call method 
`serializeObjectAggregationBufferInPlace(aggregationBuffer: MutableRow)` to
+ *serialize object A1 to a serializable format in place.
+ *  1. The Spark framework may spill the aggregationBuffer to disk if 
there is not enough memory.
+ *It is safe since we have already convert aggregationBuffer to 
serializable format.
+ *  1. Spark framework moves on to next group, until all groups have been 
processed.
+ */
+trait WithObjectAggregateBuffer {
+  this: ImperativeAggregate =>
+
+  /**
+   * Serializes and in-place replaces the object stored in Aggregation 
buffer. The framework
+   * calls this method every time after finishing updating/merging one 
group (group by key).
+   *
+   * aggregationBuffer before serialization:
+   *
+   * The object stored in aggregationBuffer can be **arbitrary** Java 
objects defined by user.
+   *
+   * aggregationBuffer after serialization:
+   *
+   * The object's type must be one of:
--- End diff --

How about we rephrase this part? We mentioned that we can use `arbitrary` Java objects, but here we are saying that `The object's type must be one of:`.





[GitHub] spark pull request #14723: [SQL][WIP][Test] Supports object-based aggregatio...

2016-08-20 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/14723#discussion_r75586622
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
 ---
@@ -389,3 +389,89 @@ abstract class DeclarativeAggregate
 def right: AttributeReference = 
inputAggBufferAttributes(aggBufferAttributes.indexOf(a))
   }
 }
+
+/**
+ * This traits allow user to define an AggregateFunction which can store 
**arbitrary** Java objects
+ * in Aggregation buffer during aggregation of each key group. This trait 
must be mixed with
+ * class ImperativeAggregate.
--- End diff --

I think here we need to emphasize that the buffer is an internal buffer, because we will emit this buffer as the result of an aggregate operator.




[GitHub] spark pull request #14723: [SQL][WIP][Test] Supports object-based aggregatio...

2016-08-20 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/14723#discussion_r75586350
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
 ---
@@ -389,3 +389,89 @@ abstract class DeclarativeAggregate
 def right: AttributeReference = 
inputAggBufferAttributes(aggBufferAttributes.indexOf(a))
   }
 }
+
+/**
+ * This traits allow user to define an AggregateFunction which can store 
**arbitrary** Java objects
+ * in Aggregation buffer during aggregation of each key group. This trait 
must be mixed with
+ * class ImperativeAggregate.
+ *
+ * Here is how it works in a typical aggregation flow (Partial mode 
aggregate at Mapper side, and
+ * Final mode aggregate at Reducer side).
+ *
+ * Stage 1: Partial aggregate at Mapper side:
+ *
+ *  1. Upon calling method `initialize(aggBuffer: MutableRow)`, user 
stores an arbitrary empty
+ *object, object A for example, in aggBuffer. The object A will be 
used to store the
+ *accumulated aggregation result.
+ *  1. Upon calling method `update(mutableAggBuffer: MutableRow, inputRow: 
InternalRow)` in
+ *current group (group by key), user extracts object A from 
mutableAggBuffer, and then updates
+ *object A with current inputRow. After updating, object A is stored 
back to mutableAggBuffer.
+ *  1. After processing all rows of current group, the framework will call 
method
+ *`serializeObjectAggregationBufferInPlace(aggregationBuffer: 
MutableRow)` to serialize object A
+ *to a serializable format in place.
+ *  1. The framework may spill the aggregationBuffer to disk if there is 
not enough memory.
+ *It is safe since we have already convert aggregationBuffer to 
serializable format.
+ *  1. Spark framework moves on to next group, until all groups have been
+ *processed.
+ *
+ * Shuffling exchange data to Reducer tasks...
+ *
+ * Stage 2: Final mode aggregate at Reducer side:
+ *
+ *  1. Upon calling method `initialize(aggBuffer: MutableRow)`, user 
stores a new empty object A1
+ *in aggBuffer. The object A1 will be used to store the accumulated 
aggregation result.
+ *  1. Upon calling method `merge(mutableAggBuffer: MutableRow, 
inputAggBuffer: InternalRow)`, user
+ *extracts object A1 from mutableAggBuffer, and extracts object A2 
from inputAggBuffer. then
+ *user needs to merge A1, and A2, and stores the merged result back to 
mutableAggBuffer.
+ *  1. After processing all inputAggBuffer of current group (group by 
key), the Spark framework will
+ *call method 
`serializeObjectAggregationBufferInPlace(aggregationBuffer: MutableRow)` to
+ *serialize object A1 to a serializable format in place.
+ *  1. The Spark framework may spill the aggregationBuffer to disk if 
there is not enough memory.
+ *It is safe since we have already convert aggregationBuffer to 
serializable format.
+ *  1. Spark framework moves on to next group, until all groups have been 
processed.
+ */
+trait WithObjectAggregateBuffer {
+  this: ImperativeAggregate =>
--- End diff --

Oh, it seems this trait will still be a Java `interface`. But in general, I think we do not really need this line.
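For readers less familiar with Scala, the line being discussed is a self-type annotation. A tiny standalone example of what it does (names invented, nothing to do with Spark's classes):

```scala
abstract class Base { def name: String }

trait Mixin { this: Base =>              // self-type: Mixin may only be mixed into a Base
  def greet: String = "hello, " + name   // members of Base are visible inside Mixin
}

class Ok extends Base with Mixin { def name = "ok" }
// class Bad extends Mixin {}            // does not compile: Mixin requires Base
```

From Java, `Mixin` is still seen as a plain interface, so the self-type constraint cannot be expressed or enforced on the Java side, which is presumably the interop concern here.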




[GitHub] spark pull request #14723: [SQL][WIP][Test] Supports object-based aggregatio...

2016-08-20 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/14723#discussion_r75586238
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
 ---
@@ -389,3 +389,89 @@ abstract class DeclarativeAggregate
 def right: AttributeReference = 
inputAggBufferAttributes(aggBufferAttributes.indexOf(a))
   }
 }
+
+/**
+ * This traits allow user to define an AggregateFunction which can store 
**arbitrary** Java objects
--- End diff --

I think it is better to remove `allow users` because it is not exposed to 
end-users for defining UDAFs.




[GitHub] spark pull request #14723: [SQL][WIP][Test] Supports object-based aggregatio...

2016-08-20 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/14723#discussion_r75586232
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
 ---
@@ -389,3 +389,89 @@ abstract class DeclarativeAggregate
 def right: AttributeReference = 
inputAggBufferAttributes(aggBufferAttributes.indexOf(a))
   }
 }
+
+/**
+ * This traits allow user to define an AggregateFunction which can store 
**arbitrary** Java objects
+ * in Aggregation buffer during aggregation of each key group. This trait 
must be mixed with
+ * class ImperativeAggregate.
+ *
+ * Here is how it works in a typical aggregation flow (Partial mode 
aggregate at Mapper side, and
+ * Final mode aggregate at Reducer side).
+ *
+ * Stage 1: Partial aggregate at Mapper side:
+ *
+ *  1. Upon calling method `initialize(aggBuffer: MutableRow)`, user 
stores an arbitrary empty
+ *object, object A for example, in aggBuffer. The object A will be 
used to store the
+ *accumulated aggregation result.
+ *  1. Upon calling method `update(mutableAggBuffer: MutableRow, inputRow: 
InternalRow)` in
+ *current group (group by key), user extracts object A from 
mutableAggBuffer, and then updates
+ *object A with current inputRow. After updating, object A is stored 
back to mutableAggBuffer.
+ *  1. After processing all rows of current group, the framework will call 
method
+ *`serializeObjectAggregationBufferInPlace(aggregationBuffer: 
MutableRow)` to serialize object A
+ *to a serializable format in place.
+ *  1. The framework may spill the aggregationBuffer to disk if there is 
not enough memory.
+ *It is safe since we have already convert aggregationBuffer to 
serializable format.
+ *  1. Spark framework moves on to next group, until all groups have been
+ *processed.
+ *
+ * Shuffling exchange data to Reducer tasks...
+ *
+ * Stage 2: Final mode aggregate at Reducer side:
+ *
+ *  1. Upon calling method `initialize(aggBuffer: MutableRow)`, user 
stores a new empty object A1
+ *in aggBuffer. The object A1 will be used to store the accumulated 
aggregation result.
+ *  1. Upon calling method `merge(mutableAggBuffer: MutableRow, 
inputAggBuffer: InternalRow)`, user
+ *extracts object A1 from mutableAggBuffer, and extracts object A2 
from inputAggBuffer. then
+ *user needs to merge A1, and A2, and stores the merged result back to 
mutableAggBuffer.
+ *  1. After processing all inputAggBuffer of current group (group by 
key), the Spark framework will
+ *call method 
`serializeObjectAggregationBufferInPlace(aggregationBuffer: MutableRow)` to
+ *serialize object A1 to a serializable format in place.
+ *  1. The Spark framework may spill the aggregationBuffer to disk if 
there is not enough memory.
+ *It is safe since we have already convert aggregationBuffer to 
serializable format.
+ *  1. Spark framework moves on to next group, until all groups have been 
processed.
+ */
+trait WithObjectAggregateBuffer {
+  this: ImperativeAggregate =>
--- End diff --

I guess having this line will make this trait hard to use from Java.




[GitHub] spark pull request #14723: [SQL][WIP][Test] Supports object-based aggregatio...

2016-08-20 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/14723#discussion_r75586233
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
 ---
@@ -389,3 +389,89 @@ abstract class DeclarativeAggregate
 def right: AttributeReference = 
inputAggBufferAttributes(aggBufferAttributes.indexOf(a))
   }
 }
+
+/**
+ * This traits allow user to define an AggregateFunction which can store 
**arbitrary** Java objects
--- End diff --

`This trait allows an AggregateFunction to use ...`




[GitHub] spark pull request #14723: [SQL][WIP][Test] Supports object-based aggregatio...

2016-08-20 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/14723#discussion_r75586183
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
 ---
@@ -389,3 +389,89 @@ abstract class DeclarativeAggregate
 def right: AttributeReference = 
inputAggBufferAttributes(aggBufferAttributes.indexOf(a))
   }
 }
+
+/**
+ * This traits allow user to define an AggregateFunction which can store 
**arbitrary** Java objects
+ * in Aggregation buffer during aggregation of each key group. This trait 
must be mixed with
+ * class ImperativeAggregate.
+ *
+ * Here is how it works in a typical aggregation flow (Partial mode 
aggregate at Mapper side, and
+ * Final mode aggregate at Reducer side).
+ *
+ * Stage 1: Partial aggregate at Mapper side:
+ *
+ *  1. Upon calling method `initialize(aggBuffer: MutableRow)`, user 
stores an arbitrary empty
+ *object, object A for example, in aggBuffer. The object A will be 
used to store the
+ *accumulated aggregation result.
+ *  1. Upon calling method `update(mutableAggBuffer: MutableRow, inputRow: 
InternalRow)` in
+ *current group (group by key), user extracts object A from 
mutableAggBuffer, and then updates
+ *object A with current inputRow. After updating, object A is stored 
back to mutableAggBuffer.
+ *  1. After processing all rows of current group, the framework will call 
method
+ *`serializeObjectAggregationBufferInPlace(aggregationBuffer: 
MutableRow)` to serialize object A
+ *to a serializable format in place.
+ *  1. The framework may spill the aggregationBuffer to disk if there is 
not enough memory.
+ *It is safe since we have already convert aggregationBuffer to 
serializable format.
+ *  1. Spark framework moves on to next group, until all groups have been
+ *processed.
+ *
+ * Shuffling exchange data to Reducer tasks...
+ *
+ * Stage 2: Final mode aggregate at Reducer side:
+ *
+ *  1. Upon calling method `initialize(aggBuffer: MutableRow)`, user 
stores a new empty object A1
+ *in aggBuffer. The object A1 will be used to store the accumulated 
aggregation result.
+ *  1. Upon calling method `merge(mutableAggBuffer: MutableRow, 
inputAggBuffer: InternalRow)`, user
+ *extracts object A1 from mutableAggBuffer, and extracts object A2 
from inputAggBuffer. then
+ *user needs to merge A1, and A2, and stores the merged result back to 
mutableAggBuffer.
+ *  1. After processing all inputAggBuffer of current group (group by 
key), the Spark framework will
+ *call method 
`serializeObjectAggregationBufferInPlace(aggregationBuffer: MutableRow)` to
+ *serialize object A1 to a serializable format in place.
+ *  1. The Spark framework may spill the aggregationBuffer to disk if 
there is not enough memory.
+ *It is safe since we have already convert aggregationBuffer to 
serializable format.
+ *  1. Spark framework moves on to next group, until all groups have been 
processed.
+ */
+trait WithObjectAggregateBuffer {
+  this: ImperativeAggregate =>
--- End diff --

Seems we do not really need this line.




[GitHub] spark pull request #14723: [SQL][WIP][Test] Supports object-based aggregatio...

2016-08-19 Thread clockfly
GitHub user clockfly opened a pull request:

https://github.com/apache/spark/pull/14723

[SQL][WIP][Test] Supports object-based aggregation function which can store 
arbitrary objects in aggregation buffer.

## What changes were proposed in this pull request?

This PR allows a user to define an AggregateFunction which can store **arbitrary** Java objects in the aggregation buffer and use those objects to do the aggregation. Before this PR, users were only allowed to store a limited set of object types in the aggregation buffer. Please see the example usage in class
`org.apache.spark.sql.AggregateWithObjectAggregateBufferSuite.MaxWithObjectAggregateBuffer`, and the sketch below.
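For reference, a standalone sketch of the lifecycle in the spirit of that test class, but not taken from it. `ArrayBuffer[Any]` stands in for Spark's `MutableRow`/`InternalRow`, only the four methods named in the scaladoc are shown (a real class would also implement the rest of the `ImperativeAggregate` contract), and it is an assumption of this sketch that `merge` deserializes the shuffled buffer itself:

```scala
import java.io._
import scala.collection.mutable.ArrayBuffer

// The arbitrary object kept in slot 0 of the buffer while a group is open.
final class MaxState(var max: Int) extends Serializable

object MaxSketch {
  def initialize(aggBuffer: ArrayBuffer[Any]): Unit =
    aggBuffer(0) = new MaxState(Int.MinValue)

  def update(mutableAggBuffer: ArrayBuffer[Any], inputRow: ArrayBuffer[Any]): Unit = {
    val state = mutableAggBuffer(0).asInstanceOf[MaxState]
    state.max = math.max(state.max, inputRow(0).asInstanceOf[Int])
  }

  def merge(mutableAggBuffer: ArrayBuffer[Any], inputAggBuffer: ArrayBuffer[Any]): Unit = {
    val a = mutableAggBuffer(0).asInstanceOf[MaxState]
    // Assumption: the shuffled buffer arrives in its serialized form.
    val b = deserialize(inputAggBuffer(0).asInstanceOf[Array[Byte]])
    a.max = math.max(a.max, b.max)
  }

  // In-place serialization, called by the framework once a group is finished.
  def serializeObjectAggregationBufferInPlace(aggregationBuffer: ArrayBuffer[Any]): Unit = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(aggregationBuffer(0)) finally out.close()
    aggregationBuffer(0) = bytes.toByteArray
  }

  private def deserialize(bytes: Array[Byte]): MaxState = {
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try in.readObject().asInstanceOf[MaxState] finally in.close()
  }
}
```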

## How was this patch tested?

Unit tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/clockfly/spark object_aggregation_buffer_part1

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/14723.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #14723





