This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ea504d  [BEAM-11715] Partial revert of "Combiner packing in Dataflow" (#13763) (#13884)
5ea504d is described below

commit 5ea504de2eb187dca733f6087aea780dc781040d
Author: Yifan Mai <yifan...@google.com>
AuthorDate: Thu Feb 4 20:30:17 2021 -0800

    [BEAM-11715] Partial revert of "Combiner packing in Dataflow" (#13763) (#13884)
    
    * Revert "[BEAM-11695] Combiner packing in Dataflow (#13763)"
    
    This reverts commit 3b51aaac556bcdc89b661793b55c4aca9a803e51.
    
    * Make pack_combiners optional
    
    * Don't revert translations.py
    
    * Add missing ValidatesRunner
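
    For context, the optimizer phases this change makes optional are driven by
    the pre_optimize experiment. A minimal sketch of opting in from pipeline
    options (the flag value mirrors the test change below; the rest of the
    setup is illustrative, not part of this commit):

        import apache_beam as beam
        from apache_beam.options.pipeline_options import PipelineOptions

        # 'pre_optimize=all' now enables eliminate_common_key_with_none and
        # sort_stages; pack_combiners must be named explicitly while it is
        # excluded from 'all' pending BEAM-11694.
        options = PipelineOptions(['--experiment=pre_optimize=pack_combiners'])
        with beam.Pipeline(options=options) as p:
          _ = (
              p
              | beam.Create([('a', 1), ('a', 2), ('b', 3)])
              | beam.CombinePerKey(sum))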
---
 .../runners/dataflow/dataflow_runner.py            | 103 ++++++++++-----------
 .../runners/dataflow/dataflow_runner_test.py       |   1 +
 .../runners/dataflow/ptransform_overrides.py       |  27 ------
 .../portability/fn_api_runner/translations_test.py |   1 +
 4 files changed, 49 insertions(+), 83 deletions(-)

diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 8fa69fb..6590366 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -462,62 +462,6 @@ class DataflowRunner(PipelineRunner):
 
     self._maybe_add_unified_worker_missing_options(options)
 
-    from apache_beam.transforms import environments
-    if options.view_as(SetupOptions).prebuild_sdk_container_engine:
-      # if prebuild_sdk_container_engine is specified we will build a new sdk
-      # container image with dependencies pre-installed and use that image,
-      # instead of using the inferred default container image.
-      self._default_environment = (
-          environments.DockerEnvironment.from_options(options))
-      options.view_as(WorkerOptions).worker_harness_container_image = (
-          self._default_environment.container_image)
-    else:
-      self._default_environment = (
-          environments.DockerEnvironment.from_container_image(
-              apiclient.get_container_image_from_options(options),
-              artifacts=environments.python_sdk_dependencies(options)))
-
-    # Optimize the pipeline if it not streaming and optimizations are enabled
-    # in options.
-    pre_optimize = options.view_as(DebugOptions).lookup_experiment(
-        'pre_optimize', 'default').lower()
-    if (not options.view_as(StandardOptions).streaming and
-        pre_optimize != 'none' and pre_optimize != 'default'):
-      from apache_beam.runners.portability.fn_api_runner import translations
-      if pre_optimize == 'all':
-        phases = [
-            translations.eliminate_common_key_with_none,
-            translations.pack_combiners,
-            translations.sort_stages
-        ]
-      else:
-        phases = []
-        for phase_name in pre_optimize.split(','):
-          # For now, these are all we allow.
-          if phase_name in ('eliminate_common_key_with_none', 'pack_combiners'):
-            phases.append(getattr(translations, phase_name))
-          else:
-            raise ValueError(
-                'Unknown or inapplicable phase for pre_optimize: %s' %
-                phase_name)
-        phases.append(translations.sort_stages)
-
-      proto_pipeline_to_optimize = pipeline.to_runner_api(
-          default_environment=self._default_environment)
-      optimized_proto_pipeline = translations.optimize_pipeline(
-          proto_pipeline_to_optimize,
-          phases=phases,
-          known_runner_urns=frozenset(),
-          partial=True)
-      pipeline = beam.Pipeline.from_runner_api(
-          optimized_proto_pipeline, self, options)
-      # The translations.pack_combiners optimizer phase produces a CombinePerKey
-      # PTransform, but DataflowRunner treats CombinePerKey as a composite, so
-      # this override expands CombinePerKey into primitive PTransforms.
-      if translations.pack_combiners in phases:
-        from apache_beam.runners.dataflow.ptransform_overrides import CombinePerKeyPTransformOverride
-        pipeline.replace_all([CombinePerKeyPTransformOverride()])
-
     use_fnapi = apiclient._use_fnapi(options)
 
     if not use_fnapi:
@@ -544,6 +488,21 @@ class DataflowRunner(PipelineRunner):
     if use_fnapi and not apiclient._use_unified_worker(options):
       pipeline.replace_all(DataflowRunner._JRH_PTRANSFORM_OVERRIDES)
 
+    from apache_beam.transforms import environments
+    if options.view_as(SetupOptions).prebuild_sdk_container_engine:
+      # if prebuild_sdk_container_engine is specified we will build a new sdk
+      # container image with dependencies pre-installed and use that image,
+      # instead of using the inferred default container image.
+      self._default_environment = (
+          environments.DockerEnvironment.from_options(options))
+      options.view_as(WorkerOptions).worker_harness_container_image = (
+          self._default_environment.container_image)
+    else:
+      self._default_environment = (
+          environments.DockerEnvironment.from_container_image(
+              apiclient.get_container_image_from_options(options),
+              artifacts=environments.python_sdk_dependencies(options)))
+
     # This has to be performed before pipeline proto is constructed to make sure
     # that the changes are reflected in the portable job submission path.
     self._adjust_pipeline_for_dataflow_v2(pipeline)
@@ -552,6 +511,38 @@ class DataflowRunner(PipelineRunner):
     self.proto_pipeline, self.proto_context = pipeline.to_runner_api(
         return_context=True, default_environment=self._default_environment)
 
+    # Optimize the pipeline if it is not streaming and the pre_optimize
+    # experiment is set.
+    pre_optimize = options.view_as(DebugOptions).lookup_experiment(
+        'pre_optimize', 'default').lower()
+    from apache_beam.runners.portability.fn_api_runner import translations
+    if (options.view_as(StandardOptions).streaming or pre_optimize == 'none' or
+        pre_optimize == 'default'):
+      phases = []
+    elif pre_optimize == 'all':
+      phases = [
+          translations.eliminate_common_key_with_none,
+          # TODO(BEAM-11694): Enable translations.pack_combiners
+          # translations.pack_combiners,
+          translations.sort_stages
+      ]
+    else:
+      phases = []
+      for phase_name in pre_optimize.split(','):
+        # For now, these are all we allow.
+        if phase_name in ('eliminate_common_key_with_none', 'pack_combiners'):
+          phases.append(getattr(translations, phase_name))
+        else:
+          raise ValueError(
+              'Unknown or inapplicable phase for pre_optimize: %s' % phase_name)
+      phases.append(translations.sort_stages)
+
+    self.proto_pipeline = translations.optimize_pipeline(
+        self.proto_pipeline,
+        phases=phases,
+        known_runner_urns=frozenset(),
+        partial=True)
+
     if use_fnapi:
       self._check_for_unsupported_fnapi_features(self.proto_pipeline)
     else:
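
For reference, the new hunk optimizes the already-constructed pipeline proto in
place, rather than round-tripping through Pipeline.from_runner_api as the
removed code did. A rough sketch of the same call driven by hand, using only
names that appear in the hunk above (the standalone pipeline is illustrative):

    import apache_beam as beam
    from apache_beam.runners.portability.fn_api_runner import translations

    p = beam.Pipeline()
    keyed = p | beam.Create([('a', 1), ('a', 2), ('b', 3)])
    _ = keyed | 'sum-perkey' >> beam.CombinePerKey(sum)
    _ = keyed | 'max-perkey' >> beam.CombinePerKey(max)

    # Same invocation as the runner's new code path; pack_combiners can
    # fuse the two sibling combines into one packed CombinePerKey.
    optimized = translations.optimize_pipeline(
        p.to_runner_api(),
        phases=[translations.pack_combiners, translations.sort_stages],
        known_runner_urns=frozenset(),
        partial=True)
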
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
index d921980..9074d97 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
@@ -877,6 +877,7 @@ class DataflowRunnerTest(unittest.TestCase, ExtraAssertionsMixin):
     self._test_pack_combiners(
         PipelineOptions(self.default_properties), expect_packed=False)
 
+  @unittest.skip("BEAM-11694")
   def test_pack_combiners_enabled_by_experiment(self):
     self.default_properties.append('--experiment=pre_optimize=all')
     self._test_pack_combiners(
diff --git a/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py b/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py
index 14300de..402a4ed 100644
--- a/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py
+++ b/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py
@@ -125,33 +125,6 @@ class JrhReadPTransformOverride(PTransformOverride):
             'Read'))
 
 
-class CombinePerKeyPTransformOverride(PTransformOverride):
-  """A ``PTransformOverride`` for ``CombinePerKey``.
-
-  The translations.pack_combiners optimizer phase produces a CombinePerKey
-  PTransform, but DataflowRunner treats CombinePerKey as a composite, so
-  this override expands CombinePerKey into primitive PTransforms.
-  """
-  def matches(self, applied_ptransform):
-    # Imported here to avoid circular dependencies.
-    # pylint: disable=wrong-import-order, wrong-import-position
-    from apache_beam import CombinePerKey
-
-    if isinstance(applied_ptransform.transform, CombinePerKey):
-      self.transform = applied_ptransform.transform
-      return True
-    return False
-
-  def get_replacement_transform(self, ptransform):
-    from apache_beam.transforms import ptransform_fn
-
-    @ptransform_fn
-    def ExpandCombinePerKey(pcoll):
-      return pcoll | ptransform
-
-    return ExpandCombinePerKey()
-
-
 class CombineValuesPTransformOverride(PTransformOverride):
   """A ``PTransformOverride`` for ``CombineValues``.
 
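
The class removed above follows the usual two-method PTransformOverride
protocol. A minimal sketch of that shape, modeled on the deleted code (the
override itself is illustrative, not part of this commit):

    from apache_beam import CombinePerKey
    from apache_beam.pipeline import PTransformOverride

    class ExampleCombinePerKeyOverride(PTransformOverride):
      def matches(self, applied_ptransform):
        # Pick out the applied transforms this override should replace.
        return isinstance(applied_ptransform.transform, CombinePerKey)

      def get_replacement_transform(self, ptransform):
        # Return the PTransform to substitute for the matched one; the
        # deleted override wrapped the original so it would be expanded
        # into primitive PTransforms rather than treated as a composite.
        return ptransform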
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/translations_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/translations_test.py
index d57307f..7ca5e3f 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/translations_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/translations_test.py
@@ -259,6 +259,7 @@ class TranslationsTest(unittest.TestCase):
           | Create([('a', x) for x in vals])
           | 'multiple-combines' >> MultipleCombines())
 
+  @attr('ValidatesRunner')
   def test_run_packable_combine_globally(self):
     class MultipleCombines(beam.PTransform):
       def expand(self, pcoll):
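
The hunk above is truncated, but the sibling test before it shows the pattern
being exercised: a composite that applies several combiners to one input,
which pack_combiners can fuse. A sketch of such a composite (hypothetical, not
the test's actual body):

    import apache_beam as beam

    class MultipleCombines(beam.PTransform):
      def expand(self, pcoll):
        # Sibling combines over the same input; the pack_combiners phase
        # can pack them into a single combining transform.
        _ = pcoll | 'min-globally' >> beam.CombineGlobally(min)
        _ = pcoll | 'max-globally' >> beam.CombineGlobally(max)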
