Repository: spark
Updated Branches:
  refs/heads/master 285a7798e -> b0e8eb6d3


[SPARK-18335][SPARKR] createDataFrame to support numPartitions parameter

## What changes were proposed in this pull request?

To allow specifying the number of partitions when the DataFrame is created
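
A usage sketch based on the roxygen example and the new tests in this commit (the partition check via `toRDD`/`getNumPartitions` mirrors the test code; both are internal SparkR helpers, so outside the test suite they may need the `SparkR:::` prefix):

```r
library(SparkR)
sparkR.session()

# Ask for an explicit number of partitions when converting a local data.frame
df <- createDataFrame(cars, numPartitions = 2)
getNumPartitions(toRDD(df))   # 2

# as.DataFrame forwards the same argument on to createDataFrame
df2 <- as.DataFrame(cars, numPartitions = 3)
getNumPartitions(toRDD(df2))  # 3
```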

## How was this patch tested?

Manual testing and unit tests.

Author: Felix Cheung <felixcheun...@hotmail.com>

Closes #16512 from felixcheung/rnumpart.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b0e8eb6d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b0e8eb6d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b0e8eb6d

Branch: refs/heads/master
Commit: b0e8eb6d3e9e80fa62625a5b9382d93af77250db
Parents: 285a779
Author: Felix Cheung <felixcheun...@hotmail.com>
Authored: Fri Jan 13 10:08:14 2017 -0800
Committer: Shivaram Venkataraman <shiva...@cs.berkeley.edu>
Committed: Fri Jan 13 10:08:14 2017 -0800

----------------------------------------------------------------------
 R/pkg/R/SQLContext.R                      | 20 +++++++++----
 R/pkg/R/context.R                         | 39 ++++++++++++++++++++++----
 R/pkg/inst/tests/testthat/test_rdd.R      |  4 +--
 R/pkg/inst/tests/testthat/test_sparkSQL.R | 23 ++++++++++++++-
 4 files changed, 72 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b0e8eb6d/R/pkg/R/SQLContext.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R
index 6f48cd6..e771a05 100644
--- a/R/pkg/R/SQLContext.R
+++ b/R/pkg/R/SQLContext.R
@@ -184,8 +184,11 @@ getDefaultSqlSource <- function() {
 #'
 #' Converts R data.frame or list into SparkDataFrame.
 #'
-#' @param data an RDD or list or data.frame.
+#' @param data a list or data.frame.
 #' @param schema a list of column names or named list (StructType), optional.
+#' @param samplingRatio Currently not used.
+#' @param numPartitions the number of partitions of the SparkDataFrame. Defaults to 1, this is
+#'        limited by length of the list or number of rows of the data.frame
 #' @return A SparkDataFrame.
 #' @rdname createDataFrame
 #' @export
@@ -195,12 +198,14 @@ getDefaultSqlSource <- function() {
 #' df1 <- as.DataFrame(iris)
 #' df2 <- as.DataFrame(list(3,4,5,6))
 #' df3 <- createDataFrame(iris)
+#' df4 <- createDataFrame(cars, numPartitions = 2)
 #' }
 #' @name createDataFrame
 #' @method createDataFrame default
 #' @note createDataFrame since 1.4.0
 # TODO(davies): support sampling and infer type from NA
-createDataFrame.default <- function(data, schema = NULL, samplingRatio = 1.0) {
+createDataFrame.default <- function(data, schema = NULL, samplingRatio = 1.0,
+                                    numPartitions = NULL) {
   sparkSession <- getSparkSession()
 
   if (is.data.frame(data)) {
@@ -233,7 +238,11 @@ createDataFrame.default <- function(data, schema = NULL, samplingRatio = 1.0) {
 
   if (is.list(data)) {
    sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession)
-    rdd <- parallelize(sc, data)
+    if (!is.null(numPartitions)) {
+      rdd <- parallelize(sc, data, numSlices = numToInt(numPartitions))
+    } else {
+      rdd <- parallelize(sc, data, numSlices = 1)
+    }
   } else if (inherits(data, "RDD")) {
     rdd <- data
   } else {
@@ -283,14 +292,13 @@ createDataFrame <- function(x, ...) {
   dispatchFunc("createDataFrame(data, schema = NULL)", x, ...)
 }
 
-#' @param samplingRatio Currently not used.
 #' @rdname createDataFrame
 #' @aliases createDataFrame
 #' @export
 #' @method as.DataFrame default
 #' @note as.DataFrame since 1.6.0
-as.DataFrame.default <- function(data, schema = NULL, samplingRatio = 1.0) {
-  createDataFrame(data, schema)
+as.DataFrame.default <- function(data, schema = NULL, samplingRatio = 1.0, numPartitions = NULL) {
+  createDataFrame(data, schema, samplingRatio, numPartitions)
 }
 
 #' @param ... additional argument(s).

http://git-wip-us.apache.org/repos/asf/spark/blob/b0e8eb6d/R/pkg/R/context.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/context.R b/R/pkg/R/context.R
index 1138caf..1a0dd65 100644
--- a/R/pkg/R/context.R
+++ b/R/pkg/R/context.R
@@ -91,6 +91,16 @@ objectFile <- function(sc, path, minPartitions = NULL) {
#' will write it to disk and send the file name to JVM. Also to make sure each slice is not
#' larger than that limit, number of slices may be increased.
 #'
+#' In 2.2.0 we are changing how the numSlices are used/computed to handle
+#' 1 < (length(coll) / numSlices) << length(coll) better, and to get the exact number of slices.
+#' This change affects both createDataFrame and spark.lapply.
+#' In the specific one case that it is used to convert R native object into SparkDataFrame, it has
+#' always been kept at the default of 1. In the case the object is large, we are explicitly setting
+#' the parallelism to numSlices (which is still 1).
+#'
+#' Specifically, we are changing to split positions to match the calculation in positions() of
+#' ParallelCollectionRDD in Spark.
+#'
 #' @param sc SparkContext to use
 #' @param coll collection to parallelize
 #' @param numSlices number of partitions to create in the RDD
@@ -107,6 +117,8 @@ parallelize <- function(sc, coll, numSlices = 1) {
   # TODO: bound/safeguard numSlices
   # TODO: unit tests for if the split works for all primitives
   # TODO: support matrix, data frame, etc
+
+  # Note, for data.frame, createDataFrame turns it into a list before it calls here.
   # nolint start
  # suppress lintr warning: Place a space before left parenthesis, except in a function call.
   if ((!is.list(coll) && !is.vector(coll)) || is.data.frame(coll)) {
@@ -128,12 +140,29 @@ parallelize <- function(sc, coll, numSlices = 1) {
   objectSize <- object.size(coll)
 
  # For large objects we make sure the size of each slice is also smaller than sizeLimit
-  numSlices <- max(numSlices, ceiling(objectSize / sizeLimit))
-  if (numSlices > length(coll))
-    numSlices <- length(coll)
+  numSerializedSlices <- max(numSlices, ceiling(objectSize / sizeLimit))
+  if (numSerializedSlices > length(coll))
+    numSerializedSlices <- length(coll)
+
+  # Generate the slice ids to put each row
+  # For instance, for numSerializedSlices of 22, length of 50
+  #  [1]  0  0  2  2  4  4  6  6  6  9  9 11 11 13 13 15 15 15 18 18 20 20 22 22 22
+  # [26] 25 25 27 27 29 29 31 31 31 34 34 36 36 38 38 40 40 40 43 43 45 45 47 47 47
+  # Notice the slice group with 3 slices (ie. 6, 15, 22) are roughly evenly spaced.
+  # We are trying to reimplement the calculation in the positions method in ParallelCollectionRDD
+  splits <- if (numSerializedSlices > 0) {
+    unlist(lapply(0: (numSerializedSlices - 1), function(x) {
+      # nolint start
+      start <- trunc((x * length(coll)) / numSerializedSlices)
+      end <- trunc(((x + 1) * length(coll)) / numSerializedSlices)
+      # nolint end
+      rep(start, end - start)
+    }))
+  } else {
+    1
+  }
 
-  sliceLen <- ceiling(length(coll) / numSlices)
-  slices <- split(coll, rep(1: (numSlices + 1), each = sliceLen)[1:length(coll)])
+  slices <- split(coll, splits)
 
   # Serialize each slice: obtain a list of raws, or a list of lists (slices) of
   # 2-tuples of raws

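As an aside, the split-id calculation added above can be tried outside Spark. A minimal standalone sketch (the name `slicePositions` is mine, not part of the commit) that reproduces the example ids in the comment, mirroring positions() of ParallelCollectionRDD:

```r
# Assign each of numElements consecutive elements a slice id, the same way the
# new parallelize() code does before calling split(coll, splits).
slicePositions <- function(numElements, numSlices) {
  unlist(lapply(0:(numSlices - 1), function(x) {
    start <- trunc((x * numElements) / numSlices)
    end <- trunc(((x + 1) * numElements) / numSlices)
    rep(start, end - start)   # all elements of slice x share the id `start`
  }))
}

slicePositions(50, 22)
#  [1]  0  0  2  2  4  4  6  6  6  9  9 11 11 13 13 15 15 15 18 18 20 20 22 22 22
# [26] 25 25 27 27 29 29 31 31 31 34 34 36 36 38 38 40 40 40 43 43 45 45 47 47 47
length(table(slicePositions(50, 22)))  # 22 groups, so split() yields exactly 22 slices
```
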
http://git-wip-us.apache.org/repos/asf/spark/blob/b0e8eb6d/R/pkg/inst/tests/testthat/test_rdd.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/testthat/test_rdd.R b/R/pkg/inst/tests/testthat/test_rdd.R
index a3d66c2..2c41a6b 100644
--- a/R/pkg/inst/tests/testthat/test_rdd.R
+++ b/R/pkg/inst/tests/testthat/test_rdd.R
@@ -381,8 +381,8 @@ test_that("aggregateRDD() on RDDs", {
 test_that("zipWithUniqueId() on RDDs", {
   rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 3L)
   actual <- collectRDD(zipWithUniqueId(rdd))
-  expected <- list(list("a", 0), list("b", 3), list("c", 1),
-                   list("d", 4), list("e", 2))
+  expected <- list(list("a", 0), list("b", 1), list("c", 4),
+                   list("d", 2), list("e", 5))
   expect_equal(actual, expected)
 
   rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 1L)

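The expected ids change because the new split positions put the five elements into different partitions: with 3 slices they now become [a], [b, c], [d, e] rather than [a, b], [c, d], [e]. Since zipWithUniqueId assigns items in the k-th partition the ids k, n + k, 2n + k, ... (n = number of partitions), the new layout yields 0, 1, 4, 2, 5. A small sketch of that arithmetic (plain R, not the SparkR API):

```r
# id = positionInPartition * numPartitions + partitionIndex, as in zipWithUniqueId
uniqueIds <- function(partitions) {
  n <- length(partitions)
  unlist(lapply(seq_along(partitions), function(p) {
    (seq_along(partitions[[p]]) - 1) * n + (p - 1)
  }))
}

uniqueIds(list("a", c("b", "c"), c("d", "e")))  # 0 1 4 2 5  (new split)
uniqueIds(list(c("a", "b"), c("c", "d"), "e"))  # 0 3 1 4 2  (old split)
```
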
http://git-wip-us.apache.org/repos/asf/spark/blob/b0e8eb6d/R/pkg/inst/tests/testthat/test_sparkSQL.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 3e8b96a..2601742 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -196,6 +196,26 @@ test_that("create DataFrame from RDD", {
   expect_equal(dtypes(df), list(c("name", "string"), c("age", "int"), c("height", "float")))
   expect_equal(as.list(collect(where(df, df$name == "John"))),
                list(name = "John", age = 19L, height = 176.5))
+  expect_equal(getNumPartitions(toRDD(df)), 1)
+
+  df <- as.DataFrame(cars, numPartitions = 2)
+  expect_equal(getNumPartitions(toRDD(df)), 2)
+  df <- createDataFrame(cars, numPartitions = 3)
+  expect_equal(getNumPartitions(toRDD(df)), 3)
+  # validate limit by num of rows
+  df <- createDataFrame(cars, numPartitions = 60)
+  expect_equal(getNumPartitions(toRDD(df)), 50)
+  # validate when 1 < (length(coll) / numSlices) << length(coll)
+  df <- createDataFrame(cars, numPartitions = 20)
+  expect_equal(getNumPartitions(toRDD(df)), 20)
+
+  df <- as.DataFrame(data.frame(0))
+  expect_is(df, "SparkDataFrame")
+  df <- createDataFrame(list(list(1)))
+  expect_is(df, "SparkDataFrame")
+  df <- as.DataFrame(data.frame(0), numPartitions = 2)
+  # no data to partition, goes to 1
+  expect_equal(getNumPartitions(toRDD(df)), 1)
 
   setHiveContext(sc)
   sql("CREATE TABLE people (name string, age double, height float)")
@@ -213,7 +233,8 @@ test_that("createDataFrame uses files for large objects", {
   # To simulate a large file scenario, we set spark.r.maxAllocationLimit to a smaller value
   conf <- callJMethod(sparkSession, "conf")
   callJMethod(conf, "set", "spark.r.maxAllocationLimit", "100")
-  df <- suppressWarnings(createDataFrame(iris))
+  df <- suppressWarnings(createDataFrame(iris, numPartitions = 3))
+  expect_equal(getNumPartitions(toRDD(df)), 3)
 
   # Resetting the conf back to default value
   callJMethod(conf, "set", "spark.r.maxAllocationLimit", toString(.Machine$integer.max / 10))

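A brief note on the caps these tests exercise: parallelize never creates more serialized slices than there are elements (numSerializedSlices is trimmed to length(coll)), so a requested numPartitions larger than the row count is reduced. A tiny illustration, ignoring the size-based increase for objects above spark.r.maxAllocationLimit (`effectivePartitions` is a hypothetical helper, not SparkR API):

```r
# Requested partitions are capped at the number of rows being parallelized.
effectivePartitions <- function(requested, numRows) {
  min(requested, numRows)
}

effectivePartitions(60, nrow(cars))          # 50 -- cars has only 50 rows
effectivePartitions(2, nrow(data.frame(0)))  # 1  -- a single row forms a single slice
```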
