This is great, thank you very much. I've read more into session windowing and suppression and they seem to fit my needs perfectly. I'm struggling to find a method of triggering the window to close early when I receive the end event.
Maybe I could assign a monotonically increasing identifier each time I see a start event, then re-key including that as part of a compound key and session window by that? I feel like I may be engineering an anti-pattern where there's something much better already built in.

On 2020/01/04 17:46:40, Sachin Mittal <[email protected]> wrote:
> Try something like this:
>
> stream
>   .groupBy(
>     (key, value) -> value.userId
>   )
>   .aggregate(
>     () -> new Session(),
>     (aggKey, newValue, aggValue) -> {
>       aggValue.userId = newValue.userId;
>       if (newValue.start) {
>         aggValue.start = newValue.start;
>         aggValue.duration = 0;
>         aggValue.open = true;
>       }
>       else if (newValue.end) {
>         aggValue.duration = newValue.end - aggValue.start;
>         aggValue.close = true;
>       } else {
>         aggValue.count++;
>         aggValue.duration = now() - aggValue.start;
>       }
>       return aggValue;  // the aggregator must return the updated aggregate
>     }
>   )
>
> Note you need to have a well defined Session class and Event class with
> their appropriate serde.
> So the aggregated stream would have Session with its attributes like start
> time, duration, count, session open or close.
>
> One thing you need to take care is after a session is closed a new session
> for the same user can be created again.
> So you may need to break sessions based on some windowing or as session is
> closed you store it in some store (be it internal to kafka or some external
> database) and reset the session object.
>
> Hope this helps.
>
>
> On Sat, Jan 4, 2020 at 10:02 PM Chris Madge <[email protected]> wrote:
>
> > Hi there,
> >
> > It’s my first voyage into stream processing - I’ve tried a few things but
> > I think I’m struggling to think in the streams way. I wondered if I could
> > be cheeky and ask if someone could give me some clues as to the correct
> > design for my first task to get me started?
> >
> > I have application events coming in like:
> >
> > <timestamp>,type:start,<user_id>
> > <timestamp>,type:action,<user_id>
> > <timestamp>,type:action,<user_id>
> > <timestamp>,type:action,<user_id>
> > <timestamp>,type:end,<user_id>
> >
> > each one represents a single user session.
> >
> > I need to output:
> > <timestamp of start event>,<duration between start and end
> > event>,<user_id>,<count_of_action_events>
> >
> > I’m working with event time (specified by the application) and I can’t
> > trust the application to close sessions/notify gracefully (I’m happy for
> > those to be thrown out, but cool ideas for alternatives are very welcome!).
> >
> > Any advice would be much appreciated.
> >
> > Chris Madge
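
To make the compound-key idea at the top of this message concrete, here is a minimal sketch in plain Java with no Kafka dependency. It is not Kafka Streams code and not a built-in feature; it only illustrates the logic. The names Event, SessionSummary, SessionSplitter and the "<userId>#<sequence>" key format are all made up for illustration, and it assumes each user's events arrive in timestamp order.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** One input event. Field names are assumptions based on the format in the question. */
final class Event {
    final long timestamp;  // event time, e.g. epoch milliseconds
    final String type;     // "start", "action" or "end"
    final String userId;
    Event(long timestamp, String type, String userId) {
        this.timestamp = timestamp; this.type = type; this.userId = userId;
    }
}

/** One output row: start timestamp, duration, user id, action count. */
final class SessionSummary {
    final String sessionKey;  // hypothetical compound key "<userId>#<sequence>"
    final long start;
    final long duration;
    final String userId;
    final int actionCount;
    SessionSummary(String sessionKey, long start, long duration, String userId, int actionCount) {
        this.sessionKey = sessionKey; this.start = start; this.duration = duration;
        this.userId = userId; this.actionCount = actionCount;
    }
}

final class SessionSplitter {
    /** Mutable state for a session that has started but not yet ended. */
    private static final class Open {
        final long seq; final long start; int actions;
        Open(long seq, long start) { this.seq = seq; this.start = start; }
    }

    /** Assumes each user's events are in timestamp order (out-of-order input is not handled). */
    static List<SessionSummary> summarize(List<Event> events) {
        Map<String, Long> lastSeq = new HashMap<>();  // per-user monotonically increasing id
        Map<String, Open> open = new HashMap<>();     // at most one open session per user
        List<SessionSummary> out = new ArrayList<>();
        for (Event e : events) {
            if ("start".equals(e.type)) {
                long seq = lastSeq.merge(e.userId, 1L, Long::sum);
                // A new start replaces any session that never saw its end event,
                // so ungracefully closed sessions are thrown out.
                open.put(e.userId, new Open(seq, e.timestamp));
            } else if ("action".equals(e.type)) {
                Open o = open.get(e.userId);
                if (o != null) o.actions++;           // actions outside a session are ignored
            } else if ("end".equals(e.type)) {
                Open o = open.remove(e.userId);       // the end event closes the session at once
                if (o != null) {
                    out.add(new SessionSummary(e.userId + "#" + o.seq, o.start,
                            e.timestamp - o.start, e.userId, o.actions));
                }
            }
        }
        return out;
    }
}

final class SessionSketch {
    public static void main(String[] args) {
        List<Event> events = new ArrayList<>();
        events.add(new Event(1000, "start", "u1"));
        events.add(new Event(1500, "action", "u1"));
        events.add(new Event(3000, "end", "u1"));
        for (SessionSummary s : SessionSplitter.summarize(events)) {
            System.out.println(s.start + "," + s.duration + "," + s.userId + "," + s.actionCount);
        }
    }
}
```

In this sketch the end event emits the row immediately rather than waiting for a window to time out, and a session with no end event is silently dropped when the same user's next start arrives. A session that is never followed by another start stays in the map forever, so a real job would still need some timeout to evict it.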
