This is an automated email from the ASF dual-hosted git repository.

baunsgaard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new fdd60f6d10 [SYSTEMDS-3662] Parfor Merge Sparse
fdd60f6d10 is described below

commit fdd60f6d10acb72239fafc7507bd62b73941153f
Author: Sebastian Baunsgaard <[email protected]>
AuthorDate: Fri Jan 5 13:06:58 2024 +0100

    [SYSTEMDS-3662] Parfor Merge Sparse
    
    This commit optimizes the parfor result merge.
    In the case of K-means with 10 runs, it reduces the merge phase from
    19 s to 1 s because it exploits the sparsity of the merged blocks.
    
    Closes #1971
---
 .../controlprogram/parfor/ResultMergeMatrix.java   | 192 +++++++++++++++------
 1 file changed, 143 insertions(+), 49 deletions(-)

diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeMatrix.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeMatrix.java
index 6e0a3c4d0c..d90b9e177b 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeMatrix.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeMatrix.java
@@ -22,37 +22,51 @@ package org.apache.sysds.runtime.controlprogram.parfor;
 import java.util.List;
 
 import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.compress.utils.Util;
 import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysds.runtime.data.DenseBlock;
+import org.apache.sysds.runtime.data.SparseBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 
 /**
+ * <p>
  * Due to independence of all iterations, any result has the following 
properties:
- * (1) non local var, (2) matrix object, and (3) completely independent.
- * These properties allow us to realize result merging in parallel without any 
synchronization. 
+ * </p>
  * 
+ * <p>
+ * (1) non local var,
+ * </p>
+ * <p>
+ * (2) matrix object, and
+ * </p>
+ * <p>
+ * (3) completely independent.
+ * </p>
+ * 
+ * <p>
+ * These properties allow us to realize result merging in parallel without any 
synchronization.
+ * </p>
  */
