[jira] [Comment Edited] (SPARK-23650) Slow SparkR udf (dapply)

2018-03-23 Thread Deepansh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16411246#comment-16411246
 ] 

Deepansh edited comment on SPARK-23650 at 3/23/18 1:43 PM:
---

The R environment inside the thread that applies the UDF is not being reused (I think 
"cached" is not the right word in this context). It is created and destroyed 
with each query.

{code:R}
kafka <- read.stream("kafka", subscribe = "source",
                     kafka.bootstrap.servers = "10.117.172.48:9092", topic = "source")
lines <- select(kafka, cast(kafka$value, "string"))
schema <- schema(lines)
library(caret)

df4 <- dapply(lines, function(x) {
  # Time how long it takes to attach caret inside the worker; if the R
  # environment were reused, this would be near zero after the first batch.
  print(system.time(library(caret)))
  x
}, schema)

q2 <- write.stream(df4, "kafka", checkpointLocation = loc, topic = "sink",
                   kafka.bootstrap.servers = "10.117.172.48:9092")
awaitTermination(q2)
{code}

For the above code, my output for every new micro-batch is:
{noformat}
18/03/23 11:08:10 INFO BufferedStreamThread: Loading required package: lattice
18/03/23 11:08:10 INFO BufferedStreamThread: 
18/03/23 11:08:10 INFO BufferedStreamThread: Attaching package: ‘lattice’
18/03/23 11:08:10 INFO BufferedStreamThread: 
18/03/23 11:08:10 INFO BufferedStreamThread: The following object is masked from ‘package:SparkR’:
18/03/23 11:08:10 INFO BufferedStreamThread: 
18/03/23 11:08:10 INFO BufferedStreamThread: histogram
18/03/23 11:08:10 INFO BufferedStreamThread: 
18/03/23 11:08:10 INFO BufferedStreamThread: Loading required package: ggplot2
18/03/23 11:08:12 INFO BufferedStreamThread:    user  system elapsed 
18/03/23 11:08:12 INFO BufferedStreamThread:   1.937   0.062   1.999 
18/03/23 11:08:12 INFO RRunner: Times: boot = 0.009 s, init = 0.017 s, broadcast = 0.001 s, read-input = 0.001 s, compute = 2.064 s, write-output = 0.001 s, total = 2.093 s
{noformat}

PFA: the rest of the log file.

For every new incoming stream batch, the packages are loaded again inside the thread, 
which means the R environment inside the thread is not being reused; it is 
created and destroyed every time.

The model I am testing with (an iris model) requires the caret package. So when I 
use readRDS, the caret package is also loaded, which adds an overhead of 
~2 s every time.

The same problem occurs with broadcast. Broadcasting the model itself doesn't take 
time, but deserializing the model loads the caret package, which adds the ~2 s 
overhead.

Ideally, the packages shouldn't be loaded again. Is there a way around this 
problem?
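
For reference, the kind of reuse I am hoping for would look something like the following minimal sketch. It only helps if the worker R process actually survives across micro-batches (which, per the logs above, it currently does not); the model path and the {{.cached_model}} name are placeholders for illustration:

{code:R}
df4 <- dapply(lines, function(x) {
  # Load caret and read the model only on the first batch handled by this worker
  # process; later batches would reuse the object cached in the worker's global
  # environment. "/path/to/iris_model.rds" is a placeholder path.
  if (!exists(".cached_model", envir = globalenv())) {
    suppressMessages(library(caret))   # the ~2 s package load, ideally paid once
    assign(".cached_model", readRDS("/path/to/iris_model.rds"), envir = globalenv())
  }
  model <- get(".cached_model", envir = globalenv())  # would then be used for predict()
  x
}, schema)
{code}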




[jira] [Comment Edited] (SPARK-23650) Slow SparkR udf (dapply)

2018-03-18 Thread Deepansh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16404213#comment-16404213
 ] 

Deepansh edited comment on SPARK-23650 at 3/18/18 10:16 PM:


I tried reading the model inside the UDF, but for every new stream batch the model is 
read again, which adds an overhead of ~2 s. IMO the problem here is that the R 
environment inside the thread that applies the UDF is not cached; it is 
created and destroyed with each query.
Attached: logs.

To work around the problem, I was using broadcast, since a broadcast is, in principle, 
sent to the executors only once.



> Slow SparkR udf (dapply)
> 
>
> Key: SPARK-23650
> URL: https://issues.apache.org/jira/browse/SPARK-23650
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Shell, SparkR, Structured Streaming
>Affects Versions: 2.2.0
>Reporter: Deepansh
>Priority: Major
> Attachments: read_model_in_udf.txt, sparkR_log2.txt, sparkRlag.txt
>
>
> For example, I am receiving streams from Kafka and I want to apply a model built 
> in R to those streams. For this, I am using dapply.
> My code is:
> iris_model <- readRDS("./iris_model.rds")
> randomBr <- SparkR:::broadcast(sc, iris_model)
> kafka <- read.stream("kafka", subscribe = "source",
>                      kafka.bootstrap.servers = "localhost:9092", topic = "source")
> lines <- select(kafka, cast(kafka$value, "string"))
> schema <- schema(lines)
> df1 <- dapply(lines, function(x) {
>   i_model <- SparkR:::value(randomBr)
>   for (row in 1:nrow(x)) {
>     y <- fromJSON(as.character(x[row, "value"]))
>     y$predict <- predict(i_model, y)
>     y <- toJSON(y)
>     x[row, "value"] <- y
>   }
>   x
> }, schema)
> Every time a Kafka micro-batch is fetched, the dapply method creates a new 
> runner thread and ships the variables again, which causes a huge lag (~2 s for 
> shipping the model) every time. I even tried without broadcast variables, but it 
> takes the same time to ship the variables. Can some other technique be applied to 
> improve the performance?
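
As a side note, a per-partition variant of the UDF quoted above can reduce the per-row JSON and predict() overhead, independently of the package-loading issue. This is only a sketch: it assumes jsonlite provides fromJSON/toJSON on the workers, that each value cell holds a single flat JSON object, and it reuses the randomBr broadcast from the description above:

{code:R}
df1 <- dapply(lines, function(x) {
  if (nrow(x) == 0) return(x)
  library(jsonlite)                     # assumed to be installed on the workers
  i_model <- SparkR:::value(randomBr)
  # Parse every JSON value, stack into one data.frame, and predict once for the
  # whole partition instead of once per row.
  rows  <- lapply(as.character(x$value), function(v) as.data.frame(fromJSON(v)))
  batch <- do.call(rbind, rows)
  batch$predict <- predict(i_model, batch)
  # Serialize each row back into the value column.
  x$value <- vapply(seq_len(nrow(batch)),
                    function(i) as.character(toJSON(batch[i, ], auto_unbox = TRUE)),
                    character(1))
  x
}, schema)
{code}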






[jira] [Comment Edited] (SPARK-23650) Slow SparkR udf (dapply)

2018-03-16 Thread Felix Cheung (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16401525#comment-16401525
 ] 

Felix Cheung edited comment on SPARK-23650 at 3/16/18 7:16 AM:
---

Do you mean this?

{noformat}
RRunner: Times: boot = 0.010 s, init = 0.005 s, broadcast = 1.894 s, read-input = 0.001 s, compute = 0.062 s, write-output = 0.002 s, total = 1.974 s
{noformat}

Under the covers it is working with the same R process.

I see

{noformat}
SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:1006
{noformat}

each time: it is creating a new broadcast, which then needs to be transferred.

IMO there are a few things to look into:
 # it should detect whether the broadcast is the same (not sure if it does that)
 # if it is attributed to the same broadcast in use.daemon mode, then it perhaps 
doesn't have to transfer it again (but it would need to keep track of the stage 
executed before, the broadcast that was sent before, etc.)
 # data transfer can be faster (SPARK-18924)

However, as of now RRunner simply picks up the broadcast that is passed to it and 
sends it.

 









[jira] [Comment Edited] (SPARK-23650) Slow SparkR udf (dapply)

2018-03-14 Thread Deepansh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16398237#comment-16398237
 ] 

Deepansh edited comment on SPARK-23650 at 3/15/18 4:39 AM:
---

Attached more logs.









[jira] [Comment Edited] (SPARK-23650) Slow SparkR udf (dapply)

2018-03-13 Thread Deepansh (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16396834#comment-16396834
 ] 

Deepansh edited comment on SPARK-23650 at 3/13/18 11:37 AM:


I tried on local as well as on a YARN cluster; the result is more or less the same.

Because of this, I went through the Spark code, and as far as I understand, every time 
a new Kafka micro-batch arrives, Spark creates a new RRunner object, and the 
broadcast variables and packages are shipped off to create a new R worker. But 
shouldn't this happen only once, not every time a batch arrives?
PFA: log file.








