Hey Chris,

Yeah, I think what you’re really looking for is data-driven windowing, which we haven’t implemented yet. In lieu of that, you’ll want to build on top of session windows.
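For reference, a session window (as in Kafka Streams' `SessionWindows`) keeps adding a key's records to the same session as long as each one arrives within an inactivity gap of the previous one. A longer gap ends the session, which also bounds sessions whose application never sends an end event. Below is a minimal plain-Java sketch of that grouping for a single key. It is not the Kafka Streams implementation: it assumes timestamps arrive already sorted, ignores out-of-order records and grace periods, and the names `SessionGapSketch`, `Window` and `sessionize` are hypothetical. Records require Java 16 or later.

```java
import java.util.ArrayList;
import java.util.List;

public class SessionGapSketch {
    // Bounds of one session: timestamps of its first and last record.
    record Window(long start, long end) {}

    // Groups one key's timestamps (assumed sorted ascending) into sessions.
    // A record joins the current session if it is at most gapMs after the
    // session's last record; otherwise the session ends and a new one starts.
    static List<Window> sessionize(long[] timestamps, long gapMs) {
        List<Window> sessions = new ArrayList<>();
        if (timestamps.length == 0) {
            return sessions;
        }
        long start = timestamps[0];
        long end = timestamps[0];
        for (int i = 1; i < timestamps.length; i++) {
            long ts = timestamps[i];
            if (ts - end <= gapMs) {
                end = ts; // within the inactivity gap: extend the session
            } else {
                sessions.add(new Window(start, end)); // gap exceeded: close it
                start = ts;
                end = ts;
            }
        }
        sessions.add(new Window(start, end)); // close the last open session
        return sessions;
    }

    public static void main(String[] args) {
        long[] ts = {1_000, 2_000, 3_500, 60_000, 61_000};
        System.out.println(sessionize(ts, 5_000));
    }
}
```

With a 5-second gap, the sample prints two windows, [1000, 3500] and [60000, 61000], because the jump from 3,500 to 60,000 exceeds the gap.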
What you can do is define an aggregate object similar to what Sachin proposed. After the aggregation, you can just filter to only allow results where “open == false”. Since you have explicit session end events, I don’t think you need suppression.

Hope this helps,
John

On Sun, Jan 5, 2020, at 06:36, Chris Madge wrote:
> This is great, thank you very much. I've read more into session
> windowing and suppression and they seem to fit my needs perfectly. I'm
> struggling to find a way to make the window close early when I
> receive the end event.
>
> Maybe I could assign a monotonically increasing identifier each time I
> see a start event, then re-key using that as part of a compound key
> and session window by that? I feel like I may be engineering an
> anti-pattern when there's something much better already built in.
>
> On 2020/01/04 17:46:40, Sachin Mittal wrote:
> > Try something like this:
> >
> >   stream
> >       .groupBy((key, value) -> value.userId)
> >       .aggregate(
> >           () -> new Session(),
> >           (aggKey, newValue, aggValue) -> {
> >               aggValue.userId = newValue.userId;
> >               // start/end are assumed to be nullable Long timestamps
> >               if (newValue.start != null) {
> >                   // start event: reset the session
> >                   aggValue.start = newValue.start;
> >                   aggValue.duration = 0;
> >                   aggValue.count = 0;
> >                   aggValue.open = true;
> >               } else if (newValue.end != null) {
> >                   // end event: final duration, mark the session closed
> >                   aggValue.duration = newValue.end - aggValue.start;
> >                   aggValue.open = false;
> >               } else {
> >                   // action event: count it; duration uses event time
> >                   // (assumes Event exposes it as `timestamp`, hypothetical)
> >                   aggValue.count++;
> >                   aggValue.duration = newValue.timestamp - aggValue.start;
> >               }
> >               return aggValue;
> >           });
> >
> > Note that you need well-defined Session and Event classes with
> > appropriate serdes. The aggregated stream would then contain Session
> > objects with attributes such as start time, duration, action count, and
> > whether the session is open or closed.
> >
> > One thing to take care of is that, after a session is closed, a new
> > session for the same user can be created again.
> > So you may need to split sessions using some windowing, or, once a
> > session is closed, store it somewhere (either a store internal to Kafka
> > or an external database) and reset the session object.
> >
> > Hope this helps.
> >
> > On Sat, Jan 4, 2020 at 10:02 PM Chris Madge wrote:
> > >
> > > Hi there,
> > >
> > > It’s my first voyage into stream processing. I’ve tried a few things, but
> > > I think I’m struggling to think in the streams way. I wondered if I could
> > > be cheeky and ask if someone could give me some clues as to the correct
> > > design for my first task, to get me started?
> > >
> > > I have application events coming in like:
> > >
> > > <timestamp>,type:start,<user_id>
> > > <timestamp>,type:action,<user_id>
> > > <timestamp>,type:action,<user_id>
> > > <timestamp>,type:action,<user_id>
> > > <timestamp>,type:end,<user_id>
> > >
> > > Each such start-to-end sequence represents a single user session.
> > >
> > > I need to output:
> > > <timestamp of start event>,<duration between start and end event>,<user_id>,<count_of_action_events>
> > >
> > > I’m working with event time (specified by the application), and I can’t
> > > trust the application to close sessions/notify gracefully (I’m happy for
> > > those to be thrown out, but cool ideas for alternatives are very
> > > welcome!).
> > >
> > > Any advice would be much appreciated.
> > >
> > > Chris Madge
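The aggregate-then-filter approach discussed in this thread can be tried out without a Kafka cluster. Below is a minimal plain-Java sketch, not Kafka Streams code. A per-user map stands in for the state store, `aggregate` plays the role of the Aggregator, and a summary line is emitted only when a session goes from open to closed, which is the effect of the “open == false” filter. The event shape (`Event` with `timestamp`, `type`, `userId`) and all class and method names are hypothetical. Sessions that never receive an end event simply stay open and are never emitted. In a real topology, a session window's inactivity gap would bound them.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SessionAggregateSketch {
    // Hypothetical event matching "<timestamp>,type:<start|action|end>,<user_id>".
    record Event(long timestamp, String type, String userId) {}

    // Mutable per-user aggregate, loosely following the Session class above.
    static final class Session {
        long start;
        long duration;
        int count;
        boolean open;
    }

    // Folds one event into the user's aggregate (the Aggregator's job).
    static Session aggregate(Event e, Session agg) {
        switch (e.type()) {
            case "start" -> { // a new session resets the aggregate
                agg.start = e.timestamp();
                agg.duration = 0;
                agg.count = 0;
                agg.open = true;
            }
            case "end" -> { // final duration, mark closed
                agg.duration = e.timestamp() - agg.start;
                agg.open = false;
            }
            default -> { // "action": count it, duration in event time
                agg.count++;
                agg.duration = e.timestamp() - agg.start;
            }
        }
        return agg;
    }

    // Emits "<start>,<duration>,<user_id>,<count>" only on an open -> closed
    // transition, so stray end events without a start are ignored.
    static List<String> summarize(List<Event> events) {
        Map<String, Session> byUser = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (Event e : events) {
            Session agg = byUser.computeIfAbsent(e.userId(), k -> new Session());
            boolean wasOpen = agg.open;
            aggregate(e, agg);
            if (wasOpen && !agg.open) {
                out.add(agg.start + "," + agg.duration + "," + e.userId() + "," + agg.count);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
                new Event(1000, "start", "u1"),
                new Event(1500, "action", "u1"),
                new Event(2000, "action", "u1"),
                new Event(2500, "action", "u1"),
                new Event(4000, "end", "u1"),
                new Event(5000, "start", "u2"), // never ends: not emitted
                new Event(5500, "action", "u2"));
        summarize(events).forEach(System.out::println);
    }
}
```

For the sample input, only u1's session is emitted, as `1000,3000,u1,3` (start at 1000, end at 4000, three actions). u2 never sends an end event, so nothing is emitted for it. Resetting `count` on each start event covers Sachin's point that the same user can open a new session after closing one.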
