[GitHub] spark pull request #15703: [SPARK-18186] Migrate HiveUDAFFunction to TypedIm...

2018-07-19 Thread pgandhi999
Github user pgandhi999 commented on a diff in the pull request:

https://github.com/apache/spark/pull/15703#discussion_r203828759
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala ---
@@ -365,4 +394,74 @@ private[hive] case class HiveUDAFFunction(
     val distinct = if (isDistinct) "DISTINCT " else " "
     s"$name($distinct${children.map(_.sql).mkString(", ")})"
   }
+
+  override def createAggregationBuffer(): AggregationBuffer =
+    partial1ModeEvaluator.getNewAggregationBuffer
+
+  @transient
+  private lazy val inputProjection = UnsafeProjection.create(children)
+
+  override def update(buffer: AggregationBuffer, input: InternalRow): Unit = {
+    partial1ModeEvaluator.iterate(
+      buffer, wrap(inputProjection(input), inputWrappers, cached, inputDataTypes))
+  }
+
+  override def merge(buffer: AggregationBuffer, input: AggregationBuffer): Unit = {
+    // The 2nd argument of the Hive `GenericUDAFEvaluator.merge()` method is an input aggregation
+    // buffer in the 3rd format mentioned in the ScalaDoc of this class. Originally, Hive converts
+    // this `AggregationBuffer` into this format before shuffling partial aggregation results, and
+    // calls `GenericUDAFEvaluator.terminatePartial()` to do the conversion.
+    partial2ModeEvaluator.merge(buffer, partial1ModeEvaluator.terminatePartial(input))
--- End diff --

If we follow the code flow from interfaces.scala, we see that the aggregation
buffer in PARTIAL2 mode is merged with the terminated partial result of the
PARTIAL1-mode aggregation buffer. I am new to Spark and Hive, so I just wanted
to know the reason behind this behaviour. If there are any docs explaining it,
do let me know. Thank you.
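
For context, here is a comment-style sketch (not runnable on its own) of how
the Hive evaluator modes line up in this patch; the mode semantics come from
Hive's `GenericUDAFEvaluator` contract, and the evaluator names are the ones
introduced by the PR:

    // PARTIAL1: iterate() folds raw input rows into an AggregationBuffer, and
    //           terminatePartial() converts that buffer into the
    //           shuffle-friendly, ObjectInspector-inspectable form.
    // PARTIAL2: merge() folds one of those inspectable partial results into
    //           its own AggregationBuffer.
    // FINAL:    terminate() turns the fully merged buffer into the final value.
    //
    // Hive's merge() contract expects its second argument in the inspectable
    // form, which is why the PARTIAL1 evaluator's terminatePartial() is
    // applied to the incoming buffer first:
    partial2ModeEvaluator.merge(buffer, partial1ModeEvaluator.terminatePartial(input))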





[GitHub] spark pull request #15703: [SPARK-18186] Migrate HiveUDAFFunction to TypedIm...

2016-11-16 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15703#discussion_r88382948
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala ---
@@ -365,4 +380,66 @@ private[hive] case class HiveUDAFFunction(
     val distinct = if (isDistinct) "DISTINCT " else " "
     s"$name($distinct${children.map(_.sql).mkString(", ")})"
   }
+
+  override def createAggregationBuffer(): AggregationBuffer =
+    partial1ModeEvaluator.getNewAggregationBuffer
+
+  @transient
+  private lazy val inputProjection = new InterpretedProjection(children)
+
+  override def update(buffer: AggregationBuffer, input: InternalRow): Unit = {
+    partial1ModeEvaluator.iterate(
+      buffer, wrap(inputProjection(input), inputWrappers, cached, inputDataTypes))
+  }
+
+  override def merge(buffer: AggregationBuffer, input: AggregationBuffer): Unit = {
+    partial2ModeEvaluator.merge(buffer, partial1ModeEvaluator.terminatePartial(input))
+  }
+
+  override def eval(buffer: AggregationBuffer): Any = {
+    resultUnwrapper(finalModeEvaluator.terminate(buffer))
+  }
+
+  override def serialize(buffer: AggregationBuffer): Array[Byte] = {
+    aggBufferSerDe.serialize(buffer)
+  }
+
+  override def deserialize(bytes: Array[Byte]): AggregationBuffer = {
+    aggBufferSerDe.deserialize(bytes)
+  }
+
+  // Helper class used to de/serialize Hive UDAF `AggregationBuffer` objects
+  private class AggregationBufferSerDe {
+    private val partialResultUnwrapper = unwrapperFor(partialResultInspector)
+
+    private val partialResultWrapper = wrapperFor(partialResultInspector, partialResultDataType)
+
+    private val projection = UnsafeProjection.create(Array(partialResultDataType))
+
+    private val mutableRow = new GenericInternalRow(1)
+
+    def serialize(buffer: AggregationBuffer): Array[Byte] = {
+      // `GenericUDAFEvaluator.terminatePartial()` converts an `AggregationBuffer` into an object
+      // that can be inspected by the `ObjectInspector` returned by `GenericUDAFEvaluator.init()`.
+      // Then we can unwrap it to a Spark SQL value.
+      mutableRow.update(0, partialResultUnwrapper(partial1ModeEvaluator.terminatePartial(buffer)))
+      val unsafeRow = projection(mutableRow)
+      val bytes = ByteBuffer.allocate(unsafeRow.getSizeInBytes)
+      unsafeRow.writeTo(bytes)
+      bytes.array()
--- End diff --

Actually they are different. If the buffer type is fixed-length, then the
`unsafeRow` is just a fixed-length byte array, and `UnsafeRow.getBytes` will
simply return that array instead of copying the memory.
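
For the record, a minimal sketch of the two paths being compared, assuming
Spark's `UnsafeRow` API (`getSizeInBytes`, `writeTo`, and `getBytes` are real
methods; the two helper names are made up):

    import java.nio.ByteBuffer
    import org.apache.spark.sql.catalyst.expressions.UnsafeRow

    // Path taken by the patch: copy the row's bytes through a freshly
    // allocated ByteBuffer and return its backing array.
    def viaByteBuffer(row: UnsafeRow): Array[Byte] = {
      val buffer = ByteBuffer.allocate(row.getSizeInBytes)
      row.writeTo(buffer)
      buffer.array()
    }

    // The alternative discussed above: for a row backed by an exactly-sized
    // on-heap byte array (the fixed-length case), getBytes returns the backing
    // array directly without copying; otherwise it falls back to a copy.
    def viaGetBytes(row: UnsafeRow): Array[Byte] = row.getBytes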





[GitHub] spark pull request #15703: [SPARK-18186] Migrate HiveUDAFFunction to TypedIm...

2016-11-16 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15703#discussion_r88382746
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala ---
@@ -365,4 +380,66 @@ private[hive] case class HiveUDAFFunction(
     val distinct = if (isDistinct) "DISTINCT " else " "
     s"$name($distinct${children.map(_.sql).mkString(", ")})"
   }
+
+  override def createAggregationBuffer(): AggregationBuffer =
+    partial1ModeEvaluator.getNewAggregationBuffer
+
+  @transient
+  private lazy val inputProjection = new InterpretedProjection(children)
+
+  override def update(buffer: AggregationBuffer, input: InternalRow): Unit = {
+    partial1ModeEvaluator.iterate(
+      buffer, wrap(inputProjection(input), inputWrappers, cached, inputDataTypes))
+  }
+
+  override def merge(buffer: AggregationBuffer, input: AggregationBuffer): Unit = {
+    partial2ModeEvaluator.merge(buffer, partial1ModeEvaluator.terminatePartial(input))
+  }
+
+  override def eval(buffer: AggregationBuffer): Any = {
+    resultUnwrapper(finalModeEvaluator.terminate(buffer))
+  }
+
+  override def serialize(buffer: AggregationBuffer): Array[Byte] = {
+    aggBufferSerDe.serialize(buffer)
+  }
+
+  override def deserialize(bytes: Array[Byte]): AggregationBuffer = {
+    aggBufferSerDe.deserialize(bytes)
+  }
+
+  // Helper class used to de/serialize Hive UDAF `AggregationBuffer` objects
+  private class AggregationBufferSerDe {
+    private val partialResultUnwrapper = unwrapperFor(partialResultInspector)
+
+    private val partialResultWrapper = wrapperFor(partialResultInspector, partialResultDataType)
+
+    private val projection = UnsafeProjection.create(Array(partialResultDataType))
+
+    private val mutableRow = new GenericInternalRow(1)
+
+    def serialize(buffer: AggregationBuffer): Array[Byte] = {
+      // `GenericUDAFEvaluator.terminatePartial()` converts an `AggregationBuffer` into an object
+      // that can be inspected by the `ObjectInspector` returned by `GenericUDAFEvaluator.init()`.
+      // Then we can unwrap it to a Spark SQL value.
+      mutableRow.update(0, partialResultUnwrapper(partial1ModeEvaluator.terminatePartial(buffer)))
+      val unsafeRow = projection(mutableRow)
+      val bytes = ByteBuffer.allocate(unsafeRow.getSizeInBytes)
+      unsafeRow.writeTo(bytes)
+      bytes.array()
--- End diff --

but you also create an unnecessary `ByteBuffer`... since they are equivalent,
isn't `unsafeRow.getBytes` simpler?





[GitHub] spark pull request #15703: [SPARK-18186] Migrate HiveUDAFFunction to TypedIm...

2016-11-16 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/15703





[GitHub] spark pull request #15703: [SPARK-18186] Migrate HiveUDAFFunction to TypedIm...

2016-11-16 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/15703#discussion_r88326725
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala ---
@@ -289,73 +302,75 @@ private[hive] case class HiveUDAFFunction(
     funcWrapper.createFunction[AbstractGenericUDAFResolver]()
   }

+  // Hive `ObjectInspector`s for all child expressions (input parameters of the function).
   @transient
-  private lazy val inspectors = children.map(toInspector).toArray

+  // Spark SQL data types of input parameters.
   @transient
-  private lazy val functionAndInspector = {
-    val parameterInfo = new SimpleGenericUDAFParameterInfo(inspectors, false, false)
-    val f = resolver.getEvaluator(parameterInfo)
-    f -> f.init(GenericUDAFEvaluator.Mode.COMPLETE, inspectors)
+  private lazy val inputDataTypes: Array[DataType] = children.map(_.dataType).toArray
+
+  private def newEvaluator(): GenericUDAFEvaluator = {
+    val parameterInfo = new SimpleGenericUDAFParameterInfo(inputInspectors, false, false)
+    resolver.getEvaluator(parameterInfo)
   }

+  // The UDAF evaluator used to consume raw input rows and produce partial aggregation results.
   @transient
-  private lazy val function = functionAndInspector._1
+  private lazy val partial1ModeEvaluator = newEvaluator()
--- End diff --

ok. I think in general we should avoid using this pattern. If we have to use
it now, let's explain it in the comment.





[GitHub] spark pull request #15703: [SPARK-18186] Migrate HiveUDAFFunction to TypedIm...

2016-11-16 Thread liancheng
Github user liancheng commented on a diff in the pull request:

https://github.com/apache/spark/pull/15703#discussion_r88312625
  
--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDAFSuite.scala ---
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.hive.ql.udf.generic.{AbstractGenericUDAFResolver, GenericUDAFEvaluator, GenericUDAFMax}
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.{AggregationBuffer, Mode}
+import org.apache.hadoop.hive.ql.util.JavaDataModel
+import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, ObjectInspectorFactory}
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo
+
+import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
+
+class HiveUDAFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
+  import testImplicits._
+
+  protected override def beforeAll(): Unit = {
+    sql(s"CREATE TEMPORARY FUNCTION mock AS '${classOf[MockUDAF].getName}'")
+    sql(s"CREATE TEMPORARY FUNCTION hive_max AS '${classOf[GenericUDAFMax].getName}'")
+
+    Seq(
+      (0: Integer) -> "val_0",
+      (1: Integer) -> "val_1",
+      (2: Integer) -> null,
+      (3: Integer) -> null
+    ).toDF("key", "value").repartition(2).createOrReplaceTempView("t")
+  }
+
+  protected override def afterAll(): Unit = {
+    sql(s"DROP TEMPORARY FUNCTION IF EXISTS mock")
+    sql(s"DROP TEMPORARY FUNCTION IF EXISTS hive_max")
+  }
+
+  test("built-in Hive UDAF") {
+    val df = sql("SELECT hive_max(key) FROM t GROUP BY key % 2")
--- End diff --

Done. Thanks.





[GitHub] spark pull request #15703: [SPARK-18186] Migrate HiveUDAFFunction to TypedIm...

2016-11-16 Thread liancheng
Github user liancheng commented on a diff in the pull request:

https://github.com/apache/spark/pull/15703#discussion_r88312643
  
--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDAFSuite.scala ---
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.hive.ql.udf.generic.{AbstractGenericUDAFResolver, GenericUDAFEvaluator, GenericUDAFMax}
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.{AggregationBuffer, Mode}
+import org.apache.hadoop.hive.ql.util.JavaDataModel
+import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, ObjectInspectorFactory}
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo
+
+import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
+
+class HiveUDAFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
+  import testImplicits._
+
+  protected override def beforeAll(): Unit = {
+    sql(s"CREATE TEMPORARY FUNCTION mock AS '${classOf[MockUDAF].getName}'")
+    sql(s"CREATE TEMPORARY FUNCTION hive_max AS '${classOf[GenericUDAFMax].getName}'")
+
+    Seq(
+      (0: Integer) -> "val_0",
+      (1: Integer) -> "val_1",
+      (2: Integer) -> null,
+      (3: Integer) -> null
+    ).toDF("key", "value").repartition(2).createOrReplaceTempView("t")
+  }
+
+  protected override def afterAll(): Unit = {
+    sql(s"DROP TEMPORARY FUNCTION IF EXISTS mock")
+    sql(s"DROP TEMPORARY FUNCTION IF EXISTS hive_max")
+  }
+
+  test("built-in Hive UDAF") {
+    val df = sql("SELECT hive_max(key) FROM t GROUP BY key % 2")
+
+    val aggs = df.queryExecution.executedPlan.collect {
+      case agg: ObjectHashAggregateExec => agg
+    }
+
+    // There should be two aggregate operators, one for partial aggregation, and the other for
+    // global aggregation.
+    assert(aggs.length == 2)
+
+    checkAnswer(df, Seq(
+      Row(2),
+      Row(3)
+    ))
+  }
+
+  test("customized Hive UDAF") {
+    val df = sql("SELECT mock(value) FROM t GROUP BY key % 2")
--- End diff --

Done.





[GitHub] spark pull request #15703: [SPARK-18186] Migrate HiveUDAFFunction to TypedIm...

2016-11-16 Thread liancheng
Github user liancheng commented on a diff in the pull request:

https://github.com/apache/spark/pull/15703#discussion_r88312590
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala ---
@@ -365,4 +380,66 @@ private[hive] case class HiveUDAFFunction(
     val distinct = if (isDistinct) "DISTINCT " else " "
     s"$name($distinct${children.map(_.sql).mkString(", ")})"
   }
+
+  override def createAggregationBuffer(): AggregationBuffer =
+    partial1ModeEvaluator.getNewAggregationBuffer
+
+  @transient
+  private lazy val inputProjection = new InterpretedProjection(children)
+
+  override def update(buffer: AggregationBuffer, input: InternalRow): Unit = {
+    partial1ModeEvaluator.iterate(
+      buffer, wrap(inputProjection(input), inputWrappers, cached, inputDataTypes))
+  }
+
+  override def merge(buffer: AggregationBuffer, input: AggregationBuffer): Unit = {
+    partial2ModeEvaluator.merge(buffer, partial1ModeEvaluator.terminatePartial(input))
+  }
+
+  override def eval(buffer: AggregationBuffer): Any = {
+    resultUnwrapper(finalModeEvaluator.terminate(buffer))
+  }
+
+  override def serialize(buffer: AggregationBuffer): Array[Byte] = {
+    aggBufferSerDe.serialize(buffer)
+  }
+
+  override def deserialize(bytes: Array[Byte]): AggregationBuffer = {
+    aggBufferSerDe.deserialize(bytes)
+  }
+
+  // Helper class used to de/serialize Hive UDAF `AggregationBuffer` objects
+  private class AggregationBufferSerDe {
+    private val partialResultUnwrapper = unwrapperFor(partialResultInspector)
+
+    private val partialResultWrapper = wrapperFor(partialResultInspector, partialResultDataType)
+
+    private val projection = UnsafeProjection.create(Array(partialResultDataType))
+
+    private val mutableRow = new GenericInternalRow(1)
+
+    def serialize(buffer: AggregationBuffer): Array[Byte] = {
+      // `GenericUDAFEvaluator.terminatePartial()` converts an `AggregationBuffer` into an object
+      // that can be inspected by the `ObjectInspector` returned by `GenericUDAFEvaluator.init()`.
+      // Then we can unwrap it to a Spark SQL value.
+      mutableRow.update(0, partialResultUnwrapper(partial1ModeEvaluator.terminatePartial(buffer)))
+      val unsafeRow = projection(mutableRow)
+      val bytes = ByteBuffer.allocate(unsafeRow.getSizeInBytes)
+      unsafeRow.writeTo(bytes)
+      bytes.array()
--- End diff --

Aren't they equivalent in this case? `UnsafeRow.getBytes` also performs 
some more checks that are not necessary here.





[GitHub] spark pull request #15703: [SPARK-18186] Migrate HiveUDAFFunction to TypedIm...

2016-11-16 Thread liancheng
Github user liancheng commented on a diff in the pull request:

https://github.com/apache/spark/pull/15703#discussion_r88311998
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala ---
@@ -365,4 +380,66 @@ private[hive] case class HiveUDAFFunction(
     val distinct = if (isDistinct) "DISTINCT " else " "
     s"$name($distinct${children.map(_.sql).mkString(", ")})"
   }
+
+  override def createAggregationBuffer(): AggregationBuffer =
+    partial1ModeEvaluator.getNewAggregationBuffer
+
+  @transient
+  private lazy val inputProjection = new InterpretedProjection(children)
+
+  override def update(buffer: AggregationBuffer, input: InternalRow): Unit = {
+    partial1ModeEvaluator.iterate(
+      buffer, wrap(inputProjection(input), inputWrappers, cached, inputDataTypes))
+  }
+
+  override def merge(buffer: AggregationBuffer, input: AggregationBuffer): Unit = {
+    partial2ModeEvaluator.merge(buffer, partial1ModeEvaluator.terminatePartial(input))
+  }
+
+  override def eval(buffer: AggregationBuffer): Any = {
+    resultUnwrapper(finalModeEvaluator.terminate(buffer))
+  }
+
+  override def serialize(buffer: AggregationBuffer): Array[Byte] = {
+    aggBufferSerDe.serialize(buffer)
+  }
+
+  override def deserialize(bytes: Array[Byte]): AggregationBuffer = {
+    aggBufferSerDe.deserialize(bytes)
+  }
+
+  // Helper class used to de/serialize Hive UDAF `AggregationBuffer` objects
+  private class AggregationBufferSerDe {
+    private val partialResultUnwrapper = unwrapperFor(partialResultInspector)
+
+    private val partialResultWrapper = wrapperFor(partialResultInspector, partialResultDataType)
+
+    private val projection = UnsafeProjection.create(Array(partialResultDataType))
+
+    private val mutableRow = new GenericInternalRow(1)
+
+    def serialize(buffer: AggregationBuffer): Array[Byte] = {
--- End diff --

Done.





[GitHub] spark pull request #15703: [SPARK-18186] Migrate HiveUDAFFunction to TypedIm...

2016-11-16 Thread liancheng
Github user liancheng commented on a diff in the pull request:

https://github.com/apache/spark/pull/15703#discussion_r88311961
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala ---
@@ -365,4 +380,66 @@ private[hive] case class HiveUDAFFunction(
     val distinct = if (isDistinct) "DISTINCT " else " "
     s"$name($distinct${children.map(_.sql).mkString(", ")})"
   }
+
+  override def createAggregationBuffer(): AggregationBuffer =
+    partial1ModeEvaluator.getNewAggregationBuffer
+
+  @transient
+  private lazy val inputProjection = new InterpretedProjection(children)
+
+  override def update(buffer: AggregationBuffer, input: InternalRow): Unit = {
+    partial1ModeEvaluator.iterate(
+      buffer, wrap(inputProjection(input), inputWrappers, cached, inputDataTypes))
+  }
+
+  override def merge(buffer: AggregationBuffer, input: AggregationBuffer): Unit = {
+    partial2ModeEvaluator.merge(buffer, partial1ModeEvaluator.terminatePartial(input))
+  }
+
+  override def eval(buffer: AggregationBuffer): Any = {
+    resultUnwrapper(finalModeEvaluator.terminate(buffer))
+  }
+
+  override def serialize(buffer: AggregationBuffer): Array[Byte] = {
+    aggBufferSerDe.serialize(buffer)
+  }
+
+  override def deserialize(bytes: Array[Byte]): AggregationBuffer = {
+    aggBufferSerDe.deserialize(bytes)
+  }
+
+  // Helper class used to de/serialize Hive UDAF `AggregationBuffer` objects
+  private class AggregationBufferSerDe {
--- End diff --

We can, but it doesn't seem necessary. Making it a nested class also
simplifies the implementation, since it has access to the fields of the outer
class.
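
A toy illustration of that trade-off, with hypothetical names unrelated to the
real classes:

    object NestedClassDemo extends App {
      class Outer {
        private val sharedState = "owned by Outer"

        // A nested class captures a reference to its enclosing instance, so it
        // can read Outer's private fields without having them passed in.
        class Inner {
          def describe: String = s"Inner sees: $sharedState"
        }
      }

      val outer = new Outer
      println(new outer.Inner().describe) // Inner sees: owned by Outer
    }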





[GitHub] spark pull request #15703: [SPARK-18186] Migrate HiveUDAFFunction to TypedIm...

2016-11-16 Thread liancheng
Github user liancheng commented on a diff in the pull request:

https://github.com/apache/spark/pull/15703#discussion_r88311780
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala ---
@@ -365,4 +380,66 @@ private[hive] case class HiveUDAFFunction(
     val distinct = if (isDistinct) "DISTINCT " else " "
     s"$name($distinct${children.map(_.sql).mkString(", ")})"
   }
+
+  override def createAggregationBuffer(): AggregationBuffer =
+    partial1ModeEvaluator.getNewAggregationBuffer
+
+  @transient
+  private lazy val inputProjection = new InterpretedProjection(children)
+
+  override def update(buffer: AggregationBuffer, input: InternalRow): Unit = {
+    partial1ModeEvaluator.iterate(
+      buffer, wrap(inputProjection(input), inputWrappers, cached, inputDataTypes))
+  }
+
+  override def merge(buffer: AggregationBuffer, input: AggregationBuffer): Unit = {
+    partial2ModeEvaluator.merge(buffer, partial1ModeEvaluator.terminatePartial(input))
+  }
+
+  override def eval(buffer: AggregationBuffer): Any = {
+    resultUnwrapper(finalModeEvaluator.terminate(buffer))
+  }
+
+  override def serialize(buffer: AggregationBuffer): Array[Byte] = {
+    aggBufferSerDe.serialize(buffer)
+  }
+
+  override def deserialize(bytes: Array[Byte]): AggregationBuffer = {
+    aggBufferSerDe.deserialize(bytes)
+  }
--- End diff --

Done.





[GitHub] spark pull request #15703: [SPARK-18186] Migrate HiveUDAFFunction to TypedIm...

2016-11-16 Thread liancheng
Github user liancheng commented on a diff in the pull request:

https://github.com/apache/spark/pull/15703#discussion_r88311737
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala ---
@@ -365,4 +380,66 @@ private[hive] case class HiveUDAFFunction(
     val distinct = if (isDistinct) "DISTINCT " else " "
     s"$name($distinct${children.map(_.sql).mkString(", ")})"
   }
+
+  override def createAggregationBuffer(): AggregationBuffer =
+    partial1ModeEvaluator.getNewAggregationBuffer
+
+  @transient
+  private lazy val inputProjection = new InterpretedProjection(children)
+
+  override def update(buffer: AggregationBuffer, input: InternalRow): Unit = {
+    partial1ModeEvaluator.iterate(
+      buffer, wrap(inputProjection(input), inputWrappers, cached, inputDataTypes))
+  }
+
+  override def merge(buffer: AggregationBuffer, input: AggregationBuffer): Unit = {
+    partial2ModeEvaluator.merge(buffer, partial1ModeEvaluator.terminatePartial(input))
--- End diff --

Comment added.





[GitHub] spark pull request #15703: [SPARK-18186] Migrate HiveUDAFFunction to TypedIm...

2016-11-16 Thread liancheng
Github user liancheng commented on a diff in the pull request:

https://github.com/apache/spark/pull/15703#discussion_r88311655
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala ---
@@ -365,4 +380,66 @@ private[hive] case class HiveUDAFFunction(
     val distinct = if (isDistinct) "DISTINCT " else " "
     s"$name($distinct${children.map(_.sql).mkString(", ")})"
   }
+
+  override def createAggregationBuffer(): AggregationBuffer =
+    partial1ModeEvaluator.getNewAggregationBuffer
+
+  @transient
+  private lazy val inputProjection = new InterpretedProjection(children)
--- End diff --

Copied from the original code. Replaced with `UnsafeProjection` instead. 
Thanks.





[GitHub] spark pull request #15703: [SPARK-18186] Migrate HiveUDAFFunction to TypedIm...

2016-11-16 Thread liancheng
Github user liancheng commented on a diff in the pull request:

https://github.com/apache/spark/pull/15703#discussion_r88310719
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala ---
@@ -289,73 +302,75 @@ private[hive] case class HiveUDAFFunction(
     funcWrapper.createFunction[AbstractGenericUDAFResolver]()
   }

+  // Hive `ObjectInspector`s for all child expressions (input parameters of the function).
   @transient
-  private lazy val inspectors = children.map(toInspector).toArray
+  private lazy val inputInspectors = children.map(toInspector).toArray

+  // Spark SQL data types of input parameters.
   @transient
-  private lazy val functionAndInspector = {
-    val parameterInfo = new SimpleGenericUDAFParameterInfo(inspectors, false, false)
-    val f = resolver.getEvaluator(parameterInfo)
-    f -> f.init(GenericUDAFEvaluator.Mode.COMPLETE, inspectors)
+  private lazy val inputDataTypes: Array[DataType] = children.map(_.dataType).toArray
+
+  private def newEvaluator(): GenericUDAFEvaluator = {
+    val parameterInfo = new SimpleGenericUDAFParameterInfo(inputInspectors, false, false)
+    resolver.getEvaluator(parameterInfo)
   }

+  // The UDAF evaluator used to consume raw input rows and produce partial aggregation results.
   @transient
-  private lazy val function = functionAndInspector._1
+  private lazy val partial1ModeEvaluator = newEvaluator()

+  // Hive `ObjectInspector` used to inspect partial aggregation results.
--- End diff --

Yea. They are the objects returned by `terminatePartial()`, which is the
inspectable version of a Hive `AggregationBuffer`.
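
A tiny sketch of that relationship against Hive's generic UDAF API (a made-up
sum-like buffer; only the two state shapes matter here):

    import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer
    import org.apache.hadoop.io.LongWritable

    object PartialResultSketch {
      // The opaque, mutable state Hive threads through iterate()/merge().
      class SumBuffer extends AggregationBuffer { var sum: Long = 0L }

      // What terminatePartial() would return for such a buffer: a plain Java
      // object (here a LongWritable) that the ObjectInspector obtained from
      // init() knows how to inspect, and thus the only form that can cross a
      // shuffle boundary.
      def terminatePartial(agg: AggregationBuffer): AnyRef =
        new LongWritable(agg.asInstanceOf[SumBuffer].sum)
    }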





[GitHub] spark pull request #15703: [SPARK-18186] Migrate HiveUDAFFunction to TypedIm...

2016-11-16 Thread liancheng
Github user liancheng commented on a diff in the pull request:

https://github.com/apache/spark/pull/15703#discussion_r88310296
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala ---
@@ -289,73 +302,75 @@ private[hive] case class HiveUDAFFunction(
     funcWrapper.createFunction[AbstractGenericUDAFResolver]()
   }

+  // Hive `ObjectInspector`s for all child expressions (input parameters of the function).
   @transient
-  private lazy val inspectors = children.map(toInspector).toArray
+  private lazy val inputInspectors = children.map(toInspector).toArray

+  // Spark SQL data types of input parameters.
   @transient
-  private lazy val functionAndInspector = {
-    val parameterInfo = new SimpleGenericUDAFParameterInfo(inspectors, false, false)
-    val f = resolver.getEvaluator(parameterInfo)
-    f -> f.init(GenericUDAFEvaluator.Mode.COMPLETE, inspectors)
+  private lazy val inputDataTypes: Array[DataType] = children.map(_.dataType).toArray
+
+  private def newEvaluator(): GenericUDAFEvaluator = {
+    val parameterInfo = new SimpleGenericUDAFParameterInfo(inputInspectors, false, false)
+    resolver.getEvaluator(parameterInfo)
   }

+  // The UDAF evaluator used to consume raw input rows and produce partial aggregation results.
   @transient
-  private lazy val function = functionAndInspector._1
+  private lazy val partial1ModeEvaluator = newEvaluator()
--- End diff --

Yes. It has to be transient and lazy so that it's also available on the
executor side, since Hive UDAF evaluators are not serializable.
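
A minimal sketch of the pattern, with hypothetical names (only the
`@transient lazy val` combination is the point):

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

    class NonSerializableEvaluator { // stands in for a Hive evaluator
      def evaluate(x: Int): Int = x * 2
    }

    class MyFunction extends Serializable {
      // @transient: Java serialization skips this field when the object is
      // shipped to an executor. lazy: each deserialized copy rebuilds the
      // evaluator on first access instead of arriving with a null field.
      @transient private lazy val evaluator = new NonSerializableEvaluator
      def apply(x: Int): Int = evaluator.evaluate(x)
    }

    object RoundTrip extends App {
      val f = new MyFunction
      f(1) // force evaluator creation on the "driver" copy
      val out = new ByteArrayOutputStream()
      new ObjectOutputStream(out).writeObject(f) // ok: evaluator is skipped
      val copy = new ObjectInputStream(new ByteArrayInputStream(out.toByteArray))
        .readObject().asInstanceOf[MyFunction]
      println(copy(21)) // 42 -- the copy lazily re-creates its own evaluator
    }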





[GitHub] spark pull request #15703: [SPARK-18186] Migrate HiveUDAFFunction to TypedIm...

2016-11-16 Thread liancheng
Github user liancheng commented on a diff in the pull request:

https://github.com/apache/spark/pull/15703#discussion_r88310092
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala ---
@@ -263,8 +265,19 @@ private[hive] case class HiveGenericUDTF(
 }

 /**
- * Currently we don't support partial aggregation for queries using Hive UDAF, which may hurt
- * performance a lot.
+ * While being evaluated by Spark SQL, the aggregation state of a Hive UDAF may be in the following
+ * three formats:
+ *
+ *  1. a Spark SQL value, or
+ *  2. an instance of some concrete `GenericUDAFEvaluator.AggregationBuffer` class, or
+ *  3. a Java object that can be inspected using the `ObjectInspector` returned by the
+ *     `GenericUDAFEvaluator.init()` method.
--- End diff --

Thanks, added.





[GitHub] spark pull request #15703: [SPARK-18186] Migrate HiveUDAFFunction to TypedIm...

2016-11-15 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/15703#discussion_r88172060
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala ---
@@ -365,4 +380,66 @@ private[hive] case class HiveUDAFFunction(
     val distinct = if (isDistinct) "DISTINCT " else " "
     s"$name($distinct${children.map(_.sql).mkString(", ")})"
   }
+
+  override def createAggregationBuffer(): AggregationBuffer =
+    partial1ModeEvaluator.getNewAggregationBuffer
+
+  @transient
+  private lazy val inputProjection = new InterpretedProjection(children)
+
+  override def update(buffer: AggregationBuffer, input: InternalRow): Unit = {
+    partial1ModeEvaluator.iterate(
+      buffer, wrap(inputProjection(input), inputWrappers, cached, inputDataTypes))
+  }
+
+  override def merge(buffer: AggregationBuffer, input: AggregationBuffer): Unit = {
+    partial2ModeEvaluator.merge(buffer, partial1ModeEvaluator.terminatePartial(input))
+  }
+
+  override def eval(buffer: AggregationBuffer): Any = {
+    resultUnwrapper(finalModeEvaluator.terminate(buffer))
+  }
+
+  override def serialize(buffer: AggregationBuffer): Array[Byte] = {
+    aggBufferSerDe.serialize(buffer)
+  }
+
+  override def deserialize(bytes: Array[Byte]): AggregationBuffer = {
+    aggBufferSerDe.deserialize(bytes)
+  }
+
+  // Helper class used to de/serialize Hive UDAF `AggregationBuffer` objects
+  private class AggregationBufferSerDe {
--- End diff --

Can we take this class out of `HiveUDAFFunction`?





[GitHub] spark pull request #15703: [SPARK-18186] Migrate HiveUDAFFunction to TypedIm...

2016-11-15 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/15703#discussion_r88171437
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala ---
@@ -289,73 +302,75 @@ private[hive] case class HiveUDAFFunction(
     funcWrapper.createFunction[AbstractGenericUDAFResolver]()
   }

+  // Hive `ObjectInspector`s for all child expressions (input parameters of the function).
   @transient
-  private lazy val inspectors = children.map(toInspector).toArray
+  private lazy val inputInspectors = children.map(toInspector).toArray

+  // Spark SQL data types of input parameters.
   @transient
-  private lazy val functionAndInspector = {
-    val parameterInfo = new SimpleGenericUDAFParameterInfo(inspectors, false, false)
-    val f = resolver.getEvaluator(parameterInfo)
-    f -> f.init(GenericUDAFEvaluator.Mode.COMPLETE, inspectors)
+  private lazy val inputDataTypes: Array[DataType] = children.map(_.dataType).toArray
+
+  private def newEvaluator(): GenericUDAFEvaluator = {
+    val parameterInfo = new SimpleGenericUDAFParameterInfo(inputInspectors, false, false)
+    resolver.getEvaluator(parameterInfo)
   }

+  // The UDAF evaluator used to consume raw input rows and produce partial aggregation results.
   @transient
-  private lazy val function = functionAndInspector._1
+  private lazy val partial1ModeEvaluator = newEvaluator()

+  // Hive `ObjectInspector` used to inspect partial aggregation results.
   @transient
-  private lazy val wrappers = children.map(x => wrapperFor(toInspector(x), x.dataType)).toArray
+  private val partialResultInspector = partial1ModeEvaluator.init(
+    GenericUDAFEvaluator.Mode.PARTIAL1,
+    inputInspectors
+  )

+  // The UDAF evaluator used to merge partial aggregation results.
   @transient
-  private lazy val returnInspector = functionAndInspector._2
+  private lazy val partial2ModeEvaluator = {
+    val evaluator = newEvaluator()
+    evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL2, Array(partialResultInspector))
+    evaluator
+  }

+  // Spark SQL data type of partial aggregation results
   @transient
-  private lazy val unwrapper = unwrapperFor(returnInspector)
+  private lazy val partialResultDataType = inspectorToDataType(partialResultInspector)

+  // The UDAF evaluator used to compute the final result from partial aggregation result objects.
   @transient
-  private[this] var buffer: GenericUDAFEvaluator.AggregationBuffer = _
-
-  override def eval(input: InternalRow): Any = unwrapper(function.evaluate(buffer))
+  private lazy val finalModeEvaluator = newEvaluator()

+  // Hive `ObjectInspector` used to inspect the final aggregation result object.
   @transient
-  private lazy val inputProjection = new InterpretedProjection(children)
+  private val returnInspector = finalModeEvaluator.init(
+    GenericUDAFEvaluator.Mode.FINAL,
+    Array(partialResultInspector)
+  )

+  // Wrapper functions used to wrap Spark SQL input arguments into Hive specific format.
   @transient
-  private lazy val cached = new Array[AnyRef](children.length)
+  private lazy val inputWrappers = children.map(x => wrapperFor(toInspector(x), x.dataType)).toArray

+  // Unwrapper function used to unwrap final aggregation result objects returned by Hive UDAFs into
+  // Spark SQL specific format.
   @transient
-  private lazy val inputDataTypes: Array[DataType] = children.map(_.dataType).toArray
-
-  // Hive UDAF has its own buffer, so we don't need to occupy a slot in the aggregation
-  // buffer for it.
-  override def aggBufferSchema: StructType = StructType(Nil)
-
-  override def update(_buffer: InternalRow, input: InternalRow): Unit = {
-    val inputs = inputProjection(input)
-    function.iterate(buffer, wrap(inputs, wrappers, cached, inputDataTypes))
-  }
-
-  override def merge(buffer1: InternalRow, buffer2: InternalRow): Unit = {
-    throw new UnsupportedOperationException(
-      "Hive UDAF doesn't support partial aggregate")
-  }
+  private lazy val resultUnwrapper = unwrapperFor(returnInspector)

-  override def initialize(_buffer: InternalRow): Unit = {
-    buffer = function.getNewAggregationBuffer
-  }
-
-  override val aggBufferAttributes: Seq[AttributeReference] = Nil
+  @transient
+  private lazy val cached: Array[AnyRef] = new Array[AnyRef](children.length)

-  // Note: although this simply copies aggBufferAttributes, this common code can not be placed
-  // in the superclass because that will lead to initialization ordering issues.
-  override val inputAggBufferAttributes:

[GitHub] spark pull request #15703: [SPARK-18186] Migrate HiveUDAFFunction to TypedIm...

2016-11-15 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/15703#discussion_r88171373
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala ---
@@ -289,73 +302,75 @@ private[hive] case class HiveUDAFFunction(
     funcWrapper.createFunction[AbstractGenericUDAFResolver]()
   }

+  // Hive `ObjectInspector`s for all child expressions (input parameters of the function).
   @transient
-  private lazy val inspectors = children.map(toInspector).toArray
+  private lazy val inputInspectors = children.map(toInspector).toArray

+  // Spark SQL data types of input parameters.
   @transient
-  private lazy val functionAndInspector = {
-    val parameterInfo = new SimpleGenericUDAFParameterInfo(inspectors, false, false)
-    val f = resolver.getEvaluator(parameterInfo)
-    f -> f.init(GenericUDAFEvaluator.Mode.COMPLETE, inspectors)
+  private lazy val inputDataTypes: Array[DataType] = children.map(_.dataType).toArray
+
+  private def newEvaluator(): GenericUDAFEvaluator = {
+    val parameterInfo = new SimpleGenericUDAFParameterInfo(inputInspectors, false, false)
+    resolver.getEvaluator(parameterInfo)
   }

+  // The UDAF evaluator used to consume raw input rows and produce partial aggregation results.
   @transient
-  private lazy val function = functionAndInspector._1
+  private lazy val partial1ModeEvaluator = newEvaluator()

+  // Hive `ObjectInspector` used to inspect partial aggregation results.
--- End diff --

The partial aggregation result is the aggregation buffer, right?





[GitHub] spark pull request #15703: [SPARK-18186] Migrate HiveUDAFFunction to TypedIm...

2016-11-15 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/15703#discussion_r88171285
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala ---
@@ -289,73 +302,75 @@ private[hive] case class HiveUDAFFunction(
     funcWrapper.createFunction[AbstractGenericUDAFResolver]()
   }

+  // Hive `ObjectInspector`s for all child expressions (input parameters of the function).
   @transient
-  private lazy val inspectors = children.map(toInspector).toArray
+  private lazy val inputInspectors = children.map(toInspector).toArray
--- End diff --

Let's add docs to explain when these internal vals are used (like which 
vals are needed for a given mode).





[GitHub] spark pull request #15703: [SPARK-18186] Migrate HiveUDAFFunction to TypedIm...

2016-11-15 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/15703#discussion_r88171097
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala ---
@@ -289,73 +302,75 @@ private[hive] case class HiveUDAFFunction(
     funcWrapper.createFunction[AbstractGenericUDAFResolver]()
   }

+  // Hive `ObjectInspector`s for all child expressions (input parameters of the function).
   @transient
-  private lazy val inspectors = children.map(toInspector).toArray
+  private lazy val inputInspectors = children.map(toInspector).toArray

+  // Spark SQL data types of input parameters.
   @transient
-  private lazy val functionAndInspector = {
-    val parameterInfo = new SimpleGenericUDAFParameterInfo(inspectors, false, false)
-    val f = resolver.getEvaluator(parameterInfo)
-    f -> f.init(GenericUDAFEvaluator.Mode.COMPLETE, inspectors)
+  private lazy val inputDataTypes: Array[DataType] = children.map(_.dataType).toArray
+
+  private def newEvaluator(): GenericUDAFEvaluator = {
+    val parameterInfo = new SimpleGenericUDAFParameterInfo(inputInspectors, false, false)
+    resolver.getEvaluator(parameterInfo)
   }

+  // The UDAF evaluator used to consume raw input rows and produce partial aggregation results.
   @transient
-  private lazy val function = functionAndInspector._1
+  private lazy val partial1ModeEvaluator = newEvaluator()
--- End diff --

Should we always call init to make the code consistent?





[GitHub] spark pull request #15703: [SPARK-18186] Migrate HiveUDAFFunction to TypedIm...

2016-11-15 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/15703#discussion_r88170694
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala ---
@@ -263,8 +265,19 @@ private[hive] case class HiveGenericUDTF(
 }

 /**
- * Currently we don't support partial aggregation for queries using Hive UDAF, which may hurt
- * performance a lot.
+ * While being evaluated by Spark SQL, the aggregation state of a Hive UDAF may be in the following
+ * three formats:
+ *
+ *  1. a Spark SQL value, or
+ *  2. an instance of some concrete `GenericUDAFEvaluator.AggregationBuffer` class, or
+ *  3. a Java object that can be inspected using the `ObjectInspector` returned by the
+ *     `GenericUDAFEvaluator.init()` method.
--- End diff --

(is the doc below enough?)





[GitHub] spark pull request #15703: [SPARK-18186] Migrate HiveUDAFFunction to TypedIm...

2016-11-15 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/15703#discussion_r88170550
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala ---
@@ -263,8 +265,19 @@ private[hive] case class HiveGenericUDTF(
 }

 /**
- * Currently we don't support partial aggregation for queries using Hive UDAF, which may hurt
- * performance a lot.
+ * While being evaluated by Spark SQL, the aggregation state of a Hive UDAF may be in the following
+ * three formats:
+ *
+ *  1. a Spark SQL value, or
+ *  2. an instance of some concrete `GenericUDAFEvaluator.AggregationBuffer` class, or
+ *  3. a Java object that can be inspected using the `ObjectInspector` returned by the
+ *     `GenericUDAFEvaluator.init()` method.
--- End diff --

(we can just put the PR description here)





[GitHub] spark pull request #15703: [SPARK-18186] Migrate HiveUDAFFunction to TypedIm...

2016-11-15 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/15703#discussion_r88168751
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala ---
@@ -263,8 +265,19 @@ private[hive] case class HiveGenericUDTF(
 }

 /**
- * Currently we don't support partial aggregation for queries using Hive UDAF, which may hurt
- * performance a lot.
+ * While being evaluated by Spark SQL, the aggregation state of a Hive UDAF may be in the following
+ * three formats:
+ *
+ *  1. a Spark SQL value, or
+ *  2. an instance of some concrete `GenericUDAFEvaluator.AggregationBuffer` class, or
+ *  3. a Java object that can be inspected using the `ObjectInspector` returned by the
+ *     `GenericUDAFEvaluator.init()` method.
--- End diff --

Besides explaining what these three formats are, let's also explain when each
of them is used.
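
For reference, a comment-style sketch of when each format appears, derived
from the quoted code and the Hive evaluator contract (not the exact wording
that ended up being committed):

    //  1. Spark SQL values        -- the raw input rows consumed by update(),
    //                                and the final result produced by eval().
    //  2. AggregationBuffer       -- the in-memory state that iterate()/merge()
    //                                mutate between calls within one executor.
    //  3. Inspectable Java object -- produced by terminatePartial(); the only
    //                                form Hive exposes for serialization, so it
    //                                is what serialize()/deserialize() convert
    //                                to and from bytes around the shuffle.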





[GitHub] spark pull request #15703: [SPARK-18186] Migrate HiveUDAFFunction to TypedIm...

2016-11-15 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/15703#discussion_r88140760
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala ---
@@ -365,4 +380,66 @@ private[hive] case class HiveUDAFFunction(
     val distinct = if (isDistinct) "DISTINCT " else " "
     s"$name($distinct${children.map(_.sql).mkString(", ")})"
   }
+
+  override def createAggregationBuffer(): AggregationBuffer =
+    partial1ModeEvaluator.getNewAggregationBuffer
+
+  @transient
+  private lazy val inputProjection = new InterpretedProjection(children)
--- End diff --

Why `InterpretedProjection`?





[GitHub] spark pull request #15703: [SPARK-18186] Migrate HiveUDAFFunction to TypedIm...

2016-11-15 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/15703#discussion_r88140970
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala ---
@@ -365,4 +380,66 @@ private[hive] case class HiveUDAFFunction(
     val distinct = if (isDistinct) "DISTINCT " else " "
     s"$name($distinct${children.map(_.sql).mkString(", ")})"
   }
+
+  override def createAggregationBuffer(): AggregationBuffer =
+    partial1ModeEvaluator.getNewAggregationBuffer
+
+  @transient
+  private lazy val inputProjection = new InterpretedProjection(children)
+
+  override def update(buffer: AggregationBuffer, input: InternalRow): Unit = {
+    partial1ModeEvaluator.iterate(
+      buffer, wrap(inputProjection(input), inputWrappers, cached, inputDataTypes))
+  }
+
+  override def merge(buffer: AggregationBuffer, input: AggregationBuffer): Unit = {
+    partial2ModeEvaluator.merge(buffer, partial1ModeEvaluator.terminatePartial(input))
--- End diff --

Let's explain what we are trying to do using 
`partial1ModeEvaluator.terminatePartial(input)`.





[GitHub] spark pull request #15703: [SPARK-18186] Migrate HiveUDAFFunction to TypedIm...

2016-11-15 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/15703#discussion_r88141512
  
--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDAFSuite.scala ---
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.hive.ql.udf.generic.{AbstractGenericUDAFResolver, GenericUDAFEvaluator, GenericUDAFMax}
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.{AggregationBuffer, Mode}
+import org.apache.hadoop.hive.ql.util.JavaDataModel
+import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, ObjectInspectorFactory}
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo
+
+import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
+
+class HiveUDAFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
+  import testImplicits._
+
+  protected override def beforeAll(): Unit = {
+    sql(s"CREATE TEMPORARY FUNCTION mock AS '${classOf[MockUDAF].getName}'")
+    sql(s"CREATE TEMPORARY FUNCTION hive_max AS '${classOf[GenericUDAFMax].getName}'")
+
+    Seq(
+      (0: Integer) -> "val_0",
+      (1: Integer) -> "val_1",
+      (2: Integer) -> null,
+      (3: Integer) -> null
+    ).toDF("key", "value").repartition(2).createOrReplaceTempView("t")
+  }
+
+  protected override def afterAll(): Unit = {
+    sql(s"DROP TEMPORARY FUNCTION IF EXISTS mock")
+    sql(s"DROP TEMPORARY FUNCTION IF EXISTS hive_max")
+  }
+
+  test("built-in Hive UDAF") {
+    val df = sql("SELECT hive_max(key) FROM t GROUP BY key % 2")
--- End diff --

Also keep the key?
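
For example (my suggestion, not code from the PR):

```scala
val df = sql("SELECT key % 2, hive_max(key) FROM t GROUP BY key % 2")
```

so the answer check also verifies which group each max belongs to.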





[GitHub] spark pull request #15703: [SPARK-18186] Migrate HiveUDAFFunction to TypedIm...

2016-11-15 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/15703#discussion_r88140933
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala ---
@@ -365,4 +380,66 @@ private[hive] case class HiveUDAFFunction(
     val distinct = if (isDistinct) "DISTINCT " else " "
     s"$name($distinct${children.map(_.sql).mkString(", ")})"
   }
+
+  override def createAggregationBuffer(): AggregationBuffer =
+    partial1ModeEvaluator.getNewAggregationBuffer
+
+  @transient
+  private lazy val inputProjection = new InterpretedProjection(children)
+
+  override def update(buffer: AggregationBuffer, input: InternalRow): Unit = {
+    partial1ModeEvaluator.iterate(
+      buffer, wrap(inputProjection(input), inputWrappers, cached, inputDataTypes))
+  }
+
+  override def merge(buffer: AggregationBuffer, input: AggregationBuffer): Unit = {
+    partial2ModeEvaluator.merge(buffer, partial1ModeEvaluator.terminatePartial(input))
+  }
+
+  override def eval(buffer: AggregationBuffer): Any = {
+    resultUnwrapper(finalModeEvaluator.terminate(buffer))
+  }
+
+  override def serialize(buffer: AggregationBuffer): Array[Byte] = {
+    aggBufferSerDe.serialize(buffer)
+  }
+
+  override def deserialize(bytes: Array[Byte]): AggregationBuffer = {
+    aggBufferSerDe.deserialize(bytes)
+  }
--- End diff --

Let's add docs to explain what these functions are doing.
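
For instance, the docs could look roughly like this (a wording sketch based on
the behavior visible above, not text from the PR):

```scala
// update(): feeds one Spark input row to the PARTIAL1-mode evaluator.
// merge(): merges another partial buffer, first converted to Hive's
//          transportable partial-result format via terminatePartial().
// eval(): asks the FINAL-mode evaluator for the result and unwraps it
//         into Spark SQL's internal format.
// serialize()/deserialize(): round-trip the buffer through a byte array
//         so it can cross a shuffle boundary.
```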





[GitHub] spark pull request #15703: [SPARK-18186] Migrate HiveUDAFFunction to TypedIm...

2016-11-15 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/15703#discussion_r88141492
  
--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDAFSuite.scala ---
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.hive.ql.udf.generic.{AbstractGenericUDAFResolver, GenericUDAFEvaluator, GenericUDAFMax}
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.{AggregationBuffer, Mode}
+import org.apache.hadoop.hive.ql.util.JavaDataModel
+import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, ObjectInspectorFactory}
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo
+
+import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
+
+class HiveUDAFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
+  import testImplicits._
+
+  protected override def beforeAll(): Unit = {
+    sql(s"CREATE TEMPORARY FUNCTION mock AS '${classOf[MockUDAF].getName}'")
+    sql(s"CREATE TEMPORARY FUNCTION hive_max AS '${classOf[GenericUDAFMax].getName}'")
+
+    Seq(
+      (0: Integer) -> "val_0",
+      (1: Integer) -> "val_1",
+      (2: Integer) -> null,
+      (3: Integer) -> null
+    ).toDF("key", "value").repartition(2).createOrReplaceTempView("t")
+  }
+
+  protected override def afterAll(): Unit = {
+    sql(s"DROP TEMPORARY FUNCTION IF EXISTS mock")
+    sql(s"DROP TEMPORARY FUNCTION IF EXISTS hive_max")
+  }
+
+  test("built-in Hive UDAF") {
+    val df = sql("SELECT hive_max(key) FROM t GROUP BY key % 2")
+
+    val aggs = df.queryExecution.executedPlan.collect {
+      case agg: ObjectHashAggregateExec => agg
+    }
+
+    // There should be two aggregate operators, one for partial aggregation, and the other for
+    // global aggregation.
+    assert(aggs.length == 2)
+
+    checkAnswer(df, Seq(
+      Row(2),
+      Row(3)
+    ))
+  }
+
+  test("customized Hive UDAF") {
+    val df = sql("SELECT mock(value) FROM t GROUP BY key % 2")
--- End diff --

How about we also keep the key?
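
Same idea as suggested for the built-in UDAF test above, e.g. (hypothetical):

```scala
val df = sql("SELECT key % 2, mock(value) FROM t GROUP BY key % 2")
```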





[GitHub] spark pull request #15703: [SPARK-18186] Migrate HiveUDAFFunction to TypedIm...

2016-11-15 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/15703#discussion_r88140713
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala ---
@@ -289,73 +302,75 @@ private[hive] case class HiveUDAFFunction(
     funcWrapper.createFunction[AbstractGenericUDAFResolver]()
   }

+  // Hive `ObjectInspector`s for all child expressions (input parameters of the function).
   @transient
-  private lazy val inspectors = children.map(toInspector).toArray
+  private lazy val inputInspectors = children.map(toInspector).toArray

+  // Spark SQL data types of input parameters.
   @transient
-  private lazy val functionAndInspector = {
-    val parameterInfo = new SimpleGenericUDAFParameterInfo(inspectors, false, false)
-    val f = resolver.getEvaluator(parameterInfo)
-    f -> f.init(GenericUDAFEvaluator.Mode.COMPLETE, inspectors)
+  private lazy val inputDataTypes: Array[DataType] = children.map(_.dataType).toArray
+
+  private def newEvaluator(): GenericUDAFEvaluator = {
+    val parameterInfo = new SimpleGenericUDAFParameterInfo(inputInspectors, false, false)
+    resolver.getEvaluator(parameterInfo)
   }

+  // The UDAF evaluator used to consume raw input rows and produce partial aggregation results.
   @transient
-  private lazy val function = functionAndInspector._1
+  private lazy val partial1ModeEvaluator = newEvaluator()

+  // Hive `ObjectInspector` used to inspect partial aggregation results.
   @transient
-  private lazy val wrappers = children.map(x => wrapperFor(toInspector(x), x.dataType)).toArray
+  private val partialResultInspector = partial1ModeEvaluator.init(
+    GenericUDAFEvaluator.Mode.PARTIAL1,
+    inputInspectors
+  )

+  // The UDAF evaluator used to merge partial aggregation results.
   @transient
-  private lazy val returnInspector = functionAndInspector._2
+  private lazy val partial2ModeEvaluator = {
+    val evaluator = newEvaluator()
+    evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL2, Array(partialResultInspector))
+    evaluator
+  }

+  // Spark SQL data type of partial aggregation results.
   @transient
-  private lazy val unwrapper = unwrapperFor(returnInspector)
+  private lazy val partialResultDataType = inspectorToDataType(partialResultInspector)

+  // The UDAF evaluator used to compute the final result from partial aggregation result objects.
   @transient
-  private[this] var buffer: GenericUDAFEvaluator.AggregationBuffer = _
-
-  override def eval(input: InternalRow): Any = unwrapper(function.evaluate(buffer))
+  private lazy val finalModeEvaluator = newEvaluator()

+  // Hive `ObjectInspector` used to inspect the final aggregation result object.
   @transient
-  private lazy val inputProjection = new InterpretedProjection(children)
+  private val returnInspector = finalModeEvaluator.init(
+    GenericUDAFEvaluator.Mode.FINAL,
+    Array(partialResultInspector)
+  )

+  // Wrapper functions used to wrap Spark SQL input arguments into Hive-specific format.
   @transient
-  private lazy val cached = new Array[AnyRef](children.length)
+  private lazy val inputWrappers = children.map(x => wrapperFor(toInspector(x), x.dataType)).toArray

+  // Unwrapper function used to unwrap final aggregation result objects returned by Hive UDAFs
+  // into Spark SQL-specific format.
   @transient
-  private lazy val inputDataTypes: Array[DataType] = children.map(_.dataType).toArray
-
-  // Hive UDAF has its own buffer, so we don't need to occupy a slot in the aggregation
-  // buffer for it.
-  override def aggBufferSchema: StructType = StructType(Nil)
-
-  override def update(_buffer: InternalRow, input: InternalRow): Unit = {
-    val inputs = inputProjection(input)
-    function.iterate(buffer, wrap(inputs, wrappers, cached, inputDataTypes))
-  }
-
-  override def merge(buffer1: InternalRow, buffer2: InternalRow): Unit = {
-    throw new UnsupportedOperationException(
-      "Hive UDAF doesn't support partial aggregate")
-  }
+  private lazy val resultUnwrapper = unwrapperFor(returnInspector)

-  override def initialize(_buffer: InternalRow): Unit = {
-    buffer = function.getNewAggregationBuffer
-  }
-
-  override val aggBufferAttributes: Seq[AttributeReference] = Nil
+  @transient
+  private lazy val cached: Array[AnyRef] = new Array[AnyRef](children.length)

-  // Note: although this simply copies aggBufferAttributes, this common code can not be placed
-  // in the superclass because that will lead to initialization ordering issues.
-  override val inputAggBufferAttributes: 

[GitHub] spark pull request #15703: [SPARK-18186] Migrate HiveUDAFFunction to TypedIm...

2016-11-15 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/15703#discussion_r88141072
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala ---
@@ -365,4 +380,66 @@ private[hive] case class HiveUDAFFunction(
     val distinct = if (isDistinct) "DISTINCT " else " "
     s"$name($distinct${children.map(_.sql).mkString(", ")})"
   }
+
+  override def createAggregationBuffer(): AggregationBuffer =
+    partial1ModeEvaluator.getNewAggregationBuffer
+
+  @transient
+  private lazy val inputProjection = new InterpretedProjection(children)
+
+  override def update(buffer: AggregationBuffer, input: InternalRow): Unit = {
+    partial1ModeEvaluator.iterate(
+      buffer, wrap(inputProjection(input), inputWrappers, cached, inputDataTypes))
+  }
+
+  override def merge(buffer: AggregationBuffer, input: AggregationBuffer): Unit = {
+    partial2ModeEvaluator.merge(buffer, partial1ModeEvaluator.terminatePartial(input))
+  }
+
+  override def eval(buffer: AggregationBuffer): Any = {
+    resultUnwrapper(finalModeEvaluator.terminate(buffer))
+  }
+
+  override def serialize(buffer: AggregationBuffer): Array[Byte] = {
+    aggBufferSerDe.serialize(buffer)
+  }
+
+  override def deserialize(bytes: Array[Byte]): AggregationBuffer = {
+    aggBufferSerDe.deserialize(bytes)
+  }
+
+  // Helper class used to de/serialize Hive UDAF `AggregationBuffer` objects
+  private class AggregationBufferSerDe {
+    private val partialResultUnwrapper = unwrapperFor(partialResultInspector)
+
+    private val partialResultWrapper = wrapperFor(partialResultInspector, partialResultDataType)
+
+    private val projection = UnsafeProjection.create(Array(partialResultDataType))
+
+    private val mutableRow = new GenericInternalRow(1)
+
+    def serialize(buffer: AggregationBuffer): Array[Byte] = {
--- End diff --

doc?
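
Maybe something like this (a sketch of the body plus doc, assuming the round
trip goes through the `UnsafeProjection` declared above):

```scala
// Converts the opaque buffer object into Hive's partial-result format via
// terminatePartial(), unwraps that into a Catalyst value, and packs it into
// a single-column UnsafeRow whose backing bytes are safe to shuffle.
def serialize(buffer: AggregationBuffer): Array[Byte] = {
  mutableRow.update(0, partialResultUnwrapper(partial1ModeEvaluator.terminatePartial(buffer)))
  projection(mutableRow).getBytes
}
```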





[GitHub] spark pull request #15703: [SPARK-18186] Migrate HiveUDAFFunction to TypedIm...

2016-11-15 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/15703#discussion_r88140381
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala ---
@@ -289,73 +302,75 @@ private[hive] case class HiveUDAFFunction(
     funcWrapper.createFunction[AbstractGenericUDAFResolver]()
   }

+  // Hive `ObjectInspector`s for all child expressions (input parameters of the function).
   @transient
-  private lazy val inspectors = children.map(toInspector).toArray
+  private lazy val inputInspectors = children.map(toInspector).toArray

+  // Spark SQL data types of input parameters.
   @transient
-  private lazy val functionAndInspector = {
-    val parameterInfo = new SimpleGenericUDAFParameterInfo(inspectors, false, false)
-    val f = resolver.getEvaluator(parameterInfo)
-    f -> f.init(GenericUDAFEvaluator.Mode.COMPLETE, inspectors)
+  private lazy val inputDataTypes: Array[DataType] = children.map(_.dataType).toArray
+
+  private def newEvaluator(): GenericUDAFEvaluator = {
+    val parameterInfo = new SimpleGenericUDAFParameterInfo(inputInspectors, false, false)
+    resolver.getEvaluator(parameterInfo)
   }

+  // The UDAF evaluator used to consume raw input rows and produce partial aggregation results.
   @transient
-  private lazy val function = functionAndInspector._1
+  private lazy val partial1ModeEvaluator = newEvaluator()
--- End diff --

Do we need to make it a `lazy val`, since `partialResultInspector` uses it right below?





[GitHub] spark pull request #15703: [SPARK-18186] Migrate HiveUDAFFunction to TypedIm...

2016-11-09 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15703#discussion_r87329510
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala ---
@@ -365,4 +382,66 @@ private[hive] case class HiveUDAFFunction(
     val distinct = if (isDistinct) "DISTINCT " else " "
     s"$name($distinct${children.map(_.sql).mkString(", ")})"
   }
+
+  override def createAggregationBuffer(): AggregationBuffer =
+    partial1ModeEvaluator.getNewAggregationBuffer
+
+  @transient
+  private lazy val inputProjection = new InterpretedProjection(children)
+
+  override def update(buffer: AggregationBuffer, input: InternalRow): Unit = {
+    partial1ModeEvaluator.iterate(
+      buffer, wrap(inputProjection(input), inputWrappers, cached, inputDataTypes))
+  }
+
+  override def merge(buffer: AggregationBuffer, input: AggregationBuffer): Unit = {
+    partial2ModeEvaluator.merge(buffer, partial1ModeEvaluator.terminatePartial(input))
+  }
+
+  override def eval(buffer: AggregationBuffer): Any = {
+    resultUnwrapper(finalModeEvaluator.terminate(buffer))
+  }
+
+  override def serialize(buffer: AggregationBuffer): Array[Byte] = {
+    aggBufferSerDe.serialize(buffer)
+  }
+
+  override def deserialize(bytes: Array[Byte]): AggregationBuffer = {
+    aggBufferSerDe.deserialize(bytes)
+  }
+
+  // Helper class used to de/serialize Hive UDAF `AggregationBuffer` objects
+  private class AggregationBufferSerDe {
+    private val partialResultUnwrapper = unwrapperFor(partialResultInspector)
+
+    private val partialResultWrapper = wrapperFor(partialResultInspector, partialResultDataType)
+
+    private val projection = UnsafeProjection.create(Array(partialResultDataType))
--- End diff --

I tried to use an unsafe projection in `percentile_approx` before, but it failed in spark shell. Maybe that was a different problem, nvm.





[GitHub] spark pull request #15703: [SPARK-18186] Migrate HiveUDAFFunction to TypedIm...

2016-11-09 Thread liancheng
Github user liancheng commented on a diff in the pull request:

https://github.com/apache/spark/pull/15703#discussion_r87309805
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala ---
@@ -289,73 +302,77 @@ private[hive] case class HiveUDAFFunction(
     funcWrapper.createFunction[AbstractGenericUDAFResolver]()
   }

+  // Hive `ObjectInspector`s for all child expressions (input parameters of the function).
   @transient
-  private lazy val inspectors = children.map(toInspector).toArray
+  private lazy val inputInspectors = children.map(toInspector).toArray

+  // Spark SQL data types of input parameters.
   @transient
-  private lazy val functionAndInspector = {
-    val parameterInfo = new SimpleGenericUDAFParameterInfo(inspectors, false, false)
-    val f = resolver.getEvaluator(parameterInfo)
-    f -> f.init(GenericUDAFEvaluator.Mode.COMPLETE, inspectors)
-  }
+  private lazy val inputDataTypes: Array[DataType] = children.map(_.dataType).toArray

+  // The UDAF evaluator used to consume raw input rows and produce partial aggregation results.
   @transient
-  private lazy val function = functionAndInspector._1
+  private lazy val partial1ModeEvaluator = {
+    val parameterInfo = new SimpleGenericUDAFParameterInfo(inputInspectors, false, false)
+    resolver.getEvaluator(parameterInfo)
+  }

+  // Hive `ObjectInspector` used to inspect partial aggregation results.
   @transient
-  private lazy val wrappers = children.map(x => wrapperFor(toInspector(x), x.dataType)).toArray
+  private val partialResultInspector = partial1ModeEvaluator.init(
+    GenericUDAFEvaluator.Mode.PARTIAL1,
+    inputInspectors
+  )

+  // The UDAF evaluator used to merge partial aggregation results.
   @transient
-  private lazy val returnInspector = functionAndInspector._2
+  private lazy val partial2ModeEvaluator = {
+    val parameterInfo = new SimpleGenericUDAFParameterInfo(inputInspectors, false, false)
+    val evaluator = resolver.getEvaluator(parameterInfo)
+    evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL2, Array(partialResultInspector))
+    evaluator
+  }

+  // Spark SQL data type of partial aggregation results.
   @transient
-  private lazy val unwrapper = unwrapperFor(returnInspector)
+  private lazy val partialResultDataType = inspectorToDataType(partialResultInspector)

+  // The UDAF evaluator used to compute the final result from partial aggregation result objects.
   @transient
-  private[this] var buffer: GenericUDAFEvaluator.AggregationBuffer = _
-
-  override def eval(input: InternalRow): Any = unwrapper(function.evaluate(buffer))
+  private lazy val finalModeEvaluator = {
+    val parameterInfo = new SimpleGenericUDAFParameterInfo(inputInspectors, false, false)
+    resolver.getEvaluator(parameterInfo)
--- End diff --

Good point, thanks!





[GitHub] spark pull request #15703: [SPARK-18186] Migrate HiveUDAFFunction to TypedIm...

2016-11-09 Thread liancheng
Github user liancheng commented on a diff in the pull request:

https://github.com/apache/spark/pull/15703#discussion_r87309760
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala ---
@@ -365,4 +382,66 @@ private[hive] case class HiveUDAFFunction(
     val distinct = if (isDistinct) "DISTINCT " else " "
     s"$name($distinct${children.map(_.sql).mkString(", ")})"
   }
+
+  override def createAggregationBuffer(): AggregationBuffer =
+    partial1ModeEvaluator.getNewAggregationBuffer
+
+  @transient
+  private lazy val inputProjection = new InterpretedProjection(children)
+
+  override def update(buffer: AggregationBuffer, input: InternalRow): Unit = {
+    partial1ModeEvaluator.iterate(
+      buffer, wrap(inputProjection(input), inputWrappers, cached, inputDataTypes))
+  }
+
+  override def merge(buffer: AggregationBuffer, input: AggregationBuffer): Unit = {
+    partial2ModeEvaluator.merge(buffer, partial1ModeEvaluator.terminatePartial(input))
+  }
+
+  override def eval(buffer: AggregationBuffer): Any = {
+    resultUnwrapper(finalModeEvaluator.terminate(buffer))
+  }
+
+  override def serialize(buffer: AggregationBuffer): Array[Byte] = {
+    aggBufferSerDe.serialize(buffer)
+  }
+
+  override def deserialize(bytes: Array[Byte]): AggregationBuffer = {
+    aggBufferSerDe.deserialize(bytes)
+  }
+
+  // Helper class used to de/serialize Hive UDAF `AggregationBuffer` objects
+  private class AggregationBufferSerDe {
+    private val partialResultUnwrapper = unwrapperFor(partialResultInspector)
+
+    private val partialResultWrapper = wrapperFor(partialResultInspector, partialResultDataType)
+
+    private val projection = UnsafeProjection.create(Array(partialResultDataType))
--- End diff --

It does work as expected:

```
scala> sql("CREATE TEMPORARY FUNCTION hive_max AS 'org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax'")
res0: org.apache.spark.sql.DataFrame = []

scala> spark.range(100).createOrReplaceTempView("t")

scala> sql("SELECT hive_max(id) FROM t").explain()
== Physical Plan ==
SortAggregate(key=[], functions=[hive_max(hive_max, HiveFunctionWrapper(org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax,org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax@144792d5), id#1L, false, 0, 0)])
+- Exchange SinglePartition
   +- SortAggregate(key=[], functions=[partial_hive_max(hive_max, HiveFunctionWrapper(org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax,org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax@144792d5), id#1L, false, 0, 0)])
      +- *Range (0, 100, step=1, splits=Some(8))

scala> sql("SELECT hive_max(id) FROM t").show()
+-------------+
|hive_max( id)|
+-------------+
|           99|
+-------------+
```

Why do you think `UnsafeProjection` can't be used here?





[GitHub] spark pull request #15703: [SPARK-18186] Migrate HiveUDAFFunction to TypedIm...

2016-11-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15703#discussion_r86487495
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala ---
@@ -365,4 +382,66 @@ private[hive] case class HiveUDAFFunction(
     val distinct = if (isDistinct) "DISTINCT " else " "
     s"$name($distinct${children.map(_.sql).mkString(", ")})"
   }
+
+  override def createAggregationBuffer(): AggregationBuffer =
+    partial1ModeEvaluator.getNewAggregationBuffer
+
+  @transient
+  private lazy val inputProjection = new InterpretedProjection(children)
+
+  override def update(buffer: AggregationBuffer, input: InternalRow): Unit = {
+    partial1ModeEvaluator.iterate(
+      buffer, wrap(inputProjection(input), inputWrappers, cached, inputDataTypes))
+  }
+
+  override def merge(buffer: AggregationBuffer, input: AggregationBuffer): Unit = {
+    partial2ModeEvaluator.merge(buffer, partial1ModeEvaluator.terminatePartial(input))
+  }
+
+  override def eval(buffer: AggregationBuffer): Any = {
+    resultUnwrapper(finalModeEvaluator.terminate(buffer))
+  }
+
+  override def serialize(buffer: AggregationBuffer): Array[Byte] = {
+    aggBufferSerDe.serialize(buffer)
+  }
+
+  override def deserialize(bytes: Array[Byte]): AggregationBuffer = {
+    aggBufferSerDe.deserialize(bytes)
+  }
+
+  // Helper class used to de/serialize Hive UDAF `AggregationBuffer` objects
+  private class AggregationBufferSerDe {
+    private val partialResultUnwrapper = unwrapperFor(partialResultInspector)
+
+    private val partialResultWrapper = wrapperFor(partialResultInspector, partialResultDataType)
+
+    private val projection = UnsafeProjection.create(Array(partialResultDataType))
--- End diff --

Can you try to run a Hive UDAF in spark shell? IIUC, we can't create an unsafe projection inside a UDAF.





[GitHub] spark pull request #15703: [SPARK-18186] Migrate HiveUDAFFunction to TypedIm...

2016-11-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/15703#discussion_r86487127
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala ---
@@ -289,73 +302,77 @@ private[hive] case class HiveUDAFFunction(
     funcWrapper.createFunction[AbstractGenericUDAFResolver]()
   }

+  // Hive `ObjectInspector`s for all child expressions (input parameters of the function).
   @transient
-  private lazy val inspectors = children.map(toInspector).toArray
+  private lazy val inputInspectors = children.map(toInspector).toArray

+  // Spark SQL data types of input parameters.
   @transient
-  private lazy val functionAndInspector = {
-    val parameterInfo = new SimpleGenericUDAFParameterInfo(inspectors, false, false)
-    val f = resolver.getEvaluator(parameterInfo)
-    f -> f.init(GenericUDAFEvaluator.Mode.COMPLETE, inspectors)
-  }
+  private lazy val inputDataTypes: Array[DataType] = children.map(_.dataType).toArray

+  // The UDAF evaluator used to consume raw input rows and produce partial aggregation results.
   @transient
-  private lazy val function = functionAndInspector._1
+  private lazy val partial1ModeEvaluator = {
+    val parameterInfo = new SimpleGenericUDAFParameterInfo(inputInspectors, false, false)
+    resolver.getEvaluator(parameterInfo)
+  }

+  // Hive `ObjectInspector` used to inspect partial aggregation results.
   @transient
-  private lazy val wrappers = children.map(x => wrapperFor(toInspector(x), x.dataType)).toArray
+  private val partialResultInspector = partial1ModeEvaluator.init(
+    GenericUDAFEvaluator.Mode.PARTIAL1,
+    inputInspectors
+  )

+  // The UDAF evaluator used to merge partial aggregation results.
   @transient
-  private lazy val returnInspector = functionAndInspector._2
+  private lazy val partial2ModeEvaluator = {
+    val parameterInfo = new SimpleGenericUDAFParameterInfo(inputInspectors, false, false)
+    val evaluator = resolver.getEvaluator(parameterInfo)
+    evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL2, Array(partialResultInspector))
+    evaluator
+  }

+  // Spark SQL data type of partial aggregation results.
   @transient
-  private lazy val unwrapper = unwrapperFor(returnInspector)
+  private lazy val partialResultDataType = inspectorToDataType(partialResultInspector)

+  // The UDAF evaluator used to compute the final result from partial aggregation result objects.
   @transient
-  private[this] var buffer: GenericUDAFEvaluator.AggregationBuffer = _
-
-  override def eval(input: InternalRow): Any = unwrapper(function.evaluate(buffer))
+  private lazy val finalModeEvaluator = {
+    val parameterInfo = new SimpleGenericUDAFParameterInfo(inputInspectors, false, false)
+    resolver.getEvaluator(parameterInfo)
--- End diff --

These two lines are duplicated several times; should we abstract them into a method?
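
For example, a small factory method would do it (the later revisions of this
diff, quoted earlier in this thread, adopt essentially this shape):

```scala
private def newEvaluator(): GenericUDAFEvaluator = {
  val parameterInfo = new SimpleGenericUDAFParameterInfo(inputInspectors, false, false)
  resolver.getEvaluator(parameterInfo)
}
```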





[GitHub] spark pull request #15703: [SPARK-18186] Migrate HiveUDAFFunction to TypedIm...

2016-11-01 Thread liancheng
Github user liancheng commented on a diff in the pull request:

https://github.com/apache/spark/pull/15703#discussion_r85987778
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala ---
@@ -293,69 +307,57 @@ private[hive] case class HiveUDAFFunction(
   private lazy val inspectors = children.map(toInspector).toArray

   @transient
-  private lazy val functionAndInspector = {
+  private lazy val function = {
     val parameterInfo = new SimpleGenericUDAFParameterInfo(inspectors, false, false)
-    val f = resolver.getEvaluator(parameterInfo)
-    f -> f.init(GenericUDAFEvaluator.Mode.COMPLETE, inspectors)
+    resolver.getEvaluator(parameterInfo)
   }

   @transient
-  private lazy val function = functionAndInspector._1
+  private lazy val wrappers = children.map(x => wrapperFor(toInspector(x), x.dataType)).toArray

   @transient
-  private lazy val wrappers = children.map(x => wrapperFor(toInspector(x), x.dataType)).toArray
+  private lazy val returnInspector =
+    function.init(GenericUDAFEvaluator.Mode.COMPLETE, inspectors)

   @transient
-  private lazy val returnInspector = functionAndInspector._2
+  private lazy val partialResultInspector =
+    function.init(GenericUDAFEvaluator.Mode.PARTIAL1, inspectors)
+
+  // The following two lines initialize `function: GenericUDAFEvaluator` eagerly. These two fields
--- End diff --

Confirmed that we can remove this hack. Thanks!





[GitHub] spark pull request #15703: [SPARK-18186] Migrate HiveUDAFFunction to TypedIm...

2016-11-01 Thread liancheng
Github user liancheng commented on a diff in the pull request:

https://github.com/apache/spark/pull/15703#discussion_r85981118
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala 
---
@@ -293,69 +307,57 @@ private[hive] case class HiveUDAFFunction(
   private lazy val inspectors = children.map(toInspector).toArray
 
   @transient
-  private lazy val functionAndInspector = {
+  private lazy val function = {
 val parameterInfo = new SimpleGenericUDAFParameterInfo(inspectors, 
false, false)
-val f = resolver.getEvaluator(parameterInfo)
-f -> f.init(GenericUDAFEvaluator.Mode.COMPLETE, inspectors)
+resolver.getEvaluator(parameterInfo)
   }
 
   @transient
-  private lazy val function = functionAndInspector._1
+  private lazy val wrappers = children.map(x => wrapperFor(toInspector(x), 
x.dataType)).toArray
 
   @transient
-  private lazy val wrappers = children.map(x => wrapperFor(toInspector(x), 
x.dataType)).toArray
+  private lazy val returnInspector =
+function.init(GenericUDAFEvaluator.Mode.COMPLETE, inspectors)
 
   @transient
-  private lazy val returnInspector = functionAndInspector._2
+  private lazy val partialResultInspector =
+function.init(GenericUDAFEvaluator.Mode.PARTIAL1, inspectors)
+
+  // The following two lines initializes `function: GenericUDAFEvaluator` 
eagerly. These two fields
--- End diff --

Good point, I didn't check whether these two fields are really used on the executor side. If not, it's true that we can remove the `lazy` modifier.





[GitHub] spark pull request #15703: [SPARK-18186] Migrate HiveUDAFFunction to TypedIm...

2016-10-31 Thread liancheng
GitHub user liancheng opened a pull request:

https://github.com/apache/spark/pull/15703

[SPARK-18186] Migrate HiveUDAFFunction to TypedImperativeAggregate for 
partial aggregation support

## What changes were proposed in this pull request?

This PR migrates `HiveUDAFFunction` to `TypedImperativeAggregate` for 
partial aggregation support.
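
For reviewers new to the interface, the `TypedImperativeAggregate` contract
being implemented looks roughly like this (signatures match the overridden
methods quoted in the review comments above; the inline notes are my gloss):

```scala
abstract class TypedImperativeAggregate[T] extends ImperativeAggregate {
  def createAggregationBuffer(): T                 // fresh per-group buffer
  def update(buffer: T, input: InternalRow): Unit  // consume one input row
  def merge(buffer: T, input: T): Unit             // combine two partial buffers
  def eval(buffer: T): Any                         // compute the final result
  def serialize(buffer: T): Array[Byte]            // encode the buffer for shuffle
  def deserialize(bytes: Array[Byte]): T           // decode the buffer after shuffle
}
```

Because the buffer can be an arbitrary Java object that is only serialized at
shuffle boundaries, Hive's own `AggregationBuffer` can back the aggregation
directly, which is what makes partial aggregation possible here.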

## How was this patch tested?

New test suite `HiveUDAFSuite` is added.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/liancheng/spark partial-agg-hive-udaf

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/15703.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #15703


commit deb7a2700cfbc20aa1b5b7fb52b70306a429a940
Author: Cheng Lian 
Date:   2016-10-28T22:36:35Z

Initial draft

commit c0029f1a529935c263f9c83691cf84921b343e67
Author: Cheng Lian 
Date:   2016-10-31T22:23:28Z

Fix test failures



