Repository: spark
Updated Branches:
  refs/heads/master c55397652 -> 26afb4ce4


[SPARK-16012][SPARKR] Implement gapplyCollect which will apply a R function on 
each group similar to gapply and collect the result back to R data.frame

## What changes were proposed in this pull request?
gapplyCollect() does gapply() on a SparkDataFrame and collect the result back 
to R. Compared to gapply() + collect(), gapplyCollect() offers performance 
optimization as well as programming convenience, as no schema is needed to be 
provided.

This is similar to dapplyCollect().

## How was this patch tested?
Added test cases for gapplyCollect similar to dapplyCollect

Author: Narine Kokhlikyan <nar...@slice.com>

Closes #13760 from NarineK/gapplyCollect.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/26afb4ce
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/26afb4ce
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/26afb4ce

Branch: refs/heads/master
Commit: 26afb4ce4099e7942f8db1ead3817ed8fbf71ce3
Parents: c553976
Author: Narine Kokhlikyan <nar...@slice.com>
Authored: Fri Jul 1 13:55:13 2016 -0700
Committer: Shivaram Venkataraman <shiva...@cs.berkeley.edu>
Committed: Fri Jul 1 13:55:13 2016 -0700

----------------------------------------------------------------------
 R/pkg/NAMESPACE                           |   1 +
 R/pkg/R/DataFrame.R                       | 111 +++++++++++++++++++++++--
 R/pkg/R/generics.R                        |   4 +
 R/pkg/R/group.R                           |  93 +++++++++------------
 R/pkg/inst/tests/testthat/test_sparkSQL.R |  35 ++++++--
 5 files changed, 177 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/26afb4ce/R/pkg/NAMESPACE
