Hi,

I am trying to process data in 60-minute sessions using the Apache Beam
Python SDK. However, when I run my application locally with the
DirectRunner, the actual session durations are not 60 minutes; I see
durations such as 3:00:00, 1:01:00 and 1:50:00.

Could you help me fix this so that the data is processed in 60-minute
sessions?

I built my pipeline as below:

-----
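from apache_beam import GroupByKey, Map, ParDo, Pipeline, WindowInto
from apache_beam.io import ReadFromText
from apache_beam.transforms import window

with Pipeline(options=pipeline_options) as pipeline:
    (
        pipeline
        | "Read" >> ReadFromText(known_args.input, skip_header_lines=1)
        | "Convert" >> ParDo(Convert())
        # Attach each element's event time (seconds since epoch) as its timestamp.
        | "Add Timestamp" >> Map(
            lambda x: window.TimestampedValue(
                x, get_timestamp_from_element(x).timestamp()))
        | "Use User ID As Key" >> Map(lambda x: (x["user_id"], x))
        # Sessions takes a gap duration in seconds.
        | "Apply Session Window" >> WindowInto(
            window.Sessions(known_args.session_interval))
        | "Group" >> GroupByKey()
        | "Write To CSV" >> ParDo(WriteToCSV(known_args.output))
    )
# Leaving the "with" block already runs the pipeline and waits for it to
# finish, so an extra pipeline.run() inside the block would run it twice.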
-----
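For context on how the window step behaves: Beam's Sessions(gap) is a gap-based, merging window rather than a fixed-length one. Each element is first given its own window [timestamp, timestamp + gap), and overlapping windows for the same key are merged, so a session runs from its first element to its last element plus the gap and can be longer than the gap. The pure-Python sketch below imitates that merge for a single key; merge_sessions and the click times are hypothetical illustrations, not Beam code and not the data from this post.

```python
from datetime import datetime, timedelta


def merge_sessions(timestamps, gap):
    """Imitate session windowing for one key (hypothetical helper).

    Each timestamp t gets the proto-window [t, t + gap); a window that
    overlaps the current session extends it instead of starting a new one.
    """
    windows = []
    for t in sorted(timestamps):
        start, end = t, t + gap
        if windows and start < windows[-1][1]:
            # Overlaps the open session: stretch its end if needed.
            windows[-1] = (windows[-1][0], max(windows[-1][1], end))
        else:
            windows.append((start, end))
    return windows


gap = timedelta(minutes=60)
# Hypothetical click times for one user.
clicks = [datetime(2018, 10, 19, h, m)
          for h, m in [(2, 0), (2, 40), (3, 15), (3, 50), (4, 0), (6, 30)]]

for start, end in merge_sessions(clicks, gap):
    print(start.time(), "->", end.time(), "duration", end - start)
# 02:00:00 -> 05:00:00 duration 3:00:00
# 06:30:00 -> 07:30:00 duration 1:00:00
```

Five clicks less than an hour apart collapse into one three-hour session, while the isolated 06:30 click gets exactly one gap's worth of window.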

The session_interval value (60 minutes) is defined as below:

-----
    parser.add_argument(
        "--session_interval",
        type=int,  # convert command-line values to int seconds
        help="Session gap duration in seconds",
        default=60 * 60)  # 60 minutes
-----
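One detail about this argument: argparse only converts command-line values when a type is given. The default of 60*60 is already an int, but an override such as --session_interval 1800 arrives as the string "1800" without type=int. A small stdlib check of both cases (the parser objects here are stand-ins for the real one):

```python
import argparse

# Parser with type=int: command-line overrides become ints.
parser = argparse.ArgumentParser()
parser.add_argument("--session_interval", type=int,
                    help="Session gap duration in seconds", default=60 * 60)

# Parser without a type: command-line overrides stay strings.
untyped = argparse.ArgumentParser()
untyped.add_argument("--session_interval", default=60 * 60)

print(parser.parse_args([]).session_interval)                               # 3600
print(parser.parse_args(["--session_interval", "1800"]).session_interval)   # 1800
print(repr(untyped.parse_args(["--session_interval", "1800"]).session_interval))  # '1800'
```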

WriteToCSV processes the data per session. I logged each session's
duration, but it was not 60 minutes.

-----
import logging

from apache_beam import DoFn


class WriteToCSV(DoFn):
    def __init__(self, output_path):
        self.output_path = output_path

    def process(self, element, window=DoFn.WindowParam):
        # element is the (user_id, records) pair emitted by GroupByKey.
        user_id, click_records = element
        click_records = list(click_records)
        window_start = window.start.to_utc_datetime()
        window_end = window.end.to_utc_datetime()
        duration = window_end - window_start
        logging.info(">>> new %s record(s) in %s session (start %s end %s)",
                     len(click_records), duration, window_start, window_end)
        ....
-----

When I run this application locally with the DirectRunner, I get these
log messages:

-----
new 5 records in 3:00:00 session (start 2018-10-19 02:00:00 end 2018-10-19 05:00:00)
new 2 records in 1:01:00 session (start 2018-10-19 02:02:00 end 2018-10-19 03:03:00)
new 2 records in 1:50:00 session (start 2018-10-19 03:10:00 end 2018-10-19 05:00:00)
-----
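If these windows come from Beam's merging Sessions with a 60-minute gap, each logged window spans [first element, last element + gap), so the logged bounds can be read back as an activity span. The stdlib sketch below does that arithmetic on the three logged windows; the helper name implied_activity is hypothetical, and the result is only valid under the assumption that the gap really was 60 minutes.

```python
from datetime import datetime, timedelta

GAP = timedelta(minutes=60)  # assumed session gap

# (start, end) pairs copied from the log messages above.
logged = [
    (datetime(2018, 10, 19, 2, 0), datetime(2018, 10, 19, 5, 0)),
    (datetime(2018, 10, 19, 2, 2), datetime(2018, 10, 19, 3, 3)),
    (datetime(2018, 10, 19, 3, 10), datetime(2018, 10, 19, 5, 0)),
]


def implied_activity(start, end, gap=GAP):
    """Return the (first, last) element timestamps implied by a merged session.

    The first element sits at the window start; the last element sits
    exactly one gap before the window end.
    """
    return start, end - gap


for start, end in logged:
    first, last = implied_activity(start, end)
    print(f"session {end - start}: first click {first:%H:%M}, last click {last:%H:%M}")
# session 3:00:00: first click 02:00, last click 04:00
# session 1:01:00: first click 02:02, last click 02:03
# session 1:50:00: first click 03:10, last click 04:00
```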

Thanks.
