[ 
https://issues.apache.org/jira/browse/BEAM-7760?focusedWorklogId=290079&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-290079
 ]

ASF GitHub Bot logged work on BEAM-7760:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 06/Aug/19 23:57
            Start Date: 06/Aug/19 23:57
    Worklog Time Spent: 10m 
      Work Description: aaltay commented on pull request #9278: [BEAM-7760] 
Added iBeam module
URL: https://github.com/apache/beam/pull/9278#discussion_r311319990
 
 

 ##########
 File path: sdks/python/apache_beam/runners/interactive/interactive_beam.py
 ##########
 @@ -0,0 +1,199 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Module of the current iBeam (interactive Beam) environment.
+
+The purpose of the module is to reduce the learning curve of iBeam users, 
+provide a single place for importing and add sugar syntax for all iBeam
+components. It gives users capability to manipulate existing environment for
+interactive beam, TODO(ningk) run interactive pipeline on selected runner as
+normal pipeline, create pipeline with interactive runner and visualize
+PCollections as bounded dataset.
+
+Note: iBeam works the same as normal Beam with DirectRunner when not in an
+interactively environment such as Jupyter lab or Jupyter Notebook. You can also
+run pipeline created by iBeam as normal Beam pipeline by run_pipeline() with
+desired runners.
+"""
+
+import importlib
+
+import apache_beam as beam
+from apache_beam.runners.interactive import interactive_runner
+
+_ibeam_env = None
+
+
+def watch(watchable):
+  """Watches a watchable so that iBeam can understand your pipeline.
+
+  If you write Beam pipeline in a notebook or __main__ module directly, since
+  __main__ module is always watched by default, you don't have to instruct
+  iBeam. However, if your Beam pipeline is defined in some module other than
+  __main__, e.g., inside a class function or a unit test, you can watch() the
+  scope to instruct iBeam to apply magic to your pipeline when running pipeline
+  interactively.
+
+    For example:
+
+    class Foo(object)
+      def build_pipeline(self):
+        p = create_pipeline()
+        init_pcoll = p |  'Init Create' >> beam.Create(range(10))
+        watch(locals())
+        return p
+    Foo().build_pipeline().run()
+
+    iBeam will cache init_pcoll for the first run. You can use:
+
+    visualize(init_pcoll)
+
+    To visualize data from init_pcoll once the pipeline is executed. And if you
+    make change to the original pipeline by adding:
+
+    squares = init_pcoll | 'Square' >> beam.Map(lambda x: x*x)
+
+    When you re-run the pipeline from the line you just added, squares will
+    use the init_pcoll data cached so you can have an interactive experience.
+
+  Currently the implementation mainly watches for PCollection variables defined
+  in user code. A watchable can be a dictionary of variable metadata such as
+  locals(), a str name of a module, a module object or an instance of a class.
+  The variable can come from any scope even local variables in a method of a
+  class defined in a module.
+
+    Below are all valid:
+
+    watch(__main__)  # if import __main__ is already invoked
+    watch('__main__')  # does not require invoking import __main__ beforehand
+    watch(self)  # inside a class
+    watch(SomeInstance())  # an instance of a class
+    watch(locals())  # inside a function, watching local variables within
+  """
+  current_env().watch(watchable)
+
+
+def create_pipeline(runner=None, options=None, argv=None):
 
 Review comment:
   This is not really very different than beam.Pipeline(...). It might be 
better to avoid adding. Using existing constructs will have the advantage of 
existing documentation and user knowledge, and user's can copy contents out of 
notebooks without much change.
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 290079)
    Time Spent: 50m  (was: 40m)

> Interactive Beam Caching PCollections bound to user defined vars in notebook
> ----------------------------------------------------------------------------
>
>                 Key: BEAM-7760
>                 URL: https://issues.apache.org/jira/browse/BEAM-7760
>             Project: Beam
>          Issue Type: New Feature
>          Components: examples-python
>            Reporter: Ning Kang
>            Priority: Major
>          Time Spent: 50m
>  Remaining Estimate: 0h
>
> Cache only PCollections bound to user defined variables in a pipeline when 
> running pipeline with interactive runner in jupyter notebooks.
> [Interactive 
> Beam|[https://github.com/apache/beam/tree/master/sdks/python/apache_beam/runners/interactive]]
>  has been caching and using caches of "leaf" PCollections for interactive 
> execution in jupyter notebooks.
> The interactive execution is currently supported so that when appending new 
> transforms to existing pipeline for a new run, executed part of the pipeline 
> doesn't need to be re-executed. 
> A PCollection is "leaf" when it is never used as input in any PTransform in 
> the pipeline.
> The problem with building caches and pipeline to execute around "leaf" is 
> that when a PCollection is consumed by a sink with no output, the pipeline to 
> execute built will miss the subgraph generating and consuming that 
> PCollection.
> An example, "ReadFromPubSub --> WirteToPubSub" will result in an empty 
> pipeline.
> Caching around PCollections bound to user defined variables and replacing 
> transforms with source and sink of caches could resolve the pipeline to 
> execute properly under the interactive execution scenario. Also, cached 
> PCollection now can trace back to user code and can be used for user data 
> visualization if user wants to do it.
> E.g.,
> {code:java}
> // ...
> p = beam.Pipeline(interactive_runner.InteractiveRunner(),
>                   options=pipeline_options)
> messages = p | "Read" >> beam.io.ReadFromPubSub(subscription='...')
> messages | "Write" >> beam.io.WriteToPubSub(topic_path)
> result = p.run()
> // ...
> visualize(messages){code}
>  The interactive runner automatically figures out that PCollection
> {code:java}
> messages{code}
> created by
> {code:java}
> p | "Read" >> beam.io.ReadFromPubSub(subscription='...'){code}
> should be cached and reused if the notebook user appends more transforms.
>  And once the pipeline gets executed, the user could use any 
> visualize(PCollection) module to visualize the data statically (batch) or 
> dynamically (stream)



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)

Reply via email to