[ https://issues.apache.org/jira/browse/SPARK-23650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16404213#comment-16404213 ]
Deepansh edited comment on SPARK-23650 at 3/18/18 10:16 PM:
------------------------------------------------------------

I tried reading the model inside the UDF, but the model is re-read for every new stream batch, which adds an overhead of ~2s each time. IMO the problem here is that the R environment inside the thread that applies the UDF is not cached: it is created and destroyed with each query (logs attached). To work around this, I was using a broadcast variable, since a broadcast is technically shipped to the executors only once.

> Slow SparkR udf (dapply)
> ------------------------
>
>                 Key: SPARK-23650
>                 URL: https://issues.apache.org/jira/browse/SPARK-23650
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Shell, SparkR, Structured Streaming
>    Affects Versions: 2.2.0
>            Reporter: Deepansh
>            Priority: Major
>         Attachments: read_model_in_udf.txt, sparkR_log2.txt, sparkRlag.txt
>
> For example, I am getting streams from Kafka and I want to apply a model built in R to those streams. For this, I am using dapply.
> My code is:
>
> library(jsonlite)  # needed for fromJSON/toJSON below
>
> iris_model <- readRDS("./iris_model.rds")
> randomBr <- SparkR:::broadcast(sc, iris_model)
> kafka <- read.stream("kafka", subscribe = "source",
>                      kafka.bootstrap.servers = "localhost:9092",
>                      topic = "source")
> lines <- select(kafka, cast(kafka$value, "string"))
> schema <- schema(lines)
> df1 <- dapply(lines, function(x) {
>   i_model <- SparkR:::value(randomBr)  # was randomMatBr, which does not exist
>   for (row in 1:nrow(x)) {
>     y <- fromJSON(as.character(x[row, "value"]))
>     y$predict <- predict(i_model, y)
>     y <- toJSON(y)
>     x[row, "value"] <- y
>   }
>   x
> }, schema)
>
> Every time Kafka streams are fetched, the dapply method creates a new runner thread and ships the variables again, which causes a large lag (~2s just for shipping the model) each time. I even tried without broadcast variables, but shipping the variables takes the same time. Can some other technique be applied to improve its performance?

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
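One possible workaround, sketched below as an untested assumption rather than a confirmed fix: memoize the `readRDS` call inside the UDF via the worker's global environment, so the model is deserialized at most once per R process. Note that this only helps if the executor actually reuses the R worker process between batches; per the logs above, the R environment is currently recreated per query, in which case this cache would be rebuilt each time as well. The `.cached_model` name is illustrative, not part of any SparkR API.

```r
library(SparkR)

# Hypothetical sketch: lazily load the model once per R worker process
# instead of once per dapply invocation. Effective only when the R worker
# process survives across streaming batches.
df1 <- dapply(lines, function(x) {
  if (!exists(".cached_model", envir = globalenv())) {
    # First call in this R process: pay the readRDS cost once and cache it.
    assign(".cached_model", readRDS("./iris_model.rds"), envir = globalenv())
  }
  i_model <- get(".cached_model", envir = globalenv())
  # ... apply predict(i_model, ...) to each row as in the code above ...
  x
}, schema)
```

Compared with broadcasting, this avoids shipping the serialized model with every batch, at the cost of requiring the model file to be readable from every executor's local filesystem.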