This is an automated email from the ASF dual-hosted git repository.
Yicong-Huang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 6ae0c46312 fix(pyamber): make ExecutorManager module names
process-globally unique (#4717)
6ae0c46312 is described below
commit 6ae0c46312ca744b4f761c88f1ec172ba0d41d13
Author: Yicong Huang <[email protected]>
AuthorDate: Sun May 3 09:45:20 2026 -0700
fix(pyamber): make ExecutorManager module names process-globally unique
(#4717)
### What changes were proposed in this PR?
Lift `ExecutorManager`'s temporary module-name counter from a per-instance
attribute (`executor_version`) to a class-level, process-wide
`itertools.count(1)`. Generated module names (`udf-vN`) are now unique
across every `ExecutorManager` instance in the same Python process, so the
`if module_name in sys.modules:` recovery branch (clear the module's
`__dict__`, then `importlib.reload()`), which could silently return a stale
class on Python 3.11, is now unreachable and has been removed.
The production path is unchanged: a worker process holds exactly one
`ExecutorManager`, and its counter was already monotonic across
reconfigurations.
### Any related issues, documentation, discussions?
Closes #4705.
### How was this PR tested?
`sbt 'WorkflowExecutionService / Test / testOnly test_executor_manager
test_main_loop'` — 23/23 pass. The combined run that previously leaked
`TestOperator` from `test_executor_manager.py` into
`test_main_loop.py`'s fixtures is now clean.
`test_executor_manager.py` is updated for the new semantics: assertions on
`executor_version` are removed along with the attribute, and exact-value
assertions such as `operator_module_name == "udf-v1"` become prefix
(`udf-v`) and relative-ordering checks, since the counter's absolute
starting value depends on prior tests in the session.
### Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Opus 4.7, 1M context)
---
.../core/architecture/managers/executor_manager.py | 34 ++++++++-----
.../architecture/managers/test_executor_manager.py | 56 +++++++++++++---------
2 files changed, 56 insertions(+), 34 deletions(-)
diff --git
a/amber/src/main/python/core/architecture/managers/executor_manager.py
b/amber/src/main/python/core/architecture/managers/executor_manager.py
index eb1363d0a6..538d7b0bf1 100644
--- a/amber/src/main/python/core/architecture/managers/executor_manager.py
+++ b/amber/src/main/python/core/architecture/managers/executor_manager.py
@@ -18,6 +18,7 @@
import fs
import importlib
import inspect
+import itertools
import sys
from cached_property import cached_property
from fs.base import FS
@@ -29,10 +30,22 @@ from core.models import Operator, SourceOperator
class ExecutorManager:
+ # Process-wide monotonically increasing counter used to generate the
+ # tmp module names ExecutorManager hands to importlib. Making this a
+ # class-level counter (rather than a per-instance counter that always
+ # restarts at 1) guarantees that no two ExecutorManager instances in
+ # the same Python process can collide on `udf-v1`. Without that
+ # guarantee, the second instance hits the "module already loaded"
+ # branch of importlib and the post-clear+reload path can return a
+ # stale class on Python 3.11 (see #4705).
+ #
+ # Single-process counters are atomic in CPython under the GIL; we
+ # don't expect cross-thread contention on this anyway.
+ _module_name_counter = itertools.count(1)
+
def __init__(self):
self.executor: Optional[Operator] = None
self.operator_module_name: Optional[str] = None
- self.executor_version: int = 0 # incremental only
@cached_property
def fs(self) -> FS:
@@ -60,11 +73,13 @@ class ExecutorManager:
def gen_module_file_name(self) -> Tuple[str, str]:
"""
- Generate a UUID to be used as udf source code file.
+ Generate a unique module name and corresponding tmp file name.
+ Names come from a process-wide monotonic counter so they never
+ collide with any module already in `sys.modules`, even when
+ multiple ExecutorManager instances live in the same process.
:return Tuple[str, str]: the pair of module_name and file_name.
"""
- self.executor_version += 1
- module_name = f"udf-v{self.executor_version}"
+ module_name = f"udf-v{next(ExecutorManager._module_name_counter)}"
file_name = f"{module_name}.py"
return module_name, file_name
@@ -84,13 +99,10 @@ class ExecutorManager:
f"{Path(self.fs.getsyspath('/')).joinpath(file_name)}."
)
- if module_name in sys.modules:
- executor_module = importlib.import_module(module_name)
- executor_module.__dict__.clear()
- executor_module.__dict__["__name__"] = module_name
- executor_module = importlib.reload(executor_module)
- else:
- executor_module = importlib.import_module(module_name)
+ # gen_module_file_name guarantees module_name is unique across
+ # the process, so import_module will always cleanly load source
+ # from the tmp fs we just wrote — no re-import / reload dance.
+ executor_module = importlib.import_module(module_name)
self.operator_module_name = module_name
executors = list(
diff --git
a/amber/src/main/python/core/architecture/managers/test_executor_manager.py
b/amber/src/main/python/core/architecture/managers/test_executor_manager.py
index cfcdde1880..f376f64468 100644
--- a/amber/src/main/python/core/architecture/managers/test_executor_manager.py
+++ b/amber/src/main/python/core/architecture/managers/test_executor_manager.py
@@ -84,7 +84,6 @@ class TestExecutorManager:
"""Test that ExecutorManager initializes correctly."""
assert executor_manager.executor is None
assert executor_manager.operator_module_name is None
- assert executor_manager.executor_version == 0
def test_reject_r_tuple_language(self, executor_manager):
"""Test that 'r-tuple' language is rejected with ImportError when
plugin is not available."""
@@ -163,8 +162,11 @@ class TestExecutorManager:
# Verify executor was initialized
assert executor_manager.executor is not None
- assert executor_manager.operator_module_name == "udf-v1"
- assert executor_manager.executor_version == 1
+ # Module name comes from a process-wide counter, so it has the
+ # right shape but its exact value depends on what other tests
+ # have run in the same pytest session.
+ assert executor_manager.operator_module_name is not None
+ assert executor_manager.operator_module_name.startswith("udf-v")
assert executor_manager.executor.is_source is False
def test_accept_python_language_source_operator(self, executor_manager):
@@ -176,8 +178,8 @@ class TestExecutorManager:
# Verify executor was initialized
assert executor_manager.executor is not None
- assert executor_manager.operator_module_name == "udf-v1"
- assert executor_manager.executor_version == 1
+ assert executor_manager.operator_module_name is not None
+ assert executor_manager.operator_module_name.startswith("udf-v")
assert executor_manager.executor.is_source is True
def test_reject_other_unsupported_languages(self, executor_manager):
@@ -200,18 +202,26 @@ class TestExecutorManager:
pass
def test_gen_module_file_name_increments(self, executor_manager):
- """Test that module file names increment correctly."""
- module1, file1 = executor_manager.gen_module_file_name()
- assert module1 == "udf-v1"
- assert file1 == "udf-v1.py"
+ """Test that module file names increment monotonically.
+ The counter is process-wide so the absolute starting value
+ depends on prior tests in the same pytest session; only the
+ relative ordering matters for correctness.
+ """
+ module1, file1 = executor_manager.gen_module_file_name()
module2, file2 = executor_manager.gen_module_file_name()
- assert module2 == "udf-v2"
- assert file2 == "udf-v2.py"
-
module3, file3 = executor_manager.gen_module_file_name()
- assert module3 == "udf-v3"
- assert file3 == "udf-v3.py"
+
+ def version(module_name: str) -> int:
+ return int(module_name.removeprefix("udf-v"))
+
+ v1 = version(module1)
+ assert version(module2) == v1 + 1
+ assert version(module3) == v1 + 2
+
+ assert file1 == f"{module1}.py"
+ assert file2 == f"{module2}.py"
+ assert file3 == f"{module3}.py"
def test_is_concrete_operator_static_method(self):
"""Test the is_concrete_operator static method."""
@@ -320,19 +330,20 @@ class TestUpdateExecutor:
assert key in after_dict, f"key {key!r} missing after update"
assert after_dict[key] == value
- def test_update_increments_executor_version_and_module_name(
- self, initialized_manager
- ):
- # initialize_executor already produced udf-v1.
- assert initialized_manager.executor_version == 1
- assert initialized_manager.operator_module_name == "udf-v1"
+ def test_update_advances_module_name_monotonically(self,
initialized_manager):
+ # The module-name counter is process-wide, so absolute values
+ # depend on prior tests in the same pytest session; only the
+ # relative bump matters.
+ before = initialized_manager.operator_module_name
+ assert before is not None and before.startswith("udf-v")
initialized_manager.update_executor(
code=REPLACEMENT_OPERATOR_CODE, is_source=False
)
- assert initialized_manager.executor_version == 2
- assert initialized_manager.operator_module_name == "udf-v2"
+ after = initialized_manager.operator_module_name
+ assert after is not None and after.startswith("udf-v")
+ assert int(after.removeprefix("udf-v")) ==
int(before.removeprefix("udf-v")) + 1
def test_update_with_source_mismatch_raises_assertion(self,
initialized_manager):
# The replacement code is a regular operator, but is_source=True asks
@@ -378,4 +389,3 @@ class TestUpdateExecutor:
assert initialized_manager.executor.counter == 42
assert initialized_manager.executor.added_after_update is True
- assert initialized_manager.executor_version == 3