Thanks for the help Dean/TD,
I was able to cut the lineage with checkpointing using the following code:
dstream.countByValue().foreachRDD((rdd, time) => {
val joined = rdd.union(current).reduceByKey(_+_, 2).leftOuterJoin(base)
val toUpdate = joined.filter(myfilter).map(mymap)
val
If you are continuously unioning RDDs, then you are accumulating ever-increasing
data, and you are processing an ever-increasing amount of data in
every batch. Obviously this is not going to last very long. You
fundamentally cannot keep processing an ever-increasing amount of data with
finite resources.
The data coming from the dstream has the same keys as those in myRDD, so
the reduceByKey
after the union keeps the overall tuple count in myRDD fixed. Or, even with a
fixed tuple count, will it keep consuming more resources?
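For what it's worth, the fixed-tuple-count claim can be checked with plain Scala collections (no Spark involved). This is just a simulation of what union followed by reduceByKey(_ + _) does to (key, count) pairs; the names `current` and `batch` are hypothetical stand-ins for the RDDs in the thread:

```scala
// Plain-Scala sketch (no Spark): what union + reduceByKey(_ + _) does to
// (key, count) pairs when the batch's keys already exist in `current`.
def merge(current: Map[String, Long],
          batch: Seq[(String, Long)]): Map[String, Long] =
  (current.toSeq ++ batch)
    .groupBy(_._1)
    .map { case (k, kvs) => k -> kvs.map(_._2).sum }

val current = Map("a" -> 10L, "b" -> 5L, "c" -> 2L)
val batch   = Seq("a" -> 3L, "b" -> 1L)   // keys already present in `current`

val merged = merge(current, batch)
assert(merged.keySet == current.keySet)   // tuple count unchanged
assert(merged("a") == 13L)                // only the values grew
```

Even so, a bounded tuple count only bounds the data size; each batch still appends union/reduceByKey/join steps to the RDD's lineage graph, which is the part that checkpointing truncates.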
On 9 July 2015 at 16:19, Tathagata Das t...@databricks.com wrote:
Hi,
I was just wondering how you generated the second image with the charts.
What product?
From: Anand Nalya [mailto:anand.na...@gmail.com]
Sent: donderdag 9 juli 2015 11:48
To: spark users
Subject: Breaking lineage and reducing stages in Spark Streaming
Hi,
I've an application in which an rdd
That's from the Streaming tab of the Spark 1.4 WebUI.
On 9 July 2015 at 15:35, Michel Hubert mich...@vsnsystemen.nl wrote:
Is myRDD outside a DStream? If so, are you persisting it on each batch
iteration? It should be checkpointed frequently too.
Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler
Yes, myRDD is outside the DStream. Following is the actual code, where newBase
and current are the RDDs being updated with each batch:
val base = sc.textFile...
var newBase = base.cache()
val dstream: DStream[String] = ssc.textFileStream...
var current: RDD[(String, Long)] =
I think you're complicating the cache behavior by aggressively re-using
vars when temporary vals would be more straightforward. For example,
newBase = newBase.unpersist()... effectively means that newBase's data is
not actually cached when the subsequent .union(...) is performed, so it
probably
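Dean's point about the unpersist timing can be seen in local-mode Spark. A sketch, assuming spark-core is on the classpath; the variable names just mirror the thread:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

val sc = new SparkContext(
  new SparkConf().setMaster("local[2]").setAppName("unpersist-order"))

val newBase = sc.parallelize(Seq("a" -> 1L, "b" -> 2L)).cache()
newBase.count()                          // action: actually fills the cache

// Derive the next generation from the cached RDD (lazily, no action yet).
val updated = newBase.mapValues(_ + 1).cache()

// Unpersisting here, BEFORE any action on `updated`, drops newBase's
// cached blocks, so `updated` has to recompute newBase from scratch
// when it is finally evaluated.
newBase.unpersist()
assert(newBase.getStorageLevel == StorageLevel.NONE)

updated.count()                          // still correct, just recomputed
sc.stop()
```

The safe order is the opposite: run an action on the new RDD first, then unpersist the old one.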
Summarizing the main problems discussed by Dean:
1. If you have an infinitely growing lineage, bad things will eventually
happen. You HAVE TO checkpoint the information periodically (say, every 10th
batch).
2. Unpersist the previous `current` RDD ONLY AFTER running an action on the
`newCurrent`.
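Both rules together might look like the following local-mode sketch. Assumptions: spark-core on the classpath, a writable /tmp checkpoint directory, and a plain loop standing in for the batch iteration; the every-10th-batch interval is the one suggested above.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

val sc = new SparkContext(
  new SparkConf().setMaster("local[2]").setAppName("checkpoint-demo"))
sc.setCheckpointDir("/tmp/spark-checkpoints")   // assumption: writable dir

var current: RDD[(String, Long)] = sc.parallelize(Seq("a" -> 0L, "b" -> 0L))

for (batchNum <- 1 to 30) {
  val batch = sc.parallelize(Seq("a" -> 1L, "b" -> 2L))

  // union + reduceByKey keeps the tuple count fixed but grows the lineage.
  val newCurrent = current.union(batch).reduceByKey(_ + _).cache()

  // Rule 1: periodically checkpoint to cut the lineage; the mark takes
  // effect on the next action.
  if (batchNum % 10 == 0) newCurrent.checkpoint()

  // Rule 2: run an action on newCurrent BEFORE unpersisting the old RDD.
  newCurrent.count()
  current.unpersist()
  current = newCurrent
}

val totals = current.collect().toMap
assert(totals == Map("a" -> 30L, "b" -> 60L))
sc.stop()
```

With a real DStream the same loop body would live inside foreachRDD, as in the snippet at the top of the thread.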