Repository: spark
Updated Branches:
  refs/heads/branch-1.4 b928db4fe -> c636b87dc
[SPARK-6806] [SPARKR] [DOCS] Fill in SparkR examples in programming guide sqlCtx -> sqlContext You can check the docs by: ``` $ cd docs $ SKIP_SCALADOC=1 jekyll serve ``` cc shivaram Author: Davies Liu <dav...@databricks.com> Closes #5442 from davies/r_docs and squashes the following commits: 7a12ec6 [Davies Liu] remove rdd in R docs 8496b26 [Davies Liu] remove the docs related to RDD e23b9d6 [Davies Liu] delete R docs for RDD API 222e4ff [Davies Liu] Merge branch 'master' into r_docs 89684ce [Davies Liu] Merge branch 'r_docs' of github.com:davies/spark into r_docs f0a10e1 [Davies Liu] address comments from @shivaram f61de71 [Davies Liu] Update pairRDD.R 3ef7cf3 [Davies Liu] use + instead of function(a,b) a+b 2f10a77 [Davies Liu] address comments from @cafreeman 9c2a062 [Davies Liu] mention R api together with Python API 23f751a [Davies Liu] Fill in SparkR examples in programming guide (cherry picked from commit 7af3818c6b2bf35bfa531ab7cc3a4a714385015e) Signed-off-by: Shivaram Venkataraman <shiva...@cs.berkeley.edu> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c636b87d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c636b87d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c636b87d Branch: refs/heads/branch-1.4 Commit: c636b87dc287ce99a887bc59cad31aaf48477a56 Parents: b928db4 Author: Davies Liu <dav...@databricks.com> Authored: Sat May 23 00:00:30 2015 -0700 Committer: Shivaram Venkataraman <shiva...@cs.berkeley.edu> Committed: Sat May 23 00:02:22 2015 -0700 ---------------------------------------------------------------------- R/README.md | 4 +- R/pkg/R/DataFrame.R | 176 ++++++++-------- R/pkg/R/RDD.R | 2 +- R/pkg/R/SQLContext.R | 165 ++++++++------- R/pkg/R/pairRDD.R | 4 +- R/pkg/R/sparkR.R | 10 +- R/pkg/inst/profile/shell.R | 6 +- R/pkg/inst/tests/test_sparkSQL.R | 156 +++++++------- docs/_plugins/copy_api_dirs.rb | 68 ++++--- docs/api.md | 3 +- docs/index.md | 23 ++- docs/programming-guide.md | 21 +- docs/quick-start.md | 18 +- docs/sql-programming-guide.md | 373 +++++++++++++++++++++++++++++++++- 14 files changed, 706 insertions(+), 323 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/c636b87d/R/README.md ---------------------------------------------------------------------- diff --git a/R/README.md b/R/README.md index a6970e3..d7d65b4 100644 --- a/R/README.md +++ b/R/README.md @@ -52,7 +52,7 @@ The SparkR documentation (Rd files and HTML files) are not a part of the source SparkR comes with several sample programs in the `examples/src/main/r` directory. To run one of them, use `./bin/sparkR <filename> <args>`. For example: - ./bin/sparkR examples/src/main/r/pi.R local[2] + ./bin/sparkR examples/src/main/r/dataframe.R You can also run the unit-tests for SparkR by running (you need to install the [testthat](http://cran.r-project.org/web/packages/testthat/index.html) package first): @@ -63,5 +63,5 @@ You can also run the unit-tests for SparkR by running (you need to install the [ The `./bin/spark-submit` and `./bin/sparkR` can also be used to submit jobs to YARN clusters. You will need to set YARN conf dir before doing so. 
For example on CDH you can run ``` export YARN_CONF_DIR=/etc/hadoop/conf -./bin/spark-submit --master yarn examples/src/main/r/pi.R 4 +./bin/spark-submit --master yarn examples/src/main/r/dataframe.R ``` http://git-wip-us.apache.org/repos/asf/spark/blob/c636b87d/R/pkg/R/DataFrame.R ---------------------------------------------------------------------- diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R index a7fa32e..ed8093c 100644 --- a/R/pkg/R/DataFrame.R +++ b/R/pkg/R/DataFrame.R @@ -65,9 +65,9 @@ dataFrame <- function(sdf, isCached = FALSE) { #' @examples #'\dontrun{ #' sc <- sparkR.init() -#' sqlCtx <- sparkRSQL.init(sc) +#' sqlContext <- sparkRSQL.init(sc) #' path <- "path/to/file.json" -#' df <- jsonFile(sqlCtx, path) +#' df <- jsonFile(sqlContext, path) #' printSchema(df) #'} setMethod("printSchema", @@ -88,9 +88,9 @@ setMethod("printSchema", #' @examples #'\dontrun{ #' sc <- sparkR.init() -#' sqlCtx <- sparkRSQL.init(sc) +#' sqlContext <- sparkRSQL.init(sc) #' path <- "path/to/file.json" -#' df <- jsonFile(sqlCtx, path) +#' df <- jsonFile(sqlContext, path) #' dfSchema <- schema(df) #'} setMethod("schema", @@ -110,9 +110,9 @@ setMethod("schema", #' @examples #'\dontrun{ #' sc <- sparkR.init() -#' sqlCtx <- sparkRSQL.init(sc) +#' sqlContext <- sparkRSQL.init(sc) #' path <- "path/to/file.json" -#' df <- jsonFile(sqlCtx, path) +#' df <- jsonFile(sqlContext, path) #' explain(df, TRUE) #'} setMethod("explain", @@ -139,9 +139,9 @@ setMethod("explain", #' @examples #'\dontrun{ #' sc <- sparkR.init() -#' sqlCtx <- sparkRSQL.init(sc) +#' sqlContext <- sparkRSQL.init(sc) #' path <- "path/to/file.json" -#' df <- jsonFile(sqlCtx, path) +#' df <- jsonFile(sqlContext, path) #' isLocal(df) #'} setMethod("isLocal", @@ -162,9 +162,9 @@ setMethod("isLocal", #' @examples #'\dontrun{ #' sc <- sparkR.init() -#' sqlCtx <- sparkRSQL.init(sc) +#' sqlContext <- sparkRSQL.init(sc) #' path <- "path/to/file.json" -#' df <- jsonFile(sqlCtx, path) +#' df <- jsonFile(sqlContext, path) #' showDF(df) #'} setMethod("showDF", @@ -185,9 +185,9 @@ setMethod("showDF", #' @examples #'\dontrun{ #' sc <- sparkR.init() -#' sqlCtx <- sparkRSQL.init(sc) +#' sqlContext <- sparkRSQL.init(sc) #' path <- "path/to/file.json" -#' df <- jsonFile(sqlCtx, path) +#' df <- jsonFile(sqlContext, path) #' df #'} setMethod("show", "DataFrame", @@ -210,9 +210,9 @@ setMethod("show", "DataFrame", #' @examples #'\dontrun{ #' sc <- sparkR.init() -#' sqlCtx <- sparkRSQL.init(sc) +#' sqlContext <- sparkRSQL.init(sc) #' path <- "path/to/file.json" -#' df <- jsonFile(sqlCtx, path) +#' df <- jsonFile(sqlContext, path) #' dtypes(df) #'} setMethod("dtypes", @@ -234,9 +234,9 @@ setMethod("dtypes", #' @examples #'\dontrun{ #' sc <- sparkR.init() -#' sqlCtx <- sparkRSQL.init(sc) +#' sqlContext <- sparkRSQL.init(sc) #' path <- "path/to/file.json" -#' df <- jsonFile(sqlCtx, path) +#' df <- jsonFile(sqlContext, path) #' columns(df) #'} setMethod("columns", @@ -267,11 +267,11 @@ setMethod("names", #' @examples #'\dontrun{ #' sc <- sparkR.init() -#' sqlCtx <- sparkRSQL.init(sc) +#' sqlContext <- sparkRSQL.init(sc) #' path <- "path/to/file.json" -#' df <- jsonFile(sqlCtx, path) +#' df <- jsonFile(sqlContext, path) #' registerTempTable(df, "json_df") -#' new_df <- sql(sqlCtx, "SELECT * FROM json_df") +#' new_df <- sql(sqlContext, "SELECT * FROM json_df") #'} setMethod("registerTempTable", signature(x = "DataFrame", tableName = "character"), @@ -293,9 +293,9 @@ setMethod("registerTempTable", #' @examples #'\dontrun{ #' sc <- sparkR.init() -#' sqlCtx <- 
sparkRSQL.init(sc) -#' df <- read.df(sqlCtx, path, "parquet") -#' df2 <- read.df(sqlCtx, path2, "parquet") +#' sqlContext <- sparkRSQL.init(sc) +#' df <- read.df(sqlContext, path, "parquet") +#' df2 <- read.df(sqlContext, path2, "parquet") #' registerTempTable(df, "table1") #' insertInto(df2, "table1", overwrite = TRUE) #'} @@ -316,9 +316,9 @@ setMethod("insertInto", #' @examples #'\dontrun{ #' sc <- sparkR.init() -#' sqlCtx <- sparkRSQL.init(sc) +#' sqlContext <- sparkRSQL.init(sc) #' path <- "path/to/file.json" -#' df <- jsonFile(sqlCtx, path) +#' df <- jsonFile(sqlContext, path) #' cache(df) #'} setMethod("cache", @@ -341,9 +341,9 @@ setMethod("cache", #' @examples #'\dontrun{ #' sc <- sparkR.init() -#' sqlCtx <- sparkRSQL.init(sc) +#' sqlContext <- sparkRSQL.init(sc) #' path <- "path/to/file.json" -#' df <- jsonFile(sqlCtx, path) +#' df <- jsonFile(sqlContext, path) #' persist(df, "MEMORY_AND_DISK") #'} setMethod("persist", @@ -366,9 +366,9 @@ setMethod("persist", #' @examples #'\dontrun{ #' sc <- sparkR.init() -#' sqlCtx <- sparkRSQL.init(sc) +#' sqlContext <- sparkRSQL.init(sc) #' path <- "path/to/file.json" -#' df <- jsonFile(sqlCtx, path) +#' df <- jsonFile(sqlContext, path) #' persist(df, "MEMORY_AND_DISK") #' unpersist(df) #'} @@ -391,9 +391,9 @@ setMethod("unpersist", #' @examples #'\dontrun{ #' sc <- sparkR.init() -#' sqlCtx <- sparkRSQL.init(sc) +#' sqlContext <- sparkRSQL.init(sc) #' path <- "path/to/file.json" -#' df <- jsonFile(sqlCtx, path) +#' df <- jsonFile(sqlContext, path) #' newDF <- repartition(df, 2L) #'} setMethod("repartition", @@ -415,9 +415,9 @@ setMethod("repartition", # @examples #\dontrun{ # sc <- sparkR.init() -# sqlCtx <- sparkRSQL.init(sc) +# sqlContext <- sparkRSQL.init(sc) # path <- "path/to/file.json" -# df <- jsonFile(sqlCtx, path) +# df <- jsonFile(sqlContext, path) # newRDD <- toJSON(df) #} setMethod("toJSON", @@ -440,9 +440,9 @@ setMethod("toJSON", #' @examples #'\dontrun{ #' sc <- sparkR.init() -#' sqlCtx <- sparkRSQL.init(sc) +#' sqlContext <- sparkRSQL.init(sc) #' path <- "path/to/file.json" -#' df <- jsonFile(sqlCtx, path) +#' df <- jsonFile(sqlContext, path) #' saveAsParquetFile(df, "/tmp/sparkr-tmp/") #'} setMethod("saveAsParquetFile", @@ -461,9 +461,9 @@ setMethod("saveAsParquetFile", #' @examples #'\dontrun{ #' sc <- sparkR.init() -#' sqlCtx <- sparkRSQL.init(sc) +#' sqlContext <- sparkRSQL.init(sc) #' path <- "path/to/file.json" -#' df <- jsonFile(sqlCtx, path) +#' df <- jsonFile(sqlContext, path) #' distinctDF <- distinct(df) #'} setMethod("distinct", @@ -486,9 +486,9 @@ setMethod("distinct", #' @examples #'\dontrun{ #' sc <- sparkR.init() -#' sqlCtx <- sparkRSQL.init(sc) +#' sqlContext <- sparkRSQL.init(sc) #' path <- "path/to/file.json" -#' df <- jsonFile(sqlCtx, path) +#' df <- jsonFile(sqlContext, path) #' collect(sample(df, FALSE, 0.5)) #' collect(sample(df, TRUE, 0.5)) #'} @@ -523,9 +523,9 @@ setMethod("sample_frac", #' @examples #'\dontrun{ #' sc <- sparkR.init() -#' sqlCtx <- sparkRSQL.init(sc) +#' sqlContext <- sparkRSQL.init(sc) #' path <- "path/to/file.json" -#' df <- jsonFile(sqlCtx, path) +#' df <- jsonFile(sqlContext, path) #' count(df) #' } setMethod("count", @@ -545,9 +545,9 @@ setMethod("count", #' @examples #'\dontrun{ #' sc <- sparkR.init() -#' sqlCtx <- sparkRSQL.init(sc) +#' sqlContext <- sparkRSQL.init(sc) #' path <- "path/to/file.json" -#' df <- jsonFile(sqlCtx, path) +#' df <- jsonFile(sqlContext, path) #' collected <- collect(df) #' firstName <- collected[[1]]$name #' } @@ -580,9 +580,9 @@ setMethod("collect", #' 
@examples #' \dontrun{ #' sc <- sparkR.init() -#' sqlCtx <- sparkRSQL.init(sc) +#' sqlContext <- sparkRSQL.init(sc) #' path <- "path/to/file.json" -#' df <- jsonFile(sqlCtx, path) +#' df <- jsonFile(sqlContext, path) #' limitedDF <- limit(df, 10) #' } setMethod("limit", @@ -599,9 +599,9 @@ setMethod("limit", #' @examples #'\dontrun{ #' sc <- sparkR.init() -#' sqlCtx <- sparkRSQL.init(sc) +#' sqlContext <- sparkRSQL.init(sc) #' path <- "path/to/file.json" -#' df <- jsonFile(sqlCtx, path) +#' df <- jsonFile(sqlContext, path) #' take(df, 2) #' } setMethod("take", @@ -626,9 +626,9 @@ setMethod("take", #' @examples #'\dontrun{ #' sc <- sparkR.init() -#' sqlCtx <- sparkRSQL.init(sc) +#' sqlContext <- sparkRSQL.init(sc) #' path <- "path/to/file.json" -#' df <- jsonFile(sqlCtx, path) +#' df <- jsonFile(sqlContext, path) #' head(df) #' } setMethod("head", @@ -647,9 +647,9 @@ setMethod("head", #' @examples #'\dontrun{ #' sc <- sparkR.init() -#' sqlCtx <- sparkRSQL.init(sc) +#' sqlContext <- sparkRSQL.init(sc) #' path <- "path/to/file.json" -#' df <- jsonFile(sqlCtx, path) +#' df <- jsonFile(sqlContext, path) #' first(df) #' } setMethod("first", @@ -669,9 +669,9 @@ setMethod("first", # @examples #\dontrun{ # sc <- sparkR.init() -# sqlCtx <- sparkRSQL.init(sc) +# sqlContext <- sparkRSQL.init(sc) # path <- "path/to/file.json" -# df <- jsonFile(sqlCtx, path) +# df <- jsonFile(sqlContext, path) # rdd <- toRDD(df) # } setMethod("toRDD", @@ -938,9 +938,9 @@ setMethod("select", #' @examples #'\dontrun{ #' sc <- sparkR.init() -#' sqlCtx <- sparkRSQL.init(sc) +#' sqlContext <- sparkRSQL.init(sc) #' path <- "path/to/file.json" -#' df <- jsonFile(sqlCtx, path) +#' df <- jsonFile(sqlContext, path) #' selectExpr(df, "col1", "(col2 * 5) as newCol") #' } setMethod("selectExpr", @@ -964,9 +964,9 @@ setMethod("selectExpr", #' @examples #'\dontrun{ #' sc <- sparkR.init() -#' sqlCtx <- sparkRSQL.init(sc) +#' sqlContext <- sparkRSQL.init(sc) #' path <- "path/to/file.json" -#' df <- jsonFile(sqlCtx, path) +#' df <- jsonFile(sqlContext, path) #' newDF <- withColumn(df, "newCol", df$col1 * 5) #' } setMethod("withColumn", @@ -988,9 +988,9 @@ setMethod("withColumn", #' @examples #'\dontrun{ #' sc <- sparkR.init() -#' sqlCtx <- sparkRSQL.init(sc) +#' sqlContext <- sparkRSQL.init(sc) #' path <- "path/to/file.json" -#' df <- jsonFile(sqlCtx, path) +#' df <- jsonFile(sqlContext, path) #' newDF <- mutate(df, newCol = df$col1 * 5, newCol2 = df$col1 * 2) #' names(newDF) # Will contain newCol, newCol2 #' } @@ -1024,9 +1024,9 @@ setMethod("mutate", #' @examples #'\dontrun{ #' sc <- sparkR.init() -#' sqlCtx <- sparkRSQL.init(sc) +#' sqlContext <- sparkRSQL.init(sc) #' path <- "path/to/file.json" -#' df <- jsonFile(sqlCtx, path) +#' df <- jsonFile(sqlContext, path) #' newDF <- withColumnRenamed(df, "col1", "newCol1") #' } setMethod("withColumnRenamed", @@ -1055,9 +1055,9 @@ setMethod("withColumnRenamed", #' @examples #'\dontrun{ #' sc <- sparkR.init() -#' sqlCtx <- sparkRSQL.init(sc) +#' sqlContext <- sparkRSQL.init(sc) #' path <- "path/to/file.json" -#' df <- jsonFile(sqlCtx, path) +#' df <- jsonFile(sqlContext, path) #' newDF <- rename(df, col1 = df$newCol1) #' } setMethod("rename", @@ -1095,9 +1095,9 @@ setClassUnion("characterOrColumn", c("character", "Column")) #' @examples #'\dontrun{ #' sc <- sparkR.init() -#' sqlCtx <- sparkRSQL.init(sc) +#' sqlContext <- sparkRSQL.init(sc) #' path <- "path/to/file.json" -#' df <- jsonFile(sqlCtx, path) +#' df <- jsonFile(sqlContext, path) #' arrange(df, df$col1) #' arrange(df, "col1") #' 
arrange(df, asc(df$col1), desc(abs(df$col2))) @@ -1137,9 +1137,9 @@ setMethod("orderBy", #' @examples #'\dontrun{ #' sc <- sparkR.init() -#' sqlCtx <- sparkRSQL.init(sc) +#' sqlContext <- sparkRSQL.init(sc) #' path <- "path/to/file.json" -#' df <- jsonFile(sqlCtx, path) +#' df <- jsonFile(sqlContext, path) #' filter(df, "col1 > 0") #' filter(df, df$col2 != "abcdefg") #' } @@ -1177,9 +1177,9 @@ setMethod("where", #' @examples #'\dontrun{ #' sc <- sparkR.init() -#' sqlCtx <- sparkRSQL.init(sc) -#' df1 <- jsonFile(sqlCtx, path) -#' df2 <- jsonFile(sqlCtx, path2) +#' sqlContext <- sparkRSQL.init(sc) +#' df1 <- jsonFile(sqlContext, path) +#' df2 <- jsonFile(sqlContext, path2) #' join(df1, df2) # Performs a Cartesian #' join(df1, df2, df1$col1 == df2$col2) # Performs an inner join based on expression #' join(df1, df2, df1$col1 == df2$col2, "right_outer") @@ -1219,9 +1219,9 @@ setMethod("join", #' @examples #'\dontrun{ #' sc <- sparkR.init() -#' sqlCtx <- sparkRSQL.init(sc) -#' df1 <- jsonFile(sqlCtx, path) -#' df2 <- jsonFile(sqlCtx, path2) +#' sqlContext <- sparkRSQL.init(sc) +#' df1 <- jsonFile(sqlContext, path) +#' df2 <- jsonFile(sqlContext, path2) #' unioned <- unionAll(df, df2) #' } setMethod("unionAll", @@ -1244,9 +1244,9 @@ setMethod("unionAll", #' @examples #'\dontrun{ #' sc <- sparkR.init() -#' sqlCtx <- sparkRSQL.init(sc) -#' df1 <- jsonFile(sqlCtx, path) -#' df2 <- jsonFile(sqlCtx, path2) +#' sqlContext <- sparkRSQL.init(sc) +#' df1 <- jsonFile(sqlContext, path) +#' df2 <- jsonFile(sqlContext, path2) #' intersectDF <- intersect(df, df2) #' } setMethod("intersect", @@ -1269,9 +1269,9 @@ setMethod("intersect", #' @examples #'\dontrun{ #' sc <- sparkR.init() -#' sqlCtx <- sparkRSQL.init(sc) -#' df1 <- jsonFile(sqlCtx, path) -#' df2 <- jsonFile(sqlCtx, path2) +#' sqlContext <- sparkRSQL.init(sc) +#' df1 <- jsonFile(sqlContext, path) +#' df2 <- jsonFile(sqlContext, path2) #' exceptDF <- except(df, df2) #' } #' @rdname except @@ -1308,9 +1308,9 @@ setMethod("except", #' @examples #'\dontrun{ #' sc <- sparkR.init() -#' sqlCtx <- sparkRSQL.init(sc) +#' sqlContext <- sparkRSQL.init(sc) #' path <- "path/to/file.json" -#' df <- jsonFile(sqlCtx, path) +#' df <- jsonFile(sqlContext, path) #' write.df(df, "myfile", "parquet", "overwrite") #' } setMethod("write.df", @@ -1318,8 +1318,8 @@ setMethod("write.df", mode = 'character'), function(df, path = NULL, source = NULL, mode = "append", ...){ if (is.null(source)) { - sqlCtx <- get(".sparkRSQLsc", envir = .sparkREnv) - source <- callJMethod(sqlCtx, "getConf", "spark.sql.sources.default", + sqlContext <- get(".sparkRSQLsc", envir = .sparkREnv) + source <- callJMethod(sqlContext, "getConf", "spark.sql.sources.default", "org.apache.spark.sql.parquet") } allModes <- c("append", "overwrite", "error", "ignore") @@ -1371,9 +1371,9 @@ setMethod("saveDF", #' @examples #'\dontrun{ #' sc <- sparkR.init() -#' sqlCtx <- sparkRSQL.init(sc) +#' sqlContext <- sparkRSQL.init(sc) #' path <- "path/to/file.json" -#' df <- jsonFile(sqlCtx, path) +#' df <- jsonFile(sqlContext, path) #' saveAsTable(df, "myfile") #' } setMethod("saveAsTable", @@ -1381,8 +1381,8 @@ setMethod("saveAsTable", mode = 'character'), function(df, tableName, source = NULL, mode="append", ...){ if (is.null(source)) { - sqlCtx <- get(".sparkRSQLsc", envir = .sparkREnv) - source <- callJMethod(sqlCtx, "getConf", "spark.sql.sources.default", + sqlContext <- get(".sparkRSQLsc", envir = .sparkREnv) + source <- callJMethod(sqlContext, "getConf", "spark.sql.sources.default", "org.apache.spark.sql.parquet") 
} allModes <- c("append", "overwrite", "error", "ignore") @@ -1408,9 +1408,9 @@ setMethod("saveAsTable", #' @examples #'\dontrun{ #' sc <- sparkR.init() -#' sqlCtx <- sparkRSQL.init(sc) +#' sqlContext <- sparkRSQL.init(sc) #' path <- "path/to/file.json" -#' df <- jsonFile(sqlCtx, path) +#' df <- jsonFile(sqlContext, path) #' describe(df) #' describe(df, "col1") #' describe(df, "col1", "col2") http://git-wip-us.apache.org/repos/asf/spark/blob/c636b87d/R/pkg/R/RDD.R ---------------------------------------------------------------------- diff --git a/R/pkg/R/RDD.R b/R/pkg/R/RDD.R index d3a68ff..0513299 100644 --- a/R/pkg/R/RDD.R +++ b/R/pkg/R/RDD.R @@ -239,7 +239,7 @@ setMethod("cache", # @aliases persist,RDD-method setMethod("persist", signature(x = "RDD", newLevel = "character"), - function(x, newLevel) { + function(x, newLevel = "MEMORY_ONLY") { callJMethod(getJRDD(x), "persist", getStorageLevel(newLevel)) x@env$isCached <- TRUE x http://git-wip-us.apache.org/repos/asf/spark/blob/c636b87d/R/pkg/R/SQLContext.R ---------------------------------------------------------------------- diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R index 531442e..36cc612 100644 --- a/R/pkg/R/SQLContext.R +++ b/R/pkg/R/SQLContext.R @@ -69,7 +69,7 @@ infer_type <- function(x) { #' #' Converts an RDD to a DataFrame by infer the types. #' -#' @param sqlCtx A SQLContext +#' @param sqlContext A SQLContext #' @param data An RDD or list or data.frame #' @param schema a list of column names or named list (StructType), optional #' @return an DataFrame @@ -77,13 +77,13 @@ infer_type <- function(x) { #' @examples #'\dontrun{ #' sc <- sparkR.init() -#' sqlCtx <- sparkRSQL.init(sc) +#' sqlContext <- sparkRSQL.init(sc) #' rdd <- lapply(parallelize(sc, 1:10), function(x) list(a=x, b=as.character(x))) -#' df <- createDataFrame(sqlCtx, rdd) +#' df <- createDataFrame(sqlContext, rdd) #' } # TODO(davies): support sampling and infer type from NA -createDataFrame <- function(sqlCtx, data, schema = NULL, samplingRatio = 1.0) { +createDataFrame <- function(sqlContext, data, schema = NULL, samplingRatio = 1.0) { if (is.data.frame(data)) { # get the names of columns, they will be put into RDD schema <- names(data) @@ -102,7 +102,7 @@ createDataFrame <- function(sqlCtx, data, schema = NULL, samplingRatio = 1.0) { }) } if (is.list(data)) { - sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sqlCtx) + sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sqlContext) rdd <- parallelize(sc, data) } else if (inherits(data, "RDD")) { rdd <- data @@ -146,7 +146,7 @@ createDataFrame <- function(sqlCtx, data, schema = NULL, samplingRatio = 1.0) { jrdd <- getJRDD(lapply(rdd, function(x) x), "row") srdd <- callJMethod(jrdd, "rdd") sdf <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "createDF", - srdd, schema$jobj, sqlCtx) + srdd, schema$jobj, sqlContext) dataFrame(sdf) } @@ -161,7 +161,7 @@ createDataFrame <- function(sqlCtx, data, schema = NULL, samplingRatio = 1.0) { # @examples #\dontrun{ # sc <- sparkR.init() -# sqlCtx <- sparkRSQL.init(sc) +# sqlContext <- sparkRSQL.init(sc) # rdd <- lapply(parallelize(sc, 1:10), function(x) list(a=x, b=as.character(x))) # df <- toDF(rdd) # } @@ -170,14 +170,14 @@ setGeneric("toDF", function(x, ...) { standardGeneric("toDF") }) setMethod("toDF", signature(x = "RDD"), function(x, ...) 
{ - sqlCtx <- if (exists(".sparkRHivesc", envir = .sparkREnv)) { + sqlContext <- if (exists(".sparkRHivesc", envir = .sparkREnv)) { get(".sparkRHivesc", envir = .sparkREnv) } else if (exists(".sparkRSQLsc", envir = .sparkREnv)) { get(".sparkRSQLsc", envir = .sparkREnv) } else { stop("no SQL context available") } - createDataFrame(sqlCtx, x, ...) + createDataFrame(sqlContext, x, ...) }) #' Create a DataFrame from a JSON file. @@ -185,24 +185,24 @@ setMethod("toDF", signature(x = "RDD"), #' Loads a JSON file (one object per line), returning the result as a DataFrame #' It goes through the entire dataset once to determine the schema. #' -#' @param sqlCtx SQLContext to use +#' @param sqlContext SQLContext to use #' @param path Path of file to read. A vector of multiple paths is allowed. #' @return DataFrame #' @export #' @examples #'\dontrun{ #' sc <- sparkR.init() -#' sqlCtx <- sparkRSQL.init(sc) +#' sqlContext <- sparkRSQL.init(sc) #' path <- "path/to/file.json" -#' df <- jsonFile(sqlCtx, path) +#' df <- jsonFile(sqlContext, path) #' } -jsonFile <- function(sqlCtx, path) { +jsonFile <- function(sqlContext, path) { # Allow the user to have a more flexible definiton of the text file path path <- normalizePath(path) # Convert a string vector of paths to a string containing comma separated paths path <- paste(path, collapse = ",") - sdf <- callJMethod(sqlCtx, "jsonFile", path) + sdf <- callJMethod(sqlContext, "jsonFile", path) dataFrame(sdf) } @@ -211,7 +211,7 @@ jsonFile <- function(sqlCtx, path) { # # Loads an RDD storing one JSON object per string as a DataFrame. # -# @param sqlCtx SQLContext to use +# @param sqlContext SQLContext to use # @param rdd An RDD of JSON string # @param schema A StructType object to use as schema # @param samplingRatio The ratio of simpling used to infer the schema @@ -220,16 +220,16 @@ jsonFile <- function(sqlCtx, path) { # @examples #\dontrun{ # sc <- sparkR.init() -# sqlCtx <- sparkRSQL.init(sc) +# sqlContext <- sparkRSQL.init(sc) # rdd <- texFile(sc, "path/to/json") -# df <- jsonRDD(sqlCtx, rdd) +# df <- jsonRDD(sqlContext, rdd) # } # TODO: support schema -jsonRDD <- function(sqlCtx, rdd, schema = NULL, samplingRatio = 1.0) { +jsonRDD <- function(sqlContext, rdd, schema = NULL, samplingRatio = 1.0) { rdd <- serializeToString(rdd) if (is.null(schema)) { - sdf <- callJMethod(sqlCtx, "jsonRDD", callJMethod(getJRDD(rdd), "rdd"), samplingRatio) + sdf <- callJMethod(sqlContext, "jsonRDD", callJMethod(getJRDD(rdd), "rdd"), samplingRatio) dataFrame(sdf) } else { stop("not implemented") @@ -241,64 +241,63 @@ jsonRDD <- function(sqlCtx, rdd, schema = NULL, samplingRatio = 1.0) { #' #' Loads a Parquet file, returning the result as a DataFrame. #' -#' @param sqlCtx SQLContext to use +#' @param sqlContext SQLContext to use #' @param ... Path(s) of parquet file(s) to read. #' @return DataFrame #' @export # TODO: Implement saveasParquetFile and write examples for both -parquetFile <- function(sqlCtx, ...) { +parquetFile <- function(sqlContext, ...) { # Allow the user to have a more flexible definiton of the text file path paths <- lapply(list(...), normalizePath) - sdf <- callJMethod(sqlCtx, "parquetFile", paths) + sdf <- callJMethod(sqlContext, "parquetFile", paths) dataFrame(sdf) } #' SQL Query -#' +#' #' Executes a SQL query using Spark, returning the result as a DataFrame. 
#' -#' @param sqlCtx SQLContext to use +#' @param sqlContext SQLContext to use #' @param sqlQuery A character vector containing the SQL query #' @return DataFrame #' @export #' @examples #'\dontrun{ #' sc <- sparkR.init() -#' sqlCtx <- sparkRSQL.init(sc) +#' sqlContext <- sparkRSQL.init(sc) #' path <- "path/to/file.json" -#' df <- jsonFile(sqlCtx, path) +#' df <- jsonFile(sqlContext, path) #' registerTempTable(df, "table") -#' new_df <- sql(sqlCtx, "SELECT * FROM table") +#' new_df <- sql(sqlContext, "SELECT * FROM table") #' } -sql <- function(sqlCtx, sqlQuery) { - sdf <- callJMethod(sqlCtx, "sql", sqlQuery) - dataFrame(sdf) +sql <- function(sqlContext, sqlQuery) { + sdf <- callJMethod(sqlContext, "sql", sqlQuery) + dataFrame(sdf) } - #' Create a DataFrame from a SparkSQL Table #' #' Returns the specified Table as a DataFrame. The Table must have already been registered #' in the SQLContext. #' -#' @param sqlCtx SQLContext to use +#' @param sqlContext SQLContext to use #' @param tableName The SparkSQL Table to convert to a DataFrame. #' @return DataFrame #' @export #' @examples #'\dontrun{ #' sc <- sparkR.init() -#' sqlCtx <- sparkRSQL.init(sc) +#' sqlContext <- sparkRSQL.init(sc) #' path <- "path/to/file.json" -#' df <- jsonFile(sqlCtx, path) +#' df <- jsonFile(sqlContext, path) #' registerTempTable(df, "table") -#' new_df <- table(sqlCtx, "table") +#' new_df <- table(sqlContext, "table") #' } -table <- function(sqlCtx, tableName) { - sdf <- callJMethod(sqlCtx, "table", tableName) +table <- function(sqlContext, tableName) { + sdf <- callJMethod(sqlContext, "table", tableName) dataFrame(sdf) } @@ -307,22 +306,22 @@ table <- function(sqlCtx, tableName) { #' #' Returns a DataFrame containing names of tables in the given database. #' -#' @param sqlCtx SQLContext to use +#' @param sqlContext SQLContext to use #' @param databaseName name of the database #' @return a DataFrame #' @export #' @examples #'\dontrun{ #' sc <- sparkR.init() -#' sqlCtx <- sparkRSQL.init(sc) -#' tables(sqlCtx, "hive") +#' sqlContext <- sparkRSQL.init(sc) +#' tables(sqlContext, "hive") #' } -tables <- function(sqlCtx, databaseName = NULL) { +tables <- function(sqlContext, databaseName = NULL) { jdf <- if (is.null(databaseName)) { - callJMethod(sqlCtx, "tables") + callJMethod(sqlContext, "tables") } else { - callJMethod(sqlCtx, "tables", databaseName) + callJMethod(sqlContext, "tables", databaseName) } dataFrame(jdf) } @@ -332,22 +331,22 @@ tables <- function(sqlCtx, databaseName = NULL) { #' #' Returns the names of tables in the given database as an array. #' -#' @param sqlCtx SQLContext to use +#' @param sqlContext SQLContext to use #' @param databaseName name of the database #' @return a list of table names #' @export #' @examples #'\dontrun{ #' sc <- sparkR.init() -#' sqlCtx <- sparkRSQL.init(sc) -#' tableNames(sqlCtx, "hive") +#' sqlContext <- sparkRSQL.init(sc) +#' tableNames(sqlContext, "hive") #' } -tableNames <- function(sqlCtx, databaseName = NULL) { +tableNames <- function(sqlContext, databaseName = NULL) { if (is.null(databaseName)) { - callJMethod(sqlCtx, "tableNames") + callJMethod(sqlContext, "tableNames") } else { - callJMethod(sqlCtx, "tableNames", databaseName) + callJMethod(sqlContext, "tableNames", databaseName) } } @@ -356,58 +355,58 @@ tableNames <- function(sqlCtx, databaseName = NULL) { #' #' Caches the specified table in-memory. 
#' -#' @param sqlCtx SQLContext to use +#' @param sqlContext SQLContext to use #' @param tableName The name of the table being cached #' @return DataFrame #' @export #' @examples #'\dontrun{ #' sc <- sparkR.init() -#' sqlCtx <- sparkRSQL.init(sc) +#' sqlContext <- sparkRSQL.init(sc) #' path <- "path/to/file.json" -#' df <- jsonFile(sqlCtx, path) +#' df <- jsonFile(sqlContext, path) #' registerTempTable(df, "table") -#' cacheTable(sqlCtx, "table") +#' cacheTable(sqlContext, "table") #' } -cacheTable <- function(sqlCtx, tableName) { - callJMethod(sqlCtx, "cacheTable", tableName) +cacheTable <- function(sqlContext, tableName) { + callJMethod(sqlContext, "cacheTable", tableName) } #' Uncache Table #' #' Removes the specified table from the in-memory cache. #' -#' @param sqlCtx SQLContext to use +#' @param sqlContext SQLContext to use #' @param tableName The name of the table being uncached #' @return DataFrame #' @export #' @examples #'\dontrun{ #' sc <- sparkR.init() -#' sqlCtx <- sparkRSQL.init(sc) +#' sqlContext <- sparkRSQL.init(sc) #' path <- "path/to/file.json" -#' df <- jsonFile(sqlCtx, path) +#' df <- jsonFile(sqlContext, path) #' registerTempTable(df, "table") -#' uncacheTable(sqlCtx, "table") +#' uncacheTable(sqlContext, "table") #' } -uncacheTable <- function(sqlCtx, tableName) { - callJMethod(sqlCtx, "uncacheTable", tableName) +uncacheTable <- function(sqlContext, tableName) { + callJMethod(sqlContext, "uncacheTable", tableName) } #' Clear Cache #' #' Removes all cached tables from the in-memory cache. #' -#' @param sqlCtx SQLContext to use +#' @param sqlContext SQLContext to use #' @examples #' \dontrun{ -#' clearCache(sqlCtx) +#' clearCache(sqlContext) #' } -clearCache <- function(sqlCtx) { - callJMethod(sqlCtx, "clearCache") +clearCache <- function(sqlContext) { + callJMethod(sqlContext, "clearCache") } #' Drop Temporary Table @@ -415,22 +414,22 @@ clearCache <- function(sqlCtx) { #' Drops the temporary table with the given table name in the catalog. #' If the table has been cached/persisted before, it's also unpersisted. #' -#' @param sqlCtx SQLContext to use +#' @param sqlContext SQLContext to use #' @param tableName The name of the SparkSQL table to be dropped. #' @examples #' \dontrun{ #' sc <- sparkR.init() -#' sqlCtx <- sparkRSQL.init(sc) -#' df <- read.df(sqlCtx, path, "parquet") +#' sqlContext <- sparkRSQL.init(sc) +#' df <- read.df(sqlContext, path, "parquet") #' registerTempTable(df, "table") -#' dropTempTable(sqlCtx, "table") +#' dropTempTable(sqlContext, "table") #' } -dropTempTable <- function(sqlCtx, tableName) { +dropTempTable <- function(sqlContext, tableName) { if (class(tableName) != "character") { stop("tableName must be a string.") } - callJMethod(sqlCtx, "dropTempTable", tableName) + callJMethod(sqlContext, "dropTempTable", tableName) } #' Load an DataFrame @@ -441,7 +440,7 @@ dropTempTable <- function(sqlCtx, tableName) { #' If `source` is not specified, the default data source configured by #' "spark.sql.sources.default" will be used. 
#' -#' @param sqlCtx SQLContext to use +#' @param sqlContext SQLContext to use #' @param path The path of files to load #' @param source the name of external data source #' @return DataFrame @@ -449,24 +448,24 @@ dropTempTable <- function(sqlCtx, tableName) { #' @examples #'\dontrun{ #' sc <- sparkR.init() -#' sqlCtx <- sparkRSQL.init(sc) -#' df <- read.df(sqlCtx, "path/to/file.json", source = "json") +#' sqlContext <- sparkRSQL.init(sc) +#' df <- read.df(sqlContext, "path/to/file.json", source = "json") #' } -read.df <- function(sqlCtx, path = NULL, source = NULL, ...) { +read.df <- function(sqlContext, path = NULL, source = NULL, ...) { options <- varargsToEnv(...) if (!is.null(path)) { options[['path']] <- path } - sdf <- callJMethod(sqlCtx, "load", source, options) + sdf <- callJMethod(sqlContext, "load", source, options) dataFrame(sdf) } #' @aliases loadDF #' @export -loadDF <- function(sqlCtx, path = NULL, source = NULL, ...) { - read.df(sqlCtx, path, source, ...) +loadDF <- function(sqlContext, path = NULL, source = NULL, ...) { + read.df(sqlContext, path, source, ...) } #' Create an external table @@ -478,7 +477,7 @@ loadDF <- function(sqlCtx, path = NULL, source = NULL, ...) { #' If `source` is not specified, the default data source configured by #' "spark.sql.sources.default" will be used. #' -#' @param sqlCtx SQLContext to use +#' @param sqlContext SQLContext to use #' @param tableName A name of the table #' @param path The path of files to load #' @param source the name of external data source @@ -487,15 +486,15 @@ loadDF <- function(sqlCtx, path = NULL, source = NULL, ...) { #' @examples #'\dontrun{ #' sc <- sparkR.init() -#' sqlCtx <- sparkRSQL.init(sc) -#' df <- sparkRSQL.createExternalTable(sqlCtx, "myjson", path="path/to/json", source="json") +#' sqlContext <- sparkRSQL.init(sc) +#' df <- sparkRSQL.createExternalTable(sqlContext, "myjson", path="path/to/json", source="json") #' } -createExternalTable <- function(sqlCtx, tableName, path = NULL, source = NULL, ...) { +createExternalTable <- function(sqlContext, tableName, path = NULL, source = NULL, ...) { options <- varargsToEnv(...) 
if (!is.null(path)) { options[['path']] <- path } - sdf <- callJMethod(sqlCtx, "createExternalTable", tableName, source, options) + sdf <- callJMethod(sqlContext, "createExternalTable", tableName, source, options) dataFrame(sdf) } http://git-wip-us.apache.org/repos/asf/spark/blob/c636b87d/R/pkg/R/pairRDD.R ---------------------------------------------------------------------- diff --git a/R/pkg/R/pairRDD.R b/R/pkg/R/pairRDD.R index 7694652..1e24286 100644 --- a/R/pkg/R/pairRDD.R +++ b/R/pkg/R/pairRDD.R @@ -329,7 +329,7 @@ setMethod("reduceByKey", convertEnvsToList(keys, vals) } locallyReduced <- lapplyPartition(x, reduceVals) - shuffled <- partitionBy(locallyReduced, numPartitions) + shuffled <- partitionBy(locallyReduced, numToInt(numPartitions)) lapplyPartition(shuffled, reduceVals) }) @@ -436,7 +436,7 @@ setMethod("combineByKey", convertEnvsToList(keys, combiners) } locallyCombined <- lapplyPartition(x, combineLocally) - shuffled <- partitionBy(locallyCombined, numPartitions) + shuffled <- partitionBy(locallyCombined, numToInt(numPartitions)) mergeAfterShuffle <- function(part) { combiners <- new.env() keys <- new.env() http://git-wip-us.apache.org/repos/asf/spark/blob/c636b87d/R/pkg/R/sparkR.R ---------------------------------------------------------------------- diff --git a/R/pkg/R/sparkR.R b/R/pkg/R/sparkR.R index bc82df0..68387f0 100644 --- a/R/pkg/R/sparkR.R +++ b/R/pkg/R/sparkR.R @@ -222,7 +222,7 @@ sparkR.init <- function( #' @examples #'\dontrun{ #' sc <- sparkR.init() -#' sqlCtx <- sparkRSQL.init(sc) +#' sqlContext <- sparkRSQL.init(sc) #'} sparkRSQL.init <- function(jsc) { @@ -230,11 +230,11 @@ sparkRSQL.init <- function(jsc) { return(get(".sparkRSQLsc", envir = .sparkREnv)) } - sqlCtx <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", + sqlContext <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "createSQLContext", jsc) - assign(".sparkRSQLsc", sqlCtx, envir = .sparkREnv) - sqlCtx + assign(".sparkRSQLsc", sqlContext, envir = .sparkREnv) + sqlContext } #' Initialize a new HiveContext. 
@@ -246,7 +246,7 @@ sparkRSQL.init <- function(jsc) { #' @examples #'\dontrun{ #' sc <- sparkR.init() -#' sqlCtx <- sparkRHive.init(sc) +#' sqlContext <- sparkRHive.init(sc) #'} sparkRHive.init <- function(jsc) { http://git-wip-us.apache.org/repos/asf/spark/blob/c636b87d/R/pkg/inst/profile/shell.R ---------------------------------------------------------------------- diff --git a/R/pkg/inst/profile/shell.R b/R/pkg/inst/profile/shell.R index 33478d9..ca94f1d 100644 --- a/R/pkg/inst/profile/shell.R +++ b/R/pkg/inst/profile/shell.R @@ -26,8 +26,8 @@ sc <- SparkR::sparkR.init(Sys.getenv("MASTER", unset = "")) assign("sc", sc, envir=.GlobalEnv) - sqlCtx <- SparkR::sparkRSQL.init(sc) - assign("sqlCtx", sqlCtx, envir=.GlobalEnv) + sqlContext <- SparkR::sparkRSQL.init(sc) + assign("sqlContext", sqlContext, envir=.GlobalEnv) cat("\n Welcome to SparkR!") - cat("\n Spark context is available as sc, SQL context is available as sqlCtx\n") + cat("\n Spark context is available as sc, SQL context is available as sqlContext\n") } http://git-wip-us.apache.org/repos/asf/spark/blob/c636b87d/R/pkg/inst/tests/test_sparkSQL.R ---------------------------------------------------------------------- diff --git a/R/pkg/inst/tests/test_sparkSQL.R b/R/pkg/inst/tests/test_sparkSQL.R index 1768c57..1857e63 100644 --- a/R/pkg/inst/tests/test_sparkSQL.R +++ b/R/pkg/inst/tests/test_sparkSQL.R @@ -23,7 +23,7 @@ context("SparkSQL functions") sc <- sparkR.init() -sqlCtx <- sparkRSQL.init(sc) +sqlContext <- sparkRSQL.init(sc) mockLines <- c("{\"name\":\"Michael\"}", "{\"name\":\"Andy\", \"age\":30}", @@ -67,25 +67,25 @@ test_that("structType and structField", { test_that("create DataFrame from RDD", { rdd <- lapply(parallelize(sc, 1:10), function(x) { list(x, as.character(x)) }) - df <- createDataFrame(sqlCtx, rdd, list("a", "b")) + df <- createDataFrame(sqlContext, rdd, list("a", "b")) expect_true(inherits(df, "DataFrame")) expect_true(count(df) == 10) expect_equal(columns(df), c("a", "b")) expect_equal(dtypes(df), list(c("a", "int"), c("b", "string"))) - df <- createDataFrame(sqlCtx, rdd) + df <- createDataFrame(sqlContext, rdd) expect_true(inherits(df, "DataFrame")) expect_equal(columns(df), c("_1", "_2")) schema <- structType(structField(x = "a", type = "integer", nullable = TRUE), structField(x = "b", type = "string", nullable = TRUE)) - df <- createDataFrame(sqlCtx, rdd, schema) + df <- createDataFrame(sqlContext, rdd, schema) expect_true(inherits(df, "DataFrame")) expect_equal(columns(df), c("a", "b")) expect_equal(dtypes(df), list(c("a", "int"), c("b", "string"))) rdd <- lapply(parallelize(sc, 1:10), function(x) { list(a = x, b = as.character(x)) }) - df <- createDataFrame(sqlCtx, rdd) + df <- createDataFrame(sqlContext, rdd) expect_true(inherits(df, "DataFrame")) expect_true(count(df) == 10) expect_equal(columns(df), c("a", "b")) @@ -121,17 +121,17 @@ test_that("toDF", { test_that("create DataFrame from list or data.frame", { l <- list(list(1, 2), list(3, 4)) - df <- createDataFrame(sqlCtx, l, c("a", "b")) + df <- createDataFrame(sqlContext, l, c("a", "b")) expect_equal(columns(df), c("a", "b")) l <- list(list(a=1, b=2), list(a=3, b=4)) - df <- createDataFrame(sqlCtx, l) + df <- createDataFrame(sqlContext, l) expect_equal(columns(df), c("a", "b")) a <- 1:3 b <- c("a", "b", "c") ldf <- data.frame(a, b) - df <- createDataFrame(sqlCtx, ldf) + df <- createDataFrame(sqlContext, ldf) expect_equal(columns(df), c("a", "b")) expect_equal(dtypes(df), list(c("a", "int"), c("b", "string"))) expect_equal(count(df), 3) @@ -142,7 
+142,7 @@ test_that("create DataFrame from list or data.frame", { test_that("create DataFrame with different data types", { l <- list(a = 1L, b = 2, c = TRUE, d = "ss", e = as.Date("2012-12-13"), f = as.POSIXct("2015-03-15 12:13:14.056")) - df <- createDataFrame(sqlCtx, list(l)) + df <- createDataFrame(sqlContext, list(l)) expect_equal(dtypes(df), list(c("a", "int"), c("b", "double"), c("c", "boolean"), c("d", "string"), c("e", "date"), c("f", "timestamp"))) expect_equal(count(df), 1) @@ -154,7 +154,7 @@ test_that("create DataFrame with different data types", { # e <- new.env() # assign("n", 3L, envir = e) # l <- list(1:10, list("a", "b"), e, list(a="aa", b=3L)) -# df <- createDataFrame(sqlCtx, list(l), c("a", "b", "c", "d")) +# df <- createDataFrame(sqlContext, list(l), c("a", "b", "c", "d")) # expect_equal(dtypes(df), list(c("a", "array<int>"), c("b", "array<string>"), # c("c", "map<string,int>"), c("d", "struct<a:string,b:int>"))) # expect_equal(count(df), 1) @@ -163,7 +163,7 @@ test_that("create DataFrame with different data types", { #}) test_that("jsonFile() on a local file returns a DataFrame", { - df <- jsonFile(sqlCtx, jsonPath) + df <- jsonFile(sqlContext, jsonPath) expect_true(inherits(df, "DataFrame")) expect_true(count(df) == 3) }) @@ -171,88 +171,88 @@ test_that("jsonFile() on a local file returns a DataFrame", { test_that("jsonRDD() on a RDD with json string", { rdd <- parallelize(sc, mockLines) expect_true(count(rdd) == 3) - df <- jsonRDD(sqlCtx, rdd) + df <- jsonRDD(sqlContext, rdd) expect_true(inherits(df, "DataFrame")) expect_true(count(df) == 3) rdd2 <- flatMap(rdd, function(x) c(x, x)) - df <- jsonRDD(sqlCtx, rdd2) + df <- jsonRDD(sqlContext, rdd2) expect_true(inherits(df, "DataFrame")) expect_true(count(df) == 6) }) test_that("test cache, uncache and clearCache", { - df <- jsonFile(sqlCtx, jsonPath) + df <- jsonFile(sqlContext, jsonPath) registerTempTable(df, "table1") - cacheTable(sqlCtx, "table1") - uncacheTable(sqlCtx, "table1") - clearCache(sqlCtx) - dropTempTable(sqlCtx, "table1") + cacheTable(sqlContext, "table1") + uncacheTable(sqlContext, "table1") + clearCache(sqlContext) + dropTempTable(sqlContext, "table1") }) test_that("test tableNames and tables", { - df <- jsonFile(sqlCtx, jsonPath) + df <- jsonFile(sqlContext, jsonPath) registerTempTable(df, "table1") - expect_true(length(tableNames(sqlCtx)) == 1) - df <- tables(sqlCtx) + expect_true(length(tableNames(sqlContext)) == 1) + df <- tables(sqlContext) expect_true(count(df) == 1) - dropTempTable(sqlCtx, "table1") + dropTempTable(sqlContext, "table1") }) test_that("registerTempTable() results in a queryable table and sql() results in a new DataFrame", { - df <- jsonFile(sqlCtx, jsonPath) + df <- jsonFile(sqlContext, jsonPath) registerTempTable(df, "table1") - newdf <- sql(sqlCtx, "SELECT * FROM table1 where name = 'Michael'") + newdf <- sql(sqlContext, "SELECT * FROM table1 where name = 'Michael'") expect_true(inherits(newdf, "DataFrame")) expect_true(count(newdf) == 1) - dropTempTable(sqlCtx, "table1") + dropTempTable(sqlContext, "table1") }) test_that("insertInto() on a registered table", { - df <- read.df(sqlCtx, jsonPath, "json") + df <- read.df(sqlContext, jsonPath, "json") write.df(df, parquetPath, "parquet", "overwrite") - dfParquet <- read.df(sqlCtx, parquetPath, "parquet") + dfParquet <- read.df(sqlContext, parquetPath, "parquet") lines <- c("{\"name\":\"Bob\", \"age\":24}", "{\"name\":\"James\", \"age\":35}") jsonPath2 <- tempfile(pattern="jsonPath2", fileext=".tmp") parquetPath2 <- tempfile(pattern 
= "parquetPath2", fileext = ".parquet") writeLines(lines, jsonPath2) - df2 <- read.df(sqlCtx, jsonPath2, "json") + df2 <- read.df(sqlContext, jsonPath2, "json") write.df(df2, parquetPath2, "parquet", "overwrite") - dfParquet2 <- read.df(sqlCtx, parquetPath2, "parquet") + dfParquet2 <- read.df(sqlContext, parquetPath2, "parquet") registerTempTable(dfParquet, "table1") insertInto(dfParquet2, "table1") - expect_true(count(sql(sqlCtx, "select * from table1")) == 5) - expect_true(first(sql(sqlCtx, "select * from table1 order by age"))$name == "Michael") - dropTempTable(sqlCtx, "table1") + expect_true(count(sql(sqlContext, "select * from table1")) == 5) + expect_true(first(sql(sqlContext, "select * from table1 order by age"))$name == "Michael") + dropTempTable(sqlContext, "table1") registerTempTable(dfParquet, "table1") insertInto(dfParquet2, "table1", overwrite = TRUE) - expect_true(count(sql(sqlCtx, "select * from table1")) == 2) - expect_true(first(sql(sqlCtx, "select * from table1 order by age"))$name == "Bob") - dropTempTable(sqlCtx, "table1") + expect_true(count(sql(sqlContext, "select * from table1")) == 2) + expect_true(first(sql(sqlContext, "select * from table1 order by age"))$name == "Bob") + dropTempTable(sqlContext, "table1") }) test_that("table() returns a new DataFrame", { - df <- jsonFile(sqlCtx, jsonPath) + df <- jsonFile(sqlContext, jsonPath) registerTempTable(df, "table1") - tabledf <- table(sqlCtx, "table1") + tabledf <- table(sqlContext, "table1") expect_true(inherits(tabledf, "DataFrame")) expect_true(count(tabledf) == 3) - dropTempTable(sqlCtx, "table1") + dropTempTable(sqlContext, "table1") }) test_that("toRDD() returns an RRDD", { - df <- jsonFile(sqlCtx, jsonPath) + df <- jsonFile(sqlContext, jsonPath) testRDD <- toRDD(df) expect_true(inherits(testRDD, "RDD")) expect_true(count(testRDD) == 3) }) test_that("union on two RDDs created from DataFrames returns an RRDD", { - df <- jsonFile(sqlCtx, jsonPath) + df <- jsonFile(sqlContext, jsonPath) RDD1 <- toRDD(df) RDD2 <- toRDD(df) unioned <- unionRDD(RDD1, RDD2) @@ -274,7 +274,7 @@ test_that("union on mixed serialization types correctly returns a byte RRDD", { writeLines(textLines, textPath) textRDD <- textFile(sc, textPath) - df <- jsonFile(sqlCtx, jsonPath) + df <- jsonFile(sqlContext, jsonPath) dfRDD <- toRDD(df) unionByte <- unionRDD(rdd, dfRDD) @@ -292,7 +292,7 @@ test_that("union on mixed serialization types correctly returns a byte RRDD", { test_that("objectFile() works with row serialization", { objectPath <- tempfile(pattern="spark-test", fileext=".tmp") - df <- jsonFile(sqlCtx, jsonPath) + df <- jsonFile(sqlContext, jsonPath) dfRDD <- toRDD(df) saveAsObjectFile(coalesce(dfRDD, 1L), objectPath) objectIn <- objectFile(sc, objectPath) @@ -303,7 +303,7 @@ test_that("objectFile() works with row serialization", { }) test_that("lapply() on a DataFrame returns an RDD with the correct columns", { - df <- jsonFile(sqlCtx, jsonPath) + df <- jsonFile(sqlContext, jsonPath) testRDD <- lapply(df, function(row) { row$newCol <- row$age + 5 row @@ -315,7 +315,7 @@ test_that("lapply() on a DataFrame returns an RDD with the correct columns", { }) test_that("collect() returns a data.frame", { - df <- jsonFile(sqlCtx, jsonPath) + df <- jsonFile(sqlContext, jsonPath) rdf <- collect(df) expect_true(is.data.frame(rdf)) expect_true(names(rdf)[1] == "age") @@ -324,20 +324,20 @@ test_that("collect() returns a data.frame", { }) test_that("limit() returns DataFrame with the correct number of rows", { - df <- jsonFile(sqlCtx, jsonPath) + df <- 
jsonFile(sqlContext, jsonPath) dfLimited <- limit(df, 2) expect_true(inherits(dfLimited, "DataFrame")) expect_true(count(dfLimited) == 2) }) test_that("collect() and take() on a DataFrame return the same number of rows and columns", { - df <- jsonFile(sqlCtx, jsonPath) + df <- jsonFile(sqlContext, jsonPath) expect_true(nrow(collect(df)) == nrow(take(df, 10))) expect_true(ncol(collect(df)) == ncol(take(df, 10))) }) test_that("multiple pipeline transformations starting with a DataFrame result in an RDD with the correct values", { - df <- jsonFile(sqlCtx, jsonPath) + df <- jsonFile(sqlContext, jsonPath) first <- lapply(df, function(row) { row$age <- row$age + 5 row @@ -354,7 +354,7 @@ test_that("multiple pipeline transformations starting with a DataFrame result in }) test_that("cache(), persist(), and unpersist() on a DataFrame", { - df <- jsonFile(sqlCtx, jsonPath) + df <- jsonFile(sqlContext, jsonPath) expect_false(df@env$isCached) cache(df) expect_true(df@env$isCached) @@ -373,7 +373,7 @@ test_that("cache(), persist(), and unpersist() on a DataFrame", { }) test_that("schema(), dtypes(), columns(), names() return the correct values/format", { - df <- jsonFile(sqlCtx, jsonPath) + df <- jsonFile(sqlContext, jsonPath) testSchema <- schema(df) expect_true(length(testSchema$fields()) == 2) expect_true(testSchema$fields()[[1]]$dataType.toString() == "LongType") @@ -394,7 +394,7 @@ test_that("schema(), dtypes(), columns(), names() return the correct values/form }) test_that("head() and first() return the correct data", { - df <- jsonFile(sqlCtx, jsonPath) + df <- jsonFile(sqlContext, jsonPath) testHead <- head(df) expect_true(nrow(testHead) == 3) expect_true(ncol(testHead) == 2) @@ -415,14 +415,14 @@ test_that("distinct() on DataFrames", { jsonPathWithDup <- tempfile(pattern="sparkr-test", fileext=".tmp") writeLines(lines, jsonPathWithDup) - df <- jsonFile(sqlCtx, jsonPathWithDup) + df <- jsonFile(sqlContext, jsonPathWithDup) uniques <- distinct(df) expect_true(inherits(uniques, "DataFrame")) expect_true(count(uniques) == 3) }) test_that("sample on a DataFrame", { - df <- jsonFile(sqlCtx, jsonPath) + df <- jsonFile(sqlContext, jsonPath) sampled <- sample(df, FALSE, 1.0) expect_equal(nrow(collect(sampled)), count(df)) expect_true(inherits(sampled, "DataFrame")) @@ -435,7 +435,7 @@ test_that("sample on a DataFrame", { }) test_that("select operators", { - df <- select(jsonFile(sqlCtx, jsonPath), "name", "age") + df <- select(jsonFile(sqlContext, jsonPath), "name", "age") expect_true(inherits(df$name, "Column")) expect_true(inherits(df[[2]], "Column")) expect_true(inherits(df[["age"]], "Column")) @@ -461,7 +461,7 @@ test_that("select operators", { }) test_that("select with column", { - df <- jsonFile(sqlCtx, jsonPath) + df <- jsonFile(sqlContext, jsonPath) df1 <- select(df, "name") expect_true(columns(df1) == c("name")) expect_true(count(df1) == 3) @@ -472,7 +472,7 @@ test_that("select with column", { }) test_that("selectExpr() on a DataFrame", { - df <- jsonFile(sqlCtx, jsonPath) + df <- jsonFile(sqlContext, jsonPath) selected <- selectExpr(df, "age * 2") expect_true(names(selected) == "(age * 2)") expect_equal(collect(selected), collect(select(df, df$age * 2L))) @@ -483,7 +483,7 @@ test_that("selectExpr() on a DataFrame", { }) test_that("column calculation", { - df <- jsonFile(sqlCtx, jsonPath) + df <- jsonFile(sqlContext, jsonPath) d <- collect(select(df, alias(df$age + 1, "age2"))) expect_true(names(d) == c("age2")) df2 <- select(df, lower(df$name), abs(df$age)) @@ -492,15 +492,15 @@ 
test_that("column calculation", { }) test_that("read.df() from json file", { - df <- read.df(sqlCtx, jsonPath, "json") + df <- read.df(sqlContext, jsonPath, "json") expect_true(inherits(df, "DataFrame")) expect_true(count(df) == 3) }) test_that("write.df() as parquet file", { - df <- read.df(sqlCtx, jsonPath, "json") + df <- read.df(sqlContext, jsonPath, "json") write.df(df, parquetPath, "parquet", mode="overwrite") - df2 <- read.df(sqlCtx, parquetPath, "parquet") + df2 <- read.df(sqlContext, parquetPath, "parquet") expect_true(inherits(df2, "DataFrame")) expect_true(count(df2) == 3) }) @@ -553,7 +553,7 @@ test_that("column binary mathfunctions", { "{\"a\":4, \"b\":8}") jsonPathWithDup <- tempfile(pattern="sparkr-test", fileext=".tmp") writeLines(lines, jsonPathWithDup) - df <- jsonFile(sqlCtx, jsonPathWithDup) + df <- jsonFile(sqlContext, jsonPathWithDup) expect_equal(collect(select(df, atan2(df$a, df$b)))[1, "ATAN2(a, b)"], atan2(1, 5)) expect_equal(collect(select(df, atan2(df$a, df$b)))[2, "ATAN2(a, b)"], atan2(2, 6)) expect_equal(collect(select(df, atan2(df$a, df$b)))[3, "ATAN2(a, b)"], atan2(3, 7)) @@ -565,7 +565,7 @@ test_that("column binary mathfunctions", { }) test_that("string operators", { - df <- jsonFile(sqlCtx, jsonPath) + df <- jsonFile(sqlContext, jsonPath) expect_equal(count(where(df, like(df$name, "A%"))), 1) expect_equal(count(where(df, startsWith(df$name, "A"))), 1) expect_equal(first(select(df, substr(df$name, 1, 2)))[[1]], "Mi") @@ -573,7 +573,7 @@ test_that("string operators", { }) test_that("group by", { - df <- jsonFile(sqlCtx, jsonPath) + df <- jsonFile(sqlContext, jsonPath) df1 <- agg(df, name = "max", age = "sum") expect_true(1 == count(df1)) df1 <- agg(df, age2 = max(df$age)) @@ -610,7 +610,7 @@ test_that("group by", { }) test_that("arrange() and orderBy() on a DataFrame", { - df <- jsonFile(sqlCtx, jsonPath) + df <- jsonFile(sqlContext, jsonPath) sorted <- arrange(df, df$age) expect_true(collect(sorted)[1,2] == "Michael") @@ -627,7 +627,7 @@ test_that("arrange() and orderBy() on a DataFrame", { }) test_that("filter() on a DataFrame", { - df <- jsonFile(sqlCtx, jsonPath) + df <- jsonFile(sqlContext, jsonPath) filtered <- filter(df, "age > 20") expect_true(count(filtered) == 1) expect_true(collect(filtered)$name == "Andy") @@ -637,7 +637,7 @@ test_that("filter() on a DataFrame", { }) test_that("join() on a DataFrame", { - df <- jsonFile(sqlCtx, jsonPath) + df <- jsonFile(sqlContext, jsonPath) mockLines2 <- c("{\"name\":\"Michael\", \"test\": \"yes\"}", "{\"name\":\"Andy\", \"test\": \"no\"}", @@ -645,7 +645,7 @@ test_that("join() on a DataFrame", { "{\"name\":\"Bob\", \"test\": \"yes\"}") jsonPath2 <- tempfile(pattern="sparkr-test", fileext=".tmp") writeLines(mockLines2, jsonPath2) - df2 <- jsonFile(sqlCtx, jsonPath2) + df2 <- jsonFile(sqlContext, jsonPath2) joined <- join(df, df2) expect_equal(names(joined), c("age", "name", "name", "test")) @@ -668,7 +668,7 @@ test_that("join() on a DataFrame", { }) test_that("toJSON() returns an RDD of the correct values", { - df <- jsonFile(sqlCtx, jsonPath) + df <- jsonFile(sqlContext, jsonPath) testRDD <- toJSON(df) expect_true(inherits(testRDD, "RDD")) expect_true(SparkR:::getSerializedMode(testRDD) == "string") @@ -676,25 +676,25 @@ test_that("toJSON() returns an RDD of the correct values", { }) test_that("showDF()", { - df <- jsonFile(sqlCtx, jsonPath) + df <- jsonFile(sqlContext, jsonPath) s <- capture.output(showDF(df)) expect_output(s , "+----+-------+\n| age| name|\n+----+-------+\n|null|Michael|\n| 30| Andy|\n| 19| 
Justin|\n+----+-------+\n") }) test_that("isLocal()", { - df <- jsonFile(sqlCtx, jsonPath) + df <- jsonFile(sqlContext, jsonPath) expect_false(isLocal(df)) }) test_that("unionAll(), except(), and intersect() on a DataFrame", { - df <- jsonFile(sqlCtx, jsonPath) + df <- jsonFile(sqlContext, jsonPath) lines <- c("{\"name\":\"Bob\", \"age\":24}", "{\"name\":\"Andy\", \"age\":30}", "{\"name\":\"James\", \"age\":35}") jsonPath2 <- tempfile(pattern="sparkr-test", fileext=".tmp") writeLines(lines, jsonPath2) - df2 <- read.df(sqlCtx, jsonPath2, "json") + df2 <- read.df(sqlContext, jsonPath2, "json") unioned <- arrange(unionAll(df, df2), df$age) expect_true(inherits(unioned, "DataFrame")) @@ -713,7 +713,7 @@ test_that("unionAll(), except(), and intersect() on a DataFrame", { }) test_that("withColumn() and withColumnRenamed()", { - df <- jsonFile(sqlCtx, jsonPath) + df <- jsonFile(sqlContext, jsonPath) newDF <- withColumn(df, "newAge", df$age + 2) expect_true(length(columns(newDF)) == 3) expect_true(columns(newDF)[3] == "newAge") @@ -725,7 +725,7 @@ test_that("withColumn() and withColumnRenamed()", { }) test_that("mutate() and rename()", { - df <- jsonFile(sqlCtx, jsonPath) + df <- jsonFile(sqlContext, jsonPath) newDF <- mutate(df, newAge = df$age + 2) expect_true(length(columns(newDF)) == 3) expect_true(columns(newDF)[3] == "newAge") @@ -737,25 +737,25 @@ test_that("mutate() and rename()", { }) test_that("write.df() on DataFrame and works with parquetFile", { - df <- jsonFile(sqlCtx, jsonPath) + df <- jsonFile(sqlContext, jsonPath) write.df(df, parquetPath, "parquet", mode="overwrite") - parquetDF <- parquetFile(sqlCtx, parquetPath) + parquetDF <- parquetFile(sqlContext, parquetPath) expect_true(inherits(parquetDF, "DataFrame")) expect_equal(count(df), count(parquetDF)) }) test_that("parquetFile works with multiple input paths", { - df <- jsonFile(sqlCtx, jsonPath) + df <- jsonFile(sqlContext, jsonPath) write.df(df, parquetPath, "parquet", mode="overwrite") parquetPath2 <- tempfile(pattern = "parquetPath2", fileext = ".parquet") write.df(df, parquetPath2, "parquet", mode="overwrite") - parquetDF <- parquetFile(sqlCtx, parquetPath, parquetPath2) + parquetDF <- parquetFile(sqlContext, parquetPath, parquetPath2) expect_true(inherits(parquetDF, "DataFrame")) expect_true(count(parquetDF) == count(df)*2) }) test_that("describe() on a DataFrame", { - df <- jsonFile(sqlCtx, jsonPath) + df <- jsonFile(sqlContext, jsonPath) stats <- describe(df, "age") expect_equal(collect(stats)[1, "summary"], "count") expect_equal(collect(stats)[2, "age"], "24.5") http://git-wip-us.apache.org/repos/asf/spark/blob/c636b87d/docs/_plugins/copy_api_dirs.rb ---------------------------------------------------------------------- diff --git a/docs/_plugins/copy_api_dirs.rb b/docs/_plugins/copy_api_dirs.rb index 0ea3f8e..6073b36 100644 --- a/docs/_plugins/copy_api_dirs.rb +++ b/docs/_plugins/copy_api_dirs.rb @@ -18,50 +18,52 @@ require 'fileutils' include FileUtils -if not (ENV['SKIP_API'] == '1' or ENV['SKIP_SCALADOC'] == '1') - # Build Scaladoc for Java/Scala +if not (ENV['SKIP_API'] == '1') + if not (ENV['SKIP_SCALADOC'] == '1') + # Build Scaladoc for Java/Scala - puts "Moving to project root and building API docs." - curr_dir = pwd - cd("..") + puts "Moving to project root and building API docs." + curr_dir = pwd + cd("..") - puts "Running 'build/sbt -Pkinesis-asl compile unidoc' from " + pwd + "; this may take a few minutes..." 
- puts `build/sbt -Pkinesis-asl compile unidoc` + puts "Running 'build/sbt -Pkinesis-asl compile unidoc' from " + pwd + "; this may take a few minutes..." + puts `build/sbt -Pkinesis-asl compile unidoc` - puts "Moving back into docs dir." - cd("docs") + puts "Moving back into docs dir." + cd("docs") - # Copy over the unified ScalaDoc for all projects to api/scala. - # This directory will be copied over to _site when `jekyll` command is run. - source = "../target/scala-2.10/unidoc" - dest = "api/scala" + # Copy over the unified ScalaDoc for all projects to api/scala. + # This directory will be copied over to _site when `jekyll` command is run. + source = "../target/scala-2.10/unidoc" + dest = "api/scala" - puts "Making directory " + dest - mkdir_p dest + puts "Making directory " + dest + mkdir_p dest - # From the rubydoc: cp_r('src', 'dest') makes src/dest, but this doesn't. - puts "cp -r " + source + "/. " + dest - cp_r(source + "/.", dest) + # From the rubydoc: cp_r('src', 'dest') makes src/dest, but this doesn't. + puts "cp -r " + source + "/. " + dest + cp_r(source + "/.", dest) - # Append custom JavaScript - js = File.readlines("./js/api-docs.js") - js_file = dest + "/lib/template.js" - File.open(js_file, 'a') { |f| f.write("\n" + js.join()) } + # Append custom JavaScript + js = File.readlines("./js/api-docs.js") + js_file = dest + "/lib/template.js" + File.open(js_file, 'a') { |f| f.write("\n" + js.join()) } - # Append custom CSS - css = File.readlines("./css/api-docs.css") - css_file = dest + "/lib/template.css" - File.open(css_file, 'a') { |f| f.write("\n" + css.join()) } + # Append custom CSS + css = File.readlines("./css/api-docs.css") + css_file = dest + "/lib/template.css" + File.open(css_file, 'a') { |f| f.write("\n" + css.join()) } - # Copy over the unified JavaDoc for all projects to api/java. - source = "../target/javaunidoc" - dest = "api/java" + # Copy over the unified JavaDoc for all projects to api/java. + source = "../target/javaunidoc" + dest = "api/java" - puts "Making directory " + dest - mkdir_p dest + puts "Making directory " + dest + mkdir_p dest - puts "cp -r " + source + "/. " + dest - cp_r(source + "/.", dest) + puts "cp -r " + source + "/. " + dest + cp_r(source + "/.", dest) + end # Build Sphinx docs for Python http://git-wip-us.apache.org/repos/asf/spark/blob/c636b87d/docs/api.md ---------------------------------------------------------------------- diff --git a/docs/api.md b/docs/api.md index 0346038..45df77a 100644 --- a/docs/api.md +++ b/docs/api.md @@ -7,4 +7,5 @@ Here you can API docs for Spark and its submodules. - [Spark Scala API (Scaladoc)](api/scala/index.html) - [Spark Java API (Javadoc)](api/java/index.html) -- [Spark Python API (Epydoc)](api/python/index.html) +- [Spark Python API (Sphinx)](api/python/index.html) +- [Spark R API (Roxygen2)](api/R/index.html) http://git-wip-us.apache.org/repos/asf/spark/blob/c636b87d/docs/index.md ---------------------------------------------------------------------- diff --git a/docs/index.md b/docs/index.md index b5b016e..5ef6d98 100644 --- a/docs/index.md +++ b/docs/index.md @@ -6,7 +6,7 @@ description: Apache Spark SPARK_VERSION_SHORT documentation homepage --- Apache Spark is a fast and general-purpose cluster computing system. -It provides high-level APIs in Java, Scala and Python, +It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. 
It also supports a rich set of higher-level tools including [Spark SQL](sql-programming-guide.html) for SQL and structured data processing, [MLlib](mllib-guide.html) for machine learning, [GraphX](graphx-programming-guide.html) for graph processing, and [Spark Streaming](streaming-programming-guide.html). @@ -20,13 +20,13 @@ Spark runs on both Windows and UNIX-like systems (e.g. Linux, Mac OS). It's easy locally on one machine --- all you need is to have `java` installed on your system `PATH`, or the `JAVA_HOME` environment variable pointing to a Java installation. -Spark runs on Java 6+ and Python 2.6+. For the Scala API, Spark {{site.SPARK_VERSION}} uses +Spark runs on Java 6+, Python 2.6+ and R 3.1+. For the Scala API, Spark {{site.SPARK_VERSION}} uses Scala {{site.SCALA_BINARY_VERSION}}. You will need to use a compatible Scala version ({{site.SCALA_BINARY_VERSION}}.x). # Running the Examples and Shell -Spark comes with several sample programs. Scala, Java and Python examples are in the +Spark comes with several sample programs. Scala, Java, Python and R examples are in the `examples/src/main` directory. To run one of the Java or Scala sample programs, use `bin/run-example <class> [params]` in the top-level Spark directory. (Behind the scenes, this invokes the more general @@ -54,6 +54,15 @@ Example applications are also provided in Python. For example, ./bin/spark-submit examples/src/main/python/pi.py 10 +Spark also provides an experimental R API since 1.4 (only the DataFrames API is included). +To run Spark interactively in an R interpreter, use `bin/sparkR`: + ./bin/sparkR --master local[2] +Example applications are also provided in R. For example, + ./bin/spark-submit examples/src/main/r/dataframe.R + # Launching on a Cluster The Spark [cluster mode overview](cluster-overview.html) explains the key concepts in running on a cluster. @@ -71,7 +80,7 @@ options for deployment: * [Quick Start](quick-start.html): a quick introduction to the Spark API; start here! * [Spark Programming Guide](programming-guide.html): detailed overview of Spark - in all supported languages (Scala, Java, Python) + in all supported languages (Scala, Java, Python, R) * Modules built on Spark: * [Spark Streaming](streaming-programming-guide.html): processing real-time data streams * [Spark SQL and DataFrames](sql-programming-guide.html): support for structured data and relational queries @@ -83,7 +92,8 @@ options for deployment: * [Spark Scala API (Scaladoc)](api/scala/index.html#org.apache.spark.package) * [Spark Java API (Javadoc)](api/java/index.html) -* [Spark Python API (Epydoc)](api/python/index.html) +* [Spark Python API (Sphinx)](api/python/index.html) +* [Spark R API (Roxygen2)](api/R/index.html) **Deployment Guides:** @@ -124,4 +134,5 @@ options for deployment: available online for free.
* [Code Examples](http://spark.apache.org/examples.html): more are also available in the `examples` subfolder of Spark ([Scala]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples), [Java]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/java/org/apache/spark/examples), - [Python]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/python)) + [Python]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/python), + [R]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/r)) http://git-wip-us.apache.org/repos/asf/spark/blob/c636b87d/docs/programming-guide.md ---------------------------------------------------------------------- diff --git a/docs/programming-guide.md b/docs/programming-guide.md index 07a4d29..5d9df28 100644 --- a/docs/programming-guide.md +++ b/docs/programming-guide.md @@ -98,9 +98,9 @@ to your version of HDFS. Some common HDFS version tags are listed on the [Prebuilt packages](http://spark.apache.org/downloads.html) are also available on the Spark homepage for common HDFS versions. -Finally, you need to import some Spark classes into your program. Add the following lines: +Finally, you need to import some Spark classes into your program. Add the following line: -{% highlight scala %} +{% highlight python %} from pyspark import SparkContext, SparkConf {% endhighlight %} @@ -478,7 +478,6 @@ the [Converter examples]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main for examples of using Cassandra / HBase ```InputFormat``` and ```OutputFormat``` with custom converters. </div> - </div> ## RDD Operations @@ -915,7 +914,8 @@ The following table lists some of the common transformations supported by Spark. RDD API doc ([Scala](api/scala/index.html#org.apache.spark.rdd.RDD), [Java](api/java/index.html?org/apache/spark/api/java/JavaRDD.html), - [Python](api/python/pyspark.html#pyspark.RDD)) + [Python](api/python/pyspark.html#pyspark.RDD), + [R](api/R/index.html)) and pair RDD functions doc ([Scala](api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions), [Java](api/java/index.html?org/apache/spark/api/java/JavaPairRDD.html)) @@ -1028,7 +1028,9 @@ The following table lists some of the common actions supported by Spark. Refer t RDD API doc ([Scala](api/scala/index.html#org.apache.spark.rdd.RDD), [Java](api/java/index.html?org/apache/spark/api/java/JavaRDD.html), - [Python](api/python/pyspark.html#pyspark.RDD)) + [Python](api/python/pyspark.html#pyspark.RDD), + [R](api/R/index.html)) + and pair RDD functions doc ([Scala](api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions), [Java](api/java/index.html?org/apache/spark/api/java/JavaPairRDD.html)) @@ -1565,7 +1567,8 @@ You can see some [example Spark programs](http://spark.apache.org/examples.html) In addition, Spark includes several samples in the `examples` directory ([Scala]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples), [Java]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/java/org/apache/spark/examples), - [Python]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/python)). + [Python]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/python), + [R]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/r)). 
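(The new R example referenced throughout this patch, `examples/src/main/r/dataframe.R`, does not itself appear in the diff. A minimal sketch of what such a script might look like, assuming only the SparkR 1.4 entry points used elsewhere in this patch; the shipped file may differ:)

```
# Hypothetical sketch, not the shipped examples/src/main/r/dataframe.R.
library(SparkR)

sc <- sparkR.init(appName = "SparkR-DataFrame-example")
sqlContext <- sparkRSQL.init(sc)

# Build a DataFrame from a local R data.frame and inspect it.
localDF <- data.frame(name = c("John", "Smith", "Sarah"), age = c(19L, 23L, 18L))
df <- createDataFrame(sqlContext, localDF)
printSchema(df)
showDF(df)

sparkR.stop()
```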
You can run Java and Scala examples by passing the class name to Spark's `bin/run-example` script; for instance: ./bin/run-example SparkPi @@ -1574,6 +1577,10 @@ For Python examples, use `spark-submit` instead: ./bin/spark-submit examples/src/main/python/pi.py +For R examples, use `spark-submit` instead: + + ./bin/spark-submit examples/src/main/r/dataframe.R + For help on optimizing your programs, the [configuration](configuration.html) and [tuning](tuning.html) guides provide information on best practices. They are especially important for making sure that your data is stored in memory in an efficient format. @@ -1581,4 +1588,4 @@ For help on deploying, the [cluster mode overview](cluster-overview.html) descri in distributed operation and supported cluster managers. Finally, full API documentation is available in -[Scala](api/scala/#org.apache.spark.package), [Java](api/java/) and [Python](api/python/). +[Scala](api/scala/#org.apache.spark.package), [Java](api/java/), [Python](api/python/) and [R](api/R/). http://git-wip-us.apache.org/repos/asf/spark/blob/c636b87d/docs/quick-start.md ---------------------------------------------------------------------- diff --git a/docs/quick-start.md b/docs/quick-start.md index 81143da..bb39e41 100644 --- a/docs/quick-start.md +++ b/docs/quick-start.md @@ -184,10 +184,10 @@ scala> linesWithSpark.cache() res7: spark.RDD[String] = spark.FilteredRDD@17e51082 scala> linesWithSpark.count() -res8: Long = 15 +res8: Long = 19 scala> linesWithSpark.count() -res9: Long = 15 +res9: Long = 19 {% endhighlight %} It may seem silly to use Spark to explore and cache a 100-line text file. The interesting part is @@ -202,10 +202,10 @@ a cluster, as described in the [programming guide](programming-guide.html#initia >>> linesWithSpark.cache() >>> linesWithSpark.count() -15 +19 >>> linesWithSpark.count() -15 +19 {% endhighlight %} It may seem silly to use Spark to explore and cache a 100-line text file. The interesting part is @@ -423,14 +423,14 @@ dependencies to `spark-submit` through its `--py-files` argument by packaging th We can run this application using the `bin/spark-submit` script: -{% highlight python %} +{% highlight bash %} # Use spark-submit to run your application $ YOUR_SPARK_HOME/bin/spark-submit \ --master local[4] \ SimpleApp.py ... Lines with a: 46, Lines with b: 23 -{% endhighlight python %} +{% endhighlight %} </div> </div> @@ -444,7 +444,8 @@ Congratulations on running your first Spark application! * Finally, Spark includes several samples in the `examples` directory ([Scala]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples), [Java]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/java/org/apache/spark/examples), - [Python]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/python)). + [Python]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/python), + [R]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/r)). 
You can run them as follows: {% highlight bash %} @@ -453,4 +454,7 @@ You can run them as follows: # For Python examples, use spark-submit directly: ./bin/spark-submit examples/src/main/python/pi.py + +# For R examples, use spark-submit directly: +./bin/spark-submit examples/src/main/r/dataframe.R {% endhighlight %} http://git-wip-us.apache.org/repos/asf/spark/blob/c636b87d/docs/sql-programming-guide.md ---------------------------------------------------------------------- diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index 78b8e8a..5b41c0e 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -16,9 +16,9 @@ Spark SQL is a Spark module for structured data processing. It provides a progra A DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing RDDs. -The DataFrame API is available in [Scala](api/scala/index.html#org.apache.spark.sql.DataFrame), [Java](api/java/index.html?org/apache/spark/sql/DataFrame.html), and [Python](api/python/pyspark.sql.html#pyspark.sql.DataFrame). +The DataFrame API is available in [Scala](api/scala/index.html#org.apache.spark.sql.DataFrame), [Java](api/java/index.html?org/apache/spark/sql/DataFrame.html), [Python](api/python/pyspark.sql.html#pyspark.sql.DataFrame), and [R](api/R/index.html). -All of the examples on this page use sample data included in the Spark distribution and can be run in the `spark-shell` or the `pyspark` shell. +All of the examples on this page use sample data included in the Spark distribution and can be run in the `spark-shell`, `pyspark` shell, or `sparkR` shell. ## Starting Point: `SQLContext` @@ -65,6 +65,17 @@ sqlContext = SQLContext(sc) {% endhighlight %} </div> + +<div data-lang="r" markdown="1"> + +The entry point into all relational functionality in Spark is the +`SQLContext` class, or one of its descendants. To create a basic `SQLContext`, all you need is a SparkContext.
+ +{% highlight r %} +sqlContext <- sparkRSQL.init(sc) +{% endhighlight %} + +</div> </div> In addition to the basic `SQLContext`, you can also create a `HiveContext`, which provides a @@ -130,6 +141,19 @@ df.show() {% endhighlight %} </div> + +<div data-lang="r" markdown="1"> +{% highlight r %} +sqlContext <- sparkRSQL.init(sc) + +df <- jsonFile(sqlContext, "examples/src/main/resources/people.json") + +# Displays the content of the DataFrame to stdout +showDF(df) {% endhighlight %} + +</div> + </div> @@ -296,6 +320,57 @@ df.groupBy("age").count().show() {% endhighlight %} </div> + +<div data-lang="r" markdown="1"> +{% highlight r %} +sqlContext <- sparkRSQL.init(sc) + +# Create the DataFrame +df <- jsonFile(sqlContext, "examples/src/main/resources/people.json") + +# Show the content of the DataFrame +showDF(df) +## age name +## null Michael +## 30 Andy +## 19 Justin + +# Print the schema in a tree format +printSchema(df) +## root +## |-- age: long (nullable = true) +## |-- name: string (nullable = true) + +# Select only the "name" column +showDF(select(df, "name")) +## name +## Michael +## Andy +## Justin + +# Select everybody, but increment the age by 1 +showDF(select(df, df$name, df$age + 1)) +## name (age + 1) +## Michael null +## Andy 31 +## Justin 20 + +# Select people older than 21 +showDF(where(df, df$age > 21)) +## age name +## 30 Andy + +# Count people by age +showDF(count(groupBy(df, "age"))) +## age count +## null 1 +## 19 1 +## 30 1 + +{% endhighlight %} + +</div> + </div> @@ -325,6 +400,14 @@ sqlContext = SQLContext(sc) df = sqlContext.sql("SELECT * FROM table") {% endhighlight %} </div> + +<div data-lang="r" markdown="1"> +{% highlight r %} +sqlContext <- sparkRSQL.init(sc) +df <- sql(sqlContext, "SELECT * FROM table") +{% endhighlight %} +</div> + </div> @@ -720,6 +803,15 @@ df.select("name", "favorite_color").save("namesAndFavColors.parquet") {% endhighlight %} </div> + +<div data-lang="r" markdown="1"> + +{% highlight r %} +df <- loadDF(sqlContext, "people.parquet") +saveDF(select(df, "name", "age"), "namesAndAges.parquet") +{% endhighlight %} + +</div> </div> ### Manually Specifying Options @@ -761,6 +853,16 @@ df.select("name", "age").save("namesAndAges.parquet", "parquet") {% endhighlight %} </div> +<div data-lang="r" markdown="1"> + +{% highlight r %} + +df <- loadDF(sqlContext, "people.json", "json") +saveDF(select(df, "name", "age"), "namesAndAges.parquet", "parquet") + +{% endhighlight %} + +</div> </div> ### Save Modes
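(The Save Modes section body is unchanged by this patch and therefore elided from the diff. In SparkR the mode is passed as an argument to `write.df`/`saveDF`, as in the updated tests above; a minimal sketch, with an illustrative path and DataFrame name:)

```
# Sketch: "mode" controls what happens when the target already exists --
# "error" (the default), "append", "overwrite", or "ignore".
write.df(df, "people.parquet", source = "parquet", mode = "overwrite")
```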
@@ -908,6 +1010,31 @@ for teenName in teenNames.collect(): </div> +<div data-lang="r" markdown="1"> + +{% highlight r %} +# sqlContext from the previous example is used in this example. + +schemaPeople # The DataFrame from the previous example. + +# DataFrames can be saved as Parquet files, maintaining the schema information. +saveAsParquetFile(schemaPeople, "people.parquet") + +# Read in the Parquet file created above. Parquet files are self-describing so the schema is preserved. +# The result of loading a parquet file is also a DataFrame. +parquetFile <- parquetFile(sqlContext, "people.parquet") + +# Parquet files can also be registered as tables and then used in SQL statements. +registerTempTable(parquetFile, "parquetFile") +teenagers <- sql(sqlContext, "SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19") +teenNames <- map(teenagers, function(p) { paste("Name:", p$name)}) +for (teenName in collect(teenNames)) { + cat(teenName, "\n") +} {% endhighlight %} + +</div> + <div data-lang="sql" markdown="1"> {% highlight sql %} @@ -1033,7 +1160,7 @@ df2 = sqlContext.createDataFrame(sc.parallelize(range(6, 11)) df2.save("data/test_table/key=2", "parquet") # Read the partitioned table -df3 = sqlContext.parquetFile("data/test_table") +df3 = sqlContext.load("data/test_table", "parquet") df3.printSchema() # The final schema consists of all 3 columns in the Parquet files together @@ -1047,6 +1174,33 @@ df3.printSchema() </div> +<div data-lang="r" markdown="1"> + +{% highlight r %} +# sqlContext from the previous example is used in this example. + +# Create a simple DataFrame, stored into a partition directory +saveDF(df1, "data/test_table/key=1", "parquet", "overwrite") + +# Create another DataFrame in a new partition directory, +# adding a new column and dropping an existing column +saveDF(df2, "data/test_table/key=2", "parquet", "overwrite") + +# Read the partitioned table +df3 <- loadDF(sqlContext, "data/test_table", "parquet") +printSchema(df3) + +# The final schema consists of all 3 columns in the Parquet files together +# with the partitioning column appearing in the partition directory paths. +# root +# |-- single: int (nullable = true) +# |-- double: int (nullable = true) +# |-- triple: int (nullable = true) +# |-- key : int (nullable = true) +{% endhighlight %} + +</div> + </div> ### Configuration @@ -1238,6 +1392,40 @@ anotherPeople = sqlContext.jsonRDD(anotherPeopleRDD) {% endhighlight %} </div> +<div data-lang="r" markdown="1"> +Spark SQL can automatically infer the schema of a JSON dataset and load it as a DataFrame. +This conversion can be done using the following method in a `SQLContext`: + +* `jsonFile` - loads data from a directory of JSON files where each line of the files is a JSON object. + +Note that the file that is offered as _jsonFile_ is not a typical JSON file. Each +line must contain a separate, self-contained valid JSON object. As a consequence, +a regular multi-line JSON file will most often fail. + +{% highlight r %} +# sc is an existing SparkContext. +sqlContext <- sparkRSQL.init(sc) + +# A JSON dataset is pointed to by path. +# The path can be either a single text file or a directory storing text files. +path <- "examples/src/main/resources/people.json" +# Create a DataFrame from the file(s) pointed to by path +people <- jsonFile(sqlContext, path) + +# The inferred schema can be visualized using the printSchema() method. +printSchema(people) +# root +# |-- age: integer (nullable = true) +# |-- name: string (nullable = true) + +# Register this DataFrame as a table. +registerTempTable(people, "people") + +# SQL statements can be run by using the sql method provided by `sqlContext`. +teenagers <- sql(sqlContext, "SELECT name FROM people WHERE age >= 13 AND age <= 19") {% endhighlight %} +</div> + <div data-lang="sql" markdown="1"> {% highlight sql %} @@ -1314,10 +1502,7 @@ Row[] results = sqlContext.sql("FROM src SELECT key, value").collect(); <div data-lang="python" markdown="1"> When working with Hive one must construct a `HiveContext`, which inherits from `SQLContext`, and -adds support for finding tables in the MetaStore and writing queries using HiveQL.
In addition to -the `sql` method a `HiveContext` also provides an `hql` methods, which allows queries to be -expressed in HiveQL. - +adds support for finding tables in the MetaStore and writing queries using HiveQL. {% highlight python %} # sc is an existing SparkContext. from pyspark.sql import HiveContext @@ -1332,6 +1517,24 @@ results = sqlContext.sql("FROM src SELECT key, value").collect() {% endhighlight %} </div> + +<div data-lang="r" markdown="1"> + +When working with Hive one must construct a `HiveContext`, which inherits from `SQLContext`, and +adds support for finding tables in the MetaStore and writing queries using HiveQL. +{% highlight r %} +# sc is an existing SparkContext. +sqlContext <- sparkRHive.init(sc) + +hql(sqlContext, "CREATE TABLE IF NOT EXISTS src (key INT, value STRING)") +hql(sqlContext, "LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src") + +# Queries can be expressed in HiveQL. +results <- collect(hql(sqlContext, "FROM src SELECT key, value")) + +{% endhighlight %} + +</div> </div> ## JDBC To Other Databases @@ -1430,6 +1633,16 @@ df = sqlContext.load(source="jdbc", url="jdbc:postgresql:dbserver", dbtable="sch </div> +<div data-lang="r" markdown="1"> + +{% highlight r %} + +df <- loadDF(sqlContext, source="jdbc", url="jdbc:postgresql:dbserver", dbtable="schema.tablename") + +{% endhighlight %} + +</div> + <div data-lang="sql" markdown="1"> {% highlight sql %} @@ -2354,5 +2567,151 @@ from pyspark.sql.types import * </div>
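(The R type-mapping table added below lists the type strings SparkR accepts. A minimal sketch of combining them with `structType`/`structField` to apply an explicit schema, assuming the SparkR 1.4 schema helpers; names and data are illustrative:)

```
# Illustrative only: build an explicit schema from the type strings in the
# table below, then apply it with createDataFrame.
schema <- structType(structField("name", "string"),
                     structField("age", "integer"))
rows <- list(list("Alice", 20L), list("Bob", 31L))
df <- createDataFrame(sqlContext, rows, schema)
printSchema(df)
# root
#  |-- name: string (nullable = true)
#  |-- age: integer (nullable = true)
```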
+ +<div data-lang="r" markdown="1"> + +<table class="table"> +<tr> + <th style="width:20%">Data type</th> + <th style="width:40%">Value type in R</th> + <th>API to access or create a data type</th></tr> +<tr> + <td> <b>ByteType</b> </td> + <td> + integer <br /> + <b>Note:</b> Numbers will be converted to 1-byte signed integer numbers at runtime. + Please make sure that numbers are within the range of -128 to 127. + </td> + <td> + "byte" + </td> +</tr> +<tr> + <td> <b>ShortType</b> </td> + <td> + integer <br /> + <b>Note:</b> Numbers will be converted to 2-byte signed integer numbers at runtime. + Please make sure that numbers are within the range of -32768 to 32767. + </td> + <td> + "short" + </td> +</tr> +<tr> + <td> <b>IntegerType</b> </td> + <td> integer </td> + <td> + "integer" + </td> +</tr> +<tr> + <td> <b>LongType</b> </td> + <td> + integer <br /> + <b>Note:</b> Numbers will be converted to 8-byte signed integer numbers at runtime. + Please make sure that numbers are within the range of + -9223372036854775808 to 9223372036854775807. + </td> + <td> + "long" + </td> +</tr> +<tr> + <td> <b>FloatType</b> </td> + <td> + numeric <br /> + <b>Note:</b> Numbers will be converted to 4-byte single-precision floating + point numbers at runtime. + </td> + <td> + "float" + </td> +</tr> +<tr> + <td> <b>DoubleType</b> </td> + <td> numeric </td> + <td> + "double" + </td> +</tr> +<tr> + <td> <b>DecimalType</b> </td> + <td> Not supported </td> + <td> + Not supported + </td> +</tr> +<tr> + <td> <b>StringType</b> </td> + <td> character </td> + <td> + "string" + </td> +</tr> +<tr> + <td> <b>BinaryType</b> </td> + <td> raw </td> + <td> + "binary" + </td> +</tr> +<tr> + <td> <b>BooleanType</b> </td> + <td> logical </td> + <td> + "bool" + </td> +</tr> +<tr> + <td> <b>TimestampType</b> </td> + <td> POSIXct </td> + <td> + "timestamp" + </td> +</tr> +<tr> + <td> <b>DateType</b> </td> + <td> Date </td> + <td> + "date" + </td> +</tr> +<tr> + <td> <b>ArrayType</b> </td> + <td> vector or list </td> + <td> + list(type="array", elementType=<i>elementType</i>, containsNull=[<i>containsNull</i>])<br /> + <b>Note:</b> The default value of <i>containsNull</i> is <i>True</i>. + </td> +</tr> +<tr> + <td> <b>MapType</b> </td> + <td> environment </td> + <td> + list(type="map", keyType=<i>keyType</i>, valueType=<i>valueType</i>, valueContainsNull=[<i>valueContainsNull</i>])<br /> + <b>Note:</b> The default value of <i>valueContainsNull</i> is <i>True</i>. + </td> +</tr> +<tr> + <td> <b>StructType</b> </td> + <td> named list</td> + <td> + list(type="struct", fields=<i>fields</i>)<br /> + <b>Note:</b> <i>fields</i> is a list of StructFields. Also, two fields with the same + name are not allowed. + </td> +</tr> +<tr> + <td> <b>StructField</b> </td> + <td> The value type in R of the data type of this field + (For example, integer for a StructField with the data type IntegerType) </td> + <td> + list(name=<i>name</i>, type=<i>dataType</i>, nullable=<i>nullable</i>) + </td> +</tr> +</table> + +</div> + </div> --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org