You're right, I forgot to check that the "events in this window" line
actually showed the number of events inside each window was what I
expected, despite being printed a bit out of order. Thank you for the help!

On 5 July 2016 at 17:37, Aljoscha Krettek <> wrote:

> The order in which elements are added to internal buffers and the point in
> time when FoldFunction.fold() is called don't indicate to which window
> elements are added. Flink will internally keep a buffer for each window and
> emit the window once the watermark passes the end of the window. In your
> case, there could be several windows in-flight at one given time. So the
> elements with a timestamp in [19:10:40, 19:10:49] will be added to that
> window and elements with a timestamp in [19:10:50, 19:10:59] will be added
> to this other window.
> Looking at your log, the "100 events in this window" message indicates
> that the watermark probably passed the end of the [19:10:40, 19:10:49]
> window and the result for that window was emitted. The elements with
> timestamp 19:10:50 that you see before that in the log are added to the
> buffer for a later window that will be emitted at a future time.
> On Tue, 5 Jul 2016 at 04:35 Yukun Guo <> wrote:
>> The output is the timestamps of events in string. (For convenience, the
>> payload of each event is exactly the timestamp of it.) As soon as the
>> folding of a time window is finished, the code will print "# events in this
>> window" indicating the end of the window.
>> The 10s windows should be [19:10:40, 19:10:49], [19:10:50, 19:10:59],
>> ..., but in the example above, the events at 19:10:50, which belong to
>> [19:10:50, 19:10:59] window were mistakenly put in the [19:10:40, 19:10:49]
>> one.
>> On 4 July 2016 at 21:41, Aljoscha Krettek <> wrote:
>>> Could you please elaborate a bit on what exactly the output means and
>>> how you derive that events are leaking into the previous window?
>>> On Mon, 4 Jul 2016 at 13:20 Yukun Guo <> wrote:
>>>> Thanks for the information. Strange enough, after I set the time
>>>> characteristic to EventTime, the events are leaking into the previous
>>>> window:
>>>> ...
>>>> Mon, 04 Jul 2016 19:10:49 CST
>>>> Mon, 04 Jul 2016 19:10:50 CST # ?
>>>> Mon, 04 Jul 2016 19:10:50 CST
>>>> Mon, 04 Jul 2016 19:10:50 CST
>>>> Mon, 04 Jul 2016 19:10:50 CST
>>>> Mon, 04 Jul 2016 19:10:50 CST
>>>> Mon, 04 Jul 2016 19:10:50 CST
>>>> 100 events in this window
>>>> Mon, 04 Jul 2016 19:10:50 CST
>>>> Mon, 04 Jul 2016 19:10:50 CST
>>>> Mon, 04 Jul 2016 19:10:50 CST
>>>> Mon, 04 Jul 2016 19:10:50 CST
>>>> Mon, 04 Jul 2016 19:10:51 CST
>>>> Mon, 04 Jul 2016 19:10:51 CST
>>>> On 4 July 2016 at 16:15, Aljoscha Krettek <> wrote:
>>>>> Hi,
>>>>> I think it should be as simple as setting event time as the stream
>>>>> time characteristic:
>>>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>>>> The problem is that .timeWindow(Time.seconds(10)) will use processing
>>>>> time if you don't specify a time characteristic. You can enforce using an
>>>>> event-time window using this:
>>>>> stream.window(EventTimeTumblingWindows.of(Time.seconds(10)))
>>>>> Cheers,
>>>>> Aljoscha
>>>>> On Mon, 4 Jul 2016 at 06:00 Yukun Guo <> wrote:
>>>>>> Hi,
>>>>>> I wrote a program which constructs a WindowedStream to compute
>>>>>> periodic data statistics every 10 seconds. However, I found that events
>>>>>> have not been strictly grouped into windows of 10s duration, i.e., some
>>>>>> events are leaking into the adjacent window.
>>>>>> The output is like this:
>>>>>> Mon, 04 Jul 2016 11:11:50 CST  # 1
>>>>>> Mon, 04 Jul 2016 11:11:50 CST  # 2
>>>>>> # removed for brevity
>>>>>> Mon, 04 Jul 2016 11:11:59 CST  # 99
>>>>>> 99 events in this window
>>>>>> Mon, 04 Jul 2016 11:11:59 CST  # This event has been put in the wrong
>>>>>> window
>>>>>> Mon, 04 Jul 2016 11:12:00 CST
>>>>>> Here is the code:
>>>>>> import org.apache.commons.lang3.time.FastDateFormat;
>>>>>> import org.apache.flink.api.common.functions.FoldFunction;
>>>>>> import org.apache.flink.api.common.functions.MapFunction;
>>>>>> import;
>>>>>> import org.apache.flink.streaming.api.datastream.DataStream;
>>>>>> import 
>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>>> import 
>>>>>> org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
>>>>>> import 
>>>>>> org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
>>>>>> import org.apache.flink.streaming.api.watermark.Watermark;
>>>>>> import org.apache.flink.streaming.api.windowing.time.Time;
>>>>>> public class TimeWindow {
>>>>>>     private static class TimestampAssigner implements 
>>>>>> AssignerWithPeriodicWatermarks<Long> {
>>>>>>         private final long DELAY = 500;
>>>>>>         private long currentWatermark;
>>>>>>         @Override
>>>>>>         public Watermark getCurrentWatermark() {
>>>>>>             return new Watermark(currentWatermark);
>>>>>>         }
>>>>>>         @Override
>>>>>>         public long extractTimestamp(Long event, long l) {
>>>>>>             currentWatermark = Math.max(currentWatermark, event - DELAY);
>>>>>>             return event;
>>>>>>         }
>>>>>>     }
>>>>>>     public static void main(String[] args) throws Exception {
>>>>>>         final FastDateFormat formatter = 
>>>>>> FastDateFormat.getInstance("EEE, dd MMM yyyy HH:mm:ss z");
>>>>>>         StreamExecutionEnvironment env = 
>>>>>> StreamExecutionEnvironment.createLocalEnvironment();
>>>>>>         DataStream<Long> stream = env.addSource(new 
>>>>>> RichParallelSourceFunction<Long>() {
>>>>>>             private volatile boolean isRunning = true;
>>>>>>             @Override
>>>>>>             public void run(SourceContext<Long> sourceContext) throws 
>>>>>> Exception {
>>>>>>                 while (isRunning) {
>>>>>>                     sourceContext.collect(System.currentTimeMillis());
>>>>>>                     Thread.sleep(200);
>>>>>>                 }
>>>>>>                 sourceContext.close();
>>>>>>             }
>>>>>>             @Override
>>>>>>             public void cancel() {
>>>>>>                 isRunning = false;
>>>>>>             }
>>>>>>         });
>>>>>>         stream
>>>>>>                 .assignTimestampsAndWatermarks(new TimestampAssigner())
>>>>>>                 .keyBy(new KeySelector<Long, Integer>() {
>>>>>>                     @Override
>>>>>>                     public Integer getKey(Long x) throws Exception {
>>>>>>                         return 0;
>>>>>>                     }
>>>>>>                 })
>>>>>>                 .timeWindow(Time.seconds(10))
>>>>>>                 .fold(0, new FoldFunction<Long, Integer>() {
>>>>>>                     @Override
>>>>>>                     public Integer fold(Integer count, Long x) throws 
>>>>>> Exception {
>>>>>>                         System.out.println(formatter.format(x));
>>>>>>                         return count + 1;
>>>>>>                     }
>>>>>>                 })
>>>>>>                 .map(new MapFunction<Integer, Void>() {
>>>>>>                     @Override
>>>>>>                     public Void map(Integer count) throws Exception {
>>>>>>                         System.out.println(count + " events in this 
>>>>>> window");
>>>>>>                         return null;
>>>>>>                     }
>>>>>>                 });
>>>>>>         env.execute();
>>>>>>     }
>>>>>> }
>>>>>> It doesn't always happen, but if you run the program long enough it
>>>>>> can be observed for sure.
>>>>>> Adjusting the DELAY value of watermark generation does not change the
>>>>>> behavior.

