This is an automated email from the ASF dual-hosted git repository.
Baunsgaard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new 88c26e2ae7 [SYSTEMDS-2651] Poll for async compression in federated
component tests (#2472)
88c26e2ae7 is described below
commit 88c26e2ae79c70d5b30bc881c03c368255d8a470
Author: Sebastian Baunsgaard <[email protected]>
AuthorDate: Tue May 26 16:46:13 2026 +0200
[SYSTEMDS-2651] Poll for async compression in federated component tests
(#2472)
FedWorkerReadMatrixCompress.verifyRead failed in roughly one out of ten
component-test CI runs because it called FederatedTestUtils.wait(1000)
to give the worker time to finish its async compression (kicked off by
CompressedMatrixBlockFactory.compressAsync), then asserted that the
returned block was a CompressedMatrixBlock. On a contended runner the
1 s sleep was not enough: the subsequent read returned the
still-uncompressed block, and the assertion failed. Surefire's
rerunFailingTestsCount=2 hid this as a "Flake" rather than a job
failure.
Add FedWorkerBase.awaitCompressed(long id), which polls getMatrixBlock
at 25 ms intervals for up to COMPRESS_TIMEOUT_MS (10 s) and returns as
soon as the worker reports the compressed form; on timeout it returns
the last-observed block so the caller's assertion still produces a
meaningful failure.
Convert the three call sites that used the fixed-sleep anti-pattern:
- FedWorkerReadMatrixCompress.verifyRead (the actual CI flake)
- FedWorkerMatrixCompress.verifySameOrAlsoCompressedAsLocalCompress
(polls only when local compression yields a CompressedMatrixBlock, so
the "do not compress" parametrization stays fast)
- FedWorkerMatrixMultiplyWorkload.verifySameOrAlsoCompressedAsLocalCompress
Remove the now-unused FederatedTestUtils.wait helper so the
anti-pattern is harder to reintroduce.
---
.../test/component/federated/FedWorkerBase.java | 39 ++++++++++++++++++++++
.../federated/FedWorkerMatrixCompress.java | 12 ++++---
.../federated/FedWorkerMatrixMultiplyWorkload.java | 13 +++-----
.../federated/FedWorkerReadMatrixCompress.java | 7 ++--
.../component/federated/FederatedTestUtils.java | 9 -----
5 files changed, 54 insertions(+), 26 deletions(-)
diff --git
a/src/test/java/org/apache/sysds/test/component/federated/FedWorkerBase.java
b/src/test/java/org/apache/sysds/test/component/federated/FedWorkerBase.java
index 1bf5d33006..2c854b4a81 100644
--- a/src/test/java/org/apache/sysds/test/component/federated/FedWorkerBase.java
+++ b/src/test/java/org/apache/sysds/test/component/federated/FedWorkerBase.java
@@ -26,12 +26,19 @@ import java.net.InetSocketAddress;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.sysds.runtime.compress.CompressedMatrixBlock;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.test.AutomatedTestBase;
public abstract class FedWorkerBase {
protected static final Log LOG =
LogFactory.getLog(FedWorkerBase.class.getName());
+ /** Upper bound (ms) for {@link #awaitCompressed(long)} polling against
async worker-side compression. */
+ protected static final int COMPRESS_TIMEOUT_MS = 10_000;
+
+ /** Poll interval used by {@link #awaitCompressed(long)} between
successive reads. */
+ private static final int COMPRESS_POLL_INTERVAL_MS = 25;
+
private final InetSocketAddress addr;
public final int port;
@@ -70,6 +77,38 @@ public abstract class FedWorkerBase {
return FederatedTestUtils.getMatrixBlock(id, addr);
}
+ /**
+ * Poll the federated worker until the matrix at {@code id} is observed
as a
+ * {@link CompressedMatrixBlock}, or {@link #COMPRESS_TIMEOUT_MS}
elapses.
+ *
+ * <p>Federated workers compress asynchronously after a PUT/READ_VAR
(see
+ * {@code CompressedMatrixBlockFactory.compressAsync}), so a {@code
getMatrixBlock} fired right
+ * after the operation can race against the in-flight compression and
return the uncompressed
+ * block. Tests that need to observe the compressed form should poll
instead of sleeping a fixed
+ * amount.
+ *
+ * <p>On timeout this returns the most recent (uncompressed) read so
the caller can produce a
+ * meaningful assertion failure naming the variable.
+ *
+ * @param id federated variable id
+ * @return the matrix block, compressed if compression finished in
time, otherwise the latest read
+ */
+ public MatrixBlock awaitCompressed(long id) {
+ final long deadline = System.currentTimeMillis() +
COMPRESS_TIMEOUT_MS;
+ MatrixBlock mb = getMatrixBlock(id);
+ while(!(mb instanceof CompressedMatrixBlock) &&
System.currentTimeMillis() < deadline) {
+ try {
+ Thread.sleep(COMPRESS_POLL_INTERVAL_MS);
+ }
+ catch(InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ fail("Interrupted while waiting for federated
compression of id=" + id);
+ }
+ mb = getMatrixBlock(id);
+ }
+ return mb;
+ }
+
public long matrixMult(long idLeft, long idRight) {
return FederatedTestUtils.exec_MM(idLeft, idRight, addr);
}
diff --git
a/src/test/java/org/apache/sysds/test/component/federated/FedWorkerMatrixCompress.java
b/src/test/java/org/apache/sysds/test/component/federated/FedWorkerMatrixCompress.java
index 29c6f94e7a..2b5ff327ef 100644
---
a/src/test/java/org/apache/sysds/test/component/federated/FedWorkerMatrixCompress.java
+++
b/src/test/java/org/apache/sysds/test/component/federated/FedWorkerMatrixCompress.java
@@ -65,14 +65,16 @@ public class FedWorkerMatrixCompress extends FedWorkerBase {
// local
final MatrixBlock mbcLocal =
CompressedMatrixBlockFactory.compress(mb).getLeft();
- // federated
+ // federated. Compression on the worker is async; poll only
when we expect compression to
+ // match the local result, otherwise a single read is enough.
final long id = putMatrixBlock(mb);
- // give the federated site time to compress async.
- FederatedTestUtils.wait(1000);
- final MatrixBlock mbr = getMatrixBlock(id);
+ final MatrixBlock mbr = (mbcLocal instanceof
CompressedMatrixBlock)
+ ? awaitCompressed(id)
+ : getMatrixBlock(id);
if(mbcLocal instanceof CompressedMatrixBlock && !(mbr
instanceof CompressedMatrixBlock))
- fail("Invalid result, the federated site did not
compress the matrix block");
+ fail("Invalid result, the federated site did not
compress the matrix block within "
+ + COMPRESS_TIMEOUT_MS + "ms");
TestUtils.compareMatricesBitAvgDistance(mbcLocal, mbr, 0, 0,
"Not equivalent matrix block returned from federated
site");
diff --git
a/src/test/java/org/apache/sysds/test/component/federated/FedWorkerMatrixMultiplyWorkload.java
b/src/test/java/org/apache/sysds/test/component/federated/FedWorkerMatrixMultiplyWorkload.java
index 06a193368c..59c9a093c4 100644
---
a/src/test/java/org/apache/sysds/test/component/federated/FedWorkerMatrixMultiplyWorkload.java
+++
b/src/test/java/org/apache/sysds/test/component/federated/FedWorkerMatrixMultiplyWorkload.java
@@ -88,19 +88,16 @@ public class FedWorkerMatrixMultiplyWorkload extends
FedWorkerBase {
for(int i = 0; i < 9; i++) // chain left side compressed
multiplications with idr.
ide = matrixMult(ide, idr);
- // give the federated site time to compress async (it should
already be done, but just to be safe).
- FederatedTestUtils.wait(1000);
-
- // Get back the matrix block stored behind mbr that should be
compressed now.
- final MatrixBlock mbr_compressed = getMatrixBlock(idr);
+ // Workload-driven compression runs async on the worker; poll
instead of sleeping a fixed
+ // amount so a slow runner doesn't observe the
still-uncompressed block.
+ final MatrixBlock mbr_compressed = awaitCompressed(idr);
if(!(mbr_compressed instanceof CompressedMatrixBlock))
- fail("Invalid result, the federated site did not
compress the matrix block based on workload");
+ fail("Invalid result, the federated site did not
compress the matrix block based on workload within "
+ + COMPRESS_TIMEOUT_MS + "ms");
TestUtils.compareMatricesBitAvgDistance(mbcLocal,
mbr_compressed, 0, 0,
"Not equivalent matrix block returned from federated
site");
}
-
-
}
diff --git
a/src/test/java/org/apache/sysds/test/component/federated/FedWorkerReadMatrixCompress.java
b/src/test/java/org/apache/sysds/test/component/federated/FedWorkerReadMatrixCompress.java
index ed47a87e1e..d94cd367a1 100644
---
a/src/test/java/org/apache/sysds/test/component/federated/FedWorkerReadMatrixCompress.java
+++
b/src/test/java/org/apache/sysds/test/component/federated/FedWorkerReadMatrixCompress.java
@@ -65,15 +65,14 @@ public class FedWorkerReadMatrixCompress extends
FedWorkerBase {
public void verifyRead() {
MatrixBlock expected = readCSV();
Long id = readMatrix(path);
- // give the federated site time to compress async.
- FederatedTestUtils.wait(1000);
- MatrixBlock actual = getMatrixBlock(id);
+ // Compression happens async on the worker; poll instead of
sleeping a fixed amount.
+ MatrixBlock actual = awaitCompressed(id);
if(actual instanceof CompressedMatrixBlock){
TestUtils.compareMatricesBitAvgDistance(expected,
actual, 0, 0,
"Not equivalent matrix block read from
federated site");
}
else
- fail("Did not compress the matrix input");
+ fail("Did not compress the matrix input within " +
COMPRESS_TIMEOUT_MS + "ms");
}
protected MatrixBlock readCSV() {
diff --git
a/src/test/java/org/apache/sysds/test/component/federated/FederatedTestUtils.java
b/src/test/java/org/apache/sysds/test/component/federated/FederatedTestUtils.java
index 4d3796892a..9b589c35f7 100644
---
a/src/test/java/org/apache/sysds/test/component/federated/FederatedTestUtils.java
+++
b/src/test/java/org/apache/sysds/test/component/federated/FederatedTestUtils.java
@@ -190,13 +190,4 @@ public class FederatedTestUtils {
fail("Failed to get response from put Matrix Block");
}
}
-
- protected static void wait(int ms) {
- try {
- Thread.sleep(ms);
- }
- catch(Exception e) {
- fail("Failed to wait");
- }
- }
}