yifanmai commented on a change in pull request #13884:
URL: https://github.com/apache/beam/pull/13884#discussion_r570557047



##########
File path: sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
##########
@@ -552,6 +511,38 @@ def run_pipeline(self, pipeline, options):
     self.proto_pipeline, self.proto_context = pipeline.to_runner_api(
         return_context=True, default_environment=self._default_environment)
 
+    # Optimize the pipeline if it is not streaming and the pre_optimize
+    # experiment is set.
+    pre_optimize = options.view_as(DebugOptions).lookup_experiment(
+        'pre_optimize', 'default').lower()
+    from apache_beam.runners.portability.fn_api_runner import translations
+    if (options.view_as(StandardOptions).streaming or pre_optimize == 'none' or
+        pre_optimize == 'default'):
+      phases = []
+    elif pre_optimize == 'all':
+      phases = [
+          translations.eliminate_common_key_with_none,
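The selection logic in the hunk above can be reproduced in a small, Beam-free sketch. `lookup_experiment` and `select_phases` below are hypothetical stand-ins: the real code uses `DebugOptions.lookup_experiment` and functions from `translations`. The full phase list for `'all'` is cut off in the excerpt, so the caller passes it in. Values other than `none`, `default` and `all` are not handled in the excerpt either, so this sketch simply rejects them.

```python
# Hypothetical, Beam-free sketch of the pre_optimize phase selection shown in
# the diff. Phases are plain strings here instead of translation functions.
from typing import List, Optional, Sequence


def lookup_experiment(experiments: Optional[Sequence[str]], key: str,
                      default: Optional[str] = None) -> Optional[str]:
  """Returns the value of a 'key=value' experiment, or `default` if absent."""
  for experiment in experiments or ():
    if experiment.startswith(key + '='):
      return experiment.split('=', 1)[1]
  return default


def select_phases(experiments: Optional[Sequence[str]], streaming: bool,
                  all_phases: Sequence[str]) -> List[str]:
  """Chooses which optimization phases to run before job submission."""
  pre_optimize = lookup_experiment(
      experiments, 'pre_optimize', 'default').lower()
  if streaming or pre_optimize in ('none', 'default'):
    # Streaming jobs, and jobs that do not opt in, skip pre-optimization.
    return []
  if pre_optimize == 'all':
    return list(all_phases)
  # Other values are handled by code not shown in the excerpt.
  raise ValueError('Unhandled pre_optimize value: %r' % pre_optimize)
```

With the standard `--experiments` pipeline option, a batch job would opt in via `--experiments=pre_optimize=all`; a streaming job gets no phases regardless of the flag.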

Review comment:
       Yes, this is tested by [`translations_test.TranslationsTest.test_run_packable_combine_globally`](https://github.com/apache/beam/blob/5c31997a18dd1434f903838fc2acdbe728b40abe/sdks/python/apache_beam/runners/portability/fn_api_runner/translations_test.py). I had previously forgotten to mark that test as ValidatesRunner, so this PR adds the marking as well.
   
   Note that the test only checks that the pipeline is _correct_ with or 
without the optimization, but does not check that the optimization was 
_actually performed_, because that would require inspecting the optimized graph 
which has to be done differently per runner, and because this optimization may 
be enabled or disabled in different runners.
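The testing strategy described in this comment (compare outputs with and without an optimization) can be illustrated with a toy, Beam-free analogue. It is loosely modeled on the idea suggested by the test name, packing several global combines into one pass; the combiner table and all function names are hypothetical. The equality check passes whether or not the packed plan would ever be chosen by a runner, which is exactly the limitation noted above.

```python
# Toy analogue (hypothetical, not Beam code): run several global aggregations
# either one pass per combiner ("unpacked") or in a single shared pass
# ("packed"), then compare the results.
from typing import Callable, Dict, List, Tuple

Combiner = Tuple[Callable[[], object], Callable[[object, int], object]]

COMBINERS: Dict[str, Combiner] = {
    'sum': (lambda: 0, lambda acc, x: acc + x),
    'max': (lambda: None, lambda acc, x: x if acc is None or x > acc else acc),
    'count': (lambda: 0, lambda acc, x: acc + 1),
}


def run_unpacked(data: List[int]) -> Dict[str, object]:
  results = {}
  for name, (init, add) in COMBINERS.items():
    acc = init()
    for x in data:  # One full pass over the data per combiner.
      acc = add(acc, x)
    results[name] = acc
  return results


def run_packed(data: List[int]) -> Dict[str, object]:
  accs = {name: init() for name, (init, _) in COMBINERS.items()}
  for x in data:  # A single pass updates every accumulator.
    for name, (_, add) in COMBINERS.items():
      accs[name] = add(accs[name], x)
  return accs


def check_equivalent(data: List[int]) -> None:
  # Verifies only that both plans agree; it says nothing about which plan
  # a given runner actually executed.
  assert run_unpacked(data) == run_packed(data)
```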




----------------------------------------------------------------
This is an automated message from the Apache Git Service.