This is an automated email from the ASF dual-hosted git repository.

tvalentyn pushed a commit to branch revert-17338-dataflow-portable-runner
in repository https://gitbox.apache.org/repos/asf/beam.git

commit d09b93122467ce12780ddddfd3596fe740f38fba
Author: tvalentyn <tvalen...@users.noreply.github.com>
AuthorDate: Wed Apr 20 18:44:44 2022 +0200

    Revert "Improvements to dataflow job service for non-Python jobs. (#17338)"

    This reverts commit bf910a1f30c11ec00ecb4241dfc6757705fc7d94.
---
 .../runners/dataflow/dataflow_runner.py            | 36 ++++++++++------------
 .../runners/dataflow/internal/apiclient.py         | 10 ++----
 .../portability/fn_api_runner/translations.py      |  7 ++---
 .../runners/portability/local_job_service.py       | 13 +++-----
 4 files changed, 24 insertions(+), 42 deletions(-)

diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index b395508fd12..edb319dbfb0 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -457,27 +457,26 @@ class DataflowRunner(PipelineRunner):
     if use_fnapi and not apiclient._use_unified_worker(options):
       pipeline.replace_all(DataflowRunner._JRH_PTRANSFORM_OVERRIDES)
 
+    from apache_beam.transforms import environments
+    if options.view_as(SetupOptions).prebuild_sdk_container_engine:
+      # if prebuild_sdk_container_engine is specified we will build a new sdk
+      # container image with dependencies pre-installed and use that image,
+      # instead of using the inferred default container image.
+      self._default_environment = (
+          environments.DockerEnvironment.from_options(options))
+      options.view_as(WorkerOptions).sdk_container_image = (
+          self._default_environment.container_image)
+    else:
+      self._default_environment = (
+          environments.DockerEnvironment.from_container_image(
+              apiclient.get_container_image_from_options(options),
+              artifacts=environments.python_sdk_dependencies(options),
+              resource_hints=environments.resource_hints_from_options(options)))
+
     if pipeline_proto:
       self.proto_pipeline = pipeline_proto
 
     else:
-      from apache_beam.transforms import environments
-      if options.view_as(SetupOptions).prebuild_sdk_container_engine:
-        # if prebuild_sdk_container_engine is specified we will build a new sdk
-        # container image with dependencies pre-installed and use that image,
-        # instead of using the inferred default container image.
-        self._default_environment = (
-            environments.DockerEnvironment.from_options(options))
-        options.view_as(WorkerOptions).sdk_container_image = (
-            self._default_environment.container_image)
-      else:
-        self._default_environment = (
-            environments.DockerEnvironment.from_container_image(
-                apiclient.get_container_image_from_options(options),
-                artifacts=environments.python_sdk_dependencies(options),
-                resource_hints=environments.resource_hints_from_options(
-                    options)))
-
       # This has to be performed before pipeline proto is constructed to make
       # sure that the changes are reflected in the portable job submission path.
       self._adjust_pipeline_for_dataflow_v2(pipeline)
@@ -1650,8 +1649,6 @@ class DataflowPipelineResult(PipelineResult):
     assert duration or terminated, (
         'Job did not reach to a terminal state after waiting indefinitely.')
 
-    # TODO(BEAM-14291): Also run this check if wait_until_finish was called
-    # after the pipeline completed.
     if terminated and self.state != PipelineState.DONE:
       # TODO(BEAM-1290): Consider converting this to an error log based on
       # the resolution of the issue.
       raise DataflowRuntimeException(
           'Dataflow pipeline failed. State: %s, Error:\n%s' %
           (self.state, getattr(self._runner, 'last_error_msg', None)),
           self)
-
     return self.state
 
   def cancel(self):
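
For readers following the dataflow_runner.py hunk above: the restored branch
keys off the SetupOptions flag --prebuild_sdk_container_engine. Below is a
minimal sketch of how that flag surfaces through pipeline options; the project
id is a hypothetical placeholder, and 'cloud_build' is one of the supported
engine values. Nothing is built by merely parsing the options.

    from apache_beam.options.pipeline_options import (
        PipelineOptions, SetupOptions)

    # Parse options the way a pipeline submission would.
    options = PipelineOptions([
        '--project=my-project',  # hypothetical project id
        '--prebuild_sdk_container_engine=cloud_build',
    ])

    # The runner branches on exactly this attribute.
    assert options.view_as(
        SetupOptions).prebuild_sdk_container_engine == 'cloud_build'

When the flag is unset the attribute is None, and the runner falls through to
the DockerEnvironment.from_container_image(...) path in the added block.
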
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index 21e3335c077..cec614bbd9b 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -279,6 +279,8 @@ class Environment(object):
       pool.network = self.worker_options.network
     if self.worker_options.subnetwork:
       pool.subnetwork = self.worker_options.subnetwork
+    pool.workerHarnessContainerImage = (
+        get_container_image_from_options(options))
 
     # Setting worker pool sdk_harness_container_images option for supported
     # Dataflow workers.
@@ -305,14 +307,6 @@ class Environment(object):
           container_image.capabilities.append(capability)
         pool.sdkHarnessContainerImages.append(container_image)
 
-      if not _use_fnapi(options) or not pool.sdkHarnessContainerImages:
-        pool.workerHarnessContainerImage = (
-            get_container_image_from_options(options))
-      elif len(pool.sdkHarnessContainerImages) == 1:
-        # Dataflow expects a value here when there is only one environment.
-        pool.workerHarnessContainerImage = (
-            pool.sdkHarnessContainerImages[0].containerImage)
-
     if self.debug_options.number_of_worker_harness_threads:
       pool.numThreadsPerWorker = (
           self.debug_options.number_of_worker_harness_threads)
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
index 5ed965fb751..5e2f5568fde 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
@@ -21,7 +21,6 @@
 # mypy: check-untyped-defs
 
 import collections
-import copy
 import functools
 import itertools
 import logging
@@ -287,9 +286,7 @@ class Stage(object):
     # type: (...) -> beam_runner_api_pb2.PTransform
     if (len(self.transforms) == 1 and
         self.transforms[0].spec.urn in known_runner_urns):
-      result = copy.copy(self.transforms[0])
-      del result.subtransforms[:]
-      return result
+      return self.transforms[0]
 
     else:
       all_inputs = set(
@@ -707,7 +704,7 @@ def create_and_optimize_stages(
       leaf_transform_stages(
           pipeline_proto.root_transform_ids,
           pipeline_proto.components,
-          known_composites=union(known_runner_urns, KNOWN_COMPOSITES)))
+          union(known_runner_urns, KNOWN_COMPOSITES)))
 
   # Apply each phase in order.
   for phase in phases:
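
On the translations.py hunk: the reverted code returned a copy of the single
transform proto with its subtransforms cleared, while the restored code
returns the proto as-is. A minimal standalone sketch of what the removed
lines did; the transform below is a toy proto, not part of a real pipeline.

    import copy

    from apache_beam.portability.api import beam_runner_api_pb2

    transform = beam_runner_api_pb2.PTransform(
        unique_name='Composite', subtransforms=['Child1', 'Child2'])

    # copy.copy on a protobuf message yields an independent message, so
    # clearing the copy's subtransforms leaves the original untouched.
    result = copy.copy(transform)
    del result.subtransforms[:]

    assert list(transform.subtransforms) == ['Child1', 'Child2']
    assert not result.subtransforms

Returning self.transforms[0] directly, as the restored code does, hands back
a proto that still references its subtransforms.
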
diff --git a/sdks/python/apache_beam/runners/portability/local_job_service.py b/sdks/python/apache_beam/runners/portability/local_job_service.py
index 53fd7f03fdb..f8bcdaac09a 100644
--- a/sdks/python/apache_beam/runners/portability/local_job_service.py
+++ b/sdks/python/apache_beam/runners/portability/local_job_service.py
@@ -44,7 +44,6 @@ from apache_beam.portability.api import beam_provision_api_pb2
 from apache_beam.portability.api import endpoints_pb2
 from apache_beam.runners.portability import abstract_job_service
 from apache_beam.runners.portability import artifact_service
-from apache_beam.runners.portability import portable_runner
 from apache_beam.runners.portability.fn_api_runner import fn_runner
 from apache_beam.runners.portability.fn_api_runner import worker_handlers
 from apache_beam.utils import thread_pool_executor
@@ -272,15 +271,11 @@ class BeamJob(abstract_job_service.AbstractBeamJob):
     self._update_dependencies()
     try:
       start = time.time()
-      self.result = self._invoke_runner()
-      self.result.wait_until_finish()
+      result = self._invoke_runner()
       _LOGGER.info(
-          'Completed job in %s seconds with state %s.',
-          time.time() - start,
-          self.result.state)
-      self.set_state(
-          portable_runner.PipelineResult.pipeline_state_to_runner_api_state(
-              self.result.state))
+          'Successfully completed job in %s seconds.', time.time() - start)
+      self.set_state(beam_job_api_pb2.JobState.DONE)
+      self.result = result
     except:  # pylint: disable=bare-except
       self._log_queues.put(
           beam_job_api_pb2.JobMessage(
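
On the local_job_service.py hunk: the removed lines mapped the runner
result's terminal PipelineState to the portable JobState enum, whereas the
restored code reports DONE unconditionally on a normal return. A rough sketch
of that mapping, under the assumption that SDK PipelineState constants are
strings whose names mirror the JobState enum values; to_runner_api_state is a
hypothetical helper for illustration, not Beam's API.

    from apache_beam.portability.api import beam_job_api_pb2
    from apache_beam.runners.runner import PipelineState

    def to_runner_api_state(pipeline_state):
      # PipelineState values such as 'DONE' or 'FAILED' match names in
      # beam_job_api_pb2.JobState.Enum; anything else maps to UNSPECIFIED.
      try:
        return beam_job_api_pb2.JobState.Enum.Value(pipeline_state)
      except ValueError:
        return beam_job_api_pb2.JobState.UNSPECIFIED

    assert (to_runner_api_state(PipelineState.DONE) ==
            beam_job_api_pb2.JobState.DONE)
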