[ 
https://issues.apache.org/jira/browse/BEAM-7926?focusedWorklogId=384652&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-384652
 ]

ASF GitHub Bot logged work on BEAM-7926:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 10/Feb/20 19:00
            Start Date: 10/Feb/20 19:00
    Worklog Time Spent: 10m 
      Work Description: aaltay commented on pull request #10731: [BEAM-7926] Data-centric Interactive Part3
URL: https://github.com/apache/beam/pull/10731#discussion_r377252061
 
 

 ##########
 File path: sdks/python/apache_beam/runners/interactive/interactive_beam.py
 ##########
 @@ -82,7 +89,100 @@ def run_pipeline(self):
   ie.current_env().watch(watchable)
 
 
-def visualize(pcoll):
-  """Visualizes a PCollection."""
-  # TODO(BEAM-7926)
-  pass
+def show(*pcolls):
+  """Visualizes given PCollections in an interactive exploratory way if used
+  within a notebook, or prints a heading sampled data if used within an ipython
+  shell. Noop if used in a non-interactive environment.
+
+  Ad hoc builds a pipeline fragment including only transforms that are
+  necessary to produce data for given PCollections pcolls, runs the pipeline
+  fragment to compute data for those pcolls and then visualizes the data.
+
+  The function is always blocking. If used within a notebook, the data
+  visualized might be dynamically updated before the function returns as more
+  and more data could be getting processed and emitted while the pipeline
+  fragment is being executed. If used within an ipython shell, there will be
+  no dynamic plotting but a static plot at the end of pipeline fragment
+  execution.
+
+  The PCollections given must belong to the same pipeline and be watched by
+  Interactive Beam (PCollections defined in __main__ are automatically watched).
+
+    For example::
+
+      p = beam.Pipeline(InteractiveRunner())
+      init = p | 'Init' >> beam.Create(range(1000))
+      square = init | 'Square' >> beam.Map(lambda x: x * x)
+      cube = init | 'Cube' >> beam.Map(lambda x: x ** 3)
+
+      # Below builds a pipeline fragment from the defined pipeline `p` that
+      # contains only applied transforms of `Init` and `Square`. Then the
+      # interactive runner runs the pipeline fragment implicitly to compute data
+      # represented by PCollection `square` and visualizes it.
+      show(square)
+
+      # This is equivalent to `show(square)` because `square` depends on `init`
+      # and `init` is included in the pipeline fragment and computed anyway.
+      show(init, square)
+
+      # Below is similar to running `p.run()`. It computes data for both
+      # PCollection `square` and PCollection `cube`, then visualizes them.
+      show(square, cube)
+  """
+  assert len(pcolls) > 0, (
+      'Need at least 1 PCollection to show data visualization.')
+  for pcoll in pcolls:
+    assert isinstance(pcoll, beam.pvalue.PCollection), (
+        '{} is not an apache_beam.pvalue.PCollection.'.format(pcoll))
+  user_pipeline = pcolls[0].pipeline
+  for pcoll in pcolls:
+    assert pcoll.pipeline is user_pipeline, (
+        '{} belongs to a different user-defined pipeline ({}) than that of'
+        ' other PCollections ({}).'.format(
+            pcoll, pcoll.pipeline, user_pipeline))
+  runner = user_pipeline.runner
+  if isinstance(runner, ir.InteractiveRunner):
+    runner = runner._underlying_runner
+
+  # Make sure that all PCollections to be shown are watched. If a PCollection
+  # has not been watched, make up a variable name for that PCollection and watch
+  # it. No validation is needed here because the watch logic can handle
+  # arbitrary variables.
+  watched_pcollections = set()
+  for watching in ie.current_env().watching():
+    for key, val in watching:
+      if hasattr(val, '__class__') and isinstance(val, beam.pvalue.PCollection):
+        watched_pcollections.add(val)
+  for pcoll in pcolls:
+    if pcoll not in watched_pcollections:
+      watch({re.sub(r'[\[\]\(\)]', '_', str(pcoll)): pcoll})
+
+  # Attempt to run background caching job since we have the reference to the
+  # user-defined pipeline.
+  bcj.attempt_to_run_background_caching_job(runner, user_pipeline)
+
+  # Build a pipeline fragment for the PCollections and run it.
+  result = pf.PipelineFragment(list(pcolls)).run()
+  ie.current_env().set_pipeline_result(
+      user_pipeline,
+      result,
+      is_main_job=True)
+
+  # If in notebook, dynamic plotting as computation goes.
+  if ie.current_env().is_in_notebook:
+    for pcoll in pcolls:
+      visualize(pcoll, dynamic_plotting_interval=1)
 
 Review comment:
   What is `dynamic_plotting_interval`? Is it a 1-second update interval?
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 384652)
    Time Spent: 45.5h  (was: 45h 20m)

> Show PCollection with Interactive Beam in a data-centric user flow
> ------------------------------------------------------------------
>
>                 Key: BEAM-7926
>                 URL: https://issues.apache.org/jira/browse/BEAM-7926
>             Project: Beam
>          Issue Type: New Feature
>          Components: runner-py-interactive
>            Reporter: Ning Kang
>            Assignee: Ning Kang
>            Priority: Major
>          Time Spent: 45.5h
>  Remaining Estimate: 0h
>
> Support auto plotting / charting of materialized data of a given PCollection 
> with Interactive Beam.
> Say an Interactive Beam pipeline defined as
>  
> {code:python}
> p = beam.Pipeline(InteractiveRunner())
> pcoll = p | 'Transform' >> transform()
> pcoll2 = ...
> pcoll3 = ...{code}
> The user can call a single function and get auto-magical charting of the data.
> e.g.,
> {code:python}
> show(pcoll, pcoll2)
> {code}
> In the process, a pipeline fragment is built that includes only the 
> transforms necessary to produce the desired pcolls (pcoll and pcoll2), and 
> that fragment is then executed.
> This makes the Interactive Beam user flow data-centric.
>  
> Detailed 
> [design|https://docs.google.com/document/d/1DYWrT6GL_qDCXhRMoxpjinlVAfHeVilK5Mtf8gO6zxQ/edit#heading=h.v6k2o3roarzz].
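
The fragment idea in the description above can be illustrated with a small, library-free sketch. It is not the code from the pull request and does not use the Beam API: the function `fragment_transforms` and the two dictionaries are hypothetical stand-ins for a pipeline graph, using the `Init`, `Square` and `Cube` labels from the docstring example. It only shows the general technique of walking backwards from the requested PCollections to collect the transforms they depend on.

```python
from collections import deque


def fragment_transforms(producers, inputs, targets):
    """Return the labels of the transforms needed to produce `targets`.

    producers: maps a PCollection name to the label of the transform that
        produces it (hypothetical representation, not Beam's own).
    inputs: maps a transform label to the PCollection names it consumes.
    targets: the PCollection names the user asked to show.
    """
    needed = set()
    queue = deque(targets)
    while queue:
        pcoll = queue.popleft()
        transform = producers[pcoll]
        if transform in needed:
            continue  # already visited via another target
        needed.add(transform)
        # Walk upstream: everything this transform reads must be produced too.
        queue.extend(inputs.get(transform, []))
    return needed


# Toy graph mirroring the docstring example: Init feeds Square and Cube.
producers = {'init': 'Init', 'square': 'Square', 'cube': 'Cube'}
inputs = {'Init': [], 'Square': ['init'], 'Cube': ['init']}

print(sorted(fragment_transforms(producers, inputs, ['square'])))
# ['Init', 'Square']
print(sorted(fragment_transforms(producers, inputs, ['square', 'cube'])))
# ['Cube', 'Init', 'Square']
```

In this toy graph, asking for `square` alone leaves `Cube` out of the fragment, while asking for both `square` and `cube` covers the whole pipeline, which matches the behaviour the docstring describes for `show(square)` and `show(square, cube)`.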



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
