Hi TD,

Why does the example keep recalculating the count via fold? Wouldn't it make more sense to take the last count in the values Seq, add 1 to it, and save that as the current count?
From what Sean explained, I understand that all values in the Seq have the same key, and that when a new value for that key is found, it is added to this Seq collection and the update function is called. Is my understanding correct?

-Adrian

From: Tathagata Das [mailto:tathagata.das1...@gmail.com]
Sent: April-29-14 4:57 PM
To: user@spark.apache.org
Cc: u...@spark.incubator.apache.org
Subject: Re: What is Seq[V] in updateStateByKey?

You may have already seen it, but I will mention it anyway. This example may help:
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala

Here the state is essentially a running count of the words seen. So the value type (i.e., V) is Int (the count of a word in each batch) and the state type (i.e., S) is also an Int (the running count). The update function essentially adds the new count to the running count to generate a new running count.

TD

On Tue, Apr 29, 2014 at 1:49 PM, Sean Owen <so...@cloudera.com> wrote:

The original DStream is of (K, V). This function creates a DStream of (K, S). Each time slice brings one or more new V for each K. The old state S for each K (which can be a different type from V!), possibly non-existent, is updated in some way by a bunch of new V to produce a new state S, which also might not exist anymore after the update. That's why the function is from a Seq[V] and an Option[S] to an Option[S].

If your RDD has value type V = Double, then your function needs to update state based on a new Seq[Double] at each time slice, since Doubles are the new thing arriving for each key at each time slice.

On Tue, Apr 29, 2014 at 7:50 PM, Adrian Mocanu <amoc...@verticalscope.com> wrote:
> What is Seq[V] in updateStateByKey?
>
> Does this store the collected tuples of the RDD in a collection?
>
> Method signature:
>
> def updateStateByKey[S: ClassTag](updateFunc: (Seq[V], Option[S]) => Option[S]): DStream[(K, S)]
>
> In my case I used Seq[Double], assuming a sequence of Doubles in the RDD; the moment I switched to a different type like Seq[(String, Double)], the code didn't compile.
>
> -Adrian
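To make the thread's explanation concrete, here is a minimal pure-Scala sketch (no Spark dependency, so it runs on its own) of update functions with the shape updateStateByKey expects, (Seq[V], Option[S]) => Option[S]. It is written in the spirit of the word-count example TD links, not copied from it. All names (StateUpdateSketch, updateCount, updateStats, updateOrExpire, runBatches) and the (count, sum) state are hypothetical, for illustration only. Following Sean's and TD's description, the Seq holds only the values that arrived for one key in the current batch, while the accumulated history lives in the Option[S] state. That is why the new values are summed on every batch rather than read back from the Seq. It also shows why Seq[(String, Double)] does not fit a DStream[(String, Double)]: there K = String and V = Double, so the function must take a Seq[Double].

```scala
// Hypothetical sketch: update functions shaped like (Seq[V], Option[S]) => Option[S],
// exercised by threading state through simulated batches for a single key.
object StateUpdateSketch {

  // V = Int (counts for one key in this batch), S = Int (running count).
  // newValues holds only this batch's values, e.g. Seq(1, 1, 1) if a word
  // appeared three times; the history is carried in runningCount.
  val updateCount: (Seq[Int], Option[Int]) => Option[Int] =
    (newValues, runningCount) => Some(newValues.sum + runningCount.getOrElse(0))

  // V = Double, S = (Long, Double): the state type differs from the value type.
  // The state is a (count, sum) pair from which a running mean could be derived.
  val updateStats: (Seq[Double], Option[(Long, Double)]) => Option[(Long, Double)] =
    (newValues, state) => {
      val (count, sum) = state.getOrElse((0L, 0.0))
      Some((count + newValues.size, sum + newValues.sum))
    }

  // Returning None removes the key's state. The rule here is hypothetical:
  // drop the key when it is called with no new values.
  val updateOrExpire: (Seq[Int], Option[Int]) => Option[Int] =
    (newValues, runningCount) =>
      if (newValues.isEmpty) None
      else Some(newValues.sum + runningCount.getOrElse(0))

  // Simulate successive batches for one key: start with no state and
  // apply the update function once per batch.
  def runBatches[V, S](batches: Seq[Seq[V]], f: (Seq[V], Option[S]) => Option[S]): Option[S] =
    batches.foldLeft(Option.empty[S])((state, batch) => f(batch, state))
}
```

For example, StateUpdateSketch.runBatches(Seq(Seq(1, 1, 1), Seq(1), Seq(1, 1)), StateUpdateSketch.updateCount) yields Some(6): each batch's Seq is summed and added to the previous running count. With Spark Streaming on the classpath, a function of this shape would be passed as something like pairs.updateStateByKey(StateUpdateSketch.updateCount), where pairs is a hypothetical DStream[(String, Int)].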