GitHub user felixcheung commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22954#discussion_r232500065
  
    --- Diff: R/pkg/R/SQLContext.R ---
    @@ -172,36 +257,72 @@ getDefaultSqlSource <- function() {
     createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0,
                                 numPartitions = NULL) {
       sparkSession <- getSparkSession()
    -
    +  arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]] == 
"true"
    +  shouldUseArrow <- FALSE
    +  firstRow <- NULL
       if (is.data.frame(data)) {
    -      # Convert data into a list of rows. Each row is a list.
    -
    -      # get the names of columns, they will be put into RDD
    -      if (is.null(schema)) {
    -        schema <- names(data)
    -      }
    +    # get the names of columns, they will be put into RDD
    +    if (is.null(schema)) {
    +      schema <- names(data)
    +    }
     
    -      # get rid of factor type
    -      cleanCols <- function(x) {
    -        if (is.factor(x)) {
    -          as.character(x)
    -        } else {
    -          x
    -        }
    +    # get rid of factor type
    +    cleanCols <- function(x) {
    +      if (is.factor(x)) {
    +        as.character(x)
    +      } else {
    +        x
           }
    +    }
    +    data[] <- lapply(data, cleanCols)
    +
    +    args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE)
    +    if (arrowEnabled) {
    +      shouldUseArrow <- tryCatch({
    +        stopifnot(length(data) > 0)
    +        dataHead <- head(data, 1)
    +        checkTypeRequirementForArrow(data, schema)
    +        fileName <- writeToTempFileInArrow(data, numPartitions)
    +        tryCatch(
    +          jrddInArrow <- callJStatic("org.apache.spark.sql.api.r.SQLUtils",
    +                                     "readArrowStreamFromFile",
    +                                     sparkSession,
    +                                     fileName),
    +        finally = {
    +          file.remove(fileName)
    +        })
    +
    +        firstRow <- do.call(mapply, append(args, dataHead))[[1]]
    +        TRUE
    +      },
    +      error = function(e) {
    +        warning(paste0("createDataFrame attempted Arrow optimization 
because ",
    +                       "'spark.sql.execution.arrow.enabled' is set to 
true; however, ",
    +                       "failed, attempting non-optimization. Reason: ",
    +                       e))
    +        return(FALSE)
    --- End diff --
    
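    nit: just `FALSE` is enough here; the explicit `return()` is unnecessary because the handler's last expression is its return value.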


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to