I have a web log file that contains session IDs and interactions; there are three interaction types: `GET`, `LOGIN`, `LOGOUT`. Something like:
```
00:00:01;session1;GET
00:00:03;session2;LOGIN
00:01:01;session1;LOGOUT
00:03:01;session2;GET
00:08:15;session2;GET
```

and so on. I want to identify which sessions expired (right now I'm dealing with bounded data). By expired I mean any session that has no interaction within a 5-minute interval. Of course, if the user performed a `LOGOUT`, expiration does not apply. In the data above, session 2 should be considered expired.

I have the following dataflow:

```
(
    p
    | 'Read Files' >> ReadFromText(known_args.input, coder=LogCoder())
    | ParDo(LogToSession())
    | beam.Map(lambda entry: (entry.session, entry))
    | beam.GroupByKey()
)
```

`LogCoder()` is responsible for correctly reading the input files. `LogToSession` converts a log line into a Python class that handles the data structure, so I can access properties such as `entry.session`, `entry.timestamp`, or `entry.operation`. Once processed by `LogToSession`, `entry.timestamp` is a Python `datetime`, `entry.session` is a `str`, and `entry.operation` is also a `str`.

In plain Python I would keep a dict with each session as key and the last timestamp as value. For each new entry of a given key I would check the timedelta: if it is bigger than the window, the session is expired; otherwise, I update the last timestamp (see the sketch below). But I don't know how to express this in Beam. How do I handle the next steps?
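For reference, this is roughly the plain-Python version I have in mind. It is only a minimal sketch: `WINDOW`, `parse_line`, and `find_expired` are hypothetical names, and `parse_line` stands in for what `LogToSession` already does in the pipeline.

```python
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=5)

def parse_line(line):
    # Hypothetical stand-in for LogToSession: "HH:MM:SS;session;OPERATION"
    ts, session, operation = line.strip().split(';')
    return datetime.strptime(ts, '%H:%M:%S'), session, operation

def find_expired(lines):
    last_seen = {}      # session -> timestamp of its last interaction
    logged_out = set()  # sessions that explicitly logged out
    expired = set()

    for line in lines:
        timestamp, session, operation = parse_line(line)
        previous = last_seen.get(session)
        # A gap larger than the window means the session expired at some point.
        if previous is not None and timestamp - previous > WINDOW:
            expired.add(session)
        last_seen[session] = timestamp
        if operation == 'LOGOUT':
            logged_out.add(session)

    # Sessions that logged out are never considered expired.
    return expired - logged_out

lines = [
    '00:00:01;session1;GET',
    '00:00:03;session2;LOGIN',
    '00:01:01;session1;LOGOUT',
    '00:03:01;session2;GET',
    '00:08:15;session2;GET',
]
print(find_expired(lines))  # {'session2'}
```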