This is an automated email from the ASF dual-hosted git repository.

Yicong-Huang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new e99577bcb2 test(pyamber): add unit tests for update_executor logic (#4719)
e99577bcb2 is described below

commit e99577bcb2d2613a7435dc389a7bcdca040aa3f4
Author: Yicong Huang <[email protected]>
AuthorDate: Sat May 2 22:25:37 2026 -0700

    test(pyamber): add unit tests for update_executor logic (#4719)
    
    ### What changes were proposed in this PR?
    
    Adds pytest coverage for `ExecutorManager.update_executor` (extending
    the existing `test_executor_manager.py`) and the `UpdateExecutorHandler`
    that wraps it (new `test_update_executor_handler.py`).
    
    ### Any related issues, documentation, discussions?
    
    Closes #4718.
    
    Test-isolation note (not pinned by these tests): the existing
    `executor_manager` fixture has a buggy cleanup guard — `if
    hasattr(manager, "_fs"): manager.close()` checks for `_fs` but the
    `cached_property` actually stores under `fs`, so `close()` is never
    invoked. Combined with the fact that every fresh `ExecutorManager`
    resets `executor_version` to 0 and thus reuses the module name `udf-v1`,
    this leaves stale modules in `sys.modules` with paths that point at
    deleted tmp dirs from earlier tests. The new `TestUpdateExecutor`
    therefore avoids asserting on the *class* of the loaded executor and
    instead checks dict-state preservation via setattr/getattr, so it works
    regardless of which cached `udf-v1` happens to satisfy the import. A
    separate fix would untangle the cleanup guard and probably move version
    numbering off of the per-manager counter.
    
    ### How was this PR tested?
    
    ```
    cd amber/src/main/python
    ruff check core/architecture/managers/test_executor_manager.py core/architecture/handlers/control/test_update_executor_handler.py
    ruff format --check core/architecture/managers/test_executor_manager.py core/architecture/handlers/control/test_update_executor_handler.py
    python -m pytest core/architecture/managers/test_executor_manager.py core/architecture/handlers/control/test_update_executor_handler.py
    ```
    
    ### Was this PR authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code (claude-opus-4-7)
---
 .../control/test_update_executor_handler.py        |  89 ++++++++++++++
 .../architecture/managers/test_executor_manager.py | 133 +++++++++++++++++++++
 2 files changed, 222 insertions(+)

diff --git a/amber/src/main/python/core/architecture/handlers/control/test_update_executor_handler.py b/amber/src/main/python/core/architecture/handlers/control/test_update_executor_handler.py
new file mode 100644
index 0000000000..4987dda814
--- /dev/null
+++ b/amber/src/main/python/core/architecture/handlers/control/test_update_executor_handler.py
@@ -0,0 +1,89 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import asyncio
+from types import SimpleNamespace
+from unittest.mock import MagicMock
+
+import pytest
+
+from core.architecture.handlers.control.update_executor_handler import (
+    UpdateExecutorHandler,
+)
+from proto.org.apache.texera.amber.core import OpExecInitInfo, OpExecWithCode
+from proto.org.apache.texera.amber.engine.architecture.rpc import (
+    EmptyReturn,
+    UpdateExecutorRequest,
+)
+
+
+def make_request(code: str) -> UpdateExecutorRequest:
+    """Build an UpdateExecutorRequest carrying inline Python code."""
+    return UpdateExecutorRequest(
+        new_exec_init_info=OpExecInitInfo(op_exec_with_code=OpExecWithCode(code=code))
+    )
+
+
+def make_handler(executor_is_source: bool = False) -> UpdateExecutorHandler:
+    """Wire a handler with a SimpleNamespace context exposing executor_manager."""
+    executor_manager = MagicMock()
+    executor_manager.executor = SimpleNamespace(is_source=executor_is_source)
+    context = SimpleNamespace(executor_manager=executor_manager)
+    handler = UpdateExecutorHandler(context)
+    return handler
+
+
+class TestUpdateExecutorHandler:
+    def test_returns_empty_return(self):
+        handler = make_handler(executor_is_source=False)
+        result = asyncio.run(handler.update_executor(make_request("# code")))
+        assert isinstance(result, EmptyReturn)
+
+    def test_delegates_extracted_code_to_executor_manager(self):
+        handler = make_handler(executor_is_source=False)
+        asyncio.run(handler.update_executor(make_request("user-code-v2")))
+        handler.context.executor_manager.update_executor.assert_called_once_with(
+            "user-code-v2", False
+        )
+
+    def test_propagates_current_executor_is_source_not_request_field(self):
+        # The handler passes the *current* executor's is_source flag forward,
+        # not anything derived from the request payload. Pin this so a future
+        # change that reads is_source from the request is reviewed.
+        handler = make_handler(executor_is_source=True)
+        asyncio.run(handler.update_executor(make_request("source-code")))
+        handler.context.executor_manager.update_executor.assert_called_once_with(
+            "source-code", True
+        )
+
+    def test_extracts_code_via_get_one_of_for_op_exec_with_code(self):
+        # OpExecInitInfo is a sealed-oneof of {with_class_name, with_code,
+        # source}. The handler relies on get_one_of to surface the populated
+        # variant; if the request carries a different variant the handler must
+        # not silently call the manager with stale data — instead the call
+        # surfaces an attribute error on `.code`. Pin the contract explicitly.
+        from proto.org.apache.texera.amber.core import OpExecWithClassName
+
+        handler = make_handler(executor_is_source=False)
+        request = UpdateExecutorRequest(
+            new_exec_init_info=OpExecInitInfo(
+                op_exec_with_class_name=OpExecWithClassName(class_name="X")
+            )
+        )
+        with pytest.raises(AttributeError):
+            asyncio.run(handler.update_executor(request))
+        handler.context.executor_manager.update_executor.assert_not_called()
diff --git a/amber/src/main/python/core/architecture/managers/test_executor_manager.py b/amber/src/main/python/core/architecture/managers/test_executor_manager.py
index 901f768a21..cfcdde1880 100644
--- a/amber/src/main/python/core/architecture/managers/test_executor_manager.py
+++ b/amber/src/main/python/core/architecture/managers/test_executor_manager.py
@@ -246,3 +246,136 @@ class TestExecutorManager:
                 language="python",
             )
         assert "SourceOperator API" in str(exc_info.value)
+
+
+REPLACEMENT_OPERATOR_CODE = """
+from pytexera import *
+
+class ReplacementOperator(UDFOperatorV2):
+    def process_tuple(self, tuple_: Tuple, port: int) -> Iterator[Optional[TupleLike]]:
+        yield tuple_
+"""
+
+NO_OPERATOR_CODE = """
+def helper():
+    return 42
+"""
+
+TWO_OPERATORS_CODE = """
+from pytexera import *
+
+class FirstOperator(UDFOperatorV2):
+    def process_tuple(self, tuple_: Tuple, port: int) -> Iterator[Optional[TupleLike]]:
+        yield tuple_
+
+class SecondOperator(UDFOperatorV2):
+    def process_tuple(self, tuple_: Tuple, port: int) -> Iterator[Optional[TupleLike]]:
+        yield tuple_
+"""
+
+
+class TestUpdateExecutor:
+    """Test suite for ExecutorManager.update_executor.
+
+    Notes on test isolation: the existing TestExecutorManager fixture cannot
+    fully clean up the udf-vN modules it imports (its `hasattr(manager, "_fs")`
+    cleanup guard is buggy — the actual cached_property key is `fs`), so a
+    given udf-v1 module may already live in sys.modules with a path attached
+    to a previous test's tmp filesystem. These tests therefore avoid asserting
+    on attributes baked into a specific operator class and instead use
+    setattr/getattr-only semantics that hold regardless of which cached
+    module satisfies the import.
+    """
+
+    @pytest.fixture
+    def initialized_manager(self):
+        manager = ExecutorManager()
+        manager.initialize_executor(
+            code=SAMPLE_OPERATOR_CODE, is_source=False, language="python"
+        )
+        # Stamp custom attributes on the live instance so the dict-preservation
+        # check works even if the underlying class came from a cached module.
+        manager.executor.runtime_field = "set-after-init"
+        manager.executor.counter = 6
+        yield manager
+        manager.close()
+
+    def test_update_preserves_pre_update_dict_state(self, initialized_manager):
+        before = initialized_manager.executor
+        before_dict = dict(before.__dict__)
+
+        initialized_manager.update_executor(
+            code=REPLACEMENT_OPERATOR_CODE, is_source=False
+        )
+
+        # update_executor reuses the prior __dict__ on a freshly instantiated
+        # operator — verify both halves: a NEW instance, but the OLD state.
+        assert initialized_manager.executor is not before
+        assert initialized_manager.executor.runtime_field == "set-after-init"
+        assert initialized_manager.executor.counter == 6
+        # Assert key presence explicitly so a missing key with an expected
+        # value of None doesn't slip past via dict.get()'s default.
+        after_dict = initialized_manager.executor.__dict__
+        for key, value in before_dict.items():
+            assert key in after_dict, f"key {key!r} missing after update"
+            assert after_dict[key] == value
+
+    def test_update_increments_executor_version_and_module_name(
+        self, initialized_manager
+    ):
+        # initialize_executor already produced udf-v1.
+        assert initialized_manager.executor_version == 1
+        assert initialized_manager.operator_module_name == "udf-v1"
+
+        initialized_manager.update_executor(
+            code=REPLACEMENT_OPERATOR_CODE, is_source=False
+        )
+
+        assert initialized_manager.executor_version == 2
+        assert initialized_manager.operator_module_name == "udf-v2"
+
+    def test_update_with_source_mismatch_raises_assertion(self, initialized_manager):
+        # The replacement code is a regular operator, but is_source=True asks
+        # the manager to treat it as a source operator. Same guardrail as
+        # initialize_executor.
+        with pytest.raises(AssertionError) as exc_info:
+            initialized_manager.update_executor(
+                code=REPLACEMENT_OPERATOR_CODE, is_source=True
+            )
+        assert "SourceOperator API" in str(exc_info.value)
+
+    def test_update_with_no_operator_class_raises_assertion(self, initialized_manager):
+        # load_executor_definition asserts exactly one Operator subclass exists
+        # in the module — an empty module trips that assertion.
+        with pytest.raises(AssertionError) as exc_info:
+            initialized_manager.update_executor(code=NO_OPERATOR_CODE, is_source=False)
+        assert "one and only one Operator" in str(exc_info.value)
+
+    def test_update_with_multiple_operator_classes_raises_assertion(
+        self, initialized_manager
+    ):
+        with pytest.raises(AssertionError) as exc_info:
+            initialized_manager.update_executor(
+                code=TWO_OPERATORS_CODE, is_source=False
+            )
+        assert "one and only one Operator" in str(exc_info.value)
+
+    def test_repeated_updates_keep_carrying_the_running_state(
+        self, initialized_manager
+    ):
+        # Update once, mutate the new instance, then update again — the second
+        # update must see the *latest* state, not the snapshot from before
+        # the first update.
+        initialized_manager.update_executor(
+            code=REPLACEMENT_OPERATOR_CODE, is_source=False
+        )
+        initialized_manager.executor.counter = 42
+        initialized_manager.executor.added_after_update = True
+
+        initialized_manager.update_executor(
+            code=REPLACEMENT_OPERATOR_CODE, is_source=False
+        )
+
+        assert initialized_manager.executor.counter == 42
+        assert initialized_manager.executor.added_after_update is True
+        assert initialized_manager.executor_version == 3

Reply via email to