I took a first pass at
https://github.com/apache/beam/blob/be19140f3e9194721f36e57f4a946adc6c43971a/website/www/site/content/en/contribute/runner-guide.md

https://github.com/apache/beam/blob/1cfc0fdc6ff27ad70365683fdc8264f42642f6e9/sdks/python/apache_beam/runners/trivial_runner.py
may also be of interest.

On Fri, Jul 21, 2023 at 7:25 AM Joey Tran <joey.t...@schrodinger.com> wrote:
>
> Could you let me know when you update it? I would be interested in rereading 
> after the rewrite.
>
> Thanks!
> Joey
>
> On Fri, Jul 14, 2023 at 4:38 PM Robert Bradshaw <rober...@google.com> wrote:
>>
>> I'm taking an action item to update that page, as it is *way* out of date.
>>
>> On Thu, Jul 13, 2023 at 6:54 PM Joey Tran <joey.t...@schrodinger.com> wrote:
>>>
>>> I see. I guess I got a little confused since these are mentioned in the 
>>> Authoring a Runner docs page which implied to me that they'd be safe to 
>>> use. I'll check out the bundle_processor. Thanks!
>>>
>>> On Mon, Jul 10, 2023 at 1:07 PM Robert Bradshaw <rober...@google.com> wrote:
>>>>
>>>> On Sun, Jul 9, 2023 at 9:22 AM Joey Tran <joey.t...@schrodinger.com> wrote:
>>>>>
>>>>> Working on this on and off now and getting some pretty good traction.
>>>>>
>>>>> One thing I'm a little worried about is all the classes that are marked 
>>>>> "internal use only". A lot of these seem either very useful or possibly 
>>>>> critical to writing a runner. How strictly should I interpret these 
>>>>> private implementation labels?
>>>>>
>>>>> A few bits that I'm interested in using ordered by how surprised I was to 
>>>>> find that they're internal only.
>>>>>
>>>>>  - apache_beam.pipeline.AppliedPTransform
>>>>>  - apache_beam.pipeline.PipelineVisitor
>>>>>  - apache_beam.runners.common.DoFnRunner
>>>>
>>>>
>>>> The public API is the protos. You should not have to interact with 
>>>> AppliedPTransform and PipelineVisitor directly (and while you can reach in 
>>>> and do so, there are no promises here and these are subject to change). As 
>>>> for DoFnRunner, if you're trying to reach in at this level you're probably 
>>>> going to have to be replicating a bunch of surrounding infrastructure as 
>>>> well. I would recommend using a BundleProcessor [1] to coordinate the work 
>>>> (which will internally wire up the chain of DoFns correctly and take them 
>>>> through their proper lifecycle). As mentioned above, you can directly 
>>>> borrow the translations in fn_api_runner to go from a full Pipeline graph 
>>>> (proto) to a set of fused DoFns to execute in topological order (as 
>>>> ProcessBundleDescriptor protos, which is what BundleProcessor accepts).
>>>>
>>>> [1] 
>>>> https://github.com/apache/beam/blob/release-2.48.0/sdks/python/apache_beam/runners/worker/bundle_processor.py#L851
>>>>
>>>>>
>>>>> Thanks again for the help,
>>>>> Joey
>>>>>
>>>>> On Fri, Jun 23, 2023 at 8:34 PM Chamikara Jayalath <chamik...@google.com> 
>>>>> wrote:
>>>>>>
>>>>>> Another advantage of a portable runner would be that it will be using 
>>>>>> well defined and backwards compatible Beam portable APIs to communicate 
>>>>>> with SDKs. I think this is especially important for runners that do not 
>>>>>> live in the Beam repo since otherwise future SDK releases could break 
>>>>>> your runner in subtle ways. Also portability gives you more flexibility 
>>>>>> when it comes to choosing an SDK to define the pipeline and will allow 
>>>>>> you to execute transforms in any SDK via cross-language.
>>>>>>
>>>>>> Thanks,
>>>>>> Cham
>>>>>>
>>>>>> On Fri, Jun 23, 2023 at 1:57 PM Robert Bradshaw via user 
>>>>>> <user@beam.apache.org> wrote:
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Jun 23, 2023 at 1:43 PM Joey Tran <joey.t...@schrodinger.com> 
>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>> Totally doable by one person, especially given the limited feature 
>>>>>>>>> set you mention above. 
>>>>>>>>> https://docs.google.com/presentation/d/1Cso0XP9dmj77OD9Bd53C1M3W1sPJF0ZnA20gzb2BPhE
>>>>>>>>>  is a good starting point as to what the relationship between a 
>>>>>>>>> Runner and the SDK is at a level of detail sufficient for 
>>>>>>>>> implementation (told from the perspective of an SDK, but the story is 
>>>>>>>>> largely about the interface which is directly applicable).
>>>>>>>>
>>>>>>>>
>>>>>>>> Great slides, I really appreciate the illustrations.
>>>>>>>>
>>>>>>>> I hadn't realized there was a concept of an "SDK Worker", I had 
>>>>>>>> imagined that once the Runner started execution of a workflow, it was 
>>>>>>>> Runner all the way down. Is the Fn API the only way to implement a 
>>>>>>>> runner? Our execution environment is a bit constrained in such a way 
>>>>>>>> that we can't expose the APIs required to implement the Fn API. To be 
>>>>>>>> forthright, we basically only have the ability to start a worker 
>>>>>>>> either with a known Pub/Sub topic to expect data from and a Pub/Sub 
>>>>>>>> topic to write to; or with a bundle of data to process and return the 
>>>>>>>> outputs for. We're constrained from really any additional 
>>>>>>>> communication with a worker beyond that.
>>>>>>>
>>>>>>>
>>>>>>> The "worker" abstraction gives the ability to wrap any user code in a 
>>>>>>> way that it can be called from any runner. If you're willing to 
>>>>>>> constrain the code you're wrapping (e.g. "Python DoFns only") then this 
>>>>>>> "worker" can be a logical, rather than physical, concept.
>>>>>>>
>>>>>>> Another way to look at it is that in practice, the "runner" often has 
>>>>>>> its own notion of "workers" which wrap (often in a 1:1 way) the logical 
>>>>>>> "SDK Worker" (which in turn invokes the actual DoFns). This latter may 
>>>>>>> be inlined (e.g. if it's 100% Python on both sides). See, for example, 
>>>>>>> https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py#L350
>>>>>>>
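The idea of a "logical" worker that is inlined into the runner's own worker can be sketched roughly as follows. This is only an illustration under stated assumptions: `InlineWorker`, `ElementFn`, and `process_bundle` are hypothetical names invented for this sketch, not Beam APIs, and each "DoFn" is reduced to a plain function from one element to zero or more outputs. It matches the "bundle of data in, outputs back" constraint described earlier in the thread.

```python
from typing import Callable, Iterable, List

# Hypothetical stand-in for a DoFn: one element in, zero or more elements out.
ElementFn = Callable[[object], Iterable[object]]


class InlineWorker:
    """A logical worker that runs a fused chain of functions in-process."""

    def __init__(self, fns: List[ElementFn]):
        self._fns = list(fns)

    def process_bundle(self, elements: Iterable[object]) -> List[object]:
        """Push a whole bundle through every function and return the outputs."""
        current = list(elements)
        for fn in self._fns:
            # Flatten the outputs of this step before feeding the next one.
            current = [out for element in current for out in fn(element)]
        return current


if __name__ == "__main__":
    worker = InlineWorker([
        lambda line: line.split(),    # split lines into words
        lambda word: [word.upper()],  # upper-case each word
    ])
    print(worker.process_bundle(["hello beam", "runner"]))
```

In a real runner the chain would come from a fused stage and would be driven through Beam's own machinery rather than bare callables; the sketch only shows why no separate worker process is strictly required when everything is Python.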
>>>>>>>>
>>>>>>>> On Fri, Jun 23, 2023 at 4:02 PM Robert Bradshaw <rober...@google.com> 
>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>> On Fri, Jun 23, 2023 at 11:15 AM Joey Tran 
>>>>>>>>> <joey.t...@schrodinger.com> wrote:
>>>>>>>>>>
>>>>>>>>>> Thanks all for the responses!
>>>>>>>>>>
>>>>>>>>>>> If the Beam Runner Authoring Guide is rather high-level for you, then, 
>>>>>>>>>>> at first, I’d suggest answering two questions for yourself:
>>>>>>>>>>> - Am I going to implement a portable runner or native one?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Portable sounds great, but the answer depends on how much additional 
>>>>>>>>>> cost it'd require to implement portable over non-portable, even 
>>>>>>>>>> considering future deprecation (unless deprecation is happening 
>>>>>>>>>> tomorrow). I'm not familiar enough to know what the additional cost 
>>>>>>>>>> is so I don't have a firm answer.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> I would say it would not be that expensive to write it in a "portable 
>>>>>>>>> compatible" way (i.e. it uses the publicly-documented protocol as the 
>>>>>>>>> interface rather than reaching into internal details) even if it 
>>>>>>>>> doesn't use gRPC and fire up separate processes/docker images for the 
>>>>>>>>> workers (preferring to do all of that inline like the Python 
>>>>>>>>> portable direct runner does).
>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> - Which SDK should I use for this runner?
>>>>>>>>>>
>>>>>>>>>> I'd be developing this runner against the python SDK and if the 
>>>>>>>>>> runner only worked with the python SDK that'd be okay in the short 
>>>>>>>>>> term
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Yes. And if you do it the above way, it should be easy to extend (or 
>>>>>>>>> not) if/when the need arises.
>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Also, we don’t know if this new runner will be contributed back to 
>>>>>>>>>>> Beam, what the runtime is, and what its final goal actually is.
>>>>>>>>>>
>>>>>>>>>> Likely won't be contributed back to Beam (not sure if it'd actually 
>>>>>>>>>> be useful to a wide audience anyways).
>>>>>>>>>>
>>>>>>>>>> The context is we've been developing an in-house large-scale 
>>>>>>>>>> pipeline framework that encapsulates both the programming model and 
>>>>>>>>>> the runner/execution of data workflows. As it's grown, I keep 
>>>>>>>>>> finding myself just reimplementing features and abstractions Beam 
>>>>>>>>>> has already implemented, so I wanted to explore adopting Beam. Our 
>>>>>>>>>> execution environment is very particular though and our workflows 
>>>>>>>>>> require it (due to the way we license our software), so my plan was 
>>>>>>>>>> to try to create a very basic runner that uses our execution 
>>>>>>>>>> environment. The runner could have very few features e.g. no 
>>>>>>>>>> streaming, no metrics, no side inputs, etc. After that I'd probably 
>>>>>>>>>> introduce a shim for some of our internally implemented transforms 
>>>>>>>>>> and assess from there.
>>>>>>>>>>
>>>>>>>>>> Not sure if this is a lofty goal or not, so happy to hear your 
>>>>>>>>>> thoughts as to whether this seems reasonable and achievable without 
>>>>>>>>>> a large concerted effort or even if the general idea makes any 
>>>>>>>>>> sense. (I recognize that it might not be easy, but I don't have the 
>>>>>>>>>> resources to dedicate more than myself to work on a PoC)
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Totally doable by one person, especially given the limited feature 
>>>>>>>>> set you mention above. 
>>>>>>>>> https://docs.google.com/presentation/d/1Cso0XP9dmj77OD9Bd53C1M3W1sPJF0ZnA20gzb2BPhE
>>>>>>>>>  is a good starting point as to what the relationship between a 
>>>>>>>>> Runner and the SDK is at a level of detail sufficient for 
>>>>>>>>> implementation (told from the perspective of an SDK, but the story is 
>>>>>>>>> largely about the interface which is directly applicable).
>>>>>>>>>
>>>>>>>>> Given the limited feature set you proposed, this is similar to the 
>>>>>>>>> original Python portable runner which took a week or two to put 
>>>>>>>>> together (granted a lot has been added since then), or the typescript 
>>>>>>>>> direct runner ( 
>>>>>>>>> https://github.com/apache/beam/blob/ea9147ad2946f72f7d52924cba2820e9aae7cd91/sdks/typescript/src/apache_beam/runners/direct_runner.ts
>>>>>>>>>  ) which was done (in its basic form, no support for side inputs and 
>>>>>>>>> such) in less than a week. Granted, as these are local runners, this 
>>>>>>>>> illustrates only the Beam-side complexity of things (not the work of 
>>>>>>>>> actually implementing a distributed shuffle, starting and assigning 
>>>>>>>>> work to multiple workers, etc.), but presumably that's the kind of 
>>>>>>>>> thing your execution environment already takes care of.
>>>>>>>>>
>>>>>>>>> As for some more concrete pointers, you could probably leverage a lot 
>>>>>>>>> of what's there by invoking create_stages
>>>>>>>>>
>>>>>>>>> https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py#L362
>>>>>>>>>
>>>>>>>>> which will do optimization, fusion, etc. and then implementing your 
>>>>>>>>> own version of run_stages
>>>>>>>>>
>>>>>>>>> https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py#L392
>>>>>>>>>
>>>>>>>>> to execute these in topological order on your compute infrastructure. 
>>>>>>>>> (If you're not doing streaming, this is much more straightforward 
>>>>>>>>> than all the bundler scheduler stuff that currently exists in that 
>>>>>>>>> code).
>>>>>>>>>
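The "execute these in topological order" step can be sketched independently of Beam. The following is a minimal illustration, assuming the fused stages have already been reduced to a mapping from each stage name to the stages it consumes from. `run_stages_in_order`, `stage_deps`, and `execute_stage` are hypothetical names for this sketch and are not part of fn_api_runner; `execute_stage` stands in for "submit this stage to your execution environment and wait for its outputs".

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+


def run_stages_in_order(stage_deps, execute_stage):
    """Run every stage after all of the stages it depends on.

    stage_deps: dict mapping a stage name to the set of upstream stage names.
    execute_stage: callable(name, upstream_results) -> result (hypothetical).
    Returns a dict mapping each stage name to its result.
    """
    results = {}
    for name in TopologicalSorter(stage_deps).static_order():
        # Hand each stage only the outputs of its direct upstream stages.
        upstream = {dep: results[dep] for dep in stage_deps.get(name, ())}
        results[name] = execute_stage(name, upstream)
    return results


if __name__ == "__main__":
    deps = {
        "read": set(),
        "parse": {"read"},
        "count": {"parse"},
        "write": {"count"},
    }

    def fake_execute(name, upstream):
        print("running", name, "after", sorted(upstream))
        return name + "-output"

    run_stages_in_order(deps, fake_execute)
```

For a batch-only runner this sequential loop is usually enough as a starting point; independent stages could later be submitted concurrently without changing the dependency bookkeeping.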
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Fri, Jun 23, 2023 at 12:17 PM Alexey Romanenko 
>>>>>>>>>> <aromanenko....@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On 23 Jun 2023, at 17:40, Robert Bradshaw via user 
>>>>>>>>>>> <user@beam.apache.org> wrote:
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Jun 23, 2023, 7:37 AM Alexey Romanenko 
>>>>>>>>>>> <aromanenko....@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> If the Beam Runner Authoring Guide is rather high-level for you, then, 
>>>>>>>>>>>> at first, I’d suggest answering two questions for yourself:
>>>>>>>>>>>> - Am I going to implement a portable runner or native one?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> The answer to this should be portable, as non-portable ones will be 
>>>>>>>>>>> deprecated.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Well, actually, I don’t remember us discussing this question here 
>>>>>>>>>>> in detail before or reaching a common agreement on it.
>>>>>>>>>>>
>>>>>>>>>>> Actually, I’m not sure that I understand clearly what is meant by 
>>>>>>>>>>> “deprecation” in this case. For example, the Portable Spark Runner is 
>>>>>>>>>>> actually heavily based on the native Spark RDD runner and its 
>>>>>>>>>>> translations. So, which part should be deprecated, and what is the 
>>>>>>>>>>> reason for that?
>>>>>>>>>>>
>>>>>>>>>>> Well, anyway I guess it’s off topic here.
>>>>>>>>>>>
>>>>>>>>>>> Also, we don’t know if this new runner will be contributed back to 
>>>>>>>>>>> Beam, what the runtime is, and what its final goal actually is.
>>>>>>>>>>> So I agree that more details on this would be useful.
>>>>>>>>>>>
>>>>>>>>>>> —
>>>>>>>>>>> Alexey
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>> - Which SDK should I use for this runner?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> The answer to the above question makes this one moot :).
>>>>>>>>>>>
>>>>>>>>>>> On a more serious note, could you tell us a bit more about the 
>>>>>>>>>>> runner you're looking at implementing?
>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Then, depending on answers, I’d suggest to take as an example one 
>>>>>>>>>>>> of the most similar Beam runners and use it as a more detailed 
>>>>>>>>>>>> source of information along with Beam runner doc mentioned before.
>>>>>>>>>>>>
>>>>>>>>>>>> —
>>>>>>>>>>>> Alexey
>>>>>>>>>>>>
>>>>>>>>>>>> On 22 Jun 2023, at 14:39, Joey Tran <joey.t...@schrodinger.com> 
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Hi Beam community!
>>>>>>>>>>>>
>>>>>>>>>>>> I'm interested in trying to implement a runner with my company's 
>>>>>>>>>>>> execution environment but I'm struggling to get started. I've read 
>>>>>>>>>>>> the docs page on implementing a runner but it's quite high level. 
>>>>>>>>>>>> Anyone have any concrete suggestions on getting started?
>>>>>>>>>>>>
>>>>>>>>>>>> I've started by cloning and running the hello world example. I've 
>>>>>>>>>>>> then subclassed `PipelineRunner` to create my own custom runner 
>>>>>>>>>>>> but at this point I'm a bit stuck. My custom runner just looks like
>>>>>>>>>>>>
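>>>>>>>>>>>> from apache_beam.runners import runner
>>>>>>>>>>>>
>>>>>>>>>>>> class CustomRunner(runner.PipelineRunner):
>>>>>>>>>>>>     def run_pipeline(self, pipeline,
>>>>>>>>>>>>                      options):
>>>>>>>>>>>>         # Visits each transform; raises for any the runner lacks.
>>>>>>>>>>>>         self.visit_transforms(pipeline, options)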
>>>>>>>>>>>>
>>>>>>>>>>>> And when using it I get an error about not having implemented 
>>>>>>>>>>>> "Impulse"
>>>>>>>>>>>>
>>>>>>>>>>>> NotImplementedError: Execution of [<Impulse(PTransform) 
>>>>>>>>>>>> label=[Impulse]>] not implemented in runner 
>>>>>>>>>>>> <my_app.app.CustomRunner object at 0x135d9ff40>.
>>>>>>>>>>>>
>>>>>>>>>>>> Am I going about this the right way? Are there tests I can run my 
>>>>>>>>>>>> custom runner against to validate it beyond just running the hello 
>>>>>>>>>>>> world example? I'm finding myself just digging through the beam 
>>>>>>>>>>>> source to try to piece together how a runner works and I'm 
>>>>>>>>>>>> struggling to get a foothold. Any guidance would be greatly 
>>>>>>>>>>>> appreciated, especially if anyone has any experience implementing 
>>>>>>>>>>>> their own python runner.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks in advance! Also, could I get a Slack invite?
>>>>>>>>>>>> Cheers,
>>>>>>>>>>>> Joey
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
