I took a first pass at https://github.com/apache/beam/blob/be19140f3e9194721f36e57f4a946adc6c43971a/website/www/site/content/en/contribute/runner-guide.md
https://github.com/apache/beam/blob/1cfc0fdc6ff27ad70365683fdc8264f42642f6e9/sdks/python/apache_beam/runners/trivial_runner.py may also be of interest.

On Fri, Jul 21, 2023 at 7:25 AM Joey Tran <joey.t...@schrodinger.com> wrote:
>
> Could you let me know when you update it? I would be interested in rereading
> after the rewrite.
>
> Thanks!
> Joey
>
> On Fri, Jul 14, 2023 at 4:38 PM Robert Bradshaw <rober...@google.com> wrote:
>>
>> I'm taking an action item to update that page, as it is *way* out of date.
>>
>> On Thu, Jul 13, 2023 at 6:54 PM Joey Tran <joey.t...@schrodinger.com> wrote:
>>>
>>> I see. I guess I got a little confused since these are mentioned in the
>>> Authoring a Runner docs page which implied to me that they'd be safe to
>>> use. I'll check out the bundle_processor. Thanks!
>>>
>>> On Mon, Jul 10, 2023 at 1:07 PM Robert Bradshaw <rober...@google.com> wrote:
>>>>
>>>> On Sun, Jul 9, 2023 at 9:22 AM Joey Tran <joey.t...@schrodinger.com> wrote:
>>>>>
>>>>> Working on this on and off now and getting some pretty good traction.
>>>>>
>>>>> One thing I'm a little worried about is all the classes that are marked
>>>>> "internal use only". A lot of these seem either very useful or possibly
>>>>> critical to writing a runner. How strictly should I interpret these
>>>>> private implementation labels?
>>>>>
>>>>> A few bits that I'm interested in using ordered by how surprised I was to
>>>>> find that they're internal only.
>>>>>
>>>>> - apache_beam.pipeline.AppliedPTransform
>>>>> - apache_beam.pipeline.PipelineVisitor
>>>>> - apache_beam.runners.common.DoFnRunner
>>>>
>>>>
>>>> The public API is the protos. You should not have to interact with
>>>> AppliedPTransform and PipelineVisitor directly (and while you can reach in
>>>> and do so, there are no promises here and these are subject to change).
>>>> As for DoFnRunner, if you're trying to reach in at this level you're probably
>>>> going to have to be replicating a bunch of surrounding infrastructure as
>>>> well. I would recommend using a BundleProcessor [1] to coordinate the work
>>>> (which will internally wire up the chain of DoFns correctly and take them
>>>> through their proper lifecycle). As mentioned above, you can directly
>>>> borrow the translations in fn_api_runner to go from a full Pipeline graph
>>>> (proto) to a set of fused DoFns to execute in topological order (as
>>>> ProcessBundleDescriptor protos, which is what BundleProcessor accepts).
>>>>
>>>> [1]
>>>> https://github.com/apache/beam/blob/release-2.48.0/sdks/python/apache_beam/runners/worker/bundle_processor.py#L851
>>>>
>>>>>
>>>>> Thanks again for the help,
>>>>> Joey
>>>>>
>>>>> On Fri, Jun 23, 2023 at 8:34 PM Chamikara Jayalath <chamik...@google.com> wrote:
>>>>>>
>>>>>> Another advantage of a portable runner would be that it will be using
>>>>>> well defined and backwards compatible Beam portable APIs to communicate
>>>>>> with SDKs. I think this is especially important for runners that do not
>>>>>> live in the Beam repo since otherwise future SDK releases could break
>>>>>> your runner in subtle ways. Also portability gives you more flexibility
>>>>>> when it comes to choosing an SDK to define the pipeline and will allow
>>>>>> you to execute transforms in any SDK via cross-language.
>>>>>>
>>>>>> Thanks,
>>>>>> Cham
>>>>>>
>>>>>> On Fri, Jun 23, 2023 at 1:57 PM Robert Bradshaw via user <user@beam.apache.org> wrote:
>>>>>>>
>>>>>>> On Fri, Jun 23, 2023 at 1:43 PM Joey Tran <joey.t...@schrodinger.com> wrote:
>>>>>>>>>
>>>>>>>>> Totally doable by one person, especially given the limited feature
>>>>>>>>> set you mention above.
>>>>>>>>> https://docs.google.com/presentation/d/1Cso0XP9dmj77OD9Bd53C1M3W1sPJF0ZnA20gzb2BPhE
>>>>>>>>> is a good starting point as to what the relationship between a
>>>>>>>>> Runner and the SDK is at a level of detail sufficient for
>>>>>>>>> implementation (told from the perspective of an SDK, but the story is
>>>>>>>>> largely about the interface which is directly applicable).
>>>>>>>>
>>>>>>>> Great slides, I really appreciate the illustrations.
>>>>>>>>
>>>>>>>> I hadn't realized there was a concept of an "SDK Worker", I had
>>>>>>>> imagined that once the Runner started execution of a workflow, it was
>>>>>>>> Runner all the way down. Is the Fn API the only way to implement a
>>>>>>>> runner? Our execution environment is a bit constrained in such a way
>>>>>>>> that we can't expose the APIs required to implement the Fn API. To be
>>>>>>>> forthright, we basically only have the ability to start a worker
>>>>>>>> either with a known Pub/Sub topic to expect data from and a Pub/Sub
>>>>>>>> topic to write to; or with a bundle of data to process and return the
>>>>>>>> outputs for. We're constrained from really any additional
>>>>>>>> communication with a worker beyond that.
>>>>>>>
>>>>>>> The "worker" abstraction gives the ability to wrap any user code in a
>>>>>>> way that it can be called from any runner. If you're willing to
>>>>>>> constrain the code you're wrapping (e.g. "Python DoFns only") then this
>>>>>>> "worker" can be a logical, rather than physical, concept.
>>>>>>>
>>>>>>> Another way to look at it is that in practice, the "runner" often has
>>>>>>> its own notion of "workers" which wrap (often in a 1:1 way) the logical
>>>>>>> "SDK Worker" (which in turn invokes the actual DoFns). This latter may
>>>>>>> be inlined (e.g. if it's 100% Python on both sides).
>>>>>>> See, for example,
>>>>>>> https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py#L350
>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Jun 23, 2023 at 4:02 PM Robert Bradshaw <rober...@google.com> wrote:
>>>>>>>>>
>>>>>>>>> On Fri, Jun 23, 2023 at 11:15 AM Joey Tran <joey.t...@schrodinger.com> wrote:
>>>>>>>>>>
>>>>>>>>>> Thanks all for the responses!
>>>>>>>>>>
>>>>>>>>>>> If Beam Runner Authoring Guide is rather high-level for you, then,
>>>>>>>>>>> at first, I’d suggest to answer two questions for yourself:
>>>>>>>>>>> - Am I going to implement a portable runner or native one?
>>>>>>>>>>
>>>>>>>>>> Portable sounds great, but the answer depends on how much additional
>>>>>>>>>> cost it'd require to implement portable over non-portable, even
>>>>>>>>>> considering future deprecation (unless deprecation is happening
>>>>>>>>>> tomorrow). I'm not familiar enough to know what the additional cost
>>>>>>>>>> is so I don't have a firm answer.
>>>>>>>>>
>>>>>>>>> I would say it would not be that expensive to write it in a "portable
>>>>>>>>> compatible" way (i.e. it uses the publicly-documented protocol as the
>>>>>>>>> interface rather than reaching into internal details) even if it
>>>>>>>>> doesn't use gRPC and fire up separate processes/docker images for the
>>>>>>>>> workers (preferring to do all of that inline like the Python
>>>>>>>>> portable direct runner does).
>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> - Which SDK I should use for this runner?
>>>>>>>>>>
>>>>>>>>>> I'd be developing this runner against the python SDK and if the
>>>>>>>>>> runner only worked with the python SDK that'd be okay in the short
>>>>>>>>>> term
>>>>>>>>>
>>>>>>>>> Yes. And if you do it the above way, it should be easy to extend (or
>>>>>>>>> not) if/when the need arises.
>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Also, we don’t know if this new runner will be contributed back to
>>>>>>>>>>> Beam, what is a runtime and what actually is a final goal of it.
>>>>>>>>>>
>>>>>>>>>> Likely won't be contributed back to Beam (not sure if it'd actually
>>>>>>>>>> be useful to a wide audience anyways).
>>>>>>>>>>
>>>>>>>>>> The context is we've been developing an in-house large-scale
>>>>>>>>>> pipeline framework that encapsulates both the programming model and
>>>>>>>>>> the runner/execution of data workflows. As it's grown, I keep
>>>>>>>>>> finding myself just reimplementing features and abstractions Beam
>>>>>>>>>> has already implemented, so I wanted to explore adopting Beam. Our
>>>>>>>>>> execution environment is very particular though and our workflows
>>>>>>>>>> require it (due to the way we license our software), so my plan was
>>>>>>>>>> to try to create a very basic runner that uses our execution
>>>>>>>>>> environment. The runner could have very few features e.g. no
>>>>>>>>>> streaming, no metrics, no side inputs, etc. After that I'd probably
>>>>>>>>>> introduce a shim for some of our internally implemented transforms
>>>>>>>>>> and assess from there.
>>>>>>>>>>
>>>>>>>>>> Not sure if this is a lofty goal or not, so happy to hear your
>>>>>>>>>> thoughts as to whether this seems reasonable and achievable without
>>>>>>>>>> a large concerted effort or even if the general idea makes any
>>>>>>>>>> sense. (I recognize that it might not be easy, but I don't have the
>>>>>>>>>> resources to dedicate more than myself to work on a PoC)
>>>>>>>>>
>>>>>>>>> Totally doable by one person, especially given the limited feature
>>>>>>>>> set you mention above.
>>>>>>>>> https://docs.google.com/presentation/d/1Cso0XP9dmj77OD9Bd53C1M3W1sPJF0ZnA20gzb2BPhE
>>>>>>>>> is a good starting point as to what the relationship between a
>>>>>>>>> Runner and the SDK is at a level of detail sufficient for
>>>>>>>>> implementation (told from the perspective of an SDK, but the story is
>>>>>>>>> largely about the interface which is directly applicable).
>>>>>>>>>
>>>>>>>>> Given the limited feature set you proposed, this is similar to the
>>>>>>>>> original Python portable runner which took a week or two to put
>>>>>>>>> together (granted a lot has been added since then), or the TypeScript
>>>>>>>>> direct runner (
>>>>>>>>> https://github.com/apache/beam/blob/ea9147ad2946f72f7d52924cba2820e9aae7cd91/sdks/typescript/src/apache_beam/runners/direct_runner.ts
>>>>>>>>> ) which was done (in its basic form, no support for side inputs and
>>>>>>>>> such) in less than a week. Granted, as these are local runners, this
>>>>>>>>> illustrates only the Beam-side complexity of things (not the work of
>>>>>>>>> actually implementing a distributed shuffle, starting and assigning
>>>>>>>>> work to multiple workers, etc.) but presumably that's the kind of
>>>>>>>>> thing your execution environment already takes care of.
>>>>>>>>>
>>>>>>>>> As for some more concrete pointers, you could probably leverage a lot
>>>>>>>>> of what's there by invoking create_stages
>>>>>>>>>
>>>>>>>>> https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py#L362
>>>>>>>>>
>>>>>>>>> which will do optimization, fusion, etc. and then implementing your
>>>>>>>>> own version of run_stages
>>>>>>>>>
>>>>>>>>> https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py#L392
>>>>>>>>>
>>>>>>>>> to execute these in topological order on your compute infrastructure.
>>>>>>>>> (If you're not doing streaming, this is much more straightforward
>>>>>>>>> than all the bundler scheduler stuff that currently exists in that
>>>>>>>>> code).
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Fri, Jun 23, 2023 at 12:17 PM Alexey Romanenko <aromanenko....@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>> On 23 Jun 2023, at 17:40, Robert Bradshaw via user <user@beam.apache.org> wrote:
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Jun 23, 2023, 7:37 AM Alexey Romanenko <aromanenko....@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> If Beam Runner Authoring Guide is rather high-level for you, then,
>>>>>>>>>>>> at first, I’d suggest to answer two questions for yourself:
>>>>>>>>>>>> - Am I going to implement a portable runner or native one?
>>>>>>>>>>>
>>>>>>>>>>> The answer to this should be portable, as non-portable ones will be
>>>>>>>>>>> deprecated.
>>>>>>>>>>>
>>>>>>>>>>> Well, actually this is a question that I don’t remember we
>>>>>>>>>>> discussed here in detail before and had a common agreement.
>>>>>>>>>>>
>>>>>>>>>>> Actually, I’m not sure that I understand clearly what is meant by
>>>>>>>>>>> “deprecation” in this case. For example, Portable Spark Runner is
>>>>>>>>>>> actually heavily based on native Spark RDD runner and its
>>>>>>>>>>> translations. So, which part should be deprecated and what is a
>>>>>>>>>>> reason for that?
>>>>>>>>>>>
>>>>>>>>>>> Well, anyway I guess it’s off topic here.
>>>>>>>>>>>
>>>>>>>>>>> Also, we don’t know if this new runner will be contributed back to
>>>>>>>>>>> Beam, what is a runtime and what actually is a final goal of it.
>>>>>>>>>>> So I agree that more details on this would be useful.
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Alexey
>>>>>>>>>>>
>>>>>>>>>>>> - Which SDK I should use for this runner?
>>>>>>>>>>>
>>>>>>>>>>> The answer to the above question makes this one moot :).
>>>>>>>>>>>
>>>>>>>>>>> On a more serious note, could you tell us a bit more about the
>>>>>>>>>>> runner you're looking at implementing?
>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Then, depending on answers, I’d suggest to take as an example one
>>>>>>>>>>>> of the most similar Beam runners and use it as a more detailed
>>>>>>>>>>>> source of information along with Beam runner doc mentioned before.
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> Alexey
>>>>>>>>>>>>
>>>>>>>>>>>> On 22 Jun 2023, at 14:39, Joey Tran <joey.t...@schrodinger.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Hi Beam community!
>>>>>>>>>>>>
>>>>>>>>>>>> I'm interested in trying to implement a runner with my company's
>>>>>>>>>>>> execution environment but I'm struggling to get started. I've read
>>>>>>>>>>>> the docs page on implementing a runner but it's quite high level.
>>>>>>>>>>>> Anyone have any concrete suggestions on getting started?
>>>>>>>>>>>>
>>>>>>>>>>>> I've started by cloning and running the hello world example. I've
>>>>>>>>>>>> then subclassed `PipelineRunner` to create my own custom runner
>>>>>>>>>>>> but at this point I'm a bit stuck. My custom runner just looks like
>>>>>>>>>>>>
>>>>>>>>>>>> class CustomRunner(runner.PipelineRunner):
>>>>>>>>>>>>     def run_pipeline(self, pipeline, options):
>>>>>>>>>>>>         self.visit_transforms(pipeline, options)
>>>>>>>>>>>>
>>>>>>>>>>>> And when using it I get an error about not having implemented
>>>>>>>>>>>> "Impulse"
>>>>>>>>>>>>
>>>>>>>>>>>> NotImplementedError: Execution of [<Impulse(PTransform)
>>>>>>>>>>>> label=[Impulse]>] not implemented in runner
>>>>>>>>>>>> <my_app.app.CustomRunner object at 0x135d9ff40>.
>>>>>>>>>>>>
>>>>>>>>>>>> Am I going about this the right way? Are there tests I can run my
>>>>>>>>>>>> custom runner against to validate it beyond just running the hello
>>>>>>>>>>>> world example?
>>>>>>>>>>>> I'm finding myself just digging through the beam
>>>>>>>>>>>> source to try to piece together how a runner works and I'm
>>>>>>>>>>>> struggling to get a foothold. Any guidance would be greatly
>>>>>>>>>>>> appreciated, especially if anyone has any experience implementing
>>>>>>>>>>>> their own python runner.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks in advance! Also, could I get a Slack invite?
>>>>>>>>>>>> Cheers,
>>>>>>>>>>>> Joey
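
For anyone reading this thread later: the advice to write "your own version of run_stages" that executes fused stages "in topological order" can be illustrated without touching any Beam internals. The sketch below is standard-library Python only. `Stage`, `topological_order`, and this `run_stages` are hypothetical names made up for illustration (they are not Beam classes or the functions in fn_runner.py), and the sketch assumes a batch-only pipeline in which every input PCollection is produced by exactly one stage.

```python
from dataclasses import dataclass
from graphlib import TopologicalSorter  # Python 3.9+


@dataclass(frozen=True)
class Stage:
    """Hypothetical stand-in for one fused stage; NOT a Beam class."""
    name: str
    inputs: tuple = ()   # ids of the PCollections this stage reads
    outputs: tuple = ()  # ids of the PCollections this stage writes


def topological_order(stages):
    """Return the stages ordered so that producers precede consumers."""
    # Map each PCollection id to the name of the stage that produces it.
    producer = {pc: s.name for s in stages for pc in s.outputs}
    by_name = {s.name: s for s in stages}
    sorter = TopologicalSorter()
    for s in stages:
        # A stage depends on whichever stages produce its inputs.
        sorter.add(s.name, *(producer[pc] for pc in s.inputs))
    return [by_name[name] for name in sorter.static_order()]


def run_stages(stages, execute):
    """Run every stage once, in dependency order.

    `execute(stage, inputs)` is a caller-supplied callback (an assumption of
    this sketch) that runs one stage on the target infrastructure and returns
    a dict mapping each output PCollection id to its materialized data.
    """
    results = {}
    for stage in topological_order(stages):
        stage_inputs = {pc: results[pc] for pc in stage.inputs}
        results.update(execute(stage, stage_inputs))
    return results
```

In a real runner the `execute` callback is where a stage would be handed to the execution environment (for example as a bundle of input data plus a description of the fused DoFns), which is the part the thread suggests delegating to a BundleProcessor.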