Hi TD,
Why does the example keep recalculating the count via fold?
Wouldn’t it make more sense to get the last count in the values Seq, add 1 to it,
and save that as the current count?

From what Sean explained, I understand that all values in the Seq have the same key.
Then, when a new value for that key is found, it is added to this Seq collection
and the update function is called.

Is my understanding correct?

-Adrian

From: Tathagata Das [mailto:tathagata.das1...@gmail.com]
Sent: April-29-14 4:57 PM
To: user@spark.apache.org
Cc: u...@spark.incubator.apache.org
Subject: Re: What is Seq[V] in updateStateByKey?

You may have already seen it, but I will mention it anyway. This example may
help.
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala

Here the state is essentially a running count of the words seen. So the value
type (i.e. V) is Int (the count of a word in each batch) and the state type
(i.e. S) is also an Int (the running count). The update function essentially
sums the running count with the new count to generate a new running count.
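For reference, here is a minimal plain-Scala sketch of that kind of update function, along the lines of (not copied from) the linked example. It assumes, as in a word count, that each word occurrence in a batch is mapped to `(word, 1)`, so `values` holds one `1` per occurrence of that key in the current batch only, while the previous running count arrives separately as `state`. The `runBatches` helper and the sample batches are hypothetical: they only simulate batch-by-batch updates and are not Spark APIs.

```scala
object RunningWordCount {
  // Shape expected by updateStateByKey: (Seq[V], Option[S]) => Option[S].
  // Here V = Int (one 1 per occurrence in this batch) and S = Int (running count).
  val updateFunc: (Seq[Int], Option[Int]) => Option[Int] = (values, state) => {
    val currentCount = values.foldLeft(0)(_ + _) // occurrences of this word in the current batch
    val previousCount = state.getOrElse(0)       // running count so far (0 for a new key)
    Some(currentCount + previousCount)
  }

  // Hypothetical helper: simulates applying updateFunc once per batch.
  // Each batch is a list of words; the state is a Map from word to running count.
  def runBatches(batches: Seq[Seq[String]]): Map[String, Int] =
    batches.foldLeft(Map.empty[String, Int]) { (state, batch) =>
      // Group this batch's (word, 1) pairs by key, giving a Seq[Int] per word.
      val newValues: Map[String, Seq[Int]] =
        batch.map(w => (w, 1)).groupBy(_._1).map { case (k, pairs) => k -> pairs.map(_._2) }
      // Keys that already have state are updated too, with an empty Seq if they got no new values.
      val keys = state.keySet ++ newValues.keySet
      keys.flatMap { k =>
        updateFunc(newValues.getOrElse(k, Seq.empty), state.get(k)).map(count => k -> count)
      }.toMap
    }

  def main(args: Array[String]): Unit = {
    val batches = Seq(Seq("a", "b", "a"), Seq("a", "c"))
    println(runBatches(batches))
  }
}
```

With these two batches, `a` ends at 3 while `b` and `c` end at 1. Because `values` holds only the current batch's occurrences of a key, the fold sums within the batch and the earlier total comes from `state`.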

TD


On Tue, Apr 29, 2014 at 1:49 PM, Sean Owen <so...@cloudera.com> wrote:
The original DStream is of (K,V). This function creates a DStream of
(K,S). Each time slice brings one or more new V for each K. The old
state S (can be different from V!) for each K -- possibly non-existent
-- is updated in some way by a bunch of new V, to produce a new state
S -- which also might not exist anymore after update. That's why the
function is from a Seq[V], and an Option[S], to an Option[S].
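A small plain-Scala sketch of that contract, under illustrative assumptions: V = Double readings, and a hypothetical state S = (count, sum) used for a running mean, which shows that S need not match V. `None` as input means the key has no state yet; returning `None` removes the key's state. Dropping a key when a slice brings no new values is only an example policy, chosen here because the Spark Streaming guide describes the function as also being called for existing keys that received no new data.

```scala
object RunningMean {
  // Hypothetical state type: S = (count, sum), which differs from V = Double.
  type Stats = (Long, Double)

  // (Seq[V], Option[S]) => Option[S] with V = Double and S = Stats.
  def updateFunc(values: Seq[Double], state: Option[Stats]): Option[Stats] =
    if (values.isEmpty) None // example policy: no new data this slice, so drop the state
    else {
      val (count, sum) = state.getOrElse((0L, 0.0)) // a new key starts from empty stats
      Some((count + values.size, sum + values.sum))
    }

  def mean(s: Stats): Double = s._2 / s._1

  def main(args: Array[String]): Unit = {
    val s1 = updateFunc(Seq(2.0, 4.0), None) // new key: no previous state
    val s2 = updateFunc(Seq(6.0), s1)        // existing key, more data
    val s3 = updateFunc(Seq.empty, s2)       // no new data: state removed
    println((s1, s2, s3))
    s2.foreach(s => println(mean(s)))
  }
}
```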

If your RDD has value type V = Double then your function needs to
update state based on a new Seq[Double] at each time slice, since
Doubles are the new thing arriving for each key at each time slice.
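The type relationship can be illustrated with a hypothetical, generic plain-Scala stand-in for one update step (`step` is not a Spark API). V is fixed by the value type of the input pairs, so with (String, Double) pairs the function must take `Seq[Double]`. Getting a `Seq[(String, Double)]` would require the stream's values themselves to be `(String, Double)`, i.e. pairs of type `(K, (String, Double))`; the sample keys and values below are made up.

```scala
object TypedSim {
  // Hypothetical stand-in for one updateStateByKey step on a single batch.
  // V comes from the element type of the input pairs, so updateFunc must take Seq[V].
  def step[K, V, S](
      batch: Seq[(K, V)],
      state: Map[K, S],
      updateFunc: (Seq[V], Option[S]) => Option[S]
  ): Map[K, S] = {
    val grouped: Map[K, Seq[V]] =
      batch.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
    (state.keySet ++ grouped.keySet).flatMap { k =>
      updateFunc(grouped.getOrElse(k, Seq.empty), state.get(k)).map(s => k -> s)
    }.toMap
  }

  def main(args: Array[String]): Unit = {
    // Values are Doubles, so the function takes Seq[Double].
    val doubles: Seq[(String, Double)] = Seq(("x", 1.5), ("x", 2.5), ("y", 1.0))
    val sums = step[String, Double, Double](doubles, Map.empty,
      (vs, s) => Some(s.getOrElse(0.0) + vs.sum))
    println(sums)

    // Values are (String, Double), so the function takes Seq[(String, Double)].
    val pairs: Seq[(String, (String, Double))] = Seq(("x", ("a", 1.5)), ("x", ("b", 2.5)))
    val latest = step[String, (String, Double), (String, Double)](pairs, Map.empty,
      (vs, s) => vs.lastOption.orElse(s)) // keep the most recent pair seen for each key
    println(latest)
  }
}
```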


On Tue, Apr 29, 2014 at 7:50 PM, Adrian Mocanu <amoc...@verticalscope.com> wrote:
> What is Seq[V] in updateStateByKey?
>
> Does this store the collected tuples of the RDD in a collection?
>
> Method signature:
>
> def updateStateByKey[S: ClassTag](updateFunc: (Seq[V], Option[S]) => Option[S]): DStream[(K, S)]
>
> In my case I used Seq[Double] assuming a sequence of Doubles in the RDD; the
> moment I switched to a different type like Seq[(String, Double)] the code
> didn’t compile.
>
> -Adrian
