chamikaramj commented on a change in pull request #13877:
URL: https://github.com/apache/beam/pull/13877#discussion_r568916188
##########
File path: sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
##########
@@ -462,62 +462,6 @@ def run_pipeline(self, pipeline, options):
self._maybe_add_unified_worker_missing_options(options)
- from apache_beam.transforms import environments
- if options.view_as(SetupOptions).prebuild_sdk_container_engine:
Review comment:
So this optimization is not needed for Dataflow Runner v1 (just to
confirm)?
##########
File path: sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
##########
@@ -544,6 +488,21 @@ def run_pipeline(self, pipeline, options):
if use_fnapi and not apiclient._use_unified_worker(options):
pipeline.replace_all(DataflowRunner._JRH_PTRANSFORM_OVERRIDES)
+ from apache_beam.transforms import environments
+ if options.view_as(SetupOptions).prebuild_sdk_container_engine:
+ # if prebuild_sdk_container_engine is specified we will build a new sdk
+ # container image with dependencies pre-installed and use that image,
+ # instead of using the inferred default container image.
+ self._default_environment = (
+ environments.DockerEnvironment.from_options(options))
+ options.view_as(WorkerOptions).worker_harness_container_image = (
+ self._default_environment.container_image)
Review comment:
Where is "self._default_environment.container_image" set?
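For reference, a minimal sketch (not Beam's actual
apache_beam.transforms.environments code; the option name and fallback
image below are illustrative) of the pattern where a from_options-style
constructor resolves the image and stores it, so container_image is set
as soon as the environment object exists:

```python
# Minimal sketch only -- not Beam's real DockerEnvironment. It shows how
# from_options() can resolve the container image and pass it to the
# constructor, which is where container_image gets set.
class DockerEnvironmentSketch(object):
  def __init__(self, container_image):
    self.container_image = container_image

  @classmethod
  def from_options(cls, options):
    # Hypothetical resolution order: an explicitly prebuilt image wins,
    # otherwise fall back to a default SDK image.
    image = getattr(options, 'prebuilt_container_image', None)
    return cls(container_image=image or 'apache/beam_python3.8_sdk:latest')
```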
##########
File path: sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
##########
@@ -552,6 +511,22 @@ def run_pipeline(self, pipeline, options):
self.proto_pipeline, self.proto_context = pipeline.to_runner_api(
return_context=True, default_environment=self._default_environment)
+ # Optimize the pipeline if it is not streaming and the
+ # disable_optimize_pipeline_for_dataflow experiment has not been set.
+ if (not options.view_as(StandardOptions).streaming and
+ not options.view_as(DebugOptions).lookup_experiment(
+ "disable_optimize_pipeline_for_dataflow")):
Review comment:
Is this a new optimization?
Usually, for new optimizations, the preference is not to opt users in by
default.
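For comparison, an opt-in version of this gate could look like the sketch
below. The experiment name and the _optimize_pipeline helper are
hypothetical; lookup_experiment is the existing DebugOptions helper
already used in the diff.

```python
# Hypothetical opt-in gate: the optimization runs only when the user
# explicitly passes --experiments=optimize_pipeline_for_dataflow.
# (Snippet mirrors the surrounding run_pipeline context in the diff.)
debug_options = options.view_as(DebugOptions)
if (not options.view_as(StandardOptions).streaming and
    debug_options.lookup_experiment('optimize_pipeline_for_dataflow')):
  # _optimize_pipeline is a placeholder for whatever the PR's
  # optimization entry point ends up being called.
  self.proto_pipeline = self._optimize_pipeline(self.proto_pipeline)
```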
##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##########
@@ -1000,25 +1000,151 @@ def make_pack_name(names):
*[
core.CombineFn.from_runner_api(combine_payload.combine_fn,
context) # type: ignore[arg-type]
Review comment:
This "core.CombineFn.from_runner_api" call will break for x-lang Combine
transforms in the proto. Can we make this optimization opt in instead of making
this the default ? If this is the default it should work for all pipelines
including x-lang.
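One possible guard (a sketch, assuming the packing pass could simply skip
payloads it cannot load; python_urns.PICKLED_COMBINE_FN is the URN the
Python SDK uses for pickled CombineFns):

```python
from apache_beam.portability import python_urns

def is_python_pickled_combine_fn(combine_payload):
  # Cross-language CombineFns (e.g. produced by the Java SDK) carry
  # payloads that core.CombineFn.from_runner_api cannot unpickle, so
  # packing should only be attempted for Python-pickled CombineFns.
  return combine_payload.combine_fn.urn == python_urns.PICKLED_COMBINE_FN
```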
##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##########
@@ -1000,25 +1000,151 @@ def make_pack_name(names):
*[
core.CombineFn.from_runner_api(combine_payload.combine_fn,
context) # type: ignore[arg-type]
for combine_payload in combine_payloads
- ]).to_runner_api(context) # type: ignore[arg-type]
+ ]) # type: ignore[arg-type]
pack_transform = beam_runner_api_pb2.PTransform(
- unique_name=pack_transform_name + '/Pack',
+ unique_name=pack_transform_name + '/CombinePerKey',
spec=beam_runner_api_pb2.FunctionSpec(
urn=common_urns.composites.COMBINE_PER_KEY.urn,
payload=beam_runner_api_pb2.CombinePayload(
- combine_fn=pack_combine_fn,
+ combine_fn=pack_combine_fn.to_runner_api(context),
accumulator_coder_id=tuple_accumulator_coder_id).
SerializeToString()),
inputs={'in': input_pcoll_id},
# 'None' single output key follows convention for CombinePerKey.
outputs={'None': pack_pcoll_id},
environment_id=fused_stage.environment)
pack_stage = Stage(
- pack_stage_name + '/Pack', [pack_transform],
+ pack_stage_name + '/CombinePerKey', [pack_transform],
downstream_side_inputs=fused_stage.downstream_side_inputs,
must_follow=fused_stage.must_follow,
parent=fused_stage.parent,
environment=fused_stage.environment)
+
+ # Traverse the subtransform structure.
+ original_group_by_key_transforms = []
+ original_combine_grouped_values_transforms = []
+ original_combine_values_par_do_transforms = []
+ for transform in transforms:
Review comment:
This whole thing looks pretty brittle. Can we remove the sub-transforms
from the CombinePerKey transform in the first place (i.e., before
generating the proto), so that we do not have to manually modify the
proto here?
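Along those lines, a rough sketch of packing at the Python-transform
level (the helper name is hypothetical), so that to_runner_api would emit
the packed CombinePerKey proto directly instead of it being reconstructed
by hand here:

```python
import apache_beam as beam
from apache_beam.transforms import combiners

def make_packed_combine_per_key(combine_fns):
  # Hypothetical helper: compose the individual CombineFns into one
  # SingleInputTupleCombineFn before proto generation, so the packed
  # CombinePerKey needs no manual sub-transform surgery on the proto.
  packed_fn = combiners.SingleInputTupleCombineFn(*combine_fns)
  return beam.CombinePerKey(packed_fn)
```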
##########
File path: sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
##########
@@ -544,6 +488,21 @@ def run_pipeline(self, pipeline, options):
if use_fnapi and not apiclient._use_unified_worker(options):
pipeline.replace_all(DataflowRunner._JRH_PTRANSFORM_OVERRIDES)
+ from apache_beam.transforms import environments
+ if options.view_as(SetupOptions).prebuild_sdk_container_engine:
+ # if prebuild_sdk_container_engine is specified we will build a new sdk
+ # container image with dependencies pre-installed and use that image,
+ # instead of using the inferred default container image.
+ self._default_environment = (
Review comment:
The default environment should have the correct container URL to be used
by Dataflow (the same as "worker_harness_container_image").
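A small consistency check could make that invariant explicit (a sketch
only; where it would live in run_pipeline is up to the author):

```python
# Sketch: after resolving the prebuilt image, the pipeline proto's
# default environment and the Dataflow worker option should point at the
# same container URL.
worker_options = options.view_as(WorkerOptions)
assert (self._default_environment.container_image ==
        worker_options.worker_harness_container_image), (
            'default environment and worker harness must use the same image')
```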
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]