This is an automated email from the ASF dual-hosted git repository.

Yicong-Huang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new c58941e1cc fix: close tmp fs and evict udf modules on ExecutorManager.close() (#5046)
c58941e1cc is described below

commit c58941e1cc758b334d5110a960c19b2e9cf5a455
Author: Matthew B. <[email protected]>
AuthorDate: Wed May 13 07:54:10 2026 -0700

    fix: close tmp fs and evict udf modules on ExecutorManager.close() (#5046)
---
 .../core/architecture/managers/executor_manager.py | 16 ++++++++-
 .../architecture/managers/test_executor_manager.py | 40 ++++++++++++++--------
 2 files changed, 41 insertions(+), 15 deletions(-)

diff --git a/amber/src/main/python/core/architecture/managers/executor_manager.py b/amber/src/main/python/core/architecture/managers/executor_manager.py
index 538d7b0bf1..7f2249476a 100644
--- a/amber/src/main/python/core/architecture/managers/executor_manager.py
+++ b/amber/src/main/python/core/architecture/managers/executor_manager.py
@@ -114,10 +114,24 @@ class ExecutorManager:
     def close(self) -> None:
         """
         Close the tmp fs and release all resources created within it.
+        This also evicts the loaded operator module from ``sys.modules``
+        and removes the tmp fs path from ``sys.path`` so a single call
+        fully reverses every global side-effect performed by ``fs`` and
+        ``load_executor_definition``.
         :return:
         """
+        if "fs" not in self.__dict__:
+            # fs was never materialized; nothing to clean up.
+            return
+        root = self.fs.getsyspath("/")
         self.fs.close()
-        logger.debug(f"Tmp directory {self.fs.getsyspath('/')} is closed and cleared.")
+        try:
+            sys.path.remove(str(Path(root)))
+        except ValueError:
+            pass
+        if self.operator_module_name is not None:
+            sys.modules.pop(self.operator_module_name, None)
+        logger.debug(f"Tmp directory {root} is closed and cleared.")
 
     @staticmethod
     def is_concrete_operator(cls: type) -> bool:
diff --git a/amber/src/test/python/core/architecture/managers/test_executor_manager.py b/amber/src/test/python/core/architecture/managers/test_executor_manager.py
index f376f64468..1a21b106f3 100644
--- a/amber/src/test/python/core/architecture/managers/test_executor_manager.py
+++ b/amber/src/test/python/core/architecture/managers/test_executor_manager.py
@@ -48,9 +48,7 @@ class TestExecutorManager:
         """Create a fresh ExecutorManager instance for each test."""
         manager = ExecutorManager()
         yield manager
-        # Cleanup: close the temp filesystem
-        if hasattr(manager, "_fs"):
-            manager.close()
+        manager.close()
 
     def _mock_r_plugin(self, executor_class_name, is_source):
         """
@@ -257,6 +255,30 @@ class TestExecutorManager:
             )
         assert "SourceOperator API" in str(exc_info.value)
 
+    def test_close_when_sys_path_entry_already_removed(self):
+        # Exercise the except ValueError branch: if the tmp fs path has
+        # already been pulled out of sys.path by something else, close()
+        # should swallow the error and finish cleanly.
+        from pathlib import Path
+
+        manager = ExecutorManager()
+        root = Path(manager.fs.getsyspath("/"))
+        sys.path.remove(str(root))
+        manager.close()
+        assert str(root) not in sys.path
+
+    def test_close_when_fs_materialized_but_no_executor_loaded(self):
+        # Exercise the branch where self.fs was touched (so the early
+        # return is skipped) but operator_module_name is still None,
+        # meaning the sys.modules.pop branch must NOT execute.
+        from pathlib import Path
+
+        manager = ExecutorManager()
+        root = Path(manager.fs.getsyspath("/"))
+        assert manager.operator_module_name is None
+        manager.close()
+        assert str(root) not in sys.path
+
 
 REPLACEMENT_OPERATOR_CODE = """
 from pytexera import *
@@ -285,17 +307,7 @@ class SecondOperator(UDFOperatorV2):
 
 
 class TestUpdateExecutor:
-    """Test suite for ExecutorManager.update_executor.
-
-    Notes on test isolation: the existing TestExecutorManager fixture cannot
-    fully clean up the udf-vN modules it imports (its `hasattr(manager, "_fs")`
-    cleanup guard is buggy — the actual cached_property key is `fs`), so a
-    given udf-v1 module may already live in sys.modules with a path attached
-    to a previous test's tmp filesystem. These tests therefore avoid asserting
-    on attributes baked into a specific operator class and instead use
-    setattr/getattr-only semantics that hold regardless of which cached
-    module satisfies the import.
-    """
+    """Test suite for ExecutorManager.update_executor."""
 
     @pytest.fixture
     def initialized_manager(self):

Reply via email to