On Thu, Feb 4, 2021 at 5:36 PM Robert Bradshaw <[email protected]> wrote:
> On Thu, Feb 4, 2021 at 4:16 PM Kyle Weaver <[email protected]> wrote:

>>> I do think it can be useful to specify a custom "top-level" environment. We should probably make it easy to use customized expansion services.

>> I'm fine with adding startup argument(s) in the expansion service for configuring the "top-level" environment. Since which expansion service to use is already configurable in external transforms, it solves the problem just as well as my original proposal. And if a particular expansion service wants to do something more complicated, it can have its own logic to handle that.

> That sounds like a good plan.

Sounds good to me as well. Thanks Kyle.

>>> Ah, that clarifies things. Would it be possible/preferable to pass the credentials as parameters to the transform itself?

>> Maybe. But it's generally useful to be able to stage files to SDK containers, so it's something we should consider making into a general feature, perhaps based on the artifact API.

> +1

>> On Thu, Feb 4, 2021 at 3:52 PM Robert Bradshaw <[email protected]> wrote:

>>> On Thu, Feb 4, 2021 at 3:33 PM Kyle Weaver <[email protected]> wrote:

>>>>> This gets into the distinction of customizing what kind of environment one wants to have (which could be generally applicable) vs. an absolute designation of a particular environment (e.g. a docker image).

>>>> For common environment modifications, resource hints are a great idea, since it's much easier to set an annotation than to build and set a custom container. The limitation of this approach is that we can't handle every possible modification a user might want to make to their environment. Custom containers give the user ultimate control over the environment, so we forfeit a lot of flexibility if we don't provide enough options to use them.

>>>>> Note that what we're running into in part is that "pipeline options" are the wrong level of granularity for specifying characteristics of an environment, as there is not a single environment to parameterize (or, possibly, even one per language).

>>>> Yes, this is the crux of the problem. We already expose an environment_config as a pipeline option, so we basically have three choices:
>>>> 1. Deprecate pipeline-level environment options altogether.
>>>> 2. Find a way to generalize environment options.
>>>> 3. Keep and document the status quo (i.e. users can use custom containers, but at most one per language).

>>> I do think it can be useful to specify a custom "top-level" environment. We should probably make it easy to use customized expansion services.

>>>>> The caller should not need any visibility into the environment(s) that an expansion service uses, which is an implementation detail that the expansion service is free to change at any time. (In fact, whether it is (partially or fully) implemented as an external transform is an implementation detail that the end user should not need to care about or depend on.)

>>>>> I personally think pattern matching and substitution by runners (maybe more sophisticated than regexp on container names) is a reasonable way to approach customization of environments.

>>>> Aren't these ideas contradictory? Pattern matching requires knowledge in advance of which patterns to match. We'd need to know at least some information about the environment the expansion service is expected to use in order to replace it.

>>> The pattern matching is not such that I want to replace the environment for this particular transform, but that /if/ I see a Java environment of a certain type /then/ I want to run it in this way.
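Concretely, the kind of substitution being described might look something like the sketch below (illustrative only, not actual runner code; the function name and the image-name check are assumptions):

    from apache_beam.portability import common_urns, python_urns
    from apache_beam.portability.api import beam_runner_api_pb2

    def swap_known_python_envs(pipeline_proto):
        # "If I see a Python docker environment of a certain type, then run
        # it in-process instead." Matching here is on the container image.
        for env in pipeline_proto.components.environments.values():
            if env.urn == common_urns.environments.DOCKER.urn:
                payload = beam_runner_api_pb2.DockerPayload.FromString(env.payload)
                if payload.container_image.startswith('apache/beam_python'):
                    env.urn = python_urns.EMBEDDED_PYTHON
                    env.payload = b''
        return pipeline_proto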
>>>>> For example, suppose I construct a pipeline that uses both Python and Java transforms. (I could do this from Go, Java, or Python.) If I want to run this locally (e.g. on the Python FnAPI runner), I would prefer that the Python bits be run in-process but would have to shell out (maybe via docker, maybe something cheaper) for the Java bits. On the other hand, if I want to run this same pipeline (ideally, the same model proto, such that we don't have runner-dependent construction) on Flink, I might want the Java bits to be inlined and the Python bits to be in a separate process. On Dataflow, both would live in containers. To do this, the Python runner would say "hey, I know that Python environment" and just swap it out for in-process, and vice versa. (For isolation/other reasons, one may want the option to force everything to be docker, but that's more of a "don't make substitutions" option than manually providing environment configs.)

>>>> In this example, wouldn't you normally just rebuild the pipeline? I'm not sure what the advantage of re-using the same model proto is.

>>> Yes, you'd rebuild the pipeline. But if all you change is the --runner flag, the model proto produced should not change. (And, sometimes, you may want to stash the proto itself, or pass it to one of N runners depending on some other condition, etc.)
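As a sketch of that runner-independent construction (the runner names are just examples; this uses the public to_runner_api() method):

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    def build(argv):
        # Identical construction code; only the options differ.
        p = beam.Pipeline(options=PipelineOptions(argv))
        _ = p | beam.Create([1, 2, 3]) | beam.Map(lambda x: x * x)
        return p

    # The model proto should be the same regardless of the target runner:
    proto_a = build(['--runner=DirectRunner']).to_runner_api()
    proto_b = build(['--runner=FlinkRunner']).to_runner_api()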
>>>>> It would be helpful for me to have concrete use cases of why a user wants to customize the container used by some transform they did not write, which could possibly inform the best course(s) of action here.

>>>> I should have led with this. Someone wanted to mount credentials into the SDK harness [1]. So in this particular case the user just wants to mount files into their SDK harness, which is a pretty common use case, so resource hints are probably a more appropriate solution.

>>>> [1] https://lists.apache.org/thread.html/r690094f1c9ebc4e1d20f029a21ba8bc846672a65baafd57c4f52cb94%40%3Cuser.beam.apache.org%3E

>>> Ah, that clarifies things. Would it be possible/preferable to pass the credentials as parameters to the transform itself?

This is very useful for testing as well. For example, to test containers generated for release candidates.

>>>> On Thu, Feb 4, 2021 at 1:51 PM Robert Bradshaw <[email protected]> wrote:

>>>>> On Thu, Feb 4, 2021 at 12:38 PM Kyle Weaver <[email protected]> wrote:

>>>>>>> So, an external transform is uniquely identified by its URN. An external transform identified by a URN may refer to an arbitrary composite which may have sub-transforms that refer to different environments. I think with the above proposal we'll lose this flexibility. What we need is a way to override environments (or properties of environments) that results in the final pipeline proto. Once we modify such environments in the proto, it will be reflected in all transforms that utilize such environments.

>>>>>> As far as I can tell, we currently only register a single environment for the entire transform (and it's always the default). Am I missing something? https://github.com/apache/beam/blob/0cfa80fd919d141a2061393ec5c12521c7d7af0b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionService.java#L447-L449

>>>>>> Anyway, I don't see how sub-transforms require overrides. We should be able to propagate environment options to sub-transforms to achieve the same purpose.

>>>>> The discussion of resource hints at https://lists.apache.org/thread.html/ra40286b66a03a1d9f4086c9e1ecdeb9f299836d2d0361c3e3fe7c382%40%3Cdev.beam.apache.org%3E actually may tie into this as well. I would assume a localised request for, say, high memory should be propagated down to cross-language pipelines. It is possible that other customizations (such as making sure specific dependencies are available, or filesystems mounted) would fit here too.

>>>>> This gets into the distinction of customizing what kind of environment one wants to have (which could be generally applicable) vs. an absolute designation of a particular environment (e.g. a docker image).

>>>>> Note that what we're running into in part is that "pipeline options" are the wrong level of granularity for specifying characteristics of an environment, as there is not a single environment to parameterize (or, possibly, even one per language). If I call ExpansionRequest(MyFancyTransform, environment_config=docker_path) and MyFancyTransform is composed of two environments, to which does docker_path apply? What about PTransforms that use ExternalTransforms under the hood (e.g. one that does some pre-processing and then calls SQL, or calls Kafka followed by some Python-level post-processing)?

>>>>>>> 'sdk_harness_container_image_overrides' is such a property (which unfortunately only works for Dataflow today). Also, this only works for Docker URLs. Maybe we can extend this property to all runners or introduce a new property that works for all types of environments?

>>>>>> In my original email, I wrote that sdk_harness_container_image_overrides is no more flexible than having a single option per SDK, since the default container images for all external transforms in each SDK are expected to be the same. For example, in the case of a pipeline with two external transforms that both use the same default container image, sdk_harness_container_image_overrides does not let the user give those two transforms different containers.

>>>>>> From a design standpoint, I feel find-replace is hacky and backwards. It's cleaner to specify what kind of environment we want directly in the ExpansionRequest. That way all of the environment creation logic belongs inside the expansion service.
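For reference, the find/replace style being critiqued looks roughly like this (Dataflow-only today; the "<regex>,<replacement>" format is my reading of the option, and the regex and image name are made-up values):

    from apache_beam.options.pipeline_options import PipelineOptions

    options = PipelineOptions([
        '--runner=DataflowRunner',
        # "<regex to match existing image>,<replacement image>"
        '--sdk_harness_container_image_overrides='
        '.*java.*,us.gcr.io/my-project/beam_java_custom:latest',
    ])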
>>>>> While Environments logically belong with Transforms, it is the expansion service's job to attach the right environments to the transforms that it vends. The caller should not need any visibility into the environment(s) that an expansion service uses, which is an implementation detail that the expansion service is free to change at any time. (In fact, whether it is (partially or fully) implemented as an external transform is an implementation detail that the end user should not need to care about or depend on.)

>>>>> I personally think pattern matching and substitution by runners (maybe more sophisticated than regexp on container names) is a reasonable way to approach customization of environments. For example, suppose I construct a pipeline that uses both Python and Java transforms. (I could do this from Go, Java, or Python.) If I want to run this locally (e.g. on the Python FnAPI runner), I would prefer that the Python bits be run in-process but would have to shell out (maybe via docker, maybe something cheaper) for the Java bits. On the other hand, if I want to run this same pipeline (ideally, the same model proto, such that we don't have runner-dependent construction) on Flink, I might want the Java bits to be inlined and the Python bits to be in a separate process. On Dataflow, both would live in containers. To do this, the Python runner would say "hey, I know that Python environment" and just swap it out for in-process, and vice versa. (For isolation/other reasons, one may want the option to force everything to be docker, but that's more of a "don't make substitutions" option than manually providing environment configs.)
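For concreteness, the kind of mixed-language pipeline being described (a mostly-Python pipeline with one Java cross-language transform; the broker address and topic are placeholders):

    import apache_beam as beam
    from apache_beam.io.kafka import ReadFromKafka

    with beam.Pipeline() as p:
        _ = (p
             # Expands via a Java expansion service, so it is assigned a
             # Java environment...
             | ReadFromKafka(
                 consumer_config={'bootstrap.servers': 'localhost:9092'},
                 topics=['events'])
             # ...while this step runs in the Python environment.
             | beam.Map(lambda kv: kv[1]))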
>>>>> On the other hand, as we go the route of custom containers, especially expansion services that might vend custom containers, I think we need a way to push down *properties* of environments (such as resource hints) through the expansion service that may influence the environments that get attached and returned.

>>>>> It would be helpful for me to have concrete use cases of why a user wants to customize the container used by some transform they did not write, which could possibly inform the best course(s) of action here.

>>>>>> On Wed, Feb 3, 2021 at 5:07 PM Chamikara Jayalath <[email protected]> wrote:

>>>>>>> On Wed, Feb 3, 2021 at 12:34 PM Kyle Weaver <[email protected]> wrote:

>>>>>>>> Hi Beamers,

>>>>>>>> Recently we’ve had some requests on user@ and Slack for instructions on how to use custom-built containers in cross-language pipelines (typically calling Java transforms from a predominantly Python pipeline). Currently, it seems there is no way to change the container used by a cross-language transform except by modifying and rebuilding the expansion service. The SDK does not pass pipeline options to the expansion service (BEAM-9449 [1]). Fixing BEAM-9449 does not solve everything, however. Even if pipeline options are passed, the existing set of pipeline options still limits the amount of control we have over environments. Here are the existing pipeline options that I’m aware of:

>>>>>>>> Python [2] and Go [3] have these:

>>>>>>>> - environment_type (DOCKER, PROCESS, LOOPBACK)

>>>>>>>> - environment_config (This one is confusingly overloaded. It’s a string that means different things depending on environment_type. For DOCKER, it is the Docker image URL. For PROCESS, it is a JSON blob. For EXTERNAL, it is the external service address.)

>>>>>>>> Whereas Java [4] has defaultEnvironmentType and defaultEnvironmentConfig, which are named differently but otherwise act the same as the above.

>>>>>>>> I was unsatisfied with environment_config for a number of reasons. First, having a single overloaded option that can mean entirely different things depending on context is poor design. Second, in PROCESS mode, requiring the user to type in a JSON blob for environment_config is not especially human-friendly (though it has also been argued that JSON makes complex arguments like this easier to parse). Finally, we must overload this string further to introduce new environment-specific options, such as a mounted Docker volume (BEAM-5440 [5]).

>>>>>>> Agree.

>>>>>>>> To address these problems, I added a new option called “environment_options” (BEAM-10671 [6]). (This option has been implemented in the Python SDK, but not in the other SDKs yet.) Like the “experiments” option, environment_options takes a list of strings, for example “--environment_option=docker_container_image=my_beam_sdk:latest”. It could be argued we should have made “docker_container_image” etc. top-level options instead, but this “catch-all” design makes what I am about to propose a lot easier.
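Side by side, the overloaded flag and the newer list-valued flag (values are illustrative; the PROCESS JSON shown here assumes the boot-command form):

    from apache_beam.options.pipeline_options import PipelineOptions

    # Overloaded environment_config: an image URL for DOCKER, a JSON blob
    # for PROCESS.
    old_style = PipelineOptions([
        '--environment_type=PROCESS',
        '--environment_config={"command": "/opt/apache/beam/boot"}',
    ])

    # environment_options (BEAM-10671): explicit key=value strings.
    new_style = PipelineOptions([
        '--environment_type=DOCKER',
        '--environment_option=docker_container_image=my_beam_sdk:latest',
    ])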
>>>>>>>> The solution proposed in PR #11638 [7] set a flag to include unrecognized pipeline options during serialization, since otherwise unrecognized options are dropped. In a Python pipeline, this will allow us to set environment_config and default_environment_config to separate values, for Python and Java containers, respectively. However, this still limits us to one container image for all Python and Go transforms, and one container image for all Java transforms. As more cross-language transforms are implemented, sooner or later someone will want to have different Java SDK containers for different external transforms.

>>>>>>>> (I should also mention the sdk_harness_container_image_overrides pipeline option [8], which is currently only supported by the Dataflow runner. It lets us basically perform a find/replace on container image strings. This is not significantly more flexible than having a single option per SDK, since the default container images for all external transforms in each SDK are expected to be the same.)

>>>>>>>> Environments logically belong with transforms, and that’s how it works in the Runner API [9]. The problem now is that from the user’s perspective, the environment is bound to the expansion service. After addressing BEAM-9449, the problem will be that one or two environments at most are bound to the pipeline. Ideally, though, users should have fully granular control over environments at the transform level.

>>>>>>>> All this context for a very simple proposal: we should have all ExternalTransform subclasses take optional environment_type and environment_options fields in their constructors. As with their corresponding pipeline options, these options would default to DOCKER and none, respectively. Then we could overwrite the environment_type and environment_options in the pipeline options passed to the expansion service with these values. (Alternatively, we could pass environment_type and environment_options to the expansion service individually to avoid having to overwrite their original values, but their original values should be irrelevant to the expansion service anyway.)

>>>>>>>> What do you think?
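For illustration, the proposal from the user's side might look like this (the environment_type and environment_options arguments are hypothetical, not current Beam API; ReadFromKafka stands in for any ExternalTransform subclass, and the image name is a placeholder):

    from apache_beam.io.kafka import ReadFromKafka

    # Hypothetical per-transform environment settings, per the proposal:
    kafka_read = ReadFromKafka(
        consumer_config={'bootstrap.servers': 'localhost:9092'},
        topics=['events'],
        environment_type='DOCKER',  # hypothetical argument
        environment_options=[       # hypothetical argument
            'docker_container_image=my_beam_java_sdk:latest',
        ])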
>>>>>>> So, an external transform is uniquely identified by its URN. An external transform identified by a URN may refer to an arbitrary composite which may have sub-transforms that refer to different environments. I think with the above proposal we'll lose this flexibility. What we need is a way to override environments (or properties of environments) that results in the final pipeline proto. Once we modify such environments in the proto, it will be reflected in all transforms that utilize such environments.

>>>>>>> 'sdk_harness_container_image_overrides' is such a property (which unfortunately only works for Dataflow today). Also, this only works for Docker URLs. Maybe we can extend this property to all runners or introduce a new property that works for all types of environments?

>>>>>>> Thanks,
>>>>>>> Cham

>>>>>>>> [1] https://issues.apache.org/jira/browse/BEAM-9449
>>>>>>>> [2] https://github.com/apache/beam/blob/f2c9b6e1aa5d38385f4c168107c85d4fe7f0f259/sdks/python/apache_beam/options/pipeline_options.py#L1097-L1115
>>>>>>>> [3] https://github.com/apache/beam/blob/b56b61a9a6401271f14746000ecc38b17aab753d/sdks/go/pkg/beam/options/jobopts/options.go#L41-L53
>>>>>>>> [4] https://github.com/apache/beam/blob/b56b61a9a6401271f14746000ecc38b17aab753d/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java#L53-L71
>>>>>>>> [5] https://issues.apache.org/jira/browse/BEAM-5440
>>>>>>>> [6] https://issues.apache.org/jira/browse/BEAM-10671
>>>>>>>> [7] https://github.com/apache/beam/pull/11638
>>>>>>>> [8] https://github.com/apache/beam/blob/f2c9b6e1aa5d38385f4c168107c85d4fe7f0f259/sdks/python/apache_beam/options/pipeline_options.py#L840-L850
>>>>>>>> [9] https://github.com/apache/beam/blob/b56b61a9a6401271f14746000ecc38b17aab753d/model/pipeline/src/main/proto/beam_runner_api.proto#L194