This is an automated email from the ASF dual-hosted git repository.

aglinxinyuan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git


The following commit(s) were added to refs/heads/main by this push:
     new 9f0cfd7517 test(pyamber): add unit tests for six DP-thread state managers (#4893)
9f0cfd7517 is described below

commit 9f0cfd7517044a28315b9aba4efb53b521d25c97
Author: Yicong Huang <[email protected]>
AuthorDate: Sun May 3 21:23:49 2026 -0700

    test(pyamber): add unit tests for six DP-thread state managers (#4893)
---
 .../managers/test_console_message_manager.py       |  83 +++++++++
 .../test_embedded_control_message_manager.py       | 194 +++++++++++++++++++++
 .../managers/test_exception_manager.py             |  76 ++++++++
 .../managers/test_state_processing_manager.py      |  59 +++++++
 .../managers/test_statistics_manager.py            | 165 ++++++++++++++++++
 .../managers/test_tuple_processing_manager.py      | 100 +++++++++++
 6 files changed, 677 insertions(+)

diff --git a/amber/src/main/python/core/architecture/managers/test_console_message_manager.py b/amber/src/main/python/core/architecture/managers/test_console_message_manager.py
new file mode 100644
index 0000000000..92c18556ac
--- /dev/null
+++ b/amber/src/main/python/core/architecture/managers/test_console_message_manager.py
@@ -0,0 +1,83 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from datetime import datetime, timedelta
+
+from core.architecture.managers.console_message_manager import ConsoleMessageManager
+from proto.org.apache.texera.amber.engine.architecture.rpc import (
+    ConsoleMessage,
+    ConsoleMessageType,
+)
+
+
+def _msg(title: str) -> ConsoleMessage:  # PRINT message; `title` is also the body
+    return ConsoleMessage(
+        worker_id="w0",
+        timestamp=datetime.now(),
+        msg_type=ConsoleMessageType.PRINT,
+        source="src",
+        title=title,
+        message=title,
+    )
+
+
+class TestConsoleMessageManager:
+    def test_initially_force_flush_drains_empty(self):
+        mgr = ConsoleMessageManager()
+        # No messages put yet — force_flush still yields zero items.
+        assert list(mgr.get_messages(force_flush=True)) == []
+
+    def test_force_flush_drains_all_buffered_in_order(self):
+        mgr = ConsoleMessageManager()
+        for t in ("a", "b", "c"):
+            mgr.put_message(_msg(t))
+        flushed = list(mgr.get_messages(force_flush=True))
+        assert [m.title for m in flushed] == ["a", "b", "c"]
+        # A second drain must come back empty — get() is consumptive.
+        assert list(mgr.get_messages(force_flush=True)) == []
+
+    def test_get_without_flush_below_threshold_yields_nothing(self):
+        # Below max_message_num (default 10) and within max_flush_interval
+        # (default 500ms) — the underlying TimedBuffer should withhold output.
+        # Pin `_last_output_time` to "now" right before the assertion so the
+        # `(now - _last_output_time).seconds >= 1` branch can't fire if the
+        # rest of the test happens to run more than ~1s after construction.
+        mgr = ConsoleMessageManager()
+        mgr.put_message(_msg("only"))
+        mgr.print_buf._last_output_time = datetime.now()
+        assert list(mgr.get_messages(force_flush=False)) == []
+        # The withheld message must still be drainable on a force flush.
+        assert [m.title for m in mgr.get_messages(force_flush=True)] == 
["only"]
+
+    def test_get_without_flush_at_or_over_max_message_num_drains(self):
+        # Once buffered count crosses max_message_num (default 10), the
+        # buffer should auto-flush even without force_flush=True.
+        mgr = ConsoleMessageManager()
+        for i in range(10):
+            mgr.put_message(_msg(f"m{i}"))
+        flushed = [m.title for m in mgr.get_messages(force_flush=False)]
+        assert flushed == [f"m{i}" for i in range(10)]
+
+    def test_get_drains_when_last_output_time_is_stale(self):
+        # Backdate the buffer's `_last_output_time` directly so the
+        # >=500ms branch fires even with a single message and
+        # force_flush=False, without sleeping or monkeypatching `datetime`.
+        mgr = ConsoleMessageManager()
+        mgr.put_message(_msg("stale"))
+        mgr.print_buf._last_output_time = datetime.now() - timedelta(seconds=2)
+        flushed = [m.title for m in mgr.get_messages(force_flush=False)]
+        assert flushed == ["stale"]
diff --git a/amber/src/main/python/core/architecture/managers/test_embedded_control_message_manager.py b/amber/src/main/python/core/architecture/managers/test_embedded_control_message_manager.py
new file mode 100644
index 0000000000..c6d436aa99
--- /dev/null
+++ b/amber/src/main/python/core/architecture/managers/test_embedded_control_message_manager.py
@@ -0,0 +1,194 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from types import SimpleNamespace
+from unittest.mock import MagicMock
+
+import pytest
+
+from core.architecture.managers.embedded_control_message_manager import (
+    EmbeddedControlMessageManager,
+)
+from proto.org.apache.texera.amber.core import (
+    ActorVirtualIdentity,
+    ChannelIdentity,
+    EmbeddedControlMessageIdentity,
+)
+from proto.org.apache.texera.amber.engine.architecture.rpc import (
+    EmbeddedControlMessage,
+    EmbeddedControlMessageType,
+)
+
+
+SELF_ID = ActorVirtualIdentity(name="self")
+
+
+def _channel(from_name: str, to_name: str = "self", is_control: bool = False):
+    return ChannelIdentity(  # both endpoints are built from the bare names
+        from_worker_id=ActorVirtualIdentity(name=from_name),
+        to_worker_id=ActorVirtualIdentity(name=to_name),
+        is_control=is_control,
+    )
+
+
+def _make_ecm(
+    ecm_type: EmbeddedControlMessageType,
+    scope=None,
+) -> EmbeddedControlMessage:
+    # Each call constructs a fresh 
`EmbeddedControlMessageIdentity(id="ecm-1")`,
+    # but the dataclass-style equality means all of them hash to the same key
+    # in `EmbeddedControlMessageManager.ecm_received`, so messages built from
+    # different invocations still aggregate under the single "ecm-1" entry.
+    return EmbeddedControlMessage(
+        id=EmbeddedControlMessageIdentity(id="ecm-1"),
+        ecm_type=ecm_type,
+        scope=scope or [],
+    )
+
+
+def _gateway_with_data_channels(*data_channels: ChannelIdentity):
+    """Stub InputManager: both channel-id getters return the given channels."""
+    gw = MagicMock()
+    gw.get_all_data_channel_ids.return_value = set(data_channels)
+    gw.get_all_channel_ids.return_value = set(data_channels)  # same set as above
+    return gw
+
+
+def _gateway_with_ports(port_layout: dict, all_channels: set):
+    """Stub InputManager that supports per-port lookups for PORT_ALIGNMENT.
+
+    `port_layout` maps PortIdentity-key (use any hashable) -> set of channels.
+    `get_port_id(channel)` resolves channel -> port.
+    """
+    gw = MagicMock()
+    channel_to_port = {ch: pid for pid, chs in port_layout.items() for ch in 
chs}
+    gw.get_port_id.side_effect = lambda ch: channel_to_port[ch]
+    gw.get_port.side_effect = lambda pid: SimpleNamespace(
+        get_channels=lambda chs=port_layout[pid]: chs
+    )
+    gw.get_all_data_channel_ids.return_value = set(all_channels)
+    gw.get_all_channel_ids.return_value = set(all_channels)
+    return gw
+
+
+class TestEcmAllAlignment:  # ALL_ALIGNMENT: True only on the last channel
+    def test_returns_false_until_all_channels_received(self):
+        c1, c2, c3 = _channel("a"), _channel("b"), _channel("c")
+        gw = _gateway_with_data_channels(c1, c2, c3)
+        mgr = EmbeddedControlMessageManager(SELF_ID, gw)
+        ecm = _make_ecm(EmbeddedControlMessageType.ALL_ALIGNMENT)
+
+        assert mgr.is_ecm_aligned(c1, ecm) is False
+        assert mgr.is_ecm_aligned(c2, ecm) is False
+        # The third (last) channel completes the alignment.
+        assert mgr.is_ecm_aligned(c3, ecm) is True
+
+    def test_dict_is_cleaned_up_after_full_alignment(self):
+        # Pin the cleanup contract: once every expected channel has reported,
+        # the per-id entry must be deleted so a recycled id cannot bleed
+        # state into the next ECM round.
+        c1, c2 = _channel("a"), _channel("b")
+        gw = _gateway_with_data_channels(c1, c2)
+        mgr = EmbeddedControlMessageManager(SELF_ID, gw)
+        ecm = _make_ecm(EmbeddedControlMessageType.ALL_ALIGNMENT)
+
+        mgr.is_ecm_aligned(c1, ecm)  # return values ignored; only state matters
+        mgr.is_ecm_aligned(c2, ecm)  # last channel; entry should now be gone
+        assert ecm.id not in mgr.ecm_received
+
+
+class TestEcmNoAlignment:  # NO_ALIGNMENT: only the first arrival reports True
+    def test_first_message_completes_subsequent_do_not(self):
+        c1, c2 = _channel("a"), _channel("b")
+        gw = _gateway_with_data_channels(c1, c2)
+        mgr = EmbeddedControlMessageManager(SELF_ID, gw)
+        ecm = _make_ecm(EmbeddedControlMessageType.NO_ALIGNMENT)
+
+        # First channel: ecm_received={c1}, len==1 → True.
+        assert mgr.is_ecm_aligned(c1, ecm) is True
+        # Second channel: ecm_received={c1,c2}, len==2 → False.
+        # (And on this call from_all_channels=True so the dict is dropped.)
+        assert mgr.is_ecm_aligned(c2, ecm) is False
+        assert ecm.id not in mgr.ecm_received
+
+
+class TestEcmPortAlignment:  # PORT_ALIGNMENT, plus the unsupported-type guard
+    def test_completes_when_a_ports_channels_have_all_arrived(self):
+        a1, a2 = _channel("a1"), _channel("a2")
+        b1 = _channel("b1")
+        ports = {"portA": {a1, a2}, "portB": {b1}}  # plain strings as port keys
+        gw = _gateway_with_ports(ports, all_channels={a1, a2, b1})
+        mgr = EmbeddedControlMessageManager(SELF_ID, gw)
+        ecm = _make_ecm(EmbeddedControlMessageType.PORT_ALIGNMENT)
+
+        # Port A needs both a1 and a2.
+        assert mgr.is_ecm_aligned(a1, ecm) is False
+        assert mgr.is_ecm_aligned(a2, ecm) is True
+        # Port B is single-channel, so b1 alone completes its port.
+        assert mgr.is_ecm_aligned(b1, ecm) is True
+
+    def test_unsupported_ecm_type_raises_value_error(self):  # not port-specific
+        # The `else: raise ValueError(...)` branch — guard against any new
+        # enum value silently falling through.
+        c1 = _channel("a")
+        gw = _gateway_with_data_channels(c1)
+        mgr = EmbeddedControlMessageManager(SELF_ID, gw)
+        # Use a sentinel that is not one of the three known values. The enum
+        # type is an IntEnum, so an unused integer won't match any branch.
+        ecm = _make_ecm(EmbeddedControlMessageType.ALL_ALIGNMENT)
+        ecm.ecm_type = 999  # type: ignore[assignment]
+
+        with pytest.raises(ValueError, match="Unsupported ECM type"):
+            mgr.is_ecm_aligned(c1, ecm)
+
+
+class TestEcmScope:  # which channels an ECM waits on: scoped vs. unscoped
+    def test_scope_intersects_with_all_channel_ids(self):
+        # When `ecm.scope` is set, get_channels_within_scope filters the
+        # gateway's known channels to only those whose `to_worker_id == self`
+        # AND that appear in the gateway's `get_all_channel_ids`.
+        c_in_scope = _channel("a", to_name="self")
+        c_other_target = _channel("b", to_name="someone_else")
+        c_not_in_gateway = _channel("c", to_name="self")
+
+        gw = MagicMock()
+        gw.get_all_channel_ids.return_value = {c_in_scope, c_other_target}
+        gw.get_all_data_channel_ids.return_value = {c_in_scope, c_other_target}
+        mgr = EmbeddedControlMessageManager(SELF_ID, gw)
+
+        ecm = _make_ecm(
+            EmbeddedControlMessageType.ALL_ALIGNMENT,
+            scope=[c_in_scope, c_not_in_gateway],  # second one is unknown to gw
+        )
+
+        # Only c_in_scope is in scope AND known to the gateway. After
+        # receiving it, alignment should complete.
+        assert mgr.is_ecm_aligned(c_in_scope, ecm) is True
+
+    def test_no_scope_falls_back_to_all_data_channels(self):
+        # When scope is empty, the manager uses get_all_data_channel_ids()
+        # rather than get_all_channel_ids() — control vs data routing.
+        c_data = _channel("a", is_control=False)
+        c_control = _channel("b", is_control=True)
+
+        gw = MagicMock()
+        gw.get_all_data_channel_ids.return_value = {c_data}
+        gw.get_all_channel_ids.return_value = {c_data, c_control}
+        mgr = EmbeddedControlMessageManager(SELF_ID, gw)
+
+        ecm = _make_ecm(EmbeddedControlMessageType.ALL_ALIGNMENT, scope=[])
+        assert mgr.is_ecm_aligned(c_data, ecm) is True  # c_control never needed
diff --git a/amber/src/main/python/core/architecture/managers/test_exception_manager.py b/amber/src/main/python/core/architecture/managers/test_exception_manager.py
new file mode 100644
index 0000000000..f95380b2cc
--- /dev/null
+++ b/amber/src/main/python/core/architecture/managers/test_exception_manager.py
@@ -0,0 +1,76 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import sys
+
+from core.architecture.managers.exception_manager import ExceptionManager
+from core.models import ExceptionInfo
+
+
+def _real_exc_info() -> ExceptionInfo:
+    """Build a real ExceptionInfo by raising and catching, so the traceback
+    object is the actual one Python produces."""
+    try:
+        raise RuntimeError("boom")
+    except RuntimeError:
+        exc, value, tb = sys.exc_info()  # (type, instance, traceback) in flight
+        return ExceptionInfo(exc=exc, value=value, tb=tb)
+
+
+class TestExceptionManager:  # current `exc_info` slot vs. retained history
+    def test_initial_state(self):
+        mgr = ExceptionManager()
+        assert mgr.exc_info is None
+        assert mgr.exc_info_history == []
+        assert mgr.has_exception() is False
+
+    def test_set_then_has_exception_true(self):
+        mgr = ExceptionManager()
+        info = _real_exc_info()
+        mgr.set_exception_info(info)
+        assert mgr.has_exception() is True
+        assert mgr.exc_info is info  # identity: the same object is stored
+        assert mgr.exc_info_history == [info]
+
+    def test_get_exc_info_returns_and_clears_current_only(self):
+        # Pin the documented contract: get_exc_info returns the latest stashed
+        # info AND clears the live slot, but the history must keep it. A
+        # regression that also clears history would break replay/retry flows.
+        mgr = ExceptionManager()
+        info = _real_exc_info()
+        mgr.set_exception_info(info)
+
+        assert mgr.get_exc_info() is info
+        assert mgr.exc_info is None
+        assert mgr.has_exception() is False
+        assert mgr.exc_info_history == [info]
+
+    def test_get_exc_info_when_none_returns_none(self):
+        mgr = ExceptionManager()
+        assert mgr.get_exc_info() is None
+
+    def test_history_accumulates_in_order(self):
+        mgr = ExceptionManager()
+        first = _real_exc_info()
+        second = _real_exc_info()
+        mgr.set_exception_info(first)
+        mgr.set_exception_info(second)  # replaces `first` in the live slot
+        assert mgr.exc_info is second
+        assert mgr.exc_info_history == [first, second]
+        # Consuming the latest must leave both entries in history.
+        assert mgr.get_exc_info() is second
+        assert mgr.exc_info_history == [first, second]
diff --git a/amber/src/main/python/core/architecture/managers/test_state_processing_manager.py b/amber/src/main/python/core/architecture/managers/test_state_processing_manager.py
new file mode 100644
index 0000000000..b55fc7a236
--- /dev/null
+++ b/amber/src/main/python/core/architecture/managers/test_state_processing_manager.py
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from core.architecture.managers.state_processing_manager import StateProcessingManager
+from core.models.state import State
+
+
+class TestStateProcessingManager:  # consume-once input/output state slots
+    def test_initial_state_is_none(self):
+        mgr = StateProcessingManager()
+        assert mgr.current_input_state is None
+        assert mgr.current_output_state is None
+        assert mgr.get_input_state() is None
+        assert mgr.get_output_state() is None
+
+    def test_get_input_state_returns_then_clears(self):
+        # The contract is "consume-once": the first get returns the stashed
+        # value, and subsequent gets see None until the slot is set again.
+        mgr = StateProcessingManager()
+        s = State({"k": "v"})
+        mgr.current_input_state = s  # slot is set by direct attribute assignment
+        assert mgr.get_input_state() is s
+        assert mgr.current_input_state is None
+        assert mgr.get_input_state() is None
+
+    def test_get_output_state_returns_then_clears(self):  # mirror of the input case
+        mgr = StateProcessingManager()
+        s = State({"k": "v"})
+        mgr.current_output_state = s
+        assert mgr.get_output_state() is s
+        assert mgr.current_output_state is None
+        assert mgr.get_output_state() is None
+
+    def test_input_and_output_slots_are_independent(self):
+        # Reading the input slot must not consume the output slot, and
+        # vice versa — pin the no-cross-talk contract.
+        mgr = StateProcessingManager()
+        in_s = State({"side": "in"})
+        out_s = State({"side": "out"})
+        mgr.current_input_state = in_s
+        mgr.current_output_state = out_s
+
+        assert mgr.get_input_state() is in_s
+        assert mgr.current_output_state is out_s  # untouched by the input read
+        assert mgr.get_output_state() is out_s
diff --git a/amber/src/main/python/core/architecture/managers/test_statistics_manager.py b/amber/src/main/python/core/architecture/managers/test_statistics_manager.py
new file mode 100644
index 0000000000..26d1a3ec64
--- /dev/null
+++ b/amber/src/main/python/core/architecture/managers/test_statistics_manager.py
@@ -0,0 +1,165 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+
+from core.architecture.managers.statistics_manager import StatisticsManager
+from proto.org.apache.texera.amber.core import PortIdentity
+
+
+def _port(pid: int) -> PortIdentity:  # non-internal port with the given id
+    return PortIdentity(id=pid, internal=False)
+
+
+class TestStatisticsManagerDefaults:  # a fresh manager reports empty/zero stats
+    def test_get_statistics_with_no_activity(self):
+        stats = StatisticsManager().get_statistics()
+        assert list(stats.input_tuple_metrics) == []
+        assert list(stats.output_tuple_metrics) == []
+        assert stats.data_processing_time == 0
+        assert stats.control_processing_time == 0
+        # idle_time = total_execution - data - control = 0 at init.
+        assert stats.idle_time == 0
+
+
+class TestStatisticsManagerInputOutput:
+    def test_increase_input_aggregates_count_and_size_per_port(self):
+        mgr = StatisticsManager()
+        mgr.increase_input_statistics(_port(0), 10)
+        mgr.increase_input_statistics(_port(0), 5)
+        mgr.increase_input_statistics(_port(1), 7)
+
+        stats = mgr.get_statistics()
+        by_port = {m.port_id.id: m.tuple_metrics for m in 
stats.input_tuple_metrics}
+        assert by_port[0].count == 2
+        assert by_port[0].size == 15
+        assert by_port[1].count == 1
+        assert by_port[1].size == 7
+        # Output side stayed empty.
+        assert list(stats.output_tuple_metrics) == []
+
+    def test_increase_output_aggregates_count_and_size_per_port(self):
+        mgr = StatisticsManager()
+        mgr.increase_output_statistics(_port(2), 100)
+        mgr.increase_output_statistics(_port(2), 200)
+
+        stats = mgr.get_statistics()
+        by_port = {m.port_id.id: m.tuple_metrics for m in 
stats.output_tuple_metrics}
+        assert by_port[2].count == 2
+        assert by_port[2].size == 300
+        assert list(stats.input_tuple_metrics) == []
+
+    def test_zero_size_input_is_allowed(self):
+        # Pin: zero is valid (size validation is `< 0`, not `<= 0`).
+        # Empty tuples / heartbeat-style records can legitimately be size 0.
+        mgr = StatisticsManager()
+        mgr.increase_input_statistics(_port(0), 0)
+        stats = mgr.get_statistics()
+        m = list(stats.input_tuple_metrics)[0].tuple_metrics
+        assert m.count == 1
+        assert m.size == 0
+
+    @pytest.mark.parametrize(
+        "method", ["increase_input_statistics", "increase_output_statistics"]
+    )
+    def test_negative_size_raises(self, method):
+        mgr = StatisticsManager()
+        with pytest.raises(ValueError, match="Tuple size must be 
non-negative"):
+            getattr(mgr, method)(_port(0), -1)
+
+
+class TestStatisticsManagerProcessingTime:  # data/control time counters
+    def test_data_and_control_time_accumulate(self):
+        mgr = StatisticsManager()
+        mgr.increase_data_processing_time(100)
+        mgr.increase_data_processing_time(50)
+        mgr.increase_control_processing_time(20)
+        stats = mgr.get_statistics()
+        assert stats.data_processing_time == 150  # 100 + 50
+        assert stats.control_processing_time == 20
+
+    def test_zero_processing_time_is_allowed(self):
+        mgr = StatisticsManager()
+        mgr.increase_data_processing_time(0)
+        mgr.increase_control_processing_time(0)
+        stats = mgr.get_statistics()
+        assert stats.data_processing_time == 0
+        assert stats.control_processing_time == 0
+
+    @pytest.mark.parametrize(
+        "method",
+        ["increase_data_processing_time", "increase_control_processing_time"],
+    )
+    def test_negative_time_raises(self, method):
+        mgr = StatisticsManager()
+        with pytest.raises(ValueError, match="Time must be non-negative"):
+            getattr(mgr, method)(-1)  # resolved by name from the parametrize list
+
+
+class TestStatisticsManagerExecutionTime:
+    def test_total_execution_time_is_relative_to_worker_start(self):
+        mgr = StatisticsManager()
+        mgr.initialize_worker_start_time(1_000)
+        mgr.update_total_execution_time(1_500)
+        stats = mgr.get_statistics()
+        # idle = total_exec - data - control = 500 - 0 - 0
+        assert stats.idle_time == 500
+
+    def test_total_execution_time_equal_to_start_is_allowed(self):
+        # The validation is `time < start`, so equality is OK and yields 0.
+        mgr = StatisticsManager()
+        mgr.initialize_worker_start_time(1_000)
+        mgr.update_total_execution_time(1_000)
+        assert mgr.get_statistics().idle_time == 0
+
+    def test_total_execution_time_before_start_raises(self):
+        mgr = StatisticsManager()
+        mgr.initialize_worker_start_time(1_000)
+        with pytest.raises(
+            ValueError,
+            match="Current time must be greater than or equal to worker start 
time",
+        ):
+            mgr.update_total_execution_time(999)
+
+    def test_idle_time_can_go_negative_when_processing_exceeds_total(self):
+        # Pin a real-but-questionable behavior: get_statistics computes
+        # idle_time = total_execution - data - control with NO clamp. If
+        # instrumentation overcounts (or update_total_execution_time was
+        # called early), idle goes negative. Filed as a Bug — see the
+        # accompanying issue. A future fix that floors at 0 must also
+        # update this test deliberately.
+        mgr = StatisticsManager()
+        mgr.initialize_worker_start_time(1_000)
+        mgr.update_total_execution_time(1_100)  # 100ns total
+        mgr.increase_data_processing_time(80)
+        mgr.increase_control_processing_time(50)  # 130 > 100 total
+        stats = mgr.get_statistics()
+        assert stats.idle_time == -30
+
+    @pytest.mark.xfail(
+        strict=True,
+        reason="Bug: idle_time goes negative when data+control processing time 
"
+        "overshoots total_execution_time. The fix should floor at 0 (or 
surface "
+        "the inconsistency); flips to XPASS when corrected.",
+    )
+    def test_idle_time_should_never_be_negative(self):
+        mgr = StatisticsManager()
+        mgr.initialize_worker_start_time(1_000)
+        mgr.update_total_execution_time(1_100)
+        mgr.increase_data_processing_time(80)
+        mgr.increase_control_processing_time(50)
+        assert mgr.get_statistics().idle_time >= 0
diff --git a/amber/src/main/python/core/architecture/managers/test_tuple_processing_manager.py b/amber/src/main/python/core/architecture/managers/test_tuple_processing_manager.py
new file mode 100644
index 0000000000..a970a34e35
--- /dev/null
+++ b/amber/src/main/python/core/architecture/managers/test_tuple_processing_manager.py
@@ -0,0 +1,100 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from threading import Condition, Event
+
+from core.architecture.managers.tuple_processing_manager import TupleProcessingManager
+from core.models import InternalMarker
+from proto.org.apache.texera.amber.core import PortIdentity
+
+
+class TestTupleProcessingManager:  # consume-once slots, port-id fallback, sync
+    def test_initial_state(self):
+        mgr = TupleProcessingManager()
+        assert mgr.current_input_tuple is None
+        assert mgr.current_input_port_id is None
+        assert mgr.current_input_tuple_iter is None
+        assert mgr.current_output_tuple is None
+        assert mgr.current_internal_marker is None
+        assert isinstance(mgr.context_switch_condition, Condition)
+        assert isinstance(mgr.finished_current, Event)
+        assert mgr.finished_current.is_set() is False
+
+    def test_get_internal_marker_consume_once(self):
+        mgr = TupleProcessingManager()
+        marker = InternalMarker()
+        mgr.current_internal_marker = marker
+        assert mgr.get_internal_marker() is marker
+        assert mgr.current_internal_marker is None
+        assert mgr.get_internal_marker() is None
+
+    def test_get_input_tuple_consume_once(self):
+        mgr = TupleProcessingManager()
+        sentinel = object()  # any object works; only identity is checked
+        mgr.current_input_tuple = sentinel
+        assert mgr.get_input_tuple() is sentinel
+        assert mgr.current_input_tuple is None
+        assert mgr.get_input_tuple() is None
+
+    def test_get_output_tuple_consume_once(self):
+        mgr = TupleProcessingManager()
+        sentinel = object()
+        mgr.current_output_tuple = sentinel
+        assert mgr.get_output_tuple() is sentinel
+        assert mgr.current_output_tuple is None
+        assert mgr.get_output_tuple() is None
+
+    def test_get_input_port_id_returns_zero_when_unset(self):
+        # Documented "no upstream / source executor" fallback. Worth pinning
+        # because it conflates "unset" with "real port id 0" — see
+        # test_get_input_port_id_collides_for_port_zero for the collision.
+        mgr = TupleProcessingManager()
+        assert mgr.current_input_port_id is None
+        assert mgr.get_input_port_id() == 0
+
+    def test_get_input_port_id_returns_real_port_id(self):
+        mgr = TupleProcessingManager()
+        mgr.current_input_port_id = PortIdentity(id=7, internal=False)
+        assert mgr.get_input_port_id() == 7  # the bare int id, not the PortIdentity
+
+    def test_get_input_port_id_collides_for_port_zero(self):
+        # Pin: a real port with id=0 is indistinguishable from the
+        # "no upstream" sentinel. If callers ever need to tell them apart,
+        # the API has to change — this test guards the current behavior so
+        # any future fix breaks it deliberately.
+        mgr = TupleProcessingManager()
+        mgr.current_input_port_id = PortIdentity(id=0, internal=False)
+        assert mgr.get_input_port_id() == 0
+        # And the sentinel path also returns 0.
+        mgr.current_input_port_id = None
+        assert mgr.get_input_port_id() == 0
+
+    def test_finished_current_event_can_be_signalled(self):
+        mgr = TupleProcessingManager()
+        mgr.finished_current.set()
+        assert mgr.finished_current.is_set() is True
+        mgr.finished_current.clear()
+        assert mgr.finished_current.is_set() is False
+
+    def test_input_tuple_does_not_clear_output_or_marker(self):
+        mgr = TupleProcessingManager()
+        mgr.current_input_tuple = "in"
+        mgr.current_output_tuple = "out"
+        mgr.current_internal_marker = InternalMarker()
+        mgr.get_input_tuple()  # consume only the input slot
+        assert mgr.current_output_tuple == "out"
+        assert mgr.current_internal_marker is not None

Reply via email to