Copilot commented on code in PR #4893:
URL: https://github.com/apache/texera/pull/4893#discussion_r3179277287


##########
amber/src/main/python/core/architecture/managers/test_console_message_manager.py:
##########
@@ -0,0 +1,78 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from datetime import datetime, timedelta
+
+from core.architecture.managers.console_message_manager import 
ConsoleMessageManager
+from proto.org.apache.texera.amber.engine.architecture.rpc import (
+    ConsoleMessage,
+    ConsoleMessageType,
+)
+
+
+def _msg(title: str) -> ConsoleMessage:
+    return ConsoleMessage(
+        worker_id="w0",
+        timestamp=datetime.now(),
+        msg_type=ConsoleMessageType.PRINT,
+        source="src",
+        title=title,
+        message=title,
+    )
+
+
+class TestConsoleMessageManager:
+    def test_initially_force_flush_drains_empty(self):
+        mgr = ConsoleMessageManager()
+        # No messages put yet — force_flush still yields zero items.
+        assert list(mgr.get_messages(force_flush=True)) == []
+
+    def test_force_flush_drains_all_buffered_in_order(self):
+        mgr = ConsoleMessageManager()
+        for t in ("a", "b", "c"):
+            mgr.put_message(_msg(t))
+        flushed = list(mgr.get_messages(force_flush=True))
+        assert [m.title for m in flushed] == ["a", "b", "c"]
+        # A second drain must come back empty — get() is consumptive.
+        assert list(mgr.get_messages(force_flush=True)) == []
+
+    def test_get_without_flush_below_threshold_yields_nothing(self):
+        # Below max_message_num (default 10) and within max_flush_interval
+        # (default 500ms) — the underlying TimedBuffer should withhold output.
+        mgr = ConsoleMessageManager()
+        mgr.put_message(_msg("only"))

Review Comment:
   `test_get_without_flush_below_threshold_yields_nothing` depends on real 
wall-clock timing: if roughly one second or more elapses between 
`ConsoleMessageManager()` construction and the `get_messages(force_flush=False)` 
call (TimedBuffer uses `(now - _last_output_time).seconds`), the buffer may 
flush and make this assertion flaky. Make the test deterministic by explicitly 
setting `mgr.print_buf._last_output_time = datetime.now()` (or monkeypatching 
`datetime.now`) immediately before calling `get_messages(force_flush=False)`.
   



##########
amber/src/main/python/core/architecture/managers/test_console_message_manager.py:
##########
@@ -0,0 +1,78 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from datetime import datetime, timedelta
+
+from core.architecture.managers.console_message_manager import 
ConsoleMessageManager
+from proto.org.apache.texera.amber.engine.architecture.rpc import (
+    ConsoleMessage,
+    ConsoleMessageType,
+)
+
+
+def _msg(title: str) -> ConsoleMessage:
+    return ConsoleMessage(
+        worker_id="w0",
+        timestamp=datetime.now(),
+        msg_type=ConsoleMessageType.PRINT,
+        source="src",
+        title=title,
+        message=title,
+    )
+
+
+class TestConsoleMessageManager:
+    def test_initially_force_flush_drains_empty(self):
+        mgr = ConsoleMessageManager()
+        # No messages put yet — force_flush still yields zero items.
+        assert list(mgr.get_messages(force_flush=True)) == []
+
+    def test_force_flush_drains_all_buffered_in_order(self):
+        mgr = ConsoleMessageManager()
+        for t in ("a", "b", "c"):
+            mgr.put_message(_msg(t))
+        flushed = list(mgr.get_messages(force_flush=True))
+        assert [m.title for m in flushed] == ["a", "b", "c"]
+        # A second drain must come back empty — get() is consumptive.
+        assert list(mgr.get_messages(force_flush=True)) == []
+
+    def test_get_without_flush_below_threshold_yields_nothing(self):
+        # Below max_message_num (default 10) and within max_flush_interval
+        # (default 500ms) — the underlying TimedBuffer should withhold output.
+        mgr = ConsoleMessageManager()
+        mgr.put_message(_msg("only"))
+        assert list(mgr.get_messages(force_flush=False)) == []
+        # The withheld message must still be drainable on a force flush.
+        assert [m.title for m in mgr.get_messages(force_flush=True)] == 
["only"]
+
+    def test_get_without_flush_at_or_over_max_message_num_drains(self):
+        # Once buffered count crosses max_message_num (default 10), the
+        # buffer should auto-flush even without force_flush=True.
+        mgr = ConsoleMessageManager()
+        for i in range(10):
+            mgr.put_message(_msg(f"m{i}"))
+        flushed = [m.title for m in mgr.get_messages(force_flush=False)]
+        assert flushed == [f"m{i}" for i in range(10)]
+
+    def test_get_drains_when_last_output_time_is_stale(self, monkeypatch):
+        # Backdate the buffer's `_last_output_time` so the >=500ms branch
+        # fires even with a single message and force_flush=False.
+        mgr = ConsoleMessageManager()
+        mgr.put_message(_msg("stale"))
+        mgr.print_buf._last_output_time = datetime.now() - timedelta(seconds=2)
+        flushed = [m.title for m in mgr.get_messages(force_flush=False)]
+        assert flushed == ["stale"]

Review Comment:
   The `monkeypatch` fixture is accepted as an argument but never used in this 
test. Either remove the parameter or use it to patch time (which would also 
make the test more deterministic).



##########
amber/src/main/python/core/architecture/managers/test_embedded_control_message_manager.py:
##########
@@ -0,0 +1,192 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from types import SimpleNamespace
+from unittest.mock import MagicMock
+
+import pytest
+
+from core.architecture.managers.embedded_control_message_manager import (
+    EmbeddedControlMessageManager,
+)
+from proto.org.apache.texera.amber.core import (
+    ActorVirtualIdentity,
+    ChannelIdentity,
+    EmbeddedControlMessageIdentity,
+)
+from proto.org.apache.texera.amber.engine.architecture.rpc import (
+    EmbeddedControlMessage,
+    EmbeddedControlMessageType,
+)
+
+
+SELF_ID = ActorVirtualIdentity(name="self")
+
+
+def _channel(from_name: str, to_name: str = "self", is_control: bool = False):
+    return ChannelIdentity(
+        from_worker_id=ActorVirtualIdentity(name=from_name),
+        to_worker_id=ActorVirtualIdentity(name=to_name),
+        is_control=is_control,
+    )
+
+
+def _make_ecm(
+    ecm_type: EmbeddedControlMessageType,
+    scope=None,
+) -> EmbeddedControlMessage:
+    # Reuse the same identity *object* across calls so the dict in
+    # `EmbeddedControlMessageManager.ecm_received` aggregates under one key.
+    return EmbeddedControlMessage(
+        id=EmbeddedControlMessageIdentity(id="ecm-1"),
+        ecm_type=ecm_type,
+        scope=scope or [],
+    )

Review Comment:
   The comment claims the ECM identity object is reused across calls, but 
`_make_ecm` constructs a new `EmbeddedControlMessageIdentity(id="ecm-1")` each 
time. Either adjust the comment to reflect that the *same `ecm` instance* is 
reused within a test, or change the helper to actually reuse a shared identity 
object if that behavior matters for the dict keying.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to