Thanks for confirming a hunch that I had. I was considering doing that, but the Javadoc note saying "this feature is not implemented by any runner" sort of put me off.
Is there a more up-to-date list of similar in-progress features? If not, it may be helpful to keep one.

Thanks!
Ankur Chauhan

Sent from my iPhone

> On Apr 11, 2017, at 07:18, Kenneth Knowles <k...@google.com> wrote:
>
> Hi Ankur,
>
> If I understand your desire, then what you need for such a use case is an
> event time timer, to flush when you are ready. You might choose the end of
> the window, the window GC time or, in the global window, just some later
> watermark.
>
>   new DoFn<...>() {
>
>     @StateId("buffer")
>     private final StateSpec<Object, BagState<Foo>> bufferSpec =
>         StateSpecs.bag(...);
>
>     @TimerId("finallyCleanup")
>     private final TimerSpec finallySpec =
>         TimerSpecs.timer(TimeDomain.EVENT_TIME);
>
>     @ProcessElement
>     public void process(@TimerId("finallyCleanup") Timer cleanupTimer) {
>       cleanupTimer.set(...);
>     }
>
>     @OnTimer("finallyCleanup")
>     public void onFinallyCleanup(@StateId("buffer") BagState<Foo> buffered) {
>       ...
>     }
>   }
>
> This feature hasn't been blogged about or documented thoroughly except for a
> couple of examples in the DoFn Javadoc. But it is available since 0.6.0.
>
> Kenn
>
>> On Tue, Apr 11, 2017 at 3:03 AM, Ankur Chauhan <an...@malloc64.com> wrote:
>> Hi,
>>
>> I am attempting to do a seemingly simple task using the new state API. I
>> have created a DoFn<KV<String, Event>, KV<String, Event>> that accepts events
>> keyed by a particular id (session id) and intends to emit the same events
>> partitioned by sessionID/eventType. In the simple case this would be a
>> normal DoFn, but there is always a case where some events are not as clean as
>> we would like and we need to save some state for the session and then emit
>> those events later when cleanup is complete.
>>
>> For example:
>>
>> Let’s say that the first few events are missing the eventType (or any other
>> field), so we would like to buffer those events till we get the first event
>> with the eventType field set and then use this information to emit the
>> contents of the buffer with (last observed eventType + original contents of
>> the buffered events).
>>
>> For this my initial approach involved creating a BagState<Event> which would
>> contain any buffered events and, as more events came in, I would either emit
>> the input with modification, or add the input to the buffer, or emit the
>> events in the buffer with the input.
>>
>> While running my test, I found that if I never get a “good” input, i.e. the
>> session is only filled with error inputs, I would keep on buffering the
>> input and never emit anything. My question is, how do I emit this buffer
>> when there is no more input?
>>
>> -- Ankur Chauhan
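
For anyone following the thread, the buffer-then-flush behaviour discussed above can be sketched in plain Java without any Beam dependency. This is only an illustration of the logic, not the Beam API: the `SessionBuffer` and `Event` classes, the `fallbackType` parameter, and the `sessionId/eventType:payload` output format are all hypothetical names made up for this sketch. The list named `buffer` stands in for the `BagState<Event>`, and `onTimer` stands in for the `@OnTimer` callback, which a runner would invoke when the event time timer fires.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java simulation of per-session buffering; not a Beam DoFn. */
class SessionBuffer {

    /** Hypothetical event: eventType is null when the field is missing. */
    static final class Event {
        final String payload;
        final String eventType;

        Event(String payload, String eventType) {
            this.payload = payload;
            this.eventType = eventType;
        }

        Event withType(String type) {
            return new Event(payload, type);
        }
    }

    private final String sessionId;
    // Stands in for the BagState<Event> buffer from the thread.
    private final List<Event> buffer = new ArrayList<>();
    // Stands in for the DoFn output; entries look like "sessionId/eventType:payload".
    private final List<String> output = new ArrayList<>();

    SessionBuffer(String sessionId) {
        this.sessionId = sessionId;
    }

    /** Analogue of @ProcessElement: buffer incomplete events, flush on a good one. */
    void process(Event e) {
        if (e.eventType == null) {
            buffer.add(e);
            return;
        }
        flush(e.eventType);
        emit(e);
    }

    /** Analogue of @OnTimer: emit whatever is still buffered with a fallback type. */
    void onTimer(String fallbackType) {
        flush(fallbackType);
    }

    List<String> output() {
        return output;
    }

    private void flush(String type) {
        for (Event b : buffer) {
            emit(b.withType(type));
        }
        buffer.clear();
    }

    private void emit(Event e) {
        output.add(sessionId + "/" + e.eventType + ":" + e.payload);
    }

    public static void main(String[] args) {
        SessionBuffer session = new SessionBuffer("s1");
        session.process(new Event("a", null));    // buffered, nothing emitted yet
        session.process(new Event("b", "click")); // flushes "a" as a click, then emits "b"
        session.process(new Event("c", null));    // buffered again
        session.onTimer("unknown");               // no more input: the timer flushes "c"
        session.output().forEach(System.out::println);
    }
}
```

In a real pipeline the timer would presumably be set from the process method, as in Kenn's snippet, so the final flush happens even when a session never sees a "good" event.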