Are you sure about that for Flink?  I thought the iterable finished when you 
processed a maximum number of elements or the input queue was empty so that it 
could return control back to Akka for better sharing of the thread pool.

https://github.com/apache/incubator-beam/blob/af8f5935ca1866012ceb102b9472c8b1ef102d73/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java#L99

Also, the Javadoc for DoFn.Context explicitly states that you can emit 
from the finishBundle method.

https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L104-L110

I thought I had seen some examples of this being used for batching output to 
something downstream, like HDFS or Kafka, but I'm not sure about that.  If you can 
emit from finishBundle and a new instance of the DoFn is created for 
each bundle, then I can see some people trying to do aggregations inside a DoFn 
and then emitting them at the end of the bundle, knowing that if a batch fails 
or is rolled back the system will handle it.  If that is not allowed we should 
really update the Javadoc around it to explain the pitfalls of doing this.
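
To make the concern concrete, here is a minimal sketch in plain Java. It does
not use the Beam API: BundleFn, SumPerBundleFn and run are hypothetical names
made up for illustration, and the "runner" is just a loop that reuses one
instance across bundles, as BEAM-38 would allow. It shows an aggregation that
emits from finishBundle and resets its state in startBundle.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

public class BundleDemo {

    /** Hypothetical stand-in for a bundle-aware function; NOT the Beam DoFn API. */
    interface BundleFn<I, O> {
        void startBundle();
        void processElement(I element, Consumer<O> out);
        void finishBundle(Consumer<O> out);
    }

    /** Sums the integers seen in one bundle and emits a single total at the end. */
    static class SumPerBundleFn implements BundleFn<Integer, Integer> {
        private int sum; // per-bundle state

        @Override
        public void startBundle() {
            // Reset here instead of relying on a fresh instance per bundle.
            sum = 0;
        }

        @Override
        public void processElement(Integer element, Consumer<Integer> out) {
            sum += element; // buffer only, nothing is emitted yet
        }

        @Override
        public void finishBundle(Consumer<Integer> out) {
            out.accept(sum); // emit the aggregate once per bundle
        }
    }

    /** Toy runner (an assumption, not any real runner) reusing one instance for all bundles. */
    static <I, O> List<O> run(BundleFn<I, O> fn, List<List<I>> bundles) {
        List<O> output = new ArrayList<>();
        for (List<I> bundle : bundles) {
            fn.startBundle();
            for (I element : bundle) {
                fn.processElement(element, output::add);
            }
            fn.finishBundle(output::add);
        }
        return output;
    }

    public static void main(String[] args) {
        List<List<Integer>> bundles =
            Arrays.asList(Arrays.asList(1, 2, 3), Arrays.asList(10, 20));
        System.out.println(run(new SumPerBundleFn(), bundles)); // prints [6, 30]
    }
}
```

If the reset in startBundle is dropped, the second total in this toy becomes 36
instead of 30, because the state from the first bundle leaks into the reused
instance. And if finishBundle is never called, nothing is emitted at all.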
 - Bobby 

    On Wednesday, June 8, 2016 4:24 AM, Aljoscha Krettek <aljos...@apache.org> 
wrote:
 

 Hi,
a quick related question: In the Flink runner we basically see everything
as one big bundle, i.e. we call startBundle() once at the beginning and
then keep processing indefinitely, never calling finishBundle(). Is this
also correct behavior?

Best,
Aljoscha

On Tue, 7 Jun 2016 at 20:44 Thomas Groh <tg...@google.com.invalid> wrote:

> Hey everyone;
>
> I'm starting to work on BEAM-38 (
> https://issues.apache.org/jira/browse/BEAM-38), which enables an
> optimization for runners with many small bundles. BEAM-38 allows runners to
> reuse DoFn instances so long as that DoFn has not terminated abnormally.
> This replaces the previous requirement that a DoFn be used for only a
> single bundle if either startBundle or finishBundle has been
> overridden.
>
> DoFn deserialization-per-bundle can be a significant performance
> bottleneck when there are many small bundles, as is common in streaming
> executions. It has also surfaced as the cause of much of the current
> slowness in the new InProcessRunner.
>
> Existing Runners do not require any changes; they may choose to take
> advantage of the new optimization opportunity. However, user DoFns may
> need to be revised to properly set up and tear down state in startBundle
> and finishBundle, respectively, if they depended on only being used for a
> single bundle.
>
> The first two updates are already in pull requests:
>
> PR #419 (https://github.com/apache/incubator-beam/pull/419) updates the
> Javadoc to the new spec
> PR #418 (https://github.com/apache/incubator-beam/pull/418) updates the
> DirectRunner to reuse DoFns according to the new policy.
>
> Yours,
>
> Thomas
>


  
