I am using Flink 1.2-Snapshot. My data looks like the following:

- id=25398102, sourceId=1, ts=2016-10-15 00:00:56, user=14, value=919
- id=25398185, sourceId=1, ts=2016-10-15 00:01:06, user=14, value=920
- id=25398210, sourceId=1, ts=2016-10-15 00:01:16, user=14, value=944
- id=25398235, sourceId=1, ts=2016-10-15 00:01:24, user=3149, value=944
- id=25398236, sourceId=1, ts=2016-10-15 00:01:25, user=71, value=955
- id=25398239, sourceId=1, ts=2016-10-15 00:01:26, user=71, value=955
- id=25398265, sourceId=1, ts=2016-10-15 00:01:36, user=71, value=955
- id=25398310, sourceId=1, ts=2016-10-15 00:02:16, user=14, value=960
- id=25398320, sourceId=1, ts=2016-10-15 00:02:26, user=14, value=1000

I am running the following code to create windows based on user IDs:

    stream.flatMap(new LogsParser())
        .assignTimestampsAndWatermarks(new MessageTimestampExtractor())
        .keyBy("sourceId")
        .window(GlobalWindows.create())
        .trigger(PurgingTrigger.of(new MySessionTrigger()))
        .apply(new SessionWindowFunction())
        .print();

MySessionTrigger inspects each received event and checks its user ID, so that the window is triggered when the user ID changes. SessionWindowFunction just creates a session out of the window. Here are the sessions created:

1. Session:
   - id=25398102, sourceId=1, ts=2016-10-15 00:00:56, user=14, value=919
   - id=25398185, sourceId=1, ts=2016-10-15 00:01:06, user=14, value=920
   - id=25398210, sourceId=1, ts=2016-10-15 00:01:16, user=14, value=944
   - id=25398235, sourceId=1, ts=2016-10-15 00:01:24, user=3149, value=944
2. Session:
   - id=25398236, sourceId=1, ts=2016-10-15 00:01:25, user=71, value=955
   - id=25398239, sourceId=1, ts=2016-10-15 00:01:26, user=71, value=955
   - id=25398265, sourceId=1, ts=2016-10-15 00:01:36, user=71, value=955
   - id=25398310, sourceId=1, ts=2016-10-15 00:02:16, user=14, value=960
3. Session:
   - id=25398320, sourceId=1, ts=2016-10-15 00:02:26, user=14, value=1000

The problem, as you can see, is that the last event in every session actually belongs to the next window. The decision to trigger the window comes too late, because the event that caused it is already in the window.

How can I trigger the window without including that last event in it?

Thanks for your help.
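
To make the expected result concrete, here is a minimal plain-Java sketch (no Flink involved) of the grouping I am after. It assumes that a new session should start on every user ID change, so the single user=3149 event would form its own session. The class name `SessionSplitSketch` and the method `splitByUserChange` are made up for illustration and are not part of my job; only the user IDs from the data above are used.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; not part of the Flink job above.
class SessionSplitSketch {

    // Groups user IDs (in arrival order) into sessions. An event whose user ID
    // differs from the previous one opens the NEXT session instead of being
    // appended to the current one.
    static List<List<Integer>> splitByUserChange(List<Integer> users) {
        List<List<Integer>> sessions = new ArrayList<>();
        List<Integer> current = new ArrayList<>();
        for (Integer user : users) {
            boolean userChanged = !current.isEmpty()
                    && !current.get(current.size() - 1).equals(user);
            if (userChanged) {
                // Close the current session before adding the new event.
                sessions.add(current);
                current = new ArrayList<>();
            }
            current.add(user);
        }
        if (!current.isEmpty()) {
            sessions.add(current);
        }
        return sessions;
    }

    public static void main(String[] args) {
        // User IDs of the nine events listed above, in arrival order.
        List<Integer> users = Arrays.asList(14, 14, 14, 3149, 71, 71, 71, 14, 14);
        System.out.println(splitByUserChange(users));
        // prints [[14, 14, 14], [3149], [71, 71, 71], [14, 14]]
    }
}
```

In other words, the event that reveals the user change should be the first element of the following session, not the last element of the one being emitted.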