Repository: spark
Updated Branches:
  refs/heads/master 0cdcf9114 -> c40597720


[SPARK-20020][SPARKR] DataFrame checkpoint API

## What changes were proposed in this pull request?

Add checkpoint, setCheckpointDir API to R

## How was this patch tested?

unit tests, manual tests

Author: Felix Cheung <felixcheun...@hotmail.com>

Closes #17351 from felixcheung/rdfcheckpoint.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c4059772
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c4059772
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c4059772

Branch: refs/heads/master
Commit: c40597720e8e66a6b11ca241b1ad387154a8fe72
Parents: 0cdcf91
Author: Felix Cheung <felixcheun...@hotmail.com>
Authored: Sun Mar 19 22:34:18 2017 -0700
Committer: Felix Cheung <felixche...@apache.org>
Committed: Sun Mar 19 22:34:18 2017 -0700

----------------------------------------------------------------------
 R/pkg/NAMESPACE                           |  2 ++
 R/pkg/R/DataFrame.R                       | 29 ++++++++++++++++++++++++++
 R/pkg/R/RDD.R                             |  2 +-
 R/pkg/R/context.R                         | 21 ++++++++++++++++++-
 R/pkg/R/generics.R                        |  6 +++++-
 R/pkg/inst/tests/testthat/test_rdd.R      |  4 ++--
 R/pkg/inst/tests/testthat/test_sparkSQL.R | 11 ++++++++++
 7 files changed, 70 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c4059772/R/pkg/NAMESPACE
----------------------------------------------------------------------
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 78344ce..8be7875 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -82,6 +82,7 @@ exportMethods("arrange",
               "as.data.frame",
               "attach",
               "cache",
+              "checkpoint",
               "coalesce",
               "collect",
               "colnames",
@@ -369,6 +370,7 @@ export("as.DataFrame",
        "read.parquet",
        "read.stream",
        "read.text",
+       "setCheckpointDir",
        "spark.lapply",
        "spark.addFile",
        "spark.getSparkFilesRootDirectory",

http://git-wip-us.apache.org/repos/asf/spark/blob/c4059772/R/pkg/R/DataFrame.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index bc81633..97786df 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -3613,3 +3613,32 @@ setMethod("write.stream",
             ssq <- handledCallJMethod(write, "start")
             streamingQuery(ssq)
           })
+
+#' checkpoint
+#'
+#' Returns a checkpointed version of this SparkDataFrame. Checkpointing can be 
used to truncate the
+#' logical plan, which is especially useful in iterative algorithms where the 
plan may grow
+#' exponentially. It will be saved to files inside the checkpoint directory 
set with
+#' \code{setCheckpointDir}
+#'
+#' @param x A SparkDataFrame
+#' @param eager whether to checkpoint this SparkDataFrame immediately
+#' @return a new checkpointed SparkDataFrame
+#' @family SparkDataFrame functions
+#' @aliases checkpoint,SparkDataFrame-method
+#' @rdname checkpoint
+#' @name checkpoint
+#' @seealso \link{setCheckpointDir}
+#' @export
+#' @examples
+#'\dontrun{
+#' setCheckpointDir("/checkpoint")
+#' df <- checkpoint(df)
+#' }
+#' @note checkpoint since 2.2.0
+setMethod("checkpoint",
+          signature(x = "SparkDataFrame"),
+          function(x, eager = TRUE) {
+            df <- callJMethod(x@sdf, "checkpoint", as.logical(eager))
+            dataFrame(df)
+          })

http://git-wip-us.apache.org/repos/asf/spark/blob/c4059772/R/pkg/R/RDD.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/RDD.R b/R/pkg/R/RDD.R
index 5667b9d..7ad3993 100644
--- a/R/pkg/R/RDD.R
+++ b/R/pkg/R/RDD.R
@@ -291,7 +291,7 @@ setMethod("unpersistRDD",
 #' @rdname checkpoint-methods
 #' @aliases checkpoint,RDD-method
 #' @noRd
-setMethod("checkpoint",
+setMethod("checkpointRDD",
           signature(x = "RDD"),
           function(x) {
             jrdd <- getJRDD(x)

http://git-wip-us.apache.org/repos/asf/spark/blob/c4059772/R/pkg/R/context.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/context.R b/R/pkg/R/context.R
index 1a0dd65..cb0f83b 100644
--- a/R/pkg/R/context.R
+++ b/R/pkg/R/context.R
@@ -291,7 +291,7 @@ broadcast <- function(sc, object) {
 #' rdd <- parallelize(sc, 1:2, 2L)
 #' checkpoint(rdd)
 #'}
-setCheckpointDir <- function(sc, dirName) {
+setCheckpointDirSC <- function(sc, dirName) {
   invisible(callJMethod(sc, "setCheckpointDir", 
suppressWarnings(normalizePath(dirName))))
 }
 
@@ -410,3 +410,22 @@ setLogLevel <- function(level) {
   sc <- getSparkContext()
   invisible(callJMethod(sc, "setLogLevel", level))
 }
+
+#' Set checkpoint directory
+#'
+#' Set the directory under which SparkDataFrame are going to be checkpointed. 
The directory must be
+#' a HDFS path if running on a cluster.
+#'
+#' @rdname setCheckpointDir
+#' @param directory Directory path to checkpoint to
+#' @seealso \link{checkpoint}
+#' @export
+#' @examples
+#'\dontrun{
+#' setCheckpointDir("/checkpoint")
+#'}
+#' @note setCheckpointDir since 2.0.0
+setCheckpointDir <- function(directory) {
+  sc <- getSparkContext()
+  invisible(callJMethod(sc, "setCheckpointDir", 
suppressWarnings(normalizePath(directory))))
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/c4059772/R/pkg/R/generics.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 0297712..80283e4 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -32,7 +32,7 @@ setGeneric("coalesceRDD", function(x, numPartitions, ...) { 
standardGeneric("coa
 
 # @rdname checkpoint-methods
 # @export
-setGeneric("checkpoint", function(x) { standardGeneric("checkpoint") })
+setGeneric("checkpointRDD", function(x) { standardGeneric("checkpointRDD") })
 
 setGeneric("collectRDD", function(x, ...) { standardGeneric("collectRDD") })
 
@@ -406,6 +406,10 @@ setGeneric("attach")
 #' @export
 setGeneric("cache", function(x) { standardGeneric("cache") })
 
+#' @rdname checkpoint
+#' @export
+setGeneric("checkpoint", function(x, eager = TRUE) { 
standardGeneric("checkpoint") })
+
 #' @rdname coalesce
 #' @param x a Column or a SparkDataFrame.
 #' @param ... additional argument(s). If \code{x} is a Column, additional 
Columns can be optionally

http://git-wip-us.apache.org/repos/asf/spark/blob/c4059772/R/pkg/inst/tests/testthat/test_rdd.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/testthat/test_rdd.R 
b/R/pkg/inst/tests/testthat/test_rdd.R
index 787ef51..b72c801 100644
--- a/R/pkg/inst/tests/testthat/test_rdd.R
+++ b/R/pkg/inst/tests/testthat/test_rdd.R
@@ -143,8 +143,8 @@ test_that("PipelinedRDD support actions: cache(), 
persist(), unpersist(), checkp
   expect_false(rdd2@env$isCached)
 
   tempDir <- tempfile(pattern = "checkpoint")
-  setCheckpointDir(sc, tempDir)
-  checkpoint(rdd2)
+  setCheckpointDirSC(sc, tempDir)
+  checkpointRDD(rdd2)
   expect_true(rdd2@env$isCheckpointed)
 
   rdd2 <- lapply(rdd2, function(x) x)

http://git-wip-us.apache.org/repos/asf/spark/blob/c4059772/R/pkg/inst/tests/testthat/test_sparkSQL.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R 
b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 9c38e0d..cbc3569 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -841,6 +841,17 @@ test_that("cache(), storageLevel(), persist(), and 
unpersist() on a DataFrame",
   expect_true(is.data.frame(collect(df)))
 })
 
+test_that("setCheckpointDir(), checkpoint() on a DataFrame", {
+  checkpointDir <- file.path(tempdir(), "cproot")
+  expect_true(length(list.files(path = checkpointDir, all.files = TRUE)) == 0)
+
+  setCheckpointDir(checkpointDir)
+  df <- read.json(jsonPath)
+  df <- checkpoint(df)
+  expect_is(df, "SparkDataFrame")
+  expect_false(length(list.files(path = checkpointDir, all.files = TRUE)) == 0)
+})
+
 test_that("schema(), dtypes(), columns(), names() return the correct 
values/format", {
   df <- read.json(jsonPath)
   testSchema <- schema(df)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to