Repository: spark
Updated Branches:
  refs/heads/branch-1.6 2f0f8bb66 -> d981bfccc


[SPARK-11086][SPARKR] Use dropFactors column-wise instead of nested loop when createDataFrame

Use `dropFactors` column-wise instead of a nested loop when calling `createDataFrame` on a local `data.frame`

At the moment SparkR's `createDataFrame` uses a nested loop to convert
factors to characters when called on a local `data.frame`. It works, but it
is incredibly slow, especially with `data.table` (~2 orders of magnitude
slower than the PySpark / Pandas version on a DataFrame of 1M rows x 2
columns).
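
For reference, the per-cell pattern being replaced looks like this (taken
from the removed lines in the diff below); subsetting `data[i, j]` once per
cell is what makes it so slow:

```r
# Current approach: one dropFactor call per cell,
# i.e. n * m data.frame subsetting operations.
n <- nrow(data)
m <- ncol(data)
dropFactor <- function(x) {
  if (is.factor(x)) {
    as.character(x)
  } else {
    x
  }
}
data <- lapply(1:n, function(i) {
  lapply(1:m, function(j) { dropFactor(data[i, j]) })
})
```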

A simple improvement is to apply `dropFactor` column-wise and then reshape
the output list.
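
In outline (the actual change is in the diff below), the column-wise
version converts each factor column once and then uses `mapply` to zip the
columns back into a list of rows:

```r
# Column-wise approach: m conversions instead of n * m cell lookups.
cleanCols <- function(x) {
  if (is.factor(x)) as.character(x) else x
}
# drop factors and strip column names
data <- setNames(lapply(data, cleanCols), NULL)
# reshape: element i of the result is list(col1[i], col2[i], ...)
args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE)
data <- do.call(mapply, append(args, data))
```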

It should at least partially address 
[SPARK-8277](https://issues.apache.org/jira/browse/SPARK-8277).
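
A rough way to reproduce the comparison (a sketch only; timings are
machine-dependent, and the shape matches the 1M rows x 2 columns figure
quoted above):

```r
# Hypothetical micro-benchmark: one numeric and one factor column
# (data.frame() converts strings to factors by default).
df <- data.frame(x = runif(1e6), y = sample(letters, 1e6, replace = TRUE))
system.time(createDataFrame(sqlContext, df))
```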

Author: zero323 <matthew.szymkiew...@gmail.com>

Closes #9099 from zero323/SPARK-11086.

(cherry picked from commit d7d9fa0b8750166f8b74f9bc321df26908683a8b)
Signed-off-by: Shivaram Venkataraman <shiva...@cs.berkeley.edu>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d981bfcc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d981bfcc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d981bfcc

Branch: refs/heads/branch-1.6
Commit: d981bfcccea5ce52bb0eb18ffa668e5db9046e05
Parents: 2f0f8bb
Author: zero323 <matthew.szymkiew...@gmail.com>
Authored: Sun Nov 15 19:15:27 2015 -0800
Committer: Shivaram Venkataraman <shiva...@cs.berkeley.edu>
Committed: Sun Nov 15 19:15:37 2015 -0800

----------------------------------------------------------------------
 R/pkg/R/SQLContext.R             | 54 +++++++++++++++++++++--------------
 R/pkg/inst/tests/test_sparkSQL.R | 16 +++++++++++
 2 files changed, 49 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d981bfcc/R/pkg/R/SQLContext.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R
index fd013fd..a62b25f 100644
--- a/R/pkg/R/SQLContext.R
+++ b/R/pkg/R/SQLContext.R
@@ -17,27 +17,33 @@
 
 # SQLcontext.R: SQLContext-driven functions
 
+
+# Map top level R type to SQL type
+getInternalType <- function(x) {
+  # class of POSIXlt is c("POSIXlt" "POSIXt")
+  switch(class(x)[[1]],
+         integer = "integer",
+         character = "string",
+         logical = "boolean",
+         double = "double",
+         numeric = "double",
+         raw = "binary",
+         list = "array",
+         struct = "struct",
+         environment = "map",
+         Date = "date",
+         POSIXlt = "timestamp",
+         POSIXct = "timestamp",
+         stop(paste("Unsupported type for DataFrame:", class(x))))
+}
+
 #' infer the SQL type
 infer_type <- function(x) {
   if (is.null(x)) {
     stop("can not infer type from NULL")
   }
 
-  # class of POSIXlt is c("POSIXlt" "POSIXt")
-  type <- switch(class(x)[[1]],
-                 integer = "integer",
-                 character = "string",
-                 logical = "boolean",
-                 double = "double",
-                 numeric = "double",
-                 raw = "binary",
-                 list = "array",
-                 struct = "struct",
-                 environment = "map",
-                 Date = "date",
-                 POSIXlt = "timestamp",
-                 POSIXct = "timestamp",
-                 stop(paste("Unsupported type for DataFrame:", class(x))))
+  type <- getInternalType(x)
 
   if (type == "map") {
     stopifnot(length(x) > 0)
@@ -90,19 +96,25 @@ createDataFrame <- function(sqlContext, data, schema = NULL, samplingRatio = 1.0
       if (is.null(schema)) {
         schema <- names(data)
       }
-      n <- nrow(data)
-      m <- ncol(data)
+
       # get rid of factor type
-      dropFactor <- function(x) {
+      cleanCols <- function(x) {
         if (is.factor(x)) {
           as.character(x)
         } else {
           x
         }
       }
-      data <- lapply(1:n, function(i) {
-        lapply(1:m, function(j) { dropFactor(data[i,j]) })
-      })
+
+      # drop factors and wrap lists
+      data <- setNames(lapply(data, cleanCols), NULL)
+
+      # check if all columns have supported type
+      lapply(data, getInternalType)
+
+      # convert to rows
+      args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE)
+      data <- do.call(mapply, append(args, data))
   }
   if (is.list(data)) {
 sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sqlContext)

http://git-wip-us.apache.org/repos/asf/spark/blob/d981bfcc/R/pkg/inst/tests/test_sparkSQL.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/test_sparkSQL.R b/R/pkg/inst/tests/test_sparkSQL.R
index af024e6..8ff0627 100644
--- a/R/pkg/inst/tests/test_sparkSQL.R
+++ b/R/pkg/inst/tests/test_sparkSQL.R
@@ -242,6 +242,14 @@ test_that("create DataFrame from list or data.frame", {
   expect_equal(count(df), 3)
   ldf2 <- collect(df)
   expect_equal(ldf$a, ldf2$a)
+
+  irisdf <- createDataFrame(sqlContext, iris)
+  iris_collected <- collect(irisdf)
+  expect_equivalent(iris_collected[,-5], iris[,-5])
+  expect_equal(iris_collected$Species, as.character(iris$Species))
+
+  mtcarsdf <- createDataFrame(sqlContext, mtcars)
+  expect_equivalent(collect(mtcarsdf), mtcars)
 })
 
 test_that("create DataFrame with different data types", {
@@ -283,6 +291,14 @@ test_that("create DataFrame with complex types", {
   expect_equal(s$b, 3L)
 })
 
+test_that("create DataFrame from a data.frame with complex types", {
+  ldf <- data.frame(row.names=1:2)
+  ldf$a_list <- list(list(1, 2), list(3, 4))
+  sdf <- createDataFrame(sqlContext, ldf)
+
+  expect_equivalent(ldf, collect(sdf))
+})
+
 # For test map type and struct type in DataFrame
 mockLinesMapType <- c("{\"name\":\"Bob\",\"info\":{\"age\":16,\"height\":176.5}}",
                       "{\"name\":\"Alice\",\"info\":{\"age\":20,\"height\":164.3}}",

