This is an automated email from the ASF dual-hosted git repository.

Yicong-Huang pushed a commit to branch release/v1.1.0-incubating
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/release/v1.1.0-incubating by this push:
     new 22f1d0ff28 fix(pyamber, 1.1): make ExecutorManager module names process-globally unique (#4868)
22f1d0ff28 is described below

commit 22f1d0ff28fcc480fd7650b2366c767f3dc3a8c9
Author: Yicong Huang <[email protected]>
AuthorDate: Sun May 3 11:09:43 2026 -0700

    fix(pyamber, 1.1): make ExecutorManager module names process-globally unique (#4868)
    
    ### What changes were proposed in this PR?
    
    Backport [#4717](https://github.com/apache/texera/pull/4717) (commit
    `6ae0c46312ca744b4f761c88f1ec172ba0d41d13` on `main`) onto
    `release/v1.1.0-incubating`.
    
    `ExecutorManager` previously used a per-instance `executor_version`
    counter for the `udf-vN` tmp module name, so a fresh `ExecutorManager`
    always produced `udf-v1` and collided with whatever `udf-v1` was already
    cached in `sys.modules` from an earlier instance. The post-collision
    `clear()` + `importlib.reload()` recovery silently returned a stale
    class on Python 3.11. Lift the counter to a class-level
    `itertools.count(1)` so module names are unique across every instance in
    the same Python process; the recovery branch becomes unreachable and is
    removed.
    
    This unblocks #4636 (and any future `release/*`-labelled PR): the
    auto-backport leg's
    `core/runnables/test_main_loop.py::test_batch_dp_thread_can_process_batch`
    was failing on the 3.11 matrix entry against this branch with
    `AttributeError: 'TestOperator' object has no attribute 'count'` (the
    stale-class symptom), and the failed test left a non-daemon
    `main_loop_thread` alive that prevented pytest from exiting — surfacing
    as a 30+ minute hang.
    
    ### Any related issues, documentation, discussions?
    
    Backports #4717. Original issue: #4705.
    
    ### How was this PR tested?
    
    Cherry-pick of an already-reviewed and merged commit. One conflict in
    `test_executor_manager.py` (release branch lacked the trailing
    `TestUpdateExecutor` test class that #4717 introduced); resolved by
    taking the incoming version verbatim. A local syntax check and
    `ruff format --check` both pass on the two modified files. CI on this
    PR will exercise the change against the release branch's full Python
    matrix.
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code (claude-opus-4-7)
---
 .../core/architecture/managers/executor_manager.py |  34 ++--
 .../architecture/managers/test_executor_manager.py | 171 +++++++++++++++++++--
 2 files changed, 180 insertions(+), 25 deletions(-)

diff --git a/amber/src/main/python/core/architecture/managers/executor_manager.py b/amber/src/main/python/core/architecture/managers/executor_manager.py
index eb1363d0a6..538d7b0bf1 100644
--- a/amber/src/main/python/core/architecture/managers/executor_manager.py
+++ b/amber/src/main/python/core/architecture/managers/executor_manager.py
@@ -18,6 +18,7 @@
 import fs
 import importlib
 import inspect
+import itertools
 import sys
 from cached_property import cached_property
 from fs.base import FS
@@ -29,10 +30,22 @@ from core.models import Operator, SourceOperator
 
 
 class ExecutorManager:
+    # Process-wide monotonically increasing counter used to generate the
+    # tmp module names ExecutorManager hands to importlib. Making this a
+    # class-level counter (rather than a per-instance counter that always
+    # restarts at 1) guarantees that no two ExecutorManager instances in
+    # the same Python process can collide on `udf-v1`. Without that
+    # guarantee, the second instance hits the "module already loaded"
+    # branch of importlib and the post-clear+reload path can return a
+    # stale class on Python 3.11 (see #4705).
+    #
+    # Single-process counters are atomic in CPython under the GIL; we
+    # don't expect cross-thread contention on this anyway.
+    _module_name_counter = itertools.count(1)
+
     def __init__(self):
         self.executor: Optional[Operator] = None
         self.operator_module_name: Optional[str] = None
-        self.executor_version: int = 0  # incremental only
 
     @cached_property
     def fs(self) -> FS:
@@ -60,11 +73,13 @@ class ExecutorManager:
 
     def gen_module_file_name(self) -> Tuple[str, str]:
         """
-        Generate a UUID to be used as udf source code file.
+        Generate a unique module name and corresponding tmp file name.
+        Names come from a process-wide monotonic counter so they never
+        collide with any module already in `sys.modules`, even when
+        multiple ExecutorManager instances live in the same process.
         :return Tuple[str, str]: the pair of module_name and file_name.
         """
-        self.executor_version += 1
-        module_name = f"udf-v{self.executor_version}"
+        module_name = f"udf-v{next(ExecutorManager._module_name_counter)}"
         file_name = f"{module_name}.py"
         return module_name, file_name
 
@@ -84,13 +99,10 @@ class ExecutorManager:
             f"{Path(self.fs.getsyspath('/')).joinpath(file_name)}."
         )
 
-        if module_name in sys.modules:
-            executor_module = importlib.import_module(module_name)
-            executor_module.__dict__.clear()
-            executor_module.__dict__["__name__"] = module_name
-            executor_module = importlib.reload(executor_module)
-        else:
-            executor_module = importlib.import_module(module_name)
+        # gen_module_file_name guarantees module_name is unique across
+        # the process, so import_module will always cleanly load source
+        # from the tmp fs we just wrote — no re-import / reload dance.
+        executor_module = importlib.import_module(module_name)
         self.operator_module_name = module_name
 
         executors = list(
diff --git a/amber/src/main/python/core/architecture/managers/test_executor_manager.py b/amber/src/main/python/core/architecture/managers/test_executor_manager.py
index 901f768a21..f376f64468 100644
--- a/amber/src/main/python/core/architecture/managers/test_executor_manager.py
+++ b/amber/src/main/python/core/architecture/managers/test_executor_manager.py
@@ -84,7 +84,6 @@ class TestExecutorManager:
         """Test that ExecutorManager initializes correctly."""
         assert executor_manager.executor is None
         assert executor_manager.operator_module_name is None
-        assert executor_manager.executor_version == 0
 
     def test_reject_r_tuple_language(self, executor_manager):
         """Test that 'r-tuple' language is rejected with ImportError when plugin is not available."""
@@ -163,8 +162,11 @@ class TestExecutorManager:
 
         # Verify executor was initialized
         assert executor_manager.executor is not None
-        assert executor_manager.operator_module_name == "udf-v1"
-        assert executor_manager.executor_version == 1
+        # Module name comes from a process-wide counter, so it has the
+        # right shape but its exact value depends on what other tests
+        # have run in the same pytest session.
+        assert executor_manager.operator_module_name is not None
+        assert executor_manager.operator_module_name.startswith("udf-v")
         assert executor_manager.executor.is_source is False
 
     def test_accept_python_language_source_operator(self, executor_manager):
@@ -176,8 +178,8 @@ class TestExecutorManager:
 
         # Verify executor was initialized
         assert executor_manager.executor is not None
-        assert executor_manager.operator_module_name == "udf-v1"
-        assert executor_manager.executor_version == 1
+        assert executor_manager.operator_module_name is not None
+        assert executor_manager.operator_module_name.startswith("udf-v")
         assert executor_manager.executor.is_source is True
 
     def test_reject_other_unsupported_languages(self, executor_manager):
@@ -200,18 +202,26 @@ class TestExecutorManager:
             pass
 
     def test_gen_module_file_name_increments(self, executor_manager):
-        """Test that module file names increment correctly."""
-        module1, file1 = executor_manager.gen_module_file_name()
-        assert module1 == "udf-v1"
-        assert file1 == "udf-v1.py"
+        """Test that module file names increment monotonically.
 
+        The counter is process-wide so the absolute starting value
+        depends on prior tests in the same pytest session; only the
+        relative ordering matters for correctness.
+        """
+        module1, file1 = executor_manager.gen_module_file_name()
         module2, file2 = executor_manager.gen_module_file_name()
-        assert module2 == "udf-v2"
-        assert file2 == "udf-v2.py"
-
         module3, file3 = executor_manager.gen_module_file_name()
-        assert module3 == "udf-v3"
-        assert file3 == "udf-v3.py"
+
+        def version(module_name: str) -> int:
+            return int(module_name.removeprefix("udf-v"))
+
+        v1 = version(module1)
+        assert version(module2) == v1 + 1
+        assert version(module3) == v1 + 2
+
+        assert file1 == f"{module1}.py"
+        assert file2 == f"{module2}.py"
+        assert file3 == f"{module3}.py"
 
     def test_is_concrete_operator_static_method(self):
         """Test the is_concrete_operator static method."""
@@ -246,3 +256,136 @@ class TestExecutorManager:
                 language="python",
             )
         assert "SourceOperator API" in str(exc_info.value)
+
+
+REPLACEMENT_OPERATOR_CODE = """
+from pytexera import *
+
+class ReplacementOperator(UDFOperatorV2):
+    def process_tuple(self, tuple_: Tuple, port: int) -> Iterator[Optional[TupleLike]]:
+        yield tuple_
+"""
+
+NO_OPERATOR_CODE = """
+def helper():
+    return 42
+"""
+
+TWO_OPERATORS_CODE = """
+from pytexera import *
+
+class FirstOperator(UDFOperatorV2):
+    def process_tuple(self, tuple_: Tuple, port: int) -> Iterator[Optional[TupleLike]]:
+        yield tuple_
+
+class SecondOperator(UDFOperatorV2):
+    def process_tuple(self, tuple_: Tuple, port: int) -> Iterator[Optional[TupleLike]]:
+        yield tuple_
+"""
+
+
+class TestUpdateExecutor:
+    """Test suite for ExecutorManager.update_executor.
+
+    Notes on test isolation: the existing TestExecutorManager fixture cannot
+    fully clean up the udf-vN modules it imports (its `hasattr(manager, "_fs")`
+    cleanup guard is buggy — the actual cached_property key is `fs`), so a
+    given udf-v1 module may already live in sys.modules with a path attached
+    to a previous test's tmp filesystem. These tests therefore avoid asserting
+    on attributes baked into a specific operator class and instead use
+    setattr/getattr-only semantics that hold regardless of which cached
+    module satisfies the import.
+    """
+
+    @pytest.fixture
+    def initialized_manager(self):
+        manager = ExecutorManager()
+        manager.initialize_executor(
+            code=SAMPLE_OPERATOR_CODE, is_source=False, language="python"
+        )
+        # Stamp custom attributes on the live instance so the dict-preservation
+        # check works even if the underlying class came from a cached module.
+        manager.executor.runtime_field = "set-after-init"
+        manager.executor.counter = 6
+        yield manager
+        manager.close()
+
+    def test_update_preserves_pre_update_dict_state(self, initialized_manager):
+        before = initialized_manager.executor
+        before_dict = dict(before.__dict__)
+
+        initialized_manager.update_executor(
+            code=REPLACEMENT_OPERATOR_CODE, is_source=False
+        )
+
+        # update_executor reuses the prior __dict__ on a freshly instantiated
+        # operator — verify both halves: a NEW instance, but the OLD state.
+        assert initialized_manager.executor is not before
+        assert initialized_manager.executor.runtime_field == "set-after-init"
+        assert initialized_manager.executor.counter == 6
+        # Assert key presence explicitly so a missing key with an expected
+        # value of None doesn't slip past via dict.get()'s default.
+        after_dict = initialized_manager.executor.__dict__
+        for key, value in before_dict.items():
+            assert key in after_dict, f"key {key!r} missing after update"
+            assert after_dict[key] == value
+
+    def test_update_advances_module_name_monotonically(self, initialized_manager):
+        # The module-name counter is process-wide, so absolute values
+        # depend on prior tests in the same pytest session; only the
+        # relative bump matters.
+        before = initialized_manager.operator_module_name
+        assert before is not None and before.startswith("udf-v")
+
+        initialized_manager.update_executor(
+            code=REPLACEMENT_OPERATOR_CODE, is_source=False
+        )
+
+        after = initialized_manager.operator_module_name
+        assert after is not None and after.startswith("udf-v")
+        assert int(after.removeprefix("udf-v")) == int(before.removeprefix("udf-v")) + 1
+
+    def test_update_with_source_mismatch_raises_assertion(self, initialized_manager):
+        # The replacement code is a regular operator, but is_source=True asks
+        # the manager to treat it as a source operator. Same guardrail as
+        # initialize_executor.
+        with pytest.raises(AssertionError) as exc_info:
+            initialized_manager.update_executor(
+                code=REPLACEMENT_OPERATOR_CODE, is_source=True
+            )
+        assert "SourceOperator API" in str(exc_info.value)
+
+    def test_update_with_no_operator_class_raises_assertion(self, initialized_manager):
+        # load_executor_definition asserts exactly one Operator subclass exists
+        # in the module — an empty module trips that assertion.
+        with pytest.raises(AssertionError) as exc_info:
+            initialized_manager.update_executor(code=NO_OPERATOR_CODE, is_source=False)
+        assert "one and only one Operator" in str(exc_info.value)
+
+    def test_update_with_multiple_operator_classes_raises_assertion(
+        self, initialized_manager
+    ):
+        with pytest.raises(AssertionError) as exc_info:
+            initialized_manager.update_executor(
+                code=TWO_OPERATORS_CODE, is_source=False
+            )
+        assert "one and only one Operator" in str(exc_info.value)
+
+    def test_repeated_updates_keep_carrying_the_running_state(
+        self, initialized_manager
+    ):
+        # Update once, mutate the new instance, then update again — the second
+        # update must see the *latest* state, not the snapshot from before
+        # the first update.
+        initialized_manager.update_executor(
+            code=REPLACEMENT_OPERATOR_CODE, is_source=False
+        )
+        initialized_manager.executor.counter = 42
+        initialized_manager.executor.added_after_update = True
+
+        initialized_manager.update_executor(
+            code=REPLACEMENT_OPERATOR_CODE, is_source=False
+        )
+
+        assert initialized_manager.executor.counter == 42
+        assert initialized_manager.executor.added_after_update is True

Reply via email to