This is an automated email from the ASF dual-hosted git repository.
aglinxinyuan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 9f0cfd7517 test(pyamber): add unit tests for six DP-thread state managers (#4893)
9f0cfd7517 is described below
commit 9f0cfd7517044a28315b9aba4efb53b521d25c97
Author: Yicong Huang <[email protected]>
AuthorDate: Sun May 3 21:23:49 2026 -0700
test(pyamber): add unit tests for six DP-thread state managers (#4893)
---
.../managers/test_console_message_manager.py | 83 +++++++++
.../test_embedded_control_message_manager.py | 194 +++++++++++++++++++++
.../managers/test_exception_manager.py | 76 ++++++++
.../managers/test_state_processing_manager.py | 59 +++++++
.../managers/test_statistics_manager.py | 165 ++++++++++++++++++
.../managers/test_tuple_processing_manager.py | 100 +++++++++++
6 files changed, 677 insertions(+)
diff --git a/amber/src/main/python/core/architecture/managers/test_console_message_manager.py b/amber/src/main/python/core/architecture/managers/test_console_message_manager.py
new file mode 100644
index 0000000000..92c18556ac
--- /dev/null
+++ b/amber/src/main/python/core/architecture/managers/test_console_message_manager.py
@@ -0,0 +1,83 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from datetime import datetime, timedelta
+
+from core.architecture.managers.console_message_manager import ConsoleMessageManager
+from proto.org.apache.texera.amber.engine.architecture.rpc import (
+ ConsoleMessage,
+ ConsoleMessageType,
+)
+
+
+def _msg(title: str) -> ConsoleMessage:
+ return ConsoleMessage(
+ worker_id="w0",
+ timestamp=datetime.now(),
+ msg_type=ConsoleMessageType.PRINT,
+ source="src",
+ title=title,
+ message=title,
+ )
+
+
+class TestConsoleMessageManager:
+ def test_initially_force_flush_drains_empty(self):
+ mgr = ConsoleMessageManager()
+ # No messages put yet — force_flush still yields zero items.
+ assert list(mgr.get_messages(force_flush=True)) == []
+
+ def test_force_flush_drains_all_buffered_in_order(self):
+ mgr = ConsoleMessageManager()
+ for t in ("a", "b", "c"):
+ mgr.put_message(_msg(t))
+ flushed = list(mgr.get_messages(force_flush=True))
+ assert [m.title for m in flushed] == ["a", "b", "c"]
+ # A second drain must come back empty — get() is consumptive.
+ assert list(mgr.get_messages(force_flush=True)) == []
+
+ def test_get_without_flush_below_threshold_yields_nothing(self):
+ # Below max_message_num (default 10) and within max_flush_interval
+ # (default 500ms) — the underlying TimedBuffer should withhold output.
+ # Pin `_last_output_time` to "now" right before the assertion so the
+ # `(now - _last_output_time).seconds >= 1` branch can't fire if the
+ # rest of the test happens to run more than ~1s after construction.
+ mgr = ConsoleMessageManager()
+ mgr.put_message(_msg("only"))
+ mgr.print_buf._last_output_time = datetime.now()
+ assert list(mgr.get_messages(force_flush=False)) == []
+ # The withheld message must still be drainable on a force flush.
+ assert [m.title for m in mgr.get_messages(force_flush=True)] == ["only"]
+
+ def test_get_without_flush_at_or_over_max_message_num_drains(self):
+ # Once buffered count crosses max_message_num (default 10), the
+ # buffer should auto-flush even without force_flush=True.
+ mgr = ConsoleMessageManager()
+ for i in range(10):
+ mgr.put_message(_msg(f"m{i}"))
+ flushed = [m.title for m in mgr.get_messages(force_flush=False)]
+ assert flushed == [f"m{i}" for i in range(10)]
+
+ def test_get_drains_when_last_output_time_is_stale(self):
+ # Backdate the buffer's `_last_output_time` directly so the
+ # >=500ms branch fires even with a single message and
+ # force_flush=False, without sleeping or monkeypatching `datetime`.
+ mgr = ConsoleMessageManager()
+ mgr.put_message(_msg("stale"))
+ mgr.print_buf._last_output_time = datetime.now() - timedelta(seconds=2)
+ flushed = [m.title for m in mgr.get_messages(force_flush=False)]
+ assert flushed == ["stale"]
diff --git a/amber/src/main/python/core/architecture/managers/test_embedded_control_message_manager.py b/amber/src/main/python/core/architecture/managers/test_embedded_control_message_manager.py
new file mode 100644
index 0000000000..c6d436aa99
--- /dev/null
+++ b/amber/src/main/python/core/architecture/managers/test_embedded_control_message_manager.py
@@ -0,0 +1,194 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from types import SimpleNamespace
+from unittest.mock import MagicMock
+
+import pytest
+
+from core.architecture.managers.embedded_control_message_manager import (
+ EmbeddedControlMessageManager,
+)
+from proto.org.apache.texera.amber.core import (
+ ActorVirtualIdentity,
+ ChannelIdentity,
+ EmbeddedControlMessageIdentity,
+)
+from proto.org.apache.texera.amber.engine.architecture.rpc import (
+ EmbeddedControlMessage,
+ EmbeddedControlMessageType,
+)
+
+
+SELF_ID = ActorVirtualIdentity(name="self")
+
+
+def _channel(from_name: str, to_name: str = "self", is_control: bool = False):
+ return ChannelIdentity(
+ from_worker_id=ActorVirtualIdentity(name=from_name),
+ to_worker_id=ActorVirtualIdentity(name=to_name),
+ is_control=is_control,
+ )
+
+
+def _make_ecm(
+ ecm_type: EmbeddedControlMessageType,
+ scope=None,
+) -> EmbeddedControlMessage:
+ # Each call constructs a fresh `EmbeddedControlMessageIdentity(id="ecm-1")`,
+ # but the dataclass-style equality means all of them hash to the same key
+ # in `EmbeddedControlMessageManager.ecm_received`, so messages built from
+ # different invocations still aggregate under the single "ecm-1" entry.
+ return EmbeddedControlMessage(
+ id=EmbeddedControlMessageIdentity(id="ecm-1"),
+ ecm_type=ecm_type,
+ scope=scope or [],
+ )
+
+
+def _gateway_with_data_channels(*data_channels: ChannelIdentity):
+ """Stub InputManager that exposes only `get_all_data_channel_ids`."""
+ gw = MagicMock()
+ gw.get_all_data_channel_ids.return_value = set(data_channels)
+ gw.get_all_channel_ids.return_value = set(data_channels)
+ return gw
+
+
+def _gateway_with_ports(port_layout: dict, all_channels: set):
+ """Stub InputManager that supports per-port lookups for PORT_ALIGNMENT.
+
+ `port_layout` maps PortIdentity-key (use any hashable) -> set of channels.
+ `get_port_id(channel)` resolves channel -> port.
+ """
+ gw = MagicMock()
+ channel_to_port = {ch: pid for pid, chs in port_layout.items() for ch in chs}
+ gw.get_port_id.side_effect = lambda ch: channel_to_port[ch]
+ gw.get_port.side_effect = lambda pid: SimpleNamespace(
+ get_channels=lambda chs=port_layout[pid]: chs
+ )
+ gw.get_all_data_channel_ids.return_value = set(all_channels)
+ gw.get_all_channel_ids.return_value = set(all_channels)
+ return gw
+
+
+class TestEcmAllAlignment:
+ def test_returns_false_until_all_channels_received(self):
+ c1, c2, c3 = _channel("a"), _channel("b"), _channel("c")
+ gw = _gateway_with_data_channels(c1, c2, c3)
+ mgr = EmbeddedControlMessageManager(SELF_ID, gw)
+ ecm = _make_ecm(EmbeddedControlMessageType.ALL_ALIGNMENT)
+
+ assert mgr.is_ecm_aligned(c1, ecm) is False
+ assert mgr.is_ecm_aligned(c2, ecm) is False
+ # The third (last) channel completes the alignment.
+ assert mgr.is_ecm_aligned(c3, ecm) is True
+
+ def test_dict_is_cleaned_up_after_full_alignment(self):
+ # Pin the cleanup contract: once every expected channel has reported,
+ # the per-id entry must be deleted so a recycled id cannot bleed
+ # state into the next ECM round.
+ c1, c2 = _channel("a"), _channel("b")
+ gw = _gateway_with_data_channels(c1, c2)
+ mgr = EmbeddedControlMessageManager(SELF_ID, gw)
+ ecm = _make_ecm(EmbeddedControlMessageType.ALL_ALIGNMENT)
+
+ mgr.is_ecm_aligned(c1, ecm)
+ mgr.is_ecm_aligned(c2, ecm)
+ assert ecm.id not in mgr.ecm_received
+
+
+class TestEcmNoAlignment:
+ def test_first_message_completes_subsequent_do_not(self):
+ c1, c2 = _channel("a"), _channel("b")
+ gw = _gateway_with_data_channels(c1, c2)
+ mgr = EmbeddedControlMessageManager(SELF_ID, gw)
+ ecm = _make_ecm(EmbeddedControlMessageType.NO_ALIGNMENT)
+
+ # First channel: ecm_received={c1}, len==1 → True.
+ assert mgr.is_ecm_aligned(c1, ecm) is True
+ # Second channel: ecm_received={c1,c2}, len==2 → False.
+ # (And on this call from_all_channels=True so the dict is dropped.)
+ assert mgr.is_ecm_aligned(c2, ecm) is False
+ assert ecm.id not in mgr.ecm_received
+
+
+class TestEcmPortAlignment:
+ def test_completes_when_a_ports_channels_have_all_arrived(self):
+ a1, a2 = _channel("a1"), _channel("a2")
+ b1 = _channel("b1")
+ ports = {"portA": {a1, a2}, "portB": {b1}}
+ gw = _gateway_with_ports(ports, all_channels={a1, a2, b1})
+ mgr = EmbeddedControlMessageManager(SELF_ID, gw)
+ ecm = _make_ecm(EmbeddedControlMessageType.PORT_ALIGNMENT)
+
+ # Port A needs both a1 and a2.
+ assert mgr.is_ecm_aligned(a1, ecm) is False
+ assert mgr.is_ecm_aligned(a2, ecm) is True
+ # Port B is single-channel, so b1 alone completes its port.
+ assert mgr.is_ecm_aligned(b1, ecm) is True
+
+ def test_unsupported_ecm_type_raises_value_error(self):
+ # The `else: raise ValueError(...)` branch — guard against any new
+ # enum value silently falling through.
+ c1 = _channel("a")
+ gw = _gateway_with_data_channels(c1)
+ mgr = EmbeddedControlMessageManager(SELF_ID, gw)
+ # Use a sentinel that is not one of the three known values. The enum
+ # type is an IntEnum, so an unused integer won't match any branch.
+ ecm = _make_ecm(EmbeddedControlMessageType.ALL_ALIGNMENT)
+ ecm.ecm_type = 999 # type: ignore[assignment]
+
+ with pytest.raises(ValueError, match="Unsupported ECM type"):
+ mgr.is_ecm_aligned(c1, ecm)
+
+
+class TestEcmScope:
+ def test_scope_intersects_with_all_channel_ids(self):
+ # When `ecm.scope` is set, get_channels_within_scope filters the
+ # gateway's known channels to only those whose `to_worker_id == self`
+ # AND that appear in the gateway's `get_all_channel_ids`.
+ c_in_scope = _channel("a", to_name="self")
+ c_other_target = _channel("b", to_name="someone_else")
+ c_not_in_gateway = _channel("c", to_name="self")
+
+ gw = MagicMock()
+ gw.get_all_channel_ids.return_value = {c_in_scope, c_other_target}
+ gw.get_all_data_channel_ids.return_value = {c_in_scope, c_other_target}
+ mgr = EmbeddedControlMessageManager(SELF_ID, gw)
+
+ ecm = _make_ecm(
+ EmbeddedControlMessageType.ALL_ALIGNMENT,
+ scope=[c_in_scope, c_not_in_gateway],
+ )
+
+ # Only c_in_scope is in scope AND known to the gateway. After
+ # receiving it, alignment should complete.
+ assert mgr.is_ecm_aligned(c_in_scope, ecm) is True
+
+ def test_no_scope_falls_back_to_all_data_channels(self):
+ # When scope is empty, the manager uses get_all_data_channel_ids()
+ # rather than get_all_channel_ids() — control vs data routing.
+ c_data = _channel("a", is_control=False)
+ c_control = _channel("b", is_control=True)
+
+ gw = MagicMock()
+ gw.get_all_data_channel_ids.return_value = {c_data}
+ gw.get_all_channel_ids.return_value = {c_data, c_control}
+ mgr = EmbeddedControlMessageManager(SELF_ID, gw)
+
+ ecm = _make_ecm(EmbeddedControlMessageType.ALL_ALIGNMENT, scope=[])
+ assert mgr.is_ecm_aligned(c_data, ecm) is True
diff --git a/amber/src/main/python/core/architecture/managers/test_exception_manager.py b/amber/src/main/python/core/architecture/managers/test_exception_manager.py
new file mode 100644
index 0000000000..f95380b2cc
--- /dev/null
+++ b/amber/src/main/python/core/architecture/managers/test_exception_manager.py
@@ -0,0 +1,76 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import sys
+
+from core.architecture.managers.exception_manager import ExceptionManager
+from core.models import ExceptionInfo
+
+
+def _real_exc_info() -> ExceptionInfo:
+ """Build a real ExceptionInfo by raising and catching, so the traceback
+ object is the actual one Python produces."""
+ try:
+ raise RuntimeError("boom")
+ except RuntimeError:
+ exc, value, tb = sys.exc_info()
+ return ExceptionInfo(exc=exc, value=value, tb=tb)
+
+
+class TestExceptionManager:
+ def test_initial_state(self):
+ mgr = ExceptionManager()
+ assert mgr.exc_info is None
+ assert mgr.exc_info_history == []
+ assert mgr.has_exception() is False
+
+ def test_set_then_has_exception_true(self):
+ mgr = ExceptionManager()
+ info = _real_exc_info()
+ mgr.set_exception_info(info)
+ assert mgr.has_exception() is True
+ assert mgr.exc_info is info
+ assert mgr.exc_info_history == [info]
+
+ def test_get_exc_info_returns_and_clears_current_only(self):
+ # Pin the documented contract: get_exc_info returns the latest stashed
+ # info AND clears the live slot, but the history must keep it. A
+ # regression that also clears history would break replay/retry flows.
+ mgr = ExceptionManager()
+ info = _real_exc_info()
+ mgr.set_exception_info(info)
+
+ assert mgr.get_exc_info() is info
+ assert mgr.exc_info is None
+ assert mgr.has_exception() is False
+ assert mgr.exc_info_history == [info]
+
+ def test_get_exc_info_when_none_returns_none(self):
+ mgr = ExceptionManager()
+ assert mgr.get_exc_info() is None
+
+ def test_history_accumulates_in_order(self):
+ mgr = ExceptionManager()
+ first = _real_exc_info()
+ second = _real_exc_info()
+ mgr.set_exception_info(first)
+ mgr.set_exception_info(second)
+ assert mgr.exc_info is second
+ assert mgr.exc_info_history == [first, second]
+ # Consuming the latest must leave both entries in history.
+ assert mgr.get_exc_info() is second
+ assert mgr.exc_info_history == [first, second]
diff --git a/amber/src/main/python/core/architecture/managers/test_state_processing_manager.py b/amber/src/main/python/core/architecture/managers/test_state_processing_manager.py
new file mode 100644
index 0000000000..b55fc7a236
--- /dev/null
+++ b/amber/src/main/python/core/architecture/managers/test_state_processing_manager.py
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from core.architecture.managers.state_processing_manager import StateProcessingManager
+from core.models.state import State
+
+
+class TestStateProcessingManager:
+ def test_initial_state_is_none(self):
+ mgr = StateProcessingManager()
+ assert mgr.current_input_state is None
+ assert mgr.current_output_state is None
+ assert mgr.get_input_state() is None
+ assert mgr.get_output_state() is None
+
+ def test_get_input_state_returns_then_clears(self):
+ # The contract is "consume-once": the first get returns the stashed
+ # value, and subsequent gets see None until the slot is set again.
+ mgr = StateProcessingManager()
+ s = State({"k": "v"})
+ mgr.current_input_state = s
+ assert mgr.get_input_state() is s
+ assert mgr.current_input_state is None
+ assert mgr.get_input_state() is None
+
+ def test_get_output_state_returns_then_clears(self):
+ mgr = StateProcessingManager()
+ s = State({"k": "v"})
+ mgr.current_output_state = s
+ assert mgr.get_output_state() is s
+ assert mgr.current_output_state is None
+ assert mgr.get_output_state() is None
+
+ def test_input_and_output_slots_are_independent(self):
+ # Reading the input slot must not consume the output slot, and
+ # vice versa — pin the no-cross-talk contract.
+ mgr = StateProcessingManager()
+ in_s = State({"side": "in"})
+ out_s = State({"side": "out"})
+ mgr.current_input_state = in_s
+ mgr.current_output_state = out_s
+
+ assert mgr.get_input_state() is in_s
+ assert mgr.current_output_state is out_s
+ assert mgr.get_output_state() is out_s
diff --git a/amber/src/main/python/core/architecture/managers/test_statistics_manager.py b/amber/src/main/python/core/architecture/managers/test_statistics_manager.py
new file mode 100644
index 0000000000..26d1a3ec64
--- /dev/null
+++ b/amber/src/main/python/core/architecture/managers/test_statistics_manager.py
@@ -0,0 +1,165 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+
+from core.architecture.managers.statistics_manager import StatisticsManager
+from proto.org.apache.texera.amber.core import PortIdentity
+
+
+def _port(pid: int) -> PortIdentity:
+ return PortIdentity(id=pid, internal=False)
+
+
+class TestStatisticsManagerDefaults:
+ def test_get_statistics_with_no_activity(self):
+ stats = StatisticsManager().get_statistics()
+ assert list(stats.input_tuple_metrics) == []
+ assert list(stats.output_tuple_metrics) == []
+ assert stats.data_processing_time == 0
+ assert stats.control_processing_time == 0
+ # idle_time = total_execution - data - control = 0 at init.
+ assert stats.idle_time == 0
+
+
+class TestStatisticsManagerInputOutput:
+ def test_increase_input_aggregates_count_and_size_per_port(self):
+ mgr = StatisticsManager()
+ mgr.increase_input_statistics(_port(0), 10)
+ mgr.increase_input_statistics(_port(0), 5)
+ mgr.increase_input_statistics(_port(1), 7)
+
+ stats = mgr.get_statistics()
+ by_port = {m.port_id.id: m.tuple_metrics for m in stats.input_tuple_metrics}
+ assert by_port[0].count == 2
+ assert by_port[0].size == 15
+ assert by_port[1].count == 1
+ assert by_port[1].size == 7
+ # Output side stayed empty.
+ assert list(stats.output_tuple_metrics) == []
+
+ def test_increase_output_aggregates_count_and_size_per_port(self):
+ mgr = StatisticsManager()
+ mgr.increase_output_statistics(_port(2), 100)
+ mgr.increase_output_statistics(_port(2), 200)
+
+ stats = mgr.get_statistics()
+ by_port = {m.port_id.id: m.tuple_metrics for m in stats.output_tuple_metrics}
+ assert by_port[2].count == 2
+ assert by_port[2].size == 300
+ assert list(stats.input_tuple_metrics) == []
+
+ def test_zero_size_input_is_allowed(self):
+ # Pin: zero is valid (size validation is `< 0`, not `<= 0`).
+ # Empty tuples / heartbeat-style records can legitimately be size 0.
+ mgr = StatisticsManager()
+ mgr.increase_input_statistics(_port(0), 0)
+ stats = mgr.get_statistics()
+ m = list(stats.input_tuple_metrics)[0].tuple_metrics
+ assert m.count == 1
+ assert m.size == 0
+
+ @pytest.mark.parametrize(
+ "method", ["increase_input_statistics", "increase_output_statistics"]
+ )
+ def test_negative_size_raises(self, method):
+ mgr = StatisticsManager()
+ with pytest.raises(ValueError, match="Tuple size must be non-negative"):
+ getattr(mgr, method)(_port(0), -1)
+
+
+class TestStatisticsManagerProcessingTime:
+ def test_data_and_control_time_accumulate(self):
+ mgr = StatisticsManager()
+ mgr.increase_data_processing_time(100)
+ mgr.increase_data_processing_time(50)
+ mgr.increase_control_processing_time(20)
+ stats = mgr.get_statistics()
+ assert stats.data_processing_time == 150
+ assert stats.control_processing_time == 20
+
+ def test_zero_processing_time_is_allowed(self):
+ mgr = StatisticsManager()
+ mgr.increase_data_processing_time(0)
+ mgr.increase_control_processing_time(0)
+ stats = mgr.get_statistics()
+ assert stats.data_processing_time == 0
+ assert stats.control_processing_time == 0
+
+ @pytest.mark.parametrize(
+ "method",
+ ["increase_data_processing_time", "increase_control_processing_time"],
+ )
+ def test_negative_time_raises(self, method):
+ mgr = StatisticsManager()
+ with pytest.raises(ValueError, match="Time must be non-negative"):
+ getattr(mgr, method)(-1)
+
+
+class TestStatisticsManagerExecutionTime:
+ def test_total_execution_time_is_relative_to_worker_start(self):
+ mgr = StatisticsManager()
+ mgr.initialize_worker_start_time(1_000)
+ mgr.update_total_execution_time(1_500)
+ stats = mgr.get_statistics()
+ # idle = total_exec - data - control = 500 - 0 - 0
+ assert stats.idle_time == 500
+
+ def test_total_execution_time_equal_to_start_is_allowed(self):
+ # The validation is `time < start`, so equality is OK and yields 0.
+ mgr = StatisticsManager()
+ mgr.initialize_worker_start_time(1_000)
+ mgr.update_total_execution_time(1_000)
+ assert mgr.get_statistics().idle_time == 0
+
+ def test_total_execution_time_before_start_raises(self):
+ mgr = StatisticsManager()
+ mgr.initialize_worker_start_time(1_000)
+ with pytest.raises(
+ ValueError,
+ match="Current time must be greater than or equal to worker start time",
+ ):
+ mgr.update_total_execution_time(999)
+
+ def test_idle_time_can_go_negative_when_processing_exceeds_total(self):
+ # Pin a real-but-questionable behavior: get_statistics computes
+ # idle_time = total_execution - data - control with NO clamp. If
+ # instrumentation overcounts (or update_total_execution_time was
+ # called early), idle goes negative. Filed as a Bug — see the
+ # accompanying issue. A future fix that floors at 0 must also
+ # update this test deliberately.
+ mgr = StatisticsManager()
+ mgr.initialize_worker_start_time(1_000)
+ mgr.update_total_execution_time(1_100) # 100ns total
+ mgr.increase_data_processing_time(80)
+ mgr.increase_control_processing_time(50) # 130 > 100 total
+ stats = mgr.get_statistics()
+ assert stats.idle_time == -30
+
+ @pytest.mark.xfail(
+ strict=True,
+ reason="Bug: idle_time goes negative when data+control processing time "
+ "overshoots total_execution_time. The fix should floor at 0 (or surface "
+ "the inconsistency); flips to XPASS when corrected.",
+ )
+ def test_idle_time_should_never_be_negative(self):
+ mgr = StatisticsManager()
+ mgr.initialize_worker_start_time(1_000)
+ mgr.update_total_execution_time(1_100)
+ mgr.increase_data_processing_time(80)
+ mgr.increase_control_processing_time(50)
+ assert mgr.get_statistics().idle_time >= 0
diff --git a/amber/src/main/python/core/architecture/managers/test_tuple_processing_manager.py b/amber/src/main/python/core/architecture/managers/test_tuple_processing_manager.py
new file mode 100644
index 0000000000..a970a34e35
--- /dev/null
+++ b/amber/src/main/python/core/architecture/managers/test_tuple_processing_manager.py
@@ -0,0 +1,100 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from threading import Condition, Event
+
+from core.architecture.managers.tuple_processing_manager import TupleProcessingManager
+from core.models import InternalMarker
+from proto.org.apache.texera.amber.core import PortIdentity
+
+
+class TestTupleProcessingManager:
+ def test_initial_state(self):
+ mgr = TupleProcessingManager()
+ assert mgr.current_input_tuple is None
+ assert mgr.current_input_port_id is None
+ assert mgr.current_input_tuple_iter is None
+ assert mgr.current_output_tuple is None
+ assert mgr.current_internal_marker is None
+ assert isinstance(mgr.context_switch_condition, Condition)
+ assert isinstance(mgr.finished_current, Event)
+ assert mgr.finished_current.is_set() is False
+
+ def test_get_internal_marker_consume_once(self):
+ mgr = TupleProcessingManager()
+ marker = InternalMarker()
+ mgr.current_internal_marker = marker
+ assert mgr.get_internal_marker() is marker
+ assert mgr.current_internal_marker is None
+ assert mgr.get_internal_marker() is None
+
+ def test_get_input_tuple_consume_once(self):
+ mgr = TupleProcessingManager()
+ sentinel = object()
+ mgr.current_input_tuple = sentinel
+ assert mgr.get_input_tuple() is sentinel
+ assert mgr.current_input_tuple is None
+ assert mgr.get_input_tuple() is None
+
+ def test_get_output_tuple_consume_once(self):
+ mgr = TupleProcessingManager()
+ sentinel = object()
+ mgr.current_output_tuple = sentinel
+ assert mgr.get_output_tuple() is sentinel
+ assert mgr.current_output_tuple is None
+ assert mgr.get_output_tuple() is None
+
+ def test_get_input_port_id_returns_zero_when_unset(self):
+ # Documented "no upstream / source executor" fallback. Worth pinning
+ # because it conflates "unset" with "real port id 0" — see the
+ # follow-up test below that exposes the collision.
+ mgr = TupleProcessingManager()
+ assert mgr.current_input_port_id is None
+ assert mgr.get_input_port_id() == 0
+
+ def test_get_input_port_id_returns_real_port_id(self):
+ mgr = TupleProcessingManager()
+ mgr.current_input_port_id = PortIdentity(id=7, internal=False)
+ assert mgr.get_input_port_id() == 7
+
+ def test_get_input_port_id_collides_for_port_zero(self):
+ # Pin: a real port with id=0 is indistinguishable from the
+ # "no upstream" sentinel. If callers ever need to tell them apart,
+ # the API has to change — this test guards the current behavior so
+ # any future fix breaks it deliberately.
+ mgr = TupleProcessingManager()
+ mgr.current_input_port_id = PortIdentity(id=0, internal=False)
+ assert mgr.get_input_port_id() == 0
+ # And the sentinel path also returns 0.
+ mgr.current_input_port_id = None
+ assert mgr.get_input_port_id() == 0
+
+ def test_finished_current_event_can_be_signalled(self):
+ mgr = TupleProcessingManager()
+ mgr.finished_current.set()
+ assert mgr.finished_current.is_set() is True
+ mgr.finished_current.clear()
+ assert mgr.finished_current.is_set() is False
+
+ def test_input_tuple_does_not_clear_output_or_marker(self):
+ mgr = TupleProcessingManager()
+ mgr.current_input_tuple = "in"
+ mgr.current_output_tuple = "out"
+ mgr.current_internal_marker = InternalMarker()
+ mgr.get_input_tuple()
+ assert mgr.current_output_tuple == "out"
+ assert mgr.current_internal_marker is not None