Re: Breaking lineage and reducing stages in Spark Streaming

2015-07-10 Thread Anand Nalya
Thanks for the help Dean/TD, I was able to cut the lineage with checkpointing using the following code: dstream.countByValue().foreachRDD((rdd, time) => { val joined = rdd.union(current).reduceByKey(_+_, 2).leftOuterJoin(base) val toUpdate = joined.filter(myfilter).map(mymap) val
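The archive cuts the message off mid-line, so here is a minimal self-contained sketch of the pattern it describes. The filter and map bodies stand in for the poster's myfilter/mymap, and the threshold, the reassignment of current, and the trailing count() are assumptions, not the original code:

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.dstream.DStream

    // Sketch only: everything past `toUpdate` is assumed.
    def wire(dstream: DStream[String], sc: SparkContext,
             base: RDD[(String, Long)]): Unit = {
      var current: RDD[(String, Long)] = sc.emptyRDD

      dstream.countByValue().foreachRDD { (rdd, time) =>
        val joined = rdd.union(current).reduceByKey(_ + _, 2).leftOuterJoin(base)
        val toUpdate = joined
          .filter { case (_, (c, _)) => c > 100 }                     // stand-in for myfilter
          .map { case (k, (c, prev)) => (k, c + prev.getOrElse(0L)) } // stand-in for mymap

        current = toUpdate.cache()
        current.checkpoint() // truncates the lineage; requires sc.setCheckpointDir(...)
        current.count()      // an action forces the checkpoint to materialize
      }
    }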

Re: Breaking lineage and reducing stages in Spark Streaming

2015-07-09 Thread Tathagata Das
If you are continuously unioning RDDs, then you are accumulating ever-increasing data, and you are processing an ever-increasing amount of data in every batch. Obviously this is not going to last for very long. You fundamentally cannot keep processing an ever-increasing amount of data with finite
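A minimal sketch of the anti-pattern being described, with assumed names: each batch folds the new RDD into an ever-growing accumulator, so both the data and the lineage grow without bound.

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.dstream.DStream

    def accumulateForever(dstream: DStream[String], sc: SparkContext): Unit = {
      var everything: RDD[String] = sc.emptyRDD
      dstream.foreachRDD { rdd =>
        // One more lineage step (and one more batch of data) every interval.
        everything = everything.union(rdd)
      }
    }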

Re: Breaking lineage and reducing stages in Spark Streaming

2015-07-09 Thread Anand Nalya
The data coming from the dstream has the same keys that are in myRDD, so the reduceByKey after the union keeps the overall tuple count in myRDD fixed. Or will it keep consuming more resources even with a fixed tuple count? On 9 July 2015 at 16:19, Tathagata Das t...@databricks.com wrote: If you are
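Even when the tuple count stays fixed, the plan still deepens by one union and one shuffle per batch. A standalone sketch (local mode, made-up data) that makes the growth visible with toDebugString:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.rdd.RDD

    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("lineage-growth"))
    var current: RDD[(String, Long)] = sc.parallelize(Seq(("a", 1L), ("b", 1L)))

    for (batch <- 1 to 3) {
      val incoming = sc.parallelize(Seq(("a", 1L), ("b", 1L))) // same keys every time
      current = incoming.union(current).reduceByKey(_ + _, 2)  // tuple count stays at 2
      println(s"--- after batch $batch ---")
      println(current.toDebugString) // the plan gets one union/shuffle deeper each pass
    }
    sc.stop()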

RE: Breaking lineage and reducing stages in Spark Streaming

2015-07-09 Thread Michel Hubert
Hi, I was just wondering how you generated the second image with the charts. What product? From: Anand Nalya [mailto:anand.na...@gmail.com] Sent: Thursday, 9 July 2015 11:48 To: spark users Subject: Breaking lineage and reducing stages in Spark Streaming Hi, I've an application in which an rdd

Re: Breaking lineage and reducing stages in Spark Streaming

2015-07-09 Thread Anand Nalya
That's from the Streaming tab of the Spark 1.4 WebUI. On 9 July 2015 at 15:35, Michel Hubert mich...@vsnsystemen.nl wrote: Hi, I was just wondering how you generated the second image with the charts. What product? *From:* Anand Nalya [mailto:anand.na...@gmail.com] *Sent:* Thursday, 9

Re: Breaking lineage and reducing stages in Spark Streaming

2015-07-09 Thread Dean Wampler
Is myRDD outside a DStream? If so, are you persisting it on each batch iteration? It should be checkpointed frequently too. Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler

Re: Breaking lineage and reducing stages in Spark Streaming

2015-07-09 Thread Anand Nalya
Yes, myRDD is outside of the DStream. Following is the actual code where newBase and current are the RDDs being updated with each batch: val base = sc.textFile... var newBase = base.cache() val dstream: DStream[String] = ssc.textFileStream... var current: RDD[(String, Long)] =
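The archive truncates the code mid-line; what follows is a hedged reconstruction of the declarations it starts, where the paths, the pair-building map, and the initializer for current are placeholders for what was cut off:

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming.dstream.DStream

    // Reconstruction only: paths and initializers are assumptions.
    def setup(sc: SparkContext, ssc: StreamingContext): Unit = {
      val base: RDD[(String, Long)] = sc.textFile("/data/base") // placeholder path
        .map(line => (line, 1L))  // assumed: base must be key/value pairs for the later join
        .reduceByKey(_ + _)
      var newBase = base.cache()

      val dstream: DStream[String] = ssc.textFileStream("/data/incoming") // placeholder path
      var current: RDD[(String, Long)] = sc.emptyRDD[(String, Long)]      // assumed initializer
    }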

Re: Breaking lineage and reducing stages in Spark Streaming

2015-07-09 Thread Dean Wampler
I think you're complicating the cache behavior by aggressively re-using vars when temporary vals would be more straightforward. For example, newBase = newBase.unpersist()... effectively means that newBase's data is not actually cached when the subsequent .union(...) is performed, so it probably
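A compact sketch of the hazard, with the shapes of newBase and the incoming delta assumed: reassigning the var to the result of unpersist() drops the cached blocks before the union that needed them runs, so the union recomputes newBase from its full lineage.

    import org.apache.spark.rdd.RDD

    // Problematic ordering (shape inferred from the discussion):
    def badOrdering(newBase: RDD[(String, Long)], delta: RDD[(String, Long)]): RDD[(String, Long)] = {
      val dropped = newBase.unpersist() // cached blocks are released here...
      dropped.union(delta).cache()      // ...so this union recomputes from the full lineage
    }

    // Straightforward alternative using a temporary val:
    def goodOrdering(newBase: RDD[(String, Long)], delta: RDD[(String, Long)]): RDD[(String, Long)] = {
      val merged = newBase.union(delta).cache()
      merged.count()      // materialize merged while the old cache is still alive
      newBase.unpersist() // only now drop the old copy
      merged
    }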

Re: Breaking lineage and reducing stages in Spark Streaming

2015-07-09 Thread Tathagata Das
Summarizing the main problems discussed by Dean: 1. If you have an infinitely growing lineage, bad things will eventually happen. You HAVE TO periodically (say, every 10th batch) checkpoint the information. 2. Unpersist the previous `current` RDD ONLY AFTER running an action on the `newCurrent`.
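A sketch combining both rules, with assumed names and the suggested every-10th-batch cadence:

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.dstream.DStream

    def run(dstream: DStream[(String, Long)], sc: SparkContext): Unit = {
      sc.setCheckpointDir("/tmp/checkpoints") // placeholder path; required before checkpoint()
      var current: RDD[(String, Long)] = sc.emptyRDD
      var batch = 0L

      dstream.foreachRDD { rdd =>
        batch += 1
        val newCurrent = current.union(rdd).reduceByKey(_ + _).cache()
        if (batch % 10 == 0) newCurrent.checkpoint() // rule 1: checkpoint periodically
        newCurrent.count()  // run an action so newCurrent is materialized...
        current.unpersist() // ...rule 2: only then unpersist the previous RDD
        current = newCurrent
      }
    }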