Repository: spark Updated Branches: refs/heads/branch-1.4 0ef2e9d35 -> 3e3151e75
[SPARK-8085] [SPARKR] Support user-specified schema in read.df cc davies sun-rui Author: Shivaram Venkataraman <shiva...@cs.berkeley.edu> Closes #6620 from shivaram/sparkr-read-schema and squashes the following commits: 16a6726 [Shivaram Venkataraman] Fix loadDF to pass schema Also add a unit test a229877 [Shivaram Venkataraman] Use wrapper function to DataFrameReader ee70ba8 [Shivaram Venkataraman] Support user-specified schema in read.df (cherry picked from commit 12f5eaeee1235850a076ce5716d069bd2f1205a5) Signed-off-by: Shivaram Venkataraman <shiva...@cs.berkeley.edu> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3e3151e7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3e3151e7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3e3151e7 Branch: refs/heads/branch-1.4 Commit: 3e3151e755dd68aa9a75188d6ecb968c7c1dff24 Parents: 0ef2e9d Author: Shivaram Venkataraman <shiva...@cs.berkeley.edu> Authored: Fri Jun 5 10:19:03 2015 -0700 Committer: Shivaram Venkataraman <shiva...@cs.berkeley.edu> Committed: Fri Jun 5 10:19:15 2015 -0700 ---------------------------------------------------------------------- R/pkg/R/SQLContext.R | 14 ++++++++++---- R/pkg/inst/tests/test_sparkSQL.R | 13 +++++++++++++ .../scala/org/apache/spark/sql/api/r/SQLUtils.scala | 15 +++++++++++++++ 3 files changed, 38 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/3e3151e7/R/pkg/R/SQLContext.R ---------------------------------------------------------------------- diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R index 88e1a50..22a4b5b 100644 --- a/R/pkg/R/SQLContext.R +++ b/R/pkg/R/SQLContext.R @@ -452,7 +452,7 @@ dropTempTable <- function(sqlContext, tableName) { #' df <- read.df(sqlContext, "path/to/file.json", source = "json") #' } -read.df <- function(sqlContext, path = NULL, source = NULL, ...) { +read.df <- function(sqlContext, path = NULL, source = NULL, schema = NULL, ...) { options <- varargsToEnv(...) if (!is.null(path)) { options[['path']] <- path @@ -462,15 +462,21 @@ read.df <- function(sqlContext, path = NULL, source = NULL, ...) { source <- callJMethod(sqlContext, "getConf", "spark.sql.sources.default", "org.apache.spark.sql.parquet") } - sdf <- callJMethod(sqlContext, "load", source, options) + if (!is.null(schema)) { + stopifnot(class(schema) == "structType") + sdf <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "loadDF", sqlContext, source, + schema$jobj, options) + } else { + sdf <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "loadDF", sqlContext, source, options) + } dataFrame(sdf) } #' @aliases loadDF #' @export -loadDF <- function(sqlContext, path = NULL, source = NULL, ...) { - read.df(sqlContext, path, source, ...) +loadDF <- function(sqlContext, path = NULL, source = NULL, schema = NULL, ...) { + read.df(sqlContext, path, source, schema, ...) } #' Create an external table http://git-wip-us.apache.org/repos/asf/spark/blob/3e3151e7/R/pkg/inst/tests/test_sparkSQL.R ---------------------------------------------------------------------- diff --git a/R/pkg/inst/tests/test_sparkSQL.R b/R/pkg/inst/tests/test_sparkSQL.R index d2d82e7..30edfc8 100644 --- a/R/pkg/inst/tests/test_sparkSQL.R +++ b/R/pkg/inst/tests/test_sparkSQL.R @@ -504,6 +504,19 @@ test_that("read.df() from json file", { df <- read.df(sqlContext, jsonPath, "json") expect_true(inherits(df, "DataFrame")) expect_true(count(df) == 3) + + # Check if we can apply a user defined schema + schema <- structType(structField("name", type = "string"), + structField("age", type = "double")) + + df1 <- read.df(sqlContext, jsonPath, "json", schema) + expect_true(inherits(df1, "DataFrame")) + expect_equal(dtypes(df1), list(c("name", "string"), c("age", "double"))) + + # Run the same with loadDF + df2 <- loadDF(sqlContext, jsonPath, "json", schema) + expect_true(inherits(df2, "DataFrame")) + expect_equal(dtypes(df2), list(c("name", "string"), c("age", "double"))) }) test_that("write.df() as parquet file", { http://git-wip-us.apache.org/repos/asf/spark/blob/3e3151e7/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala index 604f312..43b62f0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala @@ -139,4 +139,19 @@ private[r] object SQLUtils { case "ignore" => SaveMode.Ignore } } + + def loadDF( + sqlContext: SQLContext, + source: String, + options: java.util.Map[String, String]): DataFrame = { + sqlContext.read.format(source).options(options).load() + } + + def loadDF( + sqlContext: SQLContext, + source: String, + schema: StructType, + options: java.util.Map[String, String]): DataFrame = { + sqlContext.read.format(source).schema(schema).options(options).load() + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org