[ 
https://issues.apache.org/jira/browse/BEAM-8016?focusedWorklogId=346391&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-346391
 ]

ASF GitHub Bot logged work on BEAM-8016:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 20/Nov/19 01:05
            Start Date: 20/Nov/19 01:05
    Worklog Time Spent: 10m 
      Work Description: davidyan74 commented on pull request #10132: 
[BEAM-8016] Pipeline Graph
URL: https://github.com/apache/beam/pull/10132#discussion_r348243649
 
 

 ##########
 File path: sdks/python/apache_beam/runners/interactive/interactive_runner.py
 ##########
 @@ -117,123 +119,43 @@ def apply(self, transform, pvalueish, options):
     return self._underlying_runner.apply(transform, pvalueish, options)
 
   def run_pipeline(self, pipeline, options):
-    if not hasattr(self, '_desired_cache_labels'):
-      self._desired_cache_labels = set()
-
-    # Invoke a round trip through the runner API. This makes sure the Pipeline
-    # proto is stable.
-    pipeline = beam.pipeline.Pipeline.from_runner_api(
-        pipeline.to_runner_api(use_fake_coders=True),
-        pipeline.runner,
-        options)
-
-    # Snapshot the pipeline in a portable proto before mutating it.
-    pipeline_proto, original_context = pipeline.to_runner_api(
-        return_context=True, use_fake_coders=True)
-    pcolls_to_pcoll_id = self._pcolls_to_pcoll_id(pipeline, original_context)
-
-    analyzer = pipeline_analyzer.PipelineAnalyzer(self._cache_manager,
-                                                  pipeline_proto,
-                                                  self._underlying_runner,
-                                                  options,
-                                                  self._desired_cache_labels)
-    # Should be only accessed for debugging purpose.
-    self._analyzer = analyzer
+    pin = inst.pin(pipeline, options)
 
     pipeline_to_execute = beam.pipeline.Pipeline.from_runner_api(
-        analyzer.pipeline_proto_to_execute(),
+        pin.instrumented_pipeline_proto(),
         self._underlying_runner,
         options)
 
     if not self._skip_display:
-      display = display_manager.DisplayManager(
-          pipeline_proto=pipeline_proto,
-          pipeline_analyzer=analyzer,
-          cache_manager=self._cache_manager,
-          pipeline_graph_renderer=self._renderer)
-      display.start_periodic_update()
+      pg = pipeline_graph.PipelineGraph(pin.original_pipeline,
+                                        render_option=self._render_option)
+      pg.display_graph()
 
     result = pipeline_to_execute.run()
     result.wait_until_finish()
 
-    if not self._skip_display:
-      display.stop_periodic_update()
-
-    return PipelineResult(result, self, self._analyzer.pipeline_info(),
-                          self._cache_manager, pcolls_to_pcoll_id)
-
-  def _pcolls_to_pcoll_id(self, pipeline, original_context):
-    """Returns a dict mapping PCollections string to PCollection IDs.
-
-    Using a PipelineVisitor to iterate over every node in the pipeline,
-    records the mapping from PCollections to PCollections IDs. This mapping
-    will be used to query cached PCollections.
-
-    Args:
-      pipeline: (pipeline.Pipeline)
-      original_context: (pipeline_context.PipelineContext)
-
-    Returns:
-      (dict from str to str) a dict mapping str(pcoll) to pcoll_id.
-    """
-    pcolls_to_pcoll_id = {}
-
-    from apache_beam.pipeline import PipelineVisitor  # pylint: 
disable=import-error
-
-    class PCollVisitor(PipelineVisitor):  # pylint: 
disable=used-before-assignment
-      """"A visitor that records input and output values to be replaced.
-
-      Input and output values that should be updated are recorded in maps
-      input_replacements and output_replacements respectively.
-
-      We cannot update input and output values while visiting since that
-      results in validation errors.
-      """
-
-      def enter_composite_transform(self, transform_node):
-        self.visit_transform(transform_node)
-
-      def visit_transform(self, transform_node):
-        for pcoll in transform_node.outputs.values():
-          pcolls_to_pcoll_id[str(pcoll)] = 
original_context.pcollections.get_id(
-              pcoll)
-
-    pipeline.visit(PCollVisitor())
-    return pcolls_to_pcoll_id
+    return PipelineResult(result, pin)
 
 
 class PipelineResult(beam.runners.runner.PipelineResult):
   """Provides access to information about a pipeline."""
 
-  def __init__(self, underlying_result, runner, pipeline_info, cache_manager,
-               pcolls_to_pcoll_id):
+  def __init__(self, underlying_result, pin):
     super(PipelineResult, self).__init__(underlying_result.state)
-    self._runner = runner
-    self._pipeline_info = pipeline_info
-    self._cache_manager = cache_manager
-    self._pcolls_to_pcoll_id = pcolls_to_pcoll_id
-
-  def _cache_label(self, pcoll):
-    pcoll_id = self._pcolls_to_pcoll_id[str(pcoll)]
-    return self._pipeline_info.cache_label(pcoll_id)
+    self._underlying_result = underlying_result
+    self._pin = pin
 
 Review comment:
   I would say always avoid abbreviations that can be confusing. Perhaps just 
spell it out? _pipeline_instrument
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 346391)
    Time Spent: 4h 40m  (was: 4.5h)

> Render Beam Pipeline as DOT with Interactive Beam  
> ---------------------------------------------------
>
>                 Key: BEAM-8016
>                 URL: https://issues.apache.org/jira/browse/BEAM-8016
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-py-interactive
>            Reporter: Ning Kang
>            Assignee: Ning Kang
>            Priority: Major
>          Time Spent: 4h 40m
>  Remaining Estimate: 0h
>
> With the work in https://issues.apache.org/jira/browse/BEAM-7760, a Beam 
> pipeline converted to DOT and then rendered should mark user-defined 
> variables on edges.
> With the work in https://issues.apache.org/jira/browse/BEAM-7926, it might be 
> redundant or confusing to render arbitrary random samples of PCollection data 
> on edges.
> We'll also make sure edges in the graph correspond to the output -> input 
> relationships in the user-defined pipeline. Each edge is one output. If 
> multiple downstream inputs take the same output, it should be rendered as 
> one edge that diverges into multiple branches instead of multiple separate 
> edges.
> For advanced interactivity highlighting, where each execution highlights the 
> part of the original pipeline that was actually executed, we'll also provide 
> beta support.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to