Hi folks, I'm hoping someone who works with Spark Streaming can help me out.
I have the following snippet:
val stateDstream =
  data.map(x => (x, 1))
    .updateStateByKey[State](updateFunc)
stateDstream.saveAsTextFiles(checkpointDirectory, "partitions_test")
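For readers less familiar with `updateStateByKey`, the Spark-free model below sketches its documented per-batch contract: for every key present in either the previous state or the current batch, the update function receives that batch's new values plus the previous state, and the key is kept only if the function returns `Some`. The types `Key` and `Counter`, the counting `updateFunc`, and `stepBatch` are hypothetical stand-ins, not Spark's implementation or the code above. The point it illustrates is that keys are matched with `equals`/`hashCode`, so structurally equal keys should collapse into a single entry.

```scala
// Hypothetical, Spark-free model of one batch of updateStateByKey.
// NOT Spark's implementation; it only mirrors the documented contract.

// Simplified stand-ins for the real key/state types (assumptions).
case class Key(host: String, hour: String, customer: String)
case class Counter(count: Int)

// Counting update function in the (Seq[V], Option[S]) => Option[S] shape.
def updateFunc(newValues: Seq[Int], state: Option[Counter]): Option[Counter] = {
  val previous = state.map(_.count).getOrElse(0)
  Some(Counter(previous + newValues.sum))
}

// Group this batch's pairs by key, then call updateFunc for every key that
// is in the previous state or the batch (keys without new data get an
// empty Seq). Grouping relies on equals/hashCode of the key.
def stepBatch(
    batch: Seq[(Key, Int)],
    previous: Map[Key, Counter]
): Map[Key, Counter] = {
  val newValues: Map[Key, Seq[Int]] =
    batch.groupBy(_._1).map { case (k, pairs) => k -> pairs.map(_._2) }
  val allKeys = previous.keySet ++ newValues.keySet
  allKeys.flatMap { k =>
    updateFunc(newValues.getOrElse(k, Seq.empty), previous.get(k)).map(k -> _)
  }.toMap
}

val abcd = Key("www.abcd.com", "2014-07-22-22", "25")
val foo = Key("foo.com.br", "2014-07-22-18", "16")

val afterBatch1 = stepBatch(Seq(abcd -> 1, abcd -> 1, foo -> 1), Map.empty)
val afterBatch2 = stepBatch(Seq(abcd -> 1), afterBatch1)

afterBatch2.foreach(println)
```

Printing `afterBatch2` should show one line per distinct key, with the `www.abcd.com` key's count accumulated across both batches and the `foo.com.br` count carried over unchanged.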
where data is a DStream of StateKey values, defined as:
case class StateKey(host: String, hour: String, customer: String)
When I dump out the stream, I see duplicate keys in the same partition
(I've marked the keys that are identical with asterisks):
(StateKey(foo.com.br,2014-07-22-18,16),State(43,2014-08-06T14:05:29.831Z))
(*StateKey*(www.abcd.com,2014-07-22-22,25),State(2564,2014-08-06T14:05:29.831Z))
(StateKey(bar.com,2014-07-04-20,29),State(77,2014-08-06T14:05:29.831Z))
(*StateKey*(www.abcd.com,2014-07-22-22,25),State(1117,2014-08-06T14:05:29.831Z))
I was under the impression that on each batch the stream contains a single
RDD of key-value pairs reflecting the latest state of each key, with each key appearing exactly once.
Am I misunderstanding this? Or is the key equality somehow failing?
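One way to test the key-equality question directly is to compare two keys that print identically. Case classes get structural `equals` and `hashCode`, so two `StateKey` values with identical fields should be equal and hash alike. The sketch below also dumps the code points of any field that differs, which would expose invisible characters such as trailing whitespace. The helpers `codePoints` and `explain`, and the key `k3` with a trailing newline, are hypothetical and for illustration only.

```scala
// Diagnostic sketch: do two keys that print the same really compare equal?
case class StateKey(host: String, hour: String, customer: String)

// Render every character as its code point so invisible differences
// (trailing spaces, '\r', '\n', non-breaking spaces) become visible.
def codePoints(s: String): String =
  s.map(c => f"U+${c.toInt}%04X").mkString(" ")

def explain(a: StateKey, b: StateKey): Unit = {
  println(s"equal: ${a == b}, same hashCode: ${a.hashCode == b.hashCode}")
  // Walk the fields pairwise and print the code points of any mismatch.
  a.productIterator.zip(b.productIterator).zipWithIndex.foreach {
    case ((x: String, y: String), i) if x != y =>
      println(s"field $i differs:\n  ${codePoints(x)}\n  ${codePoints(y)}")
    case _ => ()
  }
}

val k1 = StateKey("www.abcd.com", "2014-07-22-22", "25")
val k2 = StateKey("www.abcd.com", "2014-07-22-22", "25")
// Hypothetical key with a hidden trailing newline in the host field.
val k3 = StateKey("www.abcd.com\n", "2014-07-22-22", "25")

explain(k1, k2)
explain(k1, k3)
```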
Any tips would be appreciated.
P.S. For completeness, State is:
case class State(val count: Integer, val update_date: DateTime)
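For reference, an update function with the `(Seq[V], Option[S]) => Option[S]` shape that `updateStateByKey` accepts might look like the sketch below for a `State` of this form. It is an assumption, not the `updateFunc` from the snippet, and it swaps Joda's `DateTime` for `java.time.Instant` and `Integer` for `Int` so it runs without extra dependencies.

```scala
import java.time.Instant

// Assumed stand-in for the poster's State (Instant instead of Joda DateTime).
case class State(count: Int, update_date: Instant)

// Hypothetical update function: add this batch's values to the running
// count, and refresh the timestamp only when the key received new data.
def updateFunc(values: Seq[Int], state: Option[State]): Option[State] = {
  val now = Instant.now()
  state match {
    case None                       => Some(State(values.sum, now))
    case Some(s) if values.nonEmpty => Some(State(s.count + values.sum, now))
    case Some(s)                    => Some(s) // no new data for this key
  }
}
```

Returning `Some(s)` unchanged for keys without new data keeps them in the state; returning `None` instead would drop them.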