[ https://issues.apache.org/jira/browse/SPARK-18924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15762548#comment-15762548 ]

Shivaram Venkataraman commented on SPARK-18924:
-----------------------------------------------

This is a good thing to investigate. Just to provide some historical context, 
the functions in serialize.R and deserialize.R were primarily designed to 
enable function calls between the JVM and R, and serialization performance is 
less critical there as it's mostly just function names, arguments, etc. 

For the data path we were originally using R's own serializer and 
deserializer, but that doesn't work if we want to parse the data in the JVM. 
So the whole dfToCols was a retrofit to make things work.

In terms of design options:
- I think removing the boxing/unboxing overhead and making `readIntArray` or 
`readStringArray` in R more efficient would be a good starting point (see the 
sketch after this list).
- In terms of using other packages, there are licensing questions and also 
usability questions. So far users mostly don't require any extra R packages to 
use SparkR, and hence we are compatible across a wide range of R versions. So 
I think we should first look at how we can make our existing architecture 
better.
- If the bottleneck is still R function call overhead after the above changes, 
we can explore writing a C module (similar to our old hashCode implementation). 
While this has fewer complications in terms of licensing, version matching, 
etc., there is still some complexity in how we build and distribute it in a 
binary package.
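
As a concrete illustration of the first point, the per-element reads in 
deserialize.R could be collapsed into one vectorized `readBin` call per array. 
This is a rough sketch only, not the current implementation: the helper name 
is hypothetical and it assumes a wire format of an int length prefix followed 
by big-endian values.
{code}
# Rough sketch (hypothetical helper, not the current SparkR code): read a
# whole double array with a single readBin call instead of one call per
# element. Assumes an int length prefix followed by big-endian doubles.
readDoubleArrayVectorized <- function(con) {
  n <- readBin(con, integer(), n = 1, endian = "big")   # length prefix
  readBin(con, double(), n = n, endian = "big")         # whole vector at once
}
{code}
The same pattern would apply to `readIntArray`, while `readStringArray` would 
likely still need per-element handling for variable-length strings.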

> Improve collect/createDataFrame performance in SparkR
> -----------------------------------------------------
>
>                 Key: SPARK-18924
>                 URL: https://issues.apache.org/jira/browse/SPARK-18924
>             Project: Spark
>          Issue Type: Improvement
>          Components: SparkR
>            Reporter: Xiangrui Meng
>            Priority: Critical
>
> SparkR has its own SerDe for data serialization between JVM and R.
> The SerDe on the JVM side is implemented in:
> * [SerDe.scala|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/api/r/SerDe.scala]
> * [SQLUtils.scala|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala]
> The SerDe on the R side is implemented in:
> * [deserialize.R|https://github.com/apache/spark/blob/master/R/pkg/R/deserialize.R]
> * [serialize.R|https://github.com/apache/spark/blob/master/R/pkg/R/serialize.R]
> The serialization between the JVM and R suffers from huge storage and 
> computation overhead. For example, a short round trip of 1 million doubles 
> surprisingly took 3 minutes on my laptop:
> {code}
> > system.time(collect(createDataFrame(data.frame(x=runif(1000000)))))
>    user  system elapsed
>  14.224   0.582 189.135
> {code}
> Collecting a medium-sized DataFrame to the local R session and continuing 
> with a local R workflow is a use case we should pay attention to. SparkR 
> will never be able to cover all existing features from CRAN packages, and it 
> is also unnecessary for Spark to do so because not all features need 
> scalability.
> Several factors contribute to the serialization overhead:
> 1. The SerDe on the R side is implemented using high-level R methods.
> 2. DataFrame columns are not efficiently serialized, primitive type columns 
> in particular.
> 3. There is some overhead in the serialization protocol/implementation.
> 1) might have been discussed before, because R packages like rJava existed 
> before SparkR. I'm not sure whether we have a license issue in depending on 
> those libraries. Another option is to switch to R's low-level C interface or 
> Rcpp, which again might have license issues. I'm not an expert here. If we 
> have to implement our own, there is still much room for improvement, as 
> discussed below.
> 2) is a huge gap. The current collect is implemented by `SQLUtils.dfToCols`, 
> which collects rows to the driver and then constructs columns. However,
> * it ignores column types, which results in boxing/unboxing overhead
> * it collects all objects to the driver, which results in high GC pressure
> A relatively simple change is to implement specialized column builders based 
> on column types, primitive types in particular. We need to handle null/NA 
> values properly. A simple data structure we can use is:
> {code}
> val size: Int
> val nullIndexes: Array[Int]
> val notNullValues: Array[T] // specialized for primitive types
> {code}
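> For illustration, the R side of collect could consume this layout with one 
> vectorized read per array. This is a hypothetical sketch only: the function 
> name, the numNulls count, 1-based null indexes, and big-endian framing are 
> all assumptions, not the existing SerDe protocol.
> {code}
> # Sketch only: rebuild a double column with NAs from the proposed
> # (size, nullIndexes, notNullValues) layout. All framing details are assumed.
> readDoubleColumn <- function(con) {
>   size <- readBin(con, integer(), n = 1, endian = "big")
>   numNulls <- readBin(con, integer(), n = 1, endian = "big")
>   nullIndexes <- readBin(con, integer(), n = numNulls, endian = "big")
>   values <- readBin(con, double(), n = size - numNulls, endian = "big")
>   out <- rep(NA_real_, size)
>   out[setdiff(seq_len(size), nullIndexes)] <- values   # fill non-null slots
>   out
> }
> {code}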
> On the R side, we can use `readBin` and `writeBin` to read or write the 
> entire vector in a single method call. The speed seems reasonable (on the 
> order of GB/s):
> {code}
> > x <- runif(10000000) # 1e7, not 1e6
> > system.time(r <- writeBin(x, raw(0)))
>    user  system elapsed
>   0.036   0.021   0.059
> > system.time(y <- readBin(r, double(), 10000000))
>    user  system elapsed
>   0.015   0.007   0.024
> {code}
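> The createDataFrame direction could mirror this with `writeBin`. Again a 
> hypothetical sketch with assumed names and framing, not the existing 
> protocol:
> {code}
> # Sketch only: split a column into null positions and packed non-null
> # values, then write each piece with a single writeBin call.
> writeDoubleColumn <- function(con, x) {
>   nullIndexes <- which(is.na(x))                        # 1-based NA positions
>   notNullValues <- x[!is.na(x)]                         # packed values
>   writeBin(length(x), con, endian = "big")              # size
>   writeBin(length(nullIndexes), con, endian = "big")    # number of nulls
>   writeBin(nullIndexes, con, endian = "big")            # null indexes
>   writeBin(notNullValues, con, endian = "big")          # values, one call
> }
> {code}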
> This is just a proposal that needs to be discussed and formalized, but in 
> general it should be feasible to obtain a 20x or greater performance gain.


