This is an automated email from the ASF dual-hosted git repository. yhu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 3f8af4b61b2 Resolve Any environment types before optimization. (#30568) 3f8af4b61b2 is described below commit 3f8af4b61b29f6ea2734f8f990041d984a8ac4d2 Author: Robert Bradshaw <rober...@gmail.com> AuthorDate: Fri Mar 8 08:02:35 2024 -0800 Resolve Any environment types before optimization. (#30568) This allows us to correctly inspect environment capabilities during optimization, etc. --- .../runners/portability/fn_api_runner/fn_runner.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py index a736288dc62..b3dd124216b 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py @@ -56,6 +56,7 @@ from apache_beam.metrics.monitoring_infos import consolidate as consolidate_moni from apache_beam.options import pipeline_options from apache_beam.options.value_provider import RuntimeValueProvider from apache_beam.portability import common_urns +from apache_beam.portability import python_urns from apache_beam.portability.api import beam_fn_api_pb2 from apache_beam.portability.api import beam_provision_api_pb2 from apache_beam.portability.api import beam_runner_api_pb2 @@ -220,6 +221,7 @@ class FnApiRunner(runner.PipelineRunner): ] if direct_options.direct_embed_docker_python: pipeline_proto = self.embed_default_docker_image(pipeline_proto) + pipeline_proto = self.resolve_any_environments(pipeline_proto) stage_context, stages = self.create_stages(pipeline_proto) return self.run_stages(stage_context, stages) @@ -235,6 +237,9 @@ class FnApiRunner(runner.PipelineRunner): environments.DockerEnvironment.default_docker_image()).to_runner_api( None) # type: ignore[arg-type] + # We'd rather deal with the complexity of any environments here rather + # than resolve them first so we can get optimal substitution in case + # docker is not high in the preferred environment type list. def is_this_python_docker_env(env): return any( e == docker_env for e in environments.expand_anyof_environments(env)) @@ -264,6 +269,16 @@ class FnApiRunner(runner.PipelineRunner): transform.environment_id = embedded_env_id return pipeline_proto + def resolve_any_environments(self, pipeline_proto): + for env_id, env in pipeline_proto.components.environments.items(): + pipeline_proto.components.environments[env_id].CopyFrom( + environments.resolve_anyof_environment( + env, + python_urns.EMBEDDED_PYTHON, + common_urns.environments.EXTERNAL.urn, + common_urns.environments.DOCKER.urn)) + return pipeline_proto + @contextlib.contextmanager def maybe_profile(self): # type: () -> Iterator[None]