I see. I guess I got a little confused since these are mentioned in
the Authoring
a Runner
<https://beam.apache.org/contribute/runner-guide/#the-runner-api-protos> docs
page which implied to me that they'd be safe to use. I'll check out the
bundle_processor. Thanks!

On Mon, Jul 10, 2023 at 1:07 PM Robert Bradshaw <rober...@google.com> wrote:

> On Sun, Jul 9, 2023 at 9:22 AM Joey Tran <joey.t...@schrodinger.com>
> wrote:
>
>> Working on this on and off now and getting some pretty good traction.
>>
>> One thing I'm a little worried about is all the classes that are marked
>> "internal use only". A lot of these seem either very useful or possibly
>> critical to writing a runner. How strictly should I interpret these private
>> implementation labels?
>>
>> A few bits that I'm interested in using ordered by how surprised I was to
>> find that they're internal only.
>>
>>  - apache_bean.pipeline.AppliedPTransform
>>  - apache_beam.pipeline.PipelineVisitor
>>  - apache_beam.runners.common.DoFnRunner
>>
>
> The public API is the protos. You should not have to interact
> with AppliedPTransform and PipelineVisitor directly (and while you can
> reach in and do so, there are no promises here and these are subject to
> change). As for DoFnRunner, if you're trying to reach in at this level
> you're probably going to have to be replicating a bunch of surrounding
> infrastructure as well. I would recommend using a BundleProcessor [1] to
> coordinate the work (which will internally wire up the chain of DoFns
> correctly and take them through their proper lifecycle). As mentioned
> above, you can directly borrow the translations in fn_api_runner to go from
> a full Pipeline graph (proto) to a set of fused DoFns to execute in
> topological order (as ProcessBundleDescriptor protos, which is
> what BundleProcessor accepts).
>
> [1]
> https://github.com/apache/beam/blob/release-2.48.0/sdks/python/apache_beam/runners/worker/bundle_processor.py#L851
>
>
>> Thanks again for the help,
>> Joey
>>
>> On Fri, Jun 23, 2023 at 8:34 PM Chamikara Jayalath <chamik...@google.com>
>> wrote:
>>
>>> Another advantage of a portable runner would be that it will be using
>>> well defined and backwards compatible Beam portable APIs to communicate
>>> with SDKs. I think this is specially important for runners that do not live
>>> in the Beam repo since otherwise future SDK releases could break your
>>> runner in subtle ways. Also portability gives you more flexibility when it
>>> comes to choosing an SDK to define the pipeline and will allow you to
>>> execute transforms in any SDK via cross-language.
>>>
>>> Thanks,
>>> Cham
>>>
>>> On Fri, Jun 23, 2023 at 1:57 PM Robert Bradshaw via user <
>>> user@beam.apache.org> wrote:
>>>
>>>>
>>>>
>>>> On Fri, Jun 23, 2023 at 1:43 PM Joey Tran <joey.t...@schrodinger.com>
>>>> wrote:
>>>>
>>>>> Totally doable by one person, especially given the limited feature set
>>>>>> you mention above.
>>>>>> https://docs.google.com/presentation/d/1Cso0XP9dmj77OD9Bd53C1M3W1sPJF0ZnA20gzb2BPhE
>>>>>>  is
>>>>>> a good starting point as to what the relationship between a Runner and 
>>>>>> the
>>>>>> SDK is at a level of detail sufficient for implementation (told from the
>>>>>> perspective of an SDK, but the story is largely about the interface which
>>>>>> is directly applicable).
>>>>>
>>>>>
>>>>> Great slides, I really appreciate the illustrations.
>>>>>
>>>>> I hadn't realized there was a concept of an "SDK Worker", I had
>>>>> imagined that once the Runner started execution of a workflow, it was
>>>>> Runner all the way down. Is the Fn API the only way to implement a runner?
>>>>> Our execution environment is a bit constrained in such a way that we can't
>>>>> expose the APIs required to implement the Fn API. To be forthright, we
>>>>> basically only have the ability to start a worker either with a known
>>>>> Pub/Sub topic to expect data from and a Pub/Sub topic to write to; or with
>>>>> a bundle of data to process and return the outputs for. We're constrained
>>>>> from really any additional communication with a worker beyond that.
>>>>>
>>>>
>>>> The "worker" abstraction gives the ability to wrap any user code in a
>>>> way that it can be called from any runner. If you're willing to constrain
>>>> the code you're wrapping (e.g. "Python DoFns only") then this "worker" can
>>>> be a logical, rather than physical, concept.
>>>>
>>>> Another way to look at it is that in practice, the "runner" often has
>>>> its own notion of "workers" which wrap (often in a 1:1 way) the logical
>>>> "SDK Worker" (which in turn invokes the actual DoFns). This latter may be
>>>> inlined (e.g. if it's 100% Python on both sides). See, for example,
>>>> https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py#L350
>>>>
>>>>
>>>>> On Fri, Jun 23, 2023 at 4:02 PM Robert Bradshaw <rober...@google.com>
>>>>> wrote:
>>>>>
>>>>>> On Fri, Jun 23, 2023 at 11:15 AM Joey Tran <joey.t...@schrodinger.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Thanks all for the responses!
>>>>>>>
>>>>>>> If Beam Runner Authoring Guide is rather high-level for you, then,
>>>>>>>> at fist, I’d suggest to answer two questions for yourself:
>>>>>>>> - Am I going to implement a portable runner or native one?
>>>>>>>>
>>>>>>>
>>>>>>> Portable sounds great, but the answer depends on how much additional
>>>>>>> cost it'd require to implement portable over non-portable, even 
>>>>>>> considering
>>>>>>> future deprecation (unless deprecation is happening tomorrow). I'm not
>>>>>>> familiar enough to know what the additional cost is so I don't have a 
>>>>>>> firm
>>>>>>> answer.
>>>>>>>
>>>>>>
>>>>>> I would way it would not be that expensive to write it in a "portable
>>>>>> compatible" way (i.e it uses the publicly-documented protocol as the
>>>>>> interface rather than reaching into internal details) even if it doesn't
>>>>>> use GRCP and fire up separate processes/docker images for the workers
>>>>>> (preferring to do tall of that inline like the Python portable direct
>>>>>> runner does).
>>>>>>
>>>>>>
>>>>>>> - Which SDK I should use for this runner?
>>>>>>>>
>>>>>>> I'd be developing this runner against the python SDK and if the
>>>>>>> runner only worked with the python SDK that'd be okay in the short term
>>>>>>>
>>>>>>
>>>>>> Yes. And if you do it the above way, it should be easy to extend (or
>>>>>> not) if/when the need arises.
>>>>>>
>>>>>>
>>>>>>> Also, we don’t know if this new runner will be contributed back to
>>>>>>>> Beam, what is a runtime and what actually is a final goal of it.
>>>>>>>
>>>>>>> Likely won't be contributed back to Beam (not sure if it'd actually
>>>>>>> be useful to a wide audience anyways).
>>>>>>>
>>>>>>> The context is we've been developing an in-house large-scale
>>>>>>> pipeline framework that encapsulates both the programming model and the
>>>>>>> runner/execution of data workflows. As it's grown, I keep finding myself
>>>>>>> just reimplementing features and abstractions Beam has already 
>>>>>>> implemented,
>>>>>>> so I wanted to explore adopting Beam. Our execution environment is very
>>>>>>> particular though and our workflows require it (due to the way we 
>>>>>>> license
>>>>>>> our software), so my plan was to try to create a very basic runner that
>>>>>>> uses our execution environment. The runner could have very few features
>>>>>>> e.g. no streaming, no metrics, no side inputs, etc. After that I'd 
>>>>>>> probably
>>>>>>> introduce a shim for some of our internally implemented transforms and
>>>>>>> assess from there.
>>>>>>>
>>>>>>> Not sure if this is a lofty goal or not, so happy to hear your
>>>>>>> thoughts as to whether this seems reasonable and achievable without a 
>>>>>>> large
>>>>>>> concerted effort or even if the general idea makes any sense. (I 
>>>>>>> recognize
>>>>>>> that it might not be *easy*, but I don't have the resources to
>>>>>>> dedicate more than myself to work on a PoC)
>>>>>>>
>>>>>>
>>>>>> Totally doable by one person, especially given the limited feature
>>>>>> set you mention above.
>>>>>> https://docs.google.com/presentation/d/1Cso0XP9dmj77OD9Bd53C1M3W1sPJF0ZnA20gzb2BPhE
>>>>>> is a good starting point as to what the relationship between a Runner and
>>>>>> the SDK is at a level of detail sufficient for implementation (told from
>>>>>> the perspective of an SDK, but the story is largely about the interface
>>>>>> which is directly applicable).
>>>>>>
>>>>>> Given the limited feature set you proposed, this is similar to the
>>>>>> original Python portable runner which took a week or two to put together
>>>>>> (granted a lot has been added since then), or the typescript direct 
>>>>>> runner
>>>>>> (
>>>>>> https://github.com/apache/beam/blob/ea9147ad2946f72f7d52924cba2820e9aae7cd91/sdks/typescript/src/apache_beam/runners/direct_runner.ts
>>>>>> ) which was done (in its basic form, no support for side inputs and such)
>>>>>> in less than a week. Granted, as these are local runners, this 
>>>>>> illustrates
>>>>>> only the Beam-side complexity of things (not the work of actually
>>>>>> implementing a distributed shuffle, starting and assigning work to 
>>>>>> multiple
>>>>>> workers, etc. but presumably that's the kind of thing your execution
>>>>>> environment already takes care of.
>>>>>>
>>>>>> As for some more concrete pointers, you could probably leverage a lot
>>>>>> of what's there by invoking create_stages
>>>>>>
>>>>>>
>>>>>> https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py#L362
>>>>>>
>>>>>> which will do optimization, fusion, etc. and then implementing your
>>>>>> own version of run_stages
>>>>>>
>>>>>>
>>>>>> https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py#L392
>>>>>>
>>>>>> to execute these in topological order on your compute infrastructure.
>>>>>> (If you're not doing streaming, this is much more straightforward than 
>>>>>> all
>>>>>> the bundler scheduler stuff that currently exists in that code).
>>>>>>
>>>>>>
>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Jun 23, 2023 at 12:17 PM Alexey Romanenko <
>>>>>>> aromanenko....@gmail.com> wrote:
>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On 23 Jun 2023, at 17:40, Robert Bradshaw via user <
>>>>>>>> user@beam.apache.org> wrote:
>>>>>>>>
>>>>>>>> On Fri, Jun 23, 2023, 7:37 AM Alexey Romanenko <
>>>>>>>> aromanenko....@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> If Beam Runner Authoring Guide is rather high-level for you, then,
>>>>>>>>> at fist, I’d suggest to answer two questions for yourself:
>>>>>>>>> - Am I going to implement a portable runner or native one?
>>>>>>>>>
>>>>>>>>
>>>>>>>> The answer to this should be portable, as non-portable ones will be
>>>>>>>> deprecated.
>>>>>>>>
>>>>>>>>
>>>>>>>> Well, actually this is a question that I don’t remember we
>>>>>>>> discussed here in details before and had a common agreement.
>>>>>>>>
>>>>>>>> Actually, I’m not sure that I understand clearly what is meant by
>>>>>>>> “deprecation" in this case. For example, Portable Spark Runner is 
>>>>>>>> heavily
>>>>>>>> actually based on native Spark RDD runner and its translations. So, 
>>>>>>>> which
>>>>>>>> part should be deprecated and what is a reason for that?
>>>>>>>>
>>>>>>>> Well, anyway I guess it’s off topic here.
>>>>>>>>
>>>>>>>> Also, we don’t know if this new runner will be contributed back to
>>>>>>>> Beam, what is a runtime and what actually is a final goal of it.
>>>>>>>> So I agree that more details on this would be useful.
>>>>>>>>
>>>>>>>> —
>>>>>>>> Alexey
>>>>>>>>
>>>>>>>>
>>>>>>>> - Which SDK I should use for this runner?
>>>>>>>>>
>>>>>>>>
>>>>>>>> The answer to the above question makes this one moot :).
>>>>>>>>
>>>>>>>> On a more serious note, could you tell us a bit more about the
>>>>>>>> runner you're looking at implementing?
>>>>>>>>
>>>>>>>>
>>>>>>>>> Then, depending on answers, I’d suggest to take as an example one
>>>>>>>>> of the most similar Beam runners and use it as a more detailed source 
>>>>>>>>> of
>>>>>>>>> information along with Beam runner doc mentioned before.
>>>>>>>>>
>>>>>>>>> —
>>>>>>>>> Alexey
>>>>>>>>>
>>>>>>>>> On 22 Jun 2023, at 14:39, Joey Tran <joey.t...@schrodinger.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>> Hi Beam community!
>>>>>>>>>
>>>>>>>>> I'm interested in trying to implement a runner with my company's
>>>>>>>>> execution environment but I'm struggling to get started. I've read 
>>>>>>>>> the docs
>>>>>>>>> page
>>>>>>>>> <https://beam.apache.org/contribute/runner-guide/#testing-your-runner>
>>>>>>>>> on implementing a runner but it's quite high level. Anyone have any
>>>>>>>>> concrete suggestions on getting started?
>>>>>>>>>
>>>>>>>>> I've started by cloning and running the hello world example
>>>>>>>>> <https://github.com/apache/beam-starter-python>. I've then
>>>>>>>>> subclassed `PipelineRunner
>>>>>>>>> <https://github.com/apache/beam/blob/9d0fc05d0042c2bb75ded511497e1def8c218c33/sdks/python/apache_beam/runners/runner.py#L103>`
>>>>>>>>> to create my own custom runner but at this point I'm a bit stuck. My 
>>>>>>>>> custom
>>>>>>>>> runner just looks like
>>>>>>>>>
>>>>>>>>> class CustomRunner(runner.PipelineRunner):
>>>>>>>>>     def run_pipeline(self, pipeline,
>>>>>>>>>                      options):
>>>>>>>>>         self.visit_transforms(pipeline, options)
>>>>>>>>>
>>>>>>>>> And when using it I get an error about not having implemented
>>>>>>>>> "Impulse"
>>>>>>>>>
>>>>>>>>> NotImplementedError: Execution of [<Impulse(PTransform)
>>>>>>>>> label=[Impulse]>] not implemented in runner <my_app.app.CustomRunner 
>>>>>>>>> object
>>>>>>>>> at 0x135d9ff40>.
>>>>>>>>>
>>>>>>>>> Am I going about this the right way? Are there tests I can run my
>>>>>>>>> custom runner against to validate it beyond just running the hello 
>>>>>>>>> world
>>>>>>>>> example? I'm finding myself just digging through the beam source to 
>>>>>>>>> try to
>>>>>>>>> piece together how a runner works and I'm struggling to get a 
>>>>>>>>> foothold. Any
>>>>>>>>> guidance would be greatly appreciated, especially if anyone has any
>>>>>>>>> experience implementing their own python runner.
>>>>>>>>>
>>>>>>>>> Thanks in advance! Also, could I get a Slack invite?
>>>>>>>>> Cheers,
>>>>>>>>> Joey
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>

Reply via email to