[jira] [Updated] (SPARK-18924) Improve collect/createDataFrame performance in SparkR

2017-02-23 Thread Joseph K. Bradley (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18924?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joseph K. Bradley updated SPARK-18924:
--
Target Version/s:   (was: 2.2.0)

> Improve collect/createDataFrame performance in SparkR
> -
>
> Key: SPARK-18924
> URL: https://issues.apache.org/jira/browse/SPARK-18924
> Project: Spark
>  Issue Type: Improvement
>  Components: SparkR
>Reporter: Xiangrui Meng
>Assignee: Xiangrui Meng
>Priority: Critical
>
> SparkR has its own SerDe for data serialization between the JVM and R.
> The SerDe on the JVM side is implemented in:
> * 
> [SerDe.scala|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/api/r/SerDe.scala]
> * 
> [SQLUtils.scala|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala]
> The SerDe on the R side is implemented in:
> * 
> [deserialize.R|https://github.com/apache/spark/blob/master/R/pkg/R/deserialize.R]
> * 
> [serialize.R|https://github.com/apache/spark/blob/master/R/pkg/R/serialize.R]
> The serialization between the JVM and R suffers from huge storage and 
> computation overhead. For example, a short round trip of 1 million doubles 
> surprisingly took 3 minutes on my laptop:
> {code}
> > system.time(collect(createDataFrame(data.frame(x=runif(1e6)))))
>user  system elapsed
>  14.224   0.582 189.135
> {code}
> Collecting a medium-sized DataFrame to the local R session and continuing with 
> a local R workflow is a use case we should pay attention to. SparkR will never 
> be able to cover all existing features of CRAN packages, and it is unnecessary 
> for Spark to do so because not all features need scalability.
> Several factors contribute to the serialization overhead:
> 1. The SerDe on the R side is implemented using high-level R methods.
> 2. DataFrame columns are not efficiently serialized, primitive-type columns 
> in particular.
> 3. There is some overhead in the serialization protocol/implementation itself.
> 1) might have been discussed before, because R packages like rJava existed 
> before SparkR. I'm not sure whether depending on those libraries would raise a 
> license issue. Another option is to switch to R's low-level C interface or 
> Rcpp, which again might have license issues. I'm not an expert here. If we 
> have to implement our own, there is still much room for improvement, as 
> discussed below.
> 2) is a huge gap. The current collect is implemented by `SQLUtils.dfToCols`, 
> which collects rows to the driver and then constructs columns. However,
> * it ignores column types, which results in boxing/unboxing overhead, and
> * it collects all values as objects on the driver, which results in high GC 
> pressure.
> A relatively simple change is to implement specialized column builders based 
> on column type, primitive types in particular. We need to handle null/NA 
> values properly. A simple data structure we can use is
> {code}
> val size: Int
> val nullIndexes: Array[Int]
> val notNullValues: Array[T] // specialized for primitive types
> {code}
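> As an illustration only (hypothetical names, not actual Spark code), a minimal 
> Scala sketch of such a specialized builder for a double column could look like:
> {code}
> import scala.collection.mutable.ArrayBuffer
>
> // Sketch of a type-specialized column builder: non-null values go into a
> // primitive buffer, null/NA positions are tracked separately by row index.
> class DoubleColumnBuilder {
>   private val nullIndexes = new ArrayBuffer[Int]()
>   private val notNullValues = new ArrayBuffer[Double]()
>   private var size = 0
>
>   def append(value: java.lang.Double): Unit = {
>     if (value == null) nullIndexes += size
>     else notNullValues += value.doubleValue()  // no boxed Double kept around
>     size += 1
>   }
>
>   def build(): (Int, Array[Int], Array[Double]) =
>     (size, nullIndexes.toArray, notNullValues.toArray)
> }
>
> // Example: three rows, the second one is NA.
> // val b = new DoubleColumnBuilder
> // Seq[java.lang.Double](1.0, null, 3.5).foreach(b.append)
> // b.build()  // (3, Array(1), Array(1.0, 3.5))
> {code}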
> On the R side, we can use `readBin` and `writeBin` to read or write an entire 
> vector in a single call. The speed seems reasonable (on the order of GB/s):
> {code}
> > x <- runif(1e7) # 1e7, not 1e6
> > system.time(r <- writeBin(x, raw(0)))
>user  system elapsed
>   0.036   0.021   0.059
> > system.time(y <- readBin(r, double(), 1e7))
>user  system elapsed
>   0.015   0.007   0.024
> {code}
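> As a rough sketch of what the JVM-side counterpart could look like (a 
> hypothetical helper, not the existing SerDe code), a primitive double column 
> can be packed into a single byte buffer that `readBin(..., endian = "little")` 
> could decode in one call, as long as both sides agree on the byte order:
> {code}
> import java.nio.{ByteBuffer, ByteOrder}
>
> object BulkDoubleSerDe {
>   // One bulk copy of the whole column, no per-element boxing or method calls.
>   def toBytes(values: Array[Double]): Array[Byte] = {
>     val buf = ByteBuffer.allocate(values.length * java.lang.Double.BYTES)
>       .order(ByteOrder.LITTLE_ENDIAN)
>     buf.asDoubleBuffer().put(values)
>     buf.array()
>   }
>
>   // Inverse direction, e.g. for createDataFrame from an R vector written
>   // with writeBin.
>   def fromBytes(bytes: Array[Byte]): Array[Double] = {
>     val db = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN).asDoubleBuffer()
>     val out = new Array[Double](db.remaining())
>     db.get(out)
>     out
>   }
> }
> {code}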
> This is just a proposal that needs to be discussed and formalized, but in 
> general, it should be feasible to obtain a 20x or greater performance gain.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-18924) Improve collect/createDataFrame performance in SparkR

2017-02-08 Thread Xiangrui Meng (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18924?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiangrui Meng updated SPARK-18924:
--
Shepherd:   (was: Xiangrui Meng)




[jira] [Updated] (SPARK-18924) Improve collect/createDataFrame performance in SparkR

2017-01-17 Thread Joseph K. Bradley (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18924?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joseph K. Bradley updated SPARK-18924:
--
Shepherd: Xiangrui Meng




[jira] [Updated] (SPARK-18924) Improve collect/createDataFrame performance in SparkR

2016-12-19 Thread Xiangrui Meng (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18924?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiangrui Meng updated SPARK-18924:
--
Description: 