You are correct that state is per key. In your case, if you just want all
elements in a window to be emitted when the window closes, you can use
GroupByKey and assign every element the same key. When the window closes,
you'll get an iterable of all elements in that window.

Note: this won't scale well, as you'll be processing the group on a single
worker; however, for smaller data sets it should be fine.
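Here is a minimal plain-Java model of what this suggestion produces. It is not Beam API code. For windowed input, GroupByKey yields one iterable per (key, window) pair, so a constant key leaves exactly one group per window. The class GroupByKeyModel, the key "all", and 1-minute fixed windows with zero offset are illustrative assumptions. In a real pipeline, the re-keying step would roughly correspond to WithKeys.of(...) followed by GroupByKey.create().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model, not Beam code: for windowed input, GroupByKey emits one
// iterable per (key, window) pair once that window closes.
final class GroupByKeyModel {

  // Start of the fixed window (offset 0) that contains a non-negative timestamp.
  static long windowStart(long timestampMillis, long sizeMillis) {
    return timestampMillis - (timestampMillis % sizeMillis);
  }

  // Groups values by "key@windowStart"; each entry stands for one iterable
  // that GroupByKey would hand to a single worker.
  static Map<String, List<Long>> group(
      String[] keys, long[] timestampsMillis, long[] values, long sizeMillis) {
    Map<String, List<Long>> groups = new TreeMap<>();
    for (int i = 0; i < values.length; i++) {
      String groupId = keys[i] + "@" + windowStart(timestampsMillis[i], sizeMillis);
      groups.computeIfAbsent(groupId, id -> new ArrayList<>()).add(values[i]);
    }
    return groups;
  }

  // Assigns the same key to every element, as suggested in the reply above.
  static String[] constantKeys(int count, String key) {
    String[] keys = new String[count];
    Arrays.fill(keys, key);
    return keys;
  }
}
```

With a constant key, five elements spread over two 1-minute windows collapse into two groups, one per window. Each group is processed on one worker, which is the scaling caveat in the note above. With five distinct keys, the same input yields five groups.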

On Tue, Feb 26, 2019 at 10:26 AM Augustin Lafanechere <
augustin.lafanech...@kapten.com> wrote:

> I think I found the issue: I had not understood that state is partitioned
> by key. So if I want to limit how often the callback fires, I need to
> change my partitioning logic to a more coarse-grained one. Please tell me
> if I'm wrong, or if a special feature exists to access a global state,
> although due to shuffling issues I think it may not be possible or
> desirable.
>
> Thanks!
>
> Augustin
>
>
>
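One coarser-grained partitioning sits between per-element keys and a single global key: hash the original keys into a fixed number of shards and use the shard as the key. The following is a hedged plain-Java sketch, not Beam code. CoarseKeys, shardFor and numShards are hypothetical names.

```java
import java.util.HashSet;
import java.util.Set;

// Plain-Java sketch: map fine-grained keys onto a fixed number of coarse
// shard keys. Since state and timers are scoped per key and window, an
// end-of-window timer then fires at most numShards times per window.
final class CoarseKeys {

  // Hypothetical helper: shard index in [0, numShards). String.hashCode() is
  // specified by the JDK, so every worker computes the same shard for a key.
  static int shardFor(String key, int numShards) {
    return Math.floorMod(key.hashCode(), numShards);
  }

  // Distinct shard keys (and therefore timer firings) for one window's keys.
  static int distinctShards(String[] keysInWindow, int numShards) {
    Set<Integer> shards = new HashSet<>();
    for (String key : keysInWindow) {
      shards.add(shardFor(key, numShards));
    }
    return shards.size();
  }
}
```

Setting numShards to 1 gives the single-key approach from the top of this thread: one callback per window, on one worker. Larger values trade more callbacks, and therefore more RPCs per window, for more parallelism. In a pipeline, the shard index would become the key before the stateful DoFn, and the original key would be kept in the value.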
> On Feb 26, 2019, at 14:06, Augustin Lafanechere <
> augustin.lafanech...@kapten.com> wrote:
>
> Many thanks for your answers!
> The GroupIntoBatches transform nearly implements the logic I am after, but
> I only want to execute the RPC call at the end of the window, not flush
> when the batch size limit is reached.
>
> To do so, I reimplemented the part of the GroupIntoBatches logic that
> guarantees a batch flush at the end of the window.
>
> According to my logs, it looks like the @OnTimer callback fires for every
> element that reaches processElement. Is this the expected behavior? I want
> the callback to execute only once (when the window closes).
>
> Thanks for your help! Please find below the snippet I am currently running.
>
> Augustin
>
> import org.apache.beam.sdk.state.BagState;
> import org.apache.beam.sdk.state.StateSpec;
> import org.apache.beam.sdk.state.StateSpecs;
> import org.apache.beam.sdk.state.TimeDomain;
> import org.apache.beam.sdk.state.Timer;
> import org.apache.beam.sdk.state.TimerSpec;
> import org.apache.beam.sdk.state.TimerSpecs;
> import org.apache.beam.sdk.transforms.DoFn;
> import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
> import org.apache.beam.sdk.values.KV;
> import org.joda.time.Instant;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> public final class Enrich extends DoFn<KV<String, Long>, KV<String, Long>> {
>
>   private static final Logger LOG = LoggerFactory.getLogger(Enrich.class);
>
>   // Event-time timer; like all state, it is scoped per key and window.
>   @TimerId("endOfWindow")
>   private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
>
>   // Buffered elements, also scoped per key and window.
>   @StateId("batch")
>   private final StateSpec<BagState<KV<String, Long>>> batchSpec = StateSpecs.bag();
>
>   @ProcessElement
>   public void processElement(
>       final @TimerId("endOfWindow") Timer windowTimer,
>       final @StateId("batch") BagState<KV<String, Long>> batch,
>       final @Element KV<String, Long> element,
>       final BoundedWindow window) {
>     Instant windowExpires = window.maxTimestamp();
>     LOG.info("*** SET TIMER *** to point in time {} for window {}", windowExpires, window);
>     // Setting the timer again for the same key and window replaces the earlier setting.
>     windowTimer.set(windowExpires);
>     batch.add(element);
>     LOG.info("*** BATCH *** Add element for window {}", window);
>   }
>
>   @OnTimer("endOfWindow")
>   public void onTimerCallback(
>       final OutputReceiver<KV<String, Long>> receiver,
>       final @Timestamp Instant timestamp,
>       final @StateId("batch") BagState<KV<String, Long>> batch,
>       final BoundedWindow window) {
>     LOG.info("*** END OF WINDOW *** for timer timestamp {} in window {}", timestamp, window);
>     flushBatch(receiver, batch);
>   }
>
>   private void flushBatch(
>       final OutputReceiver<KV<String, Long>> receiver,
>       final BagState<KV<String, Long>> batch) {
>     // The bag may be empty when the timer fires; the loop is then a no-op.
>     for (KV<String, Long> elem : batch.read()) {
>       receiver.output(elem);
>     }
>     batch.clear();
>     LOG.info("*** BATCH *** clear");
>   }
> }
>
>
>
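The per-key scoping explains the log output from the snippet above. The following plain-Java model mirrors that snippet. It is not Beam code, and KeyedTimerModel and its methods are hypothetical. It keeps a bag and an end-of-window timer per (key, window), and setting the timer again for the same scope replaces the earlier setting, as Beam's Timer.set does for an already-set timer.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model, not Beam code: one bag and one end-of-window timer per
// (key, window) scope; re-setting a timer overwrites it instead of adding one.
final class KeyedTimerModel {
  private final Map<String, List<Long>> bags = new LinkedHashMap<>();
  private final Map<String, Long> timers = new LinkedHashMap<>();
  private int callbacks = 0;

  // Mirrors processElement: set the timer, then add the element to the bag.
  void processElement(String key, long windowMaxTimestamp, long value) {
    String scope = key + "@" + windowMaxTimestamp;
    timers.put(scope, windowMaxTimestamp); // overwrite, never duplicate
    bags.computeIfAbsent(scope, s -> new ArrayList<>()).add(value);
  }

  // Mirrors the watermark advancing: every timer whose timestamp the watermark
  // has passed fires once, and its callback flushes and clears its own bag.
  List<Long> advanceWatermark(long watermark) {
    List<Long> output = new ArrayList<>();
    timers.entrySet().removeIf(entry -> {
      if (entry.getValue() >= watermark) {
        return false; // not due yet
      }
      callbacks++;
      List<Long> bag = bags.remove(entry.getKey());
      if (bag != null) {
        output.addAll(bag);
      }
      return true;
    });
    return output;
  }

  int callbacks() {
    return callbacks;
  }
}
```

Five set() calls spread over two keys produce two callbacks, not five. If the logs show one callback per element, the most likely explanation is that nearly every element carries its own key.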
> On Feb 26, 2019, at 00:49, Kenneth Knowles <k...@apache.org> wrote:
>
> Sorry you hit this issue.
>
> Implementation of this feature has been marked in progress [1] for a
> while. It looks to be stalled, so I unassigned the ticket. There is no
> explicit runner support yet, though the existing implementation is clever
> enough that it may work automatically on many runners.
>
> Kenn
>
> [1] https://issues.apache.org/jira/browse/BEAM-1589
>
> On Mon, Feb 25, 2019 at 1:04 PM Steve Niemitz <sniem...@apache.org> wrote:
>
>> I've noticed this doesn't seem to work either. The workaround is to
>> schedule an event-time timer at the end of the window plus the allowed
>> lateness. The built-in GroupIntoBatches transform [1] does just this, I
>> suspect to work around the issue as well.
>>
>> [1]
>> https://github.com/apache/beam/blob/79b81b27d22d875d6b324d8ba9051b4f8f77c420/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java#L167
>>
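The workaround comes down to simple arithmetic on the window bounds. Here is a minimal sketch in plain Java using java.time; Beam itself uses Joda-Time. ExpiryTimer and the fixed-window assumptions (zero offset, non-negative timestamps) are illustrative.

```java
import java.time.Duration;
import java.time.Instant;

// Plain-Java arithmetic sketch using java.time (Beam itself uses Joda-Time):
// the timestamp for an "end of window + allowed lateness" timer.
final class ExpiryTimer {

  // Start of the fixed window (offset 0) containing a non-negative timestamp.
  static Instant windowStart(Instant timestamp, Duration size) {
    long ts = timestamp.toEpochMilli();
    return Instant.ofEpochMilli(ts - (ts % size.toMillis()));
  }

  // Last instant inside [start, start + size): one millisecond before the
  // exclusive end, like maxTimestamp() of a Beam interval window.
  static Instant maxTimestamp(Instant timestamp, Duration size) {
    return windowStart(timestamp, size).plus(size).minusMillis(1);
  }

  // Workaround timer: the end of the window plus allowed lateness, so late
  // data that is still accepted is buffered before the final flush.
  static Instant expiryTimerTimestamp(
      Instant timestamp, Duration size, Duration allowedLateness) {
    return maxTimestamp(timestamp, size).plus(allowedLateness);
  }
}
```

The window definition quoted further down uses 1-minute fixed windows with zero allowed lateness. For an element at 15:24:30, the timer then lands on 15:24:59.999. With 5 minutes of lateness, it moves to 15:29:59.999. Inside the DoFn, the Joda-Time equivalent would presumably be window.maxTimestamp().plus(allowedLateness). This sketch assumes the allowed lateness is passed to the DoFn by hand.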
>> On Mon, Feb 25, 2019 at 3:24 PM Augustin Lafanechere <
>> augustin.lafanech...@kapten.com> wrote:
>>
>>> Hello dear Beam community,
>>> Sorry, I sent this email to the dev list first, but it's a user support
>>> question...
>>> I have a question about the @OnWindowExpiration annotation on DoFn.
>>> Does anyone have a working snippet that uses it?
>>>
>>> I am trying to write a DoFn that makes a batch RPC when the window
>>> closes. It is a BigQuery call for a historical metric value updated by an
>>> external process. I want to execute this query and sum the results with
>>> my events buffered in state. @OnWindowExpiration looks very practical for
>>> this.
>>>
>>> It looks like the function annotated with @OnWindowExpiration is never
>>> called. My pipeline runs on Dataflow; perhaps it's not a supported
>>> feature on this runner…
>>>
>>> Here is a snippet of what I am trying to accomplish. The log line in the
>>> annotated function never appears. Am I missing something?
>>> I tried to replicate the logic found in this blog post
>>> <https://beam.apache.org/blog/2017/08/28/timely-processing.html> and
>>> pieces of information found in this PR
>>> <https://github.com/apache/beam/pull/4482>.
>>>
>>>
>>> // The window is defined in an upstream transform of the pipeline:
>>> // Window<KV<String, Long>> w =
>>> //     Window.<KV<String, Long>>into(FixedWindows.of(Duration.standardMinutes(1L)))
>>> //         .withAllowedLateness(Duration.ZERO)
>>> //         .discardingFiredPanes();
>>>
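>>> import org.apache.beam.sdk.state.BagState;
>>> import org.apache.beam.sdk.state.StateSpec;
>>> import org.apache.beam.sdk.state.StateSpecs;
>>> import org.apache.beam.sdk.transforms.DoFn;
>>> import org.apache.beam.sdk.values.KV;
>>> import org.slf4j.Logger;
>>> import org.slf4j.LoggerFactory;
>>>
>>> public final class Enrich extends DoFn<KV<String, Long>, KV<String, Long>> {
>>>
>>>   private static final Logger LOG = LoggerFactory.getLogger(Enrich.class);
>>>
>>>   // Events buffered per key and window until the window expires.
>>>   @StateId("buffer")
>>>   private final StateSpec<BagState<KV<String, Long>>> bufferedEvents =
>>>       StateSpecs.bag();
>>>
>>>   @ProcessElement
>>>   public void process(
>>>       final ProcessContext context,
>>>       final @StateId("buffer") BagState<KV<String, Long>> bufferState) {
>>>     bufferState.add(context.element());
>>>     context.output(context.element());
>>>   }
>>>
>>>   // Expected to run once per key and window when the window expires.
>>>   @OnWindowExpiration
>>>   public void onWindowExpiration(
>>>       final @StateId("buffer") BagState<KV<String, Long>> bufferState,
>>>       final OutputReceiver<KV<String, Long>> outputReceiver) {
>>>     LOG.info("The window expired");
>>>     // enrichWithBigQuery(...) runs the batch BigQuery RPC; it is not shown here.
>>>     for (KV<String, Long> enrichedEvent : enrichWithBigQuery(bufferState.read())) {
>>>       outputReceiver.output(enrichedEvent);
>>>     }
>>>   }
>>> }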
>>>
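The intent of the snippet can be modeled in plain Java independently of runner support: buffer while the window is open, make one batched RPC when it expires, then combine. EnrichAtExpiry, the batchLookup function standing in for the BigQuery call, and the per-key summation are assumptions, since enrichWithBigQuery is not shown.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.function.Function;

// Plain-Java sketch, not Beam code: buffer events while the window is open,
// then, when it expires, make ONE batched lookup for all buffered keys and add
// the historical value to the per-key sum of buffered events.
final class EnrichAtExpiry {
  private final List<Map.Entry<String, Long>> buffer = new ArrayList<>();
  private int rpcCalls = 0;

  // Mirrors the @ProcessElement method: only buffer the event.
  void process(String key, long value) {
    buffer.add(Map.entry(key, value));
  }

  // Mirrors the @OnWindowExpiration method. batchLookup is a hypothetical
  // stand-in for the BigQuery RPC: keys in, historical value per key out.
  Map<String, Long> onExpiry(Function<Set<String>, Map<String, Long>> batchLookup) {
    Map<String, Long> totals = new TreeMap<>();
    if (buffer.isEmpty()) {
      return totals; // nothing buffered, so skip the RPC entirely
    }
    for (Map.Entry<String, Long> event : buffer) {
      totals.merge(event.getKey(), event.getValue(), Long::sum);
    }
    rpcCalls++;
    Map<String, Long> history = batchLookup.apply(new TreeSet<>(totals.keySet()));
    totals.replaceAll((key, sum) -> sum + history.getOrDefault(key, 0L));
    buffer.clear();
    return totals;
  }

  int rpcCalls() {
    return rpcCalls;
  }
}
```

Keeping the lookup behind a function lets the combine logic be tested without BigQuery. On a runner that never invokes @OnWindowExpiration, the same body could be driven from an end-of-window event-time timer, as suggested in the replies.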
>>>
>>> Thanks for your help,
>>>
>>>
>>> Augustin
>>>
>>> Chauffeur Privé is becoming kapten_. More information here
>>> <https://www.kapten.com/fr/manifesto.html>
>>>
>>
>
>
>