-public abstract class ResultMergeMatrix extends ResultMerge<MatrixObject>
-{
+public abstract class ResultMergeMatrix extends ResultMerge<MatrixObject> {
        private static final long serialVersionUID = 5319002218804570071L;
-       
+
        public ResultMergeMatrix() {
                super();
        }
-       
+
        public ResultMergeMatrix(MatrixObject out, MatrixObject[] in, String 
outputFilename, boolean accum) {
                super(out, in, outputFilename, accum);
        }
-       
-       protected void mergeWithoutComp( MatrixBlock out, MatrixBlock in, 
boolean appendOnly ) {
+
+       protected void mergeWithoutComp(MatrixBlock out, MatrixBlock in, 
boolean appendOnly) {
                mergeWithoutComp(out, in, appendOnly, false);
        }
-       
-       protected void mergeWithoutComp( MatrixBlock out, MatrixBlock in, 
boolean appendOnly, boolean par ) {
-               //pass through to matrix block operations
-               if( _isAccum )
+
+       protected void mergeWithoutComp(MatrixBlock out, MatrixBlock in, 
boolean appendOnly, boolean par) {
+               // pass through to matrix block operations
+               if(_isAccum)
                        out.binaryOperationsInPlace(PLUS, in);
-               else{
+               else {
                        MatrixBlock out2 = out.merge(in, appendOnly, par);
 
                        if(out2 != out)
@@ -61,52 +75,132 @@ public abstract class ResultMergeMatrix extends 
ResultMerge<MatrixObject>
        }
 
        /**
-        * NOTE: append only not applicable for wiht compare because output 
must be populated with
-        * initial state of matrix - with append, this would result in 
duplicates.
+        * NOTE: append only not applicable for with compare because output 
must be populated with initial state of matrix -
+        * with append, this would result in duplicates.
         * 
-        * @param out output matrix block
-        * @param in input matrix block
-        * @param compare ?
+        * @param out     output matrix block
+        * @param in      input matrix block
+        * @param compare Comparison matrix of old values.
         */
-       protected void mergeWithComp( MatrixBlock out, MatrixBlock in, 
DenseBlock compare ) 
-       {
-               //Notes for result correctness:
-               // * Always iterate over entire block in order to compare all 
values 
-               //   (using sparse iterator would miss values set to 0) 
+       protected void mergeWithComp(MatrixBlock out, MatrixBlock in, 
DenseBlock compare) {
+               // Notes for result correctness:
+               // * Always iterate over entire block in order to compare all 
values
+               // (using sparse iterator would miss values set to 0)
                // * Explicit NaN awareness because for cases were original 
matrix contains
-               //   NaNs, since NaN != NaN, otherwise we would potentially 
overwrite results
+               // NaNs, since NaN != NaN, otherwise we would potentially 
overwrite results
                // * For the case of accumulation, we add out += (new-old) to 
ensure correct results
-               //   because all inputs have the old values replicated
-               
-               if( in.isEmptyBlock(false) ) {
-                       if( _isAccum ) return; //nothing to do
-                       for( int i=0; i<in.getNumRows(); i++ )
-                               for( int j=0; j<in.getNumColumns(); j++ )
-                                       if( compare.get(i, j) != 0 )
-                                               out.quickSetValue(i, j, 0);
+               // because all inputs have the old values replicated
+
+               final int rows = in.getNumRows();
+               final int cols = in.getNumColumns();
+               if(in.isEmptyBlock(false)) {
+                       if(_isAccum)
+                               return; // nothing to do
+                       mergeWithCompEmpty(out, rows, cols, compare);
+               }
+               else if(in.isInSparseFormat() && _isAccum)
+                       mergeSparseAccumulative(out, in, rows, cols, compare);
+               else if(in.isInSparseFormat())
+                       mergeSparse(out, in, rows, cols, compare);
+               else // SPARSE/DENSE
+                       mergeGeneric(out, in, rows, cols, compare);
+       }
+
+       private void mergeWithCompEmpty(MatrixBlock out, int m, int n, 
DenseBlock compare) {
+               for(int i = 0; i < m; i++)
+                       mergeWithCompEmptyRow(out, m, n, compare, i);
+       }
+
+       private void mergeWithCompEmptyRow(MatrixBlock out, int m, int n, 
DenseBlock compare, int i) {
+
+               for(int j = 0; j < n; j++) {
+                       final double valOld = compare.get(i, j);
+                       if(!Util.eq(0.0, valOld)) // NaN awareness
+                               out.quickSetValue(i, j, 0);
+               }
+       }
+
+       private void mergeSparseAccumulative(MatrixBlock out, MatrixBlock in, 
int m, int n, DenseBlock compare) {
+               final SparseBlock a = in.getSparseBlock();
+               for(int i = 0; i < m; i++) {
+                       if(a.isEmpty(i))
+                               continue;
+                       final int apos = a.pos(i);
+                       final int alen = a.size(i) + apos;
+                       final int[] aix = a.indexes(i);
+                       final double[] aval = a.values(i);
+                       mergeSparseRowAccumulative(out, apos, alen, aix, aval, 
compare, n, i);
+               }
+       }
+
+       private void mergeSparseRowAccumulative(MatrixBlock out, int apos, int 
alen, int[] aix, double[] aval,
+               DenseBlock compare, int n, int i) {
+               for(; apos < alen; apos++) { // inside
+                       final double valOld = compare.get(i, aix[apos]);
+                       final double valNew = aval[apos];
+                       if(!Util.eq(valNew, valOld)) { // NaN awareness
+                               double value = out.quickGetValue(i, aix[apos]) 
+ (valNew - valOld);
+                               out.quickSetValue(i, aix[apos], value);
+                       }
+               }
+       }
+
+       private void mergeSparse(MatrixBlock out, MatrixBlock in, int m, int n, 
DenseBlock compare) {
+               final SparseBlock a = in.getSparseBlock();
+               for(int i = 0; i < m; i++) {
+                       if(a.isEmpty(i))
+                               mergeWithCompEmptyRow(out, m, n, compare, i);
+                       else {
+                               final int apos = a.pos(i);
+                               final int alen = a.size(i) + apos;
+                               final int[] aix = a.indexes(i);
+                               final double[] aval = a.values(i);
+                               mergeSparseRow(out, apos, alen, aix, aval, 
compare, n, i);
+                       }
                }
-               else { //SPARSE/DENSE
-                       int rows = in.getNumRows();
-                       int cols = in.getNumColumns();
-                       for( int i=0; i<rows; i++ )
-                               for( int j=0; j<cols; j++ ) {
-                                       double valOld = compare.get(i,j);
-                                       double valNew = in.quickGetValue(i,j); 
//input value
-                                       if( (valNew != valOld && 
!Double.isNaN(valNew) )      //for changed values 
-                                               || Double.isNaN(valNew) != 
Double.isNaN(valOld) ) //NaN awareness 
-                                       {
-                                               double value = !_isAccum ? 
valNew :
-                                                       (out.quickGetValue(i, 
j) + (valNew - valOld));
-                                               out.quickSetValue(i, j, value);
-                                       }
+       }
+
+       private void mergeSparseRow(MatrixBlock out, int apos, int alen, int[] 
aix, double[] aval, DenseBlock compare, int n,
+               int i) {
+               int j = 0;
+               for(; j < n && apos < alen; j++) { // inside
+                       final boolean aposValid = aix[apos] == j;
+                       final double valOld = compare.get(i, j);
+                       final double valNew = aix[apos] == j ? aval[apos] : 0.0;
+                       if(!Util.eq(valNew, valOld)) { // NaN awareness
+                               double value = !_isAccum ? valNew : 
(out.quickGetValue(i, j) + (valNew - valOld));
+                               out.quickSetValue(i, j, value);
+                       }
+                       if(aposValid)
+                               apos++;
+               }
+               for(; j < n; j++) {
+                       final double valOld = compare.get(i, j);
+                       if(valOld != 0) {
+                               double value = (out.quickGetValue(i, j) - 
valOld);
+                               out.quickSetValue(i, j, value);
+                       }
+               }
+
+       }
+
+       private void mergeGeneric(MatrixBlock out, MatrixBlock in, int m, int 
n, DenseBlock compare) {
+               for(int i = 0; i < m; i++) {
+                       for(int j = 0; j < n; j++) {
+                               final double valOld = compare.get(i, j);
+                               final double valNew = in.quickGetValue(i, j); 
// input value
+                               if(!Util.eq(valNew, valOld)) { // NaN awareness
+                                       double value = !_isAccum ? valNew : 
(out.quickGetValue(i, j) + (valNew - valOld));
+                                       out.quickSetValue(i, j, value);
                                }
+                       }
                }
        }
 
-       protected long computeNonZeros( MatrixObject out, List<MatrixObject> in 
) {
-               //sum of nnz of input (worker result) - output var existing nnz
+       protected long computeNonZeros(MatrixObject out, List<MatrixObject> in) 
{
+               // sum of nnz of input (worker result) - output var existing nnz
                long outNNZ = out.getDataCharacteristics().getNonZeros();
-               return outNNZ - in.size() * outNNZ + in.stream()
+               return outNNZ - in.size() * outNNZ + in.stream()//
                        .mapToLong(m -> 
m.getDataCharacteristics().getNonZeros()).sum();
        }
 }

Reply via email to