Solved. Probably there was an error in the way I was testing. Also I
simplified the job and it works now.

2016-09-27 16:01 GMT+02:00 Simone Robutti <simone.robu...@radicalbit.io>:

> Hello,
>
> I'm dealing with an analytical job in streaming and I don't know how to
> write the last part.
>
> Actually I want to count all the elements in a window with a given status,
> so I keep a state with a Map[Status,Long]. This state is updated starting
> from tuples containing the oldStatus and the newStatus. So every event
> generates a +1 for the new status and a -1 for the old status. Then I want
> to reduce all these counts and move from a local and partial state to a
> global state that will be written in output.
>
> Right now my code look like:
>
> filteredLatestOrders.keyBy(x => x._1.getStatus).mapWithState(
> updateState).keyBy(x=>"0").reduce((orderA,orderB)=>orderA.sum(orderB))
>
> where "filteredLatestOrder" is a DataStream containing informations about
> the elements, the new state and the old state.
>
> This produces in output:
>
> 2> Map(DRAFT -> 0, RESERVED -> 1, PICKED -> 0, PACKED -> 0)
> 2> Map(DRAFT -> 0, RESERVED -> 2, PICKED -> 0, PACKED -> 0)
> 2> Map(DRAFT -> 0, RESERVED -> 3, PICKED -> 0, PACKED -> 1)
> 3> Map(DRAFT -> 0, RESERVED -> -1, PICKED -> 1, PACKED -> 0)
> 4> Map(DRAFT -> 0, RESERVED -> -1, PICKED -> 0, PACKED -> 1)
>
> I thought that keying with a fixed value would collect all the elements in
> a single node so that I could finally compute the final result, but they
> are left on different nodes and are never summed.
>
> Is this the correct approach? In that case, how can I do what I need? Is
> there a smarter way to count distinct evolving elements by their status in
> a streaming? Mind that the original source of events are updates to the
> status of an element and the requirement is that I want to count only the
> latest status available.
>
> Thank you in advance,
>
> Simone
>

Reply via email to