[jira] [Commented] (BEAM-5307) Allow injection of state in DoFn.FinishBundle

2018-10-30 Thread Kenneth Knowles (JIRA)


[ https://issues.apache.org/jira/browse/BEAM-5307?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16668683#comment-16668683 ]

Kenneth Knowles commented on BEAM-5307:
---

Every element always has a window, so if you aren't assigning windows, they are
all in the global window. In that case there is no window expiration, since the
global window never ends. Using timers is the right idea, but you'll want to set
the timer for a finite time in the future rather than for the end of the window.
StackOverflow is a better place for digging into this, so I'll answer there.
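The difference can be seen in a plain-Java toy model of one key's buffered state. This is a sketch under stated assumptions, not Beam API: `GlobalWindowFlushModel`, `process` and `advanceTo` are hypothetical names, the `writes` list stands in for the external database, and `Long.MAX_VALUE` merely stands in for the end of the global window. A timer armed at the window end never fires while the stream runs, so nothing is flushed; a timer armed a finite delay after the first buffered element flushes the whole buffer as one batched write.

{code:java}
import java.util.ArrayList;
import java.util.List;

// Toy model (NOT Beam API): one key's state in the global window, flushed by a timer.
class GlobalWindowFlushModel {
    // Hypothetical stand-in for the end of the global window: a time the stream never reaches.
    static final long GLOBAL_WINDOW_END = Long.MAX_VALUE;

    private final List<String> buffer = new ArrayList<>();  // the per-key state
    final List<List<String>> writes = new ArrayList<>();     // stand-in for external DB writes
    private final boolean timerAtWindowEnd;                  // true = arm the timer at window end
    private final long delayMillis;                          // finite delay for the other variant
    private Long timerAt = null;                             // null = no timer pending

    GlobalWindowFlushModel(boolean timerAtWindowEnd, long delayMillis) {
        this.timerAtWindowEnd = timerAtWindowEnd;
        this.delayMillis = delayMillis;
    }

    // Like @ProcessElement: buffer the element and arm a timer if none is pending.
    void process(String element, long nowMillis) {
        buffer.add(element);
        if (timerAt == null) {
            timerAt = timerAtWindowEnd ? GLOBAL_WINDOW_END : nowMillis + delayMillis;
        }
    }

    // Advance the clock; like @OnTimer, flush the buffer once the timer time is reached.
    void advanceTo(long nowMillis) {
        if (timerAt != null && nowMillis >= timerAt) {
            writes.add(new ArrayList<>(buffer)); // one batched write instead of one per element
            buffer.clear();
            timerAt = null;                      // the next element re-arms the timer
        }
    }

    int buffered() {
        return buffer.size();
    }
}
{code}

Arming the timer only when none is pending bounds how long an element can sit in state to roughly the chosen delay. In the Beam Java SDK the finite-delay variant roughly corresponds to a processing-time timer declared with `TimerSpecs.timer(TimeDomain.PROCESSING_TIME)` and armed with `timer.offset(delay).setRelative()`, with the `@OnTimer` method reading, writing out and clearing the state; check the timer API of the SDK version in use.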

> Allow injection of state in DoFn.FinishBundle
> -
>
> Key: BEAM-5307
> URL: https://issues.apache.org/jira/browse/BEAM-5307
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Sander Ploegsma
>Assignee: Kenneth Knowles
>Priority: Major
> Fix For: Not applicable
>
>
> Example use case: a stateful {{DoFn}} that requires persisting its state to 
> an external database. Instead of writing to the external database for each 
> element, it is much more efficient to flush the state every once in a while, 
> or when cleaning up.
> Currently, this is not possible because the {{@StateId}} injection is only 
> available in processing functions and timers. 
> [This|https://stackoverflow.com/questions/51789776/calculating-deltas-in-apache-beam-using-stateful-processing] might be a workaround, but I'm not even sure if it works.
> Instead, by allowing the use of {{@StateId}} inside a {{@FinishBundle}} 
> function, we can make sure the internal state is persisted in all scenarios:
> {code:java}
> @FinishBundle
> public void flush(FinishBundleContext context, @StateId("myState") ValueState state) {
>   repository.save(state.read());
> }
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-5307) Allow injection of state in DoFn.FinishBundle

2018-10-30 Thread Sander Ploegsma (JIRA)


[ https://issues.apache.org/jira/browse/BEAM-5307?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16668317#comment-16668317 ]

Sander Ploegsma commented on BEAM-5307:
---

Hi [~kenn], thanks for getting back to me. Could you explain what the 'window
expiration' would mean if I'm not assigning windows?



[jira] [Commented] (BEAM-5307) Allow injection of state in DoFn.FinishBundle

2018-10-23 Thread Kenneth Knowles (JIRA)


[ https://issues.apache.org/jira/browse/BEAM-5307?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16661260#comment-16661260 ]

Kenneth Knowles commented on BEAM-5307:
---

Hi [~sploegsma], thanks for the report. The trouble is that {{@FinishBundle}} 
is not partitioned by key or window, but is a way to flush outgoing buffers in 
a fairly naive way. What you want in the stateful case is to set a timer for 
the window expiration and flush there.
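The key/window point can be illustrated with another hypothetical plain-Java model (not Beam API; `BundleModel` and its methods are illustrative names only). State here is one cell per (key, window) pair, as for `@StateId` in `@ProcessElement` and `@OnTimer`. A single bundle can touch several cells, so a `@FinishBundle` parameter such as `@StateId("myState") ValueState state` would have no single cell to bind to, whereas a per-cell flush, which is what one timer per key and window provides, always has exactly one cell in scope.

{code:java}
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model (NOT Beam API): state cells keyed by (key, window) inside one bundle.
class BundleModel {
    // One running count per (key, window); insertion order is kept for predictable output.
    final Map<List<String>, Integer> cells = new LinkedHashMap<>();

    // Like @ProcessElement: the element's key and window identify exactly one cell.
    void processElement(String key, String window) {
        cells.merge(List.of(key, window), 1, Integer::sum);
    }

    // A @FinishBundle-style hook runs once per bundle with no current key or window,
    // so "the" state is only well defined if the bundle happened to touch one cell.
    boolean singleStateWouldBeWellDefined() {
        return cells.size() == 1;
    }

    // Per-(key, window) flush, like one timer firing per cell: each write has one cell in scope.
    List<String> flushPerCell() {
        List<String> writes = new ArrayList<>();
        cells.forEach((cell, count) -> writes.add(cell.get(0) + "@" + cell.get(1) + "=" + count));
        cells.clear();
        return writes;
    }
}
{code}

For example, a bundle with elements for ("a", "w1"), ("b", "w1"), ("a", "w1") and ("a", "w2") touches three cells, so there is no single state value a bundle-level hook could be handed.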

We have some work in progress to make this a convenient {{@OnWindowExpiration}} 
method - you can follow BEAM-1589 if you like.
