Hi folks, I'm hoping someone who works with Spark Streaming can help me out.

I have the following snippet:

val stateDstream =
      data.map(x => (x, 1))
      .updateStateByKey[State](updateFunc)

stateDstream.saveAsTextFiles(checkpointDirectory, "partitions_test")

where data is a DStream of StateKey instances, defined as:

case class StateKey(host: String, hour: String, customer: String)
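Because StateKey is a case class, its equals and hashCode are structural: two keys with the same field values should be the same key. A minimal sketch of that, plus one hypothetical way two keys can print almost identically yet differ (the trailing space is an illustrative assumption, not something observed in the output below):

```scala
case class StateKey(host: String, hour: String, customer: String)

object KeyEqualitySketch {
  def main(args: Array[String]): Unit = {
    val a = StateKey("www.abcd.com", "2014-07-22-22", "25")
    val b = StateKey("www.abcd.com", "2014-07-22-22", "25")
    // Case classes get structural equals/hashCode, so a and b are one key.
    println(a == b && a.hashCode == b.hashCode) // prints true

    // Hypothetical: a trailing space in host is hard to spot when printed,
    // but it makes the key compare unequal.
    val c = StateKey("www.abcd.com ", "2014-07-22-22", "25")
    println(a == c) // prints false

    // Quoting each field makes hidden characters visible in a dump.
    println(c.productIterator.map(f => "\"" + f + "\"").mkString(","))
  }
}
```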

When I dump out the stream, I see duplicate keys within the same partition
(I've marked the identical keys in bold):

(StateKey(foo.com.br,2014-07-22-18,16),State(43,2014-08-06T14:05:29.831Z))
(*StateKey*(www.abcd.com,2014-07-22-22,25),State(2564,2014-08-06T14:05:29.831Z))
(StateKey(bar.com,2014-07-04-20,29),State(77,2014-08-06T14:05:29.831Z))
(*StateKey*(www.abcd.com,2014-07-22-22,25),State(1117,2014-08-06T14:05:29.831Z))


I was under the impression that on each batch, the state stream contains a
single RDD with exactly one key-value pair per key, holding that key's
latest state. Am I misunderstanding this, or is key equality somehow failing?
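The expectation above can be modelled with plain Scala collections. This is a sketch of the per-key semantics only, not Spark's implementation: each batch's pairs are grouped by key, the update function runs once for every key that has prior state or new values, and returning None drops the key, so no key can appear twice in the result. The names StateModelSketch, Update and step are hypothetical.

```scala
object StateModelSketch {
  // Same shape as the function passed to updateStateByKey:
  // (new values for a key in this batch, previous state) => new state.
  type Update[S] = (Seq[Int], Option[S]) => Option[S]

  // One batch: group new pairs by key, then apply the update function
  // once per key that has prior state or new values.
  def step[K, S](prev: Map[K, S], batch: Seq[(K, Int)], f: Update[S]): Map[K, S] = {
    val grouped: Map[K, Seq[Int]] =
      batch.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
    val allKeys = prev.keySet ++ grouped.keySet
    allKeys.toSeq.flatMap { k =>
      // Returning None from the update function removes the key.
      f(grouped.getOrElse(k, Seq.empty), prev.get(k)).map(s => k -> s)
    }.toMap
  }

  def main(args: Array[String]): Unit = {
    val countUpdate: Update[Int] =
      (vs, old) => Some(old.getOrElse(0) + vs.sum)
    val s1 = step(Map.empty[String, Int], Seq("a" -> 1, "b" -> 1, "a" -> 1), countUpdate)
    val s2 = step(s1, Seq("a" -> 1), countUpdate)
    // Each key appears once: a has count 3, b has count 1.
    println(s2)
  }
}
```

If equal keys are really being treated as distinct, the equivalent of grouped would contain two separate entries for what looks like the same key.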

Any tips would be appreciated.

P.S. For completeness, State is defined as:
case class State(count: Integer, update_date: DateTime)
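The post doesn't show updateFunc. For reference, a hypothetical update function with the (Seq[Int], Option[State]) => Option[State] shape that updateStateByKey[State] accepts for these (key, 1) pairs might look like this. To keep the sketch self-contained, java.time.Instant stands in for the DateTime type used in the post; the counting logic is an assumption, not the author's code.

```scala
import java.time.Instant

// Stand-in for the post's State: Instant replaces DateTime only so the
// sketch compiles without extra dependencies.
case class State(count: Integer, update_date: Instant)

object UpdateFuncSketch {
  // Hypothetical update function: adds this batch's values to the
  // previous count, or keeps the old state when there is no new data.
  val updateFunc: (Seq[Int], Option[State]) => Option[State] =
    (newValues, previous) => {
      val prevCount: Int = previous.map(_.count.intValue).getOrElse(0)
      if (newValues.isEmpty) previous
      else Some(State(prevCount + newValues.sum, Instant.now()))
    }

  def main(args: Array[String]): Unit = {
    val first = updateFunc(Seq(1, 1, 1), None)
    val second = updateFunc(Seq(1), first)
    println(second.map(_.count)) // prints Some(4)
  }
}
```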
