Repository: spark Updated Branches: refs/heads/master 8c2edf46d -> cb77a6689
[SPARK-21291][R] add R partitionBy API in DataFrame ## What changes were proposed in this pull request? add R partitionBy API in write.df I didn't add bucketBy in write.df. The last line of write.df is ``` write <- handledCallJMethod(write, "save") ``` save doesn't support bucketBy right now. ``` assertNotBucketed("save") ``` ## How was this patch tested? Add unit test in test_sparkSQL.R Closes #22537 from huaxingao/spark-21291. Authored-by: Huaxin Gao <huax...@us.ibm.com> Signed-off-by: hyukjinkwon <gurwls...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cb77a668 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cb77a668 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cb77a668 Branch: refs/heads/master Commit: cb77a6689137916e64bc5692b0c942e86ca1a0ea Parents: 8c2edf4 Author: Huaxin Gao <huax...@us.ibm.com> Authored: Wed Sep 26 09:37:44 2018 +0800 Committer: hyukjinkwon <gurwls...@apache.org> Committed: Wed Sep 26 09:37:44 2018 +0800 ---------------------------------------------------------------------- R/pkg/R/DataFrame.R | 17 +++++++++++++++-- R/pkg/tests/fulltests/test_sparkSQL.R | 8 ++++++++ 2 files changed, 23 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/cb77a668/R/pkg/R/DataFrame.R ---------------------------------------------------------------------- diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R index a1cb478..3469188 100644 --- a/R/pkg/R/DataFrame.R +++ b/R/pkg/R/DataFrame.R @@ -2954,6 +2954,9 @@ setMethod("exceptAll", #' @param source a name for external data source. #' @param mode one of 'append', 'overwrite', 'error', 'errorifexists', 'ignore' #' save mode (it is 'error' by default) +#' @param partitionBy a name or a list of names of columns to partition the output by on the file +#' system. If specified, the output is laid out on the file system similar +#' to Hive's partitioning scheme. #' @param ... additional argument(s) passed to the method. #' #' @family SparkDataFrame functions @@ -2965,13 +2968,13 @@ setMethod("exceptAll", #' sparkR.session() #' path <- "path/to/file.json" #' df <- read.json(path) -#' write.df(df, "myfile", "parquet", "overwrite") +#' write.df(df, "myfile", "parquet", "overwrite", partitionBy = c("col1", "col2")) #' saveDF(df, parquetPath2, "parquet", mode = "append", mergeSchema = TRUE) #' } #' @note write.df since 1.4.0 setMethod("write.df", signature(df = "SparkDataFrame"), - function(df, path = NULL, source = NULL, mode = "error", ...) { + function(df, path = NULL, source = NULL, mode = "error", partitionBy = NULL, ...) { if (!is.null(path) && !is.character(path)) { stop("path should be character, NULL or omitted.") } @@ -2985,8 +2988,18 @@ setMethod("write.df", if (is.null(source)) { source <- getDefaultSqlSource() } + cols <- NULL + if (!is.null(partitionBy)) { + if (!all(sapply(partitionBy, function(c) is.character(c)))) { + stop("All partitionBy column names should be characters.") + } + cols <- as.list(partitionBy) + } write <- callJMethod(df@sdf, "write") write <- callJMethod(write, "format", source) + if (!is.null(cols)) { + write <- callJMethod(write, "partitionBy", cols) + } write <- setWriteOptions(write, path = path, mode = mode, ...) write <- handledCallJMethod(write, "save") }) http://git-wip-us.apache.org/repos/asf/spark/blob/cb77a668/R/pkg/tests/fulltests/test_sparkSQL.R ---------------------------------------------------------------------- diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R index a874bfb..50eff37 100644 --- a/R/pkg/tests/fulltests/test_sparkSQL.R +++ b/R/pkg/tests/fulltests/test_sparkSQL.R @@ -2701,8 +2701,16 @@ test_that("read/write text files", { expect_equal(colnames(df2), c("value")) expect_equal(count(df2), count(df) * 2) + df3 <- createDataFrame(list(list(1L, "1"), list(2L, "2"), list(1L, "1"), list(2L, "2")), + schema = c("key", "value")) + textPath3 <- tempfile(pattern = "textPath3", fileext = ".txt") + write.df(df3, textPath3, "text", mode = "overwrite", partitionBy = "key") + df4 <- read.df(textPath3, "text") + expect_equal(count(df3), count(df4)) + unlink(textPath) unlink(textPath2) + unlink(textPath3) }) test_that("read/write text files - compression option", { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org