Yes, myRDD is outside the DStream. The following is the actual code, where
newBase and current are the RDDs being updated with each batch:

  val base = sc.textFile...
  var newBase = base.cache()

  val dstream: DStream[String] = ssc.textFileStream...
  var current: RDD[(String, Long)] = sc.emptyRDD.cache()

  dstream.countByValue().checkpoint(Seconds(120)).foreachRDD(rdd => {
    // fold the new batch into the accumulated counts
    current = rdd.union(current.unpersist()).reduceByKey(_+_, 2)

    val joined = current.leftOuterJoin(newBase).cache()
    val toUpdate = joined.filter(myfilter).map(mymap).cache()
    val toNotUpdate = joined.filter(mynotfilter).map(mymap).cache()

    toUpdate.collect().foreach(println) // this goes to some store

    // merge the updated tuples back into the base RDD
    newBase = newBase.unpersist().union(toUpdate).reduceByKey(_+_, 2).cache()

    // carry forward only the tuples that were not flushed out
    current = toNotUpdate.cache()

    toUpdate.unpersist()
    joined.unpersist()
    rdd.unpersist()
  })
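
For reference, here is a minimal sketch of how the accumulated RDD could be
checkpointed every few batches to keep the lineage bounded. This is
illustrative only: it assumes sc.setCheckpointDir(...) has been set, and the
10-batch interval is arbitrary.

  var batchCount = 0L
  dstream.countByValue().foreachRDD(rdd => {
    current = rdd.union(current.unpersist()).reduceByKey(_+_, 2).cache()
    batchCount += 1
    // every 10th batch, mark the RDD for checkpointing *before* any action
    if (batchCount % 10 == 0) {
      current.checkpoint()
    }
    current.count() // the action materializes the batch (and the checkpoint)
  })

Once the checkpoint is written, the union chain behind current is dropped, so
stages from earlier batches no longer appear in the DAG.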


Regards,

Anand


On 9 July 2015 at 18:16, Dean Wampler <deanwamp...@gmail.com> wrote:

> Is myRDD outside a DStream? If so, are you persisting it on each batch
> iteration? It should be checkpointed frequently, too.
>
> Dean Wampler, Ph.D.
> Author: Programming Scala, 2nd Edition
> <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
> Typesafe <http://typesafe.com>
> @deanwampler <http://twitter.com/deanwampler>
> http://polyglotprogramming.com
>
> On Thu, Jul 9, 2015 at 5:56 AM, Anand Nalya <anand.na...@gmail.com> wrote:
>
>> The data coming from the dstream has the same keys that are already in
>> myRDD, so the reduceByKey after the union keeps the overall tuple count in
>> myRDD fixed. Or, even with a fixed tuple count, will it keep consuming
>> more resources?
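>>
>> (A quick way to confirm that it is the lineage, not the tuple count, that
>> grows: toDebugString prints an RDD's dependency graph. A sketch:
>>
>>   dstream.foreachRDD(rdd => {
>>     myRDD = myRDD.union(rdd.filter(myfilter)).reduceByKey(_+_)
>>     println(myRDD.toDebugString) // the printed DAG grows with every batch
>>   })
>>
>> The output gets longer on each batch until the lineage is truncated by a
>> checkpoint.)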
>>
>> On 9 July 2015 at 16:19, Tathagata Das <t...@databricks.com> wrote:
>>
>>> If you are continuously unioning RDDs, then you are accumulating
>>> ever-increasing data, and you are processing an ever-increasing amount of
>>> data in every batch. Obviously this is not going to last very long. You
>>> fundamentally cannot keep processing an ever-increasing amount of data
>>> with finite resources, can you?
>>>
>>> On Thu, Jul 9, 2015 at 3:17 AM, Anand Nalya <anand.na...@gmail.com>
>>> wrote:
>>>
>>>> That's from the Streaming tab of the Spark 1.4 web UI.
>>>>
>>>> On 9 July 2015 at 15:35, Michel Hubert <mich...@vsnsystemen.nl> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I was just wondering how you generated the second image with the charts.
>>>>> What product?
>>>>>
>>>>>
>>>>> *From:* Anand Nalya [mailto:anand.na...@gmail.com]
>>>>> *Sent:* donderdag 9 juli 2015 11:48
>>>>> *To:* spark users
>>>>> *Subject:* Breaking lineage and reducing stages in Spark Streaming
>>>>>
>>>>> Hi,
>>>>>
>>>>> I have an application in which an RDD is being updated with tuples
>>>>> coming from the RDDs in a DStream, with the following pattern:
>>>>>
>>>>> dstream.foreachRDD(rdd => {
>>>>>   myRDD = myRDD.union(rdd.filter(myfilter)).reduceByKey(_+_)
>>>>> })
>>>>>
>>>>>
>>>>> I'm using cache() and checkpointing to persist the results. Over time,
>>>>> the lineage of myRDD keeps growing, and the number of stages in each
>>>>> batch of the dstream keeps increasing, even though all the earlier
>>>>> stages are skipped. When the number of stages grows big enough, the
>>>>> overall scheduling delay starts increasing, while the processing time
>>>>> for each batch stays fixed.
>>>>>
>>>>> The following figures illustrate the problem:
>>>>>
>>>>> Job execution: https://i.imgur.com/GVHeXH3.png?1
>>>>>
>>>>> Delays: https://i.imgur.com/1DZHydw.png?1
>>>>>
>>>>> Is there some pattern that I can use to avoid this?
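>>>>>
>>>>> (One pattern I'm considering is updateStateByKey, which keeps a running
>>>>> per-key value across batches without an ever-growing union chain. A
>>>>> minimal sketch, assuming the stream is reduced to (String, Long) counts
>>>>> and checkpointing is enabled via ssc.checkpoint(...):
>>>>>
>>>>> val updateTotals = (newCounts: Seq[Long], total: Option[Long]) =>
>>>>>   Some(newCounts.sum + total.getOrElse(0L))
>>>>>
>>>>> val running: DStream[(String, Long)] =
>>>>>   dstream.countByValue().updateStateByKey(updateTotals)
>>>>>
>>>>> Stateful DStreams checkpoint their state RDDs automatically, so the
>>>>> lineage stays bounded. But I'm not sure whether it fits my use case.)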
>>>>>
>>>>> Regards,
>>>>>
>>>>> Anand
>>>>>
>>>>
>>>>
>>>
>>
>
