This is an automated email from the ASF dual-hosted git repository.

mikhail pushed a commit to branch release-2.17.0
in repository https://gitbox.apache.org/repos/asf/beam.git

The following commit(s) were added to refs/heads/release-2.17.0 by this push:
     new e17ddfd  Revert "Merge pull request #9854 from [BEAM-8457] Label Dataflow jobs from Notebook"
     new bd744a3  Merge pull request #9887 from pabloem/release-2.17.0
e17ddfd is described below

commit e17ddfdafe33a03c67977d1f9da49697f59a92f8
Author: Ahmet Altay <aal...@gmail.com>
AuthorDate: Thu Oct 24 17:25:22 2019 -0700

    Revert "Merge pull request #9854 from [BEAM-8457] Label Dataflow jobs from Notebook"

    This reverts commit 1a8391da9222ab8d0493b0007bd60bdbeeb5e275.
---
 sdks/python/apache_beam/pipeline.py                | 48 ++++------------------
 .../runners/dataflow/dataflow_runner.py            | 10 -----
 .../runners/interactive/interactive_runner.py      |  2 +-
 3 files changed, 9 insertions(+), 51 deletions(-)

diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 5574a82..a776d30 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -171,10 +171,6 @@ class Pipeline(object):
     # If a transform is applied and the full label is already in the set
     # then the transform will have to be cloned with a new label.
     self.applied_labels = set()
-    # A boolean value indicating whether the pipeline is created in an
-    # interactive environment such as interactive notebooks. Initialized as
-    # None. The value is set ad hoc when `pipeline.run()` is invoked.
-    self.interactive = None

   @property
   @deprecated(since='First stable release',
@@ -399,56 +395,28 @@ class Pipeline(object):
     for override in replacements:
       self._check_replacement(override)

-  def run(self, test_runner_api=True, runner=None, options=None,
-          interactive=None):
-    """Runs the pipeline. Returns whatever our runner returns after running.
-
-    If another runner instance and options are provided, that runner will
-    execute the pipeline with the given options. If either of them is not set,
-    a ValueError is raised. The usage is similar to directly invoking
-    `runner.run_pipeline(pipeline, options)`.
-    Additionally, an interactive field can be set to override the pipeline's
-    self.interactive field to mark current pipeline as being initiated from an
-    interactive environment.
-    """
-    from apache_beam.runners.interactive import interactive_runner
-    if interactive:
-      self.interactive = interactive
-    elif isinstance(self.runner, interactive_runner.InteractiveRunner):
-      self.interactive = True
-    else:
-      self.interactive = False
-    runner_in_use = self.runner
-    options_in_use = self._options
-    if runner and options:
-      runner_in_use = runner
-      options_in_use = options
-    elif not runner and options:
-      raise ValueError('Parameter runner is not given when parameter options '
-                       'is given.')
-    elif not options and runner:
-      raise ValueError('Parameter options is not given when parameter runner '
-                       'is given.')
+  def run(self, test_runner_api=True):
+    """Runs the pipeline. Returns whatever our runner returns after running."""
+
     # When possible, invoke a round trip through the runner API.
     if test_runner_api and self._verify_runner_api_compatible():
       return Pipeline.from_runner_api(
           self.to_runner_api(use_fake_coders=True),
-          runner_in_use,
-          options_in_use).run(test_runner_api=False,
-                              interactive=self.interactive)
+          self.runner,
+          self._options).run(False)

-    if options_in_use.view_as(TypeOptions).runtime_type_check:
+    if self._options.view_as(TypeOptions).runtime_type_check:
       from apache_beam.typehints import typecheck
       self.visit(typecheck.TypeCheckVisitor())

-    if options_in_use.view_as(SetupOptions).save_main_session:
+    if self._options.view_as(SetupOptions).save_main_session:
       # If this option is chosen, verify we can pickle the main session early.
       tmpdir = tempfile.mkdtemp()
       try:
         pickler.dump_session(os.path.join(tmpdir, 'main_session.pickle'))
       finally:
         shutil.rmtree(tmpdir)
-    return runner_in_use.run_pipeline(self, options_in_use)
+    return self.runner.run_pipeline(self, self._options)

   def __enter__(self):
     return self
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index f57be74..4928550 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -364,16 +364,6 @@ class DataflowRunner(PipelineRunner):

   def run_pipeline(self, pipeline, options):
     """Remotely executes entire pipeline or parts reachable from node."""
-    # Label goog-dataflow-notebook if pipeline is initiated from interactive
-    # runner.
-    if pipeline.interactive:
-      notebook_version = ('goog-dataflow-notebook=' +
-                          beam.version.__version__.replace('.', '_'))
-      if options.view_as(GoogleCloudOptions).labels:
-        options.view_as(GoogleCloudOptions).labels.append(notebook_version)
-      else:
-        options.view_as(GoogleCloudOptions).labels = [notebook_version]
-
     # Import here to avoid adding the dependency for local running scenarios.
     try:
       # pylint: disable=wrong-import-order, wrong-import-position
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_runner.py b/sdks/python/apache_beam/runners/interactive/interactive_runner.py
index 56a3c18..94c0de7 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_runner.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_runner.py
@@ -146,7 +146,7 @@ class InteractiveRunner(runners.PipelineRunner):
         cache_manager=self._cache_manager,
         pipeline_graph_renderer=self._renderer)
     display.start_periodic_update()
-    result = pipeline_to_execute.run(interactive=True)
+    result = pipeline_to_execute.run()
     result.wait_until_finish()
     display.stop_periodic_update()