Hi Aljoscha,

Thanks a lot for your Trigger implementation; it definitely helped provide some direction. It appears to be working well for our use case.

One thing I have noticed, now that I have pulled in the state API changes, is that key-based state within a window function does not appear to be working. Perhaps I am not using it correctly now that the API has changed. Previously we had done something like this within the RichWindowFunction:

    @Override
    public void open(final Configuration parameters) throws Exception {
        state = getRuntimeContext().getOperatorState("state", new StatePojo(), true);
    }

Based on the API changes I switched it to:

    @Override
    public void open(final Configuration parameters) throws Exception {
        state = getRuntimeContext().getKeyValueState("state", StatePojo.class, new StatePojo());
    }

But the state doesn’t seem to be partitioned based on the key. I haven’t had much time to play around with it, so it’s certainly possible that I messed something up while refactoring to the API change. I will look at it further when I get a chance, but if you have any thoughts they are much appreciated.

Thanks,
Paul Hamilton

On 10/17/15, 6:39 AM, "Aljoscha Krettek" <aljos...@apache.org> wrote:

>Hi Paul,
>it’s good to see people interested in this. I sketched a Trigger that
>should fit your requirements:
>https://gist.github.com/aljoscha/a7c6f22548e7d24bc4ac
>
>You can use it like this:
>
>DataStream<> input = …
>DataStream<> result = input
>  .keyBy("session-id")
>  .window(GlobalWindows.create())
>  .trigger(new SessionTrigger(timeout, maxElements))
>  .apply(new MyWindowFunction())
>
>The Trigger uses the new state API that I’m currently introducing in a
>new Pull Request. It should be merged very soon, before the 0.10 release.
>
>This implementation has one caveat, though. It cannot deal with elements
>that belong to different sessions that arrive intermingled with other
>sessions. The reason is that Flink does not yet support merging the
>windows that the WindowAssigner assigns as, for example, the Cloud
>Dataflow API supports. This means that elements cannot be assigned to
>session windows, instead the workaround with the GlobalWindow has to be
>used. I want to tackle this for the release after 0.10, however.
>
>Please let us know if you need more information. I’m always happy to help
>in these interesting cases at the bleeding edge of what is possible.
>:-)
>
>Cheers,
>Aljoscha
>
>> On 16 Oct 2015, at 19:36, Hamilton, Paul <paul.hamilto...@sap.com> wrote:
>>
>> Hi,
>>
>> I am attempting to make use of the new window APIs in streaming to
>> implement a session based window and am not sure if the currently provided
>> functionality handles my use case. Specifically what I want to do is
>> something conceptually similar to a “Sessions.withGapDuration(…)” window
>> in Google DataFlow.
>>
>> Assuming the events are keyed by session id. I would like to use the
>> event time and the watermarking functionality to trigger a window after
>> the “end of a session” (no events for a given session received within x
>> amount of time). With watermarking this would mean trigger when a
>> watermark is seen that is > (the time of the last event + session
>> timeout). Also I want to perform an early triggering of the window after a
>> given number of events have been received.
>>
>> Is it currently possible to do this with the current combination of window
>> assigners and triggers? I am happy to write custom triggers etc, but
>> wanted to make sure it wasn’t already available before going down that
>> road.
>>
>> Thanks,
>>
>> Paul Hamilton
>> Hybris Software
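
For anyone following the thread, the firing rule described in the original question (fire once a watermark passes the last event time plus the session timeout, and fire early after a given number of events) can be sketched in plain Java for a single session key. This is only an illustration of the rule: it is not the SessionTrigger from the gist above and does not use Flink's Trigger interface. The class name, method names, and the choice to reset the counter after an early fire are all assumptions made for the example.

```java
// Hypothetical, Flink-free sketch of the session firing rule for ONE key.
// It only models the decision "should the window fire now?".
final class SessionTriggerSketch {
    private final long timeoutMillis; // session gap: no events for this long ends the session
    private final int maxElements;    // early-fire threshold

    private long lastEventTime = Long.MIN_VALUE; // MIN_VALUE means "no open session"
    private int count = 0;                       // elements seen since the last fire

    SessionTriggerSketch(long timeoutMillis, int maxElements) {
        this.timeoutMillis = timeoutMillis;
        this.maxElements = maxElements;
    }

    /** Records one event; returns true when the element count triggers an early fire. */
    boolean onElement(long eventTime) {
        lastEventTime = Math.max(lastEventTime, eventTime);
        count++;
        if (count >= maxElements) {
            count = 0; // assumption: the counter restarts after an early fire
            return true;
        }
        return false;
    }

    /** Returns true when the watermark is > (time of the last event + session timeout). */
    boolean onWatermark(long watermark) {
        if (lastEventTime == Long.MIN_VALUE) {
            return false; // nothing to close
        }
        if (watermark > lastEventTime + timeoutMillis) {
            lastEventTime = Long.MIN_VALUE; // session is closed
            count = 0;
            return true;
        }
        return false;
    }
}
```

In a real job one such piece of state would have to exist per session id, which is what the keyed state discussed above is meant to provide.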