spark git commit: [SPARK-18788][SPARKR] Add API for getNumPartitions
Repository: spark
Updated Branches:
  refs/heads/master c0ba28430 -> 90817a6cd

[SPARK-18788][SPARKR] Add API for getNumPartitions

## What changes were proposed in this pull request?

Add a getNumPartitions API for SparkDataFrame, with doc to note that this converts the SparkDataFrame into an RDD.

## How was this patch tested?

unit tests, manual tests

Author: Felix Cheung

Closes #16668 from felixcheung/rgetnumpartitions.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/90817a6c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/90817a6c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/90817a6c

Branch: refs/heads/master
Commit: 90817a6cd06068fa9f9ff77384a1fcba73b43006
Parents: c0ba284
Author: Felix Cheung
Authored: Thu Jan 26 21:06:39 2017 -0800
Committer: Felix Cheung
Committed: Thu Jan 26 21:06:39 2017 -0800

----------------------------------------------------------------------
 R/pkg/NAMESPACE                           |  1 +
 R/pkg/R/DataFrame.R                       | 23
 R/pkg/R/RDD.R                             | 30 +-
 R/pkg/R/generics.R                        |  8 +--
 R/pkg/R/pairRDD.R                         |  4 ++--
 R/pkg/inst/tests/testthat/test_rdd.R      | 10 -
 R/pkg/inst/tests/testthat/test_sparkSQL.R | 14 ++--
 7 files changed, 59 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/90817a6c/R/pkg/NAMESPACE
----------------------------------------------------------------------
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index caa1c3b..7ff6e9a 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -95,6 +95,7 @@ exportMethods("arrange",
               "freqItems",
               "gapply",
               "gapplyCollect",
+              "getNumPartitions",
               "group_by",
               "groupBy",
               "head",


http://git-wip-us.apache.org/repos/asf/spark/blob/90817a6c/R/pkg/R/DataFrame.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 0a10122..523343e 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -3428,3 +3428,26 @@ setMethod("randomSplit",
             }
             sapply(sdfs, dataFrame)
           })
+
+#' getNumPartitions
+#'
+#' Return the number of partitions
+#'
+#' @param x A SparkDataFrame
+#' @family SparkDataFrame functions
+#' @aliases getNumPartitions,SparkDataFrame-method
+#' @rdname getNumPartitions
+#' @name getNumPartitions
+#' @export
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' df <- createDataFrame(cars, numPartitions = 2)
+#' getNumPartitions(df)
+#' }
+#' @note getNumPartitions since 2.1.1
+setMethod("getNumPartitions",
+          signature(x = "SparkDataFrame"),
+          function(x) {
+            callJMethod(callJMethod(x@sdf, "rdd"), "getNumPartitions")
+          })


http://git-wip-us.apache.org/repos/asf/spark/blob/90817a6c/R/pkg/R/RDD.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/RDD.R b/R/pkg/R/RDD.R
index 0f1162f..91bab33 100644
--- a/R/pkg/R/RDD.R
+++ b/R/pkg/R/RDD.R
@@ -313,7 +313,7 @@ setMethod("checkpoint",
 #' @rdname getNumPartitions
 #' @aliases getNumPartitions,RDD-method
 #' @noRd
-setMethod("getNumPartitions",
+setMethod("getNumPartitionsRDD",
           signature(x = "RDD"),
           function(x) {
             callJMethod(getJRDD(x), "getNumPartitions")
@@ -329,7 +329,7 @@ setMethod("numPartitions",
           signature(x = "RDD"),
           function(x) {
             .Deprecated("getNumPartitions")
-            getNumPartitions(x)
+            getNumPartitionsRDD(x)
           })

 #' Collect elements of an RDD
@@ -460,7 +460,7 @@ setMethod("countByValue",
           signature(x = "RDD"),
           function(x) {
             ones <- lapply(x, function(item) { list(item, 1L) })
-            collectRDD(reduceByKey(ones, `+`, getNumPartitions(x)))
+            collectRDD(reduceByKey(ones, `+`, getNumPartitionsRDD(x)))
           })

 #' Apply a function to all elements
@@ -780,7 +780,7 @@ setMethod("takeRDD",
             resList <- list()
             index <- -1
             jrdd <- getJRDD(x)
-            numPartitions <- getNumPartitions(x)
+            numPartitions <- getNumPartitionsRDD(x)
             serializedModeRDD <- getSerializedMode(x)

             # TODO(shivaram): Collect more than one partition based on size
@@ -846,7 +846,7 @@ setMethod("firstRDD",
 #' @noRd
 setMethod("distinctRDD",
           signature(x = "RDD"),
-          function(x, numPartitions = SparkR:::getNumPartitions(x)) {
+          function(x, numPartitions = SparkR:::getNumPartitionsRDD(x)) {
             identical.mapped <- lapply(x, function(x) { list(x, NULL) })
             reduced <-
spark git commit: [SPARK-18788][SPARKR] Add API for getNumPartitions
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 59502bbcf -> ba2a5ada4

[SPARK-18788][SPARKR] Add API for getNumPartitions

## What changes were proposed in this pull request?

Add a getNumPartitions API for SparkDataFrame, with doc to note that this converts the SparkDataFrame into an RDD.

## How was this patch tested?

unit tests, manual tests

Author: Felix Cheung

Closes #16668 from felixcheung/rgetnumpartitions.

(cherry picked from commit 90817a6cd06068fa9f9ff77384a1fcba73b43006)
Signed-off-by: Felix Cheung

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ba2a5ada
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ba2a5ada
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ba2a5ada

Branch: refs/heads/branch-2.1
Commit: ba2a5ada4825a9ca3e4e954a51574a2eede096a3
Parents: 59502bb
Author: Felix Cheung
Authored: Thu Jan 26 21:06:39 2017 -0800
Committer: Felix Cheung
Committed: Thu Jan 26 21:06:54 2017 -0800

----------------------------------------------------------------------
 R/pkg/NAMESPACE                           |  1 +
 R/pkg/R/DataFrame.R                       | 23
 R/pkg/R/RDD.R                             | 30 +-
 R/pkg/R/generics.R                        |  8 +--
 R/pkg/R/pairRDD.R                         |  4 ++--
 R/pkg/inst/tests/testthat/test_rdd.R      | 10 -
 R/pkg/inst/tests/testthat/test_sparkSQL.R | 14 ++--
 7 files changed, 59 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
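
The RDD.R changes above follow a standard deprecation-shim pattern: the old name keeps working but warns, then forwards to the renamed implementation. Below is a standalone sketch of that pattern in plain R; the function names (oldCount, newCountImpl) are hypothetical stand-ins, not SparkR's.

# Renamed implementation (plays the role of getNumPartitionsRDD).
newCountImpl <- function(x) {
  length(x)
}

# Old name kept as a shim (plays the role of the deprecated numPartitions).
oldCount <- function(x) {
  .Deprecated("newCountImpl")  # base R helper: emits a deprecation warning
  newCountImpl(x)
}

oldCount(c(1, 2, 3))   # warns, then returns 3

This keeps existing callers working for a release while steering them to the new name, mirroring how numPartitions in the diff warns in favor of getNumPartitions while forwarding to getNumPartitionsRDD.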