Repository: spark
Updated Branches:
  refs/heads/master bb7f35239 -> 2a4e00ca4


[SPARK-9803] [SPARKR] Add subset and transform + tests

Add subset and transform
Also move the documentation of `[` & `[[` under subset instead of select

Note: transform is very similar to mutate. Spark doesn't seem to replace an 
existing column that has the same name in mutate (i.e. `mutate(df, age = df$age + 
2)` returns a DataFrame with 2 columns with the same name 'age'), so transform 
does not do that for now either.
However, it is clearly stated that it should replace a column with a matching 
name (should I open a JIRA for mutate/transform?)

Author: felixcheung <felixcheun...@hotmail.com>

Closes #8503 from felixcheung/rsubset_transform.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2a4e00ca
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2a4e00ca
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2a4e00ca

Branch: refs/heads/master
Commit: 2a4e00ca4d4e7a148b4ff8ce0ad1c6d517cee55f
Parents: bb7f352
Author: felixcheung <felixcheun...@hotmail.com>
Authored: Fri Aug 28 18:35:01 2015 -0700
Committer: Shivaram Venkataraman <shiva...@cs.berkeley.edu>
Committed: Fri Aug 28 18:35:01 2015 -0700

----------------------------------------------------------------------
 R/pkg/NAMESPACE                  |  2 +
 R/pkg/R/DataFrame.R              | 70 +++++++++++++++++++++++++++--------
 R/pkg/R/generics.R               | 10 ++++-
 R/pkg/inst/tests/test_sparkSQL.R | 20 +++++++++-
 4 files changed, 85 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2a4e00ca/R/pkg/NAMESPACE
