This is an automated email from the ASF dual-hosted git repository.

Yicong-Huang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new 6ae0c46312 fix(pyamber): make ExecutorManager module names process-globally unique (#4717)
6ae0c46312 is described below

commit 6ae0c46312ca744b4f761c88f1ec172ba0d41d13
Author: Yicong Huang <[email protected]>
AuthorDate: Sun May 3 09:45:20 2026 -0700

    fix(pyamber): make ExecutorManager module names process-globally unique (#4717)
    
    ### What changes were proposed in this PR?
    
    Lift `ExecutorManager`'s tmp module-name counter from a per-instance
    attribute to a process-wide `itertools.count(1)`. Generated module names
    (`udf-vN`) are now unique across every instance in the same Python
    process, so the `if module_name in sys.modules: clear()+reload()`
    recovery branch (which silently returned a stale class on 3.11) is
    unreachable and removed.
    
    Production path is unchanged — a worker process holds exactly one
    `ExecutorManager` and its counter was already monotonic across
    reconfigures.
    
    ### Any related issues, documentation, discussions?
    
    Closes #4705.
    
    ### How was this PR tested?
    
    `sbt 'WorkflowExecutionService / Test / testOnly test_executor_manager
    test_main_loop'` — 23/23 pass. The combined run that previously leaked
    `TestOperator` from `test_executor_manager.py` into
    `test_main_loop.py`'s fixtures is now clean.
    
    `test_executor_manager.py` is updated for the new semantics: assertions
    on `executor_version == 0/1` and `operator_module_name == "udf-v1"` are
    replaced with relative-ordering checks, since the counter's absolute
    starting value depends on prior tests in the session.
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code (Opus 4.7, 1M context)
---
 .../core/architecture/managers/executor_manager.py | 34 ++++++++-----
 .../architecture/managers/test_executor_manager.py | 56 +++++++++++++---------
 2 files changed, 56 insertions(+), 34 deletions(-)

diff --git a/amber/src/main/python/core/architecture/managers/executor_manager.py b/amber/src/main/python/core/architecture/managers/executor_manager.py
index eb1363d0a6..538d7b0bf1 100644
--- a/amber/src/main/python/core/architecture/managers/executor_manager.py
+++ b/amber/src/main/python/core/architecture/managers/executor_manager.py
@@ -18,6 +18,7 @@
 import fs
 import importlib
 import inspect
+import itertools
 import sys
 from cached_property import cached_property
 from fs.base import FS
@@ -29,10 +30,22 @@ from core.models import Operator, SourceOperator
 
 
 class ExecutorManager:
+    # Process-wide monotonically increasing counter used to generate the
+    # tmp module names ExecutorManager hands to importlib. Making this a
+    # class-level counter (rather than a per-instance counter that always
+    # restarts at 1) guarantees that no two ExecutorManager instances in
+    # the same Python process can collide on `udf-v1`. Without that
+    # guarantee, the second instance hits the "module already loaded"
+    # branch of importlib and the post-clear+reload path can return a
+    # stale class on Python 3.11 (see #4705).
+    #
+    # next() on itertools.count is a single C-level call, so it is atomic
+    # in CPython under the GIL; we don't expect cross-thread contention anyway.
+    _module_name_counter = itertools.count(1)
+
     def __init__(self):
         self.executor: Optional[Operator] = None
         self.operator_module_name: Optional[str] = None
-        self.executor_version: int = 0  # incremental only
 
     @cached_property
     def fs(self) -> FS:
@@ -60,11 +73,13 @@ class ExecutorManager:
 
     def gen_module_file_name(self) -> Tuple[str, str]:
         """
-        Generate a UUID to be used as udf source code file.
+        Generate a unique module name and corresponding tmp file name.
+        Names come from a process-wide monotonic counter so they never
+        collide with any module already in `sys.modules`, even when
+        multiple ExecutorManager instances live in the same process.
         :return Tuple[str, str]: the pair of module_name and file_name.
         """
-        self.executor_version += 1
-        module_name = f"udf-v{self.executor_version}"
+        module_name = f"udf-v{next(ExecutorManager._module_name_counter)}"
         file_name = f"{module_name}.py"
         return module_name, file_name
 
@@ -84,13 +99,10 @@ class ExecutorManager:
             f"{Path(self.fs.getsyspath('/')).joinpath(file_name)}."
         )
 
-        if module_name in sys.modules:
-            executor_module = importlib.import_module(module_name)
-            executor_module.__dict__.clear()
-            executor_module.__dict__["__name__"] = module_name
-            executor_module = importlib.reload(executor_module)
-        else:
-            executor_module = importlib.import_module(module_name)
+        # gen_module_file_name guarantees module_name is unique across
+        # the process, so import_module will always cleanly load source
+        # from the tmp fs we just wrote — no re-import / reload dance.
+        executor_module = importlib.import_module(module_name)
         self.operator_module_name = module_name
 
         executors = list(
diff --git a/amber/src/main/python/core/architecture/managers/test_executor_manager.py b/amber/src/main/python/core/architecture/managers/test_executor_manager.py
index cfcdde1880..f376f64468 100644
--- a/amber/src/main/python/core/architecture/managers/test_executor_manager.py
+++ b/amber/src/main/python/core/architecture/managers/test_executor_manager.py
@@ -84,7 +84,6 @@ class TestExecutorManager:
         """Test that ExecutorManager initializes correctly."""
         assert executor_manager.executor is None
         assert executor_manager.operator_module_name is None
-        assert executor_manager.executor_version == 0
 
     def test_reject_r_tuple_language(self, executor_manager):
         """Test that 'r-tuple' language is rejected with ImportError when 
plugin is not available."""
@@ -163,8 +162,11 @@ class TestExecutorManager:
 
         # Verify executor was initialized
         assert executor_manager.executor is not None
-        assert executor_manager.operator_module_name == "udf-v1"
-        assert executor_manager.executor_version == 1
+        # Module name comes from a process-wide counter, so it has the
+        # right shape but its exact value depends on what other tests
+        # have run in the same pytest session.
+        assert executor_manager.operator_module_name is not None
+        assert executor_manager.operator_module_name.startswith("udf-v")
         assert executor_manager.executor.is_source is False
 
     def test_accept_python_language_source_operator(self, executor_manager):
@@ -176,8 +178,8 @@ class TestExecutorManager:
 
         # Verify executor was initialized
         assert executor_manager.executor is not None
-        assert executor_manager.operator_module_name == "udf-v1"
-        assert executor_manager.executor_version == 1
+        assert executor_manager.operator_module_name is not None
+        assert executor_manager.operator_module_name.startswith("udf-v")
         assert executor_manager.executor.is_source is True
 
     def test_reject_other_unsupported_languages(self, executor_manager):
@@ -200,18 +202,26 @@ class TestExecutorManager:
             pass
 
     def test_gen_module_file_name_increments(self, executor_manager):
-        """Test that module file names increment correctly."""
-        module1, file1 = executor_manager.gen_module_file_name()
-        assert module1 == "udf-v1"
-        assert file1 == "udf-v1.py"
+        """Test that module file names increment monotonically.
 
+        The counter is process-wide so the absolute starting value
+        depends on prior tests in the same pytest session; only the
+        relative ordering matters for correctness.
+        """
+        module1, file1 = executor_manager.gen_module_file_name()
         module2, file2 = executor_manager.gen_module_file_name()
-        assert module2 == "udf-v2"
-        assert file2 == "udf-v2.py"
-
         module3, file3 = executor_manager.gen_module_file_name()
-        assert module3 == "udf-v3"
-        assert file3 == "udf-v3.py"
+
+        def version(module_name: str) -> int:
+            return int(module_name.removeprefix("udf-v"))
+
+        v1 = version(module1)
+        assert version(module2) == v1 + 1
+        assert version(module3) == v1 + 2
+
+        assert file1 == f"{module1}.py"
+        assert file2 == f"{module2}.py"
+        assert file3 == f"{module3}.py"
 
     def test_is_concrete_operator_static_method(self):
         """Test the is_concrete_operator static method."""
@@ -320,19 +330,20 @@ class TestUpdateExecutor:
             assert key in after_dict, f"key {key!r} missing after update"
             assert after_dict[key] == value
 
-    def test_update_increments_executor_version_and_module_name(
-        self, initialized_manager
-    ):
-        # initialize_executor already produced udf-v1.
-        assert initialized_manager.executor_version == 1
-        assert initialized_manager.operator_module_name == "udf-v1"
+    def test_update_advances_module_name_monotonically(self, initialized_manager):
+        # The module-name counter is process-wide, so absolute values
+        # depend on prior tests in the same pytest session; only the
+        # relative bump matters.
+        before = initialized_manager.operator_module_name
+        assert before is not None and before.startswith("udf-v")
 
         initialized_manager.update_executor(
             code=REPLACEMENT_OPERATOR_CODE, is_source=False
         )
 
-        assert initialized_manager.executor_version == 2
-        assert initialized_manager.operator_module_name == "udf-v2"
+        after = initialized_manager.operator_module_name
+        assert after is not None and after.startswith("udf-v")
+        assert int(after.removeprefix("udf-v")) == int(before.removeprefix("udf-v")) + 1
 
     def test_update_with_source_mismatch_raises_assertion(self, initialized_manager):
         # The replacement code is a regular operator, but is_source=True asks
@@ -378,4 +389,3 @@ class TestUpdateExecutor:
 
         assert initialized_manager.executor.counter == 42
         assert initialized_manager.executor.added_after_update is True
-        assert initialized_manager.executor_version == 3

Reply via email to