Yes, myRDD is outside of the DStream. The following is the actual code, where newBase and current are the RDDs being updated with each batch:
    val base = sc.textFile...
    var newBase = base.cache()

    val dstream: DStream[String] = ssc.textFileStream...
    var current: RDD[(String, Long)] = sc.emptyRDD.cache()

    dstream.countByValue().checkpoint(Seconds(120)).foreachRDD(rdd => {
      current = rdd.union(current.unpersist()).reduceByKey(_+_, 2)

      val joined = current.leftOuterJoin(newBase).cache()
      val toUpdate = joined.filter(myfilter).map(mymap).cache()
      val toNotUpdate = joined.filter(mynotfilter).map(mymap).cache()

      toUpdate.collect().foreach(println) // this goes to some store

      newBase = newBase.unpersist().union(toUpdate).reduceByKey(_+_, 2).cache()
      current = toNotUpdate.cache()

      toUpdate.unpersist()
      joined.unpersist()
      rdd.unpersist()
    })

Regards,
Anand

On 9 July 2015 at 18:16, Dean Wampler <deanwamp...@gmail.com> wrote:

> Is myRDD outside a DStream? If so, are you persisting on each batch
> iteration? It should be checkpointed frequently too.
>
> Dean Wampler, Ph.D.
> Author: Programming Scala, 2nd Edition
> <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
> Typesafe <http://typesafe.com>
> @deanwampler <http://twitter.com/deanwampler>
> http://polyglotprogramming.com
>
> On Thu, Jul 9, 2015 at 5:56 AM, Anand Nalya <anand.na...@gmail.com> wrote:
>
>> The data coming from the dstream has the same keys that are in myRDD, so
>> the reduceByKey after the union keeps the overall tuple count in myRDD
>> fixed. Or, even with a fixed tuple count, will it keep consuming more
>> resources?
>>
>> On 9 July 2015 at 16:19, Tathagata Das <t...@databricks.com> wrote:
>>
>>> If you are continuously unioning RDDs, then you are accumulating
>>> ever-increasing data, and you are processing an ever-increasing amount
>>> of data in every batch. Obviously this is not going to last for very
>>> long. You fundamentally cannot keep processing an ever-increasing
>>> amount of data with finite resources, isn't it?
>>>
>>> On Thu, Jul 9, 2015 at 3:17 AM, Anand Nalya <anand.na...@gmail.com>
>>> wrote:
>>>
>>>> That's from the Streaming tab of the Spark 1.4 web UI.
>>>>
>>>> On 9 July 2015 at 15:35, Michel Hubert <mich...@vsnsystemen.nl> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I was just wondering how you generated the second image with the
>>>>> charts. What product?
>>>>>
>>>>> From: Anand Nalya [mailto:anand.na...@gmail.com]
>>>>> Sent: donderdag 9 juli 2015 11:48
>>>>> To: spark users
>>>>> Subject: Breaking lineage and reducing stages in Spark Streaming
>>>>>
>>>>> Hi,
>>>>>
>>>>> I've an application in which an RDD is being updated with tuples
>>>>> coming from RDDs in a DStream, with the following pattern:
>>>>>
>>>>> dstream.foreachRDD(rdd => {
>>>>>   myRDD = myRDD.union(rdd.filter(myfilter)).reduceByKey(_+_)
>>>>> })
>>>>>
>>>>> I'm using cache() and checkpointing to cache results. Over time, the
>>>>> lineage of myRDD keeps growing and the number of stages in each batch
>>>>> of the dstream keeps increasing, even though all the earlier stages
>>>>> are skipped. When the number of stages grows big enough, the overall
>>>>> delay due to scheduling delay starts increasing. The processing time
>>>>> for each batch is still fixed.
>>>>>
>>>>> The following figures illustrate the problem:
>>>>>
>>>>> Job execution: https://i.imgur.com/GVHeXH3.png?1
>>>>>
>>>>> Delays: https://i.imgur.com/1DZHydw.png?1
>>>>>
>>>>> Is there some pattern that I can use to avoid this?
>>>>>
>>>>> Regards,
>>>>>
>>>>> Anand
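
For reference, one way to break the lineage the thread is discussing: periodically call checkpoint() on the accumulated RDD itself and force a job so the checkpoint actually materializes, which truncates the lineage graph. This is a sketch, not code from the thread; myRDD, myfilter, sc, and dstream follow the thread's setup, while the checkpoint directory, batch counter, and interval of 10 batches are assumptions for illustration.

```scala
import org.apache.spark.rdd.RDD

// Assumes sc, dstream, and myfilter exist as in the thread above.
sc.setCheckpointDir("hdfs:///tmp/checkpoints") // assumed path

var myRDD: RDD[(String, Long)] = sc.emptyRDD
var batchCount = 0L
val checkpointEvery = 10 // assumed interval, in batches

dstream.foreachRDD { rdd =>
  myRDD = myRDD.union(rdd.filter(myfilter)).reduceByKey(_ + _).cache()
  batchCount += 1
  if (batchCount % checkpointEvery == 0) {
    myRDD.checkpoint() // mark for reliable checkpointing
    myRDD.count()      // run a job so the checkpoint materializes
                       // and the lineage is cut at this RDD
  }
}
```

After the count() completes, myRDD's dependencies are replaced by the checkpoint file, so subsequent batches start from a short lineage instead of the full union chain; the same pattern would apply to newBase and current in the code above.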