Repository: spark
Updated Branches:
  refs/heads/master 8c2edf46d -> cb77a6689


[SPARK-21291][R] add R partitionBy API in DataFrame

## What changes were proposed in this pull request?

Add the R `partitionBy` API to `write.df`.

`bucketBy` is not added to `write.df`, because the last line of `write.df` calls `save`:
```
write <- handledCallJMethod(write, "save")
```
and `save` does not support bucketing right now; it asserts against it:
```
assertNotBucketed("save")
```
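
For context, here is a minimal usage sketch of the new parameter. The data frame and output path are illustrative, not part of the patch; only the `partitionBy` argument itself comes from this change:
```
library(SparkR)
sparkR.session()

# Illustrative data; "key" mirrors the partition column used in the new test.
df <- createDataFrame(data.frame(key = c(1L, 2L, 1L, 2L),
                                 value = c("1", "2", "1", "2")))

# Write parquet output partitioned by "key". Each distinct value of "key"
# becomes its own subdirectory, e.g. <path>/key=1/..., <path>/key=2/...
outPath <- tempfile(pattern = "partitioned")
write.df(df, outPath, "parquet", mode = "overwrite", partitionBy = "key")
```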

## How was this patch tested?

Added a unit test in test_sparkSQL.R.

Closes #22537 from huaxingao/spark-21291.

Authored-by: Huaxin Gao <huax...@us.ibm.com>
Signed-off-by: hyukjinkwon <gurwls...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cb77a668
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cb77a668
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cb77a668

Branch: refs/heads/master
Commit: cb77a6689137916e64bc5692b0c942e86ca1a0ea
Parents: 8c2edf4
Author: Huaxin Gao <huax...@us.ibm.com>
Authored: Wed Sep 26 09:37:44 2018 +0800
Committer: hyukjinkwon <gurwls...@apache.org>
Committed: Wed Sep 26 09:37:44 2018 +0800

----------------------------------------------------------------------
 R/pkg/R/DataFrame.R                   | 17 +++++++++++++++--
 R/pkg/tests/fulltests/test_sparkSQL.R |  8 ++++++++
 2 files changed, 23 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/cb77a668/R/pkg/R/DataFrame.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index a1cb478..3469188 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -2954,6 +2954,9 @@ setMethod("exceptAll",
 #' @param source a name for external data source.
 #' @param mode one of 'append', 'overwrite', 'error', 'errorifexists', 'ignore'
 #'             save mode (it is 'error' by default)
+#' @param partitionBy a name or a list of names of columns to partition the output by on the file
+#'                    system. If specified, the output is laid out on the file system similar
+#'                    to Hive's partitioning scheme.
 #' @param ... additional argument(s) passed to the method.
 #'
 #' @family SparkDataFrame functions
@@ -2965,13 +2968,13 @@ setMethod("exceptAll",
 #' sparkR.session()
 #' path <- "path/to/file.json"
 #' df <- read.json(path)
-#' write.df(df, "myfile", "parquet", "overwrite")
+#' write.df(df, "myfile", "parquet", "overwrite", partitionBy = c("col1", "col2"))
 #' saveDF(df, parquetPath2, "parquet", mode = "append", mergeSchema = TRUE)
 #' }
 #' @note write.df since 1.4.0
 setMethod("write.df",
           signature(df = "SparkDataFrame"),
-          function(df, path = NULL, source = NULL, mode = "error", ...) {
+          function(df, path = NULL, source = NULL, mode = "error", partitionBy = NULL, ...) {
             if (!is.null(path) && !is.character(path)) {
               stop("path should be character, NULL or omitted.")
             }
@@ -2985,8 +2988,18 @@ setMethod("write.df",
             if (is.null(source)) {
               source <- getDefaultSqlSource()
             }
+            cols <- NULL
+            if (!is.null(partitionBy)) {
+              if (!all(sapply(partitionBy, function(c) is.character(c)))) {
+                stop("All partitionBy column names should be characters.")
+              }
+              cols <- as.list(partitionBy)
+            }
             write <- callJMethod(df@sdf, "write")
             write <- callJMethod(write, "format", source)
+            if (!is.null(cols)) {
+              write <- callJMethod(write, "partitionBy", cols)
+            }
             write <- setWriteOptions(write, path = path, mode = mode, ...)
             write <- handledCallJMethod(write, "save")
           })

http://git-wip-us.apache.org/repos/asf/spark/blob/cb77a668/R/pkg/tests/fulltests/test_sparkSQL.R
----------------------------------------------------------------------
diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R
index a874bfb..50eff37 100644
--- a/R/pkg/tests/fulltests/test_sparkSQL.R
+++ b/R/pkg/tests/fulltests/test_sparkSQL.R
@@ -2701,8 +2701,16 @@ test_that("read/write text files", {
   expect_equal(colnames(df2), c("value"))
   expect_equal(count(df2), count(df) * 2)
 
+  df3 <- createDataFrame(list(list(1L, "1"), list(2L, "2"), list(1L, "1"), list(2L, "2")),
+                         schema = c("key", "value"))
+  textPath3 <- tempfile(pattern = "textPath3", fileext = ".txt")
+  write.df(df3, textPath3, "text", mode = "overwrite", partitionBy = "key")
+  df4 <- read.df(textPath3, "text")
+  expect_equal(count(df3), count(df4))
+
   unlink(textPath)
   unlink(textPath2)
+  unlink(textPath3)
 })
 
 test_that("read/write text files - compression option", {

