Hi Team,
Hope you are doing well.
I have a use case around session windowing with some customizations.
We need to create user sessions based on *any* of the 3 conditions
below:
1. Session window with a 30-minute gap (i.e. 30 minutes of inactivity, no
event for 30 minutes for a user)
2. Number of events in the session window reaches 20,000
3. 4 hours have elapsed since window start
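To make the intended semantics concrete, here is a minimal plain-Python sketch (not Beam code) of how I would expect one user's events to be split. It assumes event timestamps in seconds and measures all three conditions in event time; split_sessions and its parameters are hypothetical names used only for illustration.

```python
from typing import List

GAP_SECONDS = 30 * 60             # condition 1: inactivity gap
MAX_EVENTS = 20_000               # condition 2: event cap per session
MAX_DURATION_SECONDS = 4 * 3600   # condition 3: maximum session age


def split_sessions(timestamps: List[float],
                   gap: float = GAP_SECONDS,
                   max_events: int = MAX_EVENTS,
                   max_duration: float = MAX_DURATION_SECONDS
                   ) -> List[List[float]]:
    """Split one user's event timestamps (seconds) into sessions.

    A new session starts when ANY of the three conditions holds for
    the incoming event relative to the session currently being built.
    """
    sessions: List[List[float]] = []
    current: List[float] = []
    for ts in sorted(timestamps):
        if current and (
            ts - current[-1] >= gap              # inactivity gap reached
            or len(current) >= max_events        # session is full
            or ts - current[0] >= max_duration   # session is too old
        ):
            sessions.append(current)
            current = []
        current.append(ts)
    if current:
        sessions.append(current)
    return sessions
```

For example, split_sessions([0, 1, 2, 3, 4], max_events=2) yields three sessions: [0, 1], [2, 3] and [4]. The point is that after any of the three conditions closes a session, the next event starts a fresh one with a new start time.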
Below is what I have tried.
beam.WindowInto(
    window.Sessions(session_timeout_seconds),
    trigger=trigger.Repeatedly(
        trigger.AfterAny(
            trigger.AfterCount(20000),
            trigger.DefaultTrigger(),
            TriggerWhenWindowStartPassesXHours(hours=0.2)
        )
    ),
    timestamp_combiner=window.TimestampCombiner.OUTPUT_AT_EOW,
    accumulation_mode=trigger.AccumulationMode.DISCARDING
)
# Custom Trigger Implementation
from apache_beam.transforms.trigger import DefaultTrigger
from apache_beam.utils.timestamp import Timestamp

class TriggerWhenWindowStartPassesXHours(DefaultTrigger):

    def __init__(self, hours=4):
        super(TriggerWhenWindowStartPassesXHours, self).__init__()
        self.hours = hours

    def __repr__(self):
        return 'TriggerWhenWindowStartPassesXHours()'

    def should_fire(self, time_domain, watermark, window, context):
        # Fire once wall-clock time is at least `hours` past the window start.
        should_fire = (
            (Timestamp.now() - window.start).micros >= 3600000000 * self.hours)
        return should_fire

    @staticmethod
    def from_runner_api(proto, context):
        return TriggerWhenWindowStartPassesXHours()
The above works well, but there is an issue. Whenever trigger No. 3 above
fires, it does not start a new session window; the same window is
continued.
As a result, trigger No. 3 keeps firing on every subsequent event once
4 hours have passed since window start, because the should_fire condition is:
    should_fire = (Timestamp.now() - window.start).micros >= 3600000000 * self.hours
Since window.start is never updated after the trigger first fires, the
condition stays true for every subsequent event.
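The arithmetic behind this can be checked outside Beam. A minimal sketch, assuming times are plain microsecond integers; elapsed_condition is a hypothetical standalone stand-in for the comparison in my trigger, not a Beam API:

```python
HOUR_MICROS = 3_600_000_000  # microseconds in one hour


def elapsed_condition(now_micros, window_start_micros, hours=4):
    """Same comparison as in should_fire, on plain integers."""
    return now_micros - window_start_micros >= HOUR_MICROS * hours


window_start = 0  # never changes, as in the continued session window
# Times (in hours after window start) at which later events arrive.
arrival_hours = [1, 3, 4, 5, 6]
results = [
    elapsed_condition(h * HOUR_MICROS, window_start) for h in arrival_hours
]
print(results)  # [False, False, True, True, True]
```

Once the 4-hour mark is crossed, every later evaluation returns True, because only the "now" side of the comparison moves.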
I have also posted this on stackoverflow:
https://stackoverflow.com/questions/68250618/apache-beam-session-window-with-limit-on-number-of-events
I would be very grateful for any help as to how to achieve this.
Thanks a lot in advance.
Regards,
Chandan