Repository: spark
Updated Branches:
  refs/heads/branch-2.0 d8370ef11 -> 790de600b


[SPARK-15704][SQL] add a test case in DatasetAggregatorSuite for regression 
testing

## What changes were proposed in this pull request?

This change fixes a crash in TungstenAggregate while executing "Dataset complex 
Aggregator" test case due to IndexOutOfBoundsException.

jira entry for detail: https://issues.apache.org/jira/browse/SPARK-15704

## How was this patch tested?
Using existing unit tests (including DatasetBenchmark)

Author: Hiroshi Inoue <inoue...@jp.ibm.com>

Closes #13446 from inouehrs/fix_aggregate.

(cherry picked from commit 79268aa461abd237bc4f96a7d31457c98e11798c)
Signed-off-by: Wenchen Fan <wenc...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/790de600
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/790de600
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/790de600

Branch: refs/heads/branch-2.0
Commit: 790de600beb3f6cae1914f59a61a43c02440884f
Parents: d8370ef
Author: Hiroshi Inoue <inoue...@jp.ibm.com>
Authored: Sun Jun 5 20:10:33 2016 -0700
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Sun Jun 5 20:10:39 2016 -0700

----------------------------------------------------------------------
 .../spark/sql/DatasetAggregatorSuite.scala       | 19 +++++++++++++++++++
 1 file changed, 19 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/790de600/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala
index ead7bd9..f9b4cd8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.expressions.Aggregator
 import org.apache.spark.sql.expressions.scalalang.typed
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.StringType
 
 
 object ComplexResultAgg extends Aggregator[(String, Int), (Long, Long), (Long, 
Long)] {
@@ -52,6 +53,16 @@ object ClassInputAgg extends Aggregator[AggData, Int, Int] {
 }
 
 
+object ClassBufferAggregator extends Aggregator[AggData, AggData, Int] {
+  override def zero: AggData = AggData(0, "")
+  override def reduce(b: AggData, a: AggData): AggData = AggData(b.a + a.a, "")
+  override def finish(reduction: AggData): Int = reduction.a
+  override def merge(b1: AggData, b2: AggData): AggData = AggData(b1.a + b2.a, 
"")
+  override def bufferEncoder: Encoder[AggData] = Encoders.product[AggData]
+  override def outputEncoder: Encoder[Int] = Encoders.scalaInt
+}
+
+
 object ComplexBufferAgg extends Aggregator[AggData, (Int, AggData), Int] {
   override def zero: (Int, AggData) = 0 -> AggData(0, "0")
   override def reduce(b: (Int, AggData), a: AggData): (Int, AggData) = (b._1 + 
1, a)
@@ -173,6 +184,14 @@ class DatasetAggregatorSuite extends QueryTest with 
SharedSQLContext {
       ("one", 1))
   }
 
+  test("Typed aggregation using aggregator") {
+    // based on Dataset complex Aggregator test of DatasetBenchmark
+    val ds = Seq(AggData(1, "x"), AggData(2, "y"), AggData(3, "z")).toDS()
+    checkDataset(
+      ds.select(ClassBufferAggregator.toColumn),
+      6)
+  }
+
   test("typed aggregation: complex input") {
     val ds = Seq(AggData(1, "one"), AggData(2, "two")).toDS()
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to