Repository: spark
Updated Branches:
  refs/heads/branch-1.4 0ef2e9d35 -> 3e3151e75


[SPARK-8085] [SPARKR] Support user-specified schema in read.df

cc davies sun-rui

Author: Shivaram Venkataraman <shiva...@cs.berkeley.edu>

Closes #6620 from shivaram/sparkr-read-schema and squashes the following commits:

16a6726 [Shivaram Venkataraman] Fix loadDF to pass schema. Also add a unit test
a229877 [Shivaram Venkataraman] Use wrapper function to DataFrameReader
ee70ba8 [Shivaram Venkataraman] Support user-specified schema in read.df

(cherry picked from commit 12f5eaeee1235850a076ce5716d069bd2f1205a5)
Signed-off-by: Shivaram Venkataraman <shiva...@cs.berkeley.edu>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3e3151e7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3e3151e7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3e3151e7

Branch: refs/heads/branch-1.4
Commit: 3e3151e755dd68aa9a75188d6ecb968c7c1dff24
Parents: 0ef2e9d
Author: Shivaram Venkataraman <shiva...@cs.berkeley.edu>
Authored: Fri Jun 5 10:19:03 2015 -0700
Committer: Shivaram Venkataraman <shiva...@cs.berkeley.edu>
Committed: Fri Jun 5 10:19:15 2015 -0700

----------------------------------------------------------------------
 R/pkg/R/SQLContext.R                                 | 14 ++++++++++----
 R/pkg/inst/tests/test_sparkSQL.R                     | 13 +++++++++++++
 .../scala/org/apache/spark/sql/api/r/SQLUtils.scala  | 15 +++++++++++++++
 3 files changed, 38 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
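For context before the diffs: a minimal usage sketch of the new "schema"
argument added by this commit, assuming a SparkR shell with an initialized
sqlContext; the JSON path and its column names are hypothetical.

  # Build an explicit schema and pass it to read.df (sketch; path is made up)
  schema <- structType(structField("name", type = "string"),
                       structField("age", type = "double"))
  df <- read.df(sqlContext, "path/to/people.json", source = "json",
                schema = schema)
  printSchema(df)  # field types come from the supplied schema, not inference

Without a schema, read.df falls back to the source's own inference, as before.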


http://git-wip-us.apache.org/repos/asf/spark/blob/3e3151e7/R/pkg/R/SQLContext.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R
index 88e1a50..22a4b5b 100644
--- a/R/pkg/R/SQLContext.R
+++ b/R/pkg/R/SQLContext.R
@@ -452,7 +452,7 @@ dropTempTable <- function(sqlContext, tableName) {
 #' df <- read.df(sqlContext, "path/to/file.json", source = "json")
 #' }
 
-read.df <- function(sqlContext, path = NULL, source = NULL, ...) {
+read.df <- function(sqlContext, path = NULL, source = NULL, schema = NULL, ...) {
   options <- varargsToEnv(...)
   if (!is.null(path)) {
     options[['path']] <- path
@@ -462,15 +462,21 @@ read.df <- function(sqlContext, path = NULL, source = NULL, ...) {
     source <- callJMethod(sqlContext, "getConf", "spark.sql.sources.default",
                           "org.apache.spark.sql.parquet")
   }
-  sdf <- callJMethod(sqlContext, "load", source, options)
+  if (!is.null(schema)) {
+    stopifnot(class(schema) == "structType")
+    sdf <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "loadDF", sqlContext, source,
+                       schema$jobj, options)
+  } else {
+    sdf <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "loadDF", sqlContext, source, options)
+  }
   dataFrame(sdf)
 }
 
 #' @aliases loadDF
 #' @export
 
-loadDF <- function(sqlContext, path = NULL, source = NULL, ...) {
-  read.df(sqlContext, path, source, ...)
+loadDF <- function(sqlContext, path = NULL, source = NULL, schema = NULL, ...) {
+  read.df(sqlContext, path, source, schema, ...)
 }
 
 #' Create an external table

http://git-wip-us.apache.org/repos/asf/spark/blob/3e3151e7/R/pkg/inst/tests/test_sparkSQL.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/test_sparkSQL.R b/R/pkg/inst/tests/test_sparkSQL.R
index d2d82e7..30edfc8 100644
--- a/R/pkg/inst/tests/test_sparkSQL.R
+++ b/R/pkg/inst/tests/test_sparkSQL.R
@@ -504,6 +504,19 @@ test_that("read.df() from json file", {
   df <- read.df(sqlContext, jsonPath, "json")
   expect_true(inherits(df, "DataFrame"))
   expect_true(count(df) == 3)
+
+  # Check if we can apply a user defined schema
+  schema <- structType(structField("name", type = "string"),
+                       structField("age", type = "double"))
+
+  df1 <- read.df(sqlContext, jsonPath, "json", schema)
+  expect_true(inherits(df1, "DataFrame"))
+  expect_equal(dtypes(df1), list(c("name", "string"), c("age", "double")))
+
+  # Run the same with loadDF
+  df2 <- loadDF(sqlContext, jsonPath, "json", schema)
+  expect_true(inherits(df2, "DataFrame"))
+  expect_equal(dtypes(df2), list(c("name", "string"), c("age", "double")))
 })
 
 test_that("write.df() as parquet file", {

http://git-wip-us.apache.org/repos/asf/spark/blob/3e3151e7/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
index 604f312..43b62f0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
@@ -139,4 +139,19 @@ private[r] object SQLUtils {
       case "ignore" => SaveMode.Ignore
     }
   }
+
+  def loadDF(
+      sqlContext: SQLContext,
+      source: String,
+      options: java.util.Map[String, String]): DataFrame = {
+    sqlContext.read.format(source).options(options).load()
+  }
+
+  def loadDF(
+      sqlContext: SQLContext,
+      source: String,
+      schema: StructType,
+      options: java.util.Map[String, String]): DataFrame = {
+    sqlContext.read.format(source).schema(schema).options(options).load()
+  }
 }

