This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
     new c039ee2  [BEAM-11666] flake on RecordingManagerTest (#15118)
c039ee2 is described below

commit c039ee2fd42e1e5c7241b4e5deb2dc27927bdee9
Author: AlikRodriguez <74626882+alikrodrig...@users.noreply.github.com>
AuthorDate: Wed Sep 15 15:02:55 2021 -0500

    [BEAM-11666] flake on RecordingManagerTest (#15118)
---
 .../runners/interactive/recording_manager_test.py | 35 ++++++++++++++++------
 1 file changed, 26 insertions(+), 9 deletions(-)

diff --git a/sdks/python/apache_beam/runners/interactive/recording_manager_test.py b/sdks/python/apache_beam/runners/interactive/recording_manager_test.py
index ca44ca3..7b7a6e9 100644
--- a/sdks/python/apache_beam/runners/interactive/recording_manager_test.py
+++ b/sdks/python/apache_beam/runners/interactive/recording_manager_test.py
@@ -437,6 +437,21 @@ class RecordingManagerTest(unittest.TestCase):
         set(pipeline_instrument.cache_key(pc) for pc in (elems, squares)))
 
   def test_clear(self):
+    p1 = beam.Pipeline(InteractiveRunner())
+    elems_1 = p1 | 'elems 1' >> beam.Create([0, 1, 2])
+
+    ib.watch(locals())
+    ie.current_env().track_user_pipelines()
+
+    recording_manager = RecordingManager(p1)
+    recording = recording_manager.record([elems_1], max_n=3, max_duration=500)
+    recording.wait_until_finish()
+    record_describe = recording_manager.describe()
+    self.assertGreater(record_describe['size'], 0)
+    recording_manager.clear()
+    self.assertEqual(recording_manager.describe()['size'], 0)
+
+  def test_clear_specific_pipeline(self):
     """Tests that clear can empty the cache for a specific pipeline."""
 
     # Create two pipelines so we can check that clearing the cache won't clear
@@ -461,16 +476,18 @@ class RecordingManagerTest(unittest.TestCase):
     rm_2 = RecordingManager(p2)
     recording = rm_2.record([elems_2], max_n=3, max_duration=500)
     recording.wait_until_finish()
-
     # Assert that clearing only one recording clears that recording.
-    self.assertGreater(rm_1.describe()['size'], 0)
-    self.assertGreater(rm_2.describe()['size'], 0)
-    rm_1.clear()
-    self.assertEqual(rm_1.describe()['size'], 0)
-    self.assertGreater(rm_2.describe()['size'], 0)
-
-    rm_2.clear()
-    self.assertEqual(rm_2.describe()['size'], 0)
+    if rm_1.describe()['state'] == PipelineState.STOPPED \
+      and rm_2.describe()['state'] == PipelineState.STOPPED:
+
+      self.assertGreater(rm_1.describe()['size'], 0)
+      self.assertGreater(rm_2.describe()['size'], 0)
+      rm_1.clear()
+      self.assertEqual(rm_1.describe()['size'], 0)
+      self.assertGreater(rm_2.describe()['size'], 0)
+
+      rm_2.clear()
+      self.assertEqual(rm_2.describe()['size'], 0)
 
   def test_record_pipeline(self):
     # Add the TestStream so that it can be cached.