This is an automated email from the ASF dual-hosted git repository.
Yicong-Huang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 0c5681989f test(pyamber): add unit tests for Heartbeat (#4704)
0c5681989f is described below
commit 0c5681989fae462ec38fef9a510bb28380b3ea88
Author: Yicong Huang <[email protected]>
AuthorDate: Sat May 2 18:53:17 2026 -0700
test(pyamber): add unit tests for Heartbeat (#4704)
### What changes were proposed in this PR?
Adds pytest coverage for
`amber/src/main/python/core/runnables/heartbeat.py`. `stop()` is
intentionally not exercised — it sends `SIGTERM` to the test runner —
and `run()` is only verified on the early-exit branch (a pre-set stop
event).
### Any related issues, documentation, discussions?
Closes #4703.
Potential bug noted while reading the module (not pinned by these
tests): `_check_heartbeat`'s try/except wraps both the
`create_connection` and the subsequent `temp_socket.close()`. If
`close()` raises after a successful connect, the method logs "Server is
down" and returns `False` even though the JVM is reachable. The
double-check in `run()` masks transient occurrences but it is still
misleading. Worth deciding whether to narrow the try/except to the
connect call only.
### How was this PR tested?
```
cd amber/src/main/python
ruff check core/runnables/test_heartbeat.py
ruff format --check core/runnables/test_heartbeat.py
python -m pytest core/runnables/test_heartbeat.py
```
### Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Code (claude-opus-4-7)
---
.../main/python/core/runnables/test_heartbeat.py | 117 +++++++++++++++++++++
1 file changed, 117 insertions(+)
diff --git a/amber/src/main/python/core/runnables/test_heartbeat.py
b/amber/src/main/python/core/runnables/test_heartbeat.py
new file mode 100644
index 0000000000..76dc93071a
--- /dev/null
+++ b/amber/src/main/python/core/runnables/test_heartbeat.py
@@ -0,0 +1,117 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import socket
+from threading import Event
+from unittest.mock import patch, MagicMock
+
+import pytest
+
+from core.runnables.heartbeat import Heartbeat
+
+
+def make_heartbeat(host="localhost", port=12345, interval=0.05, event=None):
+ return Heartbeat(host, port, interval, event or Event())
+
+
+class TestHeartbeatInit:
+ def test_parses_host_and_port_from_grpc_tcp_url(self):
+ hb = make_heartbeat(host="example.test", port=9090)
+ assert hb._parsed_server_host == "example.test"
+ assert hb._parsed_server_port == 9090
+
+ def test_records_interval_and_stop_event_references(self):
+ event = Event()
+ hb = make_heartbeat(interval=2.5, event=event)
+ assert hb._interval == 2.5
+ assert hb._stop_event is event
+
+ def test_captures_original_parent_pid_at_construction_time(self):
+ with patch("core.runnables.heartbeat.os.getppid", return_value=4242):
+ hb = make_heartbeat()
+ assert hb._original_parent_pid == 4242
+
+ def test_supports_ipv6_host_in_bracketed_form(self):
+ hb = make_heartbeat(host="[::1]", port=9090)
+ assert hb._parsed_server_host == "::1"
+ assert hb._parsed_server_port == 9090
+
+
+class TestCheckHeartbeat:
+ def test_returns_true_when_socket_connects(self):
+ hb = make_heartbeat(host="h", port=1)
+ fake_sock = MagicMock()
+ with patch(
+ "core.runnables.heartbeat.socket.create_connection",
+ return_value=fake_sock,
+ ) as mock_connect:
+ assert hb._check_heartbeat() is True
+ mock_connect.assert_called_once_with(("h", 1), timeout=1)
+ fake_sock.close.assert_called_once()
+
+ def test_returns_false_when_socket_connection_raises(self):
+ hb = make_heartbeat()
+ with patch(
+ "core.runnables.heartbeat.socket.create_connection",
+ side_effect=ConnectionRefusedError("nope"),
+ ):
+ assert hb._check_heartbeat() is False
+
+ def test_returns_false_when_socket_connection_times_out(self):
+ hb = make_heartbeat()
+ with patch(
+ "core.runnables.heartbeat.socket.create_connection",
+ side_effect=socket.timeout("slow"),
+ ):
+ assert hb._check_heartbeat() is False
+
+ def test_returns_false_when_socket_close_raises(self):
+ # Pins the false-negative path: connect succeeds but the subsequent
+ # close() throws (e.g. broken pipe on a half-open socket). The bare
+ # `except Exception` in _check_heartbeat() catches it and reports
+ # "server down", and a regression that narrows that handler would be
+ # caught here.
+ hb = make_heartbeat()
+ fake_sock = MagicMock()
+ fake_sock.close.side_effect = OSError("close failed")
+ with patch(
+ "core.runnables.heartbeat.socket.create_connection",
+ return_value=fake_sock,
+ ):
+ assert hb._check_heartbeat() is False
+
+
+class TestRunEarlyExit:
+ @pytest.mark.timeout(2)
+ def test_returns_immediately_when_stop_event_is_already_set(self):
+ event = Event()
+ event.set()
+ hb = make_heartbeat(interval=10.0, event=event)
+ # Event.wait(timeout=10) returns immediately because the event is
+ # already set, so `while not self._stop_event.wait(...)` short-circuits
+ # before the loop body runs and _check_heartbeat() is never called.
+ # The pytest timeout above turns a regression that re-enters the loop
+ # (or blocks on wait()) into a fast failure rather than a hung CI job.
+ with patch.object(hb, "_check_heartbeat") as mock_check:
+ hb.run()
+ mock_check.assert_not_called()
+
+
[email protected]("port", [1, 65535, 8080])
+def test_init_accepts_full_port_range(port):
+ hb = make_heartbeat(port=port)
+ assert hb._parsed_server_port == port