I have a web log file that contains session IDs and interactions.
There are three kinds of interaction: `GET`, `LOGIN` and `LOGOUT`. Something like:

```
00:00:01;session1;GET
00:00:03;session2;LOGIN
00:01:01;session1;LOGOUT
00:03:01;session2;GET
00:08:15;session2;GET
```

and so on.

I want to identify which sessions have expired (right now I'm dealing
with bounded data). By expired I mean any session that goes more than
5 minutes without any interaction.

Of course, once a user issues a `LOGOUT`, expiration no longer applies. In
the data above, session2 should be considered expired: nothing happens
between its `GET` at 00:03:01 and the next one at 00:08:15.

I have the following dataflow:
```
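import apache_beam as beam
from apache_beam.io import ReadFromText

(
    p
    | 'Read Files' >> ReadFromText(known_args.input, coder=LogCoder())
    | beam.ParDo(LogToSession())  # parse each line into an entry object
    | beam.Map(lambda entry: (entry.session, entry))  # key by session id
    | beam.GroupByKey()  # collect all entries of each session
)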
```

`LogCoder()` is responsible for reading the input files correctly.
`LogToSession` converts a log line into an instance of a Python class that
models the data structure, so that its fields can be accessed as properties.

For example I can fetch `entry.session` or `entry.timestamp` or
`entry.operation`.

Once processed by `LogToSession`, `entry.timestamp` is a Python `datetime`,
`entry.session` is a `str` and `entry.operation` is also a `str`.
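
To make the shape of an entry concrete, here is a rough sketch of what it looks like. The names `LogEntry` and `parse_line` are made up for illustration only; the real class and `LogToSession` are not shown here, and the `%H:%M:%S` format is assumed from the sample lines above.

```python
from dataclasses import dataclass
from datetime import datetime


@dataclass
class LogEntry:  # hypothetical name, stands in for the real entry class
    timestamp: datetime
    session: str
    operation: str


def parse_line(line: str) -> LogEntry:
    """Parse one 'HH:MM:SS;session;OPERATION' line (format assumed from the sample)."""
    time_text, session, operation = line.strip().split(";")
    # The log carries only a time of day, so strptime fills in its
    # default date (1900-01-01); only differences between timestamps matter.
    return LogEntry(datetime.strptime(time_text, "%H:%M:%S"), session, operation)


entry = parse_line("00:03:01;session2;GET")
print(entry.session, entry.operation, entry.timestamp.time())
```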

In plain Python I would keep a dict with each session as key and its last
timestamp as value. For each new entry of a given key I would check the
timedelta since that timestamp: if it is bigger than the window, the
session is expired; otherwise I update the last timestamp. But I don't
know how to do this in Beam.
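
For reference, the plain-Python version I have in mind is roughly the sketch below. It is not Beam code. It assumes the entries arrive sorted by timestamp, and that a `LOGOUT` simply stops tracking the session (the gap is still checked on the `LOGOUT` entry itself, which is my own reading). The names `find_expired` and `at` are just for this example.

```python
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=5)  # expiration window


def find_expired(entries, window=WINDOW):
    """Return the ids of sessions with a gap longer than `window`.

    `entries` is an iterable of (timestamp, session, operation) tuples,
    assumed to be sorted by timestamp.
    """
    last_seen = {}  # session id -> timestamp of its latest interaction
    expired = set()
    for timestamp, session, operation in entries:
        previous = last_seen.get(session)
        if previous is not None and timestamp - previous > window:
            expired.add(session)
        if operation == "LOGOUT":
            # Assumption: a logged-out session is no longer tracked.
            last_seen.pop(session, None)
        else:
            last_seen[session] = timestamp
    return expired


def at(text):
    """Helper for the example: turn 'HH:MM:SS' into a datetime."""
    return datetime.strptime(text, "%H:%M:%S")


sample = [
    (at("00:00:01"), "session1", "GET"),
    (at("00:00:03"), "session2", "LOGIN"),
    (at("00:01:01"), "session1", "LOGOUT"),
    (at("00:03:01"), "session2", "GET"),
    (at("00:08:15"), "session2", "GET"),
]
expired = find_expired(sample)
print(expired)  # {'session2'}
```

This only flags gaps between two consecutive entries of the same session; a session that just goes silent at the end of the file is not flagged.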

How should I handle the next steps?
