This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
     new 5ea504d  [BEAM-11715] Partial revert of "Combiner packing in Dataflow" (#13763) (#13884)
5ea504d is described below

commit 5ea504de2eb187dca733f6087aea780dc781040d
Author: Yifan Mai <yifan...@google.com>
AuthorDate: Thu Feb 4 20:30:17 2021 -0800

    [BEAM-11715] Partial revert of "Combiner packing in Dataflow" (#13763) (#13884)

    * Revert "[BEAM-11695] Combiner packing in Dataflow (#13763)"

    This reverts commit 3b51aaac556bcdc89b661793b55c4aca9a803e51.

    * Make pack_combiners optional

    * Don't revert translations.py

    * Add missing ValidatesRunner
---
 .../runners/dataflow/dataflow_runner.py            | 103 ++++++++++-----------
 .../runners/dataflow/dataflow_runner_test.py       |   1 +
 .../runners/dataflow/ptransform_overrides.py       |  27 ------
 .../portability/fn_api_runner/translations_test.py |   1 +
 4 files changed, 49 insertions(+), 83 deletions(-)

diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 8fa69fb..6590366 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -462,62 +462,6 @@ class DataflowRunner(PipelineRunner):
 
     self._maybe_add_unified_worker_missing_options(options)
 
-    from apache_beam.transforms import environments
-    if options.view_as(SetupOptions).prebuild_sdk_container_engine:
-      # if prebuild_sdk_container_engine is specified we will build a new sdk
-      # container image with dependencies pre-installed and use that image,
-      # instead of using the inferred default container image.
-      self._default_environment = (
-          environments.DockerEnvironment.from_options(options))
-      options.view_as(WorkerOptions).worker_harness_container_image = (
-          self._default_environment.container_image)
-    else:
-      self._default_environment = (
-          environments.DockerEnvironment.from_container_image(
-              apiclient.get_container_image_from_options(options),
-              artifacts=environments.python_sdk_dependencies(options)))
-
-    # Optimize the pipeline if it is not streaming and optimizations are
-    # enabled in options.
-    pre_optimize = options.view_as(DebugOptions).lookup_experiment(
-        'pre_optimize', 'default').lower()
-    if (not options.view_as(StandardOptions).streaming and
-        pre_optimize != 'none' and pre_optimize != 'default'):
-      from apache_beam.runners.portability.fn_api_runner import translations
-      if pre_optimize == 'all':
-        phases = [
-            translations.eliminate_common_key_with_none,
-            translations.pack_combiners,
-            translations.sort_stages
-        ]
-      else:
-        phases = []
-        for phase_name in pre_optimize.split(','):
-          # For now, these are all we allow.
-          if phase_name in ('eliminate_common_key_with_none', 'pack_combiners'):
-            phases.append(getattr(translations, phase_name))
-          else:
-            raise ValueError(
-                'Unknown or inapplicable phase for pre_optimize: %s' %
-                phase_name)
-        phases.append(translations.sort_stages)
-
-      proto_pipeline_to_optimize = pipeline.to_runner_api(
-          default_environment=self._default_environment)
-      optimized_proto_pipeline = translations.optimize_pipeline(
-          proto_pipeline_to_optimize,
-          phases=phases,
-          known_runner_urns=frozenset(),
-          partial=True)
-      pipeline = beam.Pipeline.from_runner_api(
-          optimized_proto_pipeline, self, options)
-      # The translations.pack_combiners optimizer phase produces a CombinePerKey
-      # PTransform, but DataflowRunner treats CombinePerKey as a composite, so
-      # this override expands CombinePerKey into primitive PTransforms.
-      if translations.pack_combiners in phases:
-        from apache_beam.runners.dataflow.ptransform_overrides import CombinePerKeyPTransformOverride
-        pipeline.replace_all([CombinePerKeyPTransformOverride()])
-
     use_fnapi = apiclient._use_fnapi(options)
     if not use_fnapi:
@@ -544,6 +488,21 @@ class DataflowRunner(PipelineRunner):
     if use_fnapi and not apiclient._use_unified_worker(options):
       pipeline.replace_all(DataflowRunner._JRH_PTRANSFORM_OVERRIDES)
 
+    from apache_beam.transforms import environments
+    if options.view_as(SetupOptions).prebuild_sdk_container_engine:
+      # if prebuild_sdk_container_engine is specified we will build a new sdk
+      # container image with dependencies pre-installed and use that image,
+      # instead of using the inferred default container image.
+      self._default_environment = (
+          environments.DockerEnvironment.from_options(options))
+      options.view_as(WorkerOptions).worker_harness_container_image = (
+          self._default_environment.container_image)
+    else:
+      self._default_environment = (
+          environments.DockerEnvironment.from_container_image(
+              apiclient.get_container_image_from_options(options),
+              artifacts=environments.python_sdk_dependencies(options)))
+
     # This has to be performed before pipeline proto is constructed to make sure
     # that the changes are reflected in the portable job submission path.
     self._adjust_pipeline_for_dataflow_v2(pipeline)
@@ -552,6 +511,38 @@ class DataflowRunner(PipelineRunner):
     self.proto_pipeline, self.proto_context = pipeline.to_runner_api(
         return_context=True, default_environment=self._default_environment)
 
+    # Optimize the pipeline if it is not streaming and the pre_optimize
+    # experiment is set.
+    pre_optimize = options.view_as(DebugOptions).lookup_experiment(
+        'pre_optimize', 'default').lower()
+    from apache_beam.runners.portability.fn_api_runner import translations
+    if (options.view_as(StandardOptions).streaming or pre_optimize == 'none' or
+        pre_optimize == 'default'):
+      phases = []
+    elif pre_optimize == 'all':
+      phases = [
+          translations.eliminate_common_key_with_none,
+          # TODO(BEAM-11694): Enable translations.pack_combiners
+          # translations.pack_combiners,
+          translations.sort_stages
+      ]
+    else:
+      phases = []
+      for phase_name in pre_optimize.split(','):
+        # For now, these are all we allow.
+        if phase_name in ('eliminate_common_key_with_none', 'pack_combiners'):
+          phases.append(getattr(translations, phase_name))
+        else:
+          raise ValueError(
+              'Unknown or inapplicable phase for pre_optimize: %s' % phase_name)
+      phases.append(translations.sort_stages)
+
+    self.proto_pipeline = translations.optimize_pipeline(
+        self.proto_pipeline,
+        phases=phases,
+        known_runner_urns=frozenset(),
+        partial=True)
+
     if use_fnapi:
       self._check_for_unsupported_fnapi_features(self.proto_pipeline)
     else:
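For readers following the hunks above: the pre_optimize pass now runs on self.proto_pipeline after the pipeline proto is constructed, rather than round-tripping through beam.Pipeline.from_runner_api as the reverted code did. Below is a minimal sketch, not part of this commit, of driving the same translations.optimize_pipeline entry point on a standalone pipeline proto; the toy pipeline contents are hypothetical:

    import apache_beam as beam
    from apache_beam.runners.portability.fn_api_runner import translations

    # Hypothetical pipeline with a packable combine, for illustration only.
    p = beam.Pipeline()
    _ = (
        p
        | beam.Create([('a', 1), ('a', 2), ('b', 3)])
        | beam.CombinePerKey(sum))

    # Same phases and arguments as the runner code above; pack_combiners is
    # omitted, matching the TODO(BEAM-11694) in the hunk.
    optimized_proto = translations.optimize_pipeline(
        p.to_runner_api(),
        phases=[
            translations.eliminate_common_key_with_none,
            translations.sort_stages
        ],
        known_runner_urns=frozenset(),
        partial=True)
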
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
index d921980..9074d97 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
@@ -877,6 +877,7 @@ class DataflowRunnerTest(unittest.TestCase, ExtraAssertionsMixin):
     self._test_pack_combiners(
         PipelineOptions(self.default_properties), expect_packed=False)
 
+  @unittest.skip("BEAM-11694")
   def test_pack_combiners_enabled_by_experiment(self):
     self.default_properties.append('--experiment=pre_optimize=all')
     self._test_pack_combiners(
diff --git a/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py b/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py
index 14300de..402a4ed 100644
--- a/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py
+++ b/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py
@@ -125,33 +125,6 @@ class JrhReadPTransformOverride(PTransformOverride):
             'Read'))
 
 
-class CombinePerKeyPTransformOverride(PTransformOverride):
-  """A ``PTransformOverride`` for ``CombinePerKey``.
-
-  The translations.pack_combiners optimizer phase produces a CombinePerKey
-  PTransform, but DataflowRunner treats CombinePerKey as a composite, so
-  this override expands CombinePerKey into primitive PTransforms.
-  """
-  def matches(self, applied_ptransform):
-    # Imported here to avoid circular dependencies.
-    # pylint: disable=wrong-import-order, wrong-import-position
-    from apache_beam import CombinePerKey
-
-    if isinstance(applied_ptransform.transform, CombinePerKey):
-      self.transform = applied_ptransform.transform
-      return True
-    return False
-
-  def get_replacement_transform(self, ptransform):
-    from apache_beam.transforms import ptransform_fn
-
-    @ptransform_fn
-    def ExpandCombinePerKey(pcoll):
-      return pcoll | ptransform
-
-    return ExpandCombinePerKey()
-
-
 class CombineValuesPTransformOverride(PTransformOverride):
   """A ``PTransformOverride`` for ``CombineValues``.
 
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/translations_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/translations_test.py
index d57307f..7ca5e3f 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/translations_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/translations_test.py
@@ -259,6 +259,7 @@ class TranslationsTest(unittest.TestCase):
         | Create([('a', x) for x in vals])
        | 'multiple-combines' >> MultipleCombines())
 
+  @attr('ValidatesRunner')
   def test_run_packable_combine_globally(self):
     class MultipleCombines(beam.PTransform):
       def expand(self, pcoll):
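As a usage note: the test skipped above exercises the optimizer via --experiment=pre_optimize=all. A hedged sketch of enabling one pre_optimize phase on user pipeline options; the project, region, and bucket values are placeholders, not values from this commit:

    from apache_beam.options.pipeline_options import PipelineOptions

    options = PipelineOptions([
        '--runner=DataflowRunner',
        '--project=my-project',                # placeholder
        '--region=us-central1',                # placeholder
        '--temp_location=gs://my-bucket/tmp',  # placeholder
        # Per the runner code above, accepted values are 'none', 'default',
        # 'all', or a comma-separated subset of
        # 'eliminate_common_key_with_none' and 'pack_combiners'.
        '--experiment=pre_optimize=eliminate_common_key_with_none',
    ])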