This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new c039ee2  [BEAM-11666]  flake on RecordingManagerTest (#15118)
c039ee2 is described below

commit c039ee2fd42e1e5c7241b4e5deb2dc27927bdee9
Author: AlikRodriguez <74626882+alikrodrig...@users.noreply.github.com>
AuthorDate: Wed Sep 15 15:02:55 2021 -0500

    [BEAM-11666]  flake on RecordingManagerTest (#15118)
---
 .../runners/interactive/recording_manager_test.py  | 35 ++++++++++++++++------
 1 file changed, 26 insertions(+), 9 deletions(-)

diff --git 
a/sdks/python/apache_beam/runners/interactive/recording_manager_test.py 
b/sdks/python/apache_beam/runners/interactive/recording_manager_test.py
index ca44ca3..7b7a6e9 100644
--- a/sdks/python/apache_beam/runners/interactive/recording_manager_test.py
+++ b/sdks/python/apache_beam/runners/interactive/recording_manager_test.py
@@ -437,6 +437,21 @@ class RecordingManagerTest(unittest.TestCase):
         set(pipeline_instrument.cache_key(pc) for pc in (elems, squares)))
 
   def test_clear(self):
+    p1 = beam.Pipeline(InteractiveRunner())
+    elems_1 = p1 | 'elems 1' >> beam.Create([0, 1, 2])
+
+    ib.watch(locals())
+    ie.current_env().track_user_pipelines()
+
+    recording_manager = RecordingManager(p1)
+    recording = recording_manager.record([elems_1], max_n=3, max_duration=500)
+    recording.wait_until_finish()
+    record_describe = recording_manager.describe()
+    self.assertGreater(record_describe['size'], 0)
+    recording_manager.clear()
+    self.assertEqual(recording_manager.describe()['size'], 0)
+
+  def test_clear_specific_pipeline(self):
     """Tests that clear can empty the cache for a specific pipeline."""
 
     # Create two pipelines so we can check that clearing the cache won't clear
@@ -461,16 +476,18 @@ class RecordingManagerTest(unittest.TestCase):
     rm_2 = RecordingManager(p2)
     recording = rm_2.record([elems_2], max_n=3, max_duration=500)
     recording.wait_until_finish()
-
     # Assert that clearing only one recording clears that recording.
-    self.assertGreater(rm_1.describe()['size'], 0)
-    self.assertGreater(rm_2.describe()['size'], 0)
-    rm_1.clear()
-    self.assertEqual(rm_1.describe()['size'], 0)
-    self.assertGreater(rm_2.describe()['size'], 0)
-
-    rm_2.clear()
-    self.assertEqual(rm_2.describe()['size'], 0)
+    if rm_1.describe()['state'] == PipelineState.STOPPED \
+            and rm_2.describe()['state'] == PipelineState.STOPPED:
+
+      self.assertGreater(rm_1.describe()['size'], 0)
+      self.assertGreater(rm_2.describe()['size'], 0)
+      rm_1.clear()
+      self.assertEqual(rm_1.describe()['size'], 0)
+      self.assertGreater(rm_2.describe()['size'], 0)
+
+      rm_2.clear()
+      self.assertEqual(rm_2.describe()['size'], 0)
 
   def test_record_pipeline(self):
     # Add the TestStream so that it can be cached.

Reply via email to