[ https://issues.apache.org/jira/browse/SPARK-13349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15150893#comment-15150893 ]
krishna ramachandran commented on SPARK-13349:
----------------------------------------------

A simple synthetic example follows. It creates two "raw streams"; job 1 is materialized when stream1 is output (by some action such as print/save).

In job 1:

    val stream1 = ssc.union(rawStreams).filter(_.contains("Stream:first"))
    // save stream1
    ...

Job 2 creates another split from rawStreams and unions it with stream1:

    val stream2 = ssc.union(rawStreams).filter(_.contains("Batch:second"))
    val stream3 = stream1.union(stream2)
    ...
    // save stream3

Job 2 is then materialized and executed. This pattern runs for every batch. Looking at the visual DAG, job 1 executes the first graph, while job 2 recomputes both "stream1" and "stream2". Caching the DStream stream1 (the result of job 1) makes job 2 run almost twice as fast.

In our real application we have 7 such jobs per batch, and we typically union the output of job 1 with a stream generated during job 5. Caching and reusing the output of job 1 (stream1) is very efficient (per-batch execution is 2.5 times faster), but we then start seeing out-of-memory errors. I would like to be able to "unpersist" stream1 after the union (for that batch); a sketch of one possible approach follows at the end of this message.

> adding a split and union to a streaming application causes a big performance hit
> ---------------------------------------------------------------------------------
>
>                 Key: SPARK-13349
>                 URL: https://issues.apache.org/jira/browse/SPARK-13349
>             Project: Spark
>          Issue Type: Improvement
>    Affects Versions: 1.4.1
>            Reporter: krishna ramachandran
>            Priority: Critical
>             Fix For: 1.4.2
>
> We have a streaming application containing approximately 12 jobs every batch, running in streaming mode (4 sec batches). Each job writes output to Cassandra, and each job can contain several stages.
>
> job 1
> ---> receive Stream A --> map --> filter --> (union with another stream B) --> map --> groupByKey --> transform --> reduceByKey --> map
>
> We go through a few more jobs of transforms and save to the database. Around stage 5, we union the output of the DStream from job 1 (in red) with another stream (generated by a split during job 2) and save that state.
>
> It appears the whole execution thus far is repeated, which is redundant (I can see this in the execution graph and also in performance -> processing time). Processing time per batch nearly doubles or triples.
>
> This additional and redundant processing causes each batch to run as much as 2.5 times slower compared to runs without the union; for most batches the union does not alter the original DStream (union with an empty set). If I cache the DStream from job 1 (the red block output), performance improves substantially, but we hit out-of-memory errors within a few hours.
>
> What is the recommended way to cache/unpersist in such a scenario? There is no DStream-level "unpersist". Setting "spark.streaming.unpersist" to true and streamingContext.remember("duration") did not help; we are still seeing out-of-memory errors.
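For reference, a minimal sketch of the cache-then-unpersist pattern described above, assuming Spark Streaming's Scala API (1.4.x). The socket sources, port numbers, and print() outputs are hypothetical stand-ins for the real receivers and Cassandra saves. With the default spark.streaming.concurrentJobs=1, output operations within a batch run in order, so the final foreachRDD releases stream1's cached RDD only after stream3 has been materialized.

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object CacheUnpersistSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("cache-unpersist-sketch")
        val ssc = new StreamingContext(conf, Seconds(4))

        // Hypothetical sources standing in for the "raw streams" in the report.
        val rawStreams = (1 to 2).map(i => ssc.socketTextStream("localhost", 9000 + i))

        // Job 1: split and cache so job 2 can reuse it without recomputation.
        val stream1 = ssc.union(rawStreams).filter(_.contains("Stream:first")).cache()
        stream1.print() // materializes job 1 (stands in for the save)

        // Job 2: second split, unioned with the cached stream1.
        val stream2 = ssc.union(rawStreams).filter(_.contains("Batch:second"))
        val stream3 = stream1.union(stream2)
        stream3.print() // materializes job 2 (stands in for the save)

        // Output operations are scheduled in order within a batch, so by the
        // time this runs, stream3 has been written and the cached RDD backing
        // stream1 for this batch can be released.
        stream1.foreachRDD { rdd => rdd.unpersist(blocking = false) }

        ssc.start()
        ssc.awaitTermination()
      }
    }

Using unpersist(blocking = false) avoids stalling the batch while blocks are dropped; if memory pressure is severe, blocking = true trades a little latency for a guaranteed release before the next job starts.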