I see. I guess I got a little confused since these are mentioned in the Authoring a Runner <https://beam.apache.org/contribute/runner-guide/#the-runner-api-protos> docs page, which implied to me that they'd be safe to use. I'll check out the bundle_processor. Thanks!
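A rough sketch of what driving a BundleProcessor can look like, per Robert's reply quoted below. The constructor arguments here match the 2.48 bundle_processor.py linked in that reply but have changed in later releases, and the state handler and data channel factory are placeholders for runner-provided plumbing (see worker_handlers.py), so treat this as an outline rather than a drop-in implementation:

from apache_beam.runners.worker import bundle_processor


def process_one_bundle(descriptor, state_handler, data_channel_factory, instruction_id):
    # `descriptor` is a beam_fn_api_pb2.ProcessBundleDescriptor for one fused
    # stage; `state_handler` and `data_channel_factory` are the runner-supplied
    # objects that serve state requests and move elements in and out (the
    # FnApiRunner's in-process versions live in worker_handlers.py).
    processor = bundle_processor.BundleProcessor(
        descriptor, state_handler, data_channel_factory)
    # process_bundle wires up the chain of DoFns in the descriptor and takes
    # them through their lifecycle for this bundle; in 2.48 it returns any
    # delayed applications plus a flag for whether finalization is required.
    return processor.process_bundle(instruction_id)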
On Mon, Jul 10, 2023 at 1:07 PM Robert Bradshaw <[email protected]> wrote:

> On Sun, Jul 9, 2023 at 9:22 AM Joey Tran <[email protected]> wrote:
>
>> Working on this on and off now and getting some pretty good traction.
>>
>> One thing I'm a little worried about is all the classes that are marked "internal use only". A lot of these seem either very useful or possibly critical to writing a runner. How strictly should I interpret these private implementation labels?
>>
>> A few bits that I'm interested in using, ordered by how surprised I was to find that they're internal only:
>>
>> - apache_beam.pipeline.AppliedPTransform
>> - apache_beam.pipeline.PipelineVisitor
>> - apache_beam.runners.common.DoFnRunner
>
> The public API is the protos. You should not have to interact with AppliedPTransform and PipelineVisitor directly (and while you can reach in and do so, there are no promises here and these are subject to change). As for DoFnRunner, if you're trying to reach in at this level you're probably going to have to replicate a bunch of surrounding infrastructure as well. I would recommend using a BundleProcessor [1] to coordinate the work (which will internally wire up the chain of DoFns correctly and take them through their proper lifecycle). As mentioned above, you can directly borrow the translations in fn_api_runner to go from a full Pipeline graph (proto) to a set of fused DoFns to execute in topological order (as ProcessBundleDescriptor protos, which is what BundleProcessor accepts).
>
> [1] https://github.com/apache/beam/blob/release-2.48.0/sdks/python/apache_beam/runners/worker/bundle_processor.py#L851
>
>> Thanks again for the help,
>> Joey
>>
>> On Fri, Jun 23, 2023 at 8:34 PM Chamikara Jayalath <[email protected]> wrote:
>>
>>> Another advantage of a portable runner would be that it will be using well-defined and backwards-compatible Beam portable APIs to communicate with SDKs. I think this is especially important for runners that do not live in the Beam repo, since otherwise future SDK releases could break your runner in subtle ways. Also, portability gives you more flexibility when it comes to choosing an SDK to define the pipeline and will allow you to execute transforms in any SDK via cross-language.
>>>
>>> Thanks,
>>> Cham
>>>
>>> On Fri, Jun 23, 2023 at 1:57 PM Robert Bradshaw via user <[email protected]> wrote:
>>>
>>>> On Fri, Jun 23, 2023 at 1:43 PM Joey Tran <[email protected]> wrote:
>>>>
>>>>>> Totally doable by one person, especially given the limited feature set you mention above. https://docs.google.com/presentation/d/1Cso0XP9dmj77OD9Bd53C1M3W1sPJF0ZnA20gzb2BPhE is a good starting point as to what the relationship between a Runner and the SDK is at a level of detail sufficient for implementation (told from the perspective of an SDK, but the story is largely about the interface, which is directly applicable).
>>>>>
>>>>> Great slides, I really appreciate the illustrations.
>>>>>
>>>>> I hadn't realized there was a concept of an "SDK Worker"; I had imagined that once the Runner started execution of a workflow, it was Runner all the way down. Is the Fn API the only way to implement a runner? Our execution environment is a bit constrained in such a way that we can't expose the APIs required to implement the Fn API.
>>>>> To be forthright, we basically only have the ability to start a worker either with a known Pub/Sub topic to expect data from and a Pub/Sub topic to write to, or with a bundle of data to process and return the outputs for. We're constrained from really any additional communication with a worker beyond that.
>>>>
>>>> The "worker" abstraction gives the ability to wrap any user code in a way that it can be called from any runner. If you're willing to constrain the code you're wrapping (e.g. "Python DoFns only") then this "worker" can be a logical, rather than physical, concept.
>>>>
>>>> Another way to look at it is that, in practice, the "runner" often has its own notion of "workers" which wrap (often in a 1:1 way) the logical "SDK Worker" (which in turn invokes the actual DoFns). The latter may be inlined (e.g. if it's 100% Python on both sides). See, for example, https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py#L350
>>>>
>>>>> On Fri, Jun 23, 2023 at 4:02 PM Robert Bradshaw <[email protected]> wrote:
>>>>>
>>>>>> On Fri, Jun 23, 2023 at 11:15 AM Joey Tran <[email protected]> wrote:
>>>>>>
>>>>>>> Thanks all for the responses!
>>>>>>>
>>>>>>>> If Beam Runner Authoring Guide is rather high-level for you, then, at first, I'd suggest to answer two questions for yourself:
>>>>>>>> - Am I going to implement a portable runner or native one?
>>>>>>>
>>>>>>> Portable sounds great, but the answer depends on how much additional cost it'd require to implement portable over non-portable, even considering future deprecation (unless deprecation is happening tomorrow). I'm not familiar enough to know what the additional cost is, so I don't have a firm answer.
>>>>>>
>>>>>> I would say it would not be that expensive to write it in a "portable compatible" way (i.e. it uses the publicly-documented protocol as the interface rather than reaching into internal details) even if it doesn't use gRPC and fire up separate processes/docker images for the workers (preferring to do all of that inline like the Python portable direct runner does).
>>>>>>
>>>>>>>> - Which SDK I should use for this runner?
>>>>>>>
>>>>>>> I'd be developing this runner against the Python SDK, and if the runner only worked with the Python SDK that'd be okay in the short term.
>>>>>>
>>>>>> Yes. And if you do it the above way, it should be easy to extend (or not) if/when the need arises.
>>>>>>
>>>>>>>> Also, we don't know if this new runner will be contributed back to Beam, what is a runtime and what actually is a final goal of it.
>>>>>>>
>>>>>>> Likely won't be contributed back to Beam (not sure if it'd actually be useful to a wide audience anyways).
>>>>>>>
>>>>>>> The context is we've been developing an in-house large-scale pipeline framework that encapsulates both the programming model and the runner/execution of data workflows. As it's grown, I keep finding myself just reimplementing features and abstractions Beam has already implemented, so I wanted to explore adopting Beam.
>>>>>>> Our execution environment is very particular though and our workflows require it (due to the way we license our software), so my plan was to try to create a very basic runner that uses our execution environment. The runner could have very few features, e.g. no streaming, no metrics, no side inputs, etc. After that I'd probably introduce a shim for some of our internally implemented transforms and assess from there.
>>>>>>>
>>>>>>> Not sure if this is a lofty goal or not, so happy to hear your thoughts as to whether this seems reasonable and achievable without a large concerted effort, or even if the general idea makes any sense. (I recognize that it might not be *easy*, but I don't have the resources to dedicate more than myself to work on a PoC.)
>>>>>>
>>>>>> Totally doable by one person, especially given the limited feature set you mention above. https://docs.google.com/presentation/d/1Cso0XP9dmj77OD9Bd53C1M3W1sPJF0ZnA20gzb2BPhE is a good starting point as to what the relationship between a Runner and the SDK is at a level of detail sufficient for implementation (told from the perspective of an SDK, but the story is largely about the interface, which is directly applicable).
>>>>>>
>>>>>> Given the limited feature set you proposed, this is similar to the original Python portable runner, which took a week or two to put together (granted, a lot has been added since then), or the TypeScript direct runner (https://github.com/apache/beam/blob/ea9147ad2946f72f7d52924cba2820e9aae7cd91/sdks/typescript/src/apache_beam/runners/direct_runner.ts), which was done (in its basic form, no support for side inputs and such) in less than a week. Granted, as these are local runners, this illustrates only the Beam-side complexity of things (not the work of actually implementing a distributed shuffle, starting and assigning work to multiple workers, etc., but presumably that's the kind of thing your execution environment already takes care of).
>>>>>>
>>>>>> As for some more concrete pointers, you could probably leverage a lot of what's there by invoking create_stages
>>>>>>
>>>>>> https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py#L362
>>>>>>
>>>>>> which will do optimization, fusion, etc., and then implementing your own version of run_stages
>>>>>>
>>>>>> https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py#L392
>>>>>>
>>>>>> to execute these in topological order on your compute infrastructure. (If you're not doing streaming, this is much more straightforward than all the bundle scheduling stuff that currently exists in that code.)
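A minimal sketch of the create_stages / run_stages split suggested above: create_stages handles optimization and fusion, and the loop stands in for a custom run_stages that hands each fused stage to your own execution environment. The (stage_context, stages) return shape is an assumption based on the 2.48 fn_runner.py linked above, and the translation phases are expected to return the stages already topologically sorted; verify both against your Beam version. execute_stage is a hypothetical callback, not a Beam API:

from apache_beam.runners.portability.fn_api_runner import fn_runner


def run_pipeline_proto(pipeline_proto, execute_stage):
    # Reuse the FnApiRunner's translation machinery to optimize/fuse the
    # pipeline proto into stages (return shape assumed from the 2.48 source).
    stage_context, stages = fn_runner.FnApiRunner().create_stages(pipeline_proto)
    # Stand-in for a custom run_stages: hand each fused stage to your own
    # compute infrastructure, in order.
    for stage in stages:
        execute_stage(stage_context, stage)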
>>>>>>> On Fri, Jun 23, 2023 at 12:17 PM Alexey Romanenko <[email protected]> wrote:
>>>>>>>
>>>>>>>> On 23 Jun 2023, at 17:40, Robert Bradshaw via user <[email protected]> wrote:
>>>>>>>>
>>>>>>>> On Fri, Jun 23, 2023, 7:37 AM Alexey Romanenko <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> If Beam Runner Authoring Guide is rather high-level for you, then, at first, I'd suggest to answer two questions for yourself:
>>>>>>>>> - Am I going to implement a portable runner or native one?
>>>>>>>>
>>>>>>>> The answer to this should be portable, as non-portable ones will be deprecated.
>>>>>>>>
>>>>>>>> Well, actually this is a question that I don't remember we discussed here in detail before and reached a common agreement on.
>>>>>>>>
>>>>>>>> Actually, I'm not sure that I understand clearly what is meant by "deprecation" in this case. For example, the portable Spark runner is actually heavily based on the native Spark RDD runner and its translations. So, which part should be deprecated and what is the reason for that?
>>>>>>>>
>>>>>>>> Well, anyway, I guess it's off topic here.
>>>>>>>>
>>>>>>>> Also, we don't know if this new runner will be contributed back to Beam, what is a runtime and what actually is a final goal of it. So I agree that more details on this would be useful.
>>>>>>>>
>>>>>>>> —
>>>>>>>> Alexey
>>>>>>>>
>>>>>>>>> - Which SDK I should use for this runner?
>>>>>>>>
>>>>>>>> The answer to the above question makes this one moot :).
>>>>>>>>
>>>>>>>> On a more serious note, could you tell us a bit more about the runner you're looking at implementing?
>>>>>>>>
>>>>>>>>> Then, depending on answers, I'd suggest to take as an example one of the most similar Beam runners and use it as a more detailed source of information, along with the Beam runner doc mentioned before.
>>>>>>>>>
>>>>>>>>> —
>>>>>>>>> Alexey
>>>>>>>>>
>>>>>>>>> On 22 Jun 2023, at 14:39, Joey Tran <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>> Hi Beam community!
>>>>>>>>>
>>>>>>>>> I'm interested in trying to implement a runner with my company's execution environment, but I'm struggling to get started. I've read the docs page <https://beam.apache.org/contribute/runner-guide/#testing-your-runner> on implementing a runner, but it's quite high level. Anyone have any concrete suggestions on getting started?
>>>>>>>>>
>>>>>>>>> I've started by cloning and running the hello world example <https://github.com/apache/beam-starter-python>. I've then subclassed `PipelineRunner <https://github.com/apache/beam/blob/9d0fc05d0042c2bb75ded511497e1def8c218c33/sdks/python/apache_beam/runners/runner.py#L103>` to create my own custom runner, but at this point I'm a bit stuck.
>>>>>>>>> My custom runner just looks like
>>>>>>>>>
>>>>>>>>> class CustomRunner(runner.PipelineRunner):
>>>>>>>>>     def run_pipeline(self, pipeline, options):
>>>>>>>>>         self.visit_transforms(pipeline, options)
>>>>>>>>>
>>>>>>>>> And when using it I get an error about not having implemented "Impulse":
>>>>>>>>>
>>>>>>>>> NotImplementedError: Execution of [<Impulse(PTransform) label=[Impulse]>] not implemented in runner <my_app.app.CustomRunner object at 0x135d9ff40>.
>>>>>>>>>
>>>>>>>>> Am I going about this the right way? Are there tests I can run my custom runner against to validate it beyond just running the hello world example? I'm finding myself just digging through the Beam source to try to piece together how a runner works, and I'm struggling to get a foothold. Any guidance would be greatly appreciated, especially if anyone has any experience implementing their own Python runner.
>>>>>>>>>
>>>>>>>>> Thanks in advance! Also, could I get a Slack invite?
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Joey
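Tying the thread together, a hedged sketch of a run_pipeline entry point that follows the "work from the proto" advice above, rather than the per-transform run_<TransformName> dispatch that raises the Impulse NotImplementedError quoted here. execute_proto is a hypothetical hook for the environment-specific part (e.g. the stage loop sketched earlier in the thread), and the PipelineResult handling is deliberately simplistic:

from apache_beam.runners.runner import PipelineResult, PipelineRunner, PipelineState


class CustomRunner(PipelineRunner):
    def run_pipeline(self, pipeline, options):
        # Convert the whole graph to its portable proto form up front, rather
        # than implementing run_Impulse, run_ParDo, ... one transform at a time.
        pipeline_proto = pipeline.to_runner_api()
        self.execute_proto(pipeline_proto, options)
        return PipelineResult(PipelineState.DONE)

    def execute_proto(self, pipeline_proto, options):
        # Hypothetical hook: hand the proto (or the fused stages derived from
        # it) to the in-house execution environment.
        raise NotImplementedError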