----------------------------------------------------------------------
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 5286c01..9d39630 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -69,9 +69,11 @@ exportMethods("arrange",
               "selectExpr",
               "show",
               "showDF",
+              "subset",
               "summarize",
               "summary",
               "take",
+              "transform",
               "unionAll",
               "unique",
               "unpersist",

http://git-wip-us.apache.org/repos/asf/spark/blob/2a4e00ca/R/pkg/R/DataFrame.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 74de7c8..8a00238 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -987,7 +987,7 @@ setMethod("$<-", signature(x = "DataFrame"),
 
 setClassUnion("numericOrcharacter", c("numeric", "character"))
 
-#' @rdname select
+#' @rdname subset
 #' @name [[
 setMethod("[[", signature(x = "DataFrame", i = "numericOrcharacter"),
           function(x, i) {
@@ -998,7 +998,7 @@ setMethod("[[", signature(x = "DataFrame", i = 
"numericOrcharacter"),
             getColumn(x, i)
           })
 
-#' @rdname select
+#' @rdname subset
 #' @name [
 setMethod("[", signature(x = "DataFrame", i = "missing"),
           function(x, i, j, ...) {
@@ -1012,7 +1012,7 @@ setMethod("[", signature(x = "DataFrame", i = "missing"),
             select(x, j)
           })
 
-#' @rdname select
+#' @rdname subset
 #' @name [
 setMethod("[", signature(x = "DataFrame", i = "Column"),
           function(x, i, j, ...) {
@@ -1020,12 +1020,43 @@ setMethod("[", signature(x = "DataFrame", i = "Column"),
             # 
https://stat.ethz.ch/R-manual/R-devel/library/base/html/Extract.data.frame.html
             filtered <- filter(x, i)
             if (!missing(j)) {
-              filtered[, j]
+              filtered[, j, ...]
             } else {
               filtered
             }
           })
 
+#' Subset
+#'
+#' Return subsets of DataFrame according to given conditions
+#' @param x A DataFrame
+#' @param subset A logical expression to filter on rows
+#' @param select expression for the single Column or a list of columns to 
select from the DataFrame
+#' @return A new DataFrame containing only the rows that meet the condition 
with selected columns
+#' @export
+#' @rdname subset
+#' @name subset
+#' @aliases [
+#' @family subsetting functions
+#' @examples
+#' \dontrun{
+#'   # Columns can be selected using `[[` and `[`
+#'   df[[2]] == df[["age"]]
+#'   df[,2] == df[,"age"]
+#'   df[,c("name", "age")]
+#'   # Or to filter rows
+#'   df[df$age > 20,]
+#'   # DataFrame can be subset on both rows and Columns
+#'   df[df$name == "Smith", c(1,2)]
+#'   df[df$age %in% c(19, 30), 1:2]
+#'   subset(df, df$age %in% c(19, 30), 1:2)
+#'   subset(df, df$age %in% c(19), select = c(1,2))
+#' }
+setMethod("subset", signature(x = "DataFrame"),
+          function(x, subset, select, ...) {
+            x[subset, select, ...]
+          })
+
 #' Select
 #'
 #' Selects a set of columns with names or Column expressions.
@@ -1034,6 +1065,8 @@ setMethod("[", signature(x = "DataFrame", i = "Column"),
 #' @return A new DataFrame with selected columns
 #' @export
 #' @rdname select
+#' @name select
+#' @family subsetting functions
 #' @examples
 #' \dontrun{
 #'   select(df, "*")
@@ -1041,15 +1074,8 @@ setMethod("[", signature(x = "DataFrame", i = "Column"),
 #'   select(df, df$name, df$age + 1)
 #'   select(df, c("col1", "col2"))
 #'   select(df, list(df$name, df$age + 1))
-#'   # Columns can also be selected using `[[` and `[`
-#'   df[[2]] == df[["age"]]
-#'   df[,2] == df[,"age"]
-#'   df[,c("name", "age")]
 #'   # Similar to R data frames columns can also be selected using `$`
 #'   df$age
-#'   # It can also be subset on rows and Columns
-#'   df[df$name == "Smith", c(1,2)]
-#'   df[df$age %in% c(19, 30), 1:2]
 #' }
 setMethod("select", signature(x = "DataFrame", col = "character"),
           function(x, col, ...) {
@@ -1121,7 +1147,7 @@ setMethod("selectExpr",
 #' @return A DataFrame with the new column added.
 #' @rdname withColumn
 #' @name withColumn
-#' @aliases mutate
+#' @aliases mutate transform
 #' @export
 #' @examples
 #'\dontrun{
@@ -1141,11 +1167,12 @@ setMethod("withColumn",
 #'
 #' Return a new DataFrame with the specified columns added.
 #'
-#' @param x A DataFrame
+#' @param .data A DataFrame
 #' @param col a named argument of the form name = col
 #' @return A new DataFrame with the new columns added.
 #' @rdname withColumn
 #' @name mutate
+#' @aliases withColumn transform
 #' @export
 #' @examples
 #'\dontrun{
@@ -1155,10 +1182,12 @@ setMethod("withColumn",
 #' df <- jsonFile(sqlContext, path)
 #' newDF <- mutate(df, newCol = df$col1 * 5, newCol2 = df$col1 * 2)
 #' names(newDF) # Will contain newCol, newCol2
+#' newDF2 <- transform(df, newCol = df$col1 / 5, newCol2 = df$col1 * 2)
 #' }
 setMethod("mutate",
-          signature(x = "DataFrame"),
-          function(x, ...) {
+          signature(.data = "DataFrame"),
+          function(.data, ...) {
+            x <- .data
             cols <- list(...)
             stopifnot(length(cols) > 0)
             stopifnot(class(cols[[1]]) == "Column")
@@ -1173,6 +1202,16 @@ setMethod("mutate",
             do.call(select, c(x, x$"*", cols))
           })
 
+#' @export
+#' @rdname withColumn
+#' @name transform
+#' @aliases withColumn mutate
+setMethod("transform",
+          signature(`_data` = "DataFrame"),
+          function(`_data`, ...) {
+            mutate(`_data`, ...)
+          })
+
 #' WithColumnRenamed
 #'
 #' Rename an existing column in a DataFrame.
@@ -1300,6 +1339,7 @@ setMethod("orderBy",
 #' @return A DataFrame containing only the rows that meet the condition.
 #' @rdname filter
 #' @name filter
+#' @family subsetting functions
 #' @export
 #' @examples
 #'\dontrun{

http://git-wip-us.apache.org/repos/asf/spark/blob/2a4e00ca/R/pkg/R/generics.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index b578b87..43dd8d2 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -467,7 +467,7 @@ setGeneric("merge")
 
 #' @rdname withColumn
 #' @export
-setGeneric("mutate", function(x, ...) {standardGeneric("mutate") })
+setGeneric("mutate", function(.data, ...) {standardGeneric("mutate") })
 
 #' @rdname arrange
 #' @export
@@ -507,6 +507,10 @@ setGeneric("saveAsTable", function(df, tableName, source, 
mode, ...) {
   standardGeneric("saveAsTable")
 })
 
+#' @rdname withColumn
+#' @export
+setGeneric("transform", function(`_data`, ...) {standardGeneric("transform") })
+
 #' @rdname write.df
 #' @export
 setGeneric("write.df", function(df, path, ...) { standardGeneric("write.df") })
@@ -531,6 +535,10 @@ setGeneric("selectExpr", function(x, expr, ...) { 
standardGeneric("selectExpr")
 #' @export
 setGeneric("showDF", function(x,...) { standardGeneric("showDF") })
 
+# @rdname subset
+# @export
+setGeneric("subset", function(x, subset, select, ...) { 
standardGeneric("subset") })
+
 #' @rdname agg
 #' @export
 setGeneric("summarize", function(x,...) { standardGeneric("summarize") })

http://git-wip-us.apache.org/repos/asf/spark/blob/2a4e00ca/R/pkg/inst/tests/test_sparkSQL.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/test_sparkSQL.R b/R/pkg/inst/tests/test_sparkSQL.R
index 933b11c..0da5e38 100644
--- a/R/pkg/inst/tests/test_sparkSQL.R
+++ b/R/pkg/inst/tests/test_sparkSQL.R
@@ -612,6 +612,10 @@ test_that("subsetting", {
   df5 <- df[df$age %in% c(19), c(1,2)]
   expect_equal(count(df5), 1)
   expect_equal(columns(df5), c("name", "age"))
+
+  df6 <- subset(df, df$age %in% c(30), c(1,2))
+  expect_equal(count(df6), 1)
+  expect_equal(columns(df6), c("name", "age"))
 })
 
 test_that("selectExpr() on a DataFrame", {
@@ -1028,7 +1032,7 @@ test_that("withColumn() and withColumnRenamed()", {
   expect_equal(columns(newDF2)[1], "newerAge")
 })
 
-test_that("mutate(), rename() and names()", {
+test_that("mutate(), transform(), rename() and names()", {
   df <- jsonFile(sqlContext, jsonPath)
   newDF <- mutate(df, newAge = df$age + 2)
   expect_equal(length(columns(newDF)), 3)
@@ -1042,6 +1046,20 @@ test_that("mutate(), rename() and names()", {
   names(newDF2) <- c("newerName", "evenNewerAge")
   expect_equal(length(names(newDF2)), 2)
   expect_equal(names(newDF2)[1], "newerName")
+
+  transformedDF <- transform(df, newAge = -df$age, newAge2 = df$age / 2)
+  expect_equal(length(columns(transformedDF)), 4)
+  expect_equal(columns(transformedDF)[3], "newAge")
+  expect_equal(columns(transformedDF)[4], "newAge2")
+  expect_equal(first(filter(transformedDF, transformedDF$name == 
"Andy"))$newAge, -30)
+
+  # test if transform on local data frames works
+  # ensure the proper signature is used - otherwise this will fail to run
+  attach(airquality)
+  result <- transform(Ozone, logOzone = log(Ozone))
+  expect_equal(nrow(result), 153)
+  expect_equal(ncol(result), 2)
+  detach(airquality)
 })
 
 test_that("write.df() on DataFrame and works with parquetFile", {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to