[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...

2018-11-13 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22954#discussion_r233292436
  
--- Diff: R/pkg/R/SQLContext.R ---
@@ -189,19 +238,67 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0,
   x
 }
   }
+  data[] <- lapply(data, cleanCols)
 
-  # drop factors and wrap lists
-  data <- setNames(lapply(data, cleanCols), NULL)
+  args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE)
+  if (arrowEnabled) {
+shouldUseArrow <- tryCatch({
+  stopifnot(length(data) > 0)
+  dataHead <- head(data, 1)
+  # Currenty Arrow optimization does not support POSIXct and raw for now.
+  # Also, it does not support explicit float type set by users. It leads to
+  # incorrect conversion. We will fall back to the path without Arrow optimization.
+  if (any(sapply(dataHead, function(x) is(x, "POSIXct")))) {
+stop("Arrow optimization with R DataFrame does not support POSIXct type yet.")
+  }
+  if (any(sapply(dataHead, is.raw))) {
+stop("Arrow optimization with R DataFrame does not support raw type yet.")
+  }
+  if (inherits(schema, "structType")) {
+if (any(sapply(schema$fields(), function(x) x$dataType.toString() == "FloatType"))) {
+  stop("Arrow optimization with R DataFrame does not support FloatType type yet.")
--- End diff --

I think it's a bug because it always produces a corrupt value when I try to use `numeric` as an explicit float type.
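For context, a minimal sketch of the case being discussed (hypothetical repro, assuming a running SparkR session with `spark.sql.execution.arrow.enabled` set to `true`): an ordinary R `numeric` (double) column declared as `FloatType` through an explicit schema.

```r
# Hypothetical repro: an R double column forced to FloatType via an explicit schema.
library(SparkR)
sparkR.session(sparkConfig = list(spark.sql.execution.arrow.enabled = "true"))

rdf <- data.frame(x = c(1.1, 2.2, 3.3))
schema <- structType(structField("x", "float"))

df <- createDataFrame(rdf, schema = schema)
collect(df)  # reportedly yields corrupt values when the Arrow path is taken
```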


---




[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...

2018-11-13 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/22954#discussion_r233209385
  
--- Diff: R/pkg/R/SQLContext.R ---
@@ -189,19 +238,67 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0,
   x
 }
   }
+  data[] <- lapply(data, cleanCols)
 
-  # drop factors and wrap lists
-  data <- setNames(lapply(data, cleanCols), NULL)
+  args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE)
+  if (arrowEnabled) {
+shouldUseArrow <- tryCatch({
+  stopifnot(length(data) > 0)
+  dataHead <- head(data, 1)
+  # Currenty Arrow optimization does not support POSIXct and raw for now.
+  # Also, it does not support explicit float type set by users. It leads to
+  # incorrect conversion. We will fall back to the path without Arrow optimization.
+  if (any(sapply(dataHead, function(x) is(x, "POSIXct")))) {
+stop("Arrow optimization with R DataFrame does not support POSIXct type yet.")
+  }
+  if (any(sapply(dataHead, is.raw))) {
+stop("Arrow optimization with R DataFrame does not support raw type yet.")
+  }
+  if (inherits(schema, "structType")) {
+if (any(sapply(schema$fields(), function(x) x$dataType.toString() == "FloatType"))) {
+  stop("Arrow optimization with R DataFrame does not support FloatType type yet.")
--- End diff --

Oh if it's only when casting to a float, then maybe not that big of an 
issue. I just wanted to make sure a bug was filed for Arrow if the problem is 
there.


---




[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...

2018-11-12 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22954#discussion_r232895848
  
--- Diff: R/pkg/R/SQLContext.R ---
@@ -172,36 +257,72 @@ getDefaultSqlSource <- function() {
 createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0,
 numPartitions = NULL) {
   sparkSession <- getSparkSession()
-
+  arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]] == "true"
+  shouldUseArrow <- FALSE
+  firstRow <- NULL
   if (is.data.frame(data)) {
-  # Convert data into a list of rows. Each row is a list.
-
-  # get the names of columns, they will be put into RDD
-  if (is.null(schema)) {
-schema <- names(data)
-  }
+# get the names of columns, they will be put into RDD
+if (is.null(schema)) {
+  schema <- names(data)
+}
 
-  # get rid of factor type
-  cleanCols <- function(x) {
-if (is.factor(x)) {
-  as.character(x)
-} else {
-  x
-}
+# get rid of factor type
+cleanCols <- function(x) {
+  if (is.factor(x)) {
+as.character(x)
+  } else {
+x
   }
+}
+data[] <- lapply(data, cleanCols)
+
+args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE)
+if (arrowEnabled) {
+  shouldUseArrow <- tryCatch({
--- End diff --

Yup, correct. Let me address other comments as well.


---




[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...

2018-11-12 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22954#discussion_r232619364
  
--- Diff: R/pkg/R/SQLContext.R ---
@@ -172,36 +257,72 @@ getDefaultSqlSource <- function() {
 createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0,
 numPartitions = NULL) {
   sparkSession <- getSparkSession()
-
+  arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]] == "true"
+  shouldUseArrow <- FALSE
+  firstRow <- NULL
   if (is.data.frame(data)) {
-  # Convert data into a list of rows. Each row is a list.
-
-  # get the names of columns, they will be put into RDD
-  if (is.null(schema)) {
-schema <- names(data)
-  }
+# get the names of columns, they will be put into RDD
+if (is.null(schema)) {
+  schema <- names(data)
+}
 
-  # get rid of factor type
-  cleanCols <- function(x) {
-if (is.factor(x)) {
-  as.character(x)
-} else {
-  x
-}
+# get rid of factor type
+cleanCols <- function(x) {
+  if (is.factor(x)) {
+as.character(x)
+  } else {
+x
   }
+}
+data[] <- lapply(data, cleanCols)
+
+args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE)
+if (arrowEnabled) {
+  shouldUseArrow <- tryCatch({
--- End diff --

When `shouldUseArrow` is true, I think it means we have already finished using Arrow. Maybe just `useArrow`?


---




[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...

2018-11-12 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22954#discussion_r232618853
  
--- Diff: R/pkg/R/SQLContext.R ---
@@ -172,36 +257,72 @@ getDefaultSqlSource <- function() {
 createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0,
 numPartitions = NULL) {
   sparkSession <- getSparkSession()
-
+  arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]] == "true"
+  shouldUseArrow <- FALSE
+  firstRow <- NULL
   if (is.data.frame(data)) {
-  # Convert data into a list of rows. Each row is a list.
-
-  # get the names of columns, they will be put into RDD
-  if (is.null(schema)) {
-schema <- names(data)
-  }
+# get the names of columns, they will be put into RDD
+if (is.null(schema)) {
+  schema <- names(data)
+}
 
-  # get rid of factor type
-  cleanCols <- function(x) {
-if (is.factor(x)) {
-  as.character(x)
-} else {
-  x
-}
+# get rid of factor type
+cleanCols <- function(x) {
+  if (is.factor(x)) {
+as.character(x)
+  } else {
+x
   }
+}
+data[] <- lapply(data, cleanCols)
+
+args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE)
+if (arrowEnabled) {
+  shouldUseArrow <- tryCatch({
+stopifnot(length(data) > 0)
+dataHead <- head(data, 1)
+checkTypeRequirementForArrow(data, schema)
+fileName <- writeToTempFileInArrow(data, numPartitions)
--- End diff --

Should we move `writeToTempFileInArrow` call into next `tryCatch`?
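A sketch of the restructuring being suggested (names follow the PR; not the final code): keep the write and the read inside the same `tryCatch`, so the cleanup in `finally` always runs even when writing the temp file fails.

```r
# Sketch: write and read the Arrow temp file under one tryCatch/finally.
fileName <- NULL
tryCatch({
  fileName <- writeToTempFileInArrow(data, numPartitions)
  jrddInArrow <- callJStatic("org.apache.spark.sql.api.r.SQLUtils",
                             "readArrowStreamFromFile",
                             sparkSession,
                             fileName)
},
finally = {
  # remove the temp file whether or not the JVM call succeeded
  if (!is.null(fileName)) {
    file.remove(fileName)
  }
})
```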


---




[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...

2018-11-12 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22954#discussion_r232620582
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala ---
@@ -225,4 +226,25 @@ private[sql] object SQLUtils extends Logging {
 }
 sparkSession.sessionState.catalog.listTables(db).map(_.table).toArray
   }
+
+  /**
+   * R callable function to read a file in Arrow stream format and create an `RDD`
+   * using each serialized ArrowRecordBatch as a partition.
+   */
+  def readArrowStreamFromFile(
+  sparkSession: SparkSession,
+  filename: String): JavaRDD[Array[Byte]] = {
+ArrowConverters.readArrowStreamFromFile(sparkSession.sqlContext, filename)
+  }
+
+  /**
+   * R callable function to read a file in Arrow stream format and create a `DataFrame`
--- End diff --

Is this going to read a file in Arrow stream format?


---




[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...

2018-11-11 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22954#discussion_r232525184
  
--- Diff: R/pkg/tests/fulltests/test_sparkSQL.R ---
@@ -307,6 +307,64 @@ test_that("create DataFrame from RDD", {
   unsetHiveContext()
 })
 
+test_that("createDataFrame Arrow optimization", {
+  skip_if_not_installed("arrow")
+  skip_if_not_installed("withr")
--- End diff --

Maybe we should hold it for now .. because I realised R API for Arrow 
requires R 3.5.x and Jenkins's one is 3.1.x if I remember this correctly. 
Ideally, we could probably do that via AppVeyor if everything goes fine.


---




[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...

2018-11-11 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22954#discussion_r232525068
  
--- Diff: R/pkg/tests/fulltests/test_sparkSQL.R ---
@@ -307,6 +307,64 @@ test_that("create DataFrame from RDD", {
   unsetHiveContext()
 })
 
+test_that("createDataFrame Arrow optimization", {
+  skip_if_not_installed("arrow")
+  skip_if_not_installed("withr")
+
+  conf <- callJMethod(sparkSession, "conf")
+  arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]]
+
+  callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "false")
+  tryCatch({
--- End diff --

Just to inject the finally .. :-) ..
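A sketch of the idiom being referred to (based on the quoted test code): the outer `tryCatch` exists only so a `finally` can restore the conf value after the assertions run.

```r
# Sketch: tryCatch used purely to inject a finally that restores the conf.
arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]]
callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "false")
tryCatch({
  # ... assertions for the non-Arrow path, e.g. ...
  expected <- collect(createDataFrame(mtcars))
},
finally = {
  # always restore whatever the setting was before the test
  callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", arrowEnabled)
})
```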


---




[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...

2018-11-11 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/22954#discussion_r232500065
  
--- Diff: R/pkg/R/SQLContext.R ---
@@ -172,36 +257,72 @@ getDefaultSqlSource <- function() {
 createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0,
 numPartitions = NULL) {
   sparkSession <- getSparkSession()
-
+  arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]] == "true"
+  shouldUseArrow <- FALSE
+  firstRow <- NULL
   if (is.data.frame(data)) {
-  # Convert data into a list of rows. Each row is a list.
-
-  # get the names of columns, they will be put into RDD
-  if (is.null(schema)) {
-schema <- names(data)
-  }
+# get the names of columns, they will be put into RDD
+if (is.null(schema)) {
+  schema <- names(data)
+}
 
-  # get rid of factor type
-  cleanCols <- function(x) {
-if (is.factor(x)) {
-  as.character(x)
-} else {
-  x
-}
+# get rid of factor type
+cleanCols <- function(x) {
+  if (is.factor(x)) {
+as.character(x)
+  } else {
+x
   }
+}
+data[] <- lapply(data, cleanCols)
+
+args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE)
+if (arrowEnabled) {
+  shouldUseArrow <- tryCatch({
+stopifnot(length(data) > 0)
+dataHead <- head(data, 1)
+checkTypeRequirementForArrow(data, schema)
+fileName <- writeToTempFileInArrow(data, numPartitions)
+tryCatch(
+  jrddInArrow <- callJStatic("org.apache.spark.sql.api.r.SQLUtils",
+ "readArrowStreamFromFile",
+ sparkSession,
+ fileName),
+finally = {
+  file.remove(fileName)
+})
+
+firstRow <- do.call(mapply, append(args, dataHead))[[1]]
+TRUE
+  },
+  error = function(e) {
+warning(paste0("createDataFrame attempted Arrow optimization because ",
+   "'spark.sql.execution.arrow.enabled' is set to true; however, ",
+   "failed, attempting non-optimization. Reason: ",
+   e))
+return(FALSE)
--- End diff --

nit: just `FALSE` is good


---




[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...

2018-11-11 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/22954#discussion_r232499902
  
--- Diff: R/pkg/R/SQLContext.R ---
@@ -147,6 +147,91 @@ getDefaultSqlSource <- function() {
   l[["spark.sql.sources.default"]]
 }
 
+writeToTempFileInArrow <- function(rdf, numPartitions) {
+  requireNamespace1 <- requireNamespace
+
+  # For some reasons, Arrow R API requires to load 'defer_parent' which is from 'withr' package.
+  # This is a workaround to avoid this error. Otherwise, we should directly load 'withr'
+  # package, which CRAN complains about.
+  defer_parent <- function(x, ...) {
+  if (requireNamespace1("withr", quietly = TRUE)) {
+  defer_parent <- get("defer_parent", envir = asNamespace("withr"), inherits = FALSE)
+  defer_parent(x, ...)
+} else {
+  stop("'withr' package should be installed.")
+}
+  }
+
+  # R API in Arrow is not yet released. CRAN requires to add the package in requireNamespace
+  # at DESCRIPTION. Later, CRAN checks if the package is available or not. Therefore, it works
+  # around by avoiding direct requireNamespace.
+  if (requireNamespace1("arrow", quietly = TRUE)) {
+record_batch <- get("record_batch", envir = asNamespace("arrow"), inherits = FALSE)
+record_batch_stream_writer <- get(
+  "record_batch_stream_writer", envir = asNamespace("arrow"), inherits = FALSE)
+file_output_stream <- get(
+  "file_output_stream", envir = asNamespace("arrow"), inherits = FALSE)
+write_record_batch <- get(
+  "write_record_batch", envir = asNamespace("arrow"), inherits = FALSE)
+
+numPartitions <- if (!is.null(numPartitions)) {
+  numToInt(numPartitions)
+} else {
+  1
--- End diff --

future: consolidate the default here and inside makeSplits


---




[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...

2018-11-11 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/22954#discussion_r232499848
  
--- Diff: R/pkg/tests/fulltests/test_sparkSQL.R ---
@@ -307,6 +307,64 @@ test_that("create DataFrame from RDD", {
   unsetHiveContext()
 })
 
+test_that("createDataFrame Arrow optimization", {
+  skip_if_not_installed("arrow")
+  skip_if_not_installed("withr")
--- End diff --

are we going to ask shane to install arrow/withr on the Jenkins machines?


---




[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...

2018-11-11 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/22954#discussion_r232499794
  
--- Diff: R/pkg/tests/fulltests/test_sparkSQL.R ---
@@ -307,6 +307,64 @@ test_that("create DataFrame from RDD", {
   unsetHiveContext()
 })
 
+test_that("createDataFrame Arrow optimization", {
+  skip_if_not_installed("arrow")
+  skip_if_not_installed("withr")
+
+  conf <- callJMethod(sparkSession, "conf")
+  arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]]
+
+  callJMethod(conf, "set", "spark.sql.execution.arrow.enabled", "false")
+  tryCatch({
--- End diff --

does this fail? or just a way to inject a finally?


---




[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...

2018-11-11 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22954#discussion_r232478997
  
--- Diff: R/pkg/R/SQLContext.R ---
@@ -147,6 +147,55 @@ getDefaultSqlSource <- function() {
   l[["spark.sql.sources.default"]]
 }
 
+writeToTempFileInArrow <- function(rdf, numPartitions) {
+  # R API in Arrow is not yet released. CRAN requires to add the package in requireNamespace
+  # at DESCRIPTION. Later, CRAN checks if the package is available or not. Therefore, it works
+  # around by avoiding direct requireNamespace.
+  requireNamespace1 <- requireNamespace
+  if (requireNamespace1("arrow", quietly = TRUE)) {
+record_batch <- get("record_batch", envir = asNamespace("arrow"), inherits = FALSE)
+record_batch_stream_writer <- get(
+  "record_batch_stream_writer", envir = asNamespace("arrow"), inherits = FALSE)
+file_output_stream <- get(
+  "file_output_stream", envir = asNamespace("arrow"), inherits = FALSE)
+write_record_batch <- get(
+  "write_record_batch", envir = asNamespace("arrow"), inherits = FALSE)
+
+# Currently arrow requires withr; otherwise, write APIs don't work.
+# Direct 'require' is not recommended by CRAN. Here's a workaround.
+require1 <- require
+if (require1("withr", quietly = TRUE)) {
+  numPartitions <- if (!is.null(numPartitions)) {
+numToInt(numPartitions)
+  } else {
+1
+  }
+  fileName <- tempfile(pattern = "spark-arrow", fileext = ".tmp")
+  chunk <- as.integer(ceiling(nrow(rdf) / numPartitions))
+  rdf_slices <- split(rdf, rep(1:ceiling(nrow(rdf) / chunk), each = chunk)[1:nrow(rdf)])
--- End diff --

Not sure. I think the intention is the same. Let me stick to R's one for 
now.


---




[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...

2018-11-10 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/22954#discussion_r232477365
  
--- Diff: R/pkg/R/SQLContext.R ---
@@ -147,6 +147,55 @@ getDefaultSqlSource <- function() {
   l[["spark.sql.sources.default"]]
 }
 
+writeToTempFileInArrow <- function(rdf, numPartitions) {
+  # R API in Arrow is not yet released. CRAN requires to add the package in requireNamespace
+  # at DESCRIPTION. Later, CRAN checks if the package is available or not. Therefore, it works
+  # around by avoiding direct requireNamespace.
+  requireNamespace1 <- requireNamespace
+  if (requireNamespace1("arrow", quietly = TRUE)) {
+record_batch <- get("record_batch", envir = asNamespace("arrow"), inherits = FALSE)
+record_batch_stream_writer <- get(
+  "record_batch_stream_writer", envir = asNamespace("arrow"), inherits = FALSE)
+file_output_stream <- get(
+  "file_output_stream", envir = asNamespace("arrow"), inherits = FALSE)
+write_record_batch <- get(
+  "write_record_batch", envir = asNamespace("arrow"), inherits = FALSE)
+
+# Currently arrow requires withr; otherwise, write APIs don't work.
+# Direct 'require' is not recommended by CRAN. Here's a workaround.
+require1 <- require
+if (require1("withr", quietly = TRUE)) {
--- End diff --

that will be good. circumventing CRAN check for method name is... 
problematic..
(there are other more hacky way too, but ..)


---




[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...

2018-11-10 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/22954#discussion_r232477325
  
--- Diff: R/pkg/R/SQLContext.R ---
@@ -189,19 +238,67 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0,
   x
 }
   }
+  data[] <- lapply(data, cleanCols)
 
-  # drop factors and wrap lists
-  data <- setNames(lapply(data, cleanCols), NULL)
+  args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE)
+  if (arrowEnabled) {
+shouldUseArrow <- tryCatch({
+  stopifnot(length(data) > 0)
+  dataHead <- head(data, 1)
+  # Currenty Arrow optimization does not support POSIXct and raw for now.
+  # Also, it does not support explicit float type set by users. It leads to
+  # incorrect conversion. We will fall back to the path without Arrow optimization.
+  if (any(sapply(dataHead, function(x) is(x, "POSIXct")))) {
+stop("Arrow optimization with R DataFrame does not support POSIXct type yet.")
+  }
+  if (any(sapply(dataHead, is.raw))) {
+stop("Arrow optimization with R DataFrame does not support raw type yet.")
+  }
+  if (inherits(schema, "structType")) {
+if (any(sapply(schema$fields(), function(x) x$dataType.toString() == "FloatType"))) {
+  stop("Arrow optimization with R DataFrame does not support FloatType type yet.")
--- End diff --

maybe out of bit range? 53 bit 
https://stat.ethz.ch/R-manual/R-patched/library/base/html/double.html
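For reference, R has no native 32-bit float vector type; every `numeric` is a 64-bit double with a 53-bit significand, so a value the user intends as float always arrives as a double (plain R illustration):

```r
# R numerics are doubles with a 53-bit significand.
.Machine$double.digits   # 53
(2^53 + 1) == 2^53       # TRUE: past 53 bits, integers are no longer exact
typeof(1.1)              # "double" - there is no native 32-bit float in R
```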



---




[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...

2018-11-10 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/22954#discussion_r232477271
  
--- Diff: R/pkg/R/SQLContext.R ---
@@ -189,19 +238,67 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0,
   x
 }
   }
+  data[] <- lapply(data, cleanCols)
 
-  # drop factors and wrap lists
-  data <- setNames(lapply(data, cleanCols), NULL)
+  args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE)
+  if (arrowEnabled) {
+shouldUseArrow <- tryCatch({
+  stopifnot(length(data) > 0)
+  dataHead <- head(data, 1)
+  # Currenty Arrow optimization does not support POSIXct and raw for now.
+  # Also, it does not support explicit float type set by users. It leads to
+  # incorrect conversion. We will fall back to the path without Arrow optimization.
+  if (any(sapply(dataHead, function(x) is(x, "POSIXct")))) {
+stop("Arrow optimization with R DataFrame does not support POSIXct type yet.")
+  }
+  if (any(sapply(dataHead, is.raw))) {
+stop("Arrow optimization with R DataFrame does not support raw type yet.")
+  }
+  if (inherits(schema, "structType")) {
+if (any(sapply(schema$fields(), function(x) x$dataType.toString() == "FloatType"))) {
+  stop("Arrow optimization with R DataFrame does not support FloatType type yet.")
+}
+  }
+  firstRow <- do.call(mapply, append(args, dataHead))[[1]]
+  fileName <- writeToTempFileInArrow(data, numPartitions)
+  tryCatch(
+jrddInArrow <- callJStatic("org.apache.spark.sql.api.r.SQLUtils",
+   "readArrowStreamFromFile",
+   sparkSession,
+   fileName),
+  finally = {
+file.remove(fileName)
--- End diff --

yes, just more consistent. I also don't know for sure why all other 
instances are calling unlink
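For reference, the two calls behave almost the same on a single file; the practical differences are small (plain R, not PR code):

```r
# file.remove() vs unlink() on a single temp file
fileName <- tempfile(pattern = "spark-arrow", fileext = ".tmp")
file.create(fileName)

file.remove(fileName)   # returns TRUE, and warns if the file does not exist
unlink(fileName)        # returns 0 and stays silent even if the file is already gone
# unlink() additionally accepts wildcards and, with recursive = TRUE, directories
```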


---




[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...

2018-11-10 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/22954#discussion_r232477257
  
--- Diff: R/pkg/R/SQLContext.R ---
@@ -189,19 +238,67 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0,
   x
 }
   }
+  data[] <- lapply(data, cleanCols)
 
-  # drop factors and wrap lists
-  data <- setNames(lapply(data, cleanCols), NULL)
+  args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE)
+  if (arrowEnabled) {
+shouldUseArrow <- tryCatch({
+  stopifnot(length(data) > 0)
+  dataHead <- head(data, 1)
+  # Currenty Arrow optimization does not support POSIXct and raw for now.
+  # Also, it does not support explicit float type set by users. It leads to
+  # incorrect conversion. We will fall back to the path without Arrow optimization.
+  if (any(sapply(dataHead, function(x) is(x, "POSIXct")))) {
--- End diff --

LG, I tested a few cases too


---




[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...

2018-11-10 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/22954#discussion_r232477171
  
--- Diff: R/pkg/R/SQLContext.R ---
@@ -172,10 +221,10 @@ getDefaultSqlSource <- function() {
 createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0,
 numPartitions = NULL) {
   sparkSession <- getSparkSession()
-
+  arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]] == "true"
--- End diff --

ok


---




[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...

2018-11-10 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/22954#discussion_r232477155
  
--- Diff: R/pkg/R/SQLContext.R ---
@@ -147,6 +147,55 @@ getDefaultSqlSource <- function() {
   l[["spark.sql.sources.default"]]
 }
 
+writeToTempFileInArrow <- function(rdf, numPartitions) {
+  # R API in Arrow is not yet released. CRAN requires to add the package in requireNamespace
+  # at DESCRIPTION. Later, CRAN checks if the package is available or not. Therefore, it works
+  # around by avoiding direct requireNamespace.
+  requireNamespace1 <- requireNamespace
+  if (requireNamespace1("arrow", quietly = TRUE)) {
+record_batch <- get("record_batch", envir = asNamespace("arrow"), inherits = FALSE)
+record_batch_stream_writer <- get(
+  "record_batch_stream_writer", envir = asNamespace("arrow"), inherits = FALSE)
+file_output_stream <- get(
+  "file_output_stream", envir = asNamespace("arrow"), inherits = FALSE)
+write_record_batch <- get(
+  "write_record_batch", envir = asNamespace("arrow"), inherits = FALSE)
+
+# Currently arrow requires withr; otherwise, write APIs don't work.
+# Direct 'require' is not recommended by CRAN. Here's a workaround.
+require1 <- require
+if (require1("withr", quietly = TRUE)) {
+  numPartitions <- if (!is.null(numPartitions)) {
+numToInt(numPartitions)
+  } else {
+1
+  }
+  fileName <- tempfile(pattern = "spark-arrow", fileext = ".tmp")
+  chunk <- as.integer(ceiling(nrow(rdf) / numPartitions))
+  rdf_slices <- split(rdf, rep(1:ceiling(nrow(rdf) / chunk), each = chunk)[1:nrow(rdf)])
--- End diff --

ah, any idea why that was done that way in Python? This seems to be different from sc.parallelize, which is what https://github.com/apache/spark/blob/c3b4a94a91d66c172cf332321d3a78dba29ef8f0/R/pkg/R/context.R#L152 does


---




[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...

2018-11-10 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/22954#discussion_r232477131
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala ---
@@ -225,4 +226,25 @@ private[sql] object SQLUtils extends Logging {
 }
 sparkSession.sessionState.catalog.listTables(db).map(_.table).toArray
   }
+
+  /**
+   * R callable function to read a file in Arrow stream format and create a `RDD`
+   * using each serialized ArrowRecordBatch as a partition.
+   */
+  def readArrowStreamFromFile(
+  sparkSession: SparkSession,
+  filename: String): JavaRDD[Array[Byte]] = {
+ArrowConverters.readArrowStreamFromFile(sparkSession.sqlContext, filename)
--- End diff --

hmm, I see your point... but there could be hundreds of these wrappers we need to add if we set this as a practice, I'm guessing.

a few problems with these wrappers I see:
1. they are extra work to add or maintain
2. many are very simple, not much value add
3. many get abandoned over the years - they are not called and not removed

but I kinda see your way, let's keep this one and review any new one in the 
future.


---




[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...

2018-11-10 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22954#discussion_r232476906
  
--- Diff: R/pkg/R/SQLContext.R ---
@@ -147,6 +147,55 @@ getDefaultSqlSource <- function() {
   l[["spark.sql.sources.default"]]
 }
 
+writeToTempFileInArrow <- function(rdf, numPartitions) {
+  # R API in Arrow is not yet released. CRAN requires to add the package in requireNamespace
+  # at DESCRIPTION. Later, CRAN checks if the package is available or not. Therefore, it works
+  # around by avoiding direct requireNamespace.
+  requireNamespace1 <- requireNamespace
+  if (requireNamespace1("arrow", quietly = TRUE)) {
+record_batch <- get("record_batch", envir = asNamespace("arrow"), inherits = FALSE)
+record_batch_stream_writer <- get(
+  "record_batch_stream_writer", envir = asNamespace("arrow"), inherits = FALSE)
+file_output_stream <- get(
+  "file_output_stream", envir = asNamespace("arrow"), inherits = FALSE)
+write_record_batch <- get(
+  "write_record_batch", envir = asNamespace("arrow"), inherits = FALSE)
+
+# Currently arrow requires withr; otherwise, write APIs don't work.
+# Direct 'require' is not recommended by CRAN. Here's a workaround.
+require1 <- require
+if (require1("withr", quietly = TRUE)) {
+  numPartitions <- if (!is.null(numPartitions)) {
+numToInt(numPartitions)
+  } else {
+1
+  }
+  fileName <- tempfile(pattern = "spark-arrow", fileext = ".tmp")
+  chunk <- as.integer(ceiling(nrow(rdf) / numPartitions))
+  rdf_slices <- split(rdf, rep(1:ceiling(nrow(rdf) / chunk), each = chunk)[1:nrow(rdf)])
--- End diff --

Let me try to reuse the R side slicing logic.


---




[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...

2018-11-10 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22954#discussion_r232475881
  
--- Diff: R/pkg/R/SQLContext.R ---
@@ -189,19 +238,67 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0,
   x
 }
   }
+  data[] <- lapply(data, cleanCols)
 
-  # drop factors and wrap lists
-  data <- setNames(lapply(data, cleanCols), NULL)
+  args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE)
+  if (arrowEnabled) {
+shouldUseArrow <- tryCatch({
+  stopifnot(length(data) > 0)
+  dataHead <- head(data, 1)
+  # Currenty Arrow optimization does not support POSIXct and raw for now.
+  # Also, it does not support explicit float type set by users. It leads to
+  # incorrect conversion. We will fall back to the path without Arrow optimization.
+  if (any(sapply(dataHead, function(x) is(x, "POSIXct")))) {
--- End diff --

Looks okay in this case specifically:

```r
> any(sapply(head(data.frame(list(list(a=NA))), 1), is.raw))
[1] FALSE
> any(sapply(head(data.frame(list(list(a=NA))), 1), function(x) is(x, "POSIXct")))
[1] FALSE
> any(sapply(head(data.frame(list(list(a=1))), 1), is.raw))
[1] FALSE
> any(sapply(head(data.frame(list(list(a="a"))), 1), function(x) is(x, "POSIXct")))
[1] FALSE
> any(sapply(head(data.frame(list(list(a=raw(1)))), 1), is.raw))
[1] TRUE
> any(sapply(head(data.frame(list(list(a=as.POSIXct("2000-01-01")))), 1), function(x) is(x, "POSIXct")))
[1] TRUE
```


---




[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...

2018-11-10 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22954#discussion_r232475777
  
--- Diff: R/pkg/R/SQLContext.R ---
@@ -189,19 +238,67 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0,
   x
 }
   }
+  data[] <- lapply(data, cleanCols)
 
-  # drop factors and wrap lists
-  data <- setNames(lapply(data, cleanCols), NULL)
+  args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE)
+  if (arrowEnabled) {
+shouldUseArrow <- tryCatch({
+  stopifnot(length(data) > 0)
+  dataHead <- head(data, 1)
+  # Currenty Arrow optimization does not support POSIXct and raw for now.
+  # Also, it does not support explicit float type set by users. It leads to
+  # incorrect conversion. We will fall back to the path without Arrow optimization.
+  if (any(sapply(dataHead, function(x) is(x, "POSIXct")))) {
+stop("Arrow optimization with R DataFrame does not support POSIXct type yet.")
+  }
+  if (any(sapply(dataHead, is.raw))) {
+stop("Arrow optimization with R DataFrame does not support raw type yet.")
+  }
+  if (inherits(schema, "structType")) {
+if (any(sapply(schema$fields(), function(x) x$dataType.toString() == "FloatType"))) {
+  stop("Arrow optimization with R DataFrame does not support FloatType type yet.")
+}
+  }
+  firstRow <- do.call(mapply, append(args, dataHead))[[1]]
+  fileName <- writeToTempFileInArrow(data, numPartitions)
+  tryCatch(
+jrddInArrow <- callJStatic("org.apache.spark.sql.api.r.SQLUtils",
+   "readArrowStreamFromFile",
+   sparkSession,
+   fileName),
+  finally = {
+file.remove(fileName)
--- End diff --

I believe either way is fine.


---




[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...

2018-11-10 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22954#discussion_r232475752
  
--- Diff: R/pkg/R/SQLContext.R ---
@@ -147,6 +147,55 @@ getDefaultSqlSource <- function() {
   l[["spark.sql.sources.default"]]
 }
 
+writeToTempFileInArrow <- function(rdf, numPartitions) {
+  # R API in Arrow is not yet released. CRAN requires to add the package in requireNamespace
+  # at DESCRIPTION. Later, CRAN checks if the package is available or not. Therefore, it works
+  # around by avoiding direct requireNamespace.
+  requireNamespace1 <- requireNamespace
+  if (requireNamespace1("arrow", quietly = TRUE)) {
+record_batch <- get("record_batch", envir = asNamespace("arrow"), inherits = FALSE)
+record_batch_stream_writer <- get(
+  "record_batch_stream_writer", envir = asNamespace("arrow"), inherits = FALSE)
+file_output_stream <- get(
+  "file_output_stream", envir = asNamespace("arrow"), inherits = FALSE)
+write_record_batch <- get(
+  "write_record_batch", envir = asNamespace("arrow"), inherits = FALSE)
+
+# Currently arrow requires withr; otherwise, write APIs don't work.
+# Direct 'require' is not recommended by CRAN. Here's a workaround.
+require1 <- require
+if (require1("withr", quietly = TRUE)) {
--- End diff --

Yea, I at least managed to get rid of this hack itself. Will push soon.


---




[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...

2018-11-10 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22954#discussion_r232473761
  
--- Diff: R/pkg/R/SQLContext.R ---
@@ -189,19 +238,67 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0,
   x
 }
   }
+  data[] <- lapply(data, cleanCols)
 
-  # drop factors and wrap lists
-  data <- setNames(lapply(data, cleanCols), NULL)
+  args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE)
+  if (arrowEnabled) {
+shouldUseArrow <- tryCatch({
+  stopifnot(length(data) > 0)
+  dataHead <- head(data, 1)
+  # Currenty Arrow optimization does not support POSIXct and raw for now.
+  # Also, it does not support explicit float type set by users. It leads to
+  # incorrect conversion. We will fall back to the path without Arrow optimization.
+  if (any(sapply(dataHead, function(x) is(x, "POSIXct")))) {
+stop("Arrow optimization with R DataFrame does not support POSIXct type yet.")
+  }
+  if (any(sapply(dataHead, is.raw))) {
+stop("Arrow optimization with R DataFrame does not support raw type yet.")
+  }
+  if (inherits(schema, "structType")) {
+if (any(sapply(schema$fields(), function(x) x$dataType.toString() == "FloatType"))) {
+  stop("Arrow optimization with R DataFrame does not support FloatType type yet.")
--- End diff --

I suspect that it happens when `numeric` (which is like `1.0`) is cast into float type. I think it's related to the casting behaviour. Let me take a look and file a JIRA on the Arrow side, but if you don't mind I will focus on matching exact type cases for now ...


---




[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...

2018-11-10 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22954#discussion_r232473723
  
--- Diff: R/pkg/R/SQLContext.R ---
@@ -189,19 +238,67 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0,
   x
 }
   }
+  data[] <- lapply(data, cleanCols)
 
-  # drop factors and wrap lists
-  data <- setNames(lapply(data, cleanCols), NULL)
+  args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE)
+  if (arrowEnabled) {
+shouldUseArrow <- tryCatch({
--- End diff --

Yup, let me try.


---




[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...

2018-11-10 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22954#discussion_r232473716
  
--- Diff: R/pkg/R/SQLContext.R ---
@@ -172,10 +221,10 @@ getDefaultSqlSource <- function() {
 createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0,
 numPartitions = NULL) {
   sparkSession <- getSparkSession()
-
+  arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]] == "true"
--- End diff --

Yea, I checked that it always has the default value. I initially left the default value but took it out after double checking.


---




[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...

2018-11-10 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22954#discussion_r232473705
  
--- Diff: R/pkg/R/SQLContext.R ---
@@ -147,6 +147,55 @@ getDefaultSqlSource <- function() {
   l[["spark.sql.sources.default"]]
 }
 
+writeToTempFileInArrow <- function(rdf, numPartitions) {
+  # R API in Arrow is not yet released. CRAN requires to add the package in requireNamespace
+  # at DESCRIPTION. Later, CRAN checks if the package is available or not. Therefore, it works
+  # around by avoiding direct requireNamespace.
+  requireNamespace1 <- requireNamespace
+  if (requireNamespace1("arrow", quietly = TRUE)) {
+record_batch <- get("record_batch", envir = asNamespace("arrow"), inherits = FALSE)
+record_batch_stream_writer <- get(
+  "record_batch_stream_writer", envir = asNamespace("arrow"), inherits = FALSE)
+file_output_stream <- get(
+  "file_output_stream", envir = asNamespace("arrow"), inherits = FALSE)
+write_record_batch <- get(
+  "write_record_batch", envir = asNamespace("arrow"), inherits = FALSE)
+
+# Currently arrow requires withr; otherwise, write APIs don't work.
+# Direct 'require' is not recommended by CRAN. Here's a workaround.
+require1 <- require
+if (require1("withr", quietly = TRUE)) {
+  numPartitions <- if (!is.null(numPartitions)) {
+numToInt(numPartitions)
+  } else {
+1
+  }
+  fileName <- tempfile(pattern = "spark-arrow", fileext = ".tmp")
+  chunk <- as.integer(ceiling(nrow(rdf) / numPartitions))
+  rdf_slices <- split(rdf, rep(1:ceiling(nrow(rdf) / chunk), each = chunk)[1:nrow(rdf)])
+  stream_writer <- NULL
+  for (rdf_slice in rdf_slices) {
+batch <- record_batch(rdf_slice)
+if (is.null(stream_writer)) {
+  # We should avoid private calls like 'close_on_exit' (CRAN disallows) but looks
+  # there's no exposed API for it. Here's a workaround but ideally this should
+  # be removed.
+  close_on_exit <- get("close_on_exit", envir = asNamespace("arrow"), inherits = FALSE)
--- End diff --

Hm, possibly yea. Let me try it. In this way, we could get rid of `require`.


---




[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...

2018-11-10 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22954#discussion_r232473697
  
--- Diff: R/pkg/R/SQLContext.R ---
@@ -147,6 +147,55 @@ getDefaultSqlSource <- function() {
   l[["spark.sql.sources.default"]]
 }
 
+writeToTempFileInArrow <- function(rdf, numPartitions) {
+  # R API in Arrow is not yet released. CRAN requires to add the package in requireNamespace
+  # at DESCRIPTION. Later, CRAN checks if the package is available or not. Therefore, it works
+  # around by avoiding direct requireNamespace.
+  requireNamespace1 <- requireNamespace
+  if (requireNamespace1("arrow", quietly = TRUE)) {
+record_batch <- get("record_batch", envir = asNamespace("arrow"), inherits = FALSE)
+record_batch_stream_writer <- get(
+  "record_batch_stream_writer", envir = asNamespace("arrow"), inherits = FALSE)
+file_output_stream <- get(
+  "file_output_stream", envir = asNamespace("arrow"), inherits = FALSE)
+write_record_batch <- get(
+  "write_record_batch", envir = asNamespace("arrow"), inherits = FALSE)
+
+# Currently arrow requires withr; otherwise, write APIs don't work.
+# Direct 'require' is not recommended by CRAN. Here's a workaround.
+require1 <- require
+if (require1("withr", quietly = TRUE)) {
+  numPartitions <- if (!is.null(numPartitions)) {
+numToInt(numPartitions)
+  } else {
+1
+  }
+  fileName <- tempfile(pattern = "spark-arrow", fileext = ".tmp")
+  chunk <- as.integer(ceiling(nrow(rdf) / numPartitions))
+  rdf_slices <- split(rdf, rep(1:ceiling(nrow(rdf) / chunk), each = chunk)[1:nrow(rdf)])
--- End diff --

This resembles PySpark side logic:


https://github.com/apache/spark/blob/d367bdcf521f564d2d7066257200be26b27ea926/python/pyspark/sql/session.py#L554-L556

Let me check the difference between them


---




[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...

2018-11-10 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22954#discussion_r232473669
  
--- Diff: R/pkg/R/SQLContext.R ---
@@ -147,6 +147,55 @@ getDefaultSqlSource <- function() {
   l[["spark.sql.sources.default"]]
 }
 
+writeToTempFileInArrow <- function(rdf, numPartitions) {
+  # R API in Arrow is not yet released. CRAN requires to add the package in requireNamespace
+  # at DESCRIPTION. Later, CRAN checks if the package is available or not. Therefore, it works
+  # around by avoiding direct requireNamespace.
+  requireNamespace1 <- requireNamespace
+  if (requireNamespace1("arrow", quietly = TRUE)) {
+record_batch <- get("record_batch", envir = asNamespace("arrow"), inherits = FALSE)
+record_batch_stream_writer <- get(
+  "record_batch_stream_writer", envir = asNamespace("arrow"), inherits = FALSE)
+file_output_stream <- get(
+  "file_output_stream", envir = asNamespace("arrow"), inherits = FALSE)
+write_record_batch <- get(
+  "write_record_batch", envir = asNamespace("arrow"), inherits = FALSE)
+
+# Currently arrow requires withr; otherwise, write APIs don't work.
+# Direct 'require' is not recommended by CRAN. Here's a workaround.
+require1 <- require
+if (require1("withr", quietly = TRUE)) {
+  numPartitions <- if (!is.null(numPartitions)) {
+numToInt(numPartitions)
+  } else {
+1
--- End diff --

We should; however, it follows the original code path's behaviour. I 
matched it as the same so that we can compare the performances in the same 
conditions. If you don't mind, I will fix both in a separate PR.


---




[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...

2018-11-10 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22954#discussion_r232473643
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala ---
@@ -225,4 +226,25 @@ private[sql] object SQLUtils extends Logging {
 }
 sparkSession.sessionState.catalog.listTables(db).map(_.table).toArray
   }
+
+  /**
+   * R callable function to read a file in Arrow stream format and create a `RDD`
+   * using each serialized ArrowRecordBatch as a partition.
+   */
+  def readArrowStreamFromFile(
+  sparkSession: SparkSession,
+  filename: String): JavaRDD[Array[Byte]] = {
+ArrowConverters.readArrowStreamFromFile(sparkSession.sqlContext, filename)
--- End diff --

Hmhmhm .. yea. What I was trying to do is to add SQL-related code that R calls into the JVM here when it is not an official API, so that when we change internal APIs on the Scala side it does not cause R test failures. I was trying to do a similar thing on the PySpark side.


---




[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...

2018-11-10 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22954#discussion_r232473611
  
--- Diff: R/pkg/R/SQLContext.R ---
@@ -147,6 +147,55 @@ getDefaultSqlSource <- function() {
   l[["spark.sql.sources.default"]]
 }
 
+writeToTempFileInArrow <- function(rdf, numPartitions) {
+  # R API in Arrow is not yet released. CRAN requires to add the package in requireNamespace
+  # at DESCRIPTION. Later, CRAN checks if the package is available or not. Therefore, it works
+  # around by avoiding direct requireNamespace.
+  requireNamespace1 <- requireNamespace
+  if (requireNamespace1("arrow", quietly = TRUE)) {
+record_batch <- get("record_batch", envir = asNamespace("arrow"), inherits = FALSE)
+record_batch_stream_writer <- get(
+  "record_batch_stream_writer", envir = asNamespace("arrow"), inherits = FALSE)
+file_output_stream <- get(
+  "file_output_stream", envir = asNamespace("arrow"), inherits = FALSE)
+write_record_batch <- get(
+  "write_record_batch", envir = asNamespace("arrow"), inherits = FALSE)
+
+# Currently arrow requires withr; otherwise, write APIs don't work.
+# Direct 'require' is not recommended by CRAN. Here's a workaround.
+require1 <- require
+if (require1("withr", quietly = TRUE)) {
--- End diff --

Yup .. it is .. from a cursory look at the R API of Arrow, it looks like we shouldn't hit this error. Maybe this can go away once I use an official R Arrow release. Let me check it later.


---




[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...

2018-11-10 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22954#discussion_r232453690
  
--- Diff: R/pkg/R/SQLContext.R ---
@@ -147,6 +147,55 @@ getDefaultSqlSource <- function() {
   l[["spark.sql.sources.default"]]
 }
 
+writeToTempFileInArrow <- function(rdf, numPartitions) {
+  # R API in Arrow is not yet released. CRAN requires to add the package in requireNamespace
+  # at DESCRIPTION. Later, CRAN checks if the package is available or not. Therefore, it works
+  # around by avoiding direct requireNamespace.
+  requireNamespace1 <- requireNamespace
+  if (requireNamespace1("arrow", quietly = TRUE)) {
+record_batch <- get("record_batch", envir = asNamespace("arrow"), inherits = FALSE)
+record_batch_stream_writer <- get(
+  "record_batch_stream_writer", envir = asNamespace("arrow"), inherits = FALSE)
+file_output_stream <- get(
+  "file_output_stream", envir = asNamespace("arrow"), inherits = FALSE)
+write_record_batch <- get(
+  "write_record_batch", envir = asNamespace("arrow"), inherits = FALSE)
+
+# Currently arrow requires withr; otherwise, write APIs don't work.
--- End diff --

nit: `arrow` -> `Arrow`


---




[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...

2018-11-09 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/22954#discussion_r232425279
  
--- Diff: R/pkg/R/SQLContext.R ---
@@ -189,19 +238,67 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0,
   x
 }
   }
+  data[] <- lapply(data, cleanCols)
 
-  # drop factors and wrap lists
-  data <- setNames(lapply(data, cleanCols), NULL)
+  args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE)
+  if (arrowEnabled) {
+shouldUseArrow <- tryCatch({
+  stopifnot(length(data) > 0)
+  dataHead <- head(data, 1)
+  # Currenty Arrow optimization does not support POSIXct and raw for now.
+  # Also, it does not support explicit float type set by users. It leads to
+  # incorrect conversion. We will fall back to the path without Arrow optimization.
+  if (any(sapply(dataHead, function(x) is(x, "POSIXct")))) {
+stop("Arrow optimization with R DataFrame does not support POSIXct type yet.")
+  }
+  if (any(sapply(dataHead, is.raw))) {
+stop("Arrow optimization with R DataFrame does not support raw type yet.")
+  }
+  if (inherits(schema, "structType")) {
+if (any(sapply(schema$fields(), function(x) x$dataType.toString() == "FloatType"))) {
+  stop("Arrow optimization with R DataFrame does not support FloatType type yet.")
--- End diff --

Any idea what's going on with the `FloatType`? Is it a problem on the arrow 
side?


---




[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...

2018-11-09 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/22954#discussion_r232425031
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala ---
@@ -225,4 +226,25 @@ private[sql] object SQLUtils extends Logging {
 }
 sparkSession.sessionState.catalog.listTables(db).map(_.table).toArray
   }
+
+  /**
+   * R callable function to read a file in Arrow stream format and create a `RDD`
--- End diff --

nit: a `RDD` -> an `RDD`


---




[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...

2018-11-09 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22954#discussion_r232202773
  
--- Diff: R/pkg/R/SQLContext.R ---
@@ -189,19 +238,67 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0,
   x
 }
   }
+  data[] <- lapply(data, cleanCols)
 
-  # drop factors and wrap lists
-  data <- setNames(lapply(data, cleanCols), NULL)
+  args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE)
+  if (arrowEnabled) {
+shouldUseArrow <- tryCatch({
+  stopifnot(length(data) > 0)
+  dataHead <- head(data, 1)
+  # Currenty Arrow optimization does not support POSIXct and raw for now.
+  # Also, it does not support explicit float type set by users. It leads to
+  # incorrect conversion. We will fall back to the path without Arrow optimization.
+  if (any(sapply(dataHead, function(x) is(x, "POSIXct")))) {
+stop("Arrow optimization with R DataFrame does not support POSIXct type yet.")
+  }
+  if (any(sapply(dataHead, is.raw))) {
+stop("Arrow optimization with R DataFrame does not support raw type yet.")
+  }
+  if (inherits(schema, "structType")) {
+if (any(sapply(schema$fields(), function(x) x$dataType.toString() == "FloatType"))) {
+  stop("Arrow optimization with R DataFrame does not support FloatType type yet.")
+}
+  }
+  firstRow <- do.call(mapply, append(args, dataHead))[[1]]
+  fileName <- writeToTempFileInArrow(data, numPartitions)
+  tryCatch(
+jrddInArrow <- callJStatic("org.apache.spark.sql.api.r.SQLUtils",
+   "readArrowStreamFromFile",
+   sparkSession,
+   fileName),
+  finally = {
+file.remove(fileName)
+  })
+  TRUE
+},
+error = function(e) {
+  message(paste0("WARN: createDataFrame attempted Arrow optimization because ",
--- End diff --

ooops


---




[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...

2018-11-09 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/22954#discussion_r232170936
  
--- Diff: R/pkg/R/SQLContext.R ---
@@ -147,6 +147,55 @@ getDefaultSqlSource <- function() {
   l[["spark.sql.sources.default"]]
 }
 
+writeToTempFileInArrow <- function(rdf, numPartitions) {
+  # R API in Arrow is not yet released. CRAN requires to add the package in requireNamespace
+  # at DESCRIPTION. Later, CRAN checks if the package is available or not. Therefore, it works
+  # around by avoiding direct requireNamespace.
+  requireNamespace1 <- requireNamespace
+  if (requireNamespace1("arrow", quietly = TRUE)) {
+record_batch <- get("record_batch", envir = asNamespace("arrow"), inherits = FALSE)
+record_batch_stream_writer <- get(
+  "record_batch_stream_writer", envir = asNamespace("arrow"), inherits = FALSE)
+file_output_stream <- get(
+  "file_output_stream", envir = asNamespace("arrow"), inherits = FALSE)
+write_record_batch <- get(
+  "write_record_batch", envir = asNamespace("arrow"), inherits = FALSE)
+
+# Currently arrow requires withr; otherwise, write APIs don't work.
+# Direct 'require' is not recommended by CRAN. Here's a workaround.
+require1 <- require
+if (require1("withr", quietly = TRUE)) {
+  numPartitions <- if (!is.null(numPartitions)) {
+numToInt(numPartitions)
+  } else {
+1
+  }
+  fileName <- tempfile(pattern = "spark-arrow", fileext = ".tmp")
+  chunk <- as.integer(ceiling(nrow(rdf) / numPartitions))
+  rdf_slices <- split(rdf, rep(1:ceiling(nrow(rdf) / chunk), each = chunk)[1:nrow(rdf)])
--- End diff --

`1 : ceiling`? `1 : nrow`?
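
For reference, a minimal standalone sketch (not the PR's code; the function name is illustrative) of the same slicing written with `seq_len()`, which avoids the `1:0` surprise of the `1:...` idiom when `nrow(rdf)` is 0:

    sliceDataFrame <- function(rdf, numPartitions) {
      n <- nrow(rdf)
      if (n == 0) {
        return(list(rdf))
      }
      chunk <- as.integer(ceiling(n / numPartitions))
      # seq_len(0) is integer(0), whereas 1:0 is c(1, 0)
      sliceIds <- rep(seq_len(ceiling(n / chunk)), each = chunk)[seq_len(n)]
      split(rdf, sliceIds)
    }

    sapply(sliceDataFrame(data.frame(x = 1:10), 4), nrow)  # 3 3 3 1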


---




[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...

2018-11-09 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/22954#discussion_r232176721
  
--- Diff: R/pkg/R/SQLContext.R ---
@@ -189,19 +238,67 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0,
   x
 }
   }
+  data[] <- lapply(data, cleanCols)
 
-  # drop factors and wrap lists
-  data <- setNames(lapply(data, cleanCols), NULL)
+  args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE)
+  if (arrowEnabled) {
+shouldUseArrow <- tryCatch({
+  stopifnot(length(data) > 0)
+  dataHead <- head(data, 1)
+  # Currenty Arrow optimization does not support POSIXct and raw for now.
+  # Also, it does not support explicit float type set by users. It leads to
+  # incorrect conversion. We will fall back to the path without Arrow optimization.
+  if (any(sapply(dataHead, function(x) is(x, "POSIXct")))) {
+stop("Arrow optimization with R DataFrame does not support POSIXct type yet.")
+  }
+  if (any(sapply(dataHead, is.raw))) {
+stop("Arrow optimization with R DataFrame does not support raw type yet.")
+  }
+  if (inherits(schema, "structType")) {
+if (any(sapply(schema$fields(), function(x) x$dataType.toString() == "FloatType"))) {
+  stop("Arrow optimization with R DataFrame does not support FloatType type yet.")
+}
+  }
+  firstRow <- do.call(mapply, append(args, dataHead))[[1]]
+  fileName <- writeToTempFileInArrow(data, numPartitions)
+  tryCatch(
+jrddInArrow <- callJStatic("org.apache.spark.sql.api.r.SQLUtils",
+   "readArrowStreamFromFile",
+   sparkSession,
+   fileName),
+  finally = {
+file.remove(fileName)
+  })
+  TRUE
+},
+error = function(e) {
+  message(paste0("WARN: createDataFrame attempted Arrow optimization because ",
--- End diff --

? https://stat.ethz.ch/R-manual/R-devel/library/base/html/warning.html
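
A small sketch of what the fallback notice could look like with base R's `warning()` instead of `message(paste0("WARN: ..."))`; the wording and the simulated failure are illustrative, not the PR's final text:

    shouldUseArrow <- tryCatch({
      stop("simulated failure in the Arrow path")  # stands in for the Arrow write/read calls
      TRUE
    }, error = function(e) {
      warning(paste0("createDataFrame attempted Arrow optimization because ",
                     "'spark.sql.execution.arrow.enabled' is set to true, but it failed; ",
                     "falling back to the non-Arrow path. Reason: ", conditionMessage(e)))
      FALSE
    })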


---




[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...

2018-11-09 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/22954#discussion_r232172687
  
--- Diff: R/pkg/R/SQLContext.R ---
@@ -147,6 +147,55 @@ getDefaultSqlSource <- function() {
   l[["spark.sql.sources.default"]]
 }
 
+writeToTempFileInArrow <- function(rdf, numPartitions) {
+  # R API in Arrow is not yet released. CRAN requires to add the package in requireNamespace
+  # at DESCRIPTION. Later, CRAN checks if the package is available or not. Therefore, it works
+  # around by avoiding direct requireNamespace.
+  requireNamespace1 <- requireNamespace
+  if (requireNamespace1("arrow", quietly = TRUE)) {
+record_batch <- get("record_batch", envir = asNamespace("arrow"), inherits = FALSE)
+record_batch_stream_writer <- get(
+  "record_batch_stream_writer", envir = asNamespace("arrow"), inherits = FALSE)
+file_output_stream <- get(
+  "file_output_stream", envir = asNamespace("arrow"), inherits = FALSE)
+write_record_batch <- get(
+  "write_record_batch", envir = asNamespace("arrow"), inherits = FALSE)
+
+# Currently arrow requires withr; otherwise, write APIs don't work.
+# Direct 'require' is not recommended by CRAN. Here's a workaround.
+require1 <- require
+if (require1("withr", quietly = TRUE)) {
+  numPartitions <- if (!is.null(numPartitions)) {
+numToInt(numPartitions)
+  } else {
+1
+  }
+  fileName <- tempfile(pattern = "spark-arrow", fileext = ".tmp")
+  chunk <- as.integer(ceiling(nrow(rdf) / numPartitions))
+  rdf_slices <- split(rdf, rep(1:ceiling(nrow(rdf) / chunk), each = chunk)[1:nrow(rdf)])
+  stream_writer <- NULL
+  for (rdf_slice in rdf_slices) {
+batch <- record_batch(rdf_slice)
+if (is.null(stream_writer)) {
+  # We should avoid private calls like 'close_on_exit' (CRAN disallows) but looks
+  # there's no exposed API for it. Here's a workaround but ideally this should
+  # be removed.
+  close_on_exit <- get("close_on_exit", envir = asNamespace("arrow"), inherits = FALSE)
--- End diff --

actually, I think if you use withr here it will call close_on_exit for you? 
(but when?)


---




[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...

2018-11-09 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/22954#discussion_r232169938
  
--- Diff: R/pkg/R/SQLContext.R ---
@@ -147,6 +147,55 @@ getDefaultSqlSource <- function() {
   l[["spark.sql.sources.default"]]
 }
 
+writeToTempFileInArrow <- function(rdf, numPartitions) {
+  # R API in Arrow is not yet released. CRAN requires to add the package in requireNamespace
+  # at DESCRIPTION. Later, CRAN checks if the package is available or not. Therefore, it works
+  # around by avoiding direct requireNamespace.
+  requireNamespace1 <- requireNamespace
+  if (requireNamespace1("arrow", quietly = TRUE)) {
+record_batch <- get("record_batch", envir = asNamespace("arrow"), inherits = FALSE)
+record_batch_stream_writer <- get(
+  "record_batch_stream_writer", envir = asNamespace("arrow"), inherits = FALSE)
+file_output_stream <- get(
+  "file_output_stream", envir = asNamespace("arrow"), inherits = FALSE)
+write_record_batch <- get(
+  "write_record_batch", envir = asNamespace("arrow"), inherits = FALSE)
+
+# Currently arrow requires withr; otherwise, write APIs don't work.
+# Direct 'require' is not recommended by CRAN. Here's a workaround.
+require1 <- require
+if (require1("withr", quietly = TRUE)) {
+  numPartitions <- if (!is.null(numPartitions)) {
+numToInt(numPartitions)
+  } else {
+1
--- End diff --

should this default to the default parallelism?
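
A hedged sketch of that suggestion; it assumes SparkR's internal `getSparkContext()` helper and the JVM context's `defaultParallelism` method, so treat both names as assumptions rather than the PR's code:

    numPartitions <- if (!is.null(numPartitions)) {
      numToInt(numPartitions)
    } else {
      # assumption: ask the JVM for the session's default parallelism instead of hardcoding 1
      callJMethod(getSparkContext(), "defaultParallelism")
    }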


---




[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...

2018-11-09 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/22954#discussion_r232173367
  
--- Diff: R/pkg/R/SQLContext.R ---
@@ -189,19 +238,67 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0,
   x
 }
   }
+  data[] <- lapply(data, cleanCols)
 
-  # drop factors and wrap lists
-  data <- setNames(lapply(data, cleanCols), NULL)
+  args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE)
+  if (arrowEnabled) {
+shouldUseArrow <- tryCatch({
+  stopifnot(length(data) > 0)
+  dataHead <- head(data, 1)
+  # Currenty Arrow optimization does not support POSIXct and raw for now.
+  # Also, it does not support explicit float type set by users. It leads to
+  # incorrect conversion. We will fall back to the path without Arrow optimization.
+  if (any(sapply(dataHead, function(x) is(x, "POSIXct")))) {
--- End diff --

can you check - I think `is` / `is.x` doesn't do the right thing when head(df, 1) has a field that is `NA`
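
A quick standalone check of that concern in plain base R (not PR code): the column in `head(df, 1)` keeps its POSIXct class even when its first value is `NA`, so `is(x, "POSIXct")` still detects it, while a plain numeric column with an `NA` first value reports `FALSE` as expected:

    df <- data.frame(t = as.POSIXct(c(NA, "2018-11-09 00:00:00")), y = c(NA, 2))
    dataHead <- head(df, 1)
    sapply(dataHead, function(x) is(x, "POSIXct"))
    #     t     y
    #  TRUE FALSE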


---




[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...

2018-11-09 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/22954#discussion_r232173043
  
--- Diff: R/pkg/R/SQLContext.R ---
@@ -172,10 +221,10 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0,
 numPartitions = NULL) {
   sparkSession <- getSparkSession()
-
+  arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]] == 
"true"
--- End diff --

is it always in the conf - I think you can also pass in a default value to 
sparkR.conf
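
For the record, a sketch of the suggested form, assuming `sparkR.conf`'s optional default-value argument; with a default the lookup should not error when the key is unset in the session conf:

    # "false" here is an illustrative default, not necessarily what the PR should use
    arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled", "false")[[1]] == "true"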


---




[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...

2018-11-09 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/22954#discussion_r232167634
  
--- Diff: R/pkg/R/SQLContext.R ---
@@ -147,6 +147,55 @@ getDefaultSqlSource <- function() {
   l[["spark.sql.sources.default"]]
 }
 
+writeToTempFileInArrow <- function(rdf, numPartitions) {
+  # R API in Arrow is not yet released. CRAN requires to add the package in requireNamespace
+  # at DESCRIPTION. Later, CRAN checks if the package is available or not. Therefore, it works
+  # around by avoiding direct requireNamespace.
+  requireNamespace1 <- requireNamespace
+  if (requireNamespace1("arrow", quietly = TRUE)) {
+record_batch <- get("record_batch", envir = asNamespace("arrow"), inherits = FALSE)
+record_batch_stream_writer <- get(
+  "record_batch_stream_writer", envir = asNamespace("arrow"), inherits = FALSE)
+file_output_stream <- get(
+  "file_output_stream", envir = asNamespace("arrow"), inherits = FALSE)
+write_record_batch <- get(
+  "write_record_batch", envir = asNamespace("arrow"), inherits = FALSE)
+
+# Currently arrow requires withr; otherwise, write APIs don't work.
+# Direct 'require' is not recommended by CRAN. Here's a workaround.
+require1 <- require
+if (require1("withr", quietly = TRUE)) {
--- End diff --

require1 sounds a bit like a hack though...
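
For context, a tiny base R illustration of the difference the workaround is dancing around: `requireNamespace()` only loads the namespace, while `require()` also attaches the package to the search path, which is what arrow's `close_on_exit` helpers appear to rely on here. The aliases exist only so the literal calls don't trip CRAN's checks.

    # loads the withr namespace without attaching it; calls must be namespace-qualified
    loaded <- requireNamespace("withr", quietly = TRUE)

    # additionally attaches withr to the search path, so unqualified calls work
    attached <- require("withr", quietly = TRUE, character.only = TRUE)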


---




[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...

2018-11-09 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/22954#discussion_r232172546
  
--- Diff: R/pkg/R/SQLContext.R ---
@@ -147,6 +147,55 @@ getDefaultSqlSource <- function() {
   l[["spark.sql.sources.default"]]
 }
 
+writeToTempFileInArrow <- function(rdf, numPartitions) {
+  # R API in Arrow is not yet released. CRAN requires to add the package in requireNamespace
+  # at DESCRIPTION. Later, CRAN checks if the package is available or not. Therefore, it works
+  # around by avoiding direct requireNamespace.
+  requireNamespace1 <- requireNamespace
+  if (requireNamespace1("arrow", quietly = TRUE)) {
+record_batch <- get("record_batch", envir = asNamespace("arrow"), inherits = FALSE)
+record_batch_stream_writer <- get(
+  "record_batch_stream_writer", envir = asNamespace("arrow"), inherits = FALSE)
+file_output_stream <- get(
+  "file_output_stream", envir = asNamespace("arrow"), inherits = FALSE)
+write_record_batch <- get(
+  "write_record_batch", envir = asNamespace("arrow"), inherits = FALSE)
+
+# Currently arrow requires withr; otherwise, write APIs don't work.
+# Direct 'require' is not recommended by CRAN. Here's a workaround.
+require1 <- require
+if (require1("withr", quietly = TRUE)) {
+  numPartitions <- if (!is.null(numPartitions)) {
+numToInt(numPartitions)
+  } else {
+1
+  }
+  fileName <- tempfile(pattern = "spark-arrow", fileext = ".tmp")
+  chunk <- as.integer(ceiling(nrow(rdf) / numPartitions))
+  rdf_slices <- split(rdf, rep(1:ceiling(nrow(rdf) / chunk), each = chunk)[1:nrow(rdf)])
+  stream_writer <- NULL
+  for (rdf_slice in rdf_slices) {
+batch <- record_batch(rdf_slice)
+if (is.null(stream_writer)) {
+  # We should avoid private calls like 'close_on_exit' (CRAN disallows) but looks
+  # there's no exposed API for it. Here's a workaround but ideally this should
+  # be removed.
+  close_on_exit <- get("close_on_exit", envir = asNamespace("arrow"), inherits = FALSE)
--- End diff --

so is this an API missing in Arrow?


---




[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...

2018-11-09 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/22954#discussion_r232177132
  
--- Diff: R/pkg/R/SQLContext.R ---
@@ -189,19 +238,67 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0,
   x
 }
   }
+  data[] <- lapply(data, cleanCols)
 
-  # drop factors and wrap lists
-  data <- setNames(lapply(data, cleanCols), NULL)
+  args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE)
+  if (arrowEnabled) {
+shouldUseArrow <- tryCatch({
--- End diff --

refactor this into a method?
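
One shape the refactor could take, as a sketch; the helper name and the exact grouping are illustrative rather than taken from the PR:

    checkTypeRequirementForArrow <- function(dataHead, schema) {
      # mirrors the checks inlined in the diff above, so createDataFrame's tryCatch stays small
      if (any(sapply(dataHead, function(x) is(x, "POSIXct")))) {
        stop("Arrow optimization with R DataFrame does not support POSIXct type yet.")
      }
      if (any(sapply(dataHead, is.raw))) {
        stop("Arrow optimization with R DataFrame does not support raw type yet.")
      }
      if (inherits(schema, "structType") &&
          any(sapply(schema$fields(), function(x) x$dataType.toString() == "FloatType"))) {
        stop("Arrow optimization with R DataFrame does not support FloatType type yet.")
      }
      invisible(TRUE)
    }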


---




[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...

2018-11-09 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/22954#discussion_r232167926
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala ---
@@ -225,4 +226,25 @@ private[sql] object SQLUtils extends Logging {
 }
 sparkSession.sessionState.catalog.listTables(db).map(_.table).toArray
   }
+
+  /**
+   * R callable function to read a file in Arrow stream format and create a `RDD`
+   * using each serialized ArrowRecordBatch as a partition.
+   */
+  def readArrowStreamFromFile(
+  sparkSession: SparkSession,
+  filename: String): JavaRDD[Array[Byte]] = {
+ArrowConverters.readArrowStreamFromFile(sparkSession.sqlContext, filename)
--- End diff --

what's the advantage of adding this wrapper here - I've been thinking to eliminate most of these if possible - and just use callJMethod on `ArrowConverters`, say?
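
A sketch of the alternative being suggested; the fully qualified class name and the `sqlContext` accessor below are my assumptions, so read this as the shape of the call rather than verified code:

    # assumption: ArrowConverters lives at this path and takes (SQLContext, path), as in the diff above
    sqlContext <- callJMethod(sparkSession, "sqlContext")
    jrddInArrow <- callJStatic("org.apache.spark.sql.execution.arrow.ArrowConverters",
                               "readArrowStreamFromFile",
                               sqlContext,
                               fileName)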


---




[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...

2018-11-09 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/22954#discussion_r232176774
  
--- Diff: R/pkg/R/SQLContext.R ---
@@ -189,19 +238,67 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0,
   x
 }
   }
+  data[] <- lapply(data, cleanCols)
 
-  # drop factors and wrap lists
-  data <- setNames(lapply(data, cleanCols), NULL)
+  args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE)
+  if (arrowEnabled) {
+shouldUseArrow <- tryCatch({
+  stopifnot(length(data) > 0)
+  dataHead <- head(data, 1)
+  # Currenty Arrow optimization does not support POSIXct and raw for now.
+  # Also, it does not support explicit float type set by users. It leads to
+  # incorrect conversion. We will fall back to the path without Arrow optimization.
+  if (any(sapply(dataHead, function(x) is(x, "POSIXct")))) {
+stop("Arrow optimization with R DataFrame does not support POSIXct type yet.")
+  }
+  if (any(sapply(dataHead, is.raw))) {
+stop("Arrow optimization with R DataFrame does not support raw type yet.")
+  }
+  if (inherits(schema, "structType")) {
+if (any(sapply(schema$fields(), function(x) x$dataType.toString() == "FloatType"))) {
+  stop("Arrow optimization with R DataFrame does not support FloatType type yet.")
+}
+  }
+  firstRow <- do.call(mapply, append(args, dataHead))[[1]]
+  fileName <- writeToTempFileInArrow(data, numPartitions)
+  tryCatch(
+jrddInArrow <- callJStatic("org.apache.spark.sql.api.r.SQLUtils",
+   "readArrowStreamFromFile",
+   sparkSession,
+   fileName),
+  finally = {
+file.remove(fileName)
--- End diff --

unlink?
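
The difference being pointed at, in a tiny standalone sketch: `unlink()` stays quiet if the temp file is already gone, while `file.remove()` returns `FALSE` with a warning:

    fileName <- tempfile(pattern = "spark-arrow", fileext = ".tmp")
    file.create(fileName)
    tryCatch({
      TRUE  # stand-in for the JVM-side readArrowStreamFromFile call
    }, finally = {
      unlink(fileName)     # silent even when the file is already gone
    })
    file.remove(fileName)  # FALSE plus a warning, because the file was already deleted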


---




[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...

2018-11-09 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/22954#discussion_r232171176
  
--- Diff: R/pkg/R/SQLContext.R ---
@@ -147,6 +147,55 @@ getDefaultSqlSource <- function() {
   l[["spark.sql.sources.default"]]
 }
 
+writeToTempFileInArrow <- function(rdf, numPartitions) {
+  # R API in Arrow is not yet released. CRAN requires to add the package in requireNamespace
+  # at DESCRIPTION. Later, CRAN checks if the package is available or not. Therefore, it works
+  # around by avoiding direct requireNamespace.
+  requireNamespace1 <- requireNamespace
+  if (requireNamespace1("arrow", quietly = TRUE)) {
+record_batch <- get("record_batch", envir = asNamespace("arrow"), inherits = FALSE)
+record_batch_stream_writer <- get(
+  "record_batch_stream_writer", envir = asNamespace("arrow"), inherits = FALSE)
+file_output_stream <- get(
+  "file_output_stream", envir = asNamespace("arrow"), inherits = FALSE)
+write_record_batch <- get(
+  "write_record_batch", envir = asNamespace("arrow"), inherits = FALSE)
+
+# Currently arrow requires withr; otherwise, write APIs don't work.
+# Direct 'require' is not recommended by CRAN. Here's a workaround.
+require1 <- require
+if (require1("withr", quietly = TRUE)) {
+  numPartitions <- if (!is.null(numPartitions)) {
+numToInt(numPartitions)
+  } else {
+1
+  }
+  fileName <- tempfile(pattern = "spark-arrow", fileext = ".tmp")
+  chunk <- as.integer(ceiling(nrow(rdf) / numPartitions))
+  rdf_slices <- split(rdf, rep(1:ceiling(nrow(rdf) / chunk), each = chunk)[1:nrow(rdf)])
--- End diff --

how are these slices computed? is it similar to https://github.com/apache/spark/blob/c3b4a94a91d66c172cf332321d3a78dba29ef8f0/R/pkg/R/context.R#L152
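
For comparison, a standalone walk-through of the slicing in the diff with concrete numbers; it rounds the chunk size up, which looks similar in spirit to how `parallelize` slices a list in context.R, though I haven't verified the two match exactly:

    rdf <- data.frame(x = 1:10)
    numPartitions <- 4
    chunk <- as.integer(ceiling(nrow(rdf) / numPartitions))   # 3
    slices <- split(rdf, rep(1:ceiling(nrow(rdf) / chunk), each = chunk)[1:nrow(rdf)])
    sapply(slices, nrow)
    # 1 2 3 4
    # 3 3 3 1   (four slices, the last one short, rather than balanced 3 3 2 2)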


---




[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...

2018-11-09 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/22954#discussion_r232167480
  
--- Diff: R/pkg/R/SQLContext.R ---
@@ -147,6 +147,55 @@ getDefaultSqlSource <- function() {
   l[["spark.sql.sources.default"]]
 }
 
+writeToTempFileInArrow <- function(rdf, numPartitions) {
+  # R API in Arrow is not yet released. CRAN requires to add the package in requireNamespace
+  # at DESCRIPTION. Later, CRAN checks if the package is available or not. Therefore, it works
+  # around by avoiding direct requireNamespace.
+  requireNamespace1 <- requireNamespace
+  if (requireNamespace1("arrow", quietly = TRUE)) {
+record_batch <- get("record_batch", envir = asNamespace("arrow"), inherits = FALSE)
+record_batch_stream_writer <- get(
+  "record_batch_stream_writer", envir = asNamespace("arrow"), inherits = FALSE)
+file_output_stream <- get(
+  "file_output_stream", envir = asNamespace("arrow"), inherits = FALSE)
+write_record_batch <- get(
+  "write_record_batch", envir = asNamespace("arrow"), inherits = FALSE)
--- End diff --

ok


---




[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...

2018-11-09 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/22954#discussion_r232167110
  
--- Diff: R/pkg/R/SQLContext.R ---
@@ -215,14 +278,16 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0,
   }
 
   if (is.null(schema) || (!inherits(schema, "structType") && is.null(names(schema)))) {
-row <- firstRDD(rdd)
+if (is.null(firstRow)) {
+  firstRow <- firstRDD(rdd)
--- End diff --

I <3 4 digits!


---




[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...

2018-11-08 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22954#discussion_r232115966
  
--- Diff: R/pkg/R/SQLContext.R ---
@@ -147,6 +147,55 @@ getDefaultSqlSource <- function() {
   l[["spark.sql.sources.default"]]
 }
 
+writeToTempFileInArrow <- function(rdf, numPartitions) {
+  # R API in Arrow is not yet released. CRAN requires to add the package in requireNamespace
+  # at DESCRIPTION. Later, CRAN checks if the package is available or not. Therefore, it works
+  # around by avoiding direct requireNamespace.
+  requireNamespace1 <- requireNamespace
+  if (requireNamespace1("arrow", quietly = TRUE)) {
+record_batch <- get("record_batch", envir = asNamespace("arrow"), inherits = FALSE)
+record_batch_stream_writer <- get(
+  "record_batch_stream_writer", envir = asNamespace("arrow"), inherits = FALSE)
+file_output_stream <- get(
+  "file_output_stream", envir = asNamespace("arrow"), inherits = FALSE)
+write_record_batch <- get(
+  "write_record_batch", envir = asNamespace("arrow"), inherits = FALSE)
+
+# Currently arrow requires withr; otherwise, write APIs don't work.
+# Direct 'require' is not recommended by CRAN. Here's a workaround.
+require1 <- require
+if (require1("withr", quietly = TRUE)) {
--- End diff --

It's actually a bit odd that I need to manually require this package. 
Otherwise, it complains, for instance, here 
https://github.com/apache/arrow/blob/d3ec69069649013229366ebe01e22f389597dc19/r/R/on_exit.R#L20


---
