This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
     new 06456f8  [BEAM-6243] Add an experiment to use Python's optimizer on Flink.
     new ee85ea1  Merge pull request #7297 [BEAM-6243] Add an experiment to use Python's optimizer on Flink.
06456f8 is described below

commit 06456f8b137d87c07c8b7d26b5d4be448e17affb
Author: Robert Bradshaw <rober...@google.com>
AuthorDate: Thu Dec 13 17:09:08 2018 +0100

    [BEAM-6243] Add an experiment to use Python's optimizer on Flink.
---
 .../FlinkBatchPortablePipelineTranslator.java      |   3 +-
 .../beam/runners/flink/FlinkJobInvocation.java     |   8 +-
 .../jobsubmission/InMemoryJobService.java          |   2 +
 .../python/apache_beam/options/pipeline_options.py |  10 +
 .../runners/portability/flink_runner_test.py       |   7 +-
 .../runners/portability/fn_api_runner.py           |  52 ++--
 .../portability/fn_api_runner_transforms.py        | 296 +++++++++++++++++----
 .../runners/portability/portable_runner.py         |  41 ++-
 sdks/python/build.gradle                           |  11 +-
 9 files changed, 341 insertions(+), 89 deletions(-)

diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
index 8f9fcb9..496d737 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
@@ -412,7 +412,8 @@ public class FlinkBatchPortablePipelineTranslator
     for (SideInputId sideInputId : stagePayload.getSideInputsList()) {
       String collectionId =
-          components
+          stagePayload
+              .getComponents()
               .getTransformsOrThrow(sideInputId.getTransformId())
               .getInputsOrThrow(sideInputId.getLocalName());
       // Register under the global PCollection name. Only ExecutableStageFunction needs to know the
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
index efaa40a..d3e95dd 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
@@ -34,6 +34,7 @@ import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
 import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
 import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
 import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
 import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
@@ -127,7 +128,12 @@ public class FlinkJobInvocation implements JobInvocation {
             ImmutableSet.of(PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN)));

     // Fused pipeline proto.
-    RunnerApi.Pipeline fusedPipeline = GreedyPipelineFuser.fuse(trimmedPipeline).toPipeline();
+    // TODO: Consider supporting partially-fused graphs.
+    RunnerApi.Pipeline fusedPipeline =
+        trimmedPipeline.getComponents().getTransformsMap().values().stream()
+                .anyMatch(proto -> ExecutableStage.URN.equals(proto.getSpec().getUrn()))
+            ? pipeline
+            : GreedyPipelineFuser.fuse(pipeline).toPipeline();
     JobInfo jobInfo =
         JobInfo.create(
             id,
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobService.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobService.java
index 545641a..69ce491 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobService.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobService.java
@@ -169,7 +169,9 @@ public class InMemoryJobService extends JobServiceGrpc.JobServiceImplBase implem
       try {
         PipelineValidator.validate(preparation.pipeline());
       } catch (Exception e) {
+        LOG.warn("Encountered Unexpected Exception during validation", e);
         responseObserver.onError(new StatusRuntimeException(Status.INVALID_ARGUMENT.withCause(e)));
+        return;
       }

       // create new invocation
diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index ce55d9b..a7c74b4 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -582,6 +582,16 @@ class DebugOptions(PipelineOptions):
             'enabled with this flag. Please sync with the owners of the runner '
             'before enabling any experiments.'))

+  def lookup_experiment(self, key, default=None):
+    if not self.experiments:
+      return default
+    elif key in self.experiments:
+      return True
+    for experiment in self.experiments:
+      if experiment.startswith(key + '='):
+        return experiment.split('=', 1)[1]
+    return default
+

 class ProfilingOptions(PipelineOptions):
diff --git a/sdks/python/apache_beam/runners/portability/flink_runner_test.py b/sdks/python/apache_beam/runners/portability/flink_runner_test.py
index 297dfb2..2ae23d4 100644
--- a/sdks/python/apache_beam/runners/portability/flink_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/flink_runner_test.py
@@ -44,6 +44,7 @@ if __name__ == '__main__':
   #     --flink_job_server_jar=/path/to/job_server.jar \
   #     --type=Batch \
   #     --environment_type=docker \
+  #     --extra_experiments=beam_experiments \
   #     [FlinkRunnerTest.test_method, ...]

   parser = argparse.ArgumentParser(add_help=True)
@@ -54,6 +55,8 @@ if __name__ == '__main__':
   parser.add_argument('--environment_type', default='docker',
                       help='Environment type. docker or process')
   parser.add_argument('--environment_config', help='Environment config.')
+  parser.add_argument('--extra_experiments', default=[], action='append',
+                      help='Beam experiments config.')
   known_args, args = parser.parse_known_args(sys.argv)
   sys.argv = args

@@ -62,6 +65,7 @@ if __name__ == '__main__':
   environment_type = known_args.environment_type.lower()
   environment_config = (
       known_args.environment_config if known_args.environment_config else None)
+  extra_experiments = known_args.extra_experiments

   # This is defined here to only be run when we invoke this file explicitly.
   class FlinkRunnerTest(portable_runner_test.PortableRunnerTest):
@@ -127,7 +131,8 @@ if __name__ == '__main__':

     def create_options(self):
       options = super(FlinkRunnerTest, self).create_options()
-      options.view_as(DebugOptions).experiments = ['beam_fn_api']
+      options.view_as(DebugOptions).experiments = [
+          'beam_fn_api'] + extra_experiments
       options._all_options['parallelism'] = 1
       options._all_options['shutdown_sources_on_final_watermark'] = True
       options.view_as(PortableOptions).environment_type = (
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
index 1253857..be8799b 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
@@ -317,39 +317,27 @@ class FnApiRunner(runner.PipelineRunner):
       yield

   def create_stages(self, pipeline_proto):
-
-    pipeline_context = fn_api_runner_transforms.TransformContext(
-        copy.deepcopy(pipeline_proto.components),
+    return fn_api_runner_transforms.create_and_optimize_stages(
+        copy.deepcopy(pipeline_proto),
+        phases=[fn_api_runner_transforms.annotate_downstream_side_inputs,
+                fn_api_runner_transforms.fix_side_input_pcoll_coders,
+                fn_api_runner_transforms.lift_combiners,
+                fn_api_runner_transforms.expand_gbk,
+                fn_api_runner_transforms.sink_flattens,
+                fn_api_runner_transforms.greedily_fuse,
+                fn_api_runner_transforms.read_to_impulse,
+                fn_api_runner_transforms.impulse_to_input,
+                fn_api_runner_transforms.inject_timer_pcollections,
+                fn_api_runner_transforms.sort_stages,
+                fn_api_runner_transforms.window_pcollection_coders],
+        known_runner_urns=frozenset([
+            common_urns.primitives.FLATTEN.urn,
+            common_urns.primitives.GROUP_BY_KEY.urn]),
         use_state_iterables=self._use_state_iterables)
-    # Initial set of stages are singleton leaf transforms.
-    stages = list(fn_api_runner_transforms.leaf_transform_stages(
-        pipeline_proto.root_transform_ids,
-        pipeline_proto.components))
-
-    # Apply each phase in order.
-    for phase in [
-        fn_api_runner_transforms.annotate_downstream_side_inputs,
-        fn_api_runner_transforms.fix_side_input_pcoll_coders,
-        fn_api_runner_transforms.lift_combiners,
-        fn_api_runner_transforms.expand_gbk,
-        fn_api_runner_transforms.sink_flattens,
-        fn_api_runner_transforms.greedily_fuse,
-        fn_api_runner_transforms.read_to_impulse,
-        fn_api_runner_transforms.impulse_to_input,
-        fn_api_runner_transforms.inject_timer_pcollections,
-        fn_api_runner_transforms.sort_stages,
-        fn_api_runner_transforms.window_pcollection_coders]:
-      logging.info('%s %s %s', '=' * 20, phase, '=' * 20)
-      stages = list(phase(stages, pipeline_context))
-      logging.debug('Stages: %s', [str(s) for s in stages])
-
-    # Return the (possibly mutated) context and ordered set of stages.
-    return pipeline_context.components, stages, pipeline_context.safe_coders
-
-  def run_stages(self, pipeline_components, stages, safe_coders):
+  def run_stages(self, stage_context, stages):
     worker_handler_manager = WorkerHandlerManager(
-        pipeline_components.environments, self._provision_info)
+        stage_context.components.environments, self._provision_info)
     metrics_by_stage = {}
     monitoring_infos_by_stage = {}
@@ -359,10 +347,10 @@ class FnApiRunner(runner.PipelineRunner):
       for stage in stages:
         stage_results = self.run_stage(
             worker_handler_manager.get_worker_handler,
-            pipeline_components,
+            stage_context.components,
             stage,
             pcoll_buffers,
-            safe_coders)
+            stage_context.safe_coders)
         metrics_by_stage[stage.name] = stage_results.process_bundle.metrics
         monitoring_infos_by_stage[stage.name] = (
             stage_results.process_bundle.monitoring_infos)
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_transforms.py b/sdks/python/apache_beam/runners/portability/fn_api_runner_transforms.py
index 8152c0c..21f8fa2 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner_transforms.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_transforms.py
@@ -46,7 +46,7 @@ class Stage(object):
   """A set of Transforms that can be sent to the worker for processing."""
   def __init__(self, name, transforms,
                downstream_side_inputs=None, must_follow=frozenset(),
-               parent=None, environment=None):
+               parent=None, environment=None, forced_root=False):
     self.name = name
     self.transforms = transforms
     self.downstream_side_inputs = downstream_side_inputs
@@ -58,6 +58,7 @@
           self._merge_environments,
           (self._extract_environment(t) for t in transforms))
     self.environment = environment
+    self.forced_root = forced_root

   def __repr__(self):
     must_follow = ', '.join(prev.name for prev in self.must_follow)
@@ -103,7 +104,7 @@
               str(env2).replace('\n', ' ')))
     return env1

-  def can_fuse(self, consumer):
+  def can_fuse(self, consumer, context):
     try:
       self._merge_environments(self.environment, consumer.environment)
     except ValueError:
@@ -113,8 +114,10 @@
       return not a.intersection(b)

     return (
-        not self in consumer.must_follow
-        and not self.is_flatten() and not consumer.is_flatten()
+        not consumer.forced_root
+        and not self in consumer.must_follow
+        and not self.is_runner_urn(context)
+        and not consumer.is_runner_urn(context)
         and no_overlap(self.downstream_side_inputs, consumer.side_inputs()))

   def fuse(self, other):
@@ -124,10 +127,12 @@
         union(self.downstream_side_inputs, other.downstream_side_inputs),
         union(self.must_follow, other.must_follow),
         environment=self._merge_environments(
-            self.environment, other.environment))
+            self.environment, other.environment),
+        parent=self.parent if self.parent == other.parent else None,
+        forced_root=self.forced_root or other.forced_root)

-  def is_flatten(self):
-    return any(transform.spec.urn == common_urns.primitives.FLATTEN.urn
+  def is_runner_urn(self, context):
+    return any(transform.spec.urn in context.known_runner_urns
                for transform in self.transforms)

   def side_inputs(self):
@@ -162,6 +167,89 @@ class Stage(object):
         new_transforms.append(transform)
     self.transforms = new_transforms

+  def executable_stage_transform(
+      self, known_runner_urns, all_consumers, components):
+    if (len(self.transforms) == 1
+        and self.transforms[0].spec.urn in known_runner_urns):
+      return self.transforms[0]
+
+    else:
+      all_inputs = set(
+          pcoll for t in self.transforms for pcoll in t.inputs.values())
+      all_outputs = set(
+          pcoll for t in self.transforms for pcoll in t.outputs.values())
+      internal_transforms = set(id(t) for t in self.transforms)
+      external_outputs = [pcoll for pcoll in all_outputs
+                          if all_consumers[pcoll] - internal_transforms]
+
+      stage_components = beam_runner_api_pb2.Components()
+      stage_components.CopyFrom(components)
+
+      # Only keep the referenced PCollections.
+      for pcoll_id in stage_components.pcollections.keys():
+        if pcoll_id not in all_inputs and pcoll_id not in all_outputs:
+          del stage_components.pcollections[pcoll_id]
+
+      # Only keep the transforms in this stage.
+      # Also gather up payload data as we iterate over the transforms.
+      stage_components.transforms.clear()
+      main_inputs = set()
+      side_inputs = []
+      user_states = []
+      timers = []
+      for ix, transform in enumerate(self.transforms):
+        transform_id = 'transform_%d' % ix
+        if transform.spec.urn == common_urns.primitives.PAR_DO.urn:
+          payload = proto_utils.parse_Bytes(
+              transform.spec.payload, beam_runner_api_pb2.ParDoPayload)
+          for tag in payload.side_inputs.keys():
+            side_inputs.append(
+                beam_runner_api_pb2.ExecutableStagePayload.SideInputId(
+                    transform_id=transform_id,
+                    local_name=tag))
+          for tag in payload.state_specs.keys():
+            user_states.append(
+                beam_runner_api_pb2.ExecutableStagePayload.UserStateId(
+                    transform_id=transform_id,
+                    local_name=tag))
+          for tag in payload.timer_specs.keys():
+            timers.append(
+                beam_runner_api_pb2.ExecutableStagePayload.TimerId(
+                    transform_id=transform_id,
+                    local_name=tag))
+          main_inputs.update(
+              pcoll_id
+              for tag, pcoll_id in transform.inputs.items()
+              if tag not in payload.side_inputs)
+        else:
+          main_inputs.update(transform.inputs.values())
+        stage_components.transforms[transform_id].CopyFrom(transform)
+
+      main_input_id = only_element(main_inputs - all_outputs)
+      named_inputs = dict({
+          '%s:%s' % (side.transform_id, side.local_name):
+          stage_components.transforms[side.transform_id].inputs[side.local_name]
+          for side in side_inputs
+      }, main_input=main_input_id)
+      payload = beam_runner_api_pb2.ExecutableStagePayload(
+          environment=components.environments[self.environment],
+          input=main_input_id,
+          outputs=external_outputs,
+          transforms=stage_components.transforms.keys(),
+          components=stage_components,
+          side_inputs=side_inputs,
+          user_states=user_states,
+          timers=timers)
+
+      return beam_runner_api_pb2.PTransform(
+          unique_name=unique_name(None, self.name),
+          spec=beam_runner_api_pb2.FunctionSpec(
+              urn='beam:runner:executable_stage:v1',
+              payload=payload.SerializeToString()),
+          inputs=named_inputs,
+          outputs={'output_%d' % ix: pcoll
+                   for ix, pcoll in enumerate(external_outputs)})
+

 def memoize_on_instance(f):
   missing = object()
@@ -185,8 +273,9 @@ class TransformContext(object):
   _KNOWN_CODER_URNS = set(
       value.urn for value in common_urns.coders.__dict__.values())

-  def __init__(self, components, use_state_iterables=False):
+  def __init__(self, components, known_runner_urns, use_state_iterables=False):
     self.components = components
+    self.known_runner_urns = known_runner_urns
     self.use_state_iterables = use_state_iterables
     self.safe_coders = {}
     self.bytes_coder_id = self.add_or_get_coder_id(
@@ -296,7 +385,8 @@ def leaf_transform_stages(
       yield stage


-def with_stages(pipeline_proto, stages):
+def pipeline_from_stages(
+    pipeline_proto, stages, known_runner_urns, partial):

   # In case it was a generator that mutates components as it
   # produces outputs (as is the case with most transformations).
@@ -307,6 +397,7 @@ def with_stages(pipeline_proto, stages):
   components = new_proto.components
   components.transforms.clear()

+  roots = set()
   parents = {
       child: parent
       for parent, proto in pipeline_proto.components.transforms.items()
@@ -314,24 +405,83 @@
   }

   def add_parent(child, parent):
-    if parent not in components.transforms:
-      components.transforms[parent].CopyFrom(
-          pipeline_proto.components.transforms[parent])
-      del components.transforms[parent].subtransforms[:]
-      if parent in parents:
-        add_parent(parent, parents[parent])
-      components.transforms[parent].subtransforms.append(child)
-
+    if parent is None:
+      roots.add(child)
+    else:
+      if parent not in components.transforms:
+        components.transforms[parent].CopyFrom(
+            pipeline_proto.components.transforms[parent])
+        del components.transforms[parent].subtransforms[:]
+        add_parent(parent, parents.get(parent))
+      components.transforms[parent].subtransforms.append(child)
+
+  all_consumers = collections.defaultdict(set)
   for stage in stages:
     for transform in stage.transforms:
-      id = unique_name(components.transforms, stage.name)
-      components.transforms[id].CopyFrom(transform)
-      if stage.parent:
-        add_parent(id, stage.parent)
+      for pcoll in transform.inputs.values():
+        all_consumers[pcoll].add(id(transform))
+
+  for stage in stages:
+    if partial:
+      transform = only_element(stage.transforms)
+    else:
+      transform = stage.executable_stage_transform(
+          known_runner_urns, all_consumers, components)
+    transform_id = unique_name(components.transforms, stage.name)
+    components.transforms[transform_id].CopyFrom(transform)
+    add_parent(transform_id, stage.parent)
+
+  del new_proto.root_transform_ids[:]
+  new_proto.root_transform_ids.extend(roots)

   return new_proto


+def create_and_optimize_stages(
+    pipeline_proto,
+    phases,
+    known_runner_urns,
+    use_state_iterables=False):
+  pipeline_context = TransformContext(
+      pipeline_proto.components,
+      known_runner_urns,
+      use_state_iterables=use_state_iterables)
+
+  # Initial set of stages are singleton leaf transforms.
+  stages = list(leaf_transform_stages(
+      pipeline_proto.root_transform_ids,
+      pipeline_proto.components,
+      union(known_runner_urns, KNOWN_COMPOSITES)))
+
+  # Apply each phase in order.
+  for phase in phases:
+    logging.info('%s %s %s', '=' * 20, phase, '=' * 20)
+    stages = list(phase(stages, pipeline_context))
+    logging.debug('%s %s' % (len(stages), [len(s.transforms) for s in stages]))
+    logging.debug('Stages: %s', [str(s) for s in stages])
+
+  # Return the (possibly mutated) context and ordered set of stages.
+  return pipeline_context, stages
+
+
+def optimize_pipeline(
+    pipeline_proto,
+    phases,
+    known_runner_urns,
+    partial=False,
+    **kwargs):
+  unused_context, stages = create_and_optimize_stages(
+      pipeline_proto,
+      phases,
+      known_runner_urns,
+      **kwargs)
+  return pipeline_from_stages(
+      pipeline_proto, stages, known_runner_urns, partial)
+
+
+# Optimization stages.
+
+
 def annotate_downstream_side_inputs(stages, pipeline_context):
   """Annotate each stage with fusion-prohibiting information.
@@ -379,6 +529,17 @@ def annotate_downstream_side_inputs(stages, pipeline_context):
   return stages


+def annotate_stateful_dofns_as_roots(stages, pipeline_context):
+  for stage in stages:
+    for transform in stage.transforms:
+      if transform.spec.urn == common_urns.primitives.PAR_DO.urn:
+        pardo_payload = proto_utils.parse_Bytes(
+            transform.spec.payload, beam_runner_api_pb2.ParDoPayload)
+        if pardo_payload.state_specs or pardo_payload.timer_specs:
+          stage.forced_root = True
+    yield stage
+
+
 def fix_side_input_pcoll_coders(stages, pipeline_context):
   """Length prefix side input PCollection coders.
   """
@@ -569,37 +730,30 @@ def expand_gbk(stages, pipeline_context):
       yield stage


-def sink_flattens(stages, pipeline_context):
-  """Sink flattens and remove them from the graph.
-
-  A flatten that cannot be sunk/fused away becomes multiple writes (to the
-  same logical sink) followed by a read.
+def fix_flatten_coders(stages, pipeline_context):
+  """Ensures that the inputs of Flatten have the same coders as the output.
   """
-  # TODO(robertwb): Actually attempt to sink rather than always materialize.
-  # TODO(robertwb): Possibly fuse this into one of the stages.
   pcollections = pipeline_context.components.pcollections
   for stage in stages:
-    assert len(stage.transforms) == 1
-    transform = stage.transforms[0]
+    transform = only_element(stage.transforms)
     if transform.spec.urn == common_urns.primitives.FLATTEN.urn:
-      # This is used later to correlate the read and writes.
-      buffer_id = create_buffer_id(transform.unique_name)
-      output_pcoll_id, = list(transform.outputs.values())
+      output_pcoll_id = only_element(transform.outputs.values())
       output_coder_id = pcollections[output_pcoll_id].coder_id
-      flatten_writes = []
-      for local_in, pcoll_in in transform.inputs.items():
-
+      for local_in, pcoll_in in list(transform.inputs.items()):
         if pcollections[pcoll_in].coder_id != output_coder_id:
           # Flatten requires that all its inputs be materialized with the
           # same coder as its output. Add stages to transcode flatten
           # inputs that use different coders.
-          transcoded_pcollection = (
+          transcoded_pcollection = unique_name(
+              pcollections,
               transform.unique_name + '/Transcode/' + local_in + '/out')
+          transcode_name = unique_name(
+              pipeline_context.components.transforms,
+              transform.unique_name + '/Transcode/' + local_in)
           yield Stage(
-              transform.unique_name + '/Transcode/' + local_in,
+              transcode_name,
               [beam_runner_api_pb2.PTransform(
-                  unique_name=
-                  transform.unique_name + '/Transcode/' + local_in,
+                  unique_name=transcode_name,
                   inputs={local_in: pcoll_in},
                   outputs={'out': transcoded_pcollection},
                   spec=beam_runner_api_pb2.FunctionSpec(
@@ -608,15 +762,34 @@ def sink_flattens(stages, pipeline_context):
               must_follow=stage.must_follow)
           pcollections[transcoded_pcollection].CopyFrom(
               pcollections[pcoll_in])
+          pcollections[transcoded_pcollection].unique_name = (
+              transcoded_pcollection)
           pcollections[transcoded_pcollection].coder_id = output_coder_id
-        else:
-          transcoded_pcollection = pcoll_in
+          transform.inputs[local_in] = transcoded_pcollection
+
+    yield stage
+
+
+def sink_flattens(stages, pipeline_context):
+  """Sink flattens and remove them from the graph.
+  A flatten that cannot be sunk/fused away becomes multiple writes (to the
+  same logical sink) followed by a read.
+  """
+  # TODO(robertwb): Actually attempt to sink rather than always materialize.
+  # TODO(robertwb): Possibly fuse this into one of the stages.
+  for stage in fix_flatten_coders(stages, pipeline_context):
+    transform = only_element(stage.transforms)
+    if transform.spec.urn == common_urns.primitives.FLATTEN.urn:
+      # This is used later to correlate the read and writes.
+      buffer_id = create_buffer_id(transform.unique_name)
+      flatten_writes = []
+      for local_in, pcoll_in in transform.inputs.items():
         flatten_write = Stage(
             transform.unique_name + '/Write/' + local_in,
             [beam_runner_api_pb2.PTransform(
                 unique_name=transform.unique_name + '/Write/' + local_in,
-                inputs={local_in: transcoded_pcollection},
+                inputs={local_in: pcoll_in},
                 spec=beam_runner_api_pb2.FunctionSpec(
                     urn=bundle_processor.DATA_OUTPUT_URN,
                     payload=buffer_id))],
@@ -684,7 +857,7 @@ def greedily_fuse(stages, pipeline_context):
           # Update consumer.must_follow set, as it's used in can_fuse.
           consumer.must_follow = frozenset(
               replacement(s) for s in consumer.must_follow)
-          if producer.can_fuse(consumer):
+          if producer.can_fuse(consumer, pipeline_context):
             fuse(producer, consumer)
           else:
             # If we can't fuse, do a read + write.
@@ -697,7 +870,8 @@ def greedily_fuse(stages, pipeline_context):
                         inputs={'in': pcoll},
                         spec=beam_runner_api_pb2.FunctionSpec(
                             urn=bundle_processor.DATA_OUTPUT_URN,
-                            payload=buffer_id))])
+                            payload=buffer_id))],
+                downstream_side_inputs=producer.downstream_side_inputs)
             fuse(producer, write_pcoll)
             if consumer.has_as_main_input(pcoll):
               read_pcoll = Stage(
@@ -708,6 +882,7 @@ def greedily_fuse(stages, pipeline_context):
                         spec=beam_runner_api_pb2.FunctionSpec(
                             urn=bundle_processor.DATA_INPUT_URN,
                             payload=buffer_id))],
+                  downstream_side_inputs=consumer.downstream_side_inputs,
                   must_follow=frozenset([write_pcoll]))
               fuse(read_pcoll, consumer)
             else:
@@ -781,6 +956,34 @@ def impulse_to_input(stages, pipeline_context):
     yield stage


+def extract_impulse_stages(stages, pipeline_context):
+  """Splits fused Impulse operations into their own stage."""
+  for stage in stages:
+    for transform in list(stage.transforms):
+      if transform.spec.urn == common_urns.primitives.IMPULSE.urn:
+        stage.transforms.remove(transform)
+        yield Stage(
+            transform.unique_name,
+            transforms=[transform],
+            downstream_side_inputs=stage.downstream_side_inputs,
+            must_follow=stage.must_follow,
+            parent=stage.parent)
+
+    if stage.transforms:
+      yield stage
+
+
+def remove_data_plane_ops(stages, pipeline_context):
+  for stage in stages:
+    for transform in list(stage.transforms):
+      if transform.spec.urn in (bundle_processor.DATA_INPUT_URN,
+                                bundle_processor.DATA_OUTPUT_URN):
+        stage.transforms.remove(transform)
+
+    if stage.transforms:
+      yield stage
+
+
 def inject_timer_pcollections(stages, pipeline_context):
   """Create PCollections for fired timers and to-be-set timers.

@@ -858,12 +1061,15 @@ def inject_timer_pcollections(stages, pipeline_context):

 def sort_stages(stages, pipeline_context):
   """Order stages suitable for sequential execution.
""" + all_stages = set(stages) seen = set() ordered = [] def process(stage): if stage not in seen: seen.add(stage) + if stage not in all_stages: + return for prev in stage.must_follow: process(prev) ordered.append(stage) diff --git a/sdks/python/apache_beam/runners/portability/portable_runner.py b/sdks/python/apache_beam/runners/portability/portable_runner.py index 42bca33..87f05d3 100644 --- a/sdks/python/apache_beam/runners/portability/portable_runner.py +++ b/sdks/python/apache_beam/runners/portability/portable_runner.py @@ -28,6 +28,7 @@ from concurrent import futures import grpc from apache_beam import metrics +from apache_beam.options.pipeline_options import DebugOptions from apache_beam.options.pipeline_options import PortableOptions from apache_beam.options.pipeline_options import SetupOptions from apache_beam.options.pipeline_options import StandardOptions @@ -161,6 +162,7 @@ class PortableRunner(runner.PipelineRunner): portable_options.environment_config, server = ( BeamFnExternalWorkerPoolServicer.start( sdk_worker_main._get_worker_count(options))) + globals()['x'] = server cleanup_callbacks = [functools.partial(server.stop, 1)] else: cleanup_callbacks = [] @@ -180,14 +182,39 @@ class PortableRunner(runner.PipelineRunner): # Preemptively apply combiner lifting, until all runners support it. # This optimization is idempotent. + pre_optimize = options.view_as(DebugOptions).lookup_experiment( + 'pre_optimize', 'combine').lower() if not options.view_as(StandardOptions).streaming: - stages = list(fn_api_runner_transforms.leaf_transform_stages( - proto_pipeline.root_transform_ids, proto_pipeline.components)) - stages = fn_api_runner_transforms.lift_combiners( - stages, - fn_api_runner_transforms.TransformContext(proto_pipeline.components)) - proto_pipeline = fn_api_runner_transforms.with_stages( - proto_pipeline, stages) + flink_known_urns = frozenset([ + common_urns.composites.RESHUFFLE.urn, + common_urns.primitives.IMPULSE.urn, + common_urns.primitives.FLATTEN.urn, + common_urns.primitives.GROUP_BY_KEY.urn]) + if pre_optimize == 'combine': + proto_pipeline = fn_api_runner_transforms.optimize_pipeline( + proto_pipeline, + phases=[fn_api_runner_transforms.lift_combiners], + known_runner_urns=flink_known_urns, + partial=True) + elif pre_optimize == 'all': + proto_pipeline = fn_api_runner_transforms.optimize_pipeline( + proto_pipeline, + phases=[fn_api_runner_transforms.annotate_downstream_side_inputs, + fn_api_runner_transforms.annotate_stateful_dofns_as_roots, + fn_api_runner_transforms.fix_side_input_pcoll_coders, + fn_api_runner_transforms.lift_combiners, + fn_api_runner_transforms.fix_flatten_coders, + # fn_api_runner_transforms.sink_flattens, + fn_api_runner_transforms.greedily_fuse, + fn_api_runner_transforms.read_to_impulse, + fn_api_runner_transforms.extract_impulse_stages, + fn_api_runner_transforms.remove_data_plane_ops, + fn_api_runner_transforms.sort_stages], + known_runner_urns=flink_known_urns) + elif pre_optimize == 'none': + pass + else: + raise ValueError('Unknown value for pre_optimize: %s' % pre_optimize) if not job_service: channel = grpc.insecure_channel(job_endpoint) diff --git a/sdks/python/build.gradle b/sdks/python/build.gradle index aa0b9c7..b74b030 100644 --- a/sdks/python/build.gradle +++ b/sdks/python/build.gradle @@ -334,6 +334,9 @@ class CompatibilityMatrixConfig { enum SDK_WORKER_TYPE { DOCKER, PROCESS, LOOPBACK } + + // Whether to pre-optimize the pipeline with the Python optimizer. 
+  boolean preOptimize = false
 }

 def flinkCompatibilityMatrix = {
@@ -341,7 +344,10 @@ def flinkCompatibilityMatrix = {
   def workerType = config.workerType.name()
   def streaming = config.streaming
   def environment_config = config.workerType == CompatibilityMatrixConfig.SDK_WORKER_TYPE.PROCESS ? "--environment_config='{\"command\": \"${project(":beam-sdks-python:").buildDir.absolutePath}/sdk_worker.sh\"}'" : ""
-  def name = "flinkCompatibilityMatrix${streaming ? 'Streaming' : 'Batch'}${workerType}"
+  def name = "flinkCompatibilityMatrix${streaming ? 'Streaming' : 'Batch'}${config.preOptimize ? 'PreOptimize' : ''}${workerType}"
+  def extra_experiments = []
+  if (config.preOptimize)
+    extra_experiments.add('pre_optimize=all')
   tasks.create(name: name) {
     dependsOn 'setupVirtualenv'
     dependsOn ':beam-runners-flink_2.11-job-server:shadowJar'
@@ -352,7 +358,7 @@ def flinkCompatibilityMatrix = {
     doLast {
       exec {
         executable 'sh'
-        args '-c', ". ${project.ext.envdir}/bin/activate && pip install -e .[test] && python -m apache_beam.runners.portability.flink_runner_test --flink_job_server_jar=${project(":beam-runners-flink_2.11-job-server:").shadowJar.archivePath} --environment_type=${workerType} ${environment_config} ${streaming ? '--streaming' : ''}"
+        args '-c', ". ${project.ext.envdir}/bin/activate && pip install -e .[test] && python -m apache_beam.runners.portability.flink_runner_test --flink_job_server_jar=${project(":beam-runners-flink_2.11-job-server:").shadowJar.archivePath} --environment_type=${workerType} ${environment_config} ${streaming ? '--streaming' : ''} ${extra_experiments ? '--extra_experiments=' + extra_experiments.join(',') : ''}"
       }
     }
   }
@@ -371,6 +377,7 @@ task flinkCompatibilityMatrixProcess() {
 task flinkCompatibilityMatrixLoopback() {
   dependsOn flinkCompatibilityMatrix(streaming: false, workerType: CompatibilityMatrixConfig.SDK_WORKER_TYPE.LOOPBACK)
   dependsOn flinkCompatibilityMatrix(streaming: true, workerType: CompatibilityMatrixConfig.SDK_WORKER_TYPE.LOOPBACK)
+  dependsOn flinkCompatibilityMatrix(streaming: true, workerType: CompatibilityMatrixConfig.SDK_WORKER_TYPE.LOOPBACK, preOptimize: true)
 }

 task flinkValidatesRunner() {
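
For readers following the change, a brief usage sketch (not part of the commit itself): the DebugOptions.lookup_experiment helper added above is what PortableRunner uses to read the pre_optimize experiment, which per this commit accepts 'combine' (the default), 'all', or 'none'. The experiment values below are illustrative only.

    # Sketch of the new DebugOptions.lookup_experiment behavior; values are
    # examples, mirroring what --extra_experiments in flink_runner_test.py or
    # the build.gradle PreOptimize tasks would pass through.
    from apache_beam.options.pipeline_options import DebugOptions, PipelineOptions

    options = PipelineOptions(
        ['--experiments', 'beam_fn_api', '--experiments', 'pre_optimize=all'])
    debug_options = options.view_as(DebugOptions)

    debug_options.lookup_experiment('beam_fn_api')              # True (bare flag)
    debug_options.lookup_experiment('pre_optimize', 'combine')  # 'all' (key=value form)
    debug_options.lookup_experiment('absent', 'default')        # 'default' (fallback)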