[ 
https://issues.apache.org/jira/browse/BEAM-6186?focusedWorklogId=176908&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-176908
 ]

ASF GitHub Bot logged work on BEAM-6186:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 19/Dec/18 10:30
            Start Date: 19/Dec/18 10:30
    Worklog Time Spent: 10m 
      Work Description: robertwb commented on a change in pull request #7281: [BEAM-6186] Finish moving optimization phases.
URL: https://github.com/apache/beam/pull/7281#discussion_r242864081
 
 

 ##########
 File path: sdks/python/apache_beam/runners/portability/fn_api_runner_transforms.py
 ##########
 @@ -353,6 +518,382 @@ def make_stage(base_stage, transform):
       yield stage
 
 
+def expand_gbk(stages, pipeline_context):
+  """Transforms each GBK into a write followed by a read.
+  """
+  for stage in stages:
+    assert len(stage.transforms) == 1
+    transform = stage.transforms[0]
+    if transform.spec.urn == common_urns.primitives.GROUP_BY_KEY.urn:
+      for pcoll_id in transform.inputs.values():
+        pipeline_context.length_prefix_pcoll_coders(pcoll_id)
+      for pcoll_id in transform.outputs.values():
+        if pipeline_context.use_state_iterables:
+          pipeline_context.components.pcollections[
+              pcoll_id].coder_id = pipeline_context.with_state_iterables(
+                  pipeline_context.components.pcollections[pcoll_id].coder_id)
+        pipeline_context.length_prefix_pcoll_coders(pcoll_id)
+
+      # This is used later to correlate the read and write.
+      transform_id = stage.name
+      if transform != pipeline_context.components.transforms.get(transform_id):
+        transform_id = unique_name(
+            pipeline_context.components.transforms, stage.name)
+        pipeline_context.components.transforms[transform_id].CopyFrom(transform)
+      grouping_buffer = create_buffer_id(transform_id, kind='group')
+      gbk_write = Stage(
+          transform.unique_name + '/Write',
+          [beam_runner_api_pb2.PTransform(
+              unique_name=transform.unique_name + '/Write',
+              inputs=transform.inputs,
+              spec=beam_runner_api_pb2.FunctionSpec(
+                  urn=bundle_processor.DATA_OUTPUT_URN,
+                  payload=grouping_buffer))],
+          downstream_side_inputs=frozenset(),
+          must_follow=stage.must_follow)
+      yield gbk_write
+
+      yield Stage(
+          transform.unique_name + '/Read',
+          [beam_runner_api_pb2.PTransform(
+              unique_name=transform.unique_name + '/Read',
+              outputs=transform.outputs,
+              spec=beam_runner_api_pb2.FunctionSpec(
+                  urn=bundle_processor.DATA_INPUT_URN,
+                  payload=grouping_buffer))],
+          downstream_side_inputs=stage.downstream_side_inputs,
+          must_follow=union(frozenset([gbk_write]), stage.must_follow))
+    else:
+      yield stage
+
+
+def sink_flattens(stages, pipeline_context):
+  """Sink flattens and remove them from the graph.
+
+  A flatten that cannot be sunk/fused away becomes multiple writes (to the
+  same logical sink) followed by a read.
+  """
+  # TODO(robertwb): Actually attempt to sink rather than always materialize.
+  # TODO(robertwb): Possibly fuse this into one of the stages.
+  pcollections = pipeline_context.components.pcollections
+  for stage in stages:
+    assert len(stage.transforms) == 1
+    transform = stage.transforms[0]
+    if transform.spec.urn == common_urns.primitives.FLATTEN.urn:
+      # This is used later to correlate the read and writes.
+      buffer_id = create_buffer_id(transform.unique_name)
+      output_pcoll_id, = list(transform.outputs.values())
+      output_coder_id = pcollections[output_pcoll_id].coder_id
+      flatten_writes = []
+      for local_in, pcoll_in in transform.inputs.items():
+
+        if pcollections[pcoll_in].coder_id != output_coder_id:
+          # Flatten inputs must all be written with the same coder as is
+          # used to read them.
+          pcollections[pcoll_in].coder_id = output_coder_id
+          transcoded_pcollection = (
+              transform.unique_name + '/Transcode/' + local_in + '/out')
+          yield Stage(
+              transform.unique_name + '/Transcode/' + local_in,
+              [beam_runner_api_pb2.PTransform(
+                  unique_name=
+                  transform.unique_name + '/Transcode/' + local_in,
+                  inputs={local_in: pcoll_in},
+                  outputs={'out': transcoded_pcollection},
+                  spec=beam_runner_api_pb2.FunctionSpec(
+                      urn=bundle_processor.IDENTITY_DOFN_URN))],
+              downstream_side_inputs=frozenset(),
+              must_follow=stage.must_follow)
+          pcollections[transcoded_pcollection].CopyFrom(
+              pcollections[pcoll_in])
+          pcollections[transcoded_pcollection].coder_id = output_coder_id
+        else:
+          transcoded_pcollection = pcoll_in
+
+        flatten_write = Stage(
+            transform.unique_name + '/Write/' + local_in,
+            [beam_runner_api_pb2.PTransform(
+                unique_name=transform.unique_name + '/Write/' + local_in,
+                inputs={local_in: transcoded_pcollection},
+                spec=beam_runner_api_pb2.FunctionSpec(
+                    urn=bundle_processor.DATA_OUTPUT_URN,
+                    payload=buffer_id))],
+            downstream_side_inputs=frozenset(),
+            must_follow=stage.must_follow)
+        flatten_writes.append(flatten_write)
+        yield flatten_write
+
+      yield Stage(
+          transform.unique_name + '/Read',
+          [beam_runner_api_pb2.PTransform(
+              unique_name=transform.unique_name + '/Read',
+              outputs=transform.outputs,
+              spec=beam_runner_api_pb2.FunctionSpec(
+                  urn=bundle_processor.DATA_INPUT_URN,
+                  payload=buffer_id))],
+          downstream_side_inputs=stage.downstream_side_inputs,
+          must_follow=union(frozenset(flatten_writes), stage.must_follow))
+
+    else:
+      yield stage
+
+
+def greedily_fuse(stages, pipeline_context):
+  """Places transforms sharing an edge in the same stage, whenever possible.
+  """
+  producers_by_pcoll = {}
+  consumers_by_pcoll = collections.defaultdict(list)
+
+  # Used to always reference the correct stage as the producer and
+  # consumer maps are not updated when stages are fused away.
+  replacements = {}
+
+  def replacement(s):
+    old_ss = []
+    while s in replacements:
+      old_ss.append(s)
+      s = replacements[s]
+    for old_s in old_ss[:-1]:
+      replacements[old_s] = s
+    return s
+
+  def fuse(producer, consumer):
+    fused = producer.fuse(consumer)
+    replacements[producer] = fused
+    replacements[consumer] = fused
+
+  # First record the producers and consumers of each PCollection.
+  for stage in stages:
+    for transform in stage.transforms:
+      for input in transform.inputs.values():
+        consumers_by_pcoll[input].append(stage)
+      for output in transform.outputs.values():
+        producers_by_pcoll[output] = stage
+
+  logging.debug('consumers\n%s', consumers_by_pcoll)
+  logging.debug('producers\n%s', producers_by_pcoll)
+
+  # Now try to fuse away all pcollections.
+  for pcoll, producer in producers_by_pcoll.items():
+    write_pcoll = None
+    for consumer in consumers_by_pcoll[pcoll]:
+      producer = replacement(producer)
+      consumer = replacement(consumer)
+      # Update consumer.must_follow set, as it's used in can_fuse.
+      consumer.must_follow = frozenset(
+          replacement(s) for s in consumer.must_follow)
+      if producer.can_fuse(consumer):
+        fuse(producer, consumer)
+      else:
+        # If we can't fuse, do a read + write.
+        buffer_id = create_buffer_id(pcoll)
+        if write_pcoll is None:
+          write_pcoll = Stage(
+              pcoll + '/Write',
+              [beam_runner_api_pb2.PTransform(
+                  unique_name=pcoll + '/Write',
+                  inputs={'in': pcoll},
+                  spec=beam_runner_api_pb2.FunctionSpec(
+                      urn=bundle_processor.DATA_OUTPUT_URN,
+                      payload=buffer_id))])
+          fuse(producer, write_pcoll)
+        if consumer.has_as_main_input(pcoll):
+          read_pcoll = Stage(
+              pcoll + '/Read',
+              [beam_runner_api_pb2.PTransform(
+                  unique_name=pcoll + '/Read',
+                  outputs={'out': pcoll},
+                  spec=beam_runner_api_pb2.FunctionSpec(
+                      urn=bundle_processor.DATA_INPUT_URN,
+                      payload=buffer_id))],
+              must_follow=frozenset([write_pcoll]))
+          fuse(read_pcoll, consumer)
+        else:
+          consumer.must_follow = union(
+              consumer.must_follow, frozenset([write_pcoll]))
+
+  # Everything that was originally a stage or a replacement, but wasn't
+  # replaced, should be in the final graph.
+  final_stages = frozenset(stages).union(list(replacements.values()))\
+      .difference(list(replacements))
+
+  for stage in final_stages:
+    # Update all references to their final values before throwing
+    # the replacement data away.
+    stage.must_follow = frozenset(replacement(s) for s in stage.must_follow)
+    # Two reads of the same stage may have been fused.  This is unneeded.
+    stage.deduplicate_read()
+  return final_stages
+
+
+def read_to_impulse(stages, pipeline_context):
+  """Translates Read operations into Impulse operations."""
+  for stage in stages:
+    # First map Reads, if any, to Impulse + triggered read op.
+    for transform in list(stage.transforms):
+      if transform.spec.urn == common_urns.deprecated_primitives.READ.urn:
+        read_pc = only_element(transform.outputs.values())
+        read_pc_proto = pipeline_context.components.pcollections[read_pc]
+        impulse_pc = unique_name(
+            pipeline_context.components.pcollections, 'Impulse')
+        pipeline_context.components.pcollections[impulse_pc].CopyFrom(
+            beam_runner_api_pb2.PCollection(
+                unique_name=impulse_pc,
+                coder_id=pipeline_context.bytes_coder_id,
+                windowing_strategy_id=read_pc_proto.windowing_strategy_id,
+                is_bounded=read_pc_proto.is_bounded))
+        stage.transforms.remove(transform)
+        # TODO(robertwb): If this goes multi-process before fn-api
+        # read is default, expand into split + reshuffle + read.
+        stage.transforms.append(
+            beam_runner_api_pb2.PTransform(
+                unique_name=transform.unique_name + '/Impulse',
+                spec=beam_runner_api_pb2.FunctionSpec(
+                    urn=common_urns.primitives.IMPULSE.urn),
+                outputs={'out': impulse_pc}))
+        stage.transforms.append(
+            beam_runner_api_pb2.PTransform(
+                unique_name=transform.unique_name,
+                spec=beam_runner_api_pb2.FunctionSpec(
+                    urn=python_urns.IMPULSE_READ_TRANSFORM,
+                    payload=transform.spec.payload),
+                inputs={'in': impulse_pc},
+                outputs={'out': read_pc}))
+
+    yield stage
+
+
+def impulse_to_input(stages, pipeline_context):
+  """Translates Impulse operations into GRPC reads."""
+  for stage in stages:
+    for transform in list(stage.transforms):
+      if transform.spec.urn == common_urns.primitives.IMPULSE.urn:
+        stage.transforms.remove(transform)
+        stage.transforms.append(
+            beam_runner_api_pb2.PTransform(
+                unique_name=transform.unique_name,
+                spec=beam_runner_api_pb2.FunctionSpec(
+                    urn=bundle_processor.DATA_INPUT_URN,
+                    payload=IMPULSE_BUFFER),
+                outputs=transform.outputs))
+    yield stage
+
+
+def inject_timer_pcollections(stages, pipeline_context):
 
 Review comment:
   Done.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 176908)
    Time Spent: 2h  (was: 1h 50m)

> Cleanup FnApiRunner optimization phases.
> ----------------------------------------
>
>                 Key: BEAM-6186
>                 URL: https://issues.apache.org/jira/browse/BEAM-6186
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-py-core
>            Reporter: Robert Bradshaw
>            Assignee: Ahmet Altay
>            Priority: Minor
>          Time Spent: 2h
>  Remaining Estimate: 0h
>
> They are currently expressed as functions with closure. It would be good to 
> pull them out with explicit dependencies both to better be able to follow the 
> code, and also be able to test and reuse them.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to