[ 
https://issues.apache.org/jira/browse/BEAM-8457?focusedWorklogId=332355&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-332355
 ]

ASF GitHub Bot logged work on BEAM-8457:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 23/Oct/19 01:15
            Start Date: 23/Oct/19 01:15
    Worklog Time Spent: 10m 
      Work Description: pabloem commented on pull request #9854: [BEAM-8457] Label Dataflow jobs from Notebook
URL: https://github.com/apache/beam/pull/9854#discussion_r337813433
 
 

 ##########
 File path: sdks/python/apache_beam/pipeline.py
 ##########
 @@ -396,28 +405,46 @@ def replace_all(self, replacements):
     for override in replacements:
       self._check_replacement(override)
 
-  def run(self, test_runner_api=True):
-    """Runs the pipeline. Returns whatever our runner returns after running."""
+  def run(self, test_runner_api=True, runner=None, options=None):
+    """Runs the pipeline. Returns whatever our runner returns after running.
 
+    If another runner instance and options are provided, that runner will
+    execute the pipeline with the given options. If either of them is not set,
+    the default runner will run the pipeline with the original options
+    assigned to the pipeline. The usage is similar to directly invoking
+    `runner.run_pipeline(pipeline, options)`.
+    """
+    runner_in_use = self.runner
+    options_in_use = self._options
+    if runner and options:
+      runner_in_use = runner
+      options_in_use = options
+    elif not runner and options:
+      raise ValueError('Parameter runner is not given when parameter options '
+                       'is given.')
+    elif not options and runner:
+      raise ValueError('Parameter options is not given when parameter runner '
+                       'is given.')
     # When possible, invoke a round trip through the runner API.
     if test_runner_api and self._verify_runner_api_compatible():
       return Pipeline.from_runner_api(
           self.to_runner_api(use_fake_coders=True),
-          self.runner,
-          self._options).run(False)
+          runner_in_use,
+          options_in_use,
+          interactive=self.interactive).run(False)
 
 Review comment:
   Did you find that this was necessary? I don't think we should change the 
signature of the `from_runner_api` call. The pipeline protobuf should contain 
all the necessary information... Though I'd defer to @robertwb on this.
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 332355)
    Time Spent: 1h 10m  (was: 1h)

> Instrument Dataflow jobs that are launched from Notebooks
> ---------------------------------------------------------
>
>                 Key: BEAM-8457
>                 URL: https://issues.apache.org/jira/browse/BEAM-8457
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-py-interactive
>            Reporter: Ning Kang
>            Assignee: Ning Kang
>            Priority: Major
>          Time Spent: 1h 10m
>  Remaining Estimate: 0h
>
> Dataflow needs the capability to tell how many Dataflow jobs are launched 
> from the Notebook environment, i.e., the Interactive Runner.
>  # Change the pipeline.run() API to accept a runner and an options 
> parameter, so that a pipeline initially bundled with an interactive runner 
> can be run directly by other runners from a notebook.
>  # Implicitly add the necessary source information through user labels when 
> the user does p.run(runner=DataflowRunner()).
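
The runner/options selection that the diff above adds to run() can be sketched in isolation as follows. This is only a minimal illustration under stated assumptions, not the Beam implementation: the helper name `resolve_runner_and_options` and its `default_runner` / `default_options` parameters are hypothetical, and plain placeholder values stand in for real runner and options objects.

```python
def resolve_runner_and_options(default_runner, default_options,
                               runner=None, options=None):
    """Pick the (runner, options) pair that a run() call should use.

    Hypothetical helper mirroring the branching in the diff: an override is
    accepted only when both runner and options are supplied together.
    """
    if runner and options:
        # Both overrides given: use them instead of the pipeline's own.
        return runner, options
    if options and not runner:
        raise ValueError('Parameter runner is not given when parameter '
                         'options is given.')
    if runner and not options:
        raise ValueError('Parameter options is not given when parameter '
                         'runner is given.')
    # Neither override given: fall back to the pipeline's defaults.
    return default_runner, default_options
```

Under this sketch, passing only one of the two arguments raises ValueError, so the fallback to the defaults applies only when neither is given.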



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
