This is an automated email from the ASF dual-hosted git repository.

tvalentyn pushed a commit to branch revert-17338-dataflow-portable-runner
in repository https://gitbox.apache.org/repos/asf/beam.git

commit d09b93122467ce12780ddddfd3596fe740f38fba
Author: tvalentyn <tvalen...@users.noreply.github.com>
AuthorDate: Wed Apr 20 18:44:44 2022 +0200

    Revert "Improvements to dataflow job service for non-Python jobs. (#17338)"
    
    This reverts commit bf910a1f30c11ec00ecb4241dfc6757705fc7d94.
---
 .../runners/dataflow/dataflow_runner.py            | 36 ++++++++++------------
 .../runners/dataflow/internal/apiclient.py         | 10 ++----
 .../portability/fn_api_runner/translations.py      |  7 ++---
 .../runners/portability/local_job_service.py       | 13 +++-----
 4 files changed, 24 insertions(+), 42 deletions(-)

diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index b395508fd12..edb319dbfb0 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -457,27 +457,26 @@ class DataflowRunner(PipelineRunner):
       if use_fnapi and not apiclient._use_unified_worker(options):
         pipeline.replace_all(DataflowRunner._JRH_PTRANSFORM_OVERRIDES)
 
+    from apache_beam.transforms import environments
+    if options.view_as(SetupOptions).prebuild_sdk_container_engine:
+      # if prebuild_sdk_container_engine is specified we will build a new sdk
+      # container image with dependencies pre-installed and use that image,
+      # instead of using the inferred default container image.
+      self._default_environment = (
+          environments.DockerEnvironment.from_options(options))
+      options.view_as(WorkerOptions).sdk_container_image = (
+          self._default_environment.container_image)
+    else:
+      self._default_environment = (
+          environments.DockerEnvironment.from_container_image(
+              apiclient.get_container_image_from_options(options),
+              artifacts=environments.python_sdk_dependencies(options),
+              resource_hints=environments.resource_hints_from_options(options)))
+
     if pipeline_proto:
       self.proto_pipeline = pipeline_proto
 
     else:
-      from apache_beam.transforms import environments
-      if options.view_as(SetupOptions).prebuild_sdk_container_engine:
-        # if prebuild_sdk_container_engine is specified we will build a new sdk
-        # container image with dependencies pre-installed and use that image,
-        # instead of using the inferred default container image.
-        self._default_environment = (
-            environments.DockerEnvironment.from_options(options))
-        options.view_as(WorkerOptions).sdk_container_image = (
-            self._default_environment.container_image)
-      else:
-        self._default_environment = (
-            environments.DockerEnvironment.from_container_image(
-                apiclient.get_container_image_from_options(options),
-                artifacts=environments.python_sdk_dependencies(options),
-                resource_hints=environments.resource_hints_from_options(
-                    options)))
-
       # This has to be performed before pipeline proto is constructed to make
       # sure that the changes are reflected in the portable job submission path.
       self._adjust_pipeline_for_dataflow_v2(pipeline)
@@ -1650,8 +1649,6 @@ class DataflowPipelineResult(PipelineResult):
       assert duration or terminated, (
           'Job did not reach to a terminal state after waiting indefinitely.')
 
-      # TODO(BEAM-14291): Also run this check if wait_until_finish was called
-      # after the pipeline completed.
       if terminated and self.state != PipelineState.DONE:
         # TODO(BEAM-1290): Consider converting this to an error log based on
        # the resolution of the issue.
@@ -1659,7 +1656,6 @@ class DataflowPipelineResult(PipelineResult):
             'Dataflow pipeline failed. State: %s, Error:\n%s' %
             (self.state, getattr(self._runner, 'last_error_msg', None)),
             self)
-
     return self.state
 
   def cancel(self):
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index 21e3335c077..cec614bbd9b 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -279,6 +279,8 @@ class Environment(object):
       pool.network = self.worker_options.network
     if self.worker_options.subnetwork:
       pool.subnetwork = self.worker_options.subnetwork
+    pool.workerHarnessContainerImage = (
+        get_container_image_from_options(options))
 
     # Setting worker pool sdk_harness_container_images option for supported
     # Dataflow workers.
@@ -305,14 +307,6 @@ class Environment(object):
         container_image.capabilities.append(capability)
       pool.sdkHarnessContainerImages.append(container_image)
 
-    if not _use_fnapi(options) or not pool.sdkHarnessContainerImages:
-      pool.workerHarnessContainerImage = (
-          get_container_image_from_options(options))
-    elif len(pool.sdkHarnessContainerImages) == 1:
-      # Dataflow expects a value here when there is only one environment.
-      pool.workerHarnessContainerImage = (
-          pool.sdkHarnessContainerImages[0].containerImage)
-
     if self.debug_options.number_of_worker_harness_threads:
       pool.numThreadsPerWorker = (
           self.debug_options.number_of_worker_harness_threads)
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
index 5ed965fb751..5e2f5568fde 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
@@ -21,7 +21,6 @@
 # mypy: check-untyped-defs
 
 import collections
-import copy
 import functools
 import itertools
 import logging
@@ -287,9 +286,7 @@ class Stage(object):
     # type: (...) -> beam_runner_api_pb2.PTransform
     if (len(self.transforms) == 1 and
         self.transforms[0].spec.urn in known_runner_urns):
-      result = copy.copy(self.transforms[0])
-      del result.subtransforms[:]
-      return result
+      return self.transforms[0]
 
     else:
       all_inputs = set(
@@ -707,7 +704,7 @@ def create_and_optimize_stages(
       leaf_transform_stages(
           pipeline_proto.root_transform_ids,
           pipeline_proto.components,
-          known_composites=union(known_runner_urns, KNOWN_COMPOSITES)))
+          union(known_runner_urns, KNOWN_COMPOSITES)))
 
   # Apply each phase in order.
   for phase in phases:
diff --git a/sdks/python/apache_beam/runners/portability/local_job_service.py b/sdks/python/apache_beam/runners/portability/local_job_service.py
index 53fd7f03fdb..f8bcdaac09a 100644
--- a/sdks/python/apache_beam/runners/portability/local_job_service.py
+++ b/sdks/python/apache_beam/runners/portability/local_job_service.py
@@ -44,7 +44,6 @@ from apache_beam.portability.api import beam_provision_api_pb2
 from apache_beam.portability.api import endpoints_pb2
 from apache_beam.runners.portability import abstract_job_service
 from apache_beam.runners.portability import artifact_service
-from apache_beam.runners.portability import portable_runner
 from apache_beam.runners.portability.fn_api_runner import fn_runner
 from apache_beam.runners.portability.fn_api_runner import worker_handlers
 from apache_beam.utils import thread_pool_executor
@@ -272,15 +271,11 @@ class BeamJob(abstract_job_service.AbstractBeamJob):
       self._update_dependencies()
       try:
         start = time.time()
-        self.result = self._invoke_runner()
-        self.result.wait_until_finish()
+        result = self._invoke_runner()
         _LOGGER.info(
-            'Completed job in %s seconds with state %s.',
-            time.time() - start,
-            self.result.state)
-        self.set_state(
-            portable_runner.PipelineResult.pipeline_state_to_runner_api_state(
-                self.result.state))
+            'Successfully completed job in %s seconds.', time.time() - start)
+        self.set_state(beam_job_api_pb2.JobState.DONE)
+        self.result = result
       except:  # pylint: disable=bare-except
         self._log_queues.put(
             beam_job_api_pb2.JobMessage(
