This is great, thank you very much. I've read more into session windowing and
suppression, and they seem to fit my needs perfectly. However, I'm struggling to
find a way to make the window close early when I receive the end event.
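For context, Kafka Streams session windows merge a key's events whenever consecutive events are no further apart than the inactivity gap. Roughly speaking, a session is only treated as closed once stream time has moved past its last event by more than the gap plus the grace period, and that is also the point at which `suppress(Suppressed.untilWindowCloses(...))` emits it. Nothing in that model reacts to an end event. The merging rule on its own can be modelled in plain Java as below; `sessionize` and the 30-minute gap are hypothetical, and the sketch assumes one user's timestamps arrive already sorted.

```java
import java.util.ArrayList;
import java.util.List;

public class SessionGapModel {
    // Hypothetical helper: splits one user's sorted event timestamps (ms) into
    // [start, end] sessions, starting a new session whenever the gap between two
    // consecutive events exceeds inactivityGapMs.
    public static List<long[]> sessionize(long[] timestamps, long inactivityGapMs) {
        List<long[]> sessions = new ArrayList<>();
        if (timestamps.length == 0) {
            return sessions;
        }
        long start = timestamps[0];
        long end = timestamps[0];
        for (int i = 1; i < timestamps.length; i++) {
            if (timestamps[i] - end > inactivityGapMs) {
                // Gap exceeded: the current session ends at its last event.
                sessions.add(new long[] {start, end});
                start = timestamps[i];
            }
            end = timestamps[i];
        }
        sessions.add(new long[] {start, end});
        return sessions;
    }

    public static void main(String[] args) {
        long gap = 30 * 60 * 1000L; // assumed 30-minute inactivity gap
        long[] ts = {0L, 60_000L, 120_000L, 4_000_000L, 4_060_000L};
        for (long[] s : sessionize(ts, gap)) {
            System.out.println(s[0] + "-" + s[1]);
        }
    }
}
```

With these inputs the model produces two sessions, `0-120000` and `4000000-4060000`, purely from the gap between events.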

Maybe I could assign a monotonically increasing identifier each time I see a
start event, re-key using a compound key of user ID plus that identifier, and
session-window by that? I feel like I may be engineering an anti-pattern when
there's something much better already built in.
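The re-keying idea above can be sketched in plain Java, outside Kafka Streams: keep a per-user counter that increments on every start event and build a compound key of user ID plus counter, so each start opens a distinct key. The `Event` class, the `#` separator and `rekey` are hypothetical, and the `HashMap` is only an in-memory stand-in; in a real topology the counter would have to live in a state store.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CompoundKeySketch {
    // Hypothetical event shape matching "<timestamp>,type:<type>,<user_id>".
    public static final class Event {
        final long timestamp;
        final String type;
        final String userId;

        public Event(long timestamp, String type, String userId) {
            this.timestamp = timestamp;
            this.type = type;
            this.userId = userId;
        }
    }

    // In-memory stand-in for a per-user state store: latest session number.
    private final Map<String, Long> sessionCounter = new HashMap<>();

    // Returns the compound key "<user_id>#<session number>" for an event.
    // Every start event bumps the user's counter, so each start opens a new key.
    public String rekey(Event e) {
        if ("start".equals(e.type)) {
            sessionCounter.merge(e.userId, 1L, Long::sum);
        }
        long n = sessionCounter.getOrDefault(e.userId, 0L);
        return e.userId + "#" + n;
    }

    public static void main(String[] args) {
        CompoundKeySketch sketch = new CompoundKeySketch();
        List<Event> events = new ArrayList<>();
        events.add(new Event(1000L, "start", "u1"));
        events.add(new Event(2000L, "action", "u1"));
        events.add(new Event(3000L, "end", "u1"));
        events.add(new Event(4000L, "start", "u1"));
        events.add(new Event(5000L, "action", "u1"));
        for (Event e : events) {
            System.out.println(e.timestamp + " -> " + sketch.rekey(e));
        }
    }
}
```

Note that this only works if each user's events arrive in order: an action that arrives after a later start would be assigned to the wrong session number.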

On 2020/01/04 17:46:40, Sachin Mittal <[email protected]> wrote: 
> Try something like this:
> 
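> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.kstream.Grouped;
> import org.apache.kafka.streams.kstream.KTable;
> import org.apache.kafka.streams.kstream.Materialized;
>
> // Assumes Event has userId, type and timestamp fields, Session has userId,
> // start, duration, count and open fields, and eventSerde / sessionSerde
> // are serdes you provide for them.
> KTable<String, Session> sessions = stream
>   .groupBy(
>     (key, value) -> value.userId,
>     Grouped.with(Serdes.String(), eventSerde)
>   )
>   .aggregate(
>     Session::new,
>     (aggKey, newValue, aggValue) -> {
>       aggValue.userId = newValue.userId;
>       if ("start".equals(newValue.type)) {
>         // a start event (re)opens the session
>         aggValue.start = newValue.timestamp;
>         aggValue.duration = 0;
>         aggValue.count = 0;
>         aggValue.open = true;
>       } else if ("end".equals(newValue.type)) {
>         aggValue.duration = newValue.timestamp - aggValue.start;
>         aggValue.open = false;
>       } else {
>         // action event: use event time rather than wall-clock now()
>         aggValue.count++;
>         aggValue.duration = newValue.timestamp - aggValue.start;
>       }
>       return aggValue;
>     },
>     Materialized.with(Serdes.String(), sessionSerde)
>   );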
> 
> Note that you need well-defined Session and Event classes with appropriate
> serdes.
> The aggregated KTable then holds one Session per user, with attributes such as
> start time, duration, action count, and whether the session is open or closed.
> 
> One thing you need to take care of is that after a session is closed, a new
> session for the same user can be started again.
> So you may need to break sessions up using some windowing, or, when a session
> is closed, store it somewhere (a store internal to Kafka or an external
> database) and reset the session object.
> 
> Hope this helps.
> 
> 
> 
> 
> On Sat, Jan 4, 2020 at 10:02 PM Chris Madge <[email protected]> wrote:
> 
> > Hi there,
> >
> > It’s my first voyage into stream processing. I’ve tried a few things, but
> > I think I’m struggling to think in the streams way. I wondered if I could
> > be cheeky and ask if someone could give me some clues as to the correct
> > design for my first task, to get me started?
> >
> > I have application events coming in like:
> >
> > <timestamp>,type:start,<user_id>
> > <timestamp>,type:action,<user_id>
> > <timestamp>,type:action,<user_id>
> > <timestamp>,type:action,<user_id>
> > <timestamp>,type:end,<user_id>
> >
> > Each such sequence of events represents a single user session.
> >
> > I need to output:
> > <timestamp of start event>,<duration between start and end event>,<user_id>,<count_of_action_events>
> >
> > I’m working with event time (specified by the application) and I can’t
> > trust the application to close sessions/notify gracefully (I’m happy for
> > those to be thrown out, but cool ideas for alternatives are very welcome!).
> >
> > Any advice would be much appreciated.
> >
> > Chris Madge
> >
> 
