This is an automated email from the ASF dual-hosted git repository.
Yicong-Huang pushed a commit to branch release/v1.1.0-incubating
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/release/v1.1.0-incubating by
this push:
new 22f1d0ff28 fix(pyamber, 1.1): make ExecutorManager module names
process-globally unique (#4868)
22f1d0ff28 is described below
commit 22f1d0ff28fcc480fd7650b2366c767f3dc3a8c9
Author: Yicong Huang <[email protected]>
AuthorDate: Sun May 3 11:09:43 2026 -0700
fix(pyamber, 1.1): make ExecutorManager module names process-globally
unique (#4868)
### What changes were proposed in this PR?
Backport [#4717](https://github.com/apache/texera/pull/4717) (commit
`6ae0c46312ca744b4f761c88f1ec172ba0d41d13` on `main`) onto
`release/v1.1.0-incubating`.
`ExecutorManager` previously used a per-instance `executor_version`
counter to number its temporary `udf-vN` UDF modules, so every fresh
`ExecutorManager` started at `udf-v1` and collided with any `udf-v1`
module already cached in `sys.modules` by an earlier instance. The
post-collision recovery path (`__dict__.clear()` followed by
`importlib.reload()`) silently returned a stale class on Python 3.11.
This change lifts the counter to a class-level `itertools.count(1)`, so
module names are unique across every instance in the same Python
process; the recovery branch becomes unreachable and is removed.
This unblocks #4636 (and any future `release/*`-labelled PR). The
auto-backport leg's
`core/runnables/test_main_loop.py::test_batch_dp_thread_can_process_batch`
was failing on the Python 3.11 matrix entry against this branch with
`AttributeError: 'TestOperator' object has no attribute 'count'` (the
stale-class symptom). The failed test also left a non-daemon
`main_loop_thread` alive, which prevented pytest from exiting and
surfaced as a hang of more than 30 minutes.
### Any related issues, documentation, discussions?
Backports #4717. Original issue: #4705.
### How was this PR tested?
Cherry-pick of an already-reviewed and merged commit. There was one
conflict in `test_executor_manager.py` (the release branch lacked the
trailing `TestUpdateExecutor` test class that #4717 introduced); it was
resolved by taking the incoming version verbatim. A local syntax check
and `ruff format --check` both pass on the two modified files. CI on
this PR will exercise the change against the release branch's full
Python matrix.
### Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Code (claude-opus-4-7)
---
.../core/architecture/managers/executor_manager.py | 34 ++--
.../architecture/managers/test_executor_manager.py | 171 +++++++++++++++++++--
2 files changed, 180 insertions(+), 25 deletions(-)
diff --git
a/amber/src/main/python/core/architecture/managers/executor_manager.py
b/amber/src/main/python/core/architecture/managers/executor_manager.py
index eb1363d0a6..538d7b0bf1 100644
--- a/amber/src/main/python/core/architecture/managers/executor_manager.py
+++ b/amber/src/main/python/core/architecture/managers/executor_manager.py
@@ -18,6 +18,7 @@
import fs
import importlib
import inspect
+import itertools
import sys
from cached_property import cached_property
from fs.base import FS
@@ -29,10 +30,22 @@ from core.models import Operator, SourceOperator
class ExecutorManager:
+ # Process-wide monotonically increasing counter used to generate the
+ # tmp module names ExecutorManager hands to importlib. Making this a
+ # class-level counter (rather than a per-instance counter that always
+ # restarts at 1) guarantees that no two ExecutorManager instances in
+ # the same Python process can collide on `udf-v1`. Without that
+ # guarantee, the second instance hits the "module already loaded"
+ # branch of importlib and the post-clear+reload path can return a
+ # stale class on Python 3.11 (see #4705).
+ #
+ # Single-process counters are atomic in CPython under the GIL; we
+ # don't expect cross-thread contention on this anyway.
+ _module_name_counter = itertools.count(1)
+
def __init__(self):
self.executor: Optional[Operator] = None
self.operator_module_name: Optional[str] = None
- self.executor_version: int = 0 # incremental only
@cached_property
def fs(self) -> FS:
@@ -60,11 +73,13 @@ class ExecutorManager:
def gen_module_file_name(self) -> Tuple[str, str]:
"""
- Generate a UUID to be used as udf source code file.
+ Generate a unique module name and corresponding tmp file name.
+ Names come from a process-wide monotonic counter so they never
+ collide with any module already in `sys.modules`, even when
+ multiple ExecutorManager instances live in the same process.
:return Tuple[str, str]: the pair of module_name and file_name.
"""
- self.executor_version += 1
- module_name = f"udf-v{self.executor_version}"
+ module_name = f"udf-v{next(ExecutorManager._module_name_counter)}"
file_name = f"{module_name}.py"
return module_name, file_name
@@ -84,13 +99,10 @@ class ExecutorManager:
f"{Path(self.fs.getsyspath('/')).joinpath(file_name)}."
)
- if module_name in sys.modules:
- executor_module = importlib.import_module(module_name)
- executor_module.__dict__.clear()
- executor_module.__dict__["__name__"] = module_name
- executor_module = importlib.reload(executor_module)
- else:
- executor_module = importlib.import_module(module_name)
+ # gen_module_file_name guarantees module_name is unique across
+ # the process, so import_module will always cleanly load source
+ # from the tmp fs we just wrote — no re-import / reload dance.
+ executor_module = importlib.import_module(module_name)
self.operator_module_name = module_name
executors = list(
diff --git
a/amber/src/main/python/core/architecture/managers/test_executor_manager.py
b/amber/src/main/python/core/architecture/managers/test_executor_manager.py
index 901f768a21..f376f64468 100644
--- a/amber/src/main/python/core/architecture/managers/test_executor_manager.py
+++ b/amber/src/main/python/core/architecture/managers/test_executor_manager.py
@@ -84,7 +84,6 @@ class TestExecutorManager:
"""Test that ExecutorManager initializes correctly."""
assert executor_manager.executor is None
assert executor_manager.operator_module_name is None
- assert executor_manager.executor_version == 0
def test_reject_r_tuple_language(self, executor_manager):
"""Test that 'r-tuple' language is rejected with ImportError when
plugin is not available."""
@@ -163,8 +162,11 @@ class TestExecutorManager:
# Verify executor was initialized
assert executor_manager.executor is not None
- assert executor_manager.operator_module_name == "udf-v1"
- assert executor_manager.executor_version == 1
+ # Module name comes from a process-wide counter, so it has the
+ # right shape but its exact value depends on what other tests
+ # have run in the same pytest session.
+ assert executor_manager.operator_module_name is not None
+ assert executor_manager.operator_module_name.startswith("udf-v")
assert executor_manager.executor.is_source is False
def test_accept_python_language_source_operator(self, executor_manager):
@@ -176,8 +178,8 @@ class TestExecutorManager:
# Verify executor was initialized
assert executor_manager.executor is not None
- assert executor_manager.operator_module_name == "udf-v1"
- assert executor_manager.executor_version == 1
+ assert executor_manager.operator_module_name is not None
+ assert executor_manager.operator_module_name.startswith("udf-v")
assert executor_manager.executor.is_source is True
def test_reject_other_unsupported_languages(self, executor_manager):
@@ -200,18 +202,26 @@ class TestExecutorManager:
pass
def test_gen_module_file_name_increments(self, executor_manager):
- """Test that module file names increment correctly."""
- module1, file1 = executor_manager.gen_module_file_name()
- assert module1 == "udf-v1"
- assert file1 == "udf-v1.py"
+ """Test that module file names increment monotonically.
+ The counter is process-wide so the absolute starting value
+ depends on prior tests in the same pytest session; only the
+ relative ordering matters for correctness.
+ """
+ module1, file1 = executor_manager.gen_module_file_name()
module2, file2 = executor_manager.gen_module_file_name()
- assert module2 == "udf-v2"
- assert file2 == "udf-v2.py"
-
module3, file3 = executor_manager.gen_module_file_name()
- assert module3 == "udf-v3"
- assert file3 == "udf-v3.py"
+
+ def version(module_name: str) -> int:
+ return int(module_name.removeprefix("udf-v"))
+
+ v1 = version(module1)
+ assert version(module2) == v1 + 1
+ assert version(module3) == v1 + 2
+
+ assert file1 == f"{module1}.py"
+ assert file2 == f"{module2}.py"
+ assert file3 == f"{module3}.py"
def test_is_concrete_operator_static_method(self):
"""Test the is_concrete_operator static method."""
@@ -246,3 +256,136 @@ class TestExecutorManager:
language="python",
)
assert "SourceOperator API" in str(exc_info.value)
+
+
+REPLACEMENT_OPERATOR_CODE = """
+from pytexera import *
+
+class ReplacementOperator(UDFOperatorV2):
+ def process_tuple(self, tuple_: Tuple, port: int) ->
Iterator[Optional[TupleLike]]:
+ yield tuple_
+"""
+
+NO_OPERATOR_CODE = """
+def helper():
+ return 42
+"""
+
+TWO_OPERATORS_CODE = """
+from pytexera import *
+
+class FirstOperator(UDFOperatorV2):
+ def process_tuple(self, tuple_: Tuple, port: int) ->
Iterator[Optional[TupleLike]]:
+ yield tuple_
+
+class SecondOperator(UDFOperatorV2):
+ def process_tuple(self, tuple_: Tuple, port: int) ->
Iterator[Optional[TupleLike]]:
+ yield tuple_
+"""
+
+
+class TestUpdateExecutor:
+ """Test suite for ExecutorManager.update_executor.
+
+ Notes on test isolation: the existing TestExecutorManager fixture cannot
+ fully clean up the udf-vN modules it imports (its `hasattr(manager, "_fs")`
+ cleanup guard is buggy — the actual cached_property key is `fs`), so a
+ given udf-v1 module may already live in sys.modules with a path attached
+ to a previous test's tmp filesystem. These tests therefore avoid asserting
+ on attributes baked into a specific operator class and instead use
+ setattr/getattr-only semantics that hold regardless of which cached
+ module satisfies the import.
+ """
+
+ @pytest.fixture
+ def initialized_manager(self):
+ manager = ExecutorManager()
+ manager.initialize_executor(
+ code=SAMPLE_OPERATOR_CODE, is_source=False, language="python"
+ )
+ # Stamp custom attributes on the live instance so the dict-preservation
+ # check works even if the underlying class came from a cached module.
+ manager.executor.runtime_field = "set-after-init"
+ manager.executor.counter = 6
+ yield manager
+ manager.close()
+
+ def test_update_preserves_pre_update_dict_state(self, initialized_manager):
+ before = initialized_manager.executor
+ before_dict = dict(before.__dict__)
+
+ initialized_manager.update_executor(
+ code=REPLACEMENT_OPERATOR_CODE, is_source=False
+ )
+
+ # update_executor reuses the prior __dict__ on a freshly instantiated
+ # operator — verify both halves: a NEW instance, but the OLD state.
+ assert initialized_manager.executor is not before
+ assert initialized_manager.executor.runtime_field == "set-after-init"
+ assert initialized_manager.executor.counter == 6
+ # Assert key presence explicitly so a missing key with an expected
+ # value of None doesn't slip past via dict.get()'s default.
+ after_dict = initialized_manager.executor.__dict__
+ for key, value in before_dict.items():
+ assert key in after_dict, f"key {key!r} missing after update"
+ assert after_dict[key] == value
+
+ def test_update_advances_module_name_monotonically(self,
initialized_manager):
+ # The module-name counter is process-wide, so absolute values
+ # depend on prior tests in the same pytest session; only the
+ # relative bump matters.
+ before = initialized_manager.operator_module_name
+ assert before is not None and before.startswith("udf-v")
+
+ initialized_manager.update_executor(
+ code=REPLACEMENT_OPERATOR_CODE, is_source=False
+ )
+
+ after = initialized_manager.operator_module_name
+ assert after is not None and after.startswith("udf-v")
+ assert int(after.removeprefix("udf-v")) ==
int(before.removeprefix("udf-v")) + 1
+
+ def test_update_with_source_mismatch_raises_assertion(self,
initialized_manager):
+ # The replacement code is a regular operator, but is_source=True asks
+ # the manager to treat it as a source operator. Same guardrail as
+ # initialize_executor.
+ with pytest.raises(AssertionError) as exc_info:
+ initialized_manager.update_executor(
+ code=REPLACEMENT_OPERATOR_CODE, is_source=True
+ )
+ assert "SourceOperator API" in str(exc_info.value)
+
+ def test_update_with_no_operator_class_raises_assertion(self,
initialized_manager):
+ # load_executor_definition asserts exactly one Operator subclass exists
+ # in the module — an empty module trips that assertion.
+ with pytest.raises(AssertionError) as exc_info:
+ initialized_manager.update_executor(code=NO_OPERATOR_CODE,
is_source=False)
+ assert "one and only one Operator" in str(exc_info.value)
+
+ def test_update_with_multiple_operator_classes_raises_assertion(
+ self, initialized_manager
+ ):
+ with pytest.raises(AssertionError) as exc_info:
+ initialized_manager.update_executor(
+ code=TWO_OPERATORS_CODE, is_source=False
+ )
+ assert "one and only one Operator" in str(exc_info.value)
+
+ def test_repeated_updates_keep_carrying_the_running_state(
+ self, initialized_manager
+ ):
+ # Update once, mutate the new instance, then update again — the second
+ # update must see the *latest* state, not the snapshot from before
+ # the first update.
+ initialized_manager.update_executor(
+ code=REPLACEMENT_OPERATOR_CODE, is_source=False
+ )
+ initialized_manager.executor.counter = 42
+ initialized_manager.executor.added_after_update = True
+
+ initialized_manager.update_executor(
+ code=REPLACEMENT_OPERATOR_CODE, is_source=False
+ )
+
+ assert initialized_manager.executor.counter == 42
+ assert initialized_manager.executor.added_after_update is True