[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22954#discussion_r233292436 --- Diff: R/pkg/R/SQLContext.R --- @@ -189,19 +238,67 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0, x } } + data[] <- lapply(data, cleanCols) - # drop factors and wrap lists - data <- setNames(lapply(data, cleanCols), NULL) + args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE) + if (arrowEnabled) { +shouldUseArrow <- tryCatch({ + stopifnot(length(data) > 0) + dataHead <- head(data, 1) + # Currenty Arrow optimization does not support POSIXct and raw for now. + # Also, it does not support explicit float type set by users. It leads to + # incorrect conversion. We will fall back to the path without Arrow optimization. + if (any(sapply(dataHead, function(x) is(x, "POSIXct" { +stop("Arrow optimization with R DataFrame does not support POSIXct type yet.") + } + if (any(sapply(dataHead, is.raw))) { +stop("Arrow optimization with R DataFrame does not support raw type yet.") + } + if (inherits(schema, "structType")) { +if (any(sapply(schema$fields(), function(x) x$dataType.toString() == "FloatType"))) { + stop("Arrow optimization with R DataFrame does not support FloatType type yet.") --- End diff -- I think it's a bug because it always produces a corrupt value when I try to use `number` as explicit float types. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/22954#discussion_r233209385 --- Diff: R/pkg/R/SQLContext.R --- @@ -189,19 +238,67 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0, x } } + data[] <- lapply(data, cleanCols) - # drop factors and wrap lists - data <- setNames(lapply(data, cleanCols), NULL) + args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE) + if (arrowEnabled) { +shouldUseArrow <- tryCatch({ + stopifnot(length(data) > 0) + dataHead <- head(data, 1) + # Currenty Arrow optimization does not support POSIXct and raw for now. + # Also, it does not support explicit float type set by users. It leads to + # incorrect conversion. We will fall back to the path without Arrow optimization. + if (any(sapply(dataHead, function(x) is(x, "POSIXct" { +stop("Arrow optimization with R DataFrame does not support POSIXct type yet.") + } + if (any(sapply(dataHead, is.raw))) { +stop("Arrow optimization with R DataFrame does not support raw type yet.") + } + if (inherits(schema, "structType")) { +if (any(sapply(schema$fields(), function(x) x$dataType.toString() == "FloatType"))) { + stop("Arrow optimization with R DataFrame does not support FloatType type yet.") --- End diff -- Oh if it's only when casting to a float, then maybe not that big of an issue. I just wanted to make sure a bug was filed for Arrow if the problem is there. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22954#discussion_r232895848 --- Diff: R/pkg/R/SQLContext.R --- @@ -172,36 +257,72 @@ getDefaultSqlSource <- function() { createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0, numPartitions = NULL) { sparkSession <- getSparkSession() - + arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]] == "true" + shouldUseArrow <- FALSE + firstRow <- NULL if (is.data.frame(data)) { - # Convert data into a list of rows. Each row is a list. - - # get the names of columns, they will be put into RDD - if (is.null(schema)) { -schema <- names(data) - } +# get the names of columns, they will be put into RDD +if (is.null(schema)) { + schema <- names(data) +} - # get rid of factor type - cleanCols <- function(x) { -if (is.factor(x)) { - as.character(x) -} else { - x -} +# get rid of factor type +cleanCols <- function(x) { + if (is.factor(x)) { +as.character(x) + } else { +x } +} +data[] <- lapply(data, cleanCols) + +args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE) +if (arrowEnabled) { + shouldUseArrow <- tryCatch({ --- End diff -- Yup, correct. Let me address other comments as well. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/22954#discussion_r232619364 --- Diff: R/pkg/R/SQLContext.R --- @@ -172,36 +257,72 @@ getDefaultSqlSource <- function() { createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0, numPartitions = NULL) { sparkSession <- getSparkSession() - + arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]] == "true" + shouldUseArrow <- FALSE + firstRow <- NULL if (is.data.frame(data)) { - # Convert data into a list of rows. Each row is a list. - - # get the names of columns, they will be put into RDD - if (is.null(schema)) { -schema <- names(data) - } +# get the names of columns, they will be put into RDD +if (is.null(schema)) { + schema <- names(data) +} - # get rid of factor type - cleanCols <- function(x) { -if (is.factor(x)) { - as.character(x) -} else { - x -} +# get rid of factor type +cleanCols <- function(x) { + if (is.factor(x)) { +as.character(x) + } else { +x } +} +data[] <- lapply(data, cleanCols) + +args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE) +if (arrowEnabled) { + shouldUseArrow <- tryCatch({ --- End diff -- When `shouldUseArrow` is true, I think it means we are already done using Arrow? Maybe just `useArrow`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/22954#discussion_r232618853 --- Diff: R/pkg/R/SQLContext.R --- @@ -172,36 +257,72 @@ getDefaultSqlSource <- function() { createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0, numPartitions = NULL) { sparkSession <- getSparkSession() - + arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]] == "true" + shouldUseArrow <- FALSE + firstRow <- NULL if (is.data.frame(data)) { - # Convert data into a list of rows. Each row is a list. - - # get the names of columns, they will be put into RDD - if (is.null(schema)) { -schema <- names(data) - } +# get the names of columns, they will be put into RDD +if (is.null(schema)) { + schema <- names(data) +} - # get rid of factor type - cleanCols <- function(x) { -if (is.factor(x)) { - as.character(x) -} else { - x -} +# get rid of factor type +cleanCols <- function(x) { + if (is.factor(x)) { +as.character(x) + } else { +x } +} +data[] <- lapply(data, cleanCols) + +args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE) +if (arrowEnabled) { + shouldUseArrow <- tryCatch({ +stopifnot(length(data) > 0) +dataHead <- head(data, 1) +checkTypeRequirementForArrow(data, schema) +fileName <- writeToTempFileInArrow(data, numPartitions) --- End diff -- Should we move `writeToTempFileInArrow` call into next `tryCatch`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/22954#discussion_r232620582 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala --- @@ -225,4 +226,25 @@ private[sql] object SQLUtils extends Logging { } sparkSession.sessionState.catalog.listTables(db).map(_.table).toArray } + + /** + * R callable function to read a file in Arrow stream format and create an `RDD` + * using each serialized ArrowRecordBatch as a partition. + */ + def readArrowStreamFromFile( + sparkSession: SparkSession, + filename: String): JavaRDD[Array[Byte]] = { +ArrowConverters.readArrowStreamFromFile(sparkSession.sqlContext, filename) + } + + /** + * R callable function to read a file in Arrow stream format and create a `DataFrame` --- End diff -- Is this going to read a file in Arrow stream format? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22954#discussion_r232525184 --- Diff: R/pkg/tests/fulltests/test_sparkSQL.R --- @@ -307,6 +307,64 @@ test_that("create DataFrame from RDD", { unsetHiveContext() }) +test_that("createDataFrame Arrow optimization", { + skip_if_not_installed("arrow") + skip_if_not_installed("withr") --- End diff -- Maybe we should hold it for now .. because I realised R API for Arrow requires R 3.5.x and Jenkins's one is 3.1.x if I remember this correctly. Ideally, we could probably do that via AppVeyor if everything goes fine. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22954#discussion_r232525068 --- Diff: R/pkg/tests/fulltests/test_sparkSQL.R --- @@ -307,6 +307,64 @@ test_that("create DataFrame from RDD", { unsetHiveContext() }) +test_that("createDataFrame Arrow optimization", { + skip_if_not_installed("arrow") + skip_if_not_installed("withr") + + conf <- callJMethod(sparkSession, "conf") + arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]] + + callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "false") + tryCatch({ --- End diff -- Just to inject the finally .. :-) .. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/22954#discussion_r232500065 --- Diff: R/pkg/R/SQLContext.R --- @@ -172,36 +257,72 @@ getDefaultSqlSource <- function() { createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0, numPartitions = NULL) { sparkSession <- getSparkSession() - + arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]] == "true" + shouldUseArrow <- FALSE + firstRow <- NULL if (is.data.frame(data)) { - # Convert data into a list of rows. Each row is a list. - - # get the names of columns, they will be put into RDD - if (is.null(schema)) { -schema <- names(data) - } +# get the names of columns, they will be put into RDD +if (is.null(schema)) { + schema <- names(data) +} - # get rid of factor type - cleanCols <- function(x) { -if (is.factor(x)) { - as.character(x) -} else { - x -} +# get rid of factor type +cleanCols <- function(x) { + if (is.factor(x)) { +as.character(x) + } else { +x } +} +data[] <- lapply(data, cleanCols) + +args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE) +if (arrowEnabled) { + shouldUseArrow <- tryCatch({ +stopifnot(length(data) > 0) +dataHead <- head(data, 1) +checkTypeRequirementForArrow(data, schema) +fileName <- writeToTempFileInArrow(data, numPartitions) +tryCatch( + jrddInArrow <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", + "readArrowStreamFromFile", + sparkSession, + fileName), +finally = { + file.remove(fileName) +}) + +firstRow <- do.call(mapply, append(args, dataHead))[[1]] +TRUE + }, + error = function(e) { +warning(paste0("createDataFrame attempted Arrow optimization because ", + "'spark.sql.execution.arrow.enabled' is set to true; however, ", + "failed, attempting non-optimization. Reason: ", + e)) +return(FALSE) --- End diff -- nit: just `FALSE` is good --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/22954#discussion_r232499902 --- Diff: R/pkg/R/SQLContext.R --- @@ -147,6 +147,91 @@ getDefaultSqlSource <- function() { l[["spark.sql.sources.default"]] } +writeToTempFileInArrow <- function(rdf, numPartitions) { + requireNamespace1 <- requireNamespace + + # For some reasons, Arrow R API requires to load 'defer_parent' which is from 'withr' package. + # This is a workaround to avoid this error. Otherwise, we should directly load 'withr' + # package, which CRAN complains about. + defer_parent <- function(x, ...) { + if (requireNamespace1("withr", quietly = TRUE)) { + defer_parent <- get("defer_parent", envir = asNamespace("withr"), inherits = FALSE) + defer_parent(x, ...) +} else { + stop("'withr' package should be installed.") +} + } + + # R API in Arrow is not yet released. CRAN requires to add the package in requireNamespace + # at DESCRIPTION. Later, CRAN checks if the package is available or not. Therefore, it works + # around by avoiding direct requireNamespace. + if (requireNamespace1("arrow", quietly = TRUE)) { +record_batch <- get("record_batch", envir = asNamespace("arrow"), inherits = FALSE) +record_batch_stream_writer <- get( + "record_batch_stream_writer", envir = asNamespace("arrow"), inherits = FALSE) +file_output_stream <- get( + "file_output_stream", envir = asNamespace("arrow"), inherits = FALSE) +write_record_batch <- get( + "write_record_batch", envir = asNamespace("arrow"), inherits = FALSE) + +numPartitions <- if (!is.null(numPartitions)) { + numToInt(numPartitions) +} else { + 1 --- End diff -- future: consolidate the default here and inside makeSplits --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/22954#discussion_r232499848 --- Diff: R/pkg/tests/fulltests/test_sparkSQL.R --- @@ -307,6 +307,64 @@ test_that("create DataFrame from RDD", { unsetHiveContext() }) +test_that("createDataFrame Arrow optimization", { + skip_if_not_installed("arrow") + skip_if_not_installed("withr") --- End diff -- are we going to ask shane to install arrow/withr on the Jenkins machines? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/22954#discussion_r232499794 --- Diff: R/pkg/tests/fulltests/test_sparkSQL.R --- @@ -307,6 +307,64 @@ test_that("create DataFrame from RDD", { unsetHiveContext() }) +test_that("createDataFrame Arrow optimization", { + skip_if_not_installed("arrow") + skip_if_not_installed("withr") + + conf <- callJMethod(sparkSession, "conf") + arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]] + + callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "false") + tryCatch({ --- End diff -- does this fail? or just a way to inject a finally? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22954#discussion_r232478997 --- Diff: R/pkg/R/SQLContext.R --- @@ -147,6 +147,55 @@ getDefaultSqlSource <- function() { l[["spark.sql.sources.default"]] } +writeToTempFileInArrow <- function(rdf, numPartitions) { + # R API in Arrow is not yet released. CRAN requires to add the package in requireNamespace + # at DESCRIPTION. Later, CRAN checks if the package is available or not. Therefore, it works + # around by avoiding direct requireNamespace. + requireNamespace1 <- requireNamespace + if (requireNamespace1("arrow", quietly = TRUE)) { +record_batch <- get("record_batch", envir = asNamespace("arrow"), inherits = FALSE) +record_batch_stream_writer <- get( + "record_batch_stream_writer", envir = asNamespace("arrow"), inherits = FALSE) +file_output_stream <- get( + "file_output_stream", envir = asNamespace("arrow"), inherits = FALSE) +write_record_batch <- get( + "write_record_batch", envir = asNamespace("arrow"), inherits = FALSE) + +# Currently arrow requires withr; otherwise, write APIs don't work. +# Direct 'require' is not recommended by CRAN. Here's a workaround. +require1 <- require +if (require1("withr", quietly = TRUE)) { + numPartitions <- if (!is.null(numPartitions)) { +numToInt(numPartitions) + } else { +1 + } + fileName <- tempfile(pattern = "spark-arrow", fileext = ".tmp") + chunk <- as.integer(ceiling(nrow(rdf) / numPartitions)) + rdf_slices <- split(rdf, rep(1:ceiling(nrow(rdf) / chunk), each = chunk)[1:nrow(rdf)]) --- End diff -- Not sure. I think the intention is the same. Let me stick to R's one for now. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/22954#discussion_r232477365 --- Diff: R/pkg/R/SQLContext.R --- @@ -147,6 +147,55 @@ getDefaultSqlSource <- function() { l[["spark.sql.sources.default"]] } +writeToTempFileInArrow <- function(rdf, numPartitions) { + # R API in Arrow is not yet released. CRAN requires to add the package in requireNamespace + # at DESCRIPTION. Later, CRAN checks if the package is available or not. Therefore, it works + # around by avoiding direct requireNamespace. + requireNamespace1 <- requireNamespace + if (requireNamespace1("arrow", quietly = TRUE)) { +record_batch <- get("record_batch", envir = asNamespace("arrow"), inherits = FALSE) +record_batch_stream_writer <- get( + "record_batch_stream_writer", envir = asNamespace("arrow"), inherits = FALSE) +file_output_stream <- get( + "file_output_stream", envir = asNamespace("arrow"), inherits = FALSE) +write_record_batch <- get( + "write_record_batch", envir = asNamespace("arrow"), inherits = FALSE) + +# Currently arrow requires withr; otherwise, write APIs don't work. +# Direct 'require' is not recommended by CRAN. Here's a workaround. +require1 <- require +if (require1("withr", quietly = TRUE)) { --- End diff -- that will be good. circumventing the CRAN check for method names is... problematic.. (there are other, more hacky ways too, but ..) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/22954#discussion_r232477325 --- Diff: R/pkg/R/SQLContext.R --- @@ -189,19 +238,67 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0, x } } + data[] <- lapply(data, cleanCols) - # drop factors and wrap lists - data <- setNames(lapply(data, cleanCols), NULL) + args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE) + if (arrowEnabled) { +shouldUseArrow <- tryCatch({ + stopifnot(length(data) > 0) + dataHead <- head(data, 1) + # Currenty Arrow optimization does not support POSIXct and raw for now. + # Also, it does not support explicit float type set by users. It leads to + # incorrect conversion. We will fall back to the path without Arrow optimization. + if (any(sapply(dataHead, function(x) is(x, "POSIXct" { +stop("Arrow optimization with R DataFrame does not support POSIXct type yet.") + } + if (any(sapply(dataHead, is.raw))) { +stop("Arrow optimization with R DataFrame does not support raw type yet.") + } + if (inherits(schema, "structType")) { +if (any(sapply(schema$fields(), function(x) x$dataType.toString() == "FloatType"))) { + stop("Arrow optimization with R DataFrame does not support FloatType type yet.") --- End diff -- maybe out of bit range? 53 bit https://stat.ethz.ch/R-manual/R-patched/library/base/html/double.html --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/22954#discussion_r232477271 --- Diff: R/pkg/R/SQLContext.R --- @@ -189,19 +238,67 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0, x } } + data[] <- lapply(data, cleanCols) - # drop factors and wrap lists - data <- setNames(lapply(data, cleanCols), NULL) + args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE) + if (arrowEnabled) { +shouldUseArrow <- tryCatch({ + stopifnot(length(data) > 0) + dataHead <- head(data, 1) + # Currenty Arrow optimization does not support POSIXct and raw for now. + # Also, it does not support explicit float type set by users. It leads to + # incorrect conversion. We will fall back to the path without Arrow optimization. + if (any(sapply(dataHead, function(x) is(x, "POSIXct" { +stop("Arrow optimization with R DataFrame does not support POSIXct type yet.") + } + if (any(sapply(dataHead, is.raw))) { +stop("Arrow optimization with R DataFrame does not support raw type yet.") + } + if (inherits(schema, "structType")) { +if (any(sapply(schema$fields(), function(x) x$dataType.toString() == "FloatType"))) { + stop("Arrow optimization with R DataFrame does not support FloatType type yet.") +} + } + firstRow <- do.call(mapply, append(args, dataHead))[[1]] + fileName <- writeToTempFileInArrow(data, numPartitions) + tryCatch( +jrddInArrow <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", + "readArrowStreamFromFile", + sparkSession, + fileName), + finally = { +file.remove(fileName) --- End diff -- yes, just more consistent. I also don't know for sure why all other instances are calling unlink --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/22954#discussion_r232477257 --- Diff: R/pkg/R/SQLContext.R --- @@ -189,19 +238,67 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0, x } } + data[] <- lapply(data, cleanCols) - # drop factors and wrap lists - data <- setNames(lapply(data, cleanCols), NULL) + args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE) + if (arrowEnabled) { +shouldUseArrow <- tryCatch({ + stopifnot(length(data) > 0) + dataHead <- head(data, 1) + # Currenty Arrow optimization does not support POSIXct and raw for now. + # Also, it does not support explicit float type set by users. It leads to + # incorrect conversion. We will fall back to the path without Arrow optimization. + if (any(sapply(dataHead, function(x) is(x, "POSIXct" { --- End diff -- LG, I tested a few cases too --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/22954#discussion_r232477171 --- Diff: R/pkg/R/SQLContext.R --- @@ -172,10 +221,10 @@ getDefaultSqlSource <- function() { createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0, numPartitions = NULL) { sparkSession <- getSparkSession() - + arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]] == "true" --- End diff -- ok --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/22954#discussion_r232477155 --- Diff: R/pkg/R/SQLContext.R --- @@ -147,6 +147,55 @@ getDefaultSqlSource <- function() { l[["spark.sql.sources.default"]] } +writeToTempFileInArrow <- function(rdf, numPartitions) { + # R API in Arrow is not yet released. CRAN requires to add the package in requireNamespace + # at DESCRIPTION. Later, CRAN checks if the package is available or not. Therefore, it works + # around by avoiding direct requireNamespace. + requireNamespace1 <- requireNamespace + if (requireNamespace1("arrow", quietly = TRUE)) { +record_batch <- get("record_batch", envir = asNamespace("arrow"), inherits = FALSE) +record_batch_stream_writer <- get( + "record_batch_stream_writer", envir = asNamespace("arrow"), inherits = FALSE) +file_output_stream <- get( + "file_output_stream", envir = asNamespace("arrow"), inherits = FALSE) +write_record_batch <- get( + "write_record_batch", envir = asNamespace("arrow"), inherits = FALSE) + +# Currently arrow requires withr; otherwise, write APIs don't work. +# Direct 'require' is not recommended by CRAN. Here's a workaround. +require1 <- require +if (require1("withr", quietly = TRUE)) { + numPartitions <- if (!is.null(numPartitions)) { +numToInt(numPartitions) + } else { +1 + } + fileName <- tempfile(pattern = "spark-arrow", fileext = ".tmp") + chunk <- as.integer(ceiling(nrow(rdf) / numPartitions)) + rdf_slices <- split(rdf, rep(1:ceiling(nrow(rdf) / chunk), each = chunk)[1:nrow(rdf)]) --- End diff -- ah, any idea why it was done that way in Python? This seems to be different from sc.parallelize, which is what https://github.com/apache/spark/blob/c3b4a94a91d66c172cf332321d3a78dba29ef8f0/R/pkg/R/context.R#L152 does --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/22954#discussion_r232477131 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala --- @@ -225,4 +226,25 @@ private[sql] object SQLUtils extends Logging { } sparkSession.sessionState.catalog.listTables(db).map(_.table).toArray } + + /** + * R callable function to read a file in Arrow stream format and create a `RDD` + * using each serialized ArrowRecordBatch as a partition. + */ + def readArrowStreamFromFile( + sparkSession: SparkSession, + filename: String): JavaRDD[Array[Byte]] = { +ArrowConverters.readArrowStreamFromFile(sparkSession.sqlContext, filename) --- End diff -- hmm, I see your point... but there could be hundreds of these wrappers we would need to add if we set this as a practice, I'm guessing. A few problems I see with these wrappers: 1. they are extra work to add or maintain; 2. many are very simple, without much added value; 3. many get abandoned over the years - they are not called and not removed. But I kinda see your way, let's keep this one and review any new ones in the future. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22954#discussion_r232476906 --- Diff: R/pkg/R/SQLContext.R --- @@ -147,6 +147,55 @@ getDefaultSqlSource <- function() { l[["spark.sql.sources.default"]] } +writeToTempFileInArrow <- function(rdf, numPartitions) { + # R API in Arrow is not yet released. CRAN requires to add the package in requireNamespace + # at DESCRIPTION. Later, CRAN checks if the package is available or not. Therefore, it works + # around by avoiding direct requireNamespace. + requireNamespace1 <- requireNamespace + if (requireNamespace1("arrow", quietly = TRUE)) { +record_batch <- get("record_batch", envir = asNamespace("arrow"), inherits = FALSE) +record_batch_stream_writer <- get( + "record_batch_stream_writer", envir = asNamespace("arrow"), inherits = FALSE) +file_output_stream <- get( + "file_output_stream", envir = asNamespace("arrow"), inherits = FALSE) +write_record_batch <- get( + "write_record_batch", envir = asNamespace("arrow"), inherits = FALSE) + +# Currently arrow requires withr; otherwise, write APIs don't work. +# Direct 'require' is not recommended by CRAN. Here's a workaround. +require1 <- require +if (require1("withr", quietly = TRUE)) { + numPartitions <- if (!is.null(numPartitions)) { +numToInt(numPartitions) + } else { +1 + } + fileName <- tempfile(pattern = "spark-arrow", fileext = ".tmp") + chunk <- as.integer(ceiling(nrow(rdf) / numPartitions)) + rdf_slices <- split(rdf, rep(1:ceiling(nrow(rdf) / chunk), each = chunk)[1:nrow(rdf)]) --- End diff -- Let me try to reuse the R side slicing logic. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22954#discussion_r232475881 --- Diff: R/pkg/R/SQLContext.R --- @@ -189,19 +238,67 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0, x } } + data[] <- lapply(data, cleanCols) - # drop factors and wrap lists - data <- setNames(lapply(data, cleanCols), NULL) + args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE) + if (arrowEnabled) { +shouldUseArrow <- tryCatch({ + stopifnot(length(data) > 0) + dataHead <- head(data, 1) + # Currenty Arrow optimization does not support POSIXct and raw for now. + # Also, it does not support explicit float type set by users. It leads to + # incorrect conversion. We will fall back to the path without Arrow optimization. + if (any(sapply(dataHead, function(x) is(x, "POSIXct" { --- End diff -- Looks okay in this case specifically: ```r > any(sapply(head(data.frame(list(list(a=NA))), 1), is.raw)) [1] FALSE > any(sapply(head(data.frame(list(list(a=NA))), 1), function(x) is(x, "POSIXct"))) [1] FALSE > any(sapply(head(data.frame(list(list(a=1))), 1), is.raw)) [1] FALSE > any(sapply(head(data.frame(list(list(a="a"))), 1), function(x) is(x, "POSIXct"))) [1] FALSE > any(sapply(head(data.frame(list(list(a=raw(1, 1), is.raw)) [1] TRUE > any(sapply(head(data.frame(list(list(a=as.POSIXct("2000-01-01", 1), function(x) is(x, "POSIXct"))) [1] TRUE ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22954#discussion_r232475777 --- Diff: R/pkg/R/SQLContext.R --- @@ -189,19 +238,67 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0, x } } + data[] <- lapply(data, cleanCols) - # drop factors and wrap lists - data <- setNames(lapply(data, cleanCols), NULL) + args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE) + if (arrowEnabled) { +shouldUseArrow <- tryCatch({ + stopifnot(length(data) > 0) + dataHead <- head(data, 1) + # Currenty Arrow optimization does not support POSIXct and raw for now. + # Also, it does not support explicit float type set by users. It leads to + # incorrect conversion. We will fall back to the path without Arrow optimization. + if (any(sapply(dataHead, function(x) is(x, "POSIXct" { +stop("Arrow optimization with R DataFrame does not support POSIXct type yet.") + } + if (any(sapply(dataHead, is.raw))) { +stop("Arrow optimization with R DataFrame does not support raw type yet.") + } + if (inherits(schema, "structType")) { +if (any(sapply(schema$fields(), function(x) x$dataType.toString() == "FloatType"))) { + stop("Arrow optimization with R DataFrame does not support FloatType type yet.") +} + } + firstRow <- do.call(mapply, append(args, dataHead))[[1]] + fileName <- writeToTempFileInArrow(data, numPartitions) + tryCatch( +jrddInArrow <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", + "readArrowStreamFromFile", + sparkSession, + fileName), + finally = { +file.remove(fileName) --- End diff -- I believe either way is fine. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22954#discussion_r232475752 --- Diff: R/pkg/R/SQLContext.R --- @@ -147,6 +147,55 @@ getDefaultSqlSource <- function() { l[["spark.sql.sources.default"]] } +writeToTempFileInArrow <- function(rdf, numPartitions) { + # R API in Arrow is not yet released. CRAN requires to add the package in requireNamespace + # at DESCRIPTION. Later, CRAN checks if the package is available or not. Therefore, it works + # around by avoiding direct requireNamespace. + requireNamespace1 <- requireNamespace + if (requireNamespace1("arrow", quietly = TRUE)) { +record_batch <- get("record_batch", envir = asNamespace("arrow"), inherits = FALSE) +record_batch_stream_writer <- get( + "record_batch_stream_writer", envir = asNamespace("arrow"), inherits = FALSE) +file_output_stream <- get( + "file_output_stream", envir = asNamespace("arrow"), inherits = FALSE) +write_record_batch <- get( + "write_record_batch", envir = asNamespace("arrow"), inherits = FALSE) + +# Currently arrow requires withr; otherwise, write APIs don't work. +# Direct 'require' is not recommended by CRAN. Here's a workaround. +require1 <- require +if (require1("withr", quietly = TRUE)) { --- End diff -- Yea, I at least managed to get rid of this hack itself. Will push soon. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22954#discussion_r232473761 --- Diff: R/pkg/R/SQLContext.R --- @@ -189,19 +238,67 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0, x } } + data[] <- lapply(data, cleanCols) - # drop factors and wrap lists - data <- setNames(lapply(data, cleanCols), NULL) + args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE) + if (arrowEnabled) { +shouldUseArrow <- tryCatch({ + stopifnot(length(data) > 0) + dataHead <- head(data, 1) + # Currenty Arrow optimization does not support POSIXct and raw for now. + # Also, it does not support explicit float type set by users. It leads to + # incorrect conversion. We will fall back to the path without Arrow optimization. + if (any(sapply(dataHead, function(x) is(x, "POSIXct" { +stop("Arrow optimization with R DataFrame does not support POSIXct type yet.") + } + if (any(sapply(dataHead, is.raw))) { +stop("Arrow optimization with R DataFrame does not support raw type yet.") + } + if (inherits(schema, "structType")) { +if (any(sapply(schema$fields(), function(x) x$dataType.toString() == "FloatType"))) { + stop("Arrow optimization with R DataFrame does not support FloatType type yet.") --- End diff -- I suspect that it happens when `numeric` (which is like `1.0`) is cast into float type. I think it's related to casting behaviour. Let me take a look and file a JIRA on the Arrow side, but if you don't mind I will focus on matching exact type cases for now ... --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22954#discussion_r232473723 --- Diff: R/pkg/R/SQLContext.R --- @@ -189,19 +238,67 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0, x } } + data[] <- lapply(data, cleanCols) - # drop factors and wrap lists - data <- setNames(lapply(data, cleanCols), NULL) + args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE) + if (arrowEnabled) { +shouldUseArrow <- tryCatch({ --- End diff -- Yup, let me try. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22954#discussion_r232473716 --- Diff: R/pkg/R/SQLContext.R --- @@ -172,10 +221,10 @@ getDefaultSqlSource <- function() { createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0, numPartitions = NULL) { sparkSession <- getSparkSession() - + arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]] == "true" --- End diff -- Yea, I checked that it always has the default value. I initially left the default value but took it out after double checking. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22954#discussion_r232473705 --- Diff: R/pkg/R/SQLContext.R --- @@ -147,6 +147,55 @@ getDefaultSqlSource <- function() { l[["spark.sql.sources.default"]] } +writeToTempFileInArrow <- function(rdf, numPartitions) { + # R API in Arrow is not yet released. CRAN requires to add the package in requireNamespace + # at DESCRIPTION. Later, CRAN checks if the package is available or not. Therefore, it works + # around by avoiding direct requireNamespace. + requireNamespace1 <- requireNamespace + if (requireNamespace1("arrow", quietly = TRUE)) { +record_batch <- get("record_batch", envir = asNamespace("arrow"), inherits = FALSE) +record_batch_stream_writer <- get( + "record_batch_stream_writer", envir = asNamespace("arrow"), inherits = FALSE) +file_output_stream <- get( + "file_output_stream", envir = asNamespace("arrow"), inherits = FALSE) +write_record_batch <- get( + "write_record_batch", envir = asNamespace("arrow"), inherits = FALSE) + +# Currently arrow requires withr; otherwise, write APIs don't work. +# Direct 'require' is not recommended by CRAN. Here's a workaround. +require1 <- require +if (require1("withr", quietly = TRUE)) { + numPartitions <- if (!is.null(numPartitions)) { +numToInt(numPartitions) + } else { +1 + } + fileName <- tempfile(pattern = "spark-arrow", fileext = ".tmp") + chunk <- as.integer(ceiling(nrow(rdf) / numPartitions)) + rdf_slices <- split(rdf, rep(1:ceiling(nrow(rdf) / chunk), each = chunk)[1:nrow(rdf)]) + stream_writer <- NULL + for (rdf_slice in rdf_slices) { +batch <- record_batch(rdf_slice) +if (is.null(stream_writer)) { + # We should avoid private calls like 'close_on_exit' (CRAN disallows) but looks + # there's no exposed API for it. Here's a workaround but ideally this should + # be removed. + close_on_exit <- get("close_on_exit", envir = asNamespace("arrow"), inherits = FALSE) --- End diff -- Hm, possibly yea. Let me try it. 
In this way, we could get rid of `require`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22954#discussion_r232473697 --- Diff: R/pkg/R/SQLContext.R --- @@ -147,6 +147,55 @@ getDefaultSqlSource <- function() { l[["spark.sql.sources.default"]] } +writeToTempFileInArrow <- function(rdf, numPartitions) { + # R API in Arrow is not yet released. CRAN requires to add the package in requireNamespace + # at DESCRIPTION. Later, CRAN checks if the package is available or not. Therefore, it works + # around by avoiding direct requireNamespace. + requireNamespace1 <- requireNamespace + if (requireNamespace1("arrow", quietly = TRUE)) { +record_batch <- get("record_batch", envir = asNamespace("arrow"), inherits = FALSE) +record_batch_stream_writer <- get( + "record_batch_stream_writer", envir = asNamespace("arrow"), inherits = FALSE) +file_output_stream <- get( + "file_output_stream", envir = asNamespace("arrow"), inherits = FALSE) +write_record_batch <- get( + "write_record_batch", envir = asNamespace("arrow"), inherits = FALSE) + +# Currently arrow requires withr; otherwise, write APIs don't work. +# Direct 'require' is not recommended by CRAN. Here's a workaround. +require1 <- require +if (require1("withr", quietly = TRUE)) { + numPartitions <- if (!is.null(numPartitions)) { +numToInt(numPartitions) + } else { +1 + } + fileName <- tempfile(pattern = "spark-arrow", fileext = ".tmp") + chunk <- as.integer(ceiling(nrow(rdf) / numPartitions)) + rdf_slices <- split(rdf, rep(1:ceiling(nrow(rdf) / chunk), each = chunk)[1:nrow(rdf)]) --- End diff -- This resembles PySpark side logic: https://github.com/apache/spark/blob/d367bdcf521f564d2d7066257200be26b27ea926/python/pyspark/sql/session.py#L554-L556 Let me check the difference between them --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22954#discussion_r232473669 --- Diff: R/pkg/R/SQLContext.R --- @@ -147,6 +147,55 @@ getDefaultSqlSource <- function() { l[["spark.sql.sources.default"]] } +writeToTempFileInArrow <- function(rdf, numPartitions) { + # R API in Arrow is not yet released. CRAN requires to add the package in requireNamespace + # at DESCRIPTION. Later, CRAN checks if the package is available or not. Therefore, it works + # around by avoiding direct requireNamespace. + requireNamespace1 <- requireNamespace + if (requireNamespace1("arrow", quietly = TRUE)) { +record_batch <- get("record_batch", envir = asNamespace("arrow"), inherits = FALSE) +record_batch_stream_writer <- get( + "record_batch_stream_writer", envir = asNamespace("arrow"), inherits = FALSE) +file_output_stream <- get( + "file_output_stream", envir = asNamespace("arrow"), inherits = FALSE) +write_record_batch <- get( + "write_record_batch", envir = asNamespace("arrow"), inherits = FALSE) + +# Currently arrow requires withr; otherwise, write APIs don't work. +# Direct 'require' is not recommended by CRAN. Here's a workaround. +require1 <- require +if (require1("withr", quietly = TRUE)) { + numPartitions <- if (!is.null(numPartitions)) { +numToInt(numPartitions) + } else { +1 --- End diff -- We should; however, it follows the original code path's behaviour. I matched it as the same so that we can compare the performances in the same conditions. If you don't mind, I will fix both in a separate PR. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22954#discussion_r232473643 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala --- @@ -225,4 +226,25 @@ private[sql] object SQLUtils extends Logging { } sparkSession.sessionState.catalog.listTables(db).map(_.table).toArray } + + /** + * R callable function to read a file in Arrow stream format and create a `RDD` + * using each serialized ArrowRecordBatch as a partition. + */ + def readArrowStreamFromFile( + sparkSession: SparkSession, + filename: String): JavaRDD[Array[Byte]] = { +ArrowConverters.readArrowStreamFromFile(sparkSession.sqlContext, filename) --- End diff -- Hmhmhm .. yea. What I was trying to do is to put the SQL-related code that R calls in the JVM here when it is not an official API, in order to avoid the case where we change the internal APIs within Scala and it causes R test failures. I was trying to do a similar thing on the PySpark side. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22954#discussion_r232473611 --- Diff: R/pkg/R/SQLContext.R --- @@ -147,6 +147,55 @@ getDefaultSqlSource <- function() { l[["spark.sql.sources.default"]] } +writeToTempFileInArrow <- function(rdf, numPartitions) { + # R API in Arrow is not yet released. CRAN requires to add the package in requireNamespace + # at DESCRIPTION. Later, CRAN checks if the package is available or not. Therefore, it works + # around by avoiding direct requireNamespace. + requireNamespace1 <- requireNamespace + if (requireNamespace1("arrow", quietly = TRUE)) { +record_batch <- get("record_batch", envir = asNamespace("arrow"), inherits = FALSE) +record_batch_stream_writer <- get( + "record_batch_stream_writer", envir = asNamespace("arrow"), inherits = FALSE) +file_output_stream <- get( + "file_output_stream", envir = asNamespace("arrow"), inherits = FALSE) +write_record_batch <- get( + "write_record_batch", envir = asNamespace("arrow"), inherits = FALSE) + +# Currently arrow requires withr; otherwise, write APIs don't work. +# Direct 'require' is not recommended by CRAN. Here's a workaround. +require1 <- require +if (require1("withr", quietly = TRUE)) { --- End diff -- Yup .. it is .. looks we shouldn't have this error from a cursory look in R API of Arrow. Maybe this can be gone when I use official R Arrow release version. Let me check it later. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/22954#discussion_r232453690 --- Diff: R/pkg/R/SQLContext.R --- @@ -147,6 +147,55 @@ getDefaultSqlSource <- function() { l[["spark.sql.sources.default"]] } +writeToTempFileInArrow <- function(rdf, numPartitions) { + # R API in Arrow is not yet released. CRAN requires to add the package in requireNamespace + # at DESCRIPTION. Later, CRAN checks if the package is available or not. Therefore, it works + # around by avoiding direct requireNamespace. + requireNamespace1 <- requireNamespace + if (requireNamespace1("arrow", quietly = TRUE)) { +record_batch <- get("record_batch", envir = asNamespace("arrow"), inherits = FALSE) +record_batch_stream_writer <- get( + "record_batch_stream_writer", envir = asNamespace("arrow"), inherits = FALSE) +file_output_stream <- get( + "file_output_stream", envir = asNamespace("arrow"), inherits = FALSE) +write_record_batch <- get( + "write_record_batch", envir = asNamespace("arrow"), inherits = FALSE) + +# Currently arrow requires withr; otherwise, write APIs don't work. --- End diff -- nit: `arrow` -> `Arrow` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/22954#discussion_r232425279 --- Diff: R/pkg/R/SQLContext.R --- @@ -189,19 +238,67 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0, x } } + data[] <- lapply(data, cleanCols) - # drop factors and wrap lists - data <- setNames(lapply(data, cleanCols), NULL) + args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE) + if (arrowEnabled) { +shouldUseArrow <- tryCatch({ + stopifnot(length(data) > 0) + dataHead <- head(data, 1) + # Currenty Arrow optimization does not support POSIXct and raw for now. + # Also, it does not support explicit float type set by users. It leads to + # incorrect conversion. We will fall back to the path without Arrow optimization. + if (any(sapply(dataHead, function(x) is(x, "POSIXct" { +stop("Arrow optimization with R DataFrame does not support POSIXct type yet.") + } + if (any(sapply(dataHead, is.raw))) { +stop("Arrow optimization with R DataFrame does not support raw type yet.") + } + if (inherits(schema, "structType")) { +if (any(sapply(schema$fields(), function(x) x$dataType.toString() == "FloatType"))) { + stop("Arrow optimization with R DataFrame does not support FloatType type yet.") --- End diff -- Any idea what's going on with the `FloatType`? Is it a problem on the arrow side? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/22954#discussion_r232425031 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala --- @@ -225,4 +226,25 @@ private[sql] object SQLUtils extends Logging { } sparkSession.sessionState.catalog.listTables(db).map(_.table).toArray } + + /** + * R callable function to read a file in Arrow stream format and create a `RDD` --- End diff -- nit: a `RDD` -> an `RDD` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22954#discussion_r232202773 --- Diff: R/pkg/R/SQLContext.R --- @@ -189,19 +238,67 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0, x } } + data[] <- lapply(data, cleanCols) - # drop factors and wrap lists - data <- setNames(lapply(data, cleanCols), NULL) + args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE) + if (arrowEnabled) { +shouldUseArrow <- tryCatch({ + stopifnot(length(data) > 0) + dataHead <- head(data, 1) + # Currenty Arrow optimization does not support POSIXct and raw for now. + # Also, it does not support explicit float type set by users. It leads to + # incorrect conversion. We will fall back to the path without Arrow optimization. + if (any(sapply(dataHead, function(x) is(x, "POSIXct" { +stop("Arrow optimization with R DataFrame does not support POSIXct type yet.") + } + if (any(sapply(dataHead, is.raw))) { +stop("Arrow optimization with R DataFrame does not support raw type yet.") + } + if (inherits(schema, "structType")) { +if (any(sapply(schema$fields(), function(x) x$dataType.toString() == "FloatType"))) { + stop("Arrow optimization with R DataFrame does not support FloatType type yet.") +} + } + firstRow <- do.call(mapply, append(args, dataHead))[[1]] + fileName <- writeToTempFileInArrow(data, numPartitions) + tryCatch( +jrddInArrow <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", + "readArrowStreamFromFile", + sparkSession, + fileName), + finally = { +file.remove(fileName) + }) + TRUE +}, +error = function(e) { + message(paste0("WARN: createDataFrame attempted Arrow optimization because ", --- End diff -- ooops --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/22954#discussion_r232170936 --- Diff: R/pkg/R/SQLContext.R --- @@ -147,6 +147,55 @@ getDefaultSqlSource <- function() { l[["spark.sql.sources.default"]] } +writeToTempFileInArrow <- function(rdf, numPartitions) { + # R API in Arrow is not yet released. CRAN requires to add the package in requireNamespace + # at DESCRIPTION. Later, CRAN checks if the package is available or not. Therefore, it works + # around by avoiding direct requireNamespace. + requireNamespace1 <- requireNamespace + if (requireNamespace1("arrow", quietly = TRUE)) { +record_batch <- get("record_batch", envir = asNamespace("arrow"), inherits = FALSE) +record_batch_stream_writer <- get( + "record_batch_stream_writer", envir = asNamespace("arrow"), inherits = FALSE) +file_output_stream <- get( + "file_output_stream", envir = asNamespace("arrow"), inherits = FALSE) +write_record_batch <- get( + "write_record_batch", envir = asNamespace("arrow"), inherits = FALSE) + +# Currently arrow requires withr; otherwise, write APIs don't work. +# Direct 'require' is not recommended by CRAN. Here's a workaround. +require1 <- require +if (require1("withr", quietly = TRUE)) { + numPartitions <- if (!is.null(numPartitions)) { +numToInt(numPartitions) + } else { +1 + } + fileName <- tempfile(pattern = "spark-arrow", fileext = ".tmp") + chunk <- as.integer(ceiling(nrow(rdf) / numPartitions)) + rdf_slices <- split(rdf, rep(1:ceiling(nrow(rdf) / chunk), each = chunk)[1:nrow(rdf)]) --- End diff -- `1 : ceiling`? `1 : nrow`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/22954#discussion_r232176721 --- Diff: R/pkg/R/SQLContext.R --- @@ -189,19 +238,67 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0, x } } + data[] <- lapply(data, cleanCols) - # drop factors and wrap lists - data <- setNames(lapply(data, cleanCols), NULL) + args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE) + if (arrowEnabled) { +shouldUseArrow <- tryCatch({ + stopifnot(length(data) > 0) + dataHead <- head(data, 1) + # Currenty Arrow optimization does not support POSIXct and raw for now. + # Also, it does not support explicit float type set by users. It leads to + # incorrect conversion. We will fall back to the path without Arrow optimization. + if (any(sapply(dataHead, function(x) is(x, "POSIXct" { +stop("Arrow optimization with R DataFrame does not support POSIXct type yet.") + } + if (any(sapply(dataHead, is.raw))) { +stop("Arrow optimization with R DataFrame does not support raw type yet.") + } + if (inherits(schema, "structType")) { +if (any(sapply(schema$fields(), function(x) x$dataType.toString() == "FloatType"))) { + stop("Arrow optimization with R DataFrame does not support FloatType type yet.") +} + } + firstRow <- do.call(mapply, append(args, dataHead))[[1]] + fileName <- writeToTempFileInArrow(data, numPartitions) + tryCatch( +jrddInArrow <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", + "readArrowStreamFromFile", + sparkSession, + fileName), + finally = { +file.remove(fileName) + }) + TRUE +}, +error = function(e) { + message(paste0("WARN: createDataFrame attempted Arrow optimization because ", --- End diff -- ? https://stat.ethz.ch/R-manual/R-devel/library/base/html/warning.html --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/22954#discussion_r232172687 --- Diff: R/pkg/R/SQLContext.R --- @@ -147,6 +147,55 @@ getDefaultSqlSource <- function() { l[["spark.sql.sources.default"]] } +writeToTempFileInArrow <- function(rdf, numPartitions) { + # R API in Arrow is not yet released. CRAN requires to add the package in requireNamespace + # at DESCRIPTION. Later, CRAN checks if the package is available or not. Therefore, it works + # around by avoiding direct requireNamespace. + requireNamespace1 <- requireNamespace + if (requireNamespace1("arrow", quietly = TRUE)) { +record_batch <- get("record_batch", envir = asNamespace("arrow"), inherits = FALSE) +record_batch_stream_writer <- get( + "record_batch_stream_writer", envir = asNamespace("arrow"), inherits = FALSE) +file_output_stream <- get( + "file_output_stream", envir = asNamespace("arrow"), inherits = FALSE) +write_record_batch <- get( + "write_record_batch", envir = asNamespace("arrow"), inherits = FALSE) + +# Currently arrow requires withr; otherwise, write APIs don't work. +# Direct 'require' is not recommended by CRAN. Here's a workaround. +require1 <- require +if (require1("withr", quietly = TRUE)) { + numPartitions <- if (!is.null(numPartitions)) { +numToInt(numPartitions) + } else { +1 + } + fileName <- tempfile(pattern = "spark-arrow", fileext = ".tmp") + chunk <- as.integer(ceiling(nrow(rdf) / numPartitions)) + rdf_slices <- split(rdf, rep(1:ceiling(nrow(rdf) / chunk), each = chunk)[1:nrow(rdf)]) + stream_writer <- NULL + for (rdf_slice in rdf_slices) { +batch <- record_batch(rdf_slice) +if (is.null(stream_writer)) { + # We should avoid private calls like 'close_on_exit' (CRAN disallows) but looks + # there's no exposed API for it. Here's a workaround but ideally this should + # be removed. 
+ close_on_exit <- get("close_on_exit", envir = asNamespace("arrow"), inherits = FALSE) --- End diff -- actually, I think if you use withr here it will call close_on_exit for you? (but when?) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/22954#discussion_r232169938 --- Diff: R/pkg/R/SQLContext.R --- @@ -147,6 +147,55 @@ getDefaultSqlSource <- function() { l[["spark.sql.sources.default"]] } +writeToTempFileInArrow <- function(rdf, numPartitions) { + # R API in Arrow is not yet released. CRAN requires to add the package in requireNamespace + # at DESCRIPTION. Later, CRAN checks if the package is available or not. Therefore, it works + # around by avoiding direct requireNamespace. + requireNamespace1 <- requireNamespace + if (requireNamespace1("arrow", quietly = TRUE)) { +record_batch <- get("record_batch", envir = asNamespace("arrow"), inherits = FALSE) +record_batch_stream_writer <- get( + "record_batch_stream_writer", envir = asNamespace("arrow"), inherits = FALSE) +file_output_stream <- get( + "file_output_stream", envir = asNamespace("arrow"), inherits = FALSE) +write_record_batch <- get( + "write_record_batch", envir = asNamespace("arrow"), inherits = FALSE) + +# Currently arrow requires withr; otherwise, write APIs don't work. +# Direct 'require' is not recommended by CRAN. Here's a workaround. +require1 <- require +if (require1("withr", quietly = TRUE)) { + numPartitions <- if (!is.null(numPartitions)) { +numToInt(numPartitions) + } else { +1 --- End diff -- should this go by default with default parallelism ? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/22954#discussion_r232173367 --- Diff: R/pkg/R/SQLContext.R --- @@ -189,19 +238,67 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0, x } } + data[] <- lapply(data, cleanCols) - # drop factors and wrap lists - data <- setNames(lapply(data, cleanCols), NULL) + args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE) + if (arrowEnabled) { +shouldUseArrow <- tryCatch({ + stopifnot(length(data) > 0) + dataHead <- head(data, 1) + # Currenty Arrow optimization does not support POSIXct and raw for now. + # Also, it does not support explicit float type set by users. It leads to + # incorrect conversion. We will fall back to the path without Arrow optimization. + if (any(sapply(dataHead, function(x) is(x, "POSIXct" { --- End diff -- can you check - I think `is` / `is.x` sometimes doesn't do the right thing when using head(df, 1) and one of the fields is `NA` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/22954#discussion_r232173043 --- Diff: R/pkg/R/SQLContext.R --- @@ -172,10 +221,10 @@ getDefaultSqlSource <- function() { createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0, numPartitions = NULL) { sparkSession <- getSparkSession() - + arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]] == "true" --- End diff -- is it always in the conf - I think you can also pass in a default value to sparkR.conf --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/22954#discussion_r232167634 --- Diff: R/pkg/R/SQLContext.R --- @@ -147,6 +147,55 @@ getDefaultSqlSource <- function() { l[["spark.sql.sources.default"]] } +writeToTempFileInArrow <- function(rdf, numPartitions) { + # R API in Arrow is not yet released. CRAN requires to add the package in requireNamespace + # at DESCRIPTION. Later, CRAN checks if the package is available or not. Therefore, it works + # around by avoiding direct requireNamespace. + requireNamespace1 <- requireNamespace + if (requireNamespace1("arrow", quietly = TRUE)) { +record_batch <- get("record_batch", envir = asNamespace("arrow"), inherits = FALSE) +record_batch_stream_writer <- get( + "record_batch_stream_writer", envir = asNamespace("arrow"), inherits = FALSE) +file_output_stream <- get( + "file_output_stream", envir = asNamespace("arrow"), inherits = FALSE) +write_record_batch <- get( + "write_record_batch", envir = asNamespace("arrow"), inherits = FALSE) + +# Currently arrow requires withr; otherwise, write APIs don't work. +# Direct 'require' is not recommended by CRAN. Here's a workaround. +require1 <- require +if (require1("withr", quietly = TRUE)) { --- End diff -- require1 sounds a bit like a hack though... --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/22954#discussion_r232172546 --- Diff: R/pkg/R/SQLContext.R --- @@ -147,6 +147,55 @@ getDefaultSqlSource <- function() { l[["spark.sql.sources.default"]] } +writeToTempFileInArrow <- function(rdf, numPartitions) { + # R API in Arrow is not yet released. CRAN requires to add the package in requireNamespace + # at DESCRIPTION. Later, CRAN checks if the package is available or not. Therefore, it works + # around by avoiding direct requireNamespace. + requireNamespace1 <- requireNamespace + if (requireNamespace1("arrow", quietly = TRUE)) { +record_batch <- get("record_batch", envir = asNamespace("arrow"), inherits = FALSE) +record_batch_stream_writer <- get( + "record_batch_stream_writer", envir = asNamespace("arrow"), inherits = FALSE) +file_output_stream <- get( + "file_output_stream", envir = asNamespace("arrow"), inherits = FALSE) +write_record_batch <- get( + "write_record_batch", envir = asNamespace("arrow"), inherits = FALSE) + +# Currently arrow requires withr; otherwise, write APIs don't work. +# Direct 'require' is not recommended by CRAN. Here's a workaround. +require1 <- require +if (require1("withr", quietly = TRUE)) { + numPartitions <- if (!is.null(numPartitions)) { +numToInt(numPartitions) + } else { +1 + } + fileName <- tempfile(pattern = "spark-arrow", fileext = ".tmp") + chunk <- as.integer(ceiling(nrow(rdf) / numPartitions)) + rdf_slices <- split(rdf, rep(1:ceiling(nrow(rdf) / chunk), each = chunk)[1:nrow(rdf)]) + stream_writer <- NULL + for (rdf_slice in rdf_slices) { +batch <- record_batch(rdf_slice) +if (is.null(stream_writer)) { + # We should avoid private calls like 'close_on_exit' (CRAN disallows) but looks + # there's no exposed API for it. Here's a workaround but ideally this should + # be removed. + close_on_exit <- get("close_on_exit", envir = asNamespace("arrow"), inherits = FALSE) --- End diff -- so is this an API missing in Arrow? 
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/22954#discussion_r232177132 --- Diff: R/pkg/R/SQLContext.R --- @@ -189,19 +238,67 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0, x } } + data[] <- lapply(data, cleanCols) - # drop factors and wrap lists - data <- setNames(lapply(data, cleanCols), NULL) + args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE) + if (arrowEnabled) { +shouldUseArrow <- tryCatch({ --- End diff -- refactor this into a method? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/22954#discussion_r232167926 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala --- @@ -225,4 +226,25 @@ private[sql] object SQLUtils extends Logging { } sparkSession.sessionState.catalog.listTables(db).map(_.table).toArray } + + /** + * R callable function to read a file in Arrow stream format and create a `RDD` + * using each serialized ArrowRecordBatch as a partition. + */ + def readArrowStreamFromFile( + sparkSession: SparkSession, + filename: String): JavaRDD[Array[Byte]] = { +ArrowConverters.readArrowStreamFromFile(sparkSession.sqlContext, filename) --- End diff -- what's the advantage of adding this wrapper here - I've been thinking of eliminating most of these if possible - and just use callJMethod on `ArrowConverters` say? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/22954#discussion_r232176774 --- Diff: R/pkg/R/SQLContext.R --- @@ -189,19 +238,67 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0, x } } + data[] <- lapply(data, cleanCols) - # drop factors and wrap lists - data <- setNames(lapply(data, cleanCols), NULL) + args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE) + if (arrowEnabled) { +shouldUseArrow <- tryCatch({ + stopifnot(length(data) > 0) + dataHead <- head(data, 1) + # Currenty Arrow optimization does not support POSIXct and raw for now. + # Also, it does not support explicit float type set by users. It leads to + # incorrect conversion. We will fall back to the path without Arrow optimization. + if (any(sapply(dataHead, function(x) is(x, "POSIXct" { +stop("Arrow optimization with R DataFrame does not support POSIXct type yet.") + } + if (any(sapply(dataHead, is.raw))) { +stop("Arrow optimization with R DataFrame does not support raw type yet.") + } + if (inherits(schema, "structType")) { +if (any(sapply(schema$fields(), function(x) x$dataType.toString() == "FloatType"))) { + stop("Arrow optimization with R DataFrame does not support FloatType type yet.") +} + } + firstRow <- do.call(mapply, append(args, dataHead))[[1]] + fileName <- writeToTempFileInArrow(data, numPartitions) + tryCatch( +jrddInArrow <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", + "readArrowStreamFromFile", + sparkSession, + fileName), + finally = { +file.remove(fileName) --- End diff -- unlink? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/22954#discussion_r232171176 --- Diff: R/pkg/R/SQLContext.R --- @@ -147,6 +147,55 @@ getDefaultSqlSource <- function() { l[["spark.sql.sources.default"]] } +writeToTempFileInArrow <- function(rdf, numPartitions) { + # R API in Arrow is not yet released. CRAN requires to add the package in requireNamespace + # at DESCRIPTION. Later, CRAN checks if the package is available or not. Therefore, it works + # around by avoiding direct requireNamespace. + requireNamespace1 <- requireNamespace + if (requireNamespace1("arrow", quietly = TRUE)) { +record_batch <- get("record_batch", envir = asNamespace("arrow"), inherits = FALSE) +record_batch_stream_writer <- get( + "record_batch_stream_writer", envir = asNamespace("arrow"), inherits = FALSE) +file_output_stream <- get( + "file_output_stream", envir = asNamespace("arrow"), inherits = FALSE) +write_record_batch <- get( + "write_record_batch", envir = asNamespace("arrow"), inherits = FALSE) + +# Currently arrow requires withr; otherwise, write APIs don't work. +# Direct 'require' is not recommended by CRAN. Here's a workaround. +require1 <- require +if (require1("withr", quietly = TRUE)) { + numPartitions <- if (!is.null(numPartitions)) { +numToInt(numPartitions) + } else { +1 + } + fileName <- tempfile(pattern = "spark-arrow", fileext = ".tmp") + chunk <- as.integer(ceiling(nrow(rdf) / numPartitions)) + rdf_slices <- split(rdf, rep(1:ceiling(nrow(rdf) / chunk), each = chunk)[1:nrow(rdf)]) --- End diff -- how are these slices computed? is it similar to https://github.com/apache/spark/blob/c3b4a94a91d66c172cf332321d3a78dba29ef8f0/R/pkg/R/context.R#L152 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/22954#discussion_r232167480 --- Diff: R/pkg/R/SQLContext.R --- @@ -147,6 +147,55 @@ getDefaultSqlSource <- function() { l[["spark.sql.sources.default"]] } +writeToTempFileInArrow <- function(rdf, numPartitions) { + # R API in Arrow is not yet released. CRAN requires to add the package in requireNamespace + # at DESCRIPTION. Later, CRAN checks if the package is available or not. Therefore, it works + # around by avoiding direct requireNamespace. + requireNamespace1 <- requireNamespace + if (requireNamespace1("arrow", quietly = TRUE)) { +record_batch <- get("record_batch", envir = asNamespace("arrow"), inherits = FALSE) +record_batch_stream_writer <- get( + "record_batch_stream_writer", envir = asNamespace("arrow"), inherits = FALSE) +file_output_stream <- get( + "file_output_stream", envir = asNamespace("arrow"), inherits = FALSE) +write_record_batch <- get( + "write_record_batch", envir = asNamespace("arrow"), inherits = FALSE) --- End diff -- ok --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/22954#discussion_r232167110 --- Diff: R/pkg/R/SQLContext.R --- @@ -215,14 +278,16 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0, } if (is.null(schema) || (!inherits(schema, "structType") && is.null(names(schema { -row <- firstRDD(rdd) +if (is.null(firstRow)) { + firstRow <- firstRDD(rdd) --- End diff -- I <3 4 digits! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22954#discussion_r232115966 --- Diff: R/pkg/R/SQLContext.R --- @@ -147,6 +147,55 @@ getDefaultSqlSource <- function() { l[["spark.sql.sources.default"]] } +writeToTempFileInArrow <- function(rdf, numPartitions) { + # R API in Arrow is not yet released. CRAN requires to add the package in requireNamespace + # at DESCRIPTION. Later, CRAN checks if the package is available or not. Therefore, it works + # around by avoiding direct requireNamespace. + requireNamespace1 <- requireNamespace + if (requireNamespace1("arrow", quietly = TRUE)) { +record_batch <- get("record_batch", envir = asNamespace("arrow"), inherits = FALSE) +record_batch_stream_writer <- get( + "record_batch_stream_writer", envir = asNamespace("arrow"), inherits = FALSE) +file_output_stream <- get( + "file_output_stream", envir = asNamespace("arrow"), inherits = FALSE) +write_record_batch <- get( + "write_record_batch", envir = asNamespace("arrow"), inherits = FALSE) + +# Currently arrow requires withr; otherwise, write APIs don't work. +# Direct 'require' is not recommended by CRAN. Here's a workaround. +require1 <- require +if (require1("withr", quietly = TRUE)) { --- End diff -- It's actually a bit odd that I need to manually require this package. Otherwise, it complains, for instance, here https://github.com/apache/arrow/blob/d3ec69069649013229366ebe01e22f389597dc19/r/R/on_exit.R#L20 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org