spark git commit: [SPARK-12068][SQL] use a single column in Dataset.groupBy and count will fail

2015-12-01 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 9b99b2b46 -> 6e3e3c648


[SPARK-12068][SQL] use a single column in Dataset.groupBy and count will fail

The reason is that, for a single-column `RowEncoder` (or a single-field 
product encoder), when we use it as the encoder for the grouping key, we 
should still combine the grouping attributes into a struct, even though there 
is only one grouping attribute.
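
For illustration, a minimal reproduction mirroring the test added below 
(assuming a Spark 1.6 session with `sqlContext.implicits._` imported):

    val ds = Seq("abc", "xyz", "hello").toDS()
    // Before this patch, counting over a single-field grouping key failed,
    // because the lone grouping attribute was not combined into a struct.
    ds.groupBy(s => Tuple1(s.length)).count().collect()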

Author: Wenchen Fan 

Closes #10059 from cloud-fan/bug.

(cherry picked from commit 8ddc55f1d5823ca135510b2ea776e889e481)
Signed-off-by: Michael Armbrust 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6e3e3c64
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6e3e3c64
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6e3e3c64

Branch: refs/heads/branch-1.6
Commit: 6e3e3c648c4f74d9c1aabe767dbadfe47bd7e658
Parents: 9b99b2b
Author: Wenchen Fan 
Authored: Tue Dec 1 10:22:55 2015 -0800
Committer: Michael Armbrust 
Committed: Tue Dec 1 10:23:17 2015 -0800

--
 .../scala/org/apache/spark/sql/Dataset.scala |  2 +-
 .../org/apache/spark/sql/GroupedDataset.scala|  7 ---
 .../org/apache/spark/sql/DatasetSuite.scala  | 19 +++
 .../scala/org/apache/spark/sql/QueryTest.scala   |  6 +++---
 4 files changed, 27 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/6e3e3c64/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index da46001..c357f88 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -70,7 +70,7 @@ class Dataset[T] private[sql](
    * implicit so that we can use it when constructing new [[Dataset]] objects that have the same
    * object type (that will be possibly resolved to a different schema).
    */
-  private implicit val unresolvedTEncoder: ExpressionEncoder[T] = encoderFor(tEncoder)
+  private[sql] implicit val unresolvedTEncoder: ExpressionEncoder[T] = encoderFor(tEncoder)
 
   /** The encoder for this [[Dataset]] that has been resolved to its output schema. */
   private[sql] val resolvedTEncoder: ExpressionEncoder[T] =

http://git-wip-us.apache.org/repos/asf/spark/blob/6e3e3c64/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala
index a10a893..4bf0b25 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala
@@ -228,10 +228,11 @@ class GroupedDataset[K, V] private[sql](
     val namedColumns =
       columns.map(
         _.withInputType(resolvedVEncoder, dataAttributes).named)
-    val keyColumn = if (groupingAttributes.length > 1) {
-      Alias(CreateStruct(groupingAttributes), "key")()
-    } else {
+    val keyColumn = if (resolvedKEncoder.flat) {
+      assert(groupingAttributes.length == 1)
       groupingAttributes.head
+    } else {
+      Alias(CreateStruct(groupingAttributes), "key")()
     }
     val aggregate = Aggregate(groupingAttributes, keyColumn +: namedColumns, logicalPlan)
     val execution = new QueryExecution(sqlContext, aggregate)
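
For context: `flat` on an `ExpressionEncoder` is true when the object maps to 
exactly one column (e.g. the implicit encoders for `Int` or `String`), so the 
lone grouping attribute can be used as the key column directly; single-field 
product encoders and the single-column `RowEncoder` are not flat, so their one 
attribute must still be wrapped in a struct for the key to round-trip. A small 
sketch of the two cases, assuming a 1.6 session with `sqlContext.implicits._` 
in scope:

    val ds = Seq("abc", "xyz", "hello").toDS()
    ds.groupBy(s => s.length).count()          // flat Int key: attribute used as-is
    ds.groupBy(s => Tuple1(s.length)).count()  // non-flat Tuple1 key: struct-wrapped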

http://git-wip-us.apache.org/repos/asf/spark/blob/6e3e3c64/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 7d53918..a2c8d20 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -272,6 +272,16 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
       3 -> "abcxyz", 5 -> "hello")
   }
 
+  test("groupBy single field class, count") {
+    val ds = Seq("abc", "xyz", "hello").toDS()
+    val count = ds.groupBy(s => Tuple1(s.length)).count()
+
+    checkAnswer(
+      count,
+      (Tuple1(3), 2L), (Tuple1(5), 1L)
+    )
+  }
+
   test("groupBy columns, map") {
     val ds = Seq(("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)).toDS()
     val grouped = ds.groupBy($"_1")
@@ -282,6 +292,15 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
       ("a", 30), ("b", 3), ("c", 1))
   }
 
+  test("groupBy columns, count") {
+val ds = Seq("a" -> 1, "b" -> 1, "a" -> 2).toDS()
+val count = ds.groupBy($

spark git commit: [SPARK-12068][SQL] use a single column in Dataset.groupBy and count will fail

2015-12-01 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master 69dbe6b40 -> 8ddc55f1d


[SPARK-12068][SQL] use a single column in Dataset.groupBy and count will fail

The reason is that, for a single-column `RowEncoder` (or a single-field 
product encoder), when we use it as the encoder for the grouping key, we 
should still combine the grouping attributes into a struct, even though there 
is only one grouping attribute.
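
The same failure mode applied when grouping by an untyped column, where the 
key side is a single-column `RowEncoder` (mirroring the second test added 
below):

    val ds = Seq("a" -> 1, "b" -> 1, "a" -> 2).toDS()
    // The key is a one-field Row; before the fix its single grouping
    // attribute was likewise left unwrapped, breaking key deserialization.
    ds.groupBy($"_1").count().collect()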

Author: Wenchen Fan 

Closes #10059 from cloud-fan/bug.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8ddc55f1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8ddc55f1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8ddc55f1

Branch: refs/heads/master
Commit: 8ddc55f1d5823ca135510b2ea776e889e481
Parents: 69dbe6b
Author: Wenchen Fan 
Authored: Tue Dec 1 10:22:55 2015 -0800
Committer: Michael Armbrust 
Committed: Tue Dec 1 10:22:55 2015 -0800

--
 .../scala/org/apache/spark/sql/Dataset.scala |  2 +-
 .../org/apache/spark/sql/GroupedDataset.scala|  7 ---
 .../org/apache/spark/sql/DatasetSuite.scala  | 19 +++
 .../scala/org/apache/spark/sql/QueryTest.scala   |  6 +++---
 4 files changed, 27 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/8ddc55f1/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index da46001..c357f88 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -70,7 +70,7 @@ class Dataset[T] private[sql](
    * implicit so that we can use it when constructing new [[Dataset]] objects that have the same
    * object type (that will be possibly resolved to a different schema).
    */
-  private implicit val unresolvedTEncoder: ExpressionEncoder[T] = encoderFor(tEncoder)
+  private[sql] implicit val unresolvedTEncoder: ExpressionEncoder[T] = encoderFor(tEncoder)
 
   /** The encoder for this [[Dataset]] that has been resolved to its output schema. */
   private[sql] val resolvedTEncoder: ExpressionEncoder[T] =

http://git-wip-us.apache.org/repos/asf/spark/blob/8ddc55f1/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala
index a10a893..4bf0b25 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala
@@ -228,10 +228,11 @@ class GroupedDataset[K, V] private[sql](
     val namedColumns =
       columns.map(
         _.withInputType(resolvedVEncoder, dataAttributes).named)
-    val keyColumn = if (groupingAttributes.length > 1) {
-      Alias(CreateStruct(groupingAttributes), "key")()
-    } else {
+    val keyColumn = if (resolvedKEncoder.flat) {
+      assert(groupingAttributes.length == 1)
       groupingAttributes.head
+    } else {
+      Alias(CreateStruct(groupingAttributes), "key")()
     }
     val aggregate = Aggregate(groupingAttributes, keyColumn +: namedColumns, logicalPlan)
     val execution = new QueryExecution(sqlContext, aggregate)

http://git-wip-us.apache.org/repos/asf/spark/blob/8ddc55f1/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 7d53918..a2c8d20 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -272,6 +272,16 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
       3 -> "abcxyz", 5 -> "hello")
   }
 
+  test("groupBy single field class, count") {
+    val ds = Seq("abc", "xyz", "hello").toDS()
+    val count = ds.groupBy(s => Tuple1(s.length)).count()
+
+    checkAnswer(
+      count,
+      (Tuple1(3), 2L), (Tuple1(5), 1L)
+    )
+  }
+
   test("groupBy columns, map") {
     val ds = Seq(("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)).toDS()
     val grouped = ds.groupBy($"_1")
@@ -282,6 +292,15 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
       ("a", 30), ("b", 3), ("c", 1))
   }
 
+  test("groupBy columns, count") {
+val ds = Seq("a" -> 1, "b" -> 1, "a" -> 2).toDS()
+val count = ds.groupBy($"_1").count()
+
+checkAnswer(
+  count,
+  (Row("a"), 2L), (Row("b"), 1L))
+  }
+
   test("groupBy