Repository: spark
Updated Branches:
  refs/heads/master ded6d27e4 -> 76e8a1d7e


[SPARK-22843][R] Adds localCheckpoint in R

## What changes were proposed in this pull request?

This PR proposes to add `localCheckpoint(..)` in R API.

```r
df <- localCheckpoint(createDataFrame(iris))
```

## How was this patch tested?

Unit tests added in `R/pkg/tests/fulltests/test_sparkSQL.R`

Author: hyukjinkwon <gurwls...@gmail.com>

Closes #20073 from HyukjinKwon/SPARK-22843.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/76e8a1d7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/76e8a1d7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/76e8a1d7

Branch: refs/heads/master
Commit: 76e8a1d7e2619c1e6bd75c399314d2583a86b93b
Parents: ded6d27
Author: hyukjinkwon <gurwls...@gmail.com>
Authored: Thu Dec 28 20:17:26 2017 +0900
Committer: hyukjinkwon <gurwls...@gmail.com>
Committed: Thu Dec 28 20:17:26 2017 +0900

----------------------------------------------------------------------
 R/pkg/NAMESPACE                       |  1 +
 R/pkg/R/DataFrame.R                   | 27 +++++++++++++++++++++++++++
 R/pkg/R/generics.R                    |  4 ++++
 R/pkg/tests/fulltests/test_sparkSQL.R | 22 ++++++++++++++++++++++
 4 files changed, 54 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/76e8a1d7/R/pkg/NAMESPACE
----------------------------------------------------------------------
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index dce64e1..4b699de 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -133,6 +133,7 @@ exportMethods("arrange",
               "isStreaming",
               "join",
               "limit",
+              "localCheckpoint",
               "merge",
               "mutate",
               "na.omit",

http://git-wip-us.apache.org/repos/asf/spark/blob/76e8a1d7/R/pkg/R/DataFrame.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index b8d732a..ace49da 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -3782,6 +3782,33 @@ setMethod("checkpoint",
             dataFrame(df)
           })
 
+#' localCheckpoint
+#'
+#' Returns a locally checkpointed version of this SparkDataFrame. Checkpointing can be used to
+#' truncate the logical plan, which is especially useful in iterative algorithms where the plan
+#' may grow exponentially. Local checkpoints are stored in the executors using the caching
+#' subsystem and therefore they are not reliable.
+#'
+#' @param x A SparkDataFrame
+#' @param eager whether to locally checkpoint this SparkDataFrame immediately
+#' @return a new locally checkpointed SparkDataFrame
+#' @family SparkDataFrame functions
+#' @aliases localCheckpoint,SparkDataFrame-method
+#' @rdname localCheckpoint
+#' @name localCheckpoint
+#' @export
+#' @examples
+#'\dontrun{
+#' df <- localCheckpoint(df)
+#' }
+#' @note localCheckpoint since 2.3.0
+setMethod("localCheckpoint",
+          signature(x = "SparkDataFrame"),
+          function(x, eager = TRUE) {
+            df <- callJMethod(x@sdf, "localCheckpoint", as.logical(eager))
+            dataFrame(df)
+          })
+
 #' cube
 #'
 #' Create a multi-dimensional cube for the SparkDataFrame using the specified columns.

http://git-wip-us.apache.org/repos/asf/spark/blob/76e8a1d7/R/pkg/R/generics.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 5ddaa66..d5d0bc9 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -611,6 +611,10 @@ setGeneric("isStreaming", function(x) { standardGeneric("isStreaming") })
 #' @export
 setGeneric("limit", function(x, num) {standardGeneric("limit") })
 
+#' @rdname localCheckpoint
+#' @export
+setGeneric("localCheckpoint", function(x, eager = TRUE) { standardGeneric("localCheckpoint") })
+
 #' @rdname merge
 #' @export
 setGeneric("merge")

http://git-wip-us.apache.org/repos/asf/spark/blob/76e8a1d7/R/pkg/tests/fulltests/test_sparkSQL.R
----------------------------------------------------------------------
diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R
index 6cc0188..650e7c0 100644
--- a/R/pkg/tests/fulltests/test_sparkSQL.R
+++ b/R/pkg/tests/fulltests/test_sparkSQL.R
@@ -957,6 +957,28 @@ test_that("setCheckpointDir(), checkpoint() on a DataFrame", {
   }
 })
 
+test_that("localCheckpoint() on a DataFrame", {
+  if (windows_with_hadoop()) {
+    # Checkpoint directory shouldn't matter in localCheckpoint.
+    checkpointDir <- file.path(tempdir(), "lcproot")
+    expect_true(length(list.files(path = checkpointDir, all.files = TRUE, recursive = TRUE)) == 0)
+    setCheckpointDir(checkpointDir)
+
+    textPath <- tempfile(pattern = "textPath", fileext = ".txt")
+    writeLines(mockLines, textPath)
+    # Read it lazily and then locally checkpoint eagerly.
+    df <- read.df(textPath, "text")
+    df <- localCheckpoint(df, eager = TRUE)
+    # Here, we remove the source path to check eagerness.
+    unlink(textPath)
+    expect_is(df, "SparkDataFrame")
+    expect_equal(colnames(df), c("value"))
+    expect_equal(count(df), 3)
+
+    expect_true(length(list.files(path = checkpointDir, all.files = TRUE, recursive = TRUE)) == 0)
+  }
+})
+
 test_that("schema(), dtypes(), columns(), names() return the correct values/format", {
   df <- read.json(jsonPath)
   testSchema <- schema(df)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to