Hi Ankur,

If I understand what you want, what you need for this use case is an
event-time timer that flushes the buffer when you are ready. You might
choose the end of the window, the window's garbage-collection (GC) time or,
in the global window, just some later watermark.
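The three flush points above can be made concrete with a small plain-Java sketch. It uses `java.time` instead of Beam's Joda-based types. The helper class `FlushTimes` and its method names are hypothetical. It assumes a window covering `[start, end)`, whose last instant is one millisecond before `end`, which matches how Beam's `IntervalWindow.maxTimestamp()` is defined. The global-window option picks a fixed gap after the latest event, which is only one possible reading of "some later watermark".

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper (not a Beam API): three candidate event-time
// timestamps at which a key's buffered elements could be flushed.
final class FlushTimes {
  private FlushTimes() {}

  // Option 1: the last instant that belongs to a window [start, end),
  // i.e. one millisecond before its exclusive end.
  static Instant endOfWindow(Instant windowEnd) {
    return windowEnd.minusMillis(1);
  }

  // Option 2: the window's GC time, i.e. the end of the window plus the
  // allowed lateness, after which the runner may discard the window's state.
  static Instant gcTime(Instant windowEnd, Duration allowedLateness) {
    return endOfWindow(windowEnd).plus(allowedLateness);
  }

  // Option 3 (global window): some later watermark, assumed here to be a
  // fixed gap after the latest event time seen for the key.
  static Instant afterGap(Instant latestEventTime, Duration gap) {
    return latestEventTime.plus(gap);
  }
}
```

In a real DoFn, whichever timestamp you choose is what would be passed to `cleanupTimer.set(...)`.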

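    new DoFn<...>() {

      // Per-key (and per-window) bag of elements that cannot be emitted yet.
      @StateId("buffer")
      private final StateSpec<Object, BagState<Foo>> bufferSpec =
          StateSpecs.bag(...);

      // Event-time timer: fires once the watermark passes the time you set.
      @TimerId("finallyCleanup")
      private final TimerSpec finallySpec =
          TimerSpecs.timer(TimeDomain.EVENT_TIME);

      @ProcessElement
      public void process(
          ProcessContext c,
          @StateId("buffer") BagState<Foo> buffered,
          @TimerId("finallyCleanup") Timer cleanupTimer) {
        // Emit or buffer the element as before, then schedule the flush,
        // e.g. for the end of the window or the window GC time.
        cleanupTimer.set(...);
      }

      @OnTimer("finallyCleanup")
      public void onFinallyCleanup(
          OnTimerContext c,
          @StateId("buffer") BagState<Foo> buffered) {
        // Emit whatever is still in the buffer, then clear it.
        ...
      }
    }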
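To see why the timer fixes the "never emits anything" case, here is a small in-memory model of the pattern in the DoFn sketch. `TimerFlushModel` is hypothetical and is not Beam code. It assumes a single window, keeps one bag and at most one pending timer per key, treats setting the timer again as replacing its timestamp, and fires a timer once the watermark moves strictly past its timestamp (exact boundary behavior is up to the runner).

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model (not Beam code): per-key bag state plus one
// event-time cleanup timer per key, assuming a single window.
final class TimerFlushModel<V> {
  private final Map<String, List<V>> buffers = new HashMap<>();
  private final Map<String, Instant> timers = new HashMap<>();

  // Mirrors @ProcessElement: add the value to the key's bag and set the
  // key's cleanup timer; setting it again replaces the earlier timestamp.
  void processElement(String key, V value, Instant flushAt) {
    buffers.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    timers.put(key, flushAt);
  }

  // Mirrors the runner advancing the watermark: each timer whose timestamp
  // is before the new watermark fires, and the @OnTimer logic emits and
  // clears that key's bag. Returns the flushed values per key.
  Map<String, List<V>> advanceWatermark(Instant watermark) {
    List<String> due = new ArrayList<>();
    for (Map.Entry<String, Instant> t : timers.entrySet()) {
      if (t.getValue().isBefore(watermark)) {
        due.add(t.getKey());
      }
    }
    Map<String, List<V>> flushed = new HashMap<>();
    for (String key : due) {
      timers.remove(key);
      List<V> bag = buffers.remove(key);
      flushed.put(key, bag != null ? bag : new ArrayList<V>());
    }
    return flushed;
  }
}
```

Because the flush is driven by the watermark rather than by the arrival of another element, a key that only ever receives "bad" elements is still drained once its timer fires.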

This feature hasn't been blogged about or thoroughly documented yet, apart
from a couple of examples in the DoFn Javadoc, but it has been available
since 0.6.0.

Kenn

On Tue, Apr 11, 2017 at 3:03 AM, Ankur Chauhan <an...@malloc64.com> wrote:

> Hi,
>
> I am attempting to do a seemingly simple task using the new state API. I
> have created a DoFn<KV<String, Event>, KV<String, Event>> that accepts
> events keyed by a particular id (session id) and is meant to emit the same
> events partitioned by sessionID/eventType. In the simple case this would
> be a normal DoFn, but there is always a case where some events are not as
> clean as we would like, and we need to save some state for the session and
> then emit those events later when cleanup is complete. For example:
>
> Let’s say that the first few events are missing the eventType (or any
> other field), so we would like to buffer those events until we get the
> first event with the eventType field set, and then use this information to
> emit the buffered events with the last observed eventType added to their
> original contents.
>
> For this, my initial approach involved creating a BagState<Event> that
> would contain any buffered events. As more events came in, I would either
> emit the input with modification, add the input to the buffer, or emit the
> events in the buffer along with the input.
>
> While running my test, I found that if I never get a “good” input, i.e.
> the session is only filled with error inputs, I keep buffering the input
> and never emit anything. My question is: how do I emit these buffered
> events when there is no more input?
>
> — Ankur Chauhan
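The buffering logic described in the quoted message can be sketched in plain Java, without Beam state, to show where the missing piece sits. The `Event` and `SessionNormalizer` classes are hypothetical, and `flush()` is an assumed hook: it is the kind of logic an `@OnTimer` method like the one in the reply above could run to drain a session that never sees an event with `eventType` set.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical event type; eventType is null when the field is missing.
final class Event {
  final String sessionId;
  final String eventType;
  final String payload;

  Event(String sessionId, String eventType, String payload) {
    this.sessionId = sessionId;
    this.eventType = eventType;
    this.payload = payload;
  }

  // Copy of this event with the eventType filled in.
  Event withEventType(String type) {
    return new Event(sessionId, type, payload);
  }

  // Output key of the form sessionID/eventType.
  String outputKey() {
    return sessionId + "/" + eventType;
  }
}

// Hypothetical per-session logic: events without an eventType are held
// back until the first event that carries one.
final class SessionNormalizer {
  private final List<Event> buffer = new ArrayList<>();
  private String lastEventType; // null until a "good" event is seen

  // Returns the events to emit for this input (possibly none).
  List<Event> onEvent(Event e) {
    List<Event> out = new ArrayList<>();
    if (e.eventType != null) {
      lastEventType = e.eventType;
      // Emit the buffered events tagged with the newly observed eventType.
      for (Event b : buffer) {
        out.add(b.withEventType(lastEventType));
      }
      buffer.clear();
      out.add(e);
    } else if (lastEventType != null) {
      // Fill the missing field from the last observed eventType.
      out.add(e.withEventType(lastEventType));
    } else {
      // No eventType known yet: buffer the event.
      buffer.add(e);
    }
    return out;
  }

  // Drains the buffer unchanged; a cleanup timer could call this when no
  // "good" event ever arrives, so the buffered events are not lost.
  List<Event> flush() {
    List<Event> out = new ArrayList<>(buffer);
    buffer.clear();
    return out;
  }
}
```

What to do with events that never get an eventType (emit them unchanged, as `flush()` does here, or route them elsewhere) is a design choice this sketch leaves open.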