----------------------------------------------------------------------
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index abc6588..bc3aceb 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -69,6 +69,7 @@ exportMethods("arrange",
               "first",
               "freqItems",
               "gapply",
+              "gapplyCollect",
               "group_by",
               "groupBy",
               "head",

http://git-wip-us.apache.org/repos/asf/spark/blob/26afb4ce/R/pkg/R/DataFrame.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 25327be..5944bbc 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -1344,7 +1344,7 @@ setMethod("dapplyCollect",
 
 #' gapply
 #'
-#' Group the SparkDataFrame using the specified columns and apply the R 
function to each
+#' Groups the SparkDataFrame using the specified columns and applies the R 
function to each
 #' group.
 #'
 #' @param x A SparkDataFrame
@@ -1356,9 +1356,11 @@ setMethod("dapplyCollect",
 #' @param schema The schema of the resulting SparkDataFrame after the function 
is applied.
 #'               The schema must match to output of `func`. It has to be 
defined for each
 #'               output column with preferred output column name and 
corresponding data type.
+#' @return a SparkDataFrame
 #' @family SparkDataFrame functions
 #' @rdname gapply
 #' @name gapply
+#' @seealso \link{gapplyCollect}
 #' @export
 #' @examples
 #'
@@ -1374,14 +1376,22 @@ setMethod("dapplyCollect",
 #' columns with data types integer and string and the mean which is a double.
 #' schema <-  structType(structField("a", "integer"), structField("c", 
"string"),
 #'   structField("avg", "double"))
-#' df1 <- gapply(
+#' result <- gapply(
 #'   df,
-#'   list("a", "c"),
+#'   c("a", "c"),
 #'   function(key, x) {
 #'     y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
-#'   },
-#' schema)
-#' collect(df1)
+#' }, schema)
+#'
+#' We can also group the data and afterwards call gapply on GroupedData.
+#' For Example:
+#' gdf <- group_by(df, "a", "c")
+#' result <- gapply(
+#'   gdf,
+#'   function(key, x) {
+#'     y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
+#' }, schema)
+#' collect(result)
 #'
 #' Result
 #' ------
@@ -1399,7 +1409,7 @@ setMethod("dapplyCollect",
 #'   structField("Petal_Width", "double"))
 #' df1 <- gapply(
 #'   df,
-#'   list(df$"Species"),
+#'   df$"Species",
 #'   function(key, x) {
 #'     m <- suppressWarnings(lm(Sepal_Length ~
 #'     Sepal_Width + Petal_Length + Petal_Width, x))
@@ -1407,8 +1417,8 @@ setMethod("dapplyCollect",
 #'   }, schema)
 #' collect(df1)
 #'
-#'Result
-#'---------
+#' Result
+#' ---------
 #' Model  (Intercept)  Sepal_Width  Petal_Length  Petal_Width
 #' 1        0.699883    0.3303370    0.9455356    -0.1697527
 #' 2        1.895540    0.3868576    0.9083370    -0.6792238
@@ -1423,6 +1433,89 @@ setMethod("gapply",
             gapply(grouped, func, schema)
           })
 
+#' gapplyCollect
+#'
+#' Groups the SparkDataFrame using the specified columns, applies the R 
function to each
+#' group and collects the result back to R as data.frame.
+#'
+#' @param x A SparkDataFrame
+#' @param cols Grouping columns
+#' @param func A function to be applied to each group partition specified by 
grouping
+#'             column of the SparkDataFrame. The function `func` takes as 
argument
+#'             a key - grouping columns and a data frame - a local R 
data.frame.
+#'             The output of `func` is a local R data.frame.
+#' @return a data.frame
+#' @family SparkDataFrame functions
+#' @rdname gapplyCollect
+#' @name gapplyCollect
+#' @seealso \link{gapply}
+#' @export
+#' @examples
+#'
+#' \dontrun{
+#' Computes the arithmetic mean of the second column by grouping
+#' on the first and third columns. Output the grouping values and the average.
+#'
+#' df <- createDataFrame (
+#' list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)),
+#'   c("a", "b", "c", "d"))
+#'
+#' result <- gapplyCollect(
+#'   df,
+#'   c("a", "c"),
+#'   function(key, x) {
+#'     y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
+#'     colnames(y) <- c("key_a", "key_c", "mean_b")
+#'     y
+#'   })
+#'
+#' We can also group the data and afterwards call gapply on GroupedData.
+#' For Example:
+#' gdf <- group_by(df, "a", "c")
+#' result <- gapplyCollect(
+#'   gdf,
+#'   function(key, x) {
+#'     y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
+#'     colnames(y) <- c("key_a", "key_c", "mean_b")
+#'     y
+#'   })
+#'
+#' Result
+#' ------
+#' key_a key_c mean_b
+#' 3 3 3.0
+#' 1 1 1.5
+#'
+#' Fits linear models on iris dataset by grouping on the 'Species' column and
+#' using 'Sepal_Length' as a target variable, 'Sepal_Width', 'Petal_Length'
+#' and 'Petal_Width' as training features.
+#'
+#' df <- createDataFrame (iris)
+#' result <- gapplyCollect(
+#'   df,
+#'   df$"Species",
+#'   function(key, x) {
+#'     m <- suppressWarnings(lm(Sepal_Length ~
+#'     Sepal_Width + Petal_Length + Petal_Width, x))
+#'     data.frame(t(coef(m)))
+#'   })
+#'
+#' Result
+#'---------
+#' Model  X.Intercept.  Sepal_Width  Petal_Length  Petal_Width
+#' 1        0.699883    0.3303370    0.9455356    -0.1697527
+#' 2        1.895540    0.3868576    0.9083370    -0.6792238
+#' 3        2.351890    0.6548350    0.2375602     0.2521257
+#'
+#'}
+#' @note gapplyCollect(SparkDataFrame) since 2.0.0
+setMethod("gapplyCollect",
+          signature(x = "SparkDataFrame"),
+          function(x, cols, func) {
+            grouped <- do.call("groupBy", c(x, cols))
+            gapplyCollect(grouped, func)
+          })
+
 ############################## RDD Map Functions 
##################################
 # All of the following functions mirror the existing RDD map functions,        
   #
 # but allow for use with DataFrames by first converting to an RRDD before 
calling #

http://git-wip-us.apache.org/repos/asf/spark/blob/26afb4ce/R/pkg/R/generics.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index d9080b6..e4ec508 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -469,6 +469,10 @@ setGeneric("dapplyCollect", function(x, func) { 
standardGeneric("dapplyCollect")
 #' @export
 setGeneric("gapply", function(x, ...) { standardGeneric("gapply") })
 
+#' @rdname gapplyCollect
+#' @export
+setGeneric("gapplyCollect", function(x, ...) { 
standardGeneric("gapplyCollect") })
+
 #' @rdname summary
 #' @export
 setGeneric("describe", function(x, col, ...) { standardGeneric("describe") })

http://git-wip-us.apache.org/repos/asf/spark/blob/26afb4ce/R/pkg/R/group.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/group.R b/R/pkg/R/group.R
index 0687f14..5ed7e8a 100644
--- a/R/pkg/R/group.R
+++ b/R/pkg/R/group.R
@@ -196,64 +196,51 @@ createMethods()
 
 #' gapply
 #'
-#' Applies a R function to each group in the input GroupedData
-#'
-#' @param x a GroupedData
-#' @param func A function to be applied to each group partition specified by 
GroupedData.
-#'             The function `func` takes as argument a key - grouping columns 
and
-#'             a data frame - a local R data.frame.
-#'             The output of `func` is a local R data.frame.
-#' @param schema The schema of the resulting SparkDataFrame after the function 
is applied.
-#'               The schema must match to output of `func`. It has to be 
defined for each
-#'               output column with preferred output column name and 
corresponding data type.
-#' @return a SparkDataFrame
+#' @param x A GroupedData
 #' @rdname gapply
 #' @name gapply
 #' @export
-#' @examples
-#' \dontrun{
-#' Computes the arithmetic mean of the second column by grouping
-#' on the first and third columns. Output the grouping values and the average.
-#'
-#' df <- createDataFrame (
-#' list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)),
-#'   c("a", "b", "c", "d"))
-#'
-#' Here our output contains three columns, the key which is a combination of 
two
-#' columns with data types integer and string and the mean which is a double.
-#' schema <-  structType(structField("a", "integer"), structField("c", 
"string"),
-#'   structField("avg", "double"))
-#' df1 <- gapply(
-#'   df,
-#'   list("a", "c"),
-#'   function(key, x) {
-#'     y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
-#'   },
-#' schema)
-#' collect(df1)
-#'
-#' Result
-#' ------
-#' a c avg
-#' 3 3 3.0
-#' 1 1 1.5
-#' }
 #' @note gapply(GroupedData) since 2.0.0
 setMethod("gapply",
           signature(x = "GroupedData"),
           function(x, func, schema) {
-            try(if (is.null(schema)) stop("schema cannot be NULL"))
-            packageNamesArr <- serialize(.sparkREnv[[".packages"]],
-                                 connection = NULL)
-            broadcastArr <- lapply(ls(.broadcastNames),
-                              function(name) { get(name, .broadcastNames) })
-            sdf <- callJStatic(
-                     "org.apache.spark.sql.api.r.SQLUtils",
-                     "gapply",
-                     x@sgd,
-                     serialize(cleanClosure(func), connection = NULL),
-                     packageNamesArr,
-                     broadcastArr,
-                     schema$jobj)
-            dataFrame(sdf)
+            if (is.null(schema)) stop("schema cannot be NULL")
+            gapplyInternal(x, func, schema)
+          })
+
+#' gapplyCollect
+#'
+#' @param x A GroupedData
+#' @rdname gapplyCollect
+#' @name gapplyCollect
+#' @export
+#' @note gapplyCollect(GroupedData) since 2.0.0
+setMethod("gapplyCollect",
+          signature(x = "GroupedData"),
+          function(x, func) {
+            gdf <- gapplyInternal(x, func, NULL)
+            content <- callJMethod(gdf@sdf, "collect")
+            # content is a list of items of struct type. Each item has a 
single field
+            # which is a serialized data.frame corresponds to one group of the
+            # SparkDataFrame.
+            ldfs <- lapply(content, function(x) { unserialize(x[[1]]) })
+            ldf <- do.call(rbind, ldfs)
+            row.names(ldf) <- NULL
+            ldf
           })
+
+gapplyInternal <- function(x, func, schema) {
+  packageNamesArr <- serialize(.sparkREnv[[".packages"]],
+                       connection = NULL)
+  broadcastArr <- lapply(ls(.broadcastNames),
+                    function(name) { get(name, .broadcastNames) })
+  sdf <- callJStatic(
+           "org.apache.spark.sql.api.r.SQLUtils",
+           "gapply",
+           x@sgd,
+           serialize(cleanClosure(func), connection = NULL),
+           packageNamesArr,
+           broadcastArr,
+           if (class(schema) == "structType") { schema$jobj } else { NULL })
+  dataFrame(sdf)
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/spark/blob/26afb4ce/R/pkg/inst/tests/testthat/test_sparkSQL.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R 
b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 588c217..3b8d570 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -2257,21 +2257,24 @@ test_that("repartition by columns on DataFrame", {
   expect_equal(nrow(df1), 2)
 })
 
-test_that("gapply() on a DataFrame", {
+test_that("gapply() and gapplyCollect() on a DataFrame", {
   df <- createDataFrame (
     list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)),
     c("a", "b", "c", "d"))
   expected <- collect(df)
-  df1 <- gapply(df, list("a"), function(key, x) { x }, schema(df))
+  df1 <- gapply(df, "a", function(key, x) { x }, schema(df))
   actual <- collect(df1)
   expect_identical(actual, expected)
 
+  df1Collect <- gapplyCollect(df, list("a"), function(key, x) { x })
+  expect_identical(df1Collect, expected)
+
   # Computes the sum of second column by grouping on the first and third 
columns
   # and checks if the sum is larger than 2
   schema <- structType(structField("a", "integer"), structField("e", 
"boolean"))
   df2 <- gapply(
     df,
-    list(df$"a", df$"c"),
+    c(df$"a", df$"c"),
     function(key, x) {
       y <- data.frame(key[1], sum(x$b) > 2)
     },
@@ -2280,13 +2283,24 @@ test_that("gapply() on a DataFrame", {
   expected <- c(TRUE, TRUE)
   expect_identical(actual, expected)
 
+  df2Collect <- gapplyCollect(
+    df,
+    c(df$"a", df$"c"),
+    function(key, x) {
+      y <- data.frame(key[1], sum(x$b) > 2)
+      colnames(y) <- c("a", "e")
+      y
+    })
+    actual <- df2Collect$e
+    expect_identical(actual, expected)
+
   # Computes the arithmetic mean of the second column by grouping
   # on the first and third columns. Output the groupping value and the average.
   schema <-  structType(structField("a", "integer"), structField("c", 
"string"),
                structField("avg", "double"))
   df3 <- gapply(
     df,
-    list("a", "c"),
+    c("a", "c"),
     function(key, x) {
       y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
     },
@@ -2301,11 +2315,22 @@ test_that("gapply() on a DataFrame", {
   rownames(expected) <- NULL
   expect_identical(actual, expected)
 
+  df3Collect <- gapplyCollect(
+    df,
+    c("a", "c"),
+    function(key, x) {
+      y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
+      colnames(y) <- c("a", "c", "avg")
+      y
+    })
+  actual <- df3Collect[order(df3Collect$a), ]
+  expect_identical(actual$avg, expected$avg)
+
   irisDF <- suppressWarnings(createDataFrame (iris))
   schema <-  structType(structField("Sepal_Length", "double"), 
structField("Avg", "double"))
   # Groups by `Sepal_Length` and computes the average for `Sepal_Width`
   df4 <- gapply(
-    cols = list("Sepal_Length"),
+    cols = "Sepal_Length",
     irisDF,
     function(key, x) {
       y <- data.frame(key, mean(x$Sepal_Width), stringsAsFactors = FALSE)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to