Hi Beamers,

Recently we’ve had some requests on user@ and Slack for instructions on how
to use custom-built containers in cross-language pipelines (typically
calling Java transforms from a predominantly Python pipeline). Currently,
it seems like there is no way to change the container used by a
cross-language transform except by modifying and rebuilding the expansion
service. The SDK does not pass pipeline options to the expansion service
(BEAM-9449 [1]). Fixing BEAM-9449 does not solve everything, however. Even
if pipeline options are passed, the existing set of pipeline options still
limits the amount of control we have over environments. Here are the
existing pipeline options that I’m aware of:

Python [2] and Go [3] have these:

   - environment_type (DOCKER, PROCESS, LOOPBACK)

   - environment_config (This one is confusingly overloaded. It’s a string
     that means different things depending on environment_type. For DOCKER,
     it is the Docker image URL. For PROCESS, it is a JSON blob. For
     EXTERNAL, it is the external service address.)


Whereas Java [4] has defaultEnvironmentType and defaultEnvironmentConfig,
which are named differently but otherwise act the same as the above.
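
For concreteness, here is a minimal sketch (image name and boot command are
illustrative) of how these options are set in the Python SDK today. Note how
the same environment_config flag is interpreted differently depending on
environment_type:

    from apache_beam.options.pipeline_options import PipelineOptions

    # DOCKER: environment_config is a container image URL.
    docker_opts = PipelineOptions([
        '--environment_type=DOCKER',
        '--environment_config=my_beam_sdk:latest',
    ])

    # PROCESS: the same flag is instead a JSON blob describing how to
    # start the worker process.
    process_opts = PipelineOptions([
        '--environment_type=PROCESS',
        '--environment_config={"command": "/opt/apache/beam/boot"}',
    ])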

I was dissatisfied with environment_config for a number of reasons. First,
having a single overloaded option that can mean entirely different things
depending on context is poor design. Second, in PROCESS mode, requiring the
user to type a JSON blob for environment_config is not especially
human-friendly (though it has also been argued that JSON makes complex
arguments like this easier to parse). Finally, we would have to overload this
string even further to introduce new environment-specific options, such as a
mounted Docker volume (BEAM-5440 [5]).

To address these problems, I added a new option called
“environment_options” (BEAM-10671 [6]). (This option has been implemented
in the Python SDK, but not in the other SDKs yet.) Like the “experiments”
option, environment_options takes a list of strings, for example
“--environment_option=docker_container_image=my_beam_sdk:latest”. It could
be argued that we should have made “docker_container_image” etc. top-level
options instead, but this “catch-all” design makes what I am about to
propose a lot easier.
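
For example, a sketch of the new flag in the Python SDK (the image name is
illustrative):

    from apache_beam.options.pipeline_options import PipelineOptions

    # Environment-specific settings stay as key=value strings, so new keys
    # (e.g. a mounted Docker volume) can be added later without introducing
    # new top-level pipeline options.
    opts = PipelineOptions([
        '--environment_type=DOCKER',
        '--environment_option=docker_container_image=my_beam_sdk:latest',
    ])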

The solution proposed in PR #11638 [7] sets a flag to include unrecognized
pipeline options during serialization, since unrecognized options are
otherwise dropped. In a Python pipeline, this will allow us to set
environment_config and default_environment_config to separate values, for
Python and Java containers, respectively. However, this still limits us to
one container image for all Python and Go transforms, and one container
image for all Java transforms. As more cross-language transforms are
implemented, sooner or later someone will want different Java SDK
containers for different external transforms.
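
To illustrate, assuming both BEAM-9449 and the PR #11638 behavior are in
place, a Python pipeline could carry separate values roughly like this
(image names are illustrative; the Java-side flag spelling mirrors
defaultEnvironmentConfig):

    from apache_beam.options.pipeline_options import PipelineOptions

    opts = PipelineOptions([
        # Used by the Python SDK workers.
        '--environment_type=DOCKER',
        '--environment_config=my_python_sdk:latest',
        # Not recognized by the Python SDK, but retained during
        # serialization (per PR #11638) so the Java expansion service
        # can pick it up.
        '--default_environment_config=my_java_sdk:latest',
    ])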

(I should also mention the sdk_harness_container_image_overrides pipeline
option [8], which is currently only supported by the Dataflow runner. It
essentially lets us perform a find/replace on container image strings. This
is not significantly more flexible than having a single option per SDK,
since the default container images for all external transforms in each SDK
are expected to be the same.)
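
For example, per [8] each override entry is a regex and a replacement image
separated by a comma (image names are illustrative):

    from apache_beam.options.pipeline_options import PipelineOptions

    # Replace any default container image matching the regex with the
    # given custom image.
    opts = PipelineOptions([
        '--sdk_harness_container_image_overrides='
        '.*java.*,gcr.io/my-project/my_java_sdk:latest',
    ])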

Environments logically belong with transforms, and that’s how it works in
the Runner API [9]. The problem now is that from the user’s perspective,
the environment is bound to the expansion service. After addressing
BEAM-9449, the problem will be that one or two environments at most are
bound to the pipeline. Ideally, though, users should have fully granular
control over environments at the transform level.

All this context for a very simple proposal: we should have all
ExternalTransform subclasses take optional environment_type and
environment_options fields in their constructors. As with their
corresponding pipeline options, these options would default to DOCKER and
none, respectively. Then we could overwrite the environment_type and
environment_options in the pipeline options passed to the expansion service
with these values. (Alternatively, we could pass environment_type and
environment_options to the expansion service individually to avoid having
to overwrite their original values, but their original values should be
irrelevant to the expansion service anyway.)
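
As a rough sketch of the user-facing API (the constructor shape and the
ReadFromMyJavaSource wrapper are illustrative, not a committed interface):

    from apache_beam.transforms import ptransform

    class ExternalTransform(ptransform.PTransform):

      def __init__(self, urn, payload, expansion_service=None,
                   environment_type='DOCKER', environment_options=None):
        # These values would overwrite environment_type and
        # environment_options in the pipeline options sent to the
        # expansion service.
        ...

    # Usage: pin a custom Java SDK container for one external transform
    # without touching the containers used by the rest of the pipeline.
    # (p is a Pipeline; ReadFromMyJavaSource is a hypothetical
    # cross-language wrapper built on ExternalTransform.)
    records = (
        p
        | ReadFromMyJavaSource(
              environment_type='DOCKER',
              environment_options=[
                  'docker_container_image=my_java_sdk:latest']))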

What do you think?

[1] https://issues.apache.org/jira/browse/BEAM-9449

[2]
https://github.com/apache/beam/blob/f2c9b6e1aa5d38385f4c168107c85d4fe7f0f259/sdks/python/apache_beam/options/pipeline_options.py#L1097-L1115

[3]
https://github.com/apache/beam/blob/b56b61a9a6401271f14746000ecc38b17aab753d/sdks/go/pkg/beam/options/jobopts/options.go#L41-L53

[4]
https://github.com/apache/beam/blob/b56b61a9a6401271f14746000ecc38b17aab753d/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java#L53-L71

[5] https://issues.apache.org/jira/browse/BEAM-5440

[6] https://issues.apache.org/jira/browse/BEAM-10671

[7] https://github.com/apache/beam/pull/11638

[8]
https://github.com/apache/beam/blob/f2c9b6e1aa5d38385f4c168107c85d4fe7f0f259/sdks/python/apache_beam/options/pipeline_options.py#L840-L850

[9]
https://github.com/apache/beam/blob/b56b61a9a6401271f14746000ecc38b17aab753d/model/pipeline/src/main/proto/beam_runner_api.proto#L194
