Solved. There was probably an error in the way I was testing. I also
simplified the job, and it works now.

2016-09-27 16:01 GMT+02:00 Simone Robutti <>:

> Hello,
> I'm working on a streaming analytics job and I don't know how to write
> the last part.
> Actually I want to count all the elements in a window with a given status,
> so I keep a state with a Map[Status,Long]. This state is updated starting
> from tuples containing the oldStatus and the newStatus. So every event
> generates a +1 for the new status and a -1 for the old status. Then I want
> to reduce all these counts and move from a local and partial state to a
> global state that will be written in output.
> Right now my code looks like this:
> filteredLatestOrders
>   .keyBy(x => x._1.getStatus)
>   .mapWithState(updateState)
>   .keyBy(x => "0")
>   .reduce((orderA, orderB) => orderA.sum(orderB))
> where "filteredLatestOrders" is a DataStream containing information about
> the elements, the new state and the old state.
> This produces the following output:
> 2> Map(DRAFT -> 0, RESERVED -> 1, PICKED -> 0, PACKED -> 0)
> 2> Map(DRAFT -> 0, RESERVED -> 2, PICKED -> 0, PACKED -> 0)
> 2> Map(DRAFT -> 0, RESERVED -> 3, PICKED -> 0, PACKED -> 1)
> 3> Map(DRAFT -> 0, RESERVED -> -1, PICKED -> 1, PACKED -> 0)
> 4> Map(DRAFT -> 0, RESERVED -> -1, PICKED -> 0, PACKED -> 1)
> I thought that keying by a fixed value would collect all the elements on
> a single node so that I could compute the final result, but they are left
> on different nodes and are never summed.
> Is this the correct approach? If so, how can I do what I need? Is there a
> smarter way to count distinct, evolving elements by their status in a
> stream? Note that the original events are updates to the status of an
> element, and the requirement is to count each element only under its
> latest available status.
> Thank you in advance,
> Simone
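
For reference, the +1/-1 bookkeeping described in the quoted message can be
sketched in plain Scala, without Flink. This is only a minimal sketch under
assumptions, not the code of the actual job: StatusChange, StatusCounts,
delta, merge and total are hypothetical names, statuses are modelled as
plain strings, and a missing old status is assumed to mean a new element.

```scala
// Hypothetical model of one status update; None means the element is new.
final case class StatusChange(oldStatus: Option[String], newStatus: String)

object StatusCounts {
  type Counts = Map[String, Long]

  // Merge two partial count maps by summing the values key by key.
  def merge(a: Counts, b: Counts): Counts =
    b.foldLeft(a) { case (acc, (status, n)) =>
      acc.updated(status, acc.getOrElse(status, 0L) + n)
    }

  // One event contributes +1 to the new status and -1 to the old one.
  def delta(change: StatusChange): Counts = {
    val plus: Counts = Map(change.newStatus -> 1L)
    change.oldStatus.fold(plus)(old => merge(plus, Map(old -> -1L)))
  }

  // Global counts: fold all per-event deltas into a single map.
  def total(changes: Seq[StatusChange]): Counts =
    changes.map(delta).foldLeft(Map.empty[String, Long])(merge)
}
```

Because merge is associative and commutative, the partial maps can be
combined in any order, which is the property the final reduce step relies
on. The global result is only correct if every partial map ends up in the
same reduction.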
