Hey Terry,

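That's expected: mapWithState calls the mapping function once per input
record and emits one output per call, so two records with key 1 in the same
batch produce two outputs, (1,1) and (1,3). If you want to output only (1,3),
you can use "reduceByKey" before "mapWithState" like this: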

dstream.reduceByKey(_ + _).mapWithState(spec)
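
To see why, here is a rough plain-Scala model of the per-record behaviour. It
does not use Spark at all: MapWithStateModel, mapWithStateModel,
reduceByKeyModel and run are names made up for this sketch, and it assumes the
state is just a running sum per key, as in your mapping function.

```scala
// Plain-Scala sketch of the semantics only; these are NOT Spark APIs.
object MapWithStateModel {
  type Batch = Seq[(Int, Int)]

  // Models mapWithState with a summing mapping function: one call per
  // record, update that key's running sum, emit one (key, sum) per record.
  def mapWithStateModel(state: Map[Int, Int], batch: Batch): (Map[Int, Int], Seq[(Int, Int)]) =
    batch.foldLeft((state, Vector.empty[(Int, Int)])) {
      case ((s, out), (key, value)) =>
        val sum = s.getOrElse(key, 0) + value
        (s.updated(key, sum), out :+ (key -> sum))
    }

  // Models reduceByKey(_ + _): collapse a batch to one record per key.
  // Sorted by key only to make this sketch deterministic.
  def reduceByKeyModel(batch: Batch): Batch =
    batch.groupBy(_._1).toSeq
      .map { case (key, records) => (key, records.map(_._2).sum) }
      .sortBy(_._1)

  // Feeds the batches through in order, carrying state across batches.
  def run(batches: Seq[Batch], preReduce: Boolean): Seq[Seq[(Int, Int)]] =
    batches.foldLeft((Map.empty[Int, Int], Vector.empty[Seq[(Int, Int)]])) {
      case ((state, outputs), batch) =>
        val input = if (preReduce) reduceByKeyModel(batch) else batch
        val (nextState, out) = mapWithStateModel(state, input)
        (nextState, outputs :+ out)
    }._2
}
```

With your two batches, run(batches, preReduce = false) gives
((1,1), (1,3), (2,1)) and then ((1,6)), which is what you observed, while
run(batches, preReduce = true) gives ((1,3), (2,1)) and then ((1,6)). In a
real job the order of records within an RDD is not guaranteed, so treat the
ordering here as an artifact of the model.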

On Fri, Jan 15, 2016 at 1:21 AM, Terry Hoo <hujie.ea...@gmail.com> wrote:

> Hi,
> I am doing a simple test with mapWithState and getting some unexpected
> events. Is this correct?
>
> The test is very simple: sum the value of each key
>
> val mappingFunction = (key: Int, value: Option[Int], state: State[Int]) => {
>   state.update(state.getOption().getOrElse(0) + value.getOrElse(0))
>   (key, state.get())
> }
> val spec = StateSpec.function(mappingFunction)
> dstream.mapWithState(spec)
>
> I create two RDDs and insert into dstream:
> RDD((1,1), (1,2), (2,1))
> RDD((1,3))
>
> Get result like this:
> RDD(*(1,1)*, *(1,3)*, (2,1))
> RDD((1,6))
>
> You can see that the first batch generates two items with the same key
> "1": (1,1) and (1,3). Is this expected behavior? I would expect (1,3) only.
>
> Regards
> - Terry
>
