Re: Closing over a var with changing value in Streaming application

2015-01-21 Thread Tobias Pfeiffer
Hi,

On Wed, Jan 21, 2015 at 9:13 PM, Bob Tiernay btier...@hotmail.com wrote:

 Maybe I'm misunderstanding something here, but couldn't this be done with
 broadcast variables? I think there is the following caveat from the docs:

 In addition, the object v should not be modified after it is broadcast
 in order to ensure that all nodes get the same value of the broadcast
 variable (e.g. if the variable is shipped to a new node later)


Well, I think I need modifiable state (modifiable = changes once per
interval) that stores the total number of items seen so far in the
lifetime of my application, and I need this number on each executor. Since
this number changes after every interval processed, I think broadcast
variables are probably not appropriate in this case.
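
To sketch why (purely illustrative and untested; sc and inputStream stand
in for my actual objects): since a Broadcast value is fixed once created,
I would have to create a fresh one every interval, and any transformation
using it would then close over the var holding the current Broadcast,
which is the very problem this thread is about.

  var totalBroadcast = sc.broadcast(0L)
  inputStream.foreachRDD { rdd =>
    // create a *new* Broadcast per interval instead of modifying the old one
    totalBroadcast = sc.broadcast(totalBroadcast.value + rdd.count)
  }
  // a map() shipping totalBroadcast to the executors captures the var again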

Thanks
Tobias


RE: Closing over a var with changing value in Streaming application

2015-01-21 Thread Bob Tiernay
Maybe I'm misunderstanding something here, but couldn't this be done with 
broadcast variables? I think there is the following caveat from the docs:
In addition, the object v should not be modified after it is broadcast in 
order to ensure that all nodes get the same value of the broadcast variable 
(e.g. if the variable is shipped to a new node later)
But aren't these exactly the semantics you want (i.e. not the same value)?


Date: Wed, 21 Jan 2015 21:02:31 +0900
Subject: Re: Closing over a var with changing value in Streaming application
From: t...@preferred.jp
To: ak...@sigmoidanalytics.com
CC: user@spark.apache.org

Hi again,

On Wed, Jan 21, 2015 at 4:53 PM, Tobias Pfeiffer t...@preferred.jp wrote:

 On Wed, Jan 21, 2015 at 4:46 PM, Akhil Das ak...@sigmoidanalytics.com wrote:

  How about using accumulators?

 As far as I understand, they solve the part of the problem that I am not
 worried about, namely increasing the counter. I was more worried about
 getting that counter/accumulator value back to the executors.
Uh, I may have been a bit quick here...
So I had this one working:

  var totalNumberOfItems = 0L
  // update the keys of the stream data
  val globallyIndexedItems = inputStream.map(keyVal =>
    (keyVal._1 + totalNumberOfItems, keyVal._2))
  // increase the number of total seen items
  inputStream.foreachRDD(rdd => {
    totalNumberOfItems += rdd.count
  })
and used the dstream.foreachRDD(rdd => someVar += rdd.count) pattern in a
number of places.
Then, however, I added a dstream.transformWith(otherDStream, func) call, which
somehow changed the order in which the DStreams are computed. In particular,
suddenly some of my DStream values were computed before the foreachRDD calls
that set the proper variables were executed, which led to completely
unpredictable behavior. So especially given the existence of
spark.streaming.concurrentJobs, I suddenly feel that none of the DStream
computations done on executors should depend on the ordering of output
operations done on the driver. (And I am afraid this includes accumulator
updates.)
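
Condensed, the hazard looks like this (a sketch only; inputStream is the
stream from my code above, while otherStream and the element types are made
up to keep it self-contained):

  import org.apache.spark.rdd.RDD

  var total = 0L
  // (1) read on the executors when each batch's tasks run
  val indexed = inputStream.map(kv => (kv._1 + total, kv._2))
  // (2) written on the driver, as an output operation
  inputStream.foreachRDD(rdd => { total += rdd.count })
  // adding something like this seems to change whether (1) sees the value
  // from before or after (2) within the same interval
  val merged = indexed.transformWith(otherStream,
    (a: RDD[(Long, String)], b: RDD[(Long, String)]) => a.union(b))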
Thinking about this, I feel I don't even know how I can realize a globally
(over the lifetime of my stream) increasing ID in my DStream. Do I need
something like

  val counts: DStream[(Int, Long)] =
    stream.count().map((1, _)).updateStateByKey(...)

with a pseudo-key just to keep a tiny bit of state from one interval to the
next?
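
Spelled out, the pseudo-key idea might look like this (untested; ssc and
stream are from my code, the update function is only my guess at what is
needed, and updateStateByKey also requires checkpointing to be enabled):

  import org.apache.spark.streaming.StreamingContext._  // pair DStream ops in 1.2

  ssc.checkpoint(checkpointDir)  // checkpointDir: any HDFS/local path

  val counts: DStream[(Int, Long)] = stream.count().map((1, _))
  // carry a running total under the pseudo-key 1 from one interval to the next
  val runningTotal: DStream[(Int, Long)] =
    counts.updateStateByKey[Long] { (batchCounts: Seq[Long], state: Option[Long]) =>
      Some(state.getOrElse(0L) + batchCounts.sum)
    }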
Really thankful for any insights,
Tobias

Re: Closing over a var with changing value in Streaming application

2015-01-20 Thread Akhil Das
How about using accumulators
(http://spark.apache.org/docs/1.2.0/programming-guide.html#accumulators)?

Thanks
Best Regards

On Wed, Jan 21, 2015 at 12:53 PM, Tobias Pfeiffer t...@preferred.jp wrote:

 Hi,

 I am developing a Spark Streaming application where I want every item in
 my stream to be assigned a unique, strictly increasing Long. My input data
 already has RDD-local integers (from 0 to N-1) assigned, so I am doing the
 following:

   var totalNumberOfItems = 0L
   // update the keys of the stream data
   val globallyIndexedItems = inputStream.map(keyVal =>
     (keyVal._1 + totalNumberOfItems, keyVal._2))
   // increase the number of total seen items
   inputStream.foreachRDD(rdd => {
     totalNumberOfItems += rdd.count
   })

 Now this works on my local[*] Spark instance, but I was wondering if this
 is actually an ok thing to do. I don't want this to break when going to a
 YARN cluster...

 The function increasing totalNumberOfItems is closing over a var and
 running in the driver, so I think this is ok. Here is my concern: What
 about the function in the inputStream.map(...) block? This one is closing
 over a var that has a different value in every interval. Will the closure
 be serialized with that new value in every interval? Or only once, with the
 initial value, so that it will always be 0 for the runtime of the program?

 As I said, it works locally, but I was wondering if I can really assume
 that the closure is serialized with a new value in every interval.

 Thanks,
 Tobias




Re: Closing over a var with changing value in Streaming application

2015-01-20 Thread Tobias Pfeiffer
Hi,

On Wed, Jan 21, 2015 at 4:46 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 How about using accumulators
 (http://spark.apache.org/docs/1.2.0/programming-guide.html#accumulators)?


As far as I understand, they solve the part of the problem that I am not
worried about, namely increasing the counter. I was more worried about
getting that counter/accumulator value back to the executors.
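
To make the asymmetry concrete, a minimal sketch (untested; sc and
inputStream as in my original mail):

  val seen = sc.accumulator(0L, "items seen")
  inputStream.foreachRDD { rdd =>
    rdd.foreach(_ => seen += 1L)  // executors may only add
    println(seen.value)           // the value is readable on the driver
  }
  // but reading seen.value inside e.g. map() on an executor is not
  // supported, which is exactly the "getting it back" part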

Thanks
Tobias