The local Java runner does arbitrary batching of 10 elements. I'm not sure if Flink exposes this or not, but couldn't you use the checkpoint triggers to also start/finish a bundle?
- Bobby
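The induced-bundle idea could look something like the following minimal sketch. It assumes a single-threaded operator that sees elements and checkpoint barriers one at a time. The operator closes the current bundle in three cases: every N elements (10 would match the local runner), after a processing-time limit, or when a checkpoint arrives. `InducedBundler`, its parameters and its log strings are hypothetical illustration names, not Beam or Flink APIs. A real runner would call the DoFn's startBundle()/finishBundle() at the points where this sketch appends to a log.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

/**
 * Hypothetical helper, not part of Beam or Flink: induces bundle boundaries
 * by element count, by processing time, or at a checkpoint, so that
 * finishBundle() runs once in a while instead of never or per element.
 */
final class InducedBundler {
  private final int maxElements;          // count limit, e.g. 10 like the local Java runner
  private final long maxMillis;           // processing-time limit per bundle
  private final LongSupplier clockMillis; // injected clock so the policy is testable
  private final List<String> log = new ArrayList<>();

  private boolean open = false;
  private int count = 0;
  private long openedAt = 0L;

  InducedBundler(int maxElements, long maxMillis, LongSupplier clockMillis) {
    this.maxElements = maxElements;
    this.maxMillis = maxMillis;
    this.clockMillis = clockMillis;
  }

  void process(String element) {
    if (!open) {
      startBundle();
    }
    log.add("process:" + element);
    count++;
    // Close after the element that reaches either limit. The time limit is only
    // checked when an element arrives; an idle bundle would need a separate timer.
    if (count >= maxElements || clockMillis.getAsLong() - openedAt >= maxMillis) {
      finishBundle();
    }
  }

  /** On a checkpoint barrier, finish any open bundle before the snapshot is taken. */
  void onCheckpoint() {
    if (open) {
      finishBundle();
    }
    log.add("checkpoint");
  }

  private void startBundle() {
    open = true;
    count = 0;
    openedAt = clockMillis.getAsLong();
    log.add("startBundle");
  }

  private void finishBundle() {
    open = false;
    log.add("finishBundle");
  }

  List<String> log() {
    return log;
  }
}

final class InducedBundlerDemo {
  public static void main(String[] args) {
    // Count limit of 2, effectively no time limit, frozen clock.
    InducedBundler bundler = new InducedBundler(2, Long.MAX_VALUE, () -> 0L);
    for (String e : List.of("a", "b", "c")) {
      bundler.process(e);
    }
    bundler.onCheckpoint();
    // [startBundle, process:a, process:b, finishBundle, startBundle, process:c, finishBundle, checkpoint]
    System.out.println(bundler.log());
  }
}
```

The time limit is only checked when an element arrives. On an idle stream, a bundle would therefore stay open until the next element or checkpoint, so a real implementation would likely also need a processing-time timer.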
On Wednesday, June 8, 2016 10:17 AM, Aljoscha Krettek <aljos...@apache.org> wrote:

Ahh, what we could do is artificially induce bundles using either count or processing time or both, just so that finishBundle() is called once in a while.

On Wed, 8 Jun 2016 at 17:12 Aljoscha Krettek <aljos...@apache.org> wrote:

> Pretty sure, yes. The Iterable in a MapPartitionFunction should give you
> all the values in a given partition.
>
> I checked again for streaming execution. We're doing the opposite right
> now: every element is a bundle in itself, so startBundle()/finishBundle() are
> called for every element, which seems a bit wasteful. The only other option
> is to see all elements as one bundle, because Flink does not bundle/micro-batch
> elements in streaming execution.
>
> On Wed, 8 Jun 2016 at 16:38 Bobby Evans <ev...@yahoo-inc.com.invalid>
> wrote:
>
>> Are you sure about that for Flink? I thought the iterable finished when
>> you processed a maximum number of elements or the input queue was empty, so
>> that it could return control to Akka for better sharing of the
>> thread pool.
>>
>> https://github.com/apache/incubator-beam/blob/af8f5935ca1866012ceb102b9472c8b1ef102d73/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java#L99
>>
>> Also, the Javadoc for DoFn.Context explicitly states that you can
>> emit from the finishBundle method.
>>
>> https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L104-L110
>>
>> I thought I had seen some example of this being used for batching output
>> to something downstream, like HDFS or Kafka, but I'm not sure about that.
>> If you can emit from finishBundle and a new instance of the DoFn will be
>> created around each bundle, then I can see some people trying to do
>> aggregations inside a DoFn and then emitting them at the end of the bundle,
>> knowing that if a batch fails or is rolled back the system will handle it.
>> If that is not allowed, we should really update the Javadoc around it to
>> explain the pitfalls of doing this.
>> - Bobby
>>
>> On Wednesday, June 8, 2016 4:24 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>
>> Hi,
>> a quick related question: in the Flink runner we basically see everything
>> as one big bundle, i.e. we call startBundle() once at the beginning and
>> then keep processing indefinitely, never calling finishBundle(). Is this
>> also correct behavior?
>>
>> Best,
>> Aljoscha
>>
>> On Tue, 7 Jun 2016 at 20:44 Thomas Groh <tg...@google.com.invalid> wrote:
>>
>> > Hey everyone;
>> >
>> > I'm starting to work on BEAM-38 (
>> > https://issues.apache.org/jira/browse/BEAM-38), which enables an
>> > optimization for runners with many small bundles. BEAM-38 allows runners to
>> > reuse DoFn instances so long as that DoFn has not terminated abnormally.
>> > This replaces the previous requirement that a DoFn be used for only a
>> > single bundle if either startBundle or finishBundle has been
>> > overridden.
>> >
>> > DoFn deserialization-per-bundle can be a significant performance
>> > bottleneck when there are many small bundles, as is common in streaming
>> > executions. It has also surfaced as the cause of much of the current
>> > slowness in the new InProcessRunner.
>> >
>> > Existing runners do not require any changes; they may choose to take
>> > advantage of the new optimization opportunity. However, user DoFns may
>> > need to be revised to properly set up and tear down state in startBundle
>> > and finishBundle, respectively, if they depended on only being used for a
>> > single bundle.
>> >
>> > The first two updates are already in pull requests:
>> >
>> > PR #419 (https://github.com/apache/incubator-beam/pull/419) updates the
>> > Javadoc to the new spec.
>> > PR #418 (https://github.com/apache/incubator-beam/pull/418) updates the
>> > DirectRunner to reuse DoFns according to the new policy.
>> >
>> > Yours,
>> >
>> > Thomas
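Thomas's message raises a point about user DoFns that depended on being used for only a single bundle. The following is a minimal plain-Java simulation of that point, assuming a runner that reuses one instance across bundles. `BundleFn`, `BatchingFn`, `LeakyBatchingFn` and `ReuseDemo` are hypothetical names. They only mimic the startBundle/processElement/finishBundle call order and are not the Beam `DoFn` API or its signatures. The buffering pattern also mirrors Bobby's question about emitting aggregated output from finishBundle.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical stand-in for a DoFn's bundle lifecycle (NOT the Beam DoFn API):
 * a runner may drive startBundle/processElement/finishBundle on the SAME
 * instance for many bundles, which is what BEAM-38 permits.
 */
abstract class BundleFn<InT, OutT> {
  abstract void startBundle();

  abstract void processElement(InT element, Consumer<OutT> out);

  abstract void finishBundle(Consumer<OutT> out);
}

/** Reuse-safe: per-bundle state is created in startBundle and released in finishBundle. */
final class BatchingFn extends BundleFn<String, List<String>> {
  private List<String> buffer;

  @Override
  void startBundle() {
    buffer = new ArrayList<>();
  }

  @Override
  void processElement(String element, Consumer<List<String>> out) {
    buffer.add(element);
  }

  @Override
  void finishBundle(Consumer<List<String>> out) {
    if (!buffer.isEmpty()) {
      out.accept(buffer); // emit one batch per bundle
    }
    buffer = null; // nothing carries over into the next bundle
  }
}

/** Pitfall: state is created once per instance, so it is only correct with one bundle per instance. */
final class LeakyBatchingFn extends BundleFn<String, List<String>> {
  private final List<String> buffer = new ArrayList<>();

  @Override
  void startBundle() {}

  @Override
  void processElement(String element, Consumer<List<String>> out) {
    buffer.add(element);
  }

  @Override
  void finishBundle(Consumer<List<String>> out) {
    out.accept(new ArrayList<>(buffer)); // never cleared, so later bundles re-emit old elements
  }
}

final class ReuseDemo {
  /** Drives every bundle through ONE reused instance and collects all outputs. */
  static <I, O> List<O> runBundles(BundleFn<I, O> fn, List<List<I>> bundles) {
    List<O> outputs = new ArrayList<>();
    for (List<I> bundle : bundles) {
      fn.startBundle();
      for (I element : bundle) {
        fn.processElement(element, outputs::add);
      }
      fn.finishBundle(outputs::add);
    }
    return outputs;
  }

  public static void main(String[] args) {
    List<List<String>> bundles = List.of(List.of("a", "b"), List.of("c"));
    System.out.println(runBundles(new BatchingFn(), bundles));      // [[a, b], [c]]
    System.out.println(runBundles(new LeakyBatchingFn(), bundles)); // [[a, b], [a, b, c]]
  }
}
```

The leaky variant behaves correctly as long as each instance sees exactly one bundle, which is the assumption BEAM-38 removes. Initializing per-bundle state in startBundle keeps the function correct whether or not the runner reuses the instance.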