The local Java runner does arbitrary batching into bundles of 10 elements.

I'm not sure whether Flink exposes this, but couldn't you use the checkpoint 
triggers to also start/finish a bundle?
 - Bobby 

    On Wednesday, June 8, 2016 10:17 AM, Aljoscha Krettek <aljos...@apache.org> 
wrote:
 

 Ahh, what we could do is artificially induce bundles using either count or
processing time or both. Just so that finishBundle() is called once in a
while.
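
A rough sketch of what count-or-time bundling could look like, purely to illustrate the idea. Nothing here is Beam or Flink API: BundleFn, CountOrTimeBundler, RecordingFn, onTimer() and flush() are made-up names, and the clock is injected so the behaviour can be checked without real timers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

class BundlingSketch {

  /** Hypothetical stand-in for the DoFn bundle lifecycle; not the Beam API. */
  interface BundleFn<T> {
    void startBundle();
    void processElement(T element);
    void finishBundle();
  }

  /** Closes the current bundle after maxCount elements or maxMillis of processing time. */
  static final class CountOrTimeBundler<T> {
    private final BundleFn<T> fn;
    private final int maxCount;
    private final long maxMillis;
    private final LongSupplier clock; // assumed source of processing time in millis
    private boolean open = false;
    private int count = 0;
    private long openedAt = 0L;

    CountOrTimeBundler(BundleFn<T> fn, int maxCount, long maxMillis, LongSupplier clock) {
      if (maxCount <= 0 || maxMillis <= 0) {
        throw new IllegalArgumentException("limits must be positive");
      }
      this.fn = fn;
      this.maxCount = maxCount;
      this.maxMillis = maxMillis;
      this.clock = clock;
    }

    void onElement(T element) {
      // An element arriving after the time limit ends the old bundle first.
      if (open && clock.getAsLong() - openedAt >= maxMillis) {
        close();
      }
      if (!open) {
        fn.startBundle();
        open = true;
        count = 0;
        openedAt = clock.getAsLong();
      }
      fn.processElement(element);
      count++;
      if (count >= maxCount) {
        close();
      }
    }

    /** Meant to be driven by a periodic timer so that idle bundles still get closed. */
    void onTimer() {
      if (open && clock.getAsLong() - openedAt >= maxMillis) {
        close();
      }
    }

    /** Closes any open bundle, e.g. at end of input or on shutdown. */
    void flush() {
      if (open) {
        close();
      }
    }

    private void close() {
      fn.finishBundle();
      open = false;
    }
  }

  /** Records lifecycle calls so the bundle boundaries are visible. */
  static final class RecordingFn implements BundleFn<String> {
    final List<String> calls = new ArrayList<>();
    @Override public void startBundle() { calls.add("start"); }
    @Override public void processElement(String element) { calls.add(element); }
    @Override public void finishBundle() { calls.add("finish"); }
  }

  public static void main(String[] args) {
    long[] now = {0L}; // fake clock
    RecordingFn fn = new RecordingFn();
    CountOrTimeBundler<String> bundler =
        new CountOrTimeBundler<>(fn, 2, 1000L, () -> now[0]);
    bundler.onElement("a");
    bundler.onElement("b"); // count limit reached, bundle closes
    bundler.onElement("c");
    now[0] = 1500L;
    bundler.onTimer();      // time limit reached, bundle closes
    System.out.println(fn.calls); // [start, a, b, finish, start, c, finish]
  }
}
```

With limits like these, finishBundle() runs at the latest after maxCount elements or on the first timer tick once maxMillis has passed, rather than never or once per element.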

On Wed, 8 Jun 2016 at 17:12 Aljoscha Krettek <aljos...@apache.org> wrote:

> Pretty sure, yes. The Iterable in a MapPartitionFunction should give you
> all the values in a given partition.
>
> I checked again for streaming execution. We're doing the opposite, right
> now: every element is a bundle in itself, startBundle()/finishBundle() are
> called for every element which seems a bit wasteful. The only other option
> is to see all elements as one bundle, because Flink does not bundle/micro
> batch elements in streaming execution.
>
> On Wed, 8 Jun 2016 at 16:38 Bobby Evans <ev...@yahoo-inc.com.invalid>
> wrote:
>
>> Are you sure about that for Flink?  I thought the iterable finished when
>> you had processed a maximum number of elements or the input queue was
>> empty, so that it could return control to Akka for better sharing of the
>> thread pool.
>>
>>
>> https://github.com/apache/incubator-beam/blob/af8f5935ca1866012ceb102b9472c8b1ef102d73/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java#L99
>> Also in the javadocs for DoFn.Context it explicitly states that you can
>> emit from the finishBundle method.
>>
>>
>> https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L104-L110
>> I thought I had seen some example of this being used for batching output
>> to something downstream, like HDFS or Kafka, but I'm not sure about that.  If
>> you can emit from finishBundle and a new instance of the DoFn will be
>> created around each bundle, then I can see some people trying to do
>> aggregations inside a DoFn and then emitting them at the end of the bundle
>> knowing that if a batch fails or is rolled back the system will handle it.
>> If that is not allowed we should really update the javadocs around it to
>> explain the pitfalls of doing this.
>>  - Bobby
>>
>>    On Wednesday, June 8, 2016 4:24 AM, Aljoscha Krettek <
>> aljos...@apache.org> wrote:
>>
>>
>>  Hi,
>> a quick related question: In the Flink runner we basically see everything
>> as one big bundle, i.e. we call startBundle() once at the beginning and
>> then keep processing indefinitely, never calling finishBundle(). Is this
>> also correct behavior?
>>
>> Best,
>> Aljoscha
>>
>> On Tue, 7 Jun 2016 at 20:44 Thomas Groh <tg...@google.com.invalid> wrote:
>>
>> > Hey everyone;
>> >
>> > I'm starting to work on BEAM-38 (
>> > https://issues.apache.org/jira/browse/BEAM-38), which enables an
>> > optimization for runners with many small bundles. BEAM-38 allows runners
>> > to reuse DoFn instances so long as that DoFn has not terminated abnormally.
>> > This replaces the previous requirement that a DoFn be used for only a
>> > single bundle if either startBundle or finishBundle has been
>> > overridden.
>> >
>> > DoFn deserialization-per-bundle can be a significant performance
>> > bottleneck when there are many small bundles, as is common in streaming
>> > executions. It has also surfaced as the cause of much of the current
>> > slowness in the new InProcessRunner.
>> >
>> > Existing Runners do not require any changes; they may choose to take
>> > advantage of the new optimization opportunity. However, user DoFns may
>> > need to be revised to properly set up and tear down state in startBundle
>> > and finishBundle, respectively, if they depended on only being used for a
>> > single bundle.
>> >
>> > The first two updates are already in pull requests:
>> >
>> > PR #419 (https://github.com/apache/incubator-beam/pull/419) updates the
>> > Javadoc to the new spec
>> > PR #418 (https://github.com/apache/incubator-beam/pull/418) updates the
>> > DirectRunner to reuse DoFns according to the new policy.
>> >
>> > Yours,
>> >
>> > Thomas
>> >
>>
>>
>>
>
>


  
