Apache Beam is attempting to reduce the work required to create an SDK by allowing a new SDK to reuse a runner written in a different language. The Apache Beam technical vision [1] describes a world where an SDK is made portable through a common pipeline representation (part of the Runner API [2]) and a common execution model (the Fn API [3]). Once these two pieces are in place, an SDK author only needs to write their language-specific SDK components and a container able to understand and execute them. As a longer-term goal, aligning your code with the Beam technical vision will ease integrating with the community.
1: https://docs.google.com/presentation/d/1Tc9MdXTDicb6jVCrXjsCbbnLYQCxYiKlTYdVpRkYdBQ/edit#slide=id.g11bcfc06a9_1_1098
2: http://s.apache.org/beam-runner-api
3: http://s.apache.org/beam-fn-api

On Wed, Jan 25, 2017 at 4:09 PM, Matthew Jadczak <[email protected]> wrote:

> Thanks! So if I'm understanding right, with a greenfield implementation
> that does not have to worry about actual interop with other Beam
> SDKs/runners in the near future, implementing setup/teardown callbacks as
> well as the state/timer API [1] for DoFns, and handling any committing and
> retrying as a runner implementation detail, would be the best approach?
>
> [1] https://s.apache.org/beam-state
>
> There's actually no JIRA filed beyond BEAM-25 for what Eugene is
> referring to. Context: prior to windowing and streaming, it was safe to
> buffer elements in @ProcessElement and then actually perform output in
> @FinishBundle. This pattern is only suitable for global windowing or for
> flushing to external systems; anything else requires rather complex manual
> window hackery. So now we'll need a new callback, @OnWindowExpiration,
> that occurs per-resident-window, before @FinishBundle, for producing
> output based on remaining state before it is discarded.
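To make that concrete: until such a callback exists, roughly the same behavior
can be approximated with the state/timer API itself, by buffering into keyed
state and setting an event-time timer for the window's maximum timestamp. Below
is a minimal, illustrative sketch using the shape that API later took in the
Java SDK; the class and state names are made up for the example, and this is a
sketch of the idea, not a definitive implementation.

import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;

// Sketch: buffer elements in per-key-and-window state instead of per bundle,
// and flush when the window's watermark passes, which is the moment the
// proposed @OnWindowExpiration callback would fire. Stateful DoFns operate
// per key, hence the KV input.
class WindowedBufferFn extends DoFn<KV<String, String>, String> {

  @StateId("buffer")
  private final StateSpec<BagState<String>> bufferSpec = StateSpecs.bag();

  @TimerId("flush")
  private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void processElement(
      ProcessContext c,
      BoundedWindow window,
      @StateId("buffer") BagState<String> buffer,
      @TimerId("flush") Timer flush) {
    buffer.add(c.element().getValue());
    // Fires when the window expires, just before its state is discarded.
    flush.set(window.maxTimestamp());
  }

  @OnTimer("flush")
  public void onFlush(
      OnTimerContext c, @StateId("buffer") BagState<String> buffer) {
    // Produce output from the remaining state, then release it.
    for (String value : buffer.read()) {
      c.output(value);
    }
    buffer.clear();
  }
}

A runner implementing the model then only needs to track watermarks and fire
the timer when the watermark passes the window's maximum timestamp, which is
exactly the hook @OnWindowExpiration would formalize.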
> On Wed, Jan 25, 2017 at 11:00 AM, Eugene Kirpichov <[email protected]>
> wrote:
>
> > One more thing.
> >
> > I think ideally, bundles should not leak into the model at all; e.g.
> > ideally, the startBundle/finishBundle methods in DoFn should not exist.
> > They interact poorly with windowing. The proper way to address what is
> > commonly done in these methods is either the Setup/Teardown methods or a
> > (to-be-designed) "window close" callback. There's a JIRA about the
> > latter, but I couldn't find it; perhaps +Kenn Knowles
> > <[email protected]> remembers what it is.
> >
> > On Wed, Jan 25, 2017 at 10:41 AM Amit Sela <[email protected]> wrote:
> >
> > > On Wed, Jan 25, 2017 at 8:23 PM Thomas Groh <[email protected]>
> > > wrote:
> > >
> > > > I have a couple of points in addition to what Robert said.
> > > >
> > > > Runners are permitted to determine bundle sizes as appropriate to
> > > > their implementation, so long as bundles are atomically committed.
> > > > The contents of a PCollection are independent of the bundling of
> > > > that PCollection.
> > > >
> > > > Runners can process all elements within their own bundles (e.g.
> > > > https://github.com/apache/beam/blob/a6810372b003adf24bdbe34ed764a63841af9b99/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L289),
> > > > the entire input data, or anywhere in between.
> > >
> > > Or, as Thomas mentioned, a runner could process an entire partition of
> > > the data as a bundle, e.g.
> > > https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java#L57
> > > It basically depends on the runner's internal processing model.
> > >
> > > > On Wed, Jan 25, 2017 at 10:05 AM, Robert Bradshaw
> > > > <[email protected]> wrote:
> > > >
> > > > > Bundles are simply the unit of commitment (retry) in the Beam SDK.
> > > > > They're not really a model concept, but they do leak from the
> > > > > implementation into the API, as it's not feasible to checkpoint
> > > > > every individual process call. This also allows some
> > > > > state/compute/... to be safely amortized across elements: either
> > > > > the results of all processed elements in a bundle are sent
> > > > > downstream, or none are and the entire bundle is retried.
> > > > >
> > > > > On Wed, Jan 25, 2017 at 9:36 AM, Matthew Jadczak <[email protected]>
> > > > > wrote:
> > > > >
> > > > > > Hi,
> > > > > >
> > > > > > I'm a finalist CompSci student at the University of Cambridge,
> > > > > > and for my final project/dissertation I am writing an
> > > > > > implementation of the Beam SDK in Elixir [1]. Given that the Beam
> > > > > > project is obviously still very much WIP, it's still somewhat
> > > > > > difficult to find good conceptual overviews of parts of the
> > > > > > system, which is crucial when translating the OOP architecture to
> > > > > > something completely different. However, I have found many of the
> > > > > > design docs scattered around the JIRA and here very helpful.
> > > > > > (Incidentally, perhaps it would be helpful to maintain a list of
> > > > > > them, to help any contributors acquaint themselves with the
> > > > > > conceptual vision of the implementation?)
> > > > > >
> > > > > > One thing which I have not yet been able to work out is the
> > > > > > significance of "bundles" in the SDK. On the one hand, it seems
> > > > > > that they are simply an implementation detail, effectively a way
> > > > > > to do micro-batch processing efficiently, and indeed they are not
> > > > > > mentioned at all in the original Dataflow paper, or anywhere in
> > > > > > the Beam docs except in passing. On the other hand, it seems most
> > > > > > of the key transforms in the SDK core have a concept of bundles
> > > > > > and operate in their terms in practice, while all conceptually
> > > > > > being described as just operating on elements.
> > > > > >
> > > > > > Do bundles have semantic meaning in the Beam Model? Are there any
> > > > > > guidelines as to how a given transform should split its output up
> > > > > > into bundles? Should any runner/SDK implementing the Model have
> > > > > > that concept, even when other primitives for streaming data
> > > > > > processing, such as efficiently transmitting individual elements
> > > > > > between stages with backpressure, are available in the
> > > > > > language/standard libraries? Are there any insights here that I
> > > > > > am missing, i.e. were problems present in early versions of the
> > > > > > runners solved by adding the concept of bundles?
> > > > > >
> > > > > > Thanks so much,
> > > > > > Matt
> > > > > >
> > > > > > [1] http://elixir-lang.org/
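As a closing illustration of Robert's point, here is a minimal sketch of the
kind of DoFn that motivates bundles in the first place: an expensive resource
is created once in @Setup, and writes are batched per bundle and flushed in
@FinishBundle. The ExternalClient below is a made-up placeholder for whatever
external system you are writing to, not a Beam API, and the sketch assumes the
Java SDK's DoFn lifecycle annotations.

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.transforms.DoFn;

// Sketch: amortize per-element work across a bundle. Elements are buffered
// as they arrive and flushed with one batched write when the runner finishes
// the bundle. If the bundle fails and is retried, the flush runs again, so
// the external system must tolerate duplicate writes.
class BatchedWriteFn extends DoFn<String, Void> {

  // Hypothetical client for an external sink; not part of the Beam API.
  static class ExternalClient {
    static ExternalClient connect() { return new ExternalClient(); }
    void writeBatch(List<String> batch) { /* e.g. one bulk RPC */ }
    void close() {}
  }

  private transient ExternalClient client;
  private transient List<String> buffer;

  @Setup
  public void setup() {
    // Expensive resources are created once per DoFn instance, not per bundle.
    client = ExternalClient.connect();
  }

  @StartBundle
  public void startBundle() {
    buffer = new ArrayList<>();
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    buffer.add(c.element());
  }

  @FinishBundle
  public void finishBundle() {
    // One write per bundle: either the whole bundle commits downstream, or
    // the runner retries the whole bundle (Robert's point above).
    client.writeBatch(buffer);
  }

  @Teardown
  public void teardown() {
    client.close();
  }
}

Note that nothing here gives bundles semantic meaning: the pipeline's results
are the same whether the runner uses bundles of one element or one million;
only the granularity of retry and amortization changes.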
