This is an automated email from the ASF dual-hosted git repository.

ningk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
     new 141e9507437 Issue#20877 Updated Interactive Beam README (#22034)
141e9507437 is described below

commit 141e95074370e2e1dde40605d05e653503768e14
Author: Ning Kang <ningkang0...@gmail.com>
AuthorDate: Mon Jun 27 15:33:30 2022 -0700

    Issue#20877 Updated Interactive Beam README (#22034)

    fixes #20877
---
 .../apache_beam/runners/interactive/README.md | 146 ++++++++++++++++++---
 1 file changed, 125 insertions(+), 21 deletions(-)

diff --git a/sdks/python/apache_beam/runners/interactive/README.md b/sdks/python/apache_beam/runners/interactive/README.md
index 519e33b1a2e..15c1d6b3e95 100644
--- a/sdks/python/apache_beam/runners/interactive/README.md
+++ b/sdks/python/apache_beam/runners/interactive/README.md
@@ -27,38 +27,142 @@ exploration much faster and easier. It provides nice features including
 
 1. Graphical representation
 
-   When a pipeline is executed on a Jupyter notebook, it instantly displays the
-   pipeline as a directed acyclic graph. Sampled PCollection results will be
-   added to the graph as the pipeline execution proceeds.
+   Visualize the Pipeline DAG:
 
-2. Fetching PCollections as list
+   ```python
+   import apache_beam.runners.interactive.interactive_beam as ib
+   from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
+
+   p = beam.Pipeline(InteractiveRunner())
+   # ... add transforms
+   ib.show_graph(p)
+   ```
 
-   PCollections can be fetched as a list from the pipeline result. This unique
-   feature of
-   [InteractiveRunner](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/interactive/interactive_runner.py)
-   makes it much easier to integrate Beam pipeline into data analysis.
+   Visualize elements in a PCollection:
 
    ```python
-   p = beam.Pipeline(interactive_runner.InteractiveRunner())
-   pcoll = p | SomePTransform | AnotherPTransform
-   result = p.run().wait_until_finish()
-   pcoll_list = result.get(pcoll)  # This returns a list!
+   pcoll = p | beam.Create([1, 2, 3])
+   # include_window_info displays windowing information
+   # visualize_data visualizes data with https://pair-code.github.io/facets/
+   ib.show(pcoll, include_window_info=True, visualize_data=True)
    ```
+   For more details, see the docstrings of the `interactive_beam` module.
+
+2. Support of streaming record/replay and dynamic visualization
 
-3. Faster re-execution
+   For streaming pipelines, Interactive Beam records a subset of unbounded
+   sources in the pipeline automatically so that they can be replayed for
+   pipeline changes during prototyping.
 
-   [InteractiveRunner](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/interactive/interactive_runner.py)
-   caches PCollection results of pipeline executed previously and re-uses it
-   when the same pipeline is submitted again.
+   There are a few knobs to tune the source recording:
 
    ```python
-   p = beam.Pipeline(interactive_runner.InteractiveRunner())
-   pcoll = p | SomePTransform | AnotherPTransform
-   result = p.run().wait_until_finish()
+   # Set how long to record data from unbounded sources.
+   ib.options.recording_duration = '10m'
+
+   # Set the recording size limit to 1 GB.
+   ib.options.recording_size_limit = 1e9
+
+   # Visualization updates dynamically as data streams in.
+   # n=100 displays at most 100 elements.
+   # duration=60 displays at most 60 seconds' worth of data generated by
+   # unbounded sources.
+   ib.show(pcoll, include_window_info=True, n=100, duration=60)
+
+   # duration can also be a string.
+   ib.show(pcoll, include_window_info=True, duration='1m')
 
-   pcoll2 = pcoll | YetAnotherPTransform
-   result = p.run().wait_until_finish()  # <- only executes YetAnotherPTransform
+   # If neither n nor duration is provided, the display continues until
+   # the current machine's recording usage hits the threshold set by
+   # ib.options.
+   ib.show(pcoll, include_window_info=True)
    ```
+   For more details, see the docstrings of the `interactive_beam` module.
+
+3. Fetching PCollections as pandas.DataFrame
+
+   PCollections can be collected as a pandas.DataFrame:
+
+   ```python
+   pcoll_df = ib.collect(pcoll)  # This returns a pandas.DataFrame!
+   ```
+
+4. Faster execution and re-execution
+
+   Interactive Beam analyzes the pipeline graph depending on what PCollection
+   you want to inspect and builds a pipeline fragment to only compute
+   necessary data.
+
+   ```python
+   pcoll = p | PTransformA | PTransformB
+   pcoll2 = p | PTransformC | PTransformD
+
+   ib.collect(pcoll)  # <- only executes PTransformA and PTransformB
+   ib.collect(pcoll2)  # <- only executes PTransformC and PTransformD
+   ```
+
+   Interactive Beam caches previously inspected PCollections and reuses them
+   while the data is still in scope.
+
+   ```python
+   pcoll = p | PTransformA
+   # pcoll2 depends on pcoll
+   pcoll2 = pcoll | PTransformB
+   ib.collect(pcoll2)  # <- caches data for both pcoll and pcoll2
+
+   pcoll3 = pcoll2 | PTransformC
+   ib.collect(pcoll3)  # <- reuses data of pcoll2 and only executes PTransformC
+
+   pcoll4 = pcoll | PTransformD
+   ib.collect(pcoll4)  # <- reuses data of pcoll and only executes PTransformD
+   ```
+
+5. Supports global and local scopes
+
+   Interactive Beam automatically watches the `__main__` scope for pipeline and
+   PCollection definitions, so they are known to it without an explicit watch.
+
+   ```python
+   # In a script or in a notebook
+   p = beam.Pipeline(InteractiveRunner())
+   pcoll = p | SomeTransform
+   pcoll2 = pcoll | SomeOtherTransform
+
+   # p, pcoll and pcoll2 are all known to Interactive Beam.
+   ib.collect(pcoll)
+   ib.collect(pcoll2)
+   ib.show_graph(p)
+   ```
+
+   You have to explicitly watch pipelines and PCollections in your local scope.
+   Otherwise, Interactive Beam doesn't know about them and won't handle them
+   with interactive features.
+
+   ```python
+   def a_func():
+     p = beam.Pipeline(InteractiveRunner())
+     pcoll = p | SomeTransform
+     pcoll2 = pcoll | SomeOtherTransform
+
+     # Watch everything defined locally before this line.
+     ib.watch(locals())
+     # Or explicitly watch them.
+     ib.watch({
+         'p': p,
+         'pcoll': pcoll,
+         'pcoll2': pcoll2})
+
+     # p, pcoll and pcoll2 are all known to Interactive Beam.
+     ib.collect(pcoll)
+     ib.collect(pcoll2)
+     ib.show_graph(p)
+
+     return p, pcoll, pcoll2
+
+   # Or return them to main scope
+   p, pcoll, pcoll2 = a_func()
+   ib.collect(pcoll)  # Also works!
+   ```
 
 ## Status