Hello dear Beam community,
I would like to write to you for a question about OnWindowExpiration annotation 
on DoFn.
Does anyone of you have a working snippet with this ?

I try to write a DoFn with a Batch RPC on window closure. It is a BigQuery call 
for a historical metric value updated by an external process. I want to execute 
this query and sum the results with my events buffered in a state. The 
OnWindowExpiration looks very practical to accomplish this.

It looks like the function annotated with @OnWindowExpiration is never call. My 
pipeline runs on Dataflow, perhaps its not a supported feature on this runner…

Here is a snippet of what I try to accomplish. It seems like the annotated 
functions is never called, the log line is never appearing. Am I missing 
something ?
I tried to replicate the logic found in this blog post 
<https://beam.apache.org/blog/2017/08/28/timely-processing.html> and pieces of 
information found in this PR. <https://github.com/apache/beam/pull/4482>


// The window definition used in the pipeline sets in a higher transform
// Window<KV<String, Long>> w =
//     Window.<Row>into(FixedWindows.of(Duration.standardMinutes(1L)))
//         .withAllowedLateness(Duration.ZERO)
//         .discardingFiredPanes();

public final class Enrich extends DoFn<KV<String, Long>, KV<String, Long>> {

  @StateId("buffer")
  private final StateSpec<BagState<KV<String, Long>>> bufferedEvents = 
StateSpecs.bag();

  @ProcessElement
  public void process(
      final ProcessContext context,
      final @StateId("buffer") BagState<KV<String, Long>> bufferState) {
    bufferState.add(context.element());
    context.output(context.element());
  }

  @OnWindowExpiration
  public void onWindowExpiration(
      final @StateId("buffer") BagState<KV<String, Long>> bufferState,
      final OutputReceiver<KV<String, Long>> outputReceiver) {
    LOG.info("The window expired");
    for (KV<String, Long> enrichedEvent : 
enrichWithBigQuery(bufferState.read())) {
      outputReceiver.output(enrichedEvent);
    }
  }
}


Thanks for your help,


Augustin



-- 
Chauffeur Privé devient kapten_ Plus d'informations ici 
<https://www.kapten.com/fr/manifesto.html>

Reply via email to