This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemml.git


The following commit(s) were added to refs/heads/master by this push:
     new be7191c  [SYSTEMDS-393] Performance distributed connected components
be7191c is described below

commit be7191c2502ad7f5445891ceb671f335e88e51c9
Author: Matthias Boehm <mboe...@gmail.com>
AuthorDate: Sat May 30 00:15:11 2020 +0200

    [SYSTEMDS-393] Performance distributed connected components
    
    This patch makes a few tweaks to significantly improve the performance
    of the new connected components builtin function where the graph G does
    not fit in the driver memory and thus spawns distributed spark
    operations.
    
    The test case was a 1M x 1M graph with 1G edges, run with a driver memory
    of 10GB and 9 executors with 80GB each. The baseline runtime of 10 calls to
    connected components (each requiring 4 iterations until convergence) was
    poor at 1,512s due to excessive shuffle and GC overhead.
    
    1) Modified Script: Removed the unnecessary removal of self-edges as the
    chosen update rule is robust enough to handle both cases. This removed
    the excessive shuffling overhead for matrix-matrix binary operations
    without existing hash partitioning. This change alone reduced the total
    runtime of 10 calls to 760s.
    
    2) Handling of approximately known sparsity: The large GC overhead was
    due to not converting the MCSR representation into read-optimized CSR
    during checkpointing (spark caching). We now compute these conditions
    with the upper bound information that is available in cases where the
    exact nnz is unknown. This further reduced the total runtime to 131s.
    
    With codegen the runtime is further slightly improved to 120s (including
    spark context creation, and matrix creation) as we avoid materializing G
    * t(c) in memory by fusing it with rowMaxs(G * t(c)). For 40 update rule
    computations (and thus scans of the graph), this is fairly reasonable.
---
 scripts/builtin/components.dml                                   | 9 ++++-----
 src/main/java/org/apache/sysds/hops/OptimizerUtils.java          | 8 ++++++--
 .../sysds/runtime/instructions/spark/RandSPInstruction.java      | 3 +++
 3 files changed, 13 insertions(+), 7 deletions(-)

diff --git a/scripts/builtin/components.dml b/scripts/builtin/components.dml
index 51d96db..5f37c07 100644
--- a/scripts/builtin/components.dml
+++ b/scripts/builtin/components.dml
@@ -27,11 +27,10 @@
 m_components = function(Matrix[Double] G, Integer maxi = 0, Boolean verbose = TRUE) 
   return (Matrix[Double] C) 
 {
-  # ensure there are no self-edges in the graph
-  if( trace(G) != 0 ) {
-    G = G - diag(diag(G));
-    if(verbose)
-      print("Connected Components: warning - removed self-edges from input graph");
+  # best effort check for symmetry (not exact but fast)
+  if( sum(rowSums(G) != t(colSums(G))) > 0 ) {
+    stop("Connected Components: input graph needs to be "
+       + "symmetric but rowSums and colSums don't match up.");
   }
 
   # initialize state with vertex ids
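
The symmetry check added in the hunk above is a necessary but not a sufficient condition. As a rough illustration in plain Java (not SystemDS code; the class and method names are hypothetical and the input is assumed to be a square dense array), the sketch below compares row and column sums the way the DML condition does, next to an exact element-wise check:

```java
// Illustrative sketch only, not SystemDS code. All names are hypothetical.
final class SymmetryCheckSketch {

    // Best-effort check: true if every row sum equals the matching column sum.
    // Mirrors the idea of sum(rowSums(G) != t(colSums(G))) == 0 on a square matrix.
    static boolean rowColSumsMatch(double[][] g) {
        int n = g.length;
        double[] rowSums = new double[n];
        double[] colSums = new double[n];
        for (int i = 0; i < n; i++) {
            for (int j = 0; j < n; j++) {
                rowSums[i] += g[i][j];
                colSums[j] += g[i][j];
            }
        }
        for (int i = 0; i < n; i++) {
            if (rowSums[i] != colSums[i]) {
                return false;
            }
        }
        return true;
    }

    // Exact check: compares every cell with its transposed counterpart.
    static boolean isSymmetric(double[][] g) {
        int n = g.length;
        for (int i = 0; i < n; i++) {
            for (int j = i + 1; j < n; j++) {
                if (g[i][j] != g[j][i]) {
                    return false;
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Directed 3-cycle 0->1, 1->2, 2->0: all row and column sums are 1.
        double[][] cycle = {{0, 1, 0}, {0, 0, 1}, {1, 0, 0}};
        System.out.println("sums match: " + rowColSumsMatch(cycle));
        System.out.println("symmetric:  " + isSymmetric(cycle));
    }
}
```

The directed 3-cycle passes the sum comparison although it is not symmetric, which is the sense in which the check is "not exact but fast": it needs only two aggregates over G instead of a comparison against the transpose.
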
diff --git a/src/main/java/org/apache/sysds/hops/OptimizerUtils.java b/src/main/java/org/apache/sysds/hops/OptimizerUtils.java
index ef2b5ff..213041f 100644
--- a/src/main/java/org/apache/sysds/hops/OptimizerUtils.java
+++ b/src/main/java/org/apache/sysds/hops/OptimizerUtils.java
@@ -477,8 +477,12 @@ public class OptimizerUtils
        }
 
        public static boolean checkSparseBlockCSRConversion( DataCharacteristics dcIn ) {
-               return Checkpoint.CHECKPOINT_SPARSE_CSR
-                       && OptimizerUtils.getSparsity(dcIn) < MatrixBlock.SPARSITY_TURN_POINT;
+               //we use the non-zero bound to make the important csr decision in 
+               //an best effort manner (the precise non-zeros is irrelevant here)
+               double sp = OptimizerUtils.getSparsity(
+                       dcIn.getRows(), dcIn.getCols(), dcIn.getNonZerosBound());
+               return Checkpoint.CHECKPOINT_SPARSE_CSR 
+                       && sp < MatrixBlock.SPARSITY_TURN_POINT;
        }
        
        /**
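
To illustrate why the bound matters for the decision above, here is a minimal standalone Java sketch. It is not the SystemDS implementation: the class and method names, the turn point value of 0.4, and the convention that an unknown nnz is passed as -1 and treated as sparsity 1.0 are all assumptions made for this example.

```java
// Illustrative sketch only, not SystemDS code. Names and constants are assumptions.
final class CsrDecisionSketch {

    // Assumed threshold below which a block is treated as sparse.
    static final double SPARSITY_TURN_POINT = 0.4;

    // Assumed stand-in for the configuration flag that enables CSR checkpoints.
    static final boolean CHECKPOINT_SPARSE_CSR = true;

    // Sparsity as nnz / (rows * cols); unknown inputs are treated as fully dense.
    static double sparsity(long rows, long cols, long nnz) {
        if (rows <= 0 || cols <= 0 || nnz < 0) {
            return 1.0;
        }
        return Math.min(1.0, (double) nnz / rows / cols);
    }

    // Decision based on the exact nnz only (-1 when unknown).
    static boolean convertWithExactNnz(long rows, long cols, long exactNnz) {
        return CHECKPOINT_SPARSE_CSR
            && sparsity(rows, cols, exactNnz) < SPARSITY_TURN_POINT;
    }

    // Decision based on an upper bound of the nnz, which may be known
    // even when the exact count is not.
    static boolean convertWithNnzBound(long rows, long cols, long nnzBound) {
        return CHECKPOINT_SPARSE_CSR
            && sparsity(rows, cols, nnzBound) < SPARSITY_TURN_POINT;
    }

    public static void main(String[] args) {
        long rows = 1_000_000L;
        long cols = 1_000_000L;
        long bound = 1_000_000_000L; // roughly the 1G edges of the test case
        System.out.println("exact nnz unknown: " + convertWithExactNnz(rows, cols, -1));
        System.out.println("nnz bound known:   " + convertWithNnzBound(rows, cols, bound));
    }
}
```

Under these assumptions, an unknown exact nnz makes the matrix look dense and the conversion is skipped, while the bound of 1G non-zeros on a 1M x 1M matrix gives a sparsity of 0.001 and enables it.
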
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/RandSPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/RandSPInstruction.java
index ef40773..17315f0 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/spark/RandSPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/RandSPInstruction.java
@@ -403,8 +403,11 @@ public class RandSPInstruction extends UnarySPInstruction {
                if(!mcOut.dimsKnown(true)) {
                        //note: we cannot compute the nnz from sparsity because this would not reflect the
                        //actual number of non-zeros, except for extreme values of sparsity equals 0 or 1.
+                       //However, in all cases we keep this information for more coarse-grained decisions.
                        long lnnz = (sparsity==0 || sparsity==1) ? (long) (sparsity*lrows*lcols) : -1;
                        mcOut.set(lrows, lcols, blocksize, lnnz);
+                       if( !mcOut.nnzKnown() )
+                               mcOut.setNonZerosBound((long) (sparsity*lrows*lcols));
                }
                sec.setRDDHandleForVariable(output.getName(), out);
        }

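The commit message notes that the update rule tolerates self-edges and that codegen fuses G * t(c) with rowMaxs. A minimal dense Java sketch of that idea follows. It is not the components.dml script or generated code: it assumes a binary adjacency matrix, vertex ids 1..n, and an update of the form c = max(c, rowMaxs(G * t(c))); the class and method names are hypothetical.

```java
// Illustrative sketch only, not SystemDS code. Names and the exact rule are assumptions.
final class ComponentsSketch {

    // Propagates the maximum vertex id within each component until nothing changes.
    // maxIter == 0 means "iterate until convergence".
    static double[] components(double[][] g, int maxIter) {
        int n = g.length;
        double[] c = new double[n];
        for (int i = 0; i < n; i++) {
            c[i] = i + 1; // initialize state with vertex ids 1..n
        }
        boolean changed = true;
        for (int iter = 0; changed && (maxIter == 0 || iter < maxIter); iter++) {
            double[] u = new double[n];
            changed = false;
            for (int i = 0; i < n; i++) {
                // Fused row maximum over g[i][j] * c[j]: the n x n product
                // is never stored, only one running maximum per row.
                double max = c[i];
                for (int j = 0; j < n; j++) {
                    max = Math.max(max, g[i][j] * c[j]);
                }
                u[i] = max;
                changed |= (u[i] != c[i]);
            }
            c = u;
        }
        return c;
    }

    public static void main(String[] args) {
        // Two components: vertices {1,2} and {3,4}.
        double[][] g = {{0, 1, 0, 0}, {1, 0, 0, 0}, {0, 0, 0, 1}, {0, 0, 1, 0}};
        System.out.println(java.util.Arrays.toString(components(g, 0)));
    }
}
```

A self-edge on vertex i only contributes c[i] to the row maximum, which the max with the vertex's own id already covers, so under these assumptions the result is the same with or without the diagonal.
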