Hello dear Beam community, I would like to write to you for a question about OnWindowExpiration annotation on DoFn. Does anyone of you have a working snippet with this ?
I try to write a DoFn with a Batch RPC on window closure. It is a BigQuery call for a historical metric value updated by an external process. I want to execute this query and sum the results with my events buffered in a state. The OnWindowExpiration looks very practical to accomplish this. It looks like the function annotated with @OnWindowExpiration is never call. My pipeline runs on Dataflow, perhaps its not a supported feature on this runner… Here is a snippet of what I try to accomplish. It seems like the annotated functions is never called, the log line is never appearing. Am I missing something ? I tried to replicate the logic found in this blog post <https://beam.apache.org/blog/2017/08/28/timely-processing.html> and pieces of information found in this PR. <https://github.com/apache/beam/pull/4482> // The window definition used in the pipeline sets in a higher transform // Window<KV<String, Long>> w = // Window.<Row>into(FixedWindows.of(Duration.standardMinutes(1L))) // .withAllowedLateness(Duration.ZERO) // .discardingFiredPanes(); public final class Enrich extends DoFn<KV<String, Long>, KV<String, Long>> { @StateId("buffer") private final StateSpec<BagState<KV<String, Long>>> bufferedEvents = StateSpecs.bag(); @ProcessElement public void process( final ProcessContext context, final @StateId("buffer") BagState<KV<String, Long>> bufferState) { bufferState.add(context.element()); context.output(context.element()); } @OnWindowExpiration public void onWindowExpiration( final @StateId("buffer") BagState<KV<String, Long>> bufferState, final OutputReceiver<KV<String, Long>> outputReceiver) { LOG.info("The window expired"); for (KV<String, Long> enrichedEvent : enrichWithBigQuery(bufferState.read())) { outputReceiver.output(enrichedEvent); } } } Thanks for your help, Augustin -- Chauffeur Privé devient kapten_ Plus d'informations ici <https://www.kapten.com/fr/manifesto.html>
