This is an automated email from the ASF dual-hosted git repository.

ningk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 5efb8a5b8e6 [BEAM-14330] Temporarily disable the clusters auto-cleanup (#17400)
5efb8a5b8e6 is described below

commit 5efb8a5b8e67a6213ef9b907bcb04b80c65710a1
Author: Ning Kang <ningkang0...@gmail.com>
AuthorDate: Wed Apr 20 14:43:11 2022 -0700

    [BEAM-14330] Temporarily disable the clusters auto-cleanup (#17400)
    
    Disables the automatic clusters cleanup to avoid unimplemented errors
    when tests run in parallel and race on the global state without the
    necessary mocking. Removes unnecessary tests and isolates the singleton
    env in tests. Mocks the env in the Dataproc cluster manager tests.
---
 .../dataproc/dataproc_cluster_manager_test.py      | 10 ++++
 .../runners/interactive/interactive_environment.py |  3 +-
 .../interactive/interactive_environment_test.py    | 49 ----------------
 .../runners/interactive/interactive_runner.py      |  2 +-
 .../runners/interactive/interactive_runner_test.py | 67 +++++++---------------
 5 files changed, 35 insertions(+), 96 deletions(-)

diff --git a/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager_test.py b/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager_test.py
index a72412092ca..fc826e34cbc 100644
--- a/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager_test.py
+++ b/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager_test.py
@@ -22,6 +22,7 @@ dataproc_cluster_manager."""
 import unittest
 from unittest.mock import patch
 
+from apache_beam.runners.interactive import interactive_beam as ib
 from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import DataprocClusterManager
 from apache_beam.runners.interactive.dataproc.types import MasterURLIdentifier
 
@@ -69,6 +70,15 @@ class MockFileIO:
 @unittest.skipIf(not _dataproc_imported, 'dataproc package was not imported.')
 class DataprocClusterManagerTest(unittest.TestCase):
   """Unit test for DataprocClusterManager"""
+  def setUp(self):
+    self.patcher = patch(
+        'apache_beam.runners.interactive.interactive_environment.current_env')
+    self.m_env = self.patcher.start()
+    self.m_env().clusters = ib.Clusters()
+
+  def tearDown(self):
+    self.patcher.stop()
+
   @patch(
       'google.cloud.dataproc_v1.ClusterControllerClient.create_cluster',
       side_effect=MockException(409))
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_environment.py b/sdks/python/apache_beam/runners/interactive/interactive_environment.py
index 5fc638cfedc..78b6de920cf 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_environment.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_environment.py
@@ -284,7 +284,8 @@ class InteractiveEnvironment(object):
       # we don't need to clean it up here.
       if cache_manager and pipeline_id not in self._recording_managers:
         cache_manager.cleanup()
-    self.clusters.cleanup()
+    # TODO(BEAM-14330): uncomment this once tests are refactored.
+    # self.clusters.cleanup()
 
   def cleanup(self, pipeline=None):
     """Cleans up cached states for the given pipeline. Noop if the given
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_environment_test.py b/sdks/python/apache_beam/runners/interactive/interactive_environment_test.py
index ec7e1c97b3b..a2dd24b57ef 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_environment_test.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_environment_test.py
@@ -23,12 +23,9 @@ import unittest
 from unittest.mock import patch
 
 import apache_beam as beam
-from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.runners import runner
 from apache_beam.runners.interactive import cache_manager as cache
 from apache_beam.runners.interactive import interactive_environment as ie
-from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import DataprocClusterManager
-from apache_beam.runners.interactive.dataproc.types import MasterURLIdentifier
 from apache_beam.runners.interactive.recording_manager import RecordingManager
 from apache_beam.runners.interactive.sql.sql_chain import SqlNode
 
@@ -357,52 +354,6 @@ class InteractiveEnvironmentTest(unittest.TestCase):
     with self.assertRaises(ValueError):
       env._get_gcs_cache_dir(p, cache_root)
 
-  @unittest.skipIf(
-      not ie.current_env().is_interactive_ready,
-      '[interactive] dependency is not installed.')
-  @patch(
-      'apache_beam.runners.interactive.dataproc.dataproc_cluster_manager.'
-      'DataprocClusterManager.cleanup',
-      return_value=None)
-  def test_cleanup_specific_dataproc_cluster(self, mock_cleanup):
-    env = ie.InteractiveEnvironment()
-    project = 'test-project'
-    region = 'test-region'
-    p = beam.Pipeline(
-        options=PipelineOptions(
-            project=project,
-            region=region,
-        ))
-    cluster_metadata = MasterURLIdentifier(project_id=project, region=region)
-    env.clusters.dataproc_cluster_managers[str(
-        id(p))] = DataprocClusterManager(cluster_metadata)
-    env._tracked_user_pipelines.add_user_pipeline(p)
-    env.cleanup(p)
-    self.assertEqual(env.clusters.dataproc_cluster_managers, {})
-
-  @unittest.skipIf(
-      not ie.current_env().is_interactive_ready,
-      '[interactive] dependency is not installed.')
-  @patch(
-      'apache_beam.runners.interactive.dataproc.dataproc_cluster_manager.'
-      'DataprocClusterManager.cleanup',
-      return_value=None)
-  def test_cleanup_all_dataproc_clusters(self, mock_cleanup):
-    env = ie.InteractiveEnvironment()
-    project = 'test-project'
-    region = 'test-region'
-    p = beam.Pipeline(
-        options=PipelineOptions(
-            project=project,
-            region=region,
-        ))
-    cluster_metadata = MasterURLIdentifier(project_id=project, region=region)
-    env.clusters.dataproc_cluster_managers[str(
-        id(p))] = DataprocClusterManager(cluster_metadata)
-    env._tracked_user_pipelines.add_user_pipeline(p)
-    env.cleanup()
-    self.assertEqual(env.clusters.dataproc_cluster_managers, {})
-
 
 if __name__ == '__main__':
   unittest.main()
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_runner.py b/sdks/python/apache_beam/runners/interactive/interactive_runner.py
index 8716b94f0dd..3c356f1331f 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_runner.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_runner.py
@@ -34,7 +34,6 @@ from apache_beam.runners.direct import direct_runner
 from apache_beam.runners.interactive import interactive_environment as ie
 from apache_beam.runners.interactive import pipeline_instrument as inst
 from apache_beam.runners.interactive import background_caching_job
-from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import DataprocClusterManager
 from apache_beam.runners.interactive.dataproc.types import MasterURLIdentifier
 from apache_beam.runners.interactive.display import pipeline_graph
 from apache_beam.runners.interactive.options import capture_control
@@ -248,6 +247,7 @@ class InteractiveRunner(runners.PipelineRunner):
       '--environment_type=DOCKER'
       ])
     """
+    from apache_beam.runners.interactive.dataproc.dataproc_cluster_manager import DataprocClusterManager
     from apache_beam.runners.portability.flink_runner import FlinkRunner
     flink_master = user_pipeline.options.view_as(
         FlinkRunnerOptions).flink_master
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py b/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
index d3ec6d87f28..ff954f0dae2 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py
@@ -31,7 +31,6 @@ import pandas as pd
 
 import apache_beam as beam
 from apache_beam.dataframe.convert import to_dataframe
-from apache_beam.options.pipeline_options import FlinkRunnerOptions
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.options.pipeline_options import StandardOptions
 from apache_beam.runners.direct import direct_runner
@@ -493,8 +492,12 @@ class InteractiveRunnerTest(unittest.TestCase):
       'apache_beam.runners.interactive.dataproc.dataproc_cluster_manager.'
       'DataprocClusterManager.create_flink_cluster',
       return_value=None)
+  @patch('apache_beam.runners.interactive.interactive_environment.current_env')
   def test_get_master_url_no_flink_master_or_provided_master_url(
-      self, mock_create_cluster):
+      self, m_env, mock_create_cluster):
+    clusters = ib.Clusters()
+    m_env().clusters = clusters
+
     from apache_beam.runners.portability.flink_runner import FlinkRunner
     runner = interactive_runner.InteractiveRunner(
         underlying_runner=FlinkRunner())
@@ -505,14 +508,16 @@ class InteractiveRunnerTest(unittest.TestCase):
         ))
     runner._get_dataproc_cluster_master_url_if_applicable(p)
     self.assertEqual(
-        ie.current_env().clusters.describe(p)['cluster_metadata'].project_id,
-        'test-project')
-    ie.current_env().clusters = ib.Clusters()
+        clusters.describe(p)['cluster_metadata'].project_id, 'test-project')
 
   @unittest.skipIf(
       not ie.current_env().is_interactive_ready,
       '[interactive] dependency is not installed.')
-  def test_get_master_url_no_flink_master_and_master_url_exists(self):
+  @patch('apache_beam.runners.interactive.interactive_environment.current_env')
+  def test_get_master_url_no_flink_master_and_master_url_exists(self, m_env):
+    clusters = ib.Clusters()
+    m_env().clusters = clusters
+
     from apache_beam.runners.portability.flink_runner import FlinkRunner
     runner = interactive_runner.InteractiveRunner(
         underlying_runner=FlinkRunner())
@@ -521,61 +526,33 @@ class InteractiveRunnerTest(unittest.TestCase):
             project='test-project',
             region='test-region',
         ))
-    cluster_name = ie.current_env().clusters.default_cluster_name
+    cluster_name = clusters.default_cluster_name
     cluster_metadata = MasterURLIdentifier(
         project_id='test-project',
         region='test-region',
         cluster_name=cluster_name)
-    ie.current_env().clusters.master_urls['test-url'] = cluster_metadata
-    ie.current_env(
-    ).clusters.master_urls_to_dashboards['test-url'] = 'test-dashboard'
+    clusters.master_urls['test-url'] = cluster_metadata
+    clusters.master_urls_to_dashboards['test-url'] = 'test-dashboard'
     flink_master = runner._get_dataproc_cluster_master_url_if_applicable(p)
     self.assertEqual(
-        ie.current_env().clusters.describe(p)['cluster_metadata'].project_id,
-        'test-project')
-    self.assertEqual(
-        flink_master, ie.current_env().clusters.describe(p)['master_url'])
+        clusters.describe(p)['cluster_metadata'].project_id, 'test-project')
+    self.assertEqual(flink_master, clusters.describe(p)['master_url'])
 
   @unittest.skipIf(
       not ie.current_env().is_interactive_ready,
       '[interactive] dependency is not installed.')
-  def test_get_master_url_flink_master_provided(self):
+  @patch('apache_beam.runners.interactive.interactive_environment.current_env')
+  def test_get_master_url_flink_master_provided(self, m_env):
+    clusters = ib.Clusters()
+    m_env().clusters = clusters
+
     runner = interactive_runner.InteractiveRunner()
     from apache_beam.runners.portability.flink_runner import FlinkRunner
     p = beam.Pipeline(
         interactive_runner.InteractiveRunner(underlying_runner=FlinkRunner()),
         options=PipelineOptions(flink_master='--flink_master=test.internal:1'))
     runner._get_dataproc_cluster_master_url_if_applicable(p)
-    self.assertEqual(ie.current_env().clusters.describe(), {})
-    ie.current_env().clusters = ib.Clusters()
-
-  @unittest.skipIf(
-      not ie.current_env().is_interactive_ready,
-      '[interactive] dependency is not installed.')
-  @patch(
-      'apache_beam.runners.interactive.interactive_runner.'
-      'InteractiveRunner._get_dataproc_cluster_master_url_if_applicable',
-      return_value='test.internal:1')
-  def test_set_flink_dataproc_version(self, mock_get_master_url):
-    runner = interactive_runner.InteractiveRunner()
-    options = PipelineOptions()
-    p = beam.Pipeline(interactive_runner.InteractiveRunner())
-
-    # Watch the local scope for Interactive Beam so that values will be cached.
-    ib.watch(locals())
-
-    # This is normally done in the interactive_utils when a transform is
-    # applied but needs an IPython environment. So we manually run this here.
-    ie.current_env().track_user_pipelines()
-
-    # Run the pipeline
-    runner.run_pipeline(p, options)
-
-    # Check that the Flink version is set to the Dataproc image Flink version
-    # inside ib.clusters.
-    self.assertEqual(
-        options.view_as(FlinkRunnerOptions).flink_version,
-        ib.clusters.DATAPROC_FLINK_VERSION)
+    self.assertEqual(clusters.describe(), {})
 
 
 if __name__ == '__main__':

Reply via email to