Hey Terry,

That's expected: mapWithState calls the mapping function once for every record in a batch, so the two records with key 1 in your first batch each produce an output. If you only want to output (1, 3), you can use reduceByKey before mapWithState to combine the values for each key within a batch, like this:

dstream.reduceByKey(_ + _).mapWithState(spec)

On Fri, Jan 15, 2016 at 1:21 AM, Terry Hoo <hujie.ea...@gmail.com> wrote:
> Hi,
>
> I am doing a simple test with mapWithState, and get some events
> unexpected, is this correct?
>
> The test is very simple: sum the value of each key
>
> val mappingFunction = (key: Int, value: Option[Int], state: State[Int]) => {
>   state.update(state.getOption().getOrElse(0) + value.getOrElse(0))
>   (key, state.get())
> }
> val spec = StateSpec.function(mappingFunction)
> dstream.mapWithState(spec)
>
> I create two RDDs and insert into dstream:
> RDD((1,1), (1,2), (2,1))
> RDD((1,3))
>
> Get result like this:
> RDD(*(1,1)*, *(1,3)*, (2,1))
> RDD((1,6))
>
> You can see that the first batch will generate two items with the same key
> "1": (1,1) and (1,3), is this expected behavior? I would expect (1,3) only.
>
> Regards
> - Terry
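To illustrate the difference, here is a small plain-Scala model of the two pipelines that needs no Spark. It is only a sketch under simplifying assumptions: one partition, records processed in order, and state kept in a local mutable map. The names MapWithStateModel, mapWithStateModel and reduceByKeyModel are hypothetical and this is not Spark's implementation. The per-batch sort in reduceByKeyModel is only there to make the output deterministic; real reduceByKey output order is not guaranteed.

```scala
import scala.collection.mutable

object MapWithStateModel {
  // Hypothetical model of mapWithState: the mapping function runs once per
  // input record, so a key that appears twice in a batch yields two outputs.
  def mapWithStateModel(batches: Seq[Seq[(Int, Int)]]): Seq[Seq[(Int, Int)]] = {
    val state = mutable.Map.empty[Int, Int] // running sum per key across batches
    batches.map { batch =>
      batch.map { case (key, value) =>
        val updated = state.getOrElse(key, 0) + value
        state(key) = updated
        (key, updated)
      }
    }
  }

  // Hypothetical model of reduceByKey(_ + _) on one batch: each key appears
  // at most once, with its values summed. Sorted by key for determinism.
  def reduceByKeyModel(batch: Seq[(Int, Int)]): Seq[(Int, Int)] =
    batch
      .groupBy(_._1)
      .map { case (k, kvs) => (k, kvs.map(_._2).sum) }
      .toList
      .sortBy(_._1)

  def main(args: Array[String]): Unit = {
    val batches = Seq(Seq((1, 1), (1, 2), (2, 1)), Seq((1, 3)))
    // Without pre-aggregation: key 1 is emitted twice in the first batch.
    println(mapWithStateModel(batches))
    // With per-batch reduceByKey first: key 1 is emitted once per batch.
    println(mapWithStateModel(batches.map(reduceByKeyModel)))
  }
}
```

In this model the first pipeline yields (1,1), (1,3), (2,1) for the first batch, matching what you saw, while the reduced pipeline yields only (1,3) and (2,1); both give (1,6) for the second batch.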