This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 8de55cc  [BEAM-8379] Cache Eviction
     new 1bb581f  Merge pull request #10062 from [BEAM-8379] Cache Eviction for 
Interactive Beam
8de55cc is described below

commit 8de55cca86061a5724e647ffccd2300ce705d793
Author: Ning Kang <kawai...@gmail.com>
AuthorDate: Wed Nov 13 10:16:57 2019 -0800

    [BEAM-8379] Cache Eviction
    
    1. Implemented cache eviction whenever the Python interpreter exits.
    2. Cache for PCollections is grouped by PCollections, as the
    Interactive Beam user flow is now data-centric. The cache, including
    its eviction, is managed by a global interactive environment instance
    that is created/retrieved/reset implicitly by runners in the same main
    thread/loop.
    3. Unified the Python version requirement to 3.6 for interactive features
    and their tests for simplicity. We don't have a test suite for versions
    between py35 and py36 (both exclusive) anyway, so there is no need to
    check against 3.5.3 when some mock test features are only available in
    3.6 and later.
---
 .../display/pcoll_visualization_test.py            | 18 +----
 .../runners/interactive/interactive_environment.py | 31 +++++++-
 .../interactive/interactive_environment_test.py    | 93 +++++++++++++++++++++-
 3 files changed, 122 insertions(+), 20 deletions(-)

diff --git 
a/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization_test.py
 
b/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization_test.py
index 4628c25..8eefec7 100644
--- 
a/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization_test.py
+++ 
b/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization_test.py
@@ -42,6 +42,8 @@ except ImportError:
 
 @unittest.skipIf(not ie.current_env().is_interactive_ready,
                  '[interactive] dependency is not installed.')
+@unittest.skipIf(sys.version_info < (3, 6),
+                 'The tests require at least Python 3.6 to work.')
 class PCollectionVisualizationTest(unittest.TestCase):
 
   def setUp(self):
@@ -56,8 +58,6 @@ class PCollectionVisualizationTest(unittest.TestCase):
     # pylint: disable=range-builtin-not-iterating
     self._pcoll = self._p | 'Create' >> beam.Create(range(1000))
 
-  @unittest.skipIf(sys.version_info < (3, 5, 3),
-                   'PCollectionVisualization is supported on Python 3.5.3+.')
   def test_raise_error_for_non_pcoll_input(self):
     class Foo(object):
       pass
@@ -67,8 +67,6 @@ class PCollectionVisualizationTest(unittest.TestCase):
       self.assertTrue('pcoll should be apache_beam.pvalue.PCollection' in
                       ctx.exception)
 
-  @unittest.skipIf(sys.version_info < (3, 5, 3),
-                   'PCollectionVisualization is supported on Python 3.5.3+.')
   def test_pcoll_visualization_generate_unique_display_id(self):
     pv_1 = pv.PCollectionVisualization(self._pcoll)
     pv_2 = pv.PCollectionVisualization(self._pcoll)
@@ -76,8 +74,6 @@ class PCollectionVisualizationTest(unittest.TestCase):
     self.assertNotEqual(pv_1._overview_display_id, pv_2._overview_display_id)
     self.assertNotEqual(pv_1._df_display_id, pv_2._df_display_id)
 
-  @unittest.skipIf(sys.version_info < (3, 5, 3),
-                   'PCollectionVisualization is supported on Python 3.5.3+.')
   @patch('apache_beam.runners.interactive.display.pcoll_visualization'
          '.PCollectionVisualization._to_element_list', lambda x: [1, 2, 3])
   def test_one_shot_visualization_not_return_handle(self):
@@ -91,8 +87,6 @@ class PCollectionVisualizationTest(unittest.TestCase):
     yield [1, 2, 3, 4, 5, 6, 7]
     yield [1, 2, 3, 4, 5, 6, 7, 8]
 
-  @unittest.skipIf(sys.version_info < (3, 5, 3),
-                   'PCollectionVisualization is supported on Python 3.5.3+.')
   @patch('apache_beam.runners.interactive.display.pcoll_visualization'
          '.PCollectionVisualization._to_element_list', _mock_to_element_list)
   def test_dynamic_plotting_return_handle(self):
@@ -100,8 +94,6 @@ class PCollectionVisualizationTest(unittest.TestCase):
     self.assertIsInstance(h, timeloop.Timeloop)
     h.stop()
 
-  @unittest.skipIf(sys.version_info < (3, 5, 3),
-                   'PCollectionVisualization is supported on Python 3.5.3+.')
   @patch('apache_beam.runners.interactive.display.pcoll_visualization'
          '.PCollectionVisualization._to_element_list', _mock_to_element_list)
   @patch('apache_beam.runners.interactive.display.pcoll_visualization'
@@ -126,10 +118,6 @@ class PCollectionVisualizationTest(unittest.TestCase):
       self.assertIs(kwargs['updating_pv'], updating_pv)
     h.stop()
 
-  # The code being tested supports 3.5.3+. This specific test has assertion
-  # feature that was introduced in 3.6.
-  @unittest.skipIf(sys.version_info < (3, 6),
-                   'The test requires Python 3.6+.')
   @patch('apache_beam.runners.interactive.display.pcoll_visualization'
          '.PCollectionVisualization._to_element_list', _mock_to_element_list)
   @patch('timeloop.Timeloop.stop')
@@ -150,8 +138,6 @@ class PCollectionVisualizationTest(unittest.TestCase):
     # "assert_called" is new in Python 3.6.
     mocked_timeloop.assert_called()
 
-  @unittest.skipIf(sys.version_info < (3, 5, 3),
-                   'PCollectionVisualization is supported on Python 3.5.3+.')
   @patch('apache_beam.runners.interactive.display.pcoll_visualization'
          '.PCollectionVisualization._to_element_list', lambda x: [1, 2, 3])
   @patch('pandas.DataFrame.sample')
diff --git 
a/sdks/python/apache_beam/runners/interactive/interactive_environment.py 
b/sdks/python/apache_beam/runners/interactive/interactive_environment.py
index 2dbc102..414d564 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_environment.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_environment.py
@@ -24,6 +24,7 @@ application code or notebook.
 """
 from __future__ import absolute_import
 
+import atexit
 import importlib
 import logging
 import sys
@@ -32,6 +33,9 @@ import apache_beam as beam
 from apache_beam.runners import runner
 from apache_beam.runners.utils import is_interactive
 
+# Interactive Beam user flow is data-centric rather than pipeline-centric, so
+# there is only one global interactive environment instance that manages
+# implementation that enables interactivity.
 _interactive_beam_env = None
 
 
@@ -46,6 +50,8 @@ def current_env(cache_manager=None):
 def new_env(cache_manager=None):
   """Creates a new Interactive Beam environment to replace current one."""
   global _interactive_beam_env
+  if _interactive_beam_env:
+    _interactive_beam_env.cleanup()
   _interactive_beam_env = None
   return current_env(cache_manager)
 
@@ -63,6 +69,9 @@ class InteractiveEnvironment(object):
 
   def __init__(self, cache_manager=None):
     self._cache_manager = cache_manager
+    # Register a cleanup routine when kernel is restarted or terminated.
+    if cache_manager:
+      atexit.register(self.cleanup)
     # Holds class instances, module object, string of module names.
     self._watching_set = set()
     # Holds variables list of (Dict[str, object]).
@@ -74,10 +83,10 @@ class InteractiveEnvironment(object):
     self._pipeline_results = {}
     # Always watch __main__ module.
     self.watch('__main__')
-    # Do a warning level logging if current python version is below 3.5.3.
-    if sys.version_info < (3, 5, 3):
+    # Do a warning level logging if current python version is below 3.6.
+    if sys.version_info < (3, 6):
       self._is_py_version_ready = False
-      logging.warning('Interactive Beam requires Python 3.5.3+.')
+      logging.warning('Interactive Beam requires Python 3.6+.')
     else:
       self._is_py_version_ready = True
     # Check if [interactive] dependencies are installed.
@@ -127,6 +136,11 @@ class InteractiveEnvironment(object):
     """
     return self._is_in_notebook
 
+  def cleanup(self):
+    # Utilizes cache manager to clean up cache from everywhere.
+    if self.cache_manager():
+      self.cache_manager().cleanup()
+
   def watch(self, watchable):
     """Watches a watchable.
 
@@ -163,7 +177,18 @@ class InteractiveEnvironment(object):
 
   def set_cache_manager(self, cache_manager):
     """Sets the cache manager held by current Interactive Environment."""
+    if self._cache_manager is cache_manager:
+      # NOOP if setting to the same cache_manager.
+      return
+    if self._cache_manager:
+      # Invoke cleanup routine when a new cache_manager is forcefully set and
+      # current cache_manager is not None.
+      self.cleanup()
+      atexit.unregister(self.cleanup)
     self._cache_manager = cache_manager
+    if self._cache_manager:
+      # Re-register cleanup routine for the new cache_manager if it's not None.
+      atexit.register(self.cleanup)
 
   def cache_manager(self):
     """Gets the cache manager held by current Interactive Environment."""
diff --git 
a/sdks/python/apache_beam/runners/interactive/interactive_environment_test.py 
b/sdks/python/apache_beam/runners/interactive/interactive_environment_test.py
index 6fa257b..76c29b8 100644
--- 
a/sdks/python/apache_beam/runners/interactive/interactive_environment_test.py
+++ 
b/sdks/python/apache_beam/runners/interactive/interactive_environment_test.py
@@ -19,16 +19,27 @@
 from __future__ import absolute_import
 
 import importlib
+import sys
 import unittest
 
 import apache_beam as beam
 from apache_beam.runners import runner
+from apache_beam.runners.interactive import cache_manager as cache
 from apache_beam.runners.interactive import interactive_environment as ie
 
+# TODO(BEAM-8288): clean up the work-around of nose tests using Python2 without
+# unittest.mock module.
+try:
+  from unittest.mock import call, patch
+except ImportError:
+  from mock import call, patch
+
 # The module name is also a variable in module.
 _module_name = 'apache_beam.runners.interactive.interactive_environment_test'
 
 
+@unittest.skipIf(sys.version_info < (3, 6),
+                 'The tests require at least Python 3.6 to work.')
 class InteractiveEnvironmentTest(unittest.TestCase):
 
   def setUp(self):
@@ -152,11 +163,91 @@ class InteractiveEnvironmentTest(unittest.TestCase):
                   pipeline_result)
     self.assertIs(ie.current_env().pipeline_result(self._p), None)
 
-  def test_is_none_when_pipeline_absent(self):
+  def test_pipeline_result_is_none_when_pipeline_absent(self):
     self.assertIs(ie.current_env().pipeline_result(self._p), None)
     self.assertIs(ie.current_env().is_terminated(self._p), True)
     self.assertIs(ie.current_env().evict_pipeline_result(self._p), None)
 
+  @patch('atexit.register')
+  def test_no_cleanup_when_cm_none(self,
+                                   mocked_atexit):
+    ie.new_env(None)
+    mocked_atexit.assert_not_called()
+
+  @patch('atexit.register')
+  def test_cleanup_when_cm_not_none(self,
+                                    mocked_atexit):
+    ie.new_env(cache.FileBasedCacheManager())
+    mocked_atexit.assert_called_once()
+
+  @patch('atexit.register')
+  @patch('atexit.unregister')
+  def test_cleanup_unregistered_when_not_none_cm_cleared(self,
+                                                         mocked_unreg,
+                                                         mocked_reg):
+    ie.new_env(cache.FileBasedCacheManager())
+    mocked_reg.assert_called_once()
+    mocked_unreg.assert_not_called()
+    ie.current_env().set_cache_manager(None)
+    mocked_reg.assert_called_once()
+    mocked_unreg.assert_called_once()
+
+  @patch('atexit.register')
+  @patch('atexit.unregister')
+  def test_cleanup_reregistered_when_cm_changed(self,
+                                                mocked_unreg,
+                                                mocked_reg):
+    ie.new_env(cache.FileBasedCacheManager())
+    mocked_unreg.assert_not_called()
+    ie.current_env().set_cache_manager(cache.FileBasedCacheManager())
+    mocked_unreg.assert_called_once()
+    mocked_reg.assert_has_calls([call(ie.current_env().cleanup),
+                                 call(ie.current_env().cleanup)])
+
+  @patch('apache_beam.runners.interactive.interactive_environment'
+         '.InteractiveEnvironment.cleanup')
+  def test_cleanup_invoked_when_new_env_replace_not_none_env(self,
+                                                             mocked_cleanup):
+    ie._interactive_beam_env = None
+    ie.new_env(cache.FileBasedCacheManager())
+    mocked_cleanup.assert_not_called()
+    ie.new_env(cache.FileBasedCacheManager())
+    mocked_cleanup.assert_called_once()
+
+  @patch('apache_beam.runners.interactive.interactive_environment'
+         '.InteractiveEnvironment.cleanup')
+  def test_cleanup_invoked_when_cm_changed(self,
+                                           mocked_cleanup):
+    ie._interactive_beam_env = None
+    ie.new_env(cache.FileBasedCacheManager())
+    ie.current_env().set_cache_manager(cache.FileBasedCacheManager())
+    mocked_cleanup.assert_called_once()
+
+  @patch('atexit.register')
+  @patch('atexit.unregister')
+  def test_cleanup_registered_when_none_cm_changed(self,
+                                                   mocked_unreg,
+                                                   mocked_reg):
+    ie.new_env(None)
+    mocked_reg.assert_not_called()
+    mocked_unreg.assert_not_called()
+    ie.current_env().set_cache_manager(cache.FileBasedCacheManager())
+    mocked_reg.assert_called_once()
+    mocked_unreg.assert_not_called()
+
+  @patch('atexit.register')
+  @patch('atexit.unregister')
+  def test_noop_when_cm_is_not_changed(self,
+                                       mocked_unreg,
+                                       mocked_reg):
+    cache_manager = cache.FileBasedCacheManager()
+    ie.new_env(cache_manager)
+    mocked_unreg.assert_not_called()
+    mocked_reg.assert_called_once()
+    ie.current_env().set_cache_manager(cache_manager)
+    mocked_unreg.assert_not_called()
+    mocked_reg.assert_called_once()
+
 
 if __name__ == '__main__':
   unittest.main()

Reply via email to