Hi,
I wrote a program that builds a WindowedStream to compute statistics over the
data every 10 seconds. However, events are not strictly grouped into
10-second windows: some events leak into the adjacent window.
The output looks like this:
Mon, 04 Jul 2016 11:11:50 CST # 1
Mon, 04 Jul 2016 11:11:50 CST # 2
# removed for brevity
Mon, 04 Jul 2016 11:11:59 CST # 99
99 events in this window
Mon, 04 Jul 2016 11:11:59 CST # This event has been put in the wrong window
Mon, 04 Jul 2016 11:12:00 CST
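The formatter prints only whole seconds, so "11:11:59" can be any instant from 11:11:59.000 to 11:11:59.999. Below is a small sketch of how a 10-second tumbling window is chosen for a timestamp. It assumes windows start at multiples of the window size counted from the epoch, which is how Flink's tumbling windows behave with no offset. It also assumes that "CST" in the output means UTC+8. The sketch shows that the same event lands in different windows depending on which timestamp is used. An event stamped 11:11:59.950 falls into [11:11:50, 11:12:00), but a hypothetical arrival time of 11:12:00.030 falls into the following window. WindowAssignment and windowStart are illustrative names, not Flink API.

```java
import java.time.ZoneOffset;
import java.time.ZonedDateTime;

public class WindowAssignment {

    // Start of the tumbling window that contains `timestamp`, assuming windows
    // are aligned to the epoch with no offset.
    static long windowStart(long timestamp, long windowSize) {
        return timestamp - Math.floorMod(timestamp, windowSize);
    }

    public static void main(String[] args) {
        long size = 10_000L; // 10-second windows, in milliseconds

        // Assumption: "CST" in the output is UTC+8 (China Standard Time).
        long windowBegin = ZonedDateTime.of(2016, 7, 4, 11, 11, 50, 0, ZoneOffset.ofHours(8))
                .toInstant().toEpochMilli();

        long eventTs = windowBegin + 9_950;     // prints as 11:11:59
        long arrivalTs = windowBegin + 10_030;  // hypothetical arrival 80 ms later

        // Window chosen from the event's own timestamp: [11:11:50, 11:12:00)
        System.out.println(windowStart(eventTs, size) == windowBegin);
        // Window chosen from the arrival time: [11:12:00, 11:12:10)
        System.out.println(windowStart(arrivalTs, size) == windowBegin + size);
    }
}
```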
Here is the code:
import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
public class TimeWindow {

    // Watermark trails the largest timestamp seen so far by DELAY ms.
    private static class TimestampAssigner implements AssignerWithPeriodicWatermarks<Long> {

        private final long DELAY = 500;
        private long currentWatermark;

        @Override
        public Watermark getCurrentWatermark() {
            return new Watermark(currentWatermark);
        }

        @Override
        public long extractTimestamp(Long event, long l) {
            currentWatermark = Math.max(currentWatermark, event - DELAY);
            return event;
        }
    }

    public static void main(String[] args) throws Exception {
        final FastDateFormat formatter =
                FastDateFormat.getInstance("EEE, dd MMM yyyy HH:mm:ss z");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

        // Each parallel source instance emits the current wall-clock time every 200 ms.
        DataStream<Long> stream = env.addSource(new RichParallelSourceFunction<Long>() {
            private volatile boolean isRunning = true;

            @Override
            public void run(SourceContext<Long> sourceContext) throws Exception {
                while (isRunning) {
                    sourceContext.collect(System.currentTimeMillis());
                    Thread.sleep(200);
                }
                sourceContext.close();
            }

            @Override
            public void cancel() {
                isRunning = false;
            }
        });

        stream
                .assignTimestampsAndWatermarks(new TimestampAssigner())
                // Constant key: every event goes to the same window.
                .keyBy(new KeySelector<Long, Integer>() {
                    @Override
                    public Integer getKey(Long x) throws Exception {
                        return 0;
                    }
                })
                .timeWindow(Time.seconds(10))
                // Prints each event as it is added to its window.
                .fold(0, new FoldFunction<Long, Integer>() {
                    @Override
                    public Integer fold(Integer count, Long x) throws Exception {
                        System.out.println(formatter.format(x));
                        return count + 1;
                    }
                })
                // Prints the count when a window's result is emitted.
                .map(new MapFunction<Integer, Void>() {
                    @Override
                    public Void map(Integer count) throws Exception {
                        System.out.println(count + " events in this window");
                        return null;
                    }
                });

        env.execute();
    }
}
It doesn't happen every time, but if the program runs long enough, it always
shows up eventually.
Changing the DELAY used for watermark generation does not change the
behavior.
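With event-time windows, DELAY should change when a window is evaluated. The sketch below replays the watermark rule from TimestampAssigner, where the watermark is the largest timestamp seen minus DELAY. It uses a hypothetical window [0, 10000) ms and reports the event after which that window would fire. Flink's event-time trigger fires once the watermark reaches the window's last millisecond (end - 1). The sketch assumes a single source emitting strictly increasing timestamps 200 ms apart. It ignores the periodic watermark interval and parallelism. WatermarkSketch and firstFiringEvent are illustrative names.

```java
public class WatermarkSketch {

    // Replays the assigner's rule: watermark = max(watermark, timestamp - delay).
    // Returns the timestamp of the first event after which a window ending at
    // windowEnd may fire, i.e. once the watermark reaches windowEnd - 1.
    // Assumes one source emitting strictly increasing timestamps every stepMs.
    static long firstFiringEvent(long firstTs, long stepMs, long windowEnd, long delay) {
        long watermark = Long.MIN_VALUE;
        long ts = firstTs;
        while (true) {
            watermark = Math.max(watermark, ts - delay);
            if (watermark >= windowEnd - 1) {
                return ts;
            }
            ts += stepMs;
        }
    }

    public static void main(String[] args) {
        long windowEnd = 10_000L; // hypothetical window [0, 10_000) ms
        for (long delay : new long[] {0, 500, 2_000}) {
            System.out.println("DELAY=" + delay + " -> fires after event at "
                    + firstFiringEvent(8_000, 200, windowEnd, delay) + " ms");
        }
    }
}
```

Under those assumptions, the firing point moves with DELAY: 10000, 10600 and 12000 ms for the three values. An in-order event stamped before the window end is always added before the window fires. If the real output does not react to DELAY at all, the windows may not be using event time.

In Flink 1.x the default time characteristic is processing time, and the program above never calls `env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)`. In that case `timeWindow()` builds processing-time windows. These place each element according to the window operator's wall clock when the element arrives, and they ignore the assigned timestamps and watermarks. An event created shortly before 11:12:00 that reaches the operator after 11:12:00 would then be counted in the next window.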