Hi, I am trying to process data with a 60-minute session interval using the Apache Beam Python SDK. However, when I run my application locally with the DirectRunner, the actual session durations are inaccurate, for example 3:00:00, 1:01:00, or 1:50:00.
Could you help me find a way to fix this and process data in 60-minute sessions? I built my pipeline as below:

```python
import logging

from apache_beam import DoFn, GroupByKey, Map, ParDo, Pipeline, WindowInto
from apache_beam.io import ReadFromText
from apache_beam.transforms import window

with Pipeline(options=pipeline_options) as pipeline:
    (
        pipeline
        | "Read" >> ReadFromText(known_args.input, skip_header_lines=1)
        | "Convert" >> ParDo(Convert())
        | "Add Timestamp" >> Map(
            lambda x: window.TimestampedValue(
                x, get_timestamp_from_element(x).timestamp()))
        | "Use User ID As Key" >> Map(lambda x: (x["user_id"], x))
        | "Apply Session Window" >> WindowInto(
            window.Sessions(known_args.session_interval))
        | "Group" >> GroupByKey()
        | "Write To CSV" >> ParDo(WriteToCSV(known_args.output))
    )
# Leaving the `with` block runs the pipeline and waits for it to finish,
# so a separate pipeline.run() / wait_until_finish() call is not needed.
```

The `session_interval` (60 minutes) is provided as below:

```python
parser.add_argument(
    "--session_interval",
    type=int,  # gap in seconds; without type=int a CLI value arrives as a str
    help="Interval of each session",
    default=60 * 60)  # 60 mins
```

The `WriteToCSV` DoFn processes data per session. I logged the session duration, but it was not accurate:

```python
class WriteToCSV(DoFn):
    def __init__(self, output_path):
        self.output_path = output_path

    def process(self, element, window=DoFn.WindowParam):
        # After GroupByKey, each element is (user_id, iterable of records).
        user_id, click_records = element
        click_records = list(click_records)
        window_start = window.start.to_utc_datetime()
        window_end = window.end.to_utc_datetime()
        duration = window_end - window_start
        logging.info(">>> new %s record(s) in %s session (start %s end %s)",
                     len(click_records), duration, window_start, window_end)
        ....
```

Then I got these log messages when I ran the application locally with the DirectRunner:

```
new 5 records in 3:00:00 session (start 2018-10-19 02:00:00 end 2018-10-19 05:00:00)
new 2 records in 1:01:00 session (start 2018-10-19 02:02:00 end 2018-10-19 03:03:00)
new 2 records in 1:50:00 session (start 2018-10-19 03:10:00 end 2018-10-19 05:00:00)
```

Thanks.
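To reason about the durations in the log, here is a simplified pure-Python model of session windowing for a single key. It is an assumption-based sketch, not Beam's actual implementation: each element at time `t` is given a proto-window `[t, t + gap)`, and proto-windows that overlap are merged into one session. The helper name `merge_sessions` and the click times in the usage block are hypothetical.

```python
from datetime import datetime, timedelta


def merge_sessions(timestamps, gap):
    """Simplified model of session windowing for one key.

    Assumption: this mirrors the idea behind Beam's Sessions window, not its
    actual code. Each element at time t gets a proto-window [t, t + gap);
    overlapping proto-windows are merged into a single (start, end) session.
    """
    merged = []
    for start in sorted(timestamps):
        end = start + gap
        if merged and start < merged[-1][1]:
            # Overlaps the current session: extend its end if needed.
            merged[-1] = (merged[-1][0], max(merged[-1][1], end))
        else:
            # At least `gap` after the previous element: start a new session.
            merged.append((start, end))
    return merged


if __name__ == "__main__":
    gap = timedelta(minutes=60)
    # Hypothetical click times for one user (not taken from the real input).
    clicks = [datetime(2018, 10, 19, 2, 2), datetime(2018, 10, 19, 2, 3)]
    for start, end in merge_sessions(clicks, gap):
        print(start, end, end - start)
        # prints: 2018-10-19 02:02:00 2018-10-19 03:03:00 1:01:00
```

Under this model a session runs from its first element to its last element plus the gap, so the 60-minute value acts as a minimum inactivity gap and a lower bound on the reported duration rather than a fixed session length.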