There's actually not a JIRA filed beyond BEAM-25 for what Eugene is
referring to. Context: prior to windowing and streaming, it was safe to
buffer elements in @ProcessElement and then perform the actual output in
@FinishBundle. That pattern is now only suitable for global windowing or
for flushing to external systems; anything else requires perhaps-complex
manual window hackery. So we'll need a new callback, @OnWindowExpiration,
that occurs per resident window, before @FinishBundle, for producing
output based on remaining state before it is discarded.
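
To give a rough idea of the shape of it - strictly a hypothetical sketch,
since neither the annotation nor its parameters are designed yet - buffering
via the in-progress state API (BEAM-25) might look like:

import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

class BufferUntilWindowExpiry extends DoFn<KV<String, String>, String> {

  // Per-key-and-window buffer of the elements seen so far.
  @StateId("buffer")
  private final StateSpec<BagState<String>> bufferSpec =
      StateSpecs.bag(StringUtf8Coder.of());

  @ProcessElement
  public void process(
      @Element KV<String, String> element,
      @StateId("buffer") BagState<String> buffer) {
    buffer.add(element.getValue()); // buffer rather than output immediately
  }

  // Hypothetical callback: fires once per resident window as it expires,
  // so remaining state can be emitted before it is discarded.
  @OnWindowExpiration
  public void onWindowExpiration(
      @StateId("buffer") BagState<String> buffer,
      OutputReceiver<String> out) {
    for (String value : buffer.read()) {
      out.output(value);
    }
  }
}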


On Wed, Jan 25, 2017 at 11:00 AM, Eugene Kirpichov <kirpic...@google.com>
wrote:

> One more thing.
>
> I think that, ideally, bundles should not leak into the model at all -
> e.g. the startBundle/finishBundle methods in DoFn should not exist; they
> interact poorly with windowing.
> The proper way to address what is commonly done in these methods is either
> Setup/Teardown methods, or a (to be designed) "window close" callback -
> there's a JIRA about the latter, but I couldn't find it; perhaps +Kenn
> Knowles <k...@google.com> remembers what it is.
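>
> For illustration, a minimal sketch of the Setup/Teardown alternative for
> the common "manage a connection" case (MyClient is a stand-in for some
> external client, not a real API):
>
> import org.apache.beam.sdk.transforms.DoFn;
>
> class WriteToExternalFn extends DoFn<String, Void> {
>   private transient MyClient client; // hypothetical external client
>
>   @Setup
>   public void setup() {
>     // Runs once per DoFn instance rather than once per bundle, so it
>     // does not depend on where the runner draws bundle boundaries.
>     client = MyClient.connect();
>   }
>
>   @ProcessElement
>   public void processElement(ProcessContext c) {
>     client.write(c.element());
>   }
>
>   @Teardown
>   public void teardown() {
>     client.close();
>   }
> }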
>
> On Wed, Jan 25, 2017 at 10:41 AM Amit Sela <amitsel...@gmail.com> wrote:
>
>> On Wed, Jan 25, 2017 at 8:23 PM Thomas Groh <tg...@google.com.invalid>
>> wrote:
>>
>> > I have a couple of points in addition to what Robert said:
>> >
>> > Runners are permitted to determine bundle sizes as appropriate to their
>> > implementation, so long as bundles are atomically committed. The
>> > contents of a PCollection are independent of the bundling of that
>> > PCollection.
>> >
>> > Runners can process all elements within their own bundles (e.g.
>> > https://github.com/apache/beam/blob/a6810372b003adf24bdbe34ed764a63841af9b99/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L289),
>> > the entire input data, or anywhere in between.
>> >
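>> > As a simplified sketch (assuming the DoFnRunner interface from
>> > runners-core), evaluating one bundle looks roughly like:
>> >
>> > // The runner alone decides where bundle boundaries fall: a single
>> > // element, a whole partition, or anything in between.
>> > <InputT> void runBundle(
>> >     DoFnRunner<InputT, ?> runner, Iterable<WindowedValue<InputT>> bundle) {
>> >   runner.startBundle();
>> >   for (WindowedValue<InputT> element : bundle) {
>> >     runner.processElement(element);
>> >   }
>> >   runner.finishBundle();
>> >   // Only now are the bundle's outputs committed downstream.
>> > }
>> >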
>> Or, as Thomas mentioned, a runner could process an entire partition of
>> the data as a bundle (e.g.
>> https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java#L57).
>> It basically depends on the runner's internal processing model.
>>
>> >
>> > On Wed, Jan 25, 2017 at 10:05 AM, Robert Bradshaw <
>> > rober...@google.com.invalid> wrote:
>> >
>> > > Bundles are simply the unit of commitment (retry) in the Beam SDK.
>> > > They're not really a model concept, but do leak from the
>> > > implementation into the API as it's not feasible to checkpoint every
>> > > individual process call, and this allows some state/compute/... to be
>> > > safely amortized across elements (either the results of all processed
>> > > elements in a bundle are sent downstream, or none are and the entire
>> > > bundle is retried).
>> > >
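>> > > As an illustration of that amortization - a hedged sketch, with
>> > > ExternalSink standing in for some external service client - a DoFn
>> > > can safely batch expensive writes per bundle, because either all of
>> > > the bundle's effects commit or the whole bundle is retried:
>> > >
>> > > import java.util.ArrayList;
>> > > import java.util.List;
>> > > import org.apache.beam.sdk.transforms.DoFn;
>> > >
>> > > class BatchingWriteFn extends DoFn<String, Void> {
>> > >   private final ExternalSink sink; // stand-in, not a real Beam class
>> > >   private transient List<String> batch;
>> > >
>> > >   BatchingWriteFn(ExternalSink sink) {
>> > >     this.sink = sink;
>> > >   }
>> > >
>> > >   @StartBundle
>> > >   public void startBundle() {
>> > >     batch = new ArrayList<>(); // fresh batch for each bundle
>> > >   }
>> > >
>> > >   @ProcessElement
>> > >   public void processElement(ProcessContext c) {
>> > >     batch.add(c.element());
>> > >   }
>> > >
>> > >   @FinishBundle
>> > >   public void finishBundle() {
>> > >     sink.writeAll(batch); // one external call per bundle, amortized
>> > >   }
>> > > }
>> > >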
>> > > On Wed, Jan 25, 2017 at 9:36 AM, Matthew Jadczak <mn...@cam.ac.uk>
>> > > wrote:
>> > > > Hi,
>> > > >
>> > > > I’m a finalist CompSci student at the University of Cambridge, and
>> > > > for my final project/dissertation I am writing an implementation of
>> > > > the Beam SDK in Elixir [1]. Given that the Beam project is obviously
>> > > > still very much WIP, it’s somewhat difficult to find good conceptual
>> > > > overviews of parts of the system, which is crucial when translating
>> > > > the OOP architecture to something completely different. However I
>> > > > have found many of the design docs scattered around the JIRA and
>> > > > here very helpful. (Incidentally, perhaps it would be helpful to
>> > > > maintain a list of them, to help any contributors acquaint
>> > > > themselves with the conceptual vision of the implementation?)
>> > > >
>> > > > One thing which I have not yet been able to work out is the
>> > > > significance of “bundles” in the SDK. On the one hand, it seems that
>> > > > they are simply an implementation detail, effectively a way to do
>> > > > micro-batch processing efficiently, and indeed they are not
>> > > > mentioned at all in the original Dataflow paper or anywhere in the
>> > > > Beam docs (except in passing). On the other hand, it seems most of
>> > > > the key transforms in the SDK core have a concept of bundles and
>> > > > operate in their terms in practice, while all conceptually being
>> > > > described as just operating on elements.
>> > > >
>> > > > Do bundles have semantic meaning in the Beam Model? Are there any
>> > > > guidelines as to how a given transform should split its output up
>> > > > into bundles? Should any runner/SDK implementing the Model have that
>> > > > concept, even when other primitives for streaming data processing
>> > > > (including things like efficiently transmitting individual elements
>> > > > between stages with backpressure) are available in the
>> > > > language/standard libraries? Are there any insights here that I am
>> > > > missing, i.e. were problems present in early versions of the runners
>> > > > solved by adding the concept of bundles?
>> > > >
>> > > > Thanks so much,
>> > > > Matt
>> > > >
>> > > > [1] http://elixir-lang.org/
>> > >
>> >
>>
>
