Repository: spark Updated Branches: refs/heads/branch-2.0 d8370ef11 -> 790de600b
[SPARK-15704][SQL] add a test case in DatasetAggregatorSuite for regression testing ## What changes were proposed in this pull request? This change adds a regression test for a crash (IndexOutOfBoundsException) in TungstenAggregate that occurred while executing the "Dataset complex Aggregator" test case. See the JIRA entry for details: https://issues.apache.org/jira/browse/SPARK-15704 ## How was this patch tested? Using existing unit tests (including DatasetBenchmark). Author: Hiroshi Inoue <inoue...@jp.ibm.com> Closes #13446 from inouehrs/fix_aggregate. (cherry picked from commit 79268aa461abd237bc4f96a7d31457c98e11798c) Signed-off-by: Wenchen Fan <wenc...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/790de600 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/790de600 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/790de600 Branch: refs/heads/branch-2.0 Commit: 790de600beb3f6cae1914f59a61a43c02440884f Parents: d8370ef Author: Hiroshi Inoue <inoue...@jp.ibm.com> Authored: Sun Jun 5 20:10:33 2016 -0700 Committer: Wenchen Fan <wenc...@databricks.com> Committed: Sun Jun 5 20:10:39 2016 -0700 ---------------------------------------------------------------------- .../spark/sql/DatasetAggregatorSuite.scala | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/790de600/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala index ead7bd9..f9b4cd8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala @@ -24,6 +24,7 @@ import 
org.apache.spark.sql.expressions.Aggregator import org.apache.spark.sql.expressions.scalalang.typed import org.apache.spark.sql.functions._ import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.sql.types.StringType object ComplexResultAgg extends Aggregator[(String, Int), (Long, Long), (Long, Long)] { @@ -52,6 +53,16 @@ object ClassInputAgg extends Aggregator[AggData, Int, Int] { } +object ClassBufferAggregator extends Aggregator[AggData, AggData, Int] { + override def zero: AggData = AggData(0, "") + override def reduce(b: AggData, a: AggData): AggData = AggData(b.a + a.a, "") + override def finish(reduction: AggData): Int = reduction.a + override def merge(b1: AggData, b2: AggData): AggData = AggData(b1.a + b2.a, "") + override def bufferEncoder: Encoder[AggData] = Encoders.product[AggData] + override def outputEncoder: Encoder[Int] = Encoders.scalaInt +} + + object ComplexBufferAgg extends Aggregator[AggData, (Int, AggData), Int] { override def zero: (Int, AggData) = 0 -> AggData(0, "0") override def reduce(b: (Int, AggData), a: AggData): (Int, AggData) = (b._1 + 1, a) @@ -173,6 +184,14 @@ class DatasetAggregatorSuite extends QueryTest with SharedSQLContext { ("one", 1)) } + test("Typed aggregation using aggregator") { + // based on Dataset complex Aggregator test of DatasetBenchmark + val ds = Seq(AggData(1, "x"), AggData(2, "y"), AggData(3, "z")).toDS() + checkDataset( + ds.select(ClassBufferAggregator.toColumn), + 6) + } + test("typed aggregation: complex input") { val ds = Seq(AggData(1, "one"), AggData(2, "two")).toDS() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org