Thanks for confirming a hunch that I had. I was considering doing that, but the 
Javadoc saying "this feature is not implemented by any runner" sort of put me 
off. 

Is there a more up-to-date list of similar in-progress features? If not, it may 
be helpful to keep one. 

Thanks!
Ankur Chauhan. 

> On Apr 11, 2017, at 07:18, Kenneth Knowles <k...@google.com> wrote:
> 
> Hi Ankur,
> 
> If I understand your desire, then what you need for such a use case is an 
> event time timer, to flush when you are ready. You might choose the end of 
> the window, the window GC time or, in the global window, just some later 
> watermark.
> 
>     new DoFn<...>() {
>     
>       // Per-key bag that holds the buffered elements.
>       @StateId("buffer")
>       private final StateSpec<Object, BagState<Foo>> bufferSpec = StateSpecs.bag(...);
>     
>       // Event-time timer used to flush the buffer.
>       @TimerId("finallyCleanup")
>       private final TimerSpec finallySpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
>     
>       @ProcessElement
>       public void process(@TimerId("finallyCleanup") Timer cleanupTimer) {
>         // Set the timer, e.g. to the end of the window.
>         cleanupTimer.set(...);
>       }
>     
>       @OnTimer("finallyCleanup")
>       public void onFinallyCleanup(@StateId("buffer") BagState<Foo> buffered) {
>         ...
>       }
>     }
> 
> This feature hasn't been blogged about or documented thoroughly, except for a 
> couple of examples in the DoFn Javadoc, but it has been available since 0.6.0.
> 
> Kenn
> 
>> On Tue, Apr 11, 2017 at 3:03 AM, Ankur Chauhan <an...@malloc64.com> wrote:
>> Hi,
>> 
>> I am attempting to do a seemingly simple task using the new State API. I 
>> have created a DoFn<KV<String, Event>, KV<String, Event>> that accepts events 
>> keyed by a particular ID (the session ID) and is intended to emit the same 
>> events partitioned by sessionID/eventType. In the simple case this would be a 
>> normal DoFn, but there is always a case where some events are not as clean as 
>> we would like, and we need to save some state for the session and then emit 
>> those events later when cleanup is complete. For example:
>> 
>> Let’s say that the first few events are missing the eventType (or any other 
>> field), so we would like to buffer those events until we get the first event 
>> with the eventType field set and then use this information to emit the 
>> contents of the buffer with (last observed eventType + original contents of 
>> the buffered events).
>> 
>> For this, my initial approach involved creating a BagState<Event> that would 
>> contain any buffered events. As more events came in, I would either emit 
>> the input with modification, add the input to the buffer, or emit the 
>> events in the buffer along with the input.
>> 
>> While running my test, I found that if I never get a “good” input, i.e. the 
>> session is filled only with error inputs, I would keep on buffering the 
>> input and never emit anything. My question is: how do I emit the buffered 
>> events when there is no more input?
>> 
>> — Ankur Chauhan
> 
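The buffer-then-flush behaviour discussed in this thread can be sketched in plain Java, with the timer callback simulated by an ordinary method call. This is only an illustration of the per-session logic, not Beam code: the class SessionBufferSketch, the Event fields, and the fallbackType parameter are hypothetical names, and emitting leftover events under a fallback type is an assumption that the thread does not specify.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java simulation of per-session buffering; no Beam classes are used. */
final class SessionBufferSketch {

  /** Hypothetical event: eventType is null when the field is missing. */
  static final class Event {
    final String eventType;
    final String payload;

    Event(String eventType, String payload) {
      this.eventType = eventType;
      this.payload = payload;
    }
  }

  // Stands in for the BagState<Event> buffer from the thread.
  private final List<Event> buffer = new ArrayList<>();
  // Last eventType observed for this session, or null if none yet.
  private String lastEventType;

  /** Stands in for @ProcessElement: returns the events to emit for this input. */
  List<Event> process(Event input) {
    List<Event> out = new ArrayList<>();
    if (input.eventType != null) {
      lastEventType = input.eventType;
      // Flush buffered events, stamped with the type just observed.
      for (Event e : buffer) {
        out.add(new Event(lastEventType, e.payload));
      }
      buffer.clear();
      out.add(input);
    } else if (lastEventType != null) {
      // A type is already known: emit the input with modification.
      out.add(new Event(lastEventType, input.payload));
    } else {
      // Nothing to repair with yet: hold the event back.
      buffer.add(input);
    }
    return out;
  }

  /** Stands in for the @OnTimer callback: flushes whatever is still buffered. */
  List<Event> onTimer(String fallbackType) {
    List<Event> out = new ArrayList<>();
    for (Event e : buffer) {
      // Assumption: leftover events are emitted under a caller-supplied type.
      out.add(new Event(fallbackType, e.payload));
    }
    buffer.clear();
    return out;
  }

  public static void main(String[] args) {
    SessionBufferSketch session = new SessionBufferSketch();
    session.process(new Event(null, "a"));
    session.process(new Event(null, "b"));
    // No "good" event ever arrives, so the simulated timer flushes the buffer.
    for (Event e : session.onTimer("unknown")) {
      System.out.println(e.eventType + "/" + e.payload);
    }
  }
}
```

In a real DoFn the list would be the BagState, and onTimer would be the method annotated with @OnTimer, fired by the event-time timer set in the @ProcessElement method.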
