Hi Fabian,

thanks for your answer. Yes, that's what I want.

The solution you suggest is what I am doing right now (see the last
bullet point in my question).

But given your example, I would expect the following output:

(key: 1, w-time: 10, agg: 17)
(key: 2, w-time: 10, agg: 20)
(key: 1, w-time: 20, agg: 30)
(key: 1, w-time: 20, agg: 30)
(key: 1, w-time: 20, agg: 30)

Because the reduce function is evaluated for every incoming event (i.e.
once per key and window) and emits its current result each time, right?
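
To make the behaviour I mean concrete, here is a minimal plain-Java sketch.
It is not Flink code: it only simulates keying the per-key results by window
time and applying a rolling "keep the larger aggregate" reduce that emits
once per incoming element. The names RollingMaxSketch, Agg and
rollingMaxByWindow are made up for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RollingMaxSketch {

    /** One per-key window result: (key, window time, aggregate). Hypothetical type. */
    static final class Agg {
        final int key;
        final long wTime;
        final int agg;

        Agg(int key, long wTime, int agg) {
            this.key = key;
            this.wTime = wTime;
            this.agg = agg;
        }

        @Override
        public String toString() {
            return "(key: " + key + ", w-time: " + wTime + ", agg: " + agg + ")";
        }
    }

    /**
     * Simulates "key by window time, then rolling reduce keeping the maximum".
     * Assumption: the reduce emits its current value for every input element.
     */
    static List<Agg> rollingMaxByWindow(List<Agg> input) {
        Map<Long, Agg> maxPerWindow = new HashMap<>(); // reduce state per window time
        List<Agg> emitted = new ArrayList<>();
        for (Agg in : input) {
            // Keep the element with the larger aggregate; ties keep the earlier one.
            Agg current = maxPerWindow.merge(in.wTime, in,
                    (oldVal, newVal) -> newVal.agg > oldVal.agg ? newVal : oldVal);
            emitted.add(current); // one output per input element
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Agg> input = new ArrayList<>();
        input.add(new Agg(1, 10, 17));
        input.add(new Agg(2, 10, 20));
        input.add(new Agg(1, 20, 30));
        input.add(new Agg(2, 20, 28));
        input.add(new Agg(3, 20, 5));
        for (Agg a : rollingMaxByWindow(input)) {
            System.out.println(a);
        }
    }
}
```

With the five inputs from your example this emits five elements, one per
input, rather than one per window, which is the issue I am describing.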

Cheers,

Konstantin

On 23.11.2015 10:47, Fabian Hueske wrote:
> Hi Konstantin,
> 
> let me first summarize to make sure I understood what you are looking for.
> You computed an aggregate over a keyed event-time window and you are
> looking for the maximum aggregate for each group of windows over the
> same period of time.
> So if you have
> (key: 1, w-time: 10, agg: 17)
> (key: 2, w-time: 10, agg: 20)
> (key: 1, w-time: 20, agg: 30)
> (key: 2, w-time: 20, agg: 28)
> (key: 3, w-time: 20, agg: 5)
> 
> you would like to get:
> (key: 2, w-time: 10, agg: 20)
> (key: 1, w-time: 20, agg: 30)
> 
> If this is correct, you can do this as follows.
> You can extract the window start and end time from the TimeWindow
> parameter of the WindowFunction and key the stream either by start or
> end time and apply a ReduceFunction on the keyed stream.
> 
> Best, Fabian
> 
> 2015-11-23 8:41 GMT+01:00 Konstantin Knauf <konstantin.kn...@tngtech.com
> <mailto:konstantin.kn...@tngtech.com>>:
> 
>     Hi everyone,
> 
>     me again :) Let's say you have a stream, and for every window and key
>     you compute some aggregate value, like this:
> 
>     DataStream.keyBy(..)
>               .timeWindow(..)
>               .apply(...)
> 
> 
>     Now I want to get the maximum aggregate value for every window over the
>     keys. This feels like a pretty natural use case. How can I achieve this
>     with Flink in the most compact way?
> 
>     The options I thought of so far are:
> 
>     * Use a timeWindowAll, obviously. The drawback is that the WindowFunction
>     would no longer be distributed by key.
> 
>     * use a windowAll after the WindowFunction to create windows of the
>     aggregates, which originated from the same timeWindow. This could be
>     done either with a TimeWindow or with a GlobalWindow with DeltaTrigger.
>     Drawback: Seems unnecessarily complicated and doubles the latency (at
>     least in my naive implementation ;)).
> 
>     * Of course, you could also just keyBy the start time of the window
>     after the WindowFunction, but then you get more than one event for each
>     window.
> 
>     Is there some easy way I am missing? If not, is there a technical
>     reason why such a "reduceByKeyAndWindow" operator is not available in
>     Flink?
> 
>     Cheers,
> 
>     Konstantin
> 
> 

-- 
Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082
