I'm taking an action item to update that page, as it is *way* out of date.

On Thu, Jul 13, 2023 at 6:54 PM Joey Tran wrote:
> I see. I guess I got a little confused since these are mentioned in the Authoring a Runner <https://beam.apache.org/contribute/runner-guide/#the-runner-api-protos> docs page, which implied to me that they'd be safe to use. I'll check out the bundle_processor. Thanks!
>
> On Mon, Jul 10, 2023 at 1:07 PM Robert Bradshaw wrote:
>
>> On Sun, Jul 9, 2023 at 9:22 AM Joey Tran wrote:
>>
>>> Working on this on and off now and getting some pretty good traction.
>>>
>>> One thing I'm a little worried about is all the classes that are marked "internal use only". A lot of these seem either very useful or possibly critical to writing a runner. How strictly should I interpret these private implementation labels?
>>>
>>> A few bits that I'm interested in using, ordered by how surprised I was to find that they're internal only:
>>>
>>> - apache_beam.pipeline.AppliedPTransform
>>> - apache_beam.pipeline.PipelineVisitor
>>> - apache_beam.runners.common.DoFnRunner
>>>
>>
>> The public API is the protos. You should not have to interact with AppliedPTransform and PipelineVisitor directly (and while you can reach in and do so, there are no promises here and these are subject to change). As for DoFnRunner, if you're trying to reach in at this level, you'll probably have to replicate a bunch of surrounding infrastructure as well. I would recommend using a BundleProcessor [1] to coordinate the work (it will internally wire up the chain of DoFns correctly and take them through their proper lifecycle). As mentioned above, you can directly borrow the translations in fn_api_runner to go from a full Pipeline graph (proto) to a set of fused DoFns to execute in topological order (as ProcessBundleDescriptor protos, which is what BundleProcessor accepts).
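Robert's outline (Pipeline proto, then fused stages, then execution in topological order) can be illustrated with a stdlib-only sketch. `Stage`, `topological_order`, and `run_stages_in_order` are hypothetical simplifications, not Beam APIs: the real fn_api_runner produces `ProcessBundleDescriptor` protos and hands each one to a `BundleProcessor`. The sketch assumes batch-only execution and PCollections small enough to keep in memory, and shows only the ordering and data-passing logic.

```python
from dataclasses import dataclass
from graphlib import TopologicalSorter  # Python 3.9+
from typing import Callable, Dict, List

# A stage function maps its input elements to {output PCollection id: elements}.
StageFn = Callable[[List[object]], Dict[str, List[object]]]


@dataclass
class Stage:
    """Hypothetical stand-in for one fused stage; in Beam this role is played
    by a ProcessBundleDescriptor executed by a BundleProcessor."""
    name: str
    inputs: List[str]   # PCollection ids this stage reads
    outputs: List[str]  # PCollection ids this stage writes
    fn: StageFn


def topological_order(stages: List[Stage]) -> List[Stage]:
    """Order stages so each one runs after every stage producing its inputs."""
    producer = {pc: s.name for s in stages for pc in s.outputs}
    graph = {s.name: {producer[pc] for pc in s.inputs if pc in producer}
             for s in stages}
    by_name = {s.name: s for s in stages}
    return [by_name[n] for n in TopologicalSorter(graph).static_order()]


def run_stages_in_order(stages: List[Stage]) -> Dict[str, List[object]]:
    """Batch-only execution: run each stage exactly once, in dependency
    order, keeping every PCollection materialized in memory."""
    pcolls: Dict[str, List[object]] = {}
    for stage in topological_order(stages):
        inputs = [x for pc in stage.inputs for x in pcolls.get(pc, [])]
        for pc, elements in stage.fn(inputs).items():
            pcolls.setdefault(pc, []).extend(elements)
    return pcolls


if __name__ == "__main__":
    stages = [
        # Listed out of order on purpose; topological_order fixes that.
        Stage("square", ["nums"], ["squares"],
              lambda xs: {"squares": [x * x for x in xs]}),
        Stage("create", [], ["nums"], lambda _: {"nums": [1, 2, 3]}),
    ]
    print(run_stages_in_order(stages)["squares"])  # prints [1, 4, 9]
```

Keeping the dependency ordering separate from execution means the loop body could instead submit each stage to a remote execution environment and wait for its outputs.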
>>
>> [1] https://github.com/apache/beam/blob/release-2.48.0/sdks/python/apache_beam/runners/worker/bundle_processor.py#L851
>>
>>
>>> Thanks again for the help,
>>> Joey
>>>
>>> On Fri, Jun 23, 2023 at 8:34 PM Chamikara Jayalath wrote:
>>>
>>>> Another advantage of a portable runner would be that it will be using well-defined and backwards-compatible Beam portable APIs to communicate with SDKs. I think this is especially important for runners that do not live in the Beam repo, since otherwise future SDK releases could break your runner in subtle ways. Portability also gives you more flexibility when it comes to choosing an SDK to define the pipeline, and will allow you to execute transforms in any SDK via cross-language.
>>>>
>>>> Thanks,
>>>> Cham
>>>>
>>>> On Fri, Jun 23, 2023 at 1:57 PM Robert Bradshaw via user wrote:
>>>>
>>>>>
>>>>>
>>>>> On Fri, Jun 23, 2023 at 1:43 PM Joey Tran wrote:
>>>>>
>>>>>>> Totally doable by one person, especially given the limited feature set you mention above. https://docs.google.com/presentation/d/1Cso0XP9dmj77OD9Bd53C1M3W1sPJF0ZnA20gzb2BPhE is a good starting point as to what the relationship between a Runner and the SDK is at a level of detail sufficient for implementation (told from the perspective of an SDK, but the story is largely about the interface, which is directly applicable).
>>>>>>
>>>>>>
>>>>>> Great slides, I really appreciate the illustrations.
>>>>>>
>>>>>> I hadn't realized there was a concept of an "SDK Worker"; I had imagined that once the Runner started execution of a workflow, it was Runner all the way down. Is the Fn API the only way to implement a runner? Our execution environment is a bit constrained, in such a way that we can't expose the APIs required to implement the Fn API.
>>>>>> To be forthright, we basically only have the ability to start a worker either with a known Pub/Sub topic to expect data from and a Pub/Sub topic to write to, or with a bundle of data to process and return the outputs for. We're constrained from essentially any additional communication with a worker beyond that.
>>>>>>
>>>>>
>>>>> The "worker" abstraction gives the ability to wrap any user code in a way that it can be called from any runner. If you're willing to constrain the code you're wrapping (e.g. "Python DoFns only"), then this "worker" can be a logical, rather than physical, concept.
>>>>>
>>>>> Another way to look at it is that in practice, the "runner" often has its own notion of "workers", which wrap (often in a 1:1 way) the logical "SDK Worker" (which in turn invokes the actual DoFns). This latter may be inlined (e.g. if it's 100% Python on both sides). See, for example, https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py#L350
>>>>>
>>>>>
>>>>>> On Fri, Jun 23, 2023 at 4:02 PM Robert Bradshaw wrote:
>>>>>>
>>>>>>> On Fri, Jun 23, 2023 at 11:15 AM Joey Tran wrote:
>>>>>>>
>>>>>>>> Thanks all for the responses!
>>>>>>>>
>>>>>>>>> If the Beam Runner Authoring Guide is rather high-level for you, then, at first, I’d suggest answering two questions for yourself:
>>>>>>>>> - Am I going to implement a portable runner or a native one?
>>>>>>>>>
>>>>>>>>
>>>>>>>> Portable sounds great, but the answer depends on how much additional cost it'd require to implement portable over non-portable, even considering future deprecation (unless deprecation is happening tomorrow). I'm not familiar enough to know what the additional cost is, so I don't have a firm answer.
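Joey's constraint (hand a worker a bundle, collect its outputs) fits Robert's point that the SDK worker can be a logical, in-process concept. The following is a minimal sketch of such an inlined worker running a fused chain of DoFns. `MiniDoFn`, `InlineSdkWorker`, `Double`, and `SumPerBundle` are hypothetical names; only the lifecycle method names (`setup`, `start_bundle`, `process`, `finish_bundle`, `teardown`) are borrowed from Beam's `DoFn`. In a real Python runner, `BundleProcessor` does this wiring, plus windowing, state, and data channels, which is part of why Robert recommends it over driving `DoFnRunner` directly.

```python
from typing import Iterable, Iterator, List


class MiniDoFn:
    """Hypothetical DoFn stand-in; method names mirror Beam's DoFn lifecycle."""

    def setup(self) -> None:
        pass

    def start_bundle(self) -> None:
        pass

    def process(self, element) -> Iterable:
        return [element]

    def finish_bundle(self) -> Iterable:
        return []

    def teardown(self) -> None:
        pass


class InlineSdkWorker:
    """A 'logical' SDK worker: runs a fused chain of DoFns in-process on one
    bundle and returns its outputs (bundle in, outputs out)."""

    def __init__(self, chain: List[MiniDoFn]):
        self.chain = chain
        for fn in chain:
            fn.setup()

    def _push(self, index: int, elements: Iterable) -> Iterator:
        # Feed elements into chain[index] and forward its outputs downstream.
        if index == len(self.chain):
            yield from elements
            return
        for element in elements:
            yield from self._push(index + 1, self.chain[index].process(element))

    def process_bundle(self, bundle: Iterable) -> list:
        for fn in self.chain:
            fn.start_bundle()
        outputs = list(self._push(0, bundle))
        # Finish upstream DoFns first, so anything they flush still flows
        # through the downstream DoFns before those are finished.
        for i, fn in enumerate(self.chain):
            outputs.extend(self._push(i + 1, fn.finish_bundle()))
        return outputs

    def close(self) -> None:
        for fn in self.chain:
            fn.teardown()


class Double(MiniDoFn):
    def process(self, element):
        return [element * 2]


class SumPerBundle(MiniDoFn):
    """Buffers elements and emits their sum when the bundle finishes."""

    def start_bundle(self):
        self.total = 0

    def process(self, element):
        self.total += element
        return []

    def finish_bundle(self):
        return [self.total]


if __name__ == "__main__":
    worker = InlineSdkWorker([SumPerBundle(), Double()])
    print(worker.process_bundle([1, 2, 3]))  # prints [12]
    worker.close()
```

Because the worker only needs a bundle and returns a list, the same object could sit behind whatever "process this bundle" entry point the execution environment offers.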
>>>>>>>>
>>>>>>>
>>>>>>> I would say it would not be that expensive to write it in a "portable compatible" way (i.e. it uses the publicly documented protocol as the interface rather than reaching into internal details), even if it doesn't use gRPC and fire up separate processes/Docker images for the workers (preferring to do all of that inline, like the Python portable direct runner does).
>>>>>>>
>>>>>>>
>>>>>>>>> - Which SDK should I use for this runner?
>>>>>>>>>
>>>>>>>> I'd be developing this runner against the Python SDK, and if the runner only worked with the Python SDK that'd be okay in the short term.
>>>>>>>>
>>>>>>>
>>>>>>> Yes. And if you do it the above way, it should be easy to extend (or not) if/when the need arises.
>>>>>>>
>>>>>>>
>>>>>>>>> Also, we don’t know if this new runner will be contributed back to Beam, what runtime it targets, and what its final goal actually is.
>>>>>>>>
>>>>>>>> Likely won't be contributed back to Beam (not sure if it'd actually be useful to a wide audience anyway).
>>>>>>>>
>>>>>>>> The context is we've been developing an in-house large-scale pipeline framework that encapsulates both the programming model and the runner/execution of data workflows. As it's grown, I keep finding myself just reimplementing features and abstractions Beam has already implemented, so I wanted to explore adopting Beam. Our execution environment is very particular, though, and our workflows require it (due to the way we license our software), so my plan was to try to create a very basic runner that uses our execution environment. The runner could have very few features, e.g. no streaming, no metrics, no side inputs, etc. After that I'd probably introduce a shim for some of our internally implemented transforms and assess from there.
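With the deliberately small feature set described here (no streaming, no metrics, no side inputs), a runner can fail fast on pipelines it cannot handle. This sketch assumes the runner inspects only leaf transforms. The URN strings are Beam's standard primitive-transform URNs. The dict layout and `unsupported_features` are hypothetical simplifications of the Runner API `Pipeline` proto; in the real proto, side inputs are declared in the ParDo payload.

```python
from typing import Dict, List

# Standard Beam primitive transform URNs from the portable Runner API.
# A deliberately limited, batch-only runner might accept only these.
SUPPORTED_URNS = {
    "beam:transform:impulse:v1",
    "beam:transform:pardo:v1",
    "beam:transform:group_by_key:v1",
    "beam:transform:flatten:v1",
}


def unsupported_features(transforms: Dict[str, dict]) -> List[str]:
    """Return reasons a pipeline cannot run here (empty list means OK).

    `transforms` is a hypothetical, simplified stand-in for the leaf
    transforms of a Pipeline proto: {name: {"urn": ..., "side_inputs": [...]}}.
    """
    problems = []
    for name, t in sorted(transforms.items()):
        if t["urn"] not in SUPPORTED_URNS:
            problems.append(f"{name}: unsupported transform {t['urn']}")
        elif t.get("side_inputs"):
            problems.append(f"{name}: side inputs are not supported")
    return problems


if __name__ == "__main__":
    pipeline = {
        "Impulse": {"urn": "beam:transform:impulse:v1"},
        "Enrich": {"urn": "beam:transform:pardo:v1", "side_inputs": ["lookup"]},
    }
    for problem in unsupported_features(pipeline):
        print(problem)  # prints "Enrich: side inputs are not supported"
```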
>>>>>>>>
>>>>>>>> Not sure if this is a lofty goal or not, so happy to hear your thoughts as to whether this seems reasonable and achievable without a large concerted effort, or even if the general idea makes any sense. (I recognize that it might not be *easy*, but I don't have the resources to dedicate more than myself to work on a PoC.)
>>>>>>>>
>>>>>>>
>>>>>>> Totally doable by one person, especially given the limited feature set you mention above. https://docs.google.com/presentation/d/1Cso0XP9dmj77OD9Bd53C1M3W1sPJF0ZnA20gzb2BPhE is a good starting point as to what the relationship between a Runner and the SDK is at a level of detail sufficient for implementation (told from the perspective of an SDK, but the story is largely about the interface, which is directly applicable).
>>>>>>>
>>>>>>> Given the limited feature set you proposed, this is similar to the original Python portable runner, which took a week or two to put together (granted, a lot has been added since then), or the TypeScript direct runner (https://github.com/apache/beam/blob/ea9147ad2946f72f7d52924cba2820e9aae7cd91/sdks/typescript/src/apache_beam/runners/direct_runner.ts), which was done (in its basic form, with no support for side inputs and such) in less than a week. Granted, as these are local runners, this illustrates only the Beam-side complexity of things (not the work of actually implementing a distributed shuffle, starting and assigning work to multiple workers, etc.), but presumably that's the kind of thing your execution environment already takes care of.
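For GroupByKey, the main piece the execution environment would have to supply is the distributed shuffle that, as Robert notes, the local runners leave out. Its contract can be sketched in two steps. First, route every key to exactly one partition using a hash that is stable across processes. Second, collect the values for each key. `shuffle` is a hypothetical helper. It assumes string keys and ignores windowing, triggers, and spilling to disk. Beam itself requires a deterministic key coder for GroupByKey, so keys can be compared by their encoded form.

```python
import zlib
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple


def shuffle(bundles: Iterable[Iterable[Tuple[str, object]]],
            num_partitions: int) -> List[Dict[str, List[object]]]:
    """Group (key, value) pairs emitted by many workers, GroupByKey-style.

    Each key is routed to exactly one partition (one downstream worker).
    Python's built-in hash() of str is salted per process, so a stable
    CRC32 of the UTF-8 key is used instead; every worker then agrees on
    where a key goes. Windowing, triggers, and spilling are ignored.
    """
    partitions = [defaultdict(list) for _ in range(num_partitions)]
    for bundle in bundles:
        for key, value in bundle:
            target = zlib.crc32(key.encode("utf-8")) % num_partitions
            partitions[target][key].append(value)
    return [dict(p) for p in partitions]


if __name__ == "__main__":
    worker_outputs = [[("a", 1), ("b", 2)], [("a", 3)]]
    for i, part in enumerate(shuffle(worker_outputs, num_partitions=2)):
        print(i, part)
```

Each returned partition would then become the input bundle for one downstream stage invocation.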
>>>>>>>
>>>>>>> As for some more concrete pointers, you could probably leverage a lot of what's there by invoking create_stages
>>>>>>>
>>>>>>> https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py#L362
>>>>>>>
>>>>>>> which will do optimization, fusion, etc., and then implementing your own version of run_stages
>>>>>>>
>>>>>>> https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py#L392
>>>>>>>
>>>>>>> to execute these in topological order on your compute infrastructure. (If you're not doing streaming, this is much more straightforward than all the bundle scheduler stuff that currently exists in that code.)
>>>>>>>
>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Jun 23, 2023 at 12:17 PM Alexey Romanenko wrote:
>>>>>>>>
>>>>>>>>>
>>>>>>>>> On 23 Jun 2023, at 17:40, Robert Bradshaw via user wrote:
>>>>>>>>>
>>>>>>>>> On Fri, Jun 23, 2023, 7:37 AM Alexey Romanenko wrote:
>>>>>>>>>
>>>>>>>>>> If the Beam Runner Authoring Guide is rather high-level for you, then, at first, I’d suggest answering two questions for yourself:
>>>>>>>>>> - Am I going to implement a portable runner or a native one?
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> The answer to this should be portable, as non-portable ones will be deprecated.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Well, actually this is a question that I don’t remember us discussing here in detail before or reaching a common agreement on.
>>>>>>>>>
>>>>>>>>> Actually, I’m not sure that I clearly understand what is meant by “deprecation” in this case. For example, the Portable Spark Runner is actually heavily based on the native Spark RDD runner and its translations.
>>>>>>>>> So, which part should be deprecated, and what is the reason for that?
>>>>>>>>>
>>>>>>>>> Well, anyway, I guess it’s off topic here.
>>>>>>>>>
>>>>>>>>> Also, we don’t know if this new runner will be contributed back to Beam, what runtime it targets, and what its final goal actually is. So I agree that more details on this would be useful.
>>>>>>>>>
>>>>>>>>> —
>>>>>>>>> Alexey
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>> - Which SDK should I use for this runner?
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> The answer to the above question makes this one moot :).
>>>>>>>>>
>>>>>>>>> On a more serious note, could you tell us a bit more about the runner you're looking at implementing?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>> Then, depending on the answers, I’d suggest taking one of the most similar Beam runners as an example and using it as a more detailed source of information, along with the Beam runner doc mentioned before.
>>>>>>>>>>
>>>>>>>>>> —
>>>>>>>>>> Alexey
>>>>>>>>>>
>>>>>>>>>> On 22 Jun 2023, at 14:39, Joey Tran wrote:
>>>>>>>>>>
>>>>>>>>>> Hi Beam community!
>>>>>>>>>>
>>>>>>>>>> I'm interested in trying to implement a runner with my company's execution environment, but I'm struggling to get started. I've read the docs page <https://beam.apache.org/contribute/runner-guide/#testing-your-runner> on implementing a runner, but it's quite high level. Does anyone have any concrete suggestions on getting started?
>>>>>>>>>>
>>>>>>>>>> I've started by cloning and running the hello world example <https://github.com/apache/beam-starter-python>. I've then subclassed `PipelineRunner <https://github.com/apache/beam/blob/9d0fc05d0042c2bb75ded511497e1def8c218c33/sdks/python/apache_beam/runners/runner.py#L103>` to create my own custom runner, but at this point I'm a bit stuck.
>>>>>>>>>> My custom runner just looks like
>>>>>>>>>>
>>>>>>>>>>     from apache_beam.runners import runner
>>>>>>>>>>
>>>>>>>>>>     class CustomRunner(runner.PipelineRunner):
>>>>>>>>>>         def run_pipeline(self, pipeline, options):
>>>>>>>>>>             self.visit_transforms(pipeline, options)
>>>>>>>>>>
>>>>>>>>>> And when using it I get an error about not having implemented "Impulse":
>>>>>>>>>>
>>>>>>>>>>     NotImplementedError: Execution of [<Impulse(PTransform) label=[Impulse]>] not implemented in runner <my_app.app.CustomRunner object at 0x135d9ff40>.
>>>>>>>>>>
>>>>>>>>>> Am I going about this the right way? Are there tests I can run my custom runner against to validate it beyond just running the hello world example? I'm finding myself digging through the Beam source to try to piece together how a runner works, and I'm struggling to get a foothold. Any guidance would be greatly appreciated, especially if anyone has experience implementing their own Python runner.
>>>>>>>>>>
>>>>>>>>>> Thanks in advance! Also, could I get a Slack invite?
>>>>>>>>>> Cheers,
>>>>>>>>>> Joey
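The `NotImplementedError` in this first message appears to come from the legacy, non-portable `PipelineRunner` path. That path looks up a per-transform handler method by the transform's class name. The stdlib imitation below copies that dispatch pattern. It shows why the error appears and what handling `Impulse` amounts to: emitting a single empty-bytes element. `Impulse`, `MiniRunner`, and `RunnerWithImpulse` here are hypothetical classes, not Beam's API. As Robert advises elsewhere in this thread, a new runner is better served by consuming the portable protos than by adding `run_*` methods to this path.

```python
class Impulse:
    """Hypothetical stand-in for Beam's Impulse primitive."""

    def __repr__(self) -> str:
        return "<Impulse>"


class MiniRunner:
    """Imitates name-based dispatch: transform T runs via a method run_T,
    and a missing method becomes NotImplementedError."""

    def run_transform(self, transform):
        handler = getattr(self, "run_" + type(transform).__name__, None)
        if handler is None:
            raise NotImplementedError(
                f"Execution of [{transform!r}] not implemented in runner {self!r}.")
        return handler(transform)


class RunnerWithImpulse(MiniRunner):
    def run_Impulse(self, transform):
        # Impulse emits exactly one element, an empty byte string, which
        # downstream transforms then expand into real data.
        return [b""]


if __name__ == "__main__":
    try:
        MiniRunner().run_transform(Impulse())
    except NotImplementedError as err:
        print(err)
    print(RunnerWithImpulse().run_transform(Impulse()))  # prints [b'']
```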
