Sure, to clarify: I would indeed like to align my code with the Beam technical 
vision, as opposed to current implementation details. Since this is a 
university project I most likely cannot release the source code until July, so 
I am trying to align the high-level APIs with the technical vision, keeping 
them separate from artefacts of the organic growth of the Java/Python 
codebases, which are likely to change and evolve in the coming months.

My aim right now is to make this codebase extensible in the future to actually 
interoperate with the existing SDKs and runners through the Fn API, as well as 
to provide a Runner capable of running pipelines originating from another SDK, 
but that functionality is outside the scope of what I am doing. The primary 
areas of interest are the process and considerations of porting a large and 
complex OOP system to the functional/actor-based world of Elixir, and, on the 
runner side, exploring how well the capabilities of the Erlang VM/OTP/libraries 
can be leveraged to achieve good parallel and distributed execution of these 
pipelines.

As I understand it, your point is that under the current common pipeline 
representation, a DoFn would be expected to be able to handle startBundle and 
finishBundle calls. However, my immediate goal is to align more closely with 
the idealised, theoretical model, with practical interop work deferred until 
later, as it is out of scope for my project at this time.
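To make that concrete, this is the sort of lifecycle I have in mind, as a rough 
illustrative sketch in Python (all names here are my own, not actual Beam API): 
only setup/teardown and per-element processing, with how elements are grouped 
for commitment left entirely to the runner.

```python
# Purely illustrative sketch of an "idealised" DoFn lifecycle with no
# bundle hooks; names are hypothetical, not the Beam API.

class IdealisedDoFn:
    def setup(self):
        """Acquire expensive resources once per worker instance."""
        self.resource = object()  # stand-in for e.g. a connection

    def process(self, element):
        """Pure per-element processing; yields zero or more outputs."""
        yield element * 2

    def teardown(self):
        """Release resources; no output may be produced here."""
        self.resource = None


def run(dofn, elements):
    """Toy runner: lifecycle is setup -> process* -> teardown.
    There is no startBundle/finishBundle anywhere in the model."""
    dofn.setup()
    try:
        return [out for e in elements for out in dofn.process(e)]
    finally:
        dofn.teardown()


print(run(IdealisedDoFn(), [1, 2, 3]))  # [2, 4, 6]
```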

Please let me know if I have misunderstood you, and thanks for the input!


On 25 January 2017 at 22:06:15, Lukasz Cwik ([email protected]) wrote:

Apache Beam is attempting to reduce the amount of work to create an SDK by  
allowing one to use a Runner written within a different language. Within  
the Apache Beam technical vision [1] we discuss a world where an SDK is  
made portable by using a common pipeline representation (part of Runner Api  
[2]) and an execution model (Fn Api [3]). When these two pieces are  
combined, an SDK author would only need to write their language specific  
SDK component and a container which is able to understand and execute it.  
As a longer term goal, aligning your code with the Beam technical vision  
will help ease integrating into the community.  

1: https://docs.google.com/presentation/d/1Tc9MdXTDicb6jVCrXjsCbbnLYQCxYiKlTYdVpRkYdBQ/edit#slide=id.g11bcfc06a9_1_1098
2: http://s.apache.org/beam-runner-api
3: http://s.apache.org/beam-fn-api


On Wed, Jan 25, 2017 at 4:09 PM, Matthew Jadczak <[email protected]> wrote:  

> Thanks! So if I’m understanding right, with a greenfield implementation  
> that does not have to worry about actual interop with other Beam  
> SDKs/runners in the near future, implementing setup/teardown callbacks as  
> well as the state/timer API [1] for DoFns, and handling any committing and  
> retrying as a runner implementation detail would be the best approach?  
>  
> [1] https://s.apache.org/beam-state  
> There's actually not a JIRA filed beyond BEAM-25 for what Eugene is  
> referring to. Context: Prior to windowing and streaming it was safe to  
> buffer elements in @ProcessElement and then actually perform output in  
> @FinishBundle. This pattern is only suitable for global windowing or for
> flushing to external systems, and otherwise requires perhaps-complex manual
> window hackery. So
> now we'll need a new callback @OnWindowExpiration that occurs  
> per-resident-window, before @FinishBundle, for producing output based on  
> remaining state before it is discarded.  
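[To check my understanding of that pattern, a toy sketch of buffering per 
window and flushing when the window expires; all names here are hypothetical, 
since @OnWindowExpiration is still to be designed:]

```python
from collections import defaultdict


class BufferingFn:
    """Toy model of per-window buffering with a flush-on-expiration
    callback, instead of flushing in finishBundle. A hypothetical
    sketch, not the (yet to be designed) Beam @OnWindowExpiration API."""

    def __init__(self):
        self.buffers = defaultdict(list)  # window -> buffered elements

    def process(self, window, element):
        self.buffers[window].append(element)

    def on_window_expiration(self, window):
        """Called once per resident window, before its state is
        discarded: the last chance to output buffered elements."""
        return self.buffers.pop(window, [])


fn = BufferingFn()
fn.process("w1", 1)
fn.process("w1", 2)
fn.process("w2", 3)
print(fn.on_window_expiration("w1"))  # [1, 2]
```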
>  
>  
> On Wed, Jan 25, 2017 at 11:00 AM, Eugene Kirpichov <[email protected]>  
> wrote:  
>  
> > One more thing.  
> >  
> > I think ideally, bundles should not leak into the model at all - e.g.  
> > ideally, startBundle/finishBundle methods in DoFn should not exist. They  
> > interact poorly with windowing.  
> > The proper way to address what is commonly done in these methods is either
> > Setup/Teardown methods, or a (to be designed) "window close" callback -
> > there's a JIRA about the latter but I couldn't find it, perhaps +Kenn  
> > Knowles <[email protected]> remembers what it is.  
> >  
> > On Wed, Jan 25, 2017 at 10:41 AM Amit Sela <[email protected]> wrote:  
> >  
> >> On Wed, Jan 25, 2017 at 8:23 PM Thomas Groh <[email protected]>  
> >> wrote:  
> >>  
> >> > I have a couple of points in addition to what Robert said.
> >> >
> >> > Runners are permitted to determine bundle sizes as appropriate to their
> >> > implementation, so long as bundles are atomically committed. The
> >> > contents of a PCollection are independent of the bundling of that
> >> > PCollection.
> >> >  
> >> > Runners can process all elements within their own bundles (e.g.
> >> > https://github.com/apache/beam/blob/a6810372b003adf24bdbe34ed764a63841af9b99/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L289),
> >> > the entire input data, or anywhere in between.
> >> >  
> >> Or, as Thomas mentioned, a runner could process an entire partition of
> >> the data as a bundle (e.g.
> >> https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java#L57).
> >> It basically depends on the runner's internal processing model.
> >>  
> >> >  
> >> > On Wed, Jan 25, 2017 at 10:05 AM, Robert Bradshaw <  
> >> > [email protected]> wrote:  
> >> >  
> >> > > Bundles are simply the unit of commitment (retry) in the Beam SDK.
> >> > > They're not really a model concept, but do leak from the
> >> > > implementation into the API as it's not feasible to checkpoint every
> >> > > individual process call, and this allows some state/compute/... to be
> >> > > safely amortized across elements (either the results of all processed
> >> > > elements in a bundle are sent downstream, or none are and the entire
> >> > > bundle is retried).
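[To check my understanding of these semantics, a toy sketch (my own 
illustration, not Beam code): either all of a bundle's outputs are committed 
together, or none are and the whole bundle is reprocessed.]

```python
def process_bundle(bundle, process, max_retries=3):
    """Toy illustration of bundle-level commitment: outputs are buffered
    and only 'committed' once every element in the bundle has succeeded;
    on any failure the partial outputs are discarded and the entire
    bundle is retried from scratch."""
    for attempt in range(max_retries + 1):
        outputs = []
        try:
            for element in bundle:
                outputs.extend(process(element))
        except Exception:
            continue  # drop partial outputs, retry the whole bundle
        return outputs  # atomic commit point
    raise RuntimeError("bundle permanently failed")


# Usage: a process fn that fails transiently partway through the first
# attempt, so the whole bundle is reprocessed on the second attempt.
calls = {"n": 0}

def flaky(x):
    calls["n"] += 1
    if calls["n"] == 2:
        raise ValueError("transient failure")
    return [x + 1]


print(process_bundle([1, 2, 3], flaky))  # [2, 3, 4]
```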
> >> > >  
> >> > > On Wed, Jan 25, 2017 at 9:36 AM, Matthew Jadczak <[email protected]> wrote:
> >> > > > Hi,
> >> > > >
> >> > > > I’m a finalist CompSci student at the University of Cambridge, and for
> >> > > > my final project/dissertation I am writing an implementation of the
> >> > > > Beam SDK in Elixir [1]. Given that the Beam project is obviously still
> >> > > > very much WIP, it’s still somewhat difficult to find good conceptual
> >> > > > overviews of parts of the system, which is crucial when translating
> >> > > > the OOP architecture to something completely different. However I have
> >> > > > found many of the design docs scattered around the JIRA and here very
> >> > > > helpful. (Incidentally, perhaps it would be helpful to maintain a list
> >> > > > of them, to help any contributors acquaint themselves with the
> >> > > > conceptual vision of the implementation?)
> >> > > >
> >> > > > One thing which I have not yet been able to work out is the
> >> > > > significance of “bundles” in the SDK. On the one hand, it seems that
> >> > > > they are simply an implementation detail, effectively a way to do
> >> > > > micro-batch processing efficiently, and indeed they are not mentioned
> >> > > > at all in the original Dataflow paper or anywhere in the Beam docs
> >> > > > (except in passing). On the other hand, it seems most of the key
> >> > > > transforms in the SDK core have a concept of bundles and operate in
> >> > > > their terms in practice, while all conceptually being described as
> >> > > > just operating on elements.
> >> > > >
> >> > > > Do bundles have semantic meaning in the Beam Model? Are there any
> >> > > > guidelines as to how a given transform should split its output up
> >> > > > into bundles? Should any runner/SDK implementing the Model have that
> >> > > > concept, even when other primitives for streaming data processing,
> >> > > > including things like efficiently transmitting individual elements
> >> > > > between stages with backpressure, are available in the
> >> > > > language/standard libraries? Are there any insights here that I am
> >> > > > missing, i.e. were problems present in early versions of the runners
> >> > > > solved by adding the concept of bundles?
> >> > > >
> >> > > > Thanks so much,
> >> > > > Matt
> >> > > >
> >> > > > [1] http://elixir-lang.org/
> >> > >  
> >> >  
> >>  
> >  
>  
