Hi Ryan,

Thanks for your comments!

Using reduceByKey() before the mapWithState can get the expected result.

Do we ever consider that mapWithState only outputs the changed key one time
in every batch interval, just like the updateStateByKey. For some cases,
user may only care about the final state.

Regards,
-Terry

On Sat, Jan 16, 2016 at 6:20 AM, Shixiong(Ryan) Zhu <shixi...@databricks.com
> wrote:

> Hey Terry,
>
> That's expected. If you want to only output (1, 3), you can use
> "reduceByKey" before "mapWithState" like this:
>
> dstream.reduceByKey(_ + _).mapWithState(spec)
>
> On Fri, Jan 15, 2016 at 1:21 AM, Terry Hoo <hujie.ea...@gmail.com> wrote:
>
>> Hi,
>> I am doing a simple test with mapWithState, and get some events
>> unexpected, is this correct?
>>
>> The test is very simple: sum the value of each key
>>
>> val mappingFunction = (key: Int, value: Option[Int], state: State[Int]) => {
>>   state.update(state.getOption().getOrElse(0) + value.getOrElse(0))
>>   (key, state.get())
>> }
>> val spec = StateSpec.function(mappingFunction)
>> dstream.mapWithState(spec)
>>
>> I create two RDDs and insert into dstream:
>> RDD((1,1), (1,2), (2,1))
>> RDD((1,3))
>>
>> Get result like this:
>> RDD(*(1,1)*, *(1,3)*, (2,1))
>> RDD((1,6))
>>
>> You can see that the first batch will generate two items with the same
>> key "1": (1,1) and (1,3), is this expected behavior? I would expect (1,3)
>> only.
>>
>> Regards
>> - Terry
>>
>
>

Reply via email to