[ 
https://issues.apache.org/jira/browse/SPARK-13349?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

krishna ramachandran updated SPARK-13349:
-----------------------------------------
    Description: 
We have a streaming application that runs approximately 12 jobs every batch, 
in streaming mode (4-second batches). Each job writes its output to Cassandra.

Each job can contain several stages.

job 1

---> receive Stream A --> map --> filter --> (union with another stream B) --> 
map --> groupByKey --> transform --> reduceByKey --> map

We go through a few more jobs of transforms and save to the database.

Around stage 5, we union the output DStream from job 1 (the red block in the 
execution graph) with another stream (generated by a split during job 2) and 
save that state.
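
For concreteness, a minimal sketch of the shape of this pipeline in Scala 
(Spark Streaming). The sources (socketTextStream on hostA/hostB), the key 
extraction, and the split predicate (count > 100) are hypothetical stand-ins, 
not the actual application code:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object UnionRepro {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("union-repro")
        val ssc = new StreamingContext(conf, Seconds(4)) // 4-second batches

        // Stand-ins for "Stream A" and "stream B".
        val streamA = ssc.socketTextStream("hostA", 9999)
        val streamB = ssc.socketTextStream("hostB", 9999)

        // Job 1: receive Stream A --> map --> filter --> (union with B) -->
        //        map --> groupByKey --> transform --> reduceByKey --> map
        val job1Out = streamA
          .map(line => (line.split(",").head, 1L))
          .filter { case (key, _) => key.nonEmpty }
          .union(streamB.map(line => (line.split(",").head, 1L)))
          .groupByKey()
          .transform(rdd => rdd.mapValues(_.sum))
          .reduceByKey(_ + _)
          .map(identity)

        // Around "stage 5": union job1Out with a stream derived from it by a
        // split (modelled here as a filter; for most batches it is empty).
        val splitStream = job1Out.filter { case (_, count) => count > 100L }
        val unioned = job1Out.union(splitStream)

        // Stand-ins for the Cassandra writes. With job1Out un-cached, each
        // output operation re-walks the full lineage every batch, which is
        // where the reported recomputation shows up.
        job1Out.foreachRDD(_.foreach(println))
        unioned.foreachRDD(_.foreach(println))

        ssc.start()
        ssc.awaitTermination()
      }
    }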

It appears the whole execution up to that point is repeated, which is 
redundant (I can see this in the execution graph and in performance -> 
processing time). Processing time per batch nearly doubles or triples.

This additional, redundant processing causes each batch to run as much as 2.5 
times slower than runs without the union, even though for most batches the 
union does not alter the original DStream (it is a union with an empty set). 
If I cache the DStream from job 1 (the red block's output), performance 
improves substantially, but I hit out-of-memory errors within a few hours.
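
In Spark 1.x, DStream.cache() is shorthand for 
persist(StorageLevel.MEMORY_ONLY_SER), so cached blocks accumulate in executor 
memory until they are unpersisted. A sketch of the caching attempt, with a 
disk-spilling variant as a hedged suggestion (the report does not say it was 
tried):

    import org.apache.spark.storage.StorageLevel

    // "cache the DStream from job 1": persists each generated RDD at
    // MEMORY_ONLY_SER, the DStream default.
    job1Out.cache()

    // Alternative (an assumption, not from this report): a storage level
    // that spills to disk instead of failing once executor memory fills up.
    // job1Out.persist(StorageLevel.MEMORY_AND_DISK_SER)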

What is the recommended way to cache/unpersist in such a scenario? There is no 
DStream-level "unpersist".

setting "spark.streaming.unpersist" to true and 
streamingContext.remember("duration") did not help. Still seeing out of memory 
errors
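
For reference, a minimal sketch of those two mitigations. Note that remember() 
takes a Duration, not a string; the 2-minute value below is illustrative, not 
the one used here:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}

    val conf = new SparkConf()
      .setAppName("union-repro")
      // Ask Spark Streaming to eagerly unpersist generated (and cached) RDDs.
      .set("spark.streaming.unpersist", "true")

    val ssc = new StreamingContext(conf, Seconds(4))

    // Keep generated RDDs for at most 2 minutes (illustrative value).
    ssc.remember(Minutes(2))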

  was:
We have a streaming application that runs approximately 12 stages every batch, 
in streaming mode (4-second batches). Each stage persists its output to 
Cassandra.

The pipeline stages:
stage 1

---> receive Stream A --> map --> filter --> (union with another stream B) --> 
map --> groupByKey --> transform --> reduceByKey --> map

We go through a few more stages of transforms and save to the database.

Around stage 5, we union the output DStream from stage 1 (the red block in the 
execution graph) with another stream (generated by a split during stage 2) and 
save that state.

It appears the whole execution up to that point is repeated, which is 
redundant (I can see this in the execution graph and in performance -> 
processing time). Processing time per batch nearly doubles or triples.

This additional, redundant processing causes each batch to run as much as 2.5 
times slower than runs without the union, even though for most batches the 
union does not alter the original DStream (it is a union with an empty set). 
If I cache the DStream (the red block's output), performance improves 
substantially, but I hit out-of-memory errors within a few hours.

What is the recommended way to cache/unpersist in such a scenario? There is no 
DStream-level "unpersist".

setting "spark.streaming.unpersist" to true and 
streamingContext.remember("duration") did not help. Still seeing out of memory 
errors


> adding a split and union to a streaming application cause big performance hit
> -----------------------------------------------------------------------------
>
>                 Key: SPARK-13349
>                 URL: https://issues.apache.org/jira/browse/SPARK-13349
>             Project: Spark
>          Issue Type: Improvement
>    Affects Versions: 1.4.1
>            Reporter: krishna ramachandran
>            Priority: Critical
>             Fix For: 1.4.2
>


