This is an automated email from the ASF dual-hosted git repository.

mikhail pushed a commit to branch release-2.17.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.17.0 by this push:
     new e17ddfd  Revert "Merge pull request #9854 from [BEAM-8457] Label Dataflow jobs from Notebook"
     new bd744a3  Merge pull request #9887 from pabloem/release-2.17.0
e17ddfd is described below

commit e17ddfdafe33a03c67977d1f9da49697f59a92f8
Author: Ahmet Altay <aal...@gmail.com>
AuthorDate: Thu Oct 24 17:25:22 2019 -0700

    Revert "Merge pull request #9854 from [BEAM-8457] Label Dataflow jobs from Notebook"
    
    This reverts commit 1a8391da9222ab8d0493b0007bd60bdbeeb5e275.
---
 sdks/python/apache_beam/pipeline.py                | 48 ++++------------------
 .../runners/dataflow/dataflow_runner.py            | 10 -----
 .../runners/interactive/interactive_runner.py      |  2 +-
 3 files changed, 9 insertions(+), 51 deletions(-)

diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 5574a82..a776d30 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -171,10 +171,6 @@ class Pipeline(object):
     # If a transform is applied and the full label is already in the set
     # then the transform will have to be cloned with a new label.
     self.applied_labels = set()
-    # A boolean value indicating whether the pipeline is created in an
-    # interactive environment such as interactive notebooks. Initialized as
-    # None. The value is set ad hoc when `pipeline.run()` is invoked.
-    self.interactive = None
 
   @property
   @deprecated(since='First stable release',
@@ -399,56 +395,28 @@ class Pipeline(object):
     for override in replacements:
       self._check_replacement(override)
 
-  def run(self, test_runner_api=True, runner=None, options=None,
-          interactive=None):
-    """Runs the pipeline. Returns whatever our runner returns after running.
-
-    If another runner instance and options are provided, that runner will
-    execute the pipeline with the given options. If either of them is not set,
-    a ValueError is raised. The usage is similar to directly invoking
-    `runner.run_pipeline(pipeline, options)`.
-    Additionally, an interactive field can be set to override the pipeline's
-    self.interactive field to mark current pipeline as being initiated from an
-    interactive environment.
-    """
-    from apache_beam.runners.interactive import interactive_runner
-    if interactive:
-      self.interactive = interactive
-    elif isinstance(self.runner, interactive_runner.InteractiveRunner):
-      self.interactive = True
-    else:
-      self.interactive = False
-    runner_in_use = self.runner
-    options_in_use = self._options
-    if runner and options:
-      runner_in_use = runner
-      options_in_use = options
-    elif not runner and options:
-      raise ValueError('Parameter runner is not given when parameter options '
-                       'is given.')
-    elif not options and runner:
-      raise ValueError('Parameter options is not given when parameter runner '
-                       'is given.')
+  def run(self, test_runner_api=True):
+    """Runs the pipeline. Returns whatever our runner returns after running."""
+
     # When possible, invoke a round trip through the runner API.
     if test_runner_api and self._verify_runner_api_compatible():
       return Pipeline.from_runner_api(
           self.to_runner_api(use_fake_coders=True),
-          runner_in_use,
-          options_in_use).run(test_runner_api=False,
-                              interactive=self.interactive)
+          self.runner,
+          self._options).run(False)
 
-    if options_in_use.view_as(TypeOptions).runtime_type_check:
+    if self._options.view_as(TypeOptions).runtime_type_check:
       from apache_beam.typehints import typecheck
       self.visit(typecheck.TypeCheckVisitor())
 
-    if options_in_use.view_as(SetupOptions).save_main_session:
+    if self._options.view_as(SetupOptions).save_main_session:
       # If this option is chosen, verify we can pickle the main session early.
       tmpdir = tempfile.mkdtemp()
       try:
         pickler.dump_session(os.path.join(tmpdir, 'main_session.pickle'))
       finally:
         shutil.rmtree(tmpdir)
-    return runner_in_use.run_pipeline(self, options_in_use)
+    return self.runner.run_pipeline(self, self._options)
 
   def __enter__(self):
     return self
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index f57be74..4928550 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -364,16 +364,6 @@ class DataflowRunner(PipelineRunner):
 
   def run_pipeline(self, pipeline, options):
     """Remotely executes entire pipeline or parts reachable from node."""
-    # Label goog-dataflow-notebook if pipeline is initiated from interactive
-    # runner.
-    if pipeline.interactive:
-      notebook_version = ('goog-dataflow-notebook=' +
-                          beam.version.__version__.replace('.', '_'))
-      if options.view_as(GoogleCloudOptions).labels:
-        options.view_as(GoogleCloudOptions).labels.append(notebook_version)
-      else:
-        options.view_as(GoogleCloudOptions).labels = [notebook_version]
-
     # Import here to avoid adding the dependency for local running scenarios.
     try:
       # pylint: disable=wrong-import-order, wrong-import-position
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_runner.py b/sdks/python/apache_beam/runners/interactive/interactive_runner.py
index 56a3c18..94c0de7 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_runner.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_runner.py
@@ -146,7 +146,7 @@ class InteractiveRunner(runners.PipelineRunner):
         cache_manager=self._cache_manager,
         pipeline_graph_renderer=self._renderer)
     display.start_periodic_update()
-    result = pipeline_to_execute.run(interactive=True)
+    result = pipeline_to_execute.run()
     result.wait_until_finish()
     display.stop_periodic_update()
 

Reply via email to