This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 3f8af4b61b2 Resolve Any environment types before optimization. (#30568)
3f8af4b61b2 is described below

commit 3f8af4b61b29f6ea2734f8f990041d984a8ac4d2
Author: Robert Bradshaw <rober...@gmail.com>
AuthorDate: Fri Mar 8 08:02:35 2024 -0800

    Resolve Any environment types before optimization. (#30568)
    
    This allows us to correctly inspect environment capabilities during optimization, etc.
---
 .../runners/portability/fn_api_runner/fn_runner.py        | 15 +++++++++++++++
 1 file changed, 15 insertions(+)

diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
index a736288dc62..b3dd124216b 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
@@ -56,6 +56,7 @@ from apache_beam.metrics.monitoring_infos import consolidate as consolidate_moni
 from apache_beam.options import pipeline_options
 from apache_beam.options.value_provider import RuntimeValueProvider
 from apache_beam.portability import common_urns
+from apache_beam.portability import python_urns
 from apache_beam.portability.api import beam_fn_api_pb2
 from apache_beam.portability.api import beam_provision_api_pb2
 from apache_beam.portability.api import beam_runner_api_pb2
@@ -220,6 +221,7 @@ class FnApiRunner(runner.PipelineRunner):
     ]
     if direct_options.direct_embed_docker_python:
       pipeline_proto = self.embed_default_docker_image(pipeline_proto)
+    pipeline_proto = self.resolve_any_environments(pipeline_proto)
     stage_context, stages = self.create_stages(pipeline_proto)
     return self.run_stages(stage_context, stages)
 
@@ -235,6 +237,9 @@ class FnApiRunner(runner.PipelineRunner):
         environments.DockerEnvironment.default_docker_image()).to_runner_api(
             None)  # type: ignore[arg-type]
 
+    # We'd rather deal with the complexity of any environments here rather
+    # than resolve them first so we can get optimal substitution in case
+    # docker is not high in the preferred environment type list.
     def is_this_python_docker_env(env):
       return any(
           e == docker_env for e in environments.expand_anyof_environments(env))
@@ -264,6 +269,16 @@ class FnApiRunner(runner.PipelineRunner):
         transform.environment_id = embedded_env_id
     return pipeline_proto
 
+  def resolve_any_environments(self, pipeline_proto):
+    for env_id, env in pipeline_proto.components.environments.items():
+      pipeline_proto.components.environments[env_id].CopyFrom(
+          environments.resolve_anyof_environment(
+              env,
+              python_urns.EMBEDDED_PYTHON,
+              common_urns.environments.EXTERNAL.urn,
+              common_urns.environments.DOCKER.urn))
+    return pipeline_proto
+
   @contextlib.contextmanager
   def maybe_profile(self):
     # type: () -> Iterator[None]

Reply via email to