This is an automated email from the ASF dual-hosted git repository.

Baunsgaard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new 34a19f001f [SYSTEMDS-2651] Extend TCP port polling to federated monitoring backend
34a19f001f is described below

commit 34a19f001f72da2d029f55c1f7789809cbf0c45c
Author: Sebastian Baunsgaard <[email protected]>
AuthorDate: Tue May 26 01:30:47 2026 +0200

    [SYSTEMDS-2651] Extend TCP port polling to federated monitoring backend
    
    Route startLocalFedMonitoring through FederatedWorkerUtils.waitForWorker so
    that the test polls the monitoring backend's TCP port until it is bound,
    instead of sleeping for a fixed 10 s (FED_MONITOR_WAIT). This fixes the
    flaky FederatedCoordinatorIntegrationCRUDTest. Also migrate
    FederatedLogicalTest to the bulk startLocalFedWorkers(int[]) API, and drop
    the now-unused FED_WORKER_WAIT_S and FED_MONITOR_WAIT constants.
---
 .../org/apache/sysds/test/AutomatedTestBase.java   | 62 ++++++++++++++--------
 .../primitives/part4/FederatedLogicalTest.java     | 22 ++++----
 2 files changed, 49 insertions(+), 35 deletions(-)

diff --git a/src/test/java/org/apache/sysds/test/AutomatedTestBase.java b/src/test/java/org/apache/sysds/test/AutomatedTestBase.java
index 85a37b7dbd..150a358bdf 100644
--- a/src/test/java/org/apache/sysds/test/AutomatedTestBase.java
+++ b/src/test/java/org/apache/sysds/test/AutomatedTestBase.java
@@ -20,7 +20,6 @@
 package org.apache.sysds.test;
 
 import static java.lang.Math.ceil;
-import static java.lang.Thread.sleep;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
@@ -118,15 +117,10 @@ public abstract class AutomatedTestBase {
        public static final double GPU_TOLERANCE = 1e-9;
 
        /**
-        * Default upper bound (ms) passed to federated worker readiness waits. 
The wait returns as soon
-        * as the worker's TCP port accepts a connection, so this value only 
affects the deadline used
-        * when a worker never becomes ready. {@link FederatedWorkerUtils} 
clamps caller values below its
-        * enforced floor up to that floor, so the effective ceiling is at 
least that floor regardless
-        * of this constant.
+        * Default deadline (ms) for federated worker/monitoring readiness 
waits and a few legacy
+        * {@code sleep()} calls. {@link FederatedWorkerUtils} enforces its own 
minimum floor.
         */
        public static final int FED_WORKER_WAIT = 3000;
-       public static final int FED_MONITOR_WAIT = 10000;
-       public static final int FED_WORKER_WAIT_S = 50;
        
 
        // The timeout for a test to fail. all tests must execute in less than 
this time.
@@ -1765,29 +1759,53 @@ public abstract class AutomatedTestBase {
        }
 
        /**
-        * Start new JVM for a federated monitoring backend at the port.
+        * Start a new JVM for a federated monitoring backend at the port.
         *
-        * @param port Port to use for the JVM
-        * @return the process associated with the worker.
+        * <p>Returns once the backend's TCP port accepts connections (Netty's 
bind has completed), or
+        * throws a {@link RuntimeException} once the {@link 
FederatedWorkerUtils} readiness floor
+        * elapses.
+        *
+        * @param port    Port to use for the JVM
+        * @param addArgs Extra CLI args to append, or null
+        * @return the process associated with the monitoring backend.
         */
        protected Process startLocalFedMonitoring(int port, String[] addArgs) {
-               Process process = null;
+               return startLocalFedMonitoring(port, addArgs, FED_WORKER_WAIT);
+       }
+
+       /**
+        * Start a new JVM for a federated monitoring backend at the port.
+        *
+        * <p>Returns once the backend's TCP port accepts connections, or 
throws a
+        * {@link RuntimeException} after {@code timeoutMs} elapses. The 
monitoring server opens the
+        * port after Netty's {@code bind().sync()} returns; a successful TCP 
connect therefore signals
+        * that the HTTP listener is ready to accept requests.
+        *
+        * @param port      Port to use for the JVM
+        * @param addArgs   Extra CLI args to append, or null
+        * @param timeoutMs Upper bound on the wait, in ms; raised to a minimum 
value enforced inside
+        *                  {@link FederatedWorkerUtils}.
+        * @return the process associated with the monitoring backend.
+        */
+       protected Process startLocalFedMonitoring(int port, String[] addArgs, 
int timeoutMs) {
+               Process process = spawnLocalFedMonitoring(port, addArgs);
+               FederatedWorkerUtils.waitForWorker(port, timeoutMs, 
process::isAlive, "monitoring process");
+               return process;
+       }
+
+       /** Spawn a federated monitoring backend JVM and return without waiting 
for the port to bind. */
+       private static Process spawnLocalFedMonitoring(int port, String[] 
addArgs) {
                String separator = System.getProperty("file.separator");
                String classpath = System.getProperty("java.class.path");
                String path = System.getProperty("java.home") + separator + 
"bin" + separator + "java";
-               String[] args = ArrayUtils.addAll(new String[]{path, "-cp", 
classpath, DMLScript.class.getName(),
-                               "-fedMonitoring", Integer.toString(port)}, 
addArgs);
-               ProcessBuilder processBuilder = new ProcessBuilder(args);
-
+               String[] args = ArrayUtils.addAll(new String[] {path, "-cp", 
classpath, DMLScript.class.getName(),
+                       "-fedMonitoring", Integer.toString(port)}, addArgs);
                try {
-                       process = processBuilder.start();
-                       // Wait till process is started
-                       sleep(FED_MONITOR_WAIT);
+                       return new ProcessBuilder(args).start();
                }
-               catch(IOException | InterruptedException e) {
-                       throw new RuntimeException(e);
+               catch(IOException e) {
+                       throw new RuntimeException("Failed to launch federated 
monitoring process on port " + port, e);
                }
-               return process;
        }
 
        /**
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/primitives/part4/FederatedLogicalTest.java b/src/test/java/org/apache/sysds/test/functions/federated/primitives/part4/FederatedLogicalTest.java
index ba1e7e0ea0..f8acdd0793 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/primitives/part4/FederatedLogicalTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/primitives/part4/FederatedLogicalTest.java
@@ -372,17 +372,15 @@ public class FederatedLogicalTest extends AutomatedTestBase {
                // empty script name because we don't execute any script, just 
start the worker
                fullDMLScriptName = "";
                int port1 = getRandomAvailablePort();
-               int port2 = (!single_fed_worker ? getRandomAvailablePort() : 0);
-               int port3 = (!single_fed_worker ? getRandomAvailablePort() : 0);
-               int port4 = (!single_fed_worker ? getRandomAvailablePort() : 0);
-               Process thread1 = startLocalFedWorker(port1, 
(!single_fed_worker ? FED_WORKER_WAIT_S : FED_WORKER_WAIT));
-               Process thread2 = (!single_fed_worker ? 
startLocalFedWorker(port2, FED_WORKER_WAIT_S) : null);
-               Process thread3 = (!single_fed_worker ? 
startLocalFedWorker(port3, FED_WORKER_WAIT_S) : null);
-               Process thread4 = (!single_fed_worker ? 
startLocalFedWorker(port4) : null);
-
-               
+               int port2 = single_fed_worker ? 0 : getRandomAvailablePort();
+               int port3 = single_fed_worker ? 0 : getRandomAvailablePort();
+               int port4 = single_fed_worker ? 0 : getRandomAvailablePort();
+               Process[] workers = startLocalFedWorkers(single_fed_worker
+                       ? new int[] {port1}
+                       : new int[] {port1, port2, port3, port4});
+
                try {
-                       if(!isAlive(thread1))
+                       if(!isAlive(workers))
                                throw new RuntimeException("Failed starting 
federated worker");
 
                        getAndLoadTestConfiguration(testname);
@@ -449,9 +447,7 @@ public class FederatedLogicalTest extends AutomatedTestBase {
                        }
                }
                finally {
-                       TestUtils.shutdownThreads(thread1);
-                       if(!single_fed_worker)
-                               TestUtils.shutdownThreads(thread2, thread3, 
thread4);
+                       TestUtils.shutdownThreads(workers);
 
                        resetExecMode(platform_old);
                }

Reply via email to