Re: ReduceByKeyAndWindow in Flink

2015-11-23 Thread Konstantin Knauf
Thanks! @Fabian: Yepp, but this still results in multiple outputs per window, because the maximum is emitted for every key. @Gyula: Yepp, that's the second bullet point from my question ;) The way I implemented it, it basically doubles the latency, because the timeWindowAll has to wait for the

Re: ReduceByKeyAndWindow in Flink

2015-11-23 Thread Gyula Fóra
Yes, you are right. I think we should have some nice abstractions for doing this. Before the rewrite of the windowing runtime to support out-of-order events, we had abstractions for supporting this, but that code was not feasible from a performance perspective. (The result of a keyed window reduce

Re: ReduceByKeyAndWindow in Flink

2015-11-23 Thread Matthias J. Sax
Hi, Can't you use a second keyed window (with the same size) and apply .max(...)? -Matthias On 11/23/2015 11:00 AM, Konstantin Knauf wrote: > Hi Fabian, > > thanks for your answer. Yes, that's what I want. > > The solution you suggest is what I am doing right now (see last of the > bullet

Re: ReduceByKeyAndWindow in Flink

2015-11-23 Thread Stephan Ewen
One addition: You can set the system to use "ingestion time", which gives you event time with auto-generated timestamps and watermarks, based on the time that the events are seen in the sources. That way you have the same simplicity as processing time, and you get the window alignment that

Re: ReduceByKeyAndWindow in Flink

2015-11-23 Thread Aljoscha Krettek
Hi, @Konstantin: are you using event-time or processing-time windows? If you are using processing time, then you can only do it the way Fabian suggested. The problem here is, however, that the .keyBy().reduce() combination would emit a new maximum for every element that arrives there and you

Re: ReduceByKeyAndWindow in Flink

2015-11-23 Thread Fabian Hueske
If you set the key to the time attribute, the "old" key is no longer valid. The streams are organized by time and only one aggregate for each window-time should be computed. This should do what you are looking for:

    DataStream
      .keyBy(_._1) // key by original key
      .timeWindow(..)
      .apply(...)
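To make the intended result concrete, here is a minimal sketch in plain Java (deliberately not the Flink API) of the two-stage aggregation the thread is discussing: first reduce per key and window, then compute a single maximum per window across all keys. The class `WindowedMaxSketch`, the `Event` type, the use of a sum as the per-key reduce, and the tumbling-window arithmetic are all assumptions made for illustration; timestamps are assumed to be non-negative milliseconds.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class WindowedMaxSketch {

    /** Hypothetical input event: a key, a numeric value, and a timestamp in milliseconds. */
    static final class Event {
        final String key;
        final long value;
        final long timestampMillis;

        Event(String key, long value, long timestampMillis) {
            this.key = key;
            this.value = value;
            this.timestampMillis = timestampMillis;
        }
    }

    /** Start of the tumbling window that contains the given (non-negative) timestamp. */
    static long windowStart(long timestampMillis, long windowSizeMillis) {
        return timestampMillis - (timestampMillis % windowSizeMillis);
    }

    /** Stage 1: reduce (here: sum) the values per window and per key. */
    static Map<Long, Map<String, Long>> reduceByKeyAndWindow(List<Event> events, long windowSizeMillis) {
        Map<Long, Map<String, Long>> sums = new TreeMap<>();
        for (Event e : events) {
            long start = windowStart(e.timestampMillis, windowSizeMillis);
            sums.computeIfAbsent(start, w -> new TreeMap<>())
                .merge(e.key, e.value, Long::sum);
        }
        return sums;
    }

    /** Stage 2: collapse the per-key results into exactly one maximum per window. */
    static Map<Long, Long> maxPerWindow(Map<Long, Map<String, Long>> sumsPerWindow) {
        Map<Long, Long> result = new TreeMap<>();
        for (Map.Entry<Long, Map<String, Long>> window : sumsPerWindow.entrySet()) {
            long max = Long.MIN_VALUE;
            // Every inner map has at least one entry, because it is created on first insert.
            for (long perKeySum : window.getValue().values()) {
                max = Math.max(max, perKeySum);
            }
            result.put(window.getKey(), max);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
            new Event("a", 1, 100), new Event("b", 5, 200), new Event("a", 7, 900),
            new Event("b", 2, 1100), new Event("a", 3, 1500));
        // One output per 1-second window, not one per key.
        System.out.println(maxPerWindow(reduceByKeyAndWindow(events, 1000)));
    }
}
```

In a streaming job the second stage can only fire once the first stage has finished a window, which is the extra latency Konstantin mentions; the batch-style sketch above only illustrates the desired output shape.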