[ https://issues.apache.org/jira/browse/BEAM-5307?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16661260#comment-16661260 ]
Kenneth Knowles commented on BEAM-5307:
---------------------------------------
Hi [~sploegsma], thanks for the report. The trouble is that {{@FinishBundle}}
is not partitioned by key or window; it is a fairly naive way to flush
outgoing buffers. In the stateful case, what you want is to set a timer for
the window expiration and flush there.
We have some work in progress to make this a convenient {{@OnWindowExpiration}}
method; you can follow BEAM-1589 if you like.
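To make the suggested alternative concrete, here is a dependency-free Java model of keeping state per key and window and flushing each state cell only once the watermark reaches the end of its window, roughly what an event-time timer set for the window's expiration achieves in a stateful {{DoFn}}. This is NOT the Beam API: the class, its methods, fixed-size windows, a hand-advanced watermark, and a list standing in for the external database are all assumptions of the sketch.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Dependency-free model of "keep state per key and window, flush at window expiration".
 * NOT the Beam API: all names here are hypothetical, windows are fixed-size, and the
 * watermark is advanced by hand instead of by a runner.
 */
class WindowExpiryFlush {
  private final long windowSizeMillis;
  // windowStart -> (key -> accumulated value): one state cell per (key, window).
  private final TreeMap<Long, TreeMap<String, Long>> state = new TreeMap<>();
  // Stands in for the external database from the report (hypothetical).
  final List<String> repository = new ArrayList<>();

  WindowExpiryFlush(long windowSizeMillis) {
    this.windowSizeMillis = windowSizeMillis;
  }

  /** Analogue of @ProcessElement: update only the cell for this element's key and window. */
  void process(String key, long timestampMillis, long value) {
    long windowStart = Math.floorDiv(timestampMillis, windowSizeMillis) * windowSizeMillis;
    state.computeIfAbsent(windowStart, w -> new TreeMap<>()).merge(key, value, Long::sum);
  }

  /**
   * Analogue of event-time timers firing: once the watermark reaches a window's end,
   * persist every cell of that window and clear it. Windows that have not ended yet
   * keep their state untouched.
   */
  void advanceWatermark(long watermarkMillis) {
    while (!state.isEmpty() && state.firstKey() + windowSizeMillis <= watermarkMillis) {
      Map.Entry<Long, TreeMap<String, Long>> expired = state.pollFirstEntry();
      for (Map.Entry<String, Long> cell : expired.getValue().entrySet()) {
        repository.add(cell.getKey() + "@" + expired.getKey() + "=" + cell.getValue());
      }
    }
  }
}
{code}

With 60-second windows, elements for key "a" at 1 s and 59 s, key "b" at 30 s, and key "a" at 61 s, advancing the watermark to 60 s persists only the two cells of the first window; the cell for the window starting at 60 s stays in state until the watermark reaches 120 s. Allowed lateness and bundling are deliberately left out of the model.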
> Allow injection of state in DoFn.FinishBundle
> ---------------------------------------------
>
> Key: BEAM-5307
> URL: https://issues.apache.org/jira/browse/BEAM-5307
> Project: Beam
> Issue Type: Improvement
> Components: sdk-java-core
> Reporter: Sander Ploegsma
> Assignee: Kenneth Knowles
> Priority: Major
> Fix For: Not applicable
>
>
> Example use case: a stateful {{DoFn}} that needs to persist its state to
> an external database. Instead of writing to the database for every
> element, it is much more efficient to flush the state periodically,
> or when cleaning up.
> Currently, this is not possible because {{@StateId}} injection is only
> available in {{@ProcessElement}} and {{@OnTimer}} methods.
> [This|https://stackoverflow.com/questions/51789776/calculating-deltas-in-apache-beam-using-stateful-processing]
> might be a workaround, but I'm not even sure if it works.
> Instead, allowing {{@StateId}} injection in a {{@FinishBundle}}
> method would ensure the internal state is persisted in all scenarios:
> {code:java}
> @FinishBundle
> public void flush(FinishBundleContext context,
>     @StateId("myState") ValueState<StateObj> state) {
>   repository.save(state.read());
> }
> {code}
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)