There's actually no JIRA filed beyond BEAM-25 for what Eugene is referring to. Context: prior to windowing and streaming, it was safe to buffer elements in @ProcessElement and then perform the actual output in @FinishBundle. That pattern is now only suitable for global windowing or for flushing to external systems; anything else requires complex manual window bookkeeping. So we'll need a new callback, @OnWindowExpiration, invoked per resident window before @FinishBundle, to produce output from any remaining state before that state is discarded.
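To make the intent concrete, here's a rough sketch in plain Java (assumed names throughout — this is not the Beam API, just an illustration of the proposed semantics): buffered state is kept per window and flushed by a window-expiration callback, rather than waiting for the end of the bundle.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch only: the class and method names mirror the DoFn
// lifecycle methods discussed above, but nothing here is real Beam code.
public class WindowBufferSketch {
    // Per-window buffers of not-yet-emitted elements.
    private final Map<String, List<String>> buffers = new HashMap<>();
    private final List<String> output = new ArrayList<>();

    // Analogue of @ProcessElement: buffer instead of emitting immediately.
    public void processElement(String window, String element) {
        buffers.computeIfAbsent(window, w -> new ArrayList<>()).add(element);
    }

    // Analogue of the proposed @OnWindowExpiration: flush one window's
    // buffered state before the runner discards it. This is what a bare
    // @FinishBundle cannot do correctly once windows are in play.
    public void onWindowExpiration(String window) {
        List<String> buffered = buffers.remove(window);
        if (buffered != null) {
            output.add(window + ":" + String.join(",", buffered));
        }
    }

    public List<String> getOutput() {
        return output;
    }
}
```

The point of the sketch is only the ordering: the flush is tied to window lifetime, not bundle lifetime, so state for a window is emitted exactly once, before it is garbage-collected.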
On Wed, Jan 25, 2017 at 11:00 AM, Eugene Kirpichov <kirpic...@google.com> wrote:
> One more thing.
>
> I think ideally, bundles should not leak into the model at all - e.g.
> ideally, startBundle/finishBundle methods in DoFn should not exist. They
> interact poorly with windowing.
> The proper way to address what is commonly done in these methods is either
> Setup/Teardown methods, or a (to be designed) "window close" callback -
> there's a JIRA about the latter but I couldn't find it, perhaps +Kenn
> Knowles <k...@google.com> remembers what it is.
>
> On Wed, Jan 25, 2017 at 10:41 AM Amit Sela <amitsel...@gmail.com> wrote:
>
>> On Wed, Jan 25, 2017 at 8:23 PM Thomas Groh <tg...@google.com.invalid> wrote:
>>
>> > I have a couple of points in addition to what Robert said.
>> >
>> > Runners are permitted to determine bundle sizes as appropriate to their
>> > implementation, so long as bundles are atomically committed. The contents
>> > of a PCollection are independent of the bundling of that PCollection.
>> >
>> > Runners can process all elements within their own bundles (e.g.
>> > https://github.com/apache/beam/blob/a6810372b003adf24bdbe34ed764a63841af9b99/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L289),
>> > the entire input data, or anywhere in between.
>>
>> Or, as Thomas mentioned, a runner could process an entire partition of the
>> data as a bundle
>> (https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java#L57).
>> It basically depends on the runner's internal processing model.
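Thomas's invariant above — a runner may pick any bundle size, and the PCollection's contents don't change — can be sketched in a few lines of plain Java (hypothetical names, not runner code):

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch: bundling is a runner choice; the processed output is
// the same whatever bundle size is used. Names here are made up for the
// example and do not come from Beam.
public class BundlingSketch {
    // Split the input into bundles of at most `size` elements.
    public static List<List<Integer>> bundle(List<Integer> input, int size) {
        List<List<Integer>> bundles = new ArrayList<>();
        for (int i = 0; i < input.size(); i += size) {
            bundles.add(input.subList(i, Math.min(i + size, input.size())));
        }
        return bundles;
    }

    // Apply a per-element function bundle by bundle, concatenating outputs.
    public static List<Integer> processAll(List<Integer> input, int bundleSize) {
        List<Integer> out = new ArrayList<>();
        for (List<Integer> b : bundle(input, bundleSize)) {
            for (int x : b) {
                out.add(x * 2); // stand-in for any element-wise DoFn
            }
        }
        return out;
    }
}
```

Whether the runner uses single-element bundles, a whole partition, or anything in between, `processAll` yields the same result — which is exactly why bundling can stay an implementation detail.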
>> >
>> > On Wed, Jan 25, 2017 at 10:05 AM, Robert Bradshaw <rober...@google.com.invalid> wrote:
>> >
>> > > Bundles are simply the unit of commitment (retry) in the Beam SDK.
>> > > They're not really a model concept, but do leak from the
>> > > implementation into the API, as it's not feasible to checkpoint every
>> > > individual process call, and this allows some state/compute/... to be
>> > > safely amortized across elements (either the results of all processed
>> > > elements in a bundle are sent downstream, or none are and the entire
>> > > bundle is retried).
>> > >
>> > > On Wed, Jan 25, 2017 at 9:36 AM, Matthew Jadczak <mn...@cam.ac.uk> wrote:
>> > > > Hi,
>> > > >
>> > > > I’m a finalist CompSci student at the University of Cambridge, and for
>> > > > my final project/dissertation I am writing an implementation of the Beam
>> > > > SDK in Elixir [1]. Given that the Beam project is obviously still very
>> > > > much WIP, it’s still somewhat difficult to find good conceptual overviews
>> > > > of parts of the system, which is crucial when translating the OOP
>> > > > architecture to something completely different. However, I have found
>> > > > many of the design docs scattered around the JIRA and here very helpful.
>> > > > (Incidentally, perhaps it would be helpful to maintain a list of them, to
>> > > > help any contributors acquaint themselves with the conceptual vision of
>> > > > the implementation?)
>> > > >
>> > > > One thing which I have not yet been able to work out is the significance
>> > > > of “bundles” in the SDK. On the one hand, it seems that they are simply
>> > > > an implementation detail, effectively a way to do micro-batch processing
>> > > > efficiently, and indeed they are not mentioned at all in the original
>> > > > Dataflow paper or anywhere in the Beam docs (except in passing). On the
>> > > > other hand, it seems most of the key transforms in the SDK core have a
>> > > > concept of bundles and operate in their terms in practice, while all
>> > > > conceptually being described as just operating on elements.
>> > > >
>> > > > Do bundles have semantic meaning in the Beam Model? Are there any
>> > > > guidelines as to how a given transform should split its output up into
>> > > > bundles? Should any runner/SDK implementing the Model have that concept,
>> > > > even when other primitives for streaming data processing, including
>> > > > things like efficiently transmitting individual elements between stages
>> > > > with backpressure, are available in the language/standard libraries? Are
>> > > > there any insights here that I am missing, i.e. were problems present in
>> > > > early versions of the runners solved by adding the concept of bundles?
>> > > >
>> > > > Thanks so much,
>> > > > Matt
>> > > >
>> > > > [1] http://elixir-lang.org/
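Robert's description of a bundle as the unit of commitment and retry can be sketched as follows (plain Java with assumed names — not actual runner code): either every element's result in a bundle is published downstream, or none are and the whole bundle is reprocessed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Illustrative sketch of atomic bundle commitment: outputs are staged while
// the bundle runs and only become visible downstream if every element
// succeeds; any failure discards the partial results and retries the bundle.
public class BundleCommitSketch {
    public static List<String> runBundle(List<String> bundle,
                                         Function<String, String> fn,
                                         int maxRetries) {
        // One initial attempt plus up to maxRetries retries.
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            List<String> staged = new ArrayList<>(); // not yet visible downstream
            try {
                for (String element : bundle) {
                    staged.add(fn.apply(element));
                }
                return staged; // atomic commit: all results at once
            } catch (RuntimeException retryable) {
                // Partial results are dropped; the entire bundle is retried.
            }
        }
        throw new RuntimeException("bundle failed after retries");
    }
}
```

This is why per-element effects inside @ProcessElement must be idempotent or deferred: on a retry, elements earlier in the bundle are processed again even though they "succeeded" the first time.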