Hi Team,

Hope you are doing well.
I have a use case around session windowing with some customizations. We need to create user sessions based on *any* of the 3 conditions below:

1. Session window of 30 minutes (meaning 30 minutes of inactivity, i.e. no event for 30 minutes for a user)
2. Number of events in the session window reaches 20,000
3. 4 hours have elapsed since window start

Below is what I have tried.

    beam.WindowInto(
        window.Sessions(session_timeout_seconds),
        trigger=trigger.Repeatedly(
            trigger.AfterAny(
                trigger.AfterCount(20000),
                trigger.DefaultTrigger(),
                TriggerWhenWindowStartPassesXHours(hours=0.2)
            )
        ),
        timestamp_combiner=window.TimestampCombiner.OUTPUT_AT_EOW,
        accumulation_mode=trigger.AccumulationMode.DISCARDING
    )

    # Custom Trigger Implementation
    from apache_beam.transforms.trigger import DefaultTrigger
    from apache_beam.utils.timestamp import Timestamp

    class TriggerWhenWindowStartPassesXHours(DefaultTrigger):

        def __init__(self, hours=4):
            super(TriggerWhenWindowStartPassesXHours, self).__init__()
            self.hours = hours

        def __repr__(self):
            return 'TriggerWhenWindowStartPassesXHours()'

        def should_fire(self, time_domain, watermark, window, context):
            should_fire = (Timestamp.now() - window.start).micros >= 3600000000 * self.hours
            return should_fire

        @staticmethod
        def from_runner_api(proto, context):
            return TriggerWhenWindowStartPassesXHours()

The above works well, but there is an issue. Whenever trigger No. 3 above fires, it does not create a new session window; the same window is continued. As a result, trigger No. 3 keeps firing on every subsequent event once 4 hours have elapsed since window start, because the should_fire condition is:

    should_fire = (Timestamp.now() - window.start).micros >= 3600000000 * self.hours

and window.start is never updated after the trigger fires for the first time.
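To illustrate the repeated firing outside of Beam, here is a minimal plain-Python sketch. It is only an approximation of the behaviour: the helper names (elapsed_trigger_fires, simulate) and the event times are hypothetical, and times are plain seconds rather than Beam Timestamp objects. Only the comparison mirrors the should_fire check.

```python
# Plain-Python illustration (no Beam involved). The function names and the
# event times are made up for this example; only the comparison mirrors
# the should_fire check in the custom trigger.
SECONDS_PER_HOUR = 3600


def elapsed_trigger_fires(now_s, window_start_s, hours=4):
    """Mirror of the should_fire comparison, with times in seconds."""
    return (now_s - window_start_s) >= SECONDS_PER_HOUR * hours


def simulate(event_times_s, window_start_s=0, hours=4):
    """Return, for each event time, whether the elapsed-time check passes.

    window_start_s stays fixed for all events, which is the situation
    described above when the same session window keeps being extended.
    """
    return [
        elapsed_trigger_fires(t, window_start_s, hours)
        for t in event_times_s
    ]


# Events at 1h, 3h59m, 4h, 4h10m and 4h20m after the window start.
events = [3600, 14340, 14400, 15000, 15600]
fired = simulate(events)
print(fired)  # [False, False, True, True, True]
```

Once the 4-hour threshold is crossed, every later event passes the check, which matches what I am seeing. If the start were reset after the first firing (for example simulate([15000], window_start_s=14400)), the check would not pass again until another 4 hours had elapsed, which is the behaviour I am after.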
I have also posted this on Stack Overflow:
https://stackoverflow.com/questions/68250618/apache-beam-session-window-with-limit-on-number-of-events

I would be very grateful for any help as to how to achieve this. Thanks a lot in advance.

Regards,
Chandan