The session gap duration of 60 minutes tells the runner to close the window when there is a gap of at least 60 minutes between records. So for a given key X, if we had data at timestamps 1:01, 1:30, 2:45, and 2:50, then 1:01 and 1:30 would be in one session while 2:45 and 2:50 would be in a second session. Note also that a session window's end is the timestamp of its last element plus the gap, so the logged duration of any session with more than one record will normally be longer than 60 minutes.
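As a standalone sketch of the gap rule (plain Python, not Beam code; the function name is made up and timestamps are minutes since midnight):

```python
def sessionize(timestamps, gap):
    """Split sorted timestamps into sessions: a gap of at least
    `gap` between consecutive records starts a new session."""
    sessions = [[timestamps[0]]]
    for t in timestamps[1:]:
        if t - sessions[-1][-1] >= gap:  # gap reached -> close the session
            sessions.append([t])
        else:
            sessions[-1].append(t)
    return sessions

# Timestamps 1:01, 1:30, 2:45, 2:50 with a 60-minute gap:
print(sessionize([61, 90, 165, 170], 60))  # [[61, 90], [165, 170]]
```

The 75-minute gap between 1:30 and 2:45 is what closes the first session.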
If I'm misunderstanding, maybe you could share some data with timestamps and what you expect the output to be.

On Fri, Apr 24, 2020 at 2:17 AM Yohei Onishi <[email protected]> wrote:

> Hi,
>
> I also deployed the application to Dataflow and got the same result. The
> actual session interval was not the same as the given session interval
> (60 minutes).
>
> ---
> new 2 record(s) in 1:50:00 session (start 2018-10-19 11:10:00 end 2018-10-19 13:00:00)
> new 2 record(s) in 1:01:00 session (start 2018-10-19 10:02:00 end 2018-10-19 11:03:00)
> new 5 record(s) in 3:00:00 session (start 2018-10-19 10:00:00 end 2018-10-19 13:00:00)
> ---
>
> https://stackoverflow.com/questions/61402895/apache-beam-python-sdk-inaccurate-session-window-interval
>
> On Fri, Apr 24, 2020 at 3:09 PM Yohei Onishi <[email protected]> wrote:
>
>> Hi,
>>
>> I am trying to process data with a 60-minute session interval using the
>> Apache Beam Python SDK. But the actual session interval was inaccurate,
>> such as 3:00:00 or 1:01:00 or 1:50:00, when I ran my application locally
>> using DirectRunner.
>>
>> Would you help me find a solution to fix this issue and process data
>> with 60-minute sessions?
>>
>> I built my pipeline as below.
>>
>> -----
>> with Pipeline(options=pipeline_options) as pipeline:
>>     (
>>         pipeline
>>         | "Read" >> ReadFromText(known_args.input, skip_header_lines=1)
>>         | "Convert" >> ParDo(Convert())
>>         | "Add Timestamp" >> Map(lambda x: window.TimestampedValue(x, get_timestamp_from_element(x).timestamp()))
>>         | "Use User ID As Key" >> Map(lambda x: (x["user_id"], x))
>>         | "Apply Session Window" >> WindowInto(window.Sessions(known_args.session_interval))
>>         | "Group" >> GroupByKey()
>>         | "Write To CSV" >> ParDo(WriteToCSV(known_args.output))
>>     )
>>     result = pipeline.run()
>>     result.wait_until_finish()
>> -----
>>
>> session_interval (60 minutes) is provided as below.
>>
>> -----
>> parser.add_argument(
>>     "--session_interval",
>>     help="Interval of each session",
>>     default=60*60)  # 60 mins
>> -----
>>
>> The WriteToCSV function processes data per session. I logged the session
>> duration, but it was not accurate.
>>
>> -----
>> class WriteToCSV(DoFn):
>>     def __init__(self, output_path):
>>         self.output_path = output_path
>>
>>     def process(self, element, window=DoFn.WindowParam):
>>         window_start = window.start.to_utc_datetime()
>>         window_end = window.end.to_utc_datetime()
>>         duration = window_end - window_start
>>         logging.info(">>> new %s record(s) in %s session (start %s end %s)", len(click_records), duration, window_start, window_end)
>>         ....
>> -----
>>
>> Then I got these log messages when I ran this application locally with
>> DirectRunner.
>>
>> -----
>> new 5 records in 3:00:00 session (start 2018-10-19 02:00:00 end 2018-10-19 05:00:00)
>> new 2 records in 1:01:00 session (start 2018-10-19 02:02:00 end 2018-10-19 03:03:00)
>> new 2 records in 1:50:00 session (start 2018-10-19 03:10:00 end 2018-10-19 05:00:00)
>> -----
>>
>> Thanks.
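One way to see why the logged durations exceed 60 minutes: conceptually, Sessions gives each element a proto-window of [timestamp, timestamp + gap) and merges proto-windows that overlap. A rough standalone model of that merging (plain Python, timestamps as minutes, names made up):

```python
def merge_session_windows(timestamps, gap):
    """Simplified model of session windowing: each element gets a
    proto-window [t, t + gap); overlapping proto-windows merge into
    one session window."""
    windows = sorted([t, t + gap] for t in timestamps)
    merged = [windows[0]]
    for start, end in windows[1:]:
        if start < merged[-1][1]:  # overlaps the previous session -> merge
            merged[-1][1] = max(merged[-1][1], end)
        else:
            merged.append([start, end])
    return [tuple(w) for w in merged]

# Elements at 10:00, 10:02, 11:03, 12:00 (as minutes), 60-minute gap:
print(merge_session_windows([600, 602, 663, 720], 60))
# [(600, 662), (663, 780)]
```

Each merged window ends one full gap after its last element, so a session holding several chained records can easily span two or three hours without any record pair being more than 60 minutes apart.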
