This is an automated email from the ASF dual-hosted git repository.
Yicong-Huang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new c58941e1cc fix: close tmp fs and evict udf modules on
ExecutorManager.close() (#5046)
c58941e1cc is described below
commit c58941e1cc758b334d5110a960c19b2e9cf5a455
Author: Matthew B. <[email protected]>
AuthorDate: Wed May 13 07:54:10 2026 -0700
fix: close tmp fs and evict udf modules on ExecutorManager.close() (#5046)
---
.../core/architecture/managers/executor_manager.py | 16 ++++++++-
.../architecture/managers/test_executor_manager.py | 40 ++++++++++++++--------
2 files changed, 41 insertions(+), 15 deletions(-)
diff --git
a/amber/src/main/python/core/architecture/managers/executor_manager.py
b/amber/src/main/python/core/architecture/managers/executor_manager.py
index 538d7b0bf1..7f2249476a 100644
--- a/amber/src/main/python/core/architecture/managers/executor_manager.py
+++ b/amber/src/main/python/core/architecture/managers/executor_manager.py
@@ -114,10 +114,24 @@ class ExecutorManager:
def close(self) -> None:
    """
    Close the tmp fs and release all resources created within it.

    This also evicts the loaded operator module from ``sys.modules``
    and removes the tmp fs path from ``sys.path`` so a single call
    fully reverses every global side-effect performed by ``fs`` and
    ``load_executor_definition``.

    The call is idempotent: the cached ``fs`` value is discarded once it
    has been closed, so later calls return immediately. Because the
    cached value is discarded, accessing ``fs`` again after ``close()``
    materializes a fresh tmp fs.
    :return:
    """
    if "fs" not in self.__dict__:
        # fs was never materialized, or was already closed by an earlier
        # call; nothing to clean up.
        return
    fs = self.__dict__["fs"]
    root = fs.getsyspath("/")
    # Drop the cached_property value so a repeated close() takes the
    # early-return path above instead of re-closing a closed fs.
    del self.__dict__["fs"]
    try:
        fs.close()
    finally:
        # Undo the global side effects even if closing the fs failed.
        # Remove every occurrence, not just the first one.
        entry = str(Path(root))
        while entry in sys.path:
            sys.path.remove(entry)
        if self.operator_module_name is not None:
            sys.modules.pop(self.operator_module_name, None)
    logger.debug(f"Tmp directory {root} is closed and cleared.")
@staticmethod
def is_concrete_operator(cls: type) -> bool:
diff --git
a/amber/src/test/python/core/architecture/managers/test_executor_manager.py
b/amber/src/test/python/core/architecture/managers/test_executor_manager.py
index f376f64468..1a21b106f3 100644
--- a/amber/src/test/python/core/architecture/managers/test_executor_manager.py
+++ b/amber/src/test/python/core/architecture/managers/test_executor_manager.py
@@ -48,9 +48,7 @@ class TestExecutorManager:
"""Create a fresh ExecutorManager instance for each test."""
manager = ExecutorManager()
yield manager
- # Cleanup: close the temp filesystem
- if hasattr(manager, "_fs"):
- manager.close()
+ manager.close()
def _mock_r_plugin(self, executor_class_name, is_source):
"""
@@ -257,6 +255,30 @@ class TestExecutorManager:
)
assert "SourceOperator API" in str(exc_info.value)
def test_close_when_sys_path_entry_already_removed(self):
    # Something other than close() may already have pulled the tmp fs
    # path out of sys.path; close() must tolerate that and still finish
    # without raising.
    from pathlib import Path

    manager = ExecutorManager()
    entry = str(Path(manager.fs.getsyspath("/")))
    sys.path.remove(entry)
    manager.close()
    assert entry not in sys.path
+
def test_close_when_fs_materialized_but_no_executor_loaded(self):
    # Touch self.fs so close() skips its early return, but load no
    # executor, so operator_module_name is still None. close() must
    # remove the tmp fs path from sys.path without evicting any module.
    from pathlib import Path

    manager = ExecutorManager()
    entry = str(Path(manager.fs.getsyspath("/")))
    assert manager.operator_module_name is None
    # Precondition: without it, the final "not in sys.path" check
    # would pass vacuously.
    assert entry in sys.path
    modules_before = set(sys.modules)
    manager.close()
    assert entry not in sys.path
    # No module may have been evicted when no executor was loaded.
    assert modules_before <= set(sys.modules)
+
REPLACEMENT_OPERATOR_CODE = """
from pytexera import *
@@ -285,17 +307,7 @@ class SecondOperator(UDFOperatorV2):
class TestUpdateExecutor:
- """Test suite for ExecutorManager.update_executor.
-
- Notes on test isolation: the existing TestExecutorManager fixture cannot
- fully clean up the udf-vN modules it imports (its `hasattr(manager, "_fs")`
- cleanup guard is buggy — the actual cached_property key is `fs`), so a
- given udf-v1 module may already live in sys.modules with a path attached
- to a previous test's tmp filesystem. These tests therefore avoid asserting
- on attributes baked into a specific operator class and instead use
- setattr/getattr-only semantics that hold regardless of which cached
- module satisfies the import.
- """
+ """Test suite for ExecutorManager.update_executor."""
@pytest.fixture
def initialized_manager(self):