I am using Flink 1.2-SNAPSHOT. My data looks like the following:

   - id=25398102, sourceId=1, ts=2016-10-15 00:00:56, user=14, value=919
   - id=25398185, sourceId=1, ts=2016-10-15 00:01:06, user=14, value=920
   - id=25398210, sourceId=1, ts=2016-10-15 00:01:16, user=14, value=944
   - id=25398235, sourceId=1, ts=2016-10-15 00:01:24, user=3149, value=944
   - id=25398236, sourceId=1, ts=2016-10-15 00:01:25, user=71, value=955
   - id=25398239, sourceId=1, ts=2016-10-15 00:01:26, user=71, value=955
   - id=25398265, sourceId=1, ts=2016-10-15 00:01:36, user=71, value=955
   - id=25398310, sourceId=1, ts=2016-10-15 00:02:16, user=14, value=960
   - id=25398320, sourceId=1, ts=2016-10-15 00:02:26, user=14, value=1000

I am running the following code to create windows based on user IDs:

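    stream.flatMap(new LogsParser())                                   // parse log lines into events
            .assignTimestampsAndWatermarks(new MessageTimestampExtractor())
            .keyBy("sourceId")                                         // key the stream by source
            .window(GlobalWindows.create())                            // all events of a key share one global window
            .trigger(PurgingTrigger.of(new MySessionTrigger()))        // fire on user change, then purge the window
            .apply(new SessionWindowFunction())                        // build a session from the window contents
            .print();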

MySessionTrigger inspects each received event and checks its user ID, firing
the window whenever the user ID changes. The SessionWindowFunction just
creates a session out of the window contents.

Here are the sessions created:

   1. Session:
      - id=25398102, sourceId=1, ts=2016-10-15 00:00:56, user=14, value=919
      - id=25398185, sourceId=1, ts=2016-10-15 00:01:06, user=14, value=920
      - id=25398210, sourceId=1, ts=2016-10-15 00:01:16, user=14, value=944
      - id=25398235, sourceId=1, ts=2016-10-15 00:01:24, user=3149, value=944
   2. Session:
      - id=25398236, sourceId=1, ts=2016-10-15 00:01:25, user=71, value=955
      - id=25398239, sourceId=1, ts=2016-10-15 00:01:26, user=71, value=955
      - id=25398265, sourceId=1, ts=2016-10-15 00:01:36, user=71, value=955
      - id=25398310, sourceId=1, ts=2016-10-15 00:02:16, user=14, value=960
   3. Session:
      - id=25398320, sourceId=1, ts=2016-10-15 00:02:26, user=14, value=1000

The problem, as you can see, is that in every session the last event actually
belongs to the next window. The decision to fire comes too late: by the time
the trigger sees the event with the new user ID, that event is already in the
window.
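To make the ordering concrete, here is a minimal plain-Java model of the pipeline for a single key (no Flink involved). It assumes, as in Flink's WindowOperator, that each element is added to the window contents before the trigger's onElement() is called. It also assumes that MySessionTrigger remembers the user of the first event in the current window and fires (and purges) as soon as an event with a different user arrives. LogEvent and AddThenTrigger are hypothetical names, and only the id and user fields are modelled. Under these assumptions the model reproduces sessions 1 and 2 above and leaves the last event in a still-open window.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal event: only the fields the model needs.
final class LogEvent {
    final long id;
    final long user;

    LogEvent(long id, long user) {
        this.id = id;
        this.user = user;
    }
}

// Hypothetical model of GlobalWindows + PurgingTrigger for one key,
// assuming "add element first, then ask the trigger".
final class AddThenTrigger {
    // Sessions handed to the window function.
    final List<List<LogEvent>> fired = new ArrayList<>();
    // Current contents of the single global window.
    final List<LogEvent> window = new ArrayList<>();
    // Assumed trigger state: user of the first event in the window.
    private Long sessionUser = null;

    void onEvent(LogEvent e) {
        // 1. The element is added to the window first...
        window.add(e);
        // 2. ...and only then does the trigger inspect it.
        if (sessionUser == null) {
            sessionUser = e.user;
        } else if (e.user != sessionUser.longValue()) {
            // FIRE_AND_PURGE: the event that caused the change is already inside.
            fired.add(new ArrayList<>(window));
            window.clear();
            sessionUser = null;
        }
    }
}
```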

How can I fire the window without including that last event, so that it
starts the next session instead?
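For comparison, the intended grouping needs the opposite order: decide first, then add. When an event with a different user arrives, the buffered events are emitted as a session and the new event starts the next buffer. Since a window trigger only runs after the element is already in the window, one possible direction (an untested assumption, not a confirmed fix) is to keep this buffer in keyed state inside a stateful function such as a RichFlatMapFunction or ProcessFunction keyed by sourceId, instead of using GlobalWindows plus a trigger. The sketch below is plain Java; UserEvent and CheckThenAddSplitter are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal event: only the fields the sketch needs.
final class UserEvent {
    final long id;
    final long user;

    UserEvent(long id, long user) {
        this.id = id;
        this.user = user;
    }
}

// Hypothetical splitter: closes the current session *before* buffering an
// event whose user differs, so that event opens the next session.
final class CheckThenAddSplitter {
    final List<List<UserEvent>> sessions = new ArrayList<>();
    private final List<UserEvent> buffer = new ArrayList<>();

    void onEvent(UserEvent e) {
        // Decide first: does this event still belong to the current session?
        // (All buffered events share one user, so checking the first is enough.)
        if (!buffer.isEmpty() && buffer.get(0).user != e.user) {
            sessions.add(new ArrayList<>(buffer)); // emit the finished session
            buffer.clear();
        }
        // ...then add it, so it starts or continues the right session.
        buffer.add(e);
    }

    // Emits the last, still-open session (e.g. at end of input).
    void flush() {
        if (!buffer.isEmpty()) {
            sessions.add(new ArrayList<>(buffer));
            buffer.clear();
        }
    }
}
```

With the sample data this yields four sessions: the three user-14 events, the single user-3149 event, the three user-71 events, and the final two user-14 events (the last one only after flush()). In a streaming job, deciding when to flush the open session (a timer, a timeout, end of input) is a separate design choice.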

Thanks for your help.
