This is an automated email from the ASF dual-hosted git repository.
Yicong-Huang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 871017e2b3 test(amber-python): add unit tests for evaluate-expression
and retry-current-tuple handlers (#4520)
871017e2b3 is described below
commit 871017e2b354618f0b94a61a559aa0e78db06200
Author: Yicong Huang <[email protected]>
AuthorDate: Sun Apr 26 20:19:46 2026 -0700
test(amber-python): add unit tests for evaluate-expression and
retry-current-tuple handlers (#4520)
### What changes were proposed in this PR?
Adds unit tests for the two debugger-adjacent worker RPC handlers that
had no Python coverage:
- **`test_evaluate_expression_handler.py`** (4 cases) — covers
`EvaluateExpressionHandler.evaluate_python_expression`, which backs the
frontend's "watch variable" feature. Verifies the evaluator's return is
passed through unchanged, the runtime context exposes the executor as
`self` / current tuple as `tuple_` / current port id as `input_`, the
context is read fresh on each call (not snapshotted at handler
construction), and the handler tolerates `None` tuple/port (worker
before any input has arrived).
- **`test_replay_current_tuple_handler.py`** (6 cases) — covers
`RetryCurrentTupleHandler.retry_current_tuple` (used by debugger "step
over an exception" flows). Verifies it chains the current tuple onto the
front of the input iterator, resumes `USER_PAUSE` + `EXCEPTION_PAUSE` in
order, **does not** resume `DEBUG_PAUSE` (so an active debugging session
is not silently dropped), no-ops when the worker is `COMPLETED`, and
still chains correctly when the remaining iterator is empty.
No production code is touched. Async handlers are driven via
`asyncio.run` to avoid pulling in `pytest-asyncio`, matching the pattern
from #4510 / #4512.
### Any related issues, documentation, discussions?
Closes #4516. Same gap pattern as #4509.
### How was this PR tested?
```
$ python -m pytest \
    core/architecture/handlers/control/test_evaluate_expression_handler.py \
    core/architecture/handlers/control/test_replay_current_tuple_handler.py -v
======================== 10 passed in 1.14s ========================
```
`ruff format --check .` and `ruff check .` clean locally.
### Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Opus 4.7)
---
.../control/test_evaluate_expression_handler.py | 156 +++++++++++++++++++++
.../control/test_replay_current_tuple_handler.py | 139 ++++++++++++++++++
2 files changed, 295 insertions(+)
diff --git a/amber/src/main/python/core/architecture/handlers/control/test_evaluate_expression_handler.py b/amber/src/main/python/core/architecture/handlers/control/test_evaluate_expression_handler.py
new file mode 100644
index 0000000000..a72c1f8263
--- /dev/null
+++ b/amber/src/main/python/core/architecture/handlers/control/test_evaluate_expression_handler.py
@@ -0,0 +1,156 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import asyncio
+from types import SimpleNamespace
+from unittest.mock import patch
+
+import pytest
+
+from core.architecture.handlers.control.evaluate_expression_handler import (
+ EvaluateExpressionHandler,
+)
+from proto.org.apache.texera.amber.engine.architecture.rpc import (
+ EvaluatedValue,
+ EvaluatePythonExpressionRequest,
+ TypedValue,
+)
+
+
+class TestEvaluateExpressionHandler:
+ @pytest.fixture
+ def executor(self):
+ # A stand-in for the user's UDF instance — anything addressable as
+ # `self` from the evaluated expression will do.
+ return SimpleNamespace(state="alive")
+
+ @pytest.fixture
+ def handler(self, executor):
+ instance = EvaluateExpressionHandler.__new__(EvaluateExpressionHandler)
+ instance.context = SimpleNamespace(
+ executor_manager=SimpleNamespace(executor=executor),
+ tuple_processing_manager=SimpleNamespace(
+ current_input_tuple={"col": 42},
+ current_input_port_id="port-0",
+ ),
+ )
+ return instance
+
+ def test_returns_what_the_evaluator_returns(self, handler):
+ sentinel = EvaluatedValue(
+ value=TypedValue(expression="1+1", value_ref="2", value_type="int")
+ )
+ with patch(
+ "core.architecture.handlers.control.evaluate_expression_handler"
+ ".ExpressionEvaluator.evaluate",
+ return_value=sentinel,
+ ) as evaluate:
+ result = asyncio.run(
+ handler.evaluate_python_expression(
+ EvaluatePythonExpressionRequest(expression="1+1")
+ )
+ )
+
+ assert result is sentinel
+ evaluate.assert_called_once()
+
+ def test_runtime_context_exposes_self_tuple_input(self, handler, executor):
+ with patch(
+ "core.architecture.handlers.control.evaluate_expression_handler"
+ ".ExpressionEvaluator.evaluate",
+ return_value=EvaluatedValue(),
+ ) as evaluate:
+ asyncio.run(
+ handler.evaluate_python_expression(
+ EvaluatePythonExpressionRequest(expression="self.state")
+ )
+ )
+
+ expression, runtime_context = evaluate.call_args.args
+ assert expression == "self.state"
+ assert runtime_context["self"] is executor
+ assert runtime_context["tuple_"] == {"col": 42}
+ assert runtime_context["input_"] == "port-0"
+
+ def test_runtime_context_reflects_current_tuple_at_call_time(
+ self, handler, executor
+ ):
+ # The handler must read the *current* tuple/port out of the context on
+ # each call — not snapshot them at construction. Drive two calls with
+ # different intermediate state.
+ captured: list = []
+
+ def capture(_expression, runtime_context):
+ captured.append((runtime_context["tuple_"],
runtime_context["input_"]))
+ return EvaluatedValue()
+
+ with patch(
+ "core.architecture.handlers.control.evaluate_expression_handler"
+ ".ExpressionEvaluator.evaluate",
+ side_effect=capture,
+ ):
+ asyncio.run(
+ handler.evaluate_python_expression(
+ EvaluatePythonExpressionRequest(expression="x")
+ )
+ )
+ handler.context.tuple_processing_manager.current_input_tuple =
{"col": 99}
+ handler.context.tuple_processing_manager.current_input_port_id =
"port-1"
+ asyncio.run(
+ handler.evaluate_python_expression(
+ EvaluatePythonExpressionRequest(expression="x")
+ )
+ )
+
+ assert captured == [({"col": 42}, "port-0"), ({"col": 99}, "port-1")]
+
+ def test_handles_none_input_tuple_and_port(self, handler):
+ # Before the worker has received any input, current_input_tuple and
+ # current_input_port_id are None. The handler must still build a
+ # context (the user might be evaluating `self.foo`).
+ handler.context.tuple_processing_manager.current_input_tuple = None
+ handler.context.tuple_processing_manager.current_input_port_id = None
+ with patch(
+ "core.architecture.handlers.control.evaluate_expression_handler"
+ ".ExpressionEvaluator.evaluate",
+ return_value=EvaluatedValue(),
+ ) as evaluate:
+ asyncio.run(
+ handler.evaluate_python_expression(
+ EvaluatePythonExpressionRequest(expression="self.state")
+ )
+ )
+
+ _expression, runtime_context = evaluate.call_args.args
+ assert runtime_context["tuple_"] is None
+ assert runtime_context["input_"] is None
+
+ def test_evaluator_exception_propagates(self, handler):
+ # If the evaluator raises (bad syntax, attribute error in the user's
+ # expression, etc.), the handler must not swallow it — the RPC layer
+ # is responsible for surfacing the failure to the frontend.
+ with patch(
+ "core.architecture.handlers.control.evaluate_expression_handler"
+ ".ExpressionEvaluator.evaluate",
+ side_effect=AttributeError("no such attribute"),
+ ):
+ with pytest.raises(AttributeError, match="no such attribute"):
+ asyncio.run(
+ handler.evaluate_python_expression(
+
EvaluatePythonExpressionRequest(expression="self.missing")
+ )
+ )
diff --git a/amber/src/main/python/core/architecture/handlers/control/test_replay_current_tuple_handler.py b/amber/src/main/python/core/architecture/handlers/control/test_replay_current_tuple_handler.py
new file mode 100644
index 0000000000..2ba9b92131
--- /dev/null
+++ b/amber/src/main/python/core/architecture/handlers/control/test_replay_current_tuple_handler.py
@@ -0,0 +1,139 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import asyncio
+from types import SimpleNamespace
+from unittest.mock import MagicMock
+
+import pytest
+
+from core.architecture.handlers.control.replay_current_tuple_handler import (
+ RetryCurrentTupleHandler,
+)
+from core.architecture.managers.pause_manager import PauseType
+from proto.org.apache.texera.amber.engine.architecture.rpc import (
+ EmptyRequest,
+ EmptyReturn,
+)
+from proto.org.apache.texera.amber.engine.architecture.worker import WorkerState
+
+
+def _build_handler(state: WorkerState, current_tuple, remaining_iter):
+ instance = RetryCurrentTupleHandler.__new__(RetryCurrentTupleHandler)
+ state_manager = MagicMock()
+ state_manager.confirm_state.side_effect = lambda *states: state in states
+ instance.context = SimpleNamespace(
+ state_manager=state_manager,
+ tuple_processing_manager=SimpleNamespace(
+ current_input_tuple=current_tuple,
+ current_input_tuple_iter=iter(remaining_iter),
+ ),
+ pause_manager=MagicMock(),
+ )
+ return instance
+
+
+class TestRetryCurrentTupleHandler:
+ @pytest.fixture
+ def running_handler(self):
+ return _build_handler(
+ WorkerState.RUNNING,
+ current_tuple={"col": "current"},
+ remaining_iter=[{"col": "next"}],
+ )
+
+ def test_returns_empty_return(self, running_handler):
+ result =
asyncio.run(running_handler.retry_current_tuple(EmptyRequest()))
+ assert isinstance(result, EmptyReturn)
+
+ def test_chains_current_tuple_back_onto_iterator(self, running_handler):
+ asyncio.run(running_handler.retry_current_tuple(EmptyRequest()))
+ # The iterator must now yield the current tuple first, then the
+ # tuples that were already queued.
+ chained = list(
+
running_handler.context.tuple_processing_manager.current_input_tuple_iter
+ )
+ assert chained == [{"col": "current"}, {"col": "next"}]
+
+ def test_resumes_user_and_exception_pause_in_order(self, running_handler):
+ asyncio.run(running_handler.retry_current_tuple(EmptyRequest()))
+ actual = [
+ call.args[0]
+ for call in
running_handler.context.pause_manager.resume.call_args_list
+ ]
+ assert actual == [PauseType.USER_PAUSE, PauseType.EXCEPTION_PAUSE]
+
+ def test_does_not_resume_debug_pause(self, running_handler):
+ # Unlike WorkerDebugCommandHandler, retry only releases USER and
+ # EXCEPTION pauses — DEBUG_PAUSE must remain in effect so an active
+ # debugging session is not silently dropped.
+ asyncio.run(running_handler.retry_current_tuple(EmptyRequest()))
+ resumed = {
+ call.args[0]
+ for call in
running_handler.context.pause_manager.resume.call_args_list
+ }
+ assert PauseType.DEBUG_PAUSE not in resumed
+
+ def test_no_op_when_state_is_completed(self):
+ completed_handler = _build_handler(
+ WorkerState.COMPLETED,
+ current_tuple={"col": "current"},
+ remaining_iter=[{"col": "next"}],
+ )
+ result =
asyncio.run(completed_handler.retry_current_tuple(EmptyRequest()))
+
+ # Iterator must be untouched (no chaining), and no pause type is
+ # resumed — replaying a tuple after completion is meaningless.
+ remaining = list(
+
completed_handler.context.tuple_processing_manager.current_input_tuple_iter
+ )
+ assert remaining == [{"col": "next"}]
+ completed_handler.context.pause_manager.resume.assert_not_called()
+ assert isinstance(result, EmptyReturn)
+
+ def test_chains_even_when_remaining_iter_is_exhausted(self):
+ handler = _build_handler(
+ WorkerState.RUNNING,
+ current_tuple={"col": "lone"},
+ remaining_iter=[],
+ )
+ asyncio.run(handler.retry_current_tuple(EmptyRequest()))
+ chained = list(
+ handler.context.tuple_processing_manager.current_input_tuple_iter
+ )
+ assert chained == [{"col": "lone"}]
+
+ def test_paused_state_still_chains_and_resumes(self):
+ # The completion guard is `if not confirm_state(COMPLETED)`, so every
+ # other state — RUNNING, READY, PAUSED, UNINITIALIZED — must take the
+ # chain+resume path. PAUSED is the most likely real-world entry point
+ # (the user hits "retry" while the worker is paused on an exception).
+ handler = _build_handler(
+ WorkerState.PAUSED,
+ current_tuple={"col": "current"},
+ remaining_iter=[{"col": "next"}],
+ )
+ asyncio.run(handler.retry_current_tuple(EmptyRequest()))
+
+ chained = list(
+ handler.context.tuple_processing_manager.current_input_tuple_iter
+ )
+ assert chained == [{"col": "current"}, {"col": "next"}]
+ resumed = [
+ call.args[0] for call in
handler.context.pause_manager.resume.call_args_list
+ ]
+ assert resumed == [PauseType.USER_PAUSE, PauseType.EXCEPTION_PAUSE]