Hi Chandan,

I am moving this thread to u...@beam.apache.org. I think that is the best place to discuss.

Kenn

On Wed, Jul 7, 2021 at 9:32 AM Chandan Bhattad <chandanbhatt...@gmail.com> wrote:

> Hi Team,
>
> Hope you are doing well.
>
> I have a use case around session windowing with some customizations.
>
> We need to create user sessions based on *any* of the 3 conditions
> below:
>
> 1. Session window of 30 minutes (meaning 30 minutes of inactivity, i.e. no
>    event for 30 minutes for a user)
> 2. Number of events in the session window reaches 20,000
> 3. 4 hours have elapsed since window start
>
> Below is what I have tried.
>
> beam.WindowInto(window.Sessions(session_timeout_seconds),
>     trigger=trigger.Repeatedly(
>         trigger.AfterAny(
>             trigger.AfterCount(20000),
>             trigger.DefaultTrigger(),
>             TriggerWhenWindowStartPassesXHours(hours=0.2)
>         )
>     ),
>     timestamp_combiner=window.TimestampCombiner.OUTPUT_AT_EOW,
>     accumulation_mode=trigger.AccumulationMode.DISCARDING
> )
>
>
> # Custom Trigger Implementation
> from apache_beam.transforms.trigger import DefaultTrigger
> from apache_beam.utils.timestamp import Timestamp
>
>
> class TriggerWhenWindowStartPassesXHours(DefaultTrigger):
>
>     def __init__(self, hours=4):
>         super(TriggerWhenWindowStartPassesXHours, self).__init__()
>         self.hours = hours
>
>     def __repr__(self):
>         return 'TriggerWhenWindowStartPassesXHours()'
>
>     def should_fire(self, time_domain, watermark, window, context):
>         should_fire = (Timestamp.now() - window.start).micros >= 3600000000 * self.hours
>         return should_fire
>
>     @staticmethod
>     def from_runner_api(proto, context):
>         return TriggerWhenWindowStartPassesXHours()
>
> The above works well, but there is an issue. Whenever Trigger No. 3 above
> fires -- it does not create a new session window, but the same window is
> continued.
> What happens due to this is that No. 3 keeps on firing on every
> subsequent event once 4 hours have passed since window start, since the
> should_fire condition is:
>
> should_fire = (Timestamp.now() - window.start).micros >= 3600000000 * self.hours
>
> and since window.start is never updated after the first time the trigger is
> fired - it will fire for every subsequent event after the first trigger.
>
> I have also posted this on Stack Overflow:
> https://stackoverflow.com/questions/68250618/apache-beam-session-window-with-limit-on-number-of-events
>
> I would be very grateful for any help as to how to achieve this.
> Thanks a lot in advance.
>
> Regards,
> Chandan
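
To make the requested session boundaries concrete, here is a minimal plain-Python sketch of the three closing conditions from the quoted message. It does not use any Beam API and is not a Beam solution; the function name split_sessions and its parameters are hypothetical, timestamps are assumed to be seconds for a single user, and treating a gap of exactly 30 minutes as the start of a new session is an assumption.

```python
from typing import Iterable, List

GAP_SECONDS = 30 * 60                # condition 1: 30 minutes of inactivity
MAX_EVENTS = 20_000                  # condition 2: event cap per session
MAX_DURATION_SECONDS = 4 * 60 * 60   # condition 3: 4 hours since session start


def split_sessions(
    timestamps: Iterable[float],
    gap: float = GAP_SECONDS,
    max_events: int = MAX_EVENTS,
    max_duration: float = MAX_DURATION_SECONDS,
) -> List[List[float]]:
    """Group one user's event timestamps (in seconds) into sessions.

    Hypothetical helper: a session is closed as soon as ANY of the
    three conditions holds, and the next event opens a new session.
    """
    sessions: List[List[float]] = []
    current: List[float] = []
    for ts in sorted(timestamps):
        if current and (
            ts - current[-1] >= gap              # inactivity gap reached
            or len(current) >= max_events        # session already full
            or ts - current[0] >= max_duration   # session is too old
        ):
            sessions.append(current)
            current = []  # the session start is reset here
        current.append(ts)
    if current:
        sessions.append(current)
    return sessions
```

In this sketch every closing condition resets the session start, so the 4-hour check is always measured from the first event of the new session rather than from the original window start, which is the behaviour the question asks for.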