The session gap duration of 60 minutes tells the runner to close the window
when there is a gap of at least 60 minutes between records. So for a given
key X, if we had data at timestamps:
1:01
1:30
2:45
2:50
then 1:01 and 1:30 would be in one session while 2:45 and 2:50 would be in
a second session. Note that a session window spans from its first element to
its last element plus the gap, so the window's duration will generally be
longer than 60 minutes; that is why you see durations like 1:01:00 and
1:50:00 rather than exactly one hour.
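
To make that concrete, here is a minimal sketch (my own toy example, with
hard-coded epoch-second timestamps standing in for the times above rather
than your CSV input) that you can run locally with the DirectRunner:

-----
import apache_beam as beam
from apache_beam.transforms import window
from apache_beam.transforms.window import TimestampedValue

# Key X with event times 1:01, 1:30, 2:45, 2:50 expressed in seconds.
events = [("X", 3660), ("X", 5400), ("X", 9900), ("X", 10200)]

with beam.Pipeline() as pipeline:
    (
        pipeline
        | beam.Create(events)
        # Use the value carried in each pair as the element's event timestamp.
        | beam.Map(lambda kv: TimestampedValue(kv, kv[1]))
        # Close a session once there is a gap of at least 60 minutes.
        | beam.WindowInto(window.Sessions(60 * 60))
        | beam.GroupByKey()
        | beam.Map(print)
    )
-----

This should print two groups for key X, one per session; the first window
spans 1:01 to 2:30 (last element at 1:30 plus the 60-minute gap) and the
second spans 2:45 to 3:50.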

If I'm misunderstanding, maybe you could share some sample data with
timestamps and the output you expect.

On Fri, Apr 24, 2020 at 2:17 AM Yohei Onishi <[email protected]>
wrote:

> Hi,
>
> I also deployed the application to Dataflow and then got the same result:
> the actual session duration was not the same as the given session interval
> (60 minutes).
>
> ---
> new 2 record(s) in 1:50:00 session (start 2018-10-19 11:10:00 end
> 2018-10-19 13:00:00)
> new 2 record(s) in 1:01:00 session (start 2018-10-19 10:02:00 end
> 2018-10-19 11:03:00)
> new 5 record(s) in 3:00:00 session (start 2018-10-19 10:00:00 end
> 2018-10-19 13:00:00)
> ---
>
>
> https://stackoverflow.com/questions/61402895/apache-beam-python-sdk-inaccurate-session-window-interval
>
> On Fri, Apr 24, 2020 at 3:09 PM Yohei Onishi <[email protected]>
> wrote:
>
>> Hi,
>>
>> I am trying to process data with a 60-minute session interval using the
>> Apache Beam Python SDK, but the actual session durations were inaccurate,
>> such as 3:00:00, 1:01:00, or 1:50:00, when I ran my application locally
>> using the DirectRunner.
>>
>> Could you help me find a solution to fix this issue so that the data is
>> processed in 60-minute sessions?
>>
>> I built my pipeline as below.
>>
>> -----
>> from apache_beam import GroupByKey, Map, ParDo, Pipeline, WindowInto
>> from apache_beam.io import ReadFromText
>> from apache_beam.transforms import window
>>
>> with Pipeline(options=pipeline_options) as pipeline:
>>     (
>>         pipeline
>>         | "Read" >> ReadFromText(known_args.input, skip_header_lines=1)
>>         | "Convert" >> ParDo(Convert())
>>         | "Add Timestamp" >> Map(
>>             lambda x: window.TimestampedValue(
>>                 x, get_timestamp_from_element(x).timestamp()))
>>         | "Use User ID As Key" >> Map(lambda x: (x["user_id"], x))
>>         | "Apply Session Window" >> WindowInto(
>>             window.Sessions(known_args.session_interval))
>>         | "Group" >> GroupByKey()
>>         | "Write To CSV" >> ParDo(WriteToCSV(known_args.output))
>>     )
>>     # Note: the with-block runs the pipeline and waits for it on exit, so
>>     # an explicit pipeline.run() here would launch it a second time.
>> -----
>>
>> The session_interval (60 minutes) is provided as below.
>>
>> -----
>>     parser.add_argument(
>>         "--session_interval",
>>         type=int,  # values passed on the command line arrive as strings
>>         help="Session gap size in seconds",
>>         default=60 * 60)  # 60 mins
>> -----
>>
>> The WriteToCSV function processes data per session. I logged the session
>> duration, but it was not accurate.
>>
>> -----
>> import logging
>>
>> from apache_beam import DoFn
>>
>> class WriteToCSV(DoFn):
>>     def __init__(self, output_path):
>>         self.output_path = output_path
>>
>>     def process(self, element, window=DoFn.WindowParam):
>>         # After GroupByKey, each element is a (user_id, records) pair.
>>         user_id, records = element
>>         window_start = window.start.to_utc_datetime()
>>         window_end = window.end.to_utc_datetime()
>>         duration = window_end - window_start
>>         logging.info(
>>             ">>> new %s record(s) in %s session (start %s end %s)",
>>             len(records), duration, window_start, window_end)
>>         ....
>> -----
>>
>> Then I got these log messages when I ran the application locally with the
>> DirectRunner.
>>
>> -----
>> new 5 records in 3:00:00 session (start 2018-10-19 02:00:00 end
>> 2018-10-19 05:00:00)
>> new 2 records in 1:01:00 session (start 2018-10-19 02:02:00 end
>> 2018-10-19 03:03:00)
>> new 2 records in 1:50:00 session (start 2018-10-19 03:10:00 end
>> 2018-10-19 05:00:00)
>> -----
>>
>> Thanks.
>>
