On 25/01/2017 at 20:34, Kenneth Knowles wrote:
There's actually no JIRA filed beyond BEAM-25 for what Eugene is
referring to. Context: prior to windowing and streaming, it was safe to
buffer elements in @ProcessElement and then actually perform the output in
@FinishBundle. This pattern is only suitable for global windowing or for
flushing to external systems; anything else requires perhaps-complex manual
window hackery. So now we'll need a new callback, @OnWindowExpiration, that
occurs per-resident-window, before @FinishBundle, for producing output based
on remaining state before it is discarded.
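
For readers following along, here is a minimal sketch of the buffering
pattern being discussed, written against the DoFn API as it stood at the
time (where @StartBundle/@FinishBundle received a plain Context); the class
name is invented for illustration:

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.beam.sdk.transforms.DoFn;

    // The pre-windowing buffering pattern described above. With non-global
    // windowing this is unsafe: a bundle may mix elements from many windows,
    // so output emitted in @FinishBundle has no single correct window.
    class BufferingDoFn extends DoFn<String, String> {
      private List<String> buffer;

      @StartBundle
      public void startBundle(Context c) {
        buffer = new ArrayList<>();
      }

      @ProcessElement
      public void processElement(ProcessContext c) {
        buffer.add(c.element()); // buffer instead of outputting immediately
      }

      @FinishBundle
      public void finishBundle(Context c) {
        // Safe only under the global window (or when flushing to an
        // external system): all buffered elements are assumed to share
        // one window.
        for (String element : buffer) {
          c.output(element);
        }
        buffer.clear();
      }
    }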
+1 This is exactly what I need for BEAM-135. Let's imagine that we have a collection of elements artificially timestamped every 10 seconds and a fixed windowing of 1 minute. Then each window contains 6 elements. If we were to buffer the elements in batches of 5, then for each window we expect to get 2 batches (one of 5 elements, one of 1 element). For that to happen, we need the @OnWindowExpiration on the DoFn to materialize the batch of 1 element.
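
To make that batching example concrete, here is a hypothetical sketch of
what such a DoFn could look like once the proposed callback exists.
@OnWindowExpiration and its signature are assumptions based on the proposal
above, not an existing API, and a real implementation would need per-window
state rather than a plain instance field:

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.beam.sdk.transforms.DoFn;

    // Hypothetical: @OnWindowExpiration does not exist yet; its name and
    // signature are assumed from the proposal above. A real implementation
    // would keep the buffer in per-window state, not an instance field.
    class BatchingDoFn extends DoFn<String, List<String>> {
      private static final int BATCH_SIZE = 5;
      private List<String> batch = new ArrayList<>();

      @ProcessElement
      public void processElement(ProcessContext c) {
        batch.add(c.element());
        if (batch.size() >= BATCH_SIZE) {
          c.output(new ArrayList<>(batch)); // the full batch of 5
          batch.clear();
        }
      }

      // Assumed callback, fired once per resident window before
      // @FinishBundle: materializes the leftover batch (the batch of 1 in
      // the example above) before the window's state is discarded.
      @OnWindowExpiration
      public void onWindowExpiration(Context c) {
        if (!batch.isEmpty()) {
          c.output(new ArrayList<>(batch));
          batch.clear();
        }
      }
    }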


On Wed, Jan 25, 2017 at 11:00 AM, Eugene Kirpichov <kirpic...@google.com>
wrote:

One more thing.

I think that, ideally, bundles should not leak into the model at all - e.g.
the startBundle/finishBundle methods in DoFn should not exist. They
interact poorly with windowing.
The proper way to address what is commonly done in these methods is either
Setup/Teardown methods, or a (to be designed) "window close" callback -
there's a JIRA about the latter but I couldn't find it, perhaps +Kenn
Knowles <k...@google.com> remembers what it is.

On Wed, Jan 25, 2017 at 10:41 AM Amit Sela <amitsel...@gmail.com> wrote:

On Wed, Jan 25, 2017 at 8:23 PM Thomas Groh <tg...@google.com.invalid>
wrote:

I have a couple of points in addition to what Robert said:

Runners are permitted to determine bundle sizes as appropriate to their
implementation, so long as bundles are atomically committed. The contents
of a PCollection are independent of the bundling of that PCollection.

Runners can process all elements within their own bundles (e.g.
https://github.com/apache/beam/blob/a6810372b003adf24bdbe34ed764a63841af9b99/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L289),
the entire input data, or anywhere in between.
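
As a schematic illustration of that freedom (the DoFnLifecycle interface
and NaiveBundlingRunner are invented names for this sketch, not Beam APIs),
a runner choosing its own bundle size might look like this:

    import java.util.List;

    // Schematic sketch of a runner's bundle loop; the interface and the
    // chunking policy are invented for illustration.
    interface DoFnLifecycle<T> {
      void startBundle();
      void processElement(T element);
      void finishBundle();
    }

    class NaiveBundlingRunner {
      // The runner is free to choose any bundle size: one element, a fixed
      // chunk, a whole partition, or the entire input. The PCollection's
      // contents are the same regardless of how they are bundled.
      static <T> void run(List<T> input, DoFnLifecycle<T> fn, int bundleSize) {
        for (int start = 0; start < input.size(); start += bundleSize) {
          int end = Math.min(start + bundleSize, input.size());
          fn.startBundle();
          for (T element : input.subList(start, end)) {
            fn.processElement(element);
          }
          fn.finishBundle(); // results for this bundle are committed here
        }
      }
    }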

Or, as Thomas mentioned, a runner could process an entire partition of the
data as a bundle (see
https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java#L57).
It basically depends on the runner's internal processing model.

On Wed, Jan 25, 2017 at 10:05 AM, Robert Bradshaw <rober...@google.com.invalid> wrote:

Bundles are simply the unit of commitment (retry) in the Beam SDK.
They're not really a model concept, but they do leak from the
implementation into the API, since it's not feasible to checkpoint every
individual process call. This allows some state/compute/... to be safely
amortized across elements: either the results of all processed elements
in a bundle are sent downstream, or none are and the entire bundle is
retried.
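
A small sketch of that commit-or-retry behavior (processBundle and commit
are invented stand-ins for runner internals, not real Beam APIs):

    import java.util.ArrayList;
    import java.util.List;

    // Illustration of "bundles are the unit of commitment (retry)":
    // either every element's result in the bundle is committed, or none
    // are and the whole bundle is reprocessed.
    class BundleRetryLoop {
      static List<String> processBundle(List<Integer> bundle) {
        List<String> results = new ArrayList<>();
        for (int element : bundle) {
          results.add("processed:" + element); // one process() call per element
        }
        return results; // buffered, not yet visible downstream
      }

      static void commit(List<String> results) {
        // Stand-in for the runner's atomic commit of a bundle's output.
      }

      static void processWithRetries(List<Integer> bundle, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
          try {
            List<String> results = processBundle(bundle);
            commit(results); // atomic: the only point output becomes visible
            return;
          } catch (RuntimeException e) {
            // Nothing was committed; retry the entire bundle from scratch.
          }
        }
        throw new IllegalStateException(
            "bundle failed after " + maxAttempts + " attempts");
      }
    }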

On Wed, Jan 25, 2017 at 9:36 AM, Matthew Jadczak <mn...@cam.ac.uk>
wrote:
Hi,

I’m a finalist CompSci student at the University of Cambridge, and for my
final project/dissertation I am writing an implementation of the Beam SDK
in Elixir [1]. Given that the Beam project is obviously still very much
WIP, it’s still somewhat difficult to find good conceptual overviews of
parts of the system, which is crucial when translating the OOP
architecture to something completely different. However, I have found many
of the design docs scattered around the JIRA and here very helpful.
(Incidentally, perhaps it would be helpful to maintain a list of them, to
help any contributors acquaint themselves with the conceptual vision of
the implementation?)

One thing which I have not yet been able to work out is the significance
of “bundles” in the SDK. On the one hand, it seems that they are simply an
implementation detail, effectively a way to do micro-batch processing
efficiently, and indeed they are not mentioned at all in the original
Dataflow paper or anywhere in the Beam docs (except in passing). On the
other hand, it seems most of the key transforms in the SDK core have a
concept of bundles and operate in their terms in practice, while all
conceptually being described as just operating on elements.

Do bundles have semantic meaning in the Beam Model? Are there any
guidelines as to how a given transform should split its output up into
bundles? Should any runner/SDK implementing the Model have that concept,
even when other primitives for streaming data processing, including things
like efficiently transmitting individual elements between stages with
backpressure, are available in the language/standard libraries? Are there
any insights here that I am missing, i.e. were problems present in early
versions of the runners solved by adding the concept of bundles?

Thanks so much,
Matt

[1] http://elixir-lang.org/
