Repository: spark Updated Branches: refs/heads/master ded6d27e4 -> 76e8a1d7e
[SPARK-22843][R] Adds localCheckpoint in R ## What changes were proposed in this pull request? This PR proposes to add `localCheckpoint(..)` in R API. ```r df <- localCheckpoint(createDataFrame(iris)) ``` ## How was this patch tested? Unit tests added in `R/pkg/tests/fulltests/test_sparkSQL.R` Author: hyukjinkwon <gurwls...@gmail.com> Closes #20073 from HyukjinKwon/SPARK-22843. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/76e8a1d7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/76e8a1d7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/76e8a1d7 Branch: refs/heads/master Commit: 76e8a1d7e2619c1e6bd75c399314d2583a86b93b Parents: ded6d27 Author: hyukjinkwon <gurwls...@gmail.com> Authored: Thu Dec 28 20:17:26 2017 +0900 Committer: hyukjinkwon <gurwls...@gmail.com> Committed: Thu Dec 28 20:17:26 2017 +0900 ---------------------------------------------------------------------- R/pkg/NAMESPACE | 1 + R/pkg/R/DataFrame.R | 27 +++++++++++++++++++++++++++ R/pkg/R/generics.R | 4 ++++ R/pkg/tests/fulltests/test_sparkSQL.R | 22 ++++++++++++++++++++++ 4 files changed, 54 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/76e8a1d7/R/pkg/NAMESPACE ---------------------------------------------------------------------- diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE index dce64e1..4b699de 100644 --- a/R/pkg/NAMESPACE +++ b/R/pkg/NAMESPACE @@ -133,6 +133,7 @@ exportMethods("arrange", "isStreaming", "join", "limit", + "localCheckpoint", "merge", "mutate", "na.omit", http://git-wip-us.apache.org/repos/asf/spark/blob/76e8a1d7/R/pkg/R/DataFrame.R ---------------------------------------------------------------------- diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R index b8d732a..ace49da 100644 --- a/R/pkg/R/DataFrame.R +++ b/R/pkg/R/DataFrame.R @@ -3782,6 +3782,33 @@ setMethod("checkpoint", dataFrame(df) }) +#' localCheckpoint +#' +#' Returns a locally checkpointed version of this SparkDataFrame. Checkpointing can be used to +#' truncate the logical plan, which is especially useful in iterative algorithms where the plan +#' may grow exponentially. Local checkpoints are stored in the executors using the caching +#' subsystem and therefore they are not reliable. +#' +#' @param x A SparkDataFrame +#' @param eager whether to locally checkpoint this SparkDataFrame immediately +#' @return a new locally checkpointed SparkDataFrame +#' @family SparkDataFrame functions +#' @aliases localCheckpoint,SparkDataFrame-method +#' @rdname localCheckpoint +#' @name localCheckpoint +#' @export +#' @examples +#'\dontrun{ +#' df <- localCheckpoint(df) +#' } +#' @note localCheckpoint since 2.3.0 +setMethod("localCheckpoint", + signature(x = "SparkDataFrame"), + function(x, eager = TRUE) { + df <- callJMethod(x@sdf, "localCheckpoint", as.logical(eager)) + dataFrame(df) + }) + #' cube #' #' Create a multi-dimensional cube for the SparkDataFrame using the specified columns. http://git-wip-us.apache.org/repos/asf/spark/blob/76e8a1d7/R/pkg/R/generics.R ---------------------------------------------------------------------- diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R index 5ddaa66..d5d0bc9 100644 --- a/R/pkg/R/generics.R +++ b/R/pkg/R/generics.R @@ -611,6 +611,10 @@ setGeneric("isStreaming", function(x) { standardGeneric("isStreaming") }) #' @export setGeneric("limit", function(x, num) {standardGeneric("limit") }) +#' @rdname localCheckpoint +#' @export +setGeneric("localCheckpoint", function(x, eager = TRUE) { standardGeneric("localCheckpoint") }) + #' @rdname merge #' @export setGeneric("merge") http://git-wip-us.apache.org/repos/asf/spark/blob/76e8a1d7/R/pkg/tests/fulltests/test_sparkSQL.R ---------------------------------------------------------------------- diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R index 6cc0188..650e7c0 100644 --- a/R/pkg/tests/fulltests/test_sparkSQL.R +++ b/R/pkg/tests/fulltests/test_sparkSQL.R @@ -957,6 +957,28 @@ test_that("setCheckpointDir(), checkpoint() on a DataFrame", { } }) +test_that("localCheckpoint() on a DataFrame", { + if (windows_with_hadoop()) { + # Checkpoint directory shouldn't matter in localCheckpoint. + checkpointDir <- file.path(tempdir(), "lcproot") + expect_true(length(list.files(path = checkpointDir, all.files = TRUE, recursive = TRUE)) == 0) + setCheckpointDir(checkpointDir) + + textPath <- tempfile(pattern = "textPath", fileext = ".txt") + writeLines(mockLines, textPath) + # Read it lazily and then locally checkpoint eagerly. + df <- read.df(textPath, "text") + df <- localCheckpoint(df, eager = TRUE) + # Here, we remove the source path to check eagerness. + unlink(textPath) + expect_is(df, "SparkDataFrame") + expect_equal(colnames(df), c("value")) + expect_equal(count(df), 3) + + expect_true(length(list.files(path = checkpointDir, all.files = TRUE, recursive = TRUE)) == 0) + } +}) + test_that("schema(), dtypes(), columns(), names() return the correct values/format", { df <- read.json(jsonPath) testSchema <- schema(df) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org