This is an automated email from the ASF dual-hosted git repository. mboehm7 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/systemml.git
The following commit(s) were added to refs/heads/master by this push: new be7191c [SYSTEMDS-393] Performance distributed connected components be7191c is described below commit be7191c2502ad7f5445891ceb671f335e88e51c9 Author: Matthias Boehm <mboe...@gmail.com> AuthorDate: Sat May 30 00:15:11 2020 +0200 [SYSTEMDS-393] Performance distributed connected components This patch makes a few tweaks to significantly improve the performance of the new connected components builtin function where the graph G does not fit in the driver memory and thus spawns distributed Spark operations. The test case was a 1M x 1M graph with 1G edges, run with driver memory of 10GB and 9 executors with 80GB each. The baseline runtime of 10 calls to connected components (each requiring 4 iterations until convergence) was pretty bad with 1,512s due to excessive shuffle and GC overhead. 1) Modified Script: Removed the unnecessary removal of self-edges as the chosen update rule is robust enough to handle both cases. This removed the excessive shuffling overhead for matrix-matrix binary operations without existing hash partitioning. This change alone reduced the total runtime of 10 calls to 760s. 2) Handling of approximately known sparsity: The large GC overhead was due to not converting the MCSR representation into read-optimized CSR during checkpointing (Spark caching). We now compute these conditions with the upper bound information that is available in cases where the exact nnz is unknown. This further reduced the total runtime to 131s. With codegen the runtime is further slightly improved to 120s (including Spark context creation, and matrix creation) as we avoid materializing G * t(c) in memory by fusing it with rowMaxs(G * t(c)). For 40 update rule computations (and thus scans of the graph), this is fairly reasonable. 
--- scripts/builtin/components.dml | 9 ++++----- src/main/java/org/apache/sysds/hops/OptimizerUtils.java | 8 ++++++-- .../sysds/runtime/instructions/spark/RandSPInstruction.java | 3 +++ 3 files changed, 13 insertions(+), 7 deletions(-) diff --git a/scripts/builtin/components.dml b/scripts/builtin/components.dml index 51d96db..5f37c07 100644 --- a/scripts/builtin/components.dml +++ b/scripts/builtin/components.dml @@ -27,11 +27,10 @@ m_components = function(Matrix[Double] G, Integer maxi = 0, Boolean verbose = TRUE) return (Matrix[Double] C) { - # ensure there are no self-edges in the graph - if( trace(G) != 0 ) { - G = G - diag(diag(G)); - if(verbose) - print("Connected Components: warning - removed self-edges from input graph"); + # best effort check for symmetry (not exact but fast) + if( sum(rowSums(G) != t(colSums(G))) > 0 ) { + stop("Connected Components: input graph needs to be " + + "symmetric but rowSums and colSums don't match up."); } # initialize state with vertex ids diff --git a/src/main/java/org/apache/sysds/hops/OptimizerUtils.java b/src/main/java/org/apache/sysds/hops/OptimizerUtils.java index ef2b5ff..213041f 100644 --- a/src/main/java/org/apache/sysds/hops/OptimizerUtils.java +++ b/src/main/java/org/apache/sysds/hops/OptimizerUtils.java @@ -477,8 +477,12 @@ public class OptimizerUtils } public static boolean checkSparseBlockCSRConversion( DataCharacteristics dcIn ) { - return Checkpoint.CHECKPOINT_SPARSE_CSR - && OptimizerUtils.getSparsity(dcIn) < MatrixBlock.SPARSITY_TURN_POINT; + //we use the non-zero bound to make the important csr decision in + //an best effort manner (the precise non-zeros is irrelevant here) + double sp = OptimizerUtils.getSparsity( + dcIn.getRows(), dcIn.getCols(), dcIn.getNonZerosBound()); + return Checkpoint.CHECKPOINT_SPARSE_CSR + && sp < MatrixBlock.SPARSITY_TURN_POINT; } /** diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/RandSPInstruction.java 
b/src/main/java/org/apache/sysds/runtime/instructions/spark/RandSPInstruction.java index ef40773..17315f0 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/spark/RandSPInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/RandSPInstruction.java @@ -403,8 +403,11 @@ public class RandSPInstruction extends UnarySPInstruction { if(!mcOut.dimsKnown(true)) { //note: we cannot compute the nnz from sparsity because this would not reflect the //actual number of non-zeros, except for extreme values of sparsity equals 0 or 1. + //However, in all cases we keep this information for more coarse-grained decisions. long lnnz = (sparsity==0 || sparsity==1) ? (long) (sparsity*lrows*lcols) : -1; mcOut.set(lrows, lcols, blocksize, lnnz); + if( !mcOut.nnzKnown() ) + mcOut.setNonZerosBound((long) (sparsity*lrows*lcols)); } sec.setRDDHandleForVariable(output.getName(), out); }