You're right. I forgot to check that the count on each "events in this window" line was what I expected, even though the lines are printed a bit out of order. Thank you for the help!
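For anyone who finds this thread later, here is a minimal plain-Java sketch of the buffering Aljoscha describes below. It is a simulation, not Flink code. The class and its methods (`WindowBufferSim`, `onElement`, `onWatermark`) are hypothetical. It assumes 10 s tumbling event-time windows that fire once the watermark reaches a window's last millisecond. For simplicity, it advances the watermark after every element (max timestamp seen minus a 500 ms delay) instead of periodically. Events arrive every 200 ms, starting at t = 0.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java simulation (hypothetical, not Flink's API) of per-window buffers in event time.
public class WindowBufferSim {
    static final long WINDOW_SIZE = 10_000; // 10 s tumbling windows, in milliseconds

    // Element count per in-flight window, keyed by window start (stand-in for window state).
    final TreeMap<Long, Integer> buffers = new TreeMap<>();
    // "fold" and "emit" steps in the order they happen.
    final List<String> log = new ArrayList<>();

    // Start of the tumbling window containing the timestamp (non-negative timestamps assumed).
    static long windowStart(long timestamp) {
        return timestamp - (timestamp % WINDOW_SIZE);
    }

    // Every element goes into its own window's buffer, whichever window fires next.
    void onElement(long timestamp) {
        long start = windowStart(timestamp);
        buffers.merge(start, 1, Integer::sum);
        log.add("fold " + timestamp + " into window " + start);
    }

    // Emit and discard every window whose last millisecond the watermark has reached.
    void onWatermark(long watermark) {
        while (!buffers.isEmpty()) {
            Map.Entry<Long, Integer> oldest = buffers.firstEntry();
            long maxTimestamp = oldest.getKey() + WINDOW_SIZE - 1;
            if (maxTimestamp > watermark) {
                break; // oldest window is still open, so later ones are too
            }
            log.add(oldest.getValue() + " events in window " + oldest.getKey());
            buffers.pollFirstEntry();
        }
    }

    public static void main(String[] args) {
        WindowBufferSim sim = new WindowBufferSim();
        long delay = 500; // like DELAY in the TimestampAssigner from the thread
        long maxSeen = Long.MIN_VALUE;
        // One event every 200 ms, in timestamp order, from 0 ms to 10800 ms.
        for (long t = 0; t <= 10_800; t += 200) {
            sim.onElement(t);
            maxSeen = Math.max(maxSeen, t);
            sim.onWatermark(maxSeen - delay); // watermark = max timestamp seen minus delay
        }
        sim.log.forEach(System.out::println);
    }
}
```

When you run it, the elements at 10000, 10200, 10400 and 10600 ms are folded into window 10000 before the line "50 events in window 0" appears. Window 0 still reports exactly its own 50 elements. This is the same interleaving I saw in my log.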
On 5 July 2016 at 17:37, Aljoscha Krettek <aljos...@apache.org> wrote:

> The order in which elements are added to internal buffers and the point in
> time when FoldFunction.fold() is called don't indicate to which window
> elements are added. Flink will internally keep a buffer for each window and
> emit the window once the watermark passes the end of the window. In your
> case, there could be several windows in flight at any given time. So the
> elements with a timestamp in [19:10:40, 19:10:49] will be added to that
> window and elements with a timestamp in [19:10:50, 19:10:59] will be added
> to the other window.
>
> Looking at your log, the "100 events in this window" message indicates
> that the watermark probably passed the end of the [19:10:40, 19:10:49]
> window and the result for that window was emitted. The elements with
> timestamp 19:10:50 that you see before that in the log are added to the
> buffer for a later window that will be emitted at a future time.
>
> On Tue, 5 Jul 2016 at 04:35 Yukun Guo <gyk....@gmail.com> wrote:
>
>> The output is the timestamps of the events, formatted as strings. (For
>> convenience, the payload of each event is exactly its timestamp.) As soon
>> as the fold for a time window finishes, the code prints "N events in this
>> window", marking the end of that window.
>>
>> The 10s windows should be [19:10:40, 19:10:49], [19:10:50, 19:10:59],
>> ..., but in the example above, the events at 19:10:50, which belong to the
>> [19:10:50, 19:10:59] window, were mistakenly put in the [19:10:40, 19:10:49]
>> one.
>>
>> On 4 July 2016 at 21:41, Aljoscha Krettek <aljos...@apache.org> wrote:
>>
>>> Could you please elaborate a bit on what exactly the output means and
>>> how you derive that events are leaking into the previous window?
>>>
>>> On Mon, 4 Jul 2016 at 13:20 Yukun Guo <gyk....@gmail.com> wrote:
>>>
>>>> Thanks for the information.
>>>> Strangely enough, after I set the time characteristic to EventTime, the
>>>> events leak into the previous window:
>>>>
>>>> ...
>>>> Mon, 04 Jul 2016 19:10:49 CST
>>>> Mon, 04 Jul 2016 19:10:50 CST # ?
>>>> Mon, 04 Jul 2016 19:10:50 CST
>>>> Mon, 04 Jul 2016 19:10:50 CST
>>>> Mon, 04 Jul 2016 19:10:50 CST
>>>> Mon, 04 Jul 2016 19:10:50 CST
>>>> Mon, 04 Jul 2016 19:10:50 CST
>>>> 100 events in this window
>>>> Mon, 04 Jul 2016 19:10:50 CST
>>>> Mon, 04 Jul 2016 19:10:50 CST
>>>> Mon, 04 Jul 2016 19:10:50 CST
>>>> Mon, 04 Jul 2016 19:10:50 CST
>>>> Mon, 04 Jul 2016 19:10:51 CST
>>>> Mon, 04 Jul 2016 19:10:51 CST
>>>>
>>>>
>>>> On 4 July 2016 at 16:15, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>
>>>>> Hi,
>>>>> I think it should be as simple as setting event time as the stream
>>>>> time characteristic:
>>>>>
>>>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>>>>
>>>>> The problem is that .timeWindow(Time.seconds(10)) will use processing
>>>>> time if you don't specify a time characteristic. You can enforce using an
>>>>> event-time window using this:
>>>>>
>>>>> stream.window(EventTimeTumblingWindows.of(Time.seconds(10)))
>>>>>
>>>>> Cheers,
>>>>> Aljoscha
>>>>>
>>>>>
>>>>> On Mon, 4 Jul 2016 at 06:00 Yukun Guo <gyk....@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I wrote a program which constructs a WindowedStream to compute
>>>>>> periodic data statistics every 10 seconds. However, I found that events
>>>>>> are not strictly grouped into windows of 10s duration, i.e., some
>>>>>> events leak into the adjacent window.
>>>>>>
>>>>>> The output is like this:
>>>>>>
>>>>>> Mon, 04 Jul 2016 11:11:50 CST # 1
>>>>>> Mon, 04 Jul 2016 11:11:50 CST # 2
>>>>>> # removed for brevity
>>>>>> Mon, 04 Jul 2016 11:11:59 CST # 99
>>>>>> 99 events in this window
>>>>>> Mon, 04 Jul 2016 11:11:59 CST # This event has been put in the wrong window
>>>>>> Mon, 04 Jul 2016 11:12:00 CST
>>>>>>
>>>>>> Here is the code:
>>>>>>
>>>>>> import org.apache.commons.lang3.time.FastDateFormat;
>>>>>> import org.apache.flink.api.common.functions.FoldFunction;
>>>>>> import org.apache.flink.api.common.functions.MapFunction;
>>>>>> import org.apache.flink.api.java.functions.KeySelector;
>>>>>> import org.apache.flink.streaming.api.datastream.DataStream;
>>>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>>> import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
>>>>>> import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
>>>>>> import org.apache.flink.streaming.api.watermark.Watermark;
>>>>>> import org.apache.flink.streaming.api.windowing.time.Time;
>>>>>>
>>>>>> public class TimeWindow {
>>>>>>
>>>>>>     private static class TimestampAssigner implements AssignerWithPeriodicWatermarks<Long> {
>>>>>>         private final long DELAY = 500;
>>>>>>         private long currentWatermark;
>>>>>>
>>>>>>         @Override
>>>>>>         public Watermark getCurrentWatermark() {
>>>>>>             return new Watermark(currentWatermark);
>>>>>>         }
>>>>>>
>>>>>>         @Override
>>>>>>         public long extractTimestamp(Long event, long l) {
>>>>>>             currentWatermark = Math.max(currentWatermark, event - DELAY);
>>>>>>             return event;
>>>>>>         }
>>>>>>     }
>>>>>>
>>>>>>     public static void main(String[] args) throws Exception {
>>>>>>         final FastDateFormat formatter =
>>>>>>                 FastDateFormat.getInstance("EEE, dd MMM yyyy HH:mm:ss z");
>>>>>>         StreamExecutionEnvironment env =
>>>>>>                 StreamExecutionEnvironment.createLocalEnvironment();
>>>>>>
>>>>>>         DataStream<Long> stream =
>>>>>>                 env.addSource(new RichParallelSourceFunction<Long>() {
>>>>>>                     private volatile boolean isRunning = true;
>>>>>>
>>>>>>                     @Override
>>>>>>                     public void run(SourceContext<Long> sourceContext) throws Exception {
>>>>>>                         while (isRunning) {
>>>>>>                             sourceContext.collect(System.currentTimeMillis());
>>>>>>                             Thread.sleep(200);
>>>>>>                         }
>>>>>>
>>>>>>                         sourceContext.close();
>>>>>>                     }
>>>>>>
>>>>>>                     @Override
>>>>>>                     public void cancel() {
>>>>>>                         isRunning = false;
>>>>>>                     }
>>>>>>                 });
>>>>>>
>>>>>>         stream
>>>>>>                 .assignTimestampsAndWatermarks(new TimestampAssigner())
>>>>>>                 .keyBy(new KeySelector<Long, Integer>() {
>>>>>>                     @Override
>>>>>>                     public Integer getKey(Long x) throws Exception {
>>>>>>                         return 0;
>>>>>>                     }
>>>>>>                 })
>>>>>>                 .timeWindow(Time.seconds(10))
>>>>>>                 .fold(0, new FoldFunction<Long, Integer>() {
>>>>>>                     @Override
>>>>>>                     public Integer fold(Integer count, Long x) throws Exception {
>>>>>>                         System.out.println(formatter.format(x));
>>>>>>                         return count + 1;
>>>>>>                     }
>>>>>>                 })
>>>>>>                 .map(new MapFunction<Integer, Void>() {
>>>>>>                     @Override
>>>>>>                     public Void map(Integer count) throws Exception {
>>>>>>                         System.out.println(count + " events in this window");
>>>>>>                         return null;
>>>>>>                     }
>>>>>>                 });
>>>>>>
>>>>>>         env.execute();
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>>
>>>>>> It doesn't always happen, but if you run the program long enough it
>>>>>> can be observed for sure.
>>>>>> Adjusting the DELAY value of watermark generation does not change the
>>>>>> behavior.
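A second minimal plain-Java sketch illustrates Aljoscha's processing-time explanation and why changing DELAY had no effect on the original program. With `.timeWindow(...)` and no time characteristic set, the window is chosen from the clock when the element reaches the window operator, not from the timestamp the element carries. As a result, the watermark never comes into play. The names (`WindowAssignmentSim`, `Mode`, `assign`) and the 15 ms latency are hypothetical assumptions. This is not Flink's API.

```java
// Plain-Java simulation (hypothetical, not Flink's API) of how a tumbling window is chosen.
public class WindowAssignmentSim {
    static final long WINDOW_SIZE = 10_000; // 10 s tumbling windows, in milliseconds

    // Hypothetical stand-in for the two time characteristics discussed in the thread.
    enum Mode { EVENT_TIME, PROCESSING_TIME }

    // Start of the tumbling window containing the given time (non-negative times assumed).
    static long windowStart(long time) {
        return time - (time % WINDOW_SIZE);
    }

    // Event time uses the timestamp carried by the element; processing time uses
    // the clock reading when the element reaches the window operator.
    static long assign(Mode mode, long eventTimestamp, long arrivalTime) {
        return windowStart(mode == Mode.EVENT_TIME ? eventTimestamp : arrivalTime);
    }

    public static void main(String[] args) {
        long created = 19_990;       // stamped 10 ms before the 20 s window boundary
        long arrival = created + 15; // assumed 15 ms source-to-window latency (hypothetical)
        System.out.println("event time:      " + assign(Mode.EVENT_TIME, created, arrival));
        System.out.println("processing time: " + assign(Mode.PROCESSING_TIME, created, arrival));
    }
}
```

It prints a window start of 10000 for event time and 20000 for processing time. An event stamped just before a boundary but delivered just after it lands in the next window. This is consistent with the 11:11:59 event printed after the "99 events in this window" line.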