Hi Fabian, thanks for your answer. Yes, that's what I want.
The solution you suggest is what I am doing right now (see the last bullet point in my question). But given your example, I would expect the following output:

(key: 1, w-time: 10, agg: 17)
(key: 2, w-time: 10, agg: 20)
(key: 1, w-time: 20, agg: 30)
(key: 1, w-time: 20, agg: 30)
(key: 1, w-time: 20, agg: 30)

This is because the reduce function is evaluated for every incoming event (i.e. for each key), right?

Cheers,

Konstantin

On 23.11.2015 10:47, Fabian Hueske wrote:
> Hi Konstantin,
>
> let me first summarize to make sure I understood what you are looking for.
> You computed an aggregate over a keyed event-time window and you are
> looking for the maximum aggregate for each group of windows over the
> same period of time.
> So if you have
> (key: 1, w-time: 10, agg: 17)
> (key: 2, w-time: 10, agg: 20)
> (key: 1, w-time: 20, agg: 30)
> (key: 2, w-time: 20, agg: 28)
> (key: 3, w-time: 20, agg: 5)
>
> you would like to get:
> (key: 2, w-time: 10, agg: 20)
> (key: 1, w-time: 20, agg: 30)
>
> If this is correct, you can do this as follows.
> You can extract the window start and end time from the TimeWindow
> parameter of the WindowFunction and key the stream either by start or
> end time and apply a ReduceFunction on the keyed stream.
>
> Best, Fabian
>
> 2015-11-23 8:41 GMT+01:00 Konstantin Knauf <konstantin.kn...@tngtech.com>:
>
> > Hi everyone,
> >
> > me again :) Let's say you have a stream, and for every window and key
> > you compute some aggregate value, like this:
> >
> >     DataStream.keyBy(..)
> >               .timeWindow(..)
> >               .apply(...)
> >
> > Now I want to get the maximum aggregate value for every window over the
> > keys. This feels like a pretty natural use case. How can I achieve this
> > with Flink in the most compact way?
> >
> > The options I thought of so far are:
> >
> > * Use an allTimeWindow, obviously. The drawback is that the WindowFunction
> > would not be distributed by keys anymore.
> >
> > * Use a windowAll after the WindowFunction to create windows of the
> > aggregates that originated from the same timeWindow. This could be
> > done either with a TimeWindow or with a GlobalWindow with a DeltaTrigger.
> > Drawback: it seems unnecessarily complicated and doubles the latency (at
> > least in my naive implementation ;)).
> >
> > * Of course, you could also just keyBy the start time of the window
> > after the WindowFunction, but then you get more than one event for each
> > window.
> >
> > Is there some easy way I am missing? If not, is there a technical
> > reason why such a "reduceByKeyAndWindow" operator is not available in
> > Flink?
> >
> > Cheers,
> >
> > Konstantin

-- 
Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Managing Directors: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Registered office: Unterföhring * Amtsgericht München * HRB 135082
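To make the difference between the two expected outputs concrete, here is a small plain-Java simulation. It does not use Flink at all; it only mimics the semantics under discussion, assuming the per-key window aggregates arrive in the order of Fabian's example. `rollingMax` models keying the aggregates by window time and applying a rolling ReduceFunction that keeps the maximum (one output per incoming event), while `finalMax` models emitting only the last value per window. The names `WindowMaxDemo`, `Agg`, `rollingMax` and `finalMax` are hypothetical, and watermarks and triggers are not modelled: `finalMax` simply assumes the whole input has been seen.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class WindowMaxDemo {

    // One per-key window aggregate (hypothetical stand-in for the WindowFunction output).
    record Agg(int key, long windowTime, int agg) {
        @Override
        public String toString() {
            return "(key: " + key + ", w-time: " + windowTime + ", agg: " + agg + ")";
        }
    }

    // Mimics keyBy(windowTime) followed by a rolling max-reduce:
    // every incoming aggregate produces one output, the current max for its window.
    static List<Agg> rollingMax(List<Agg> input) {
        Map<Long, Agg> state = new LinkedHashMap<>();
        List<Agg> out = new ArrayList<>();
        for (Agg a : input) {
            Agg current = state.get(a.windowTime());
            // Keep the existing record on ties; replace it only on a strictly larger aggregate.
            Agg reduced = (current == null || a.agg() > current.agg()) ? a : current;
            state.put(a.windowTime(), reduced);
            out.add(reduced);
        }
        return out;
    }

    // Emits only the final max per window, assuming all keys of every window have arrived.
    static List<Agg> finalMax(List<Agg> input) {
        Map<Long, Agg> state = new LinkedHashMap<>();
        for (Agg a : input) {
            state.merge(a.windowTime(), a, (old, neu) -> neu.agg() > old.agg() ? neu : old);
        }
        return new ArrayList<>(state.values());
    }

    // The aggregates from Fabian's example, in the order he listed them.
    static List<Agg> sample() {
        return List.of(
                new Agg(1, 10, 17),
                new Agg(2, 10, 20),
                new Agg(1, 20, 30),
                new Agg(2, 20, 28),
                new Agg(3, 20, 5));
    }

    public static void main(String[] args) {
        System.out.println("rolling reduce:   " + rollingMax(sample()));
        System.out.println("final per window: " + finalMax(sample()));
    }
}
```

Running it, the first line lists the five records from the reply above and the second line lists only Fabian's two records. Getting the second behaviour in Flink needs something that decides when all keys of a window have been seen, for example a second window or a trigger, as in the options listed in the question.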