Repository: spark Updated Branches: refs/heads/master c8affec21 -> ae61f187a
[SPARK-22206][SQL][SPARKR] gapply in R can't work on empty grouping columns ## What changes were proposed in this pull request? It looks like `FlatMapGroupsInRExec.requiredChildDistribution` did not consider empty grouping attributes. This becomes a problem when `EnsureRequirements` runs, and as a result `gapply` in R can't work on empty grouping columns. ## How was this patch tested? Added a test. Author: Liang-Chi Hsieh <vii...@gmail.com> Closes #19436 from viirya/fix-flatmapinr-distribution. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ae61f187 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ae61f187 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ae61f187 Branch: refs/heads/master Commit: ae61f187aa0471242c046fdeac6ed55b9b98a3f6 Parents: c8affec Author: Liang-Chi Hsieh <vii...@gmail.com> Authored: Thu Oct 5 23:36:18 2017 +0900 Committer: hyukjinkwon <gurwls...@gmail.com> Committed: Thu Oct 5 23:36:18 2017 +0900 ---------------------------------------------------------------------- R/pkg/tests/fulltests/test_sparkSQL.R | 5 +++++ .../main/scala/org/apache/spark/sql/execution/objects.scala | 6 +++++- 2 files changed, 10 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/ae61f187/R/pkg/tests/fulltests/test_sparkSQL.R ---------------------------------------------------------------------- diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R index 7f781f2..bbea25b 100644 --- a/R/pkg/tests/fulltests/test_sparkSQL.R +++ b/R/pkg/tests/fulltests/test_sparkSQL.R @@ -3075,6 +3075,11 @@ test_that("gapply() and gapplyCollect() on a DataFrame", { df1Collect <- gapplyCollect(df, list("a"), function(key, x) { x }) expect_identical(df1Collect, expected) + # gapply on empty grouping columns. 
+ df1 <- gapply(df, c(), function(key, x) { x }, schema(df)) + actual <- collect(df1) + expect_identical(actual, expected) + # Computes the sum of second column by grouping on the first and third columns # and checks if the sum is larger than 2 schemas <- list(structType(structField("a", "integer"), structField("e", "boolean")), http://git-wip-us.apache.org/repos/asf/spark/blob/ae61f187/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala index 5a3fcad..c68975b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala @@ -394,7 +394,11 @@ case class FlatMapGroupsInRExec( override def producedAttributes: AttributeSet = AttributeSet(outputObjAttr) override def requiredChildDistribution: Seq[Distribution] = - ClusteredDistribution(groupingAttributes) :: Nil + if (groupingAttributes.isEmpty) { + AllTuples :: Nil + } else { + ClusteredDistribution(groupingAttributes) :: Nil + } override def requiredChildOrdering: Seq[Seq[SortOrder]] = Seq(groupingAttributes.map(SortOrder(_, Ascending))) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org