This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new 8820f33d70 [SYSTEMDS-3455] Improved multi-threaded unary cell operations
8820f33d70 is described below

commit 8820f33d70fb408c7b5ad145ad149295d4e1c9be
Author: Matthias Boehm <[email protected]>
AuthorDate: Mon Oct 24 16:03:41 2022 -0400

    [SYSTEMDS-3455] Improved multi-threaded unary cell operations
    
    So far, unary operations used only best-effort multi-threading,
    because allocation and reads/writes from/to memory dominate performance.
    However, with a proper implementation we get robust improvements for
    all sizes and can avoid multi-threading for small sizes where it does
    not pay off. The following results have been obtained on a laptop with
    8 vcores (larger improvements are expected on scale-up nodes):
    
    * 10 times 100K x 1K (800MB): 13.8s -> 11.3s
    * 1K times 1K x 1K (8MB): 4.9s -> 2.7s
    * 100K times 10 x 1K (80KB): 9.1s -> 3.9s
---
 src/main/java/org/apache/sysds/hops/UnaryOp.java   |   2 +-
 .../java/org/apache/sysds/lops/compile/Dag.java    |   1 -
 .../runtime/matrix/data/LibMatrixBincell.java      | 191 +++++++++++++++++++++
 .../sysds/runtime/matrix/data/MatrixBlock.java     | 130 +-------------
 .../apache/sysds/utils/stats/SparkStatistics.java  |   1 -
 5 files changed, 194 insertions(+), 131 deletions(-)

diff --git a/src/main/java/org/apache/sysds/hops/UnaryOp.java 
b/src/main/java/org/apache/sysds/hops/UnaryOp.java
index 38db8a50b6..b250ce2c1b 100644
--- a/src/main/java/org/apache/sysds/hops/UnaryOp.java
+++ b/src/main/java/org/apache/sysds/hops/UnaryOp.java
@@ -452,7 +452,7 @@ public class UnaryOp extends MultiThreadedHop
        }
        
        public boolean isExpensiveUnaryOperation() {
-               return (_op == OpOp1.EXP || _op == OpOp1.LOG
+               return (_op == OpOp1.EXP || _op == OpOp1.LOG || _op == 
OpOp1.LOG_NZ
                        || _op == OpOp1.ROUND || _op == OpOp1.FLOOR || _op == 
OpOp1.CEIL
                        || _op == OpOp1.SIGMOID || _op == OpOp1.SPROP || _op == 
OpOp1.SOFTMAX
                        || _op == OpOp1.TAN || _op == OpOp1.TANH || _op == 
OpOp1.ATAN
diff --git a/src/main/java/org/apache/sysds/lops/compile/Dag.java 
b/src/main/java/org/apache/sysds/lops/compile/Dag.java
index 36cacde4d0..f87163eee3 100644
--- a/src/main/java/org/apache/sysds/lops/compile/Dag.java
+++ b/src/main/java/org/apache/sysds/lops/compile/Dag.java
@@ -38,7 +38,6 @@ import org.apache.sysds.common.Types.OpOpData;
 import org.apache.sysds.conf.ConfigurationManager;
 import org.apache.sysds.conf.DMLConfig;
 import org.apache.sysds.hops.AggBinaryOp.SparkAggType;
-import org.apache.sysds.hops.OptimizerUtils;
 import org.apache.sysds.lops.CSVReBlock;
 import org.apache.sysds.lops.CentralMoment;
 import org.apache.sysds.lops.Checkpoint;
diff --git 
a/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixBincell.java 
b/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixBincell.java
index 43ba3791a4..14f83e7de2 100644
--- a/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixBincell.java
+++ b/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixBincell.java
@@ -57,6 +57,7 @@ import org.apache.sysds.runtime.functionobjects.Power2;
 import org.apache.sysds.runtime.functionobjects.ValueFunction;
 import org.apache.sysds.runtime.matrix.operators.BinaryOperator;
 import org.apache.sysds.runtime.matrix.operators.ScalarOperator;
+import org.apache.sysds.runtime.matrix.operators.UnaryOperator;
 import org.apache.sysds.runtime.util.CommonThreadPool;
 import org.apache.sysds.runtime.util.DataConverter;
 import org.apache.sysds.runtime.util.SortUtils;
@@ -98,6 +99,51 @@ public class LibMatrixBincell {
        // public matrix bincell interface
        ///////////////////////////////////
        
+       /**
+        * Applies a unary cell-wise operator to all cells of m1.
+        * 
+        * Non-empty dense inputs with more than PAR_NUMCELL_THRESHOLD2 cells are
+        * processed by op.getNumThreads() worker tasks over balanced row ranges
+        * (if more than one thread is requested); all other inputs use the
+        * single-threaded sparse-safe or sparse-unsafe kernels.
+        *
+        * @param m1  input matrix block
+        * @param ret output matrix block
+        * @param op  unary operator (function, sparse-safety, in-place flag, degree of parallelism)
+        * @return the output block, which is m1 itself for in-place operators on dense inputs
+        * @throws DMLRuntimeException if a worker task fails or the calling thread is interrupted
+        */
+       public static MatrixBlock uncellOp(MatrixBlock m1, MatrixBlock ret, UnaryOperator op) {
+               if(!m1.sparse && !m1.isEmptyBlock(false) 
+                       && op.getNumThreads() > 1 && m1.getLength() > PAR_NUMCELL_THRESHOLD2  ) {
+                       //multi-threaded dense path: inputs at or below PAR_NUMCELL_THRESHOLD2 cells
+                       //stay single-threaded because thread overheads would not pay off there
+                       if (!op.isInplace() || m1.isEmpty())
+                               ret.allocateDenseBlock(false);
+                       else
+                               ret = m1;
+
+                       int k = op.getNumThreads();
+                       DenseBlock a = m1.getDenseBlock();
+                       DenseBlock c = ret.getDenseBlock();
+                       ExecutorService pool = null;
+                       try {
+                               pool = CommonThreadPool.get(k);
+                               ArrayList<UncellTask> tasks = new ArrayList<>();
+                               ArrayList<Integer> blklens = UtilFunctions.getBalancedBlockSizesDefault(ret.rlen, k, false);
+                               for( int i=0, lb=0; i<blklens.size(); lb+=blklens.get(i), i++ )
+                                       tasks.add(new UncellTask(a, c, op, lb, lb+blklens.get(i)));
+                               List<Future<Long>> taskret = pool.invokeAll(tasks);
+                               
+                               //aggregate non-zeros
+                               ret.nonZeros = 0; //reset after execute
+                               for( Future<Long> task : taskret )
+                                       ret.nonZeros += task.get();
+                       }
+                       catch(InterruptedException ex) {
+                               //preserve the interrupt status for callers up the stack
+                               Thread.currentThread().interrupt();
+                               throw new DMLRuntimeException(ex);
+                       }
+                       catch(ExecutionException ex) {
+                               throw new DMLRuntimeException(ex);
+                       }
+                       finally {
+                               //release worker threads also if a task failed or we were interrupted
+                               if( pool != null )
+                                       pool.shutdown();
+                       }
+               }
+               else {
+                       if (op.isInplace() && !m1.isInSparseFormat() )
+                               ret = m1;
+                       
+                       //default execute unary operations
+                       if(op.sparseSafe)
+                               sparseUnaryOperations(m1, ret, op);
+                       else
+                               denseUnaryOperations(m1, ret, op);
+               }
+               return ret;
+       }
+       
        /**
         * matrix-scalar, scalar-matrix binary operations.
         * 
@@ -445,6 +491,106 @@ public class LibMatrixBincell {
        // private sparse-safe/sparse-unsafe implementations
        ///////////////////////////////////
 
+       private static void denseUnaryOperations(MatrixBlock m1, MatrixBlock ret, UnaryOperator op) {
+               /* Applies a unary operator that is not flagged sparse-safe (uncellOp calls this
+                * on its single-threaded path when op.sparseSafe is false). val0 = f(0) is the
+                * value every zero cell maps to; if it is 0, the operator is only "unnecessarily"
+                * sparse-unsafe and the sparse-safe kernel suffices.
+                * prepare 0-value init (determine if unnecessarily sparse-unsafe) */
+               double val0 = op.fn.execute(0d);
+               
+               final int m = m1.rlen;
+               final int n = m1.clen;
+               
+               //early abort possible if unnecessarily sparse unsafe
+               //(otherwise full init with val0, no need for computation); for val0 == 0 ret is left untouched (assumed empty - confirm with caller)
+               if( m1.isEmptyBlock(false) ) {
+                       if( val0 != 0 )
+                               ret.reset(m, n, val0);
+                       return;
+               }
+               
+               /* redirection to sparse safe operation w/ init by val0:
+                * for sparse input with val0 != 0, ret is pre-filled with val0 and all m*n cells
+                * are marked non-zero, which makes the DENSE <- SPARSE kernel start its count
+                * from the m*n - nnz(m1) val0-filled cells (assumes reset(m, n, val0) yields a
+                * dense block - confirm); dense input is processed cell by cell by the
+                * DENSE <- DENSE kernel anyway */
+               if( m1.sparse && val0 != 0 ) {
+                       ret.reset(m, n, val0);
+                       ret.nonZeros = (long)m * n;
+               }
+               sparseUnaryOperations(m1, ret, op);
+       }
+       
+       private static void sparseUnaryOperations(MatrixBlock m1, MatrixBlock ret, UnaryOperator op) {
+               /* Single-threaded unary kernel; sets ret.nonZeros in every non-empty case.
+                * Layouts: SPARSE <- SPARSE, DENSE <- SPARSE, DENSE <- DENSE (in-place if ret == m1).
+                * For sparse inputs only the stored entries of m1 are processed; all other
+                * cells of ret keep their current content (e.g. the val0 pre-fill done by
+                * denseUnaryOperations).
+                * early abort possible since sparse-safe (ret is left untouched) */
+               if( m1.isEmptyBlock(false) )
+                       return;
+               
+               final int m = m1.rlen;
+               final int n = m1.clen;
+               
+               if( m1.sparse && ret.sparse ) //SPARSE <- SPARSE
+               {
+                       ret.allocateSparseRowsBlock();
+                       SparseBlock a = m1.sparseBlock;
+                       SparseBlock c = ret.sparseBlock;
+               
+                       long nnz = 0;
+                       for(int i=0; i<m; i++) {
+                               if( a.isEmpty(i) ) continue;
+                               
+                               int apos = a.pos(i);
+                               int alen = a.size(i);
+                               int[] aix = a.indexes(i);
+                               double[] avals = a.values(i);
+                               
+                               c.allocate(i, alen); //avoid repeated alloc
+                               for( int j=apos; j<apos+alen; j++ ) {
+                                       double val = op.fn.execute(avals[j]);
+                                       c.append(i, aix[j], val);
+                                       nnz += (val != 0) ? 1 : 0; //count only non-zero results
+                               }
+                       }
+                       ret.nonZeros = nnz;
+               }
+               else if( m1.sparse ) //DENSE <- SPARSE
+               {
+                       ret.allocateDenseBlock(false);
+                       SparseBlock a = m1.sparseBlock;
+                       DenseBlock c = ret.denseBlock;
+                       long nnz = (ret.nonZeros > 0) ? //ret pre-filled (see denseUnaryOperations): cells not stored in m1 count as non-zero; NOTE(review): assumes a full non-zero pre-fill whenever nonZeros > 0
+                               (long) m*n-a.size() : 0;
+                       for(int i=0; i<m; i++) {
+                               if( a.isEmpty(i) ) continue;
+                               int apos = a.pos(i);
+                               int alen = a.size(i);
+                               int[] aix = a.indexes(i);
+                               double[] avals = a.values(i);
+                               double[] cvals = c.values(i);
+                               int cix = c.pos(i); //offset of row i within cvals
+                               for( int j=apos; j<apos+alen; j++ ) {
+                                       double val = op.fn.execute(avals[j]);
+                                       cvals[cix + aix[j]] = val; 
+                                       nnz += (val != 0) ? 1 : 0;
+                               }
+                       }
+                       ret.nonZeros = nnz;
+               }
+               else //DENSE <- DENSE
+               {
+                       if( m1 != ret ) //!in-place
+                               ret.allocateDenseBlock(false);
+                       DenseBlock da = m1.getDenseBlock();
+                       DenseBlock dc = ret.getDenseBlock();
+                       
+                       //unary op, incl nnz maintenance (every cell, so zeros are mapped too)
+                       long nnz = 0;
+                       for( int bi=0; bi<da.numBlocks(); bi++ ) {
+                               double[] a = da.valuesAt(bi);
+                               double[] c = dc.valuesAt(bi);
+                               int len = da.size(bi);
+                               for( int i=0; i<len; i++ ) {
+                                       c[i] = op.fn.execute(a[i]);
+                                       nnz += (c[i] != 0) ? 1 : 0;
+                               }
+                       }
+                       ret.nonZeros = nnz;
+               }
+       }
+       
        private static long safeBinary(MatrixBlock m1, MatrixBlock m2, 
MatrixBlock ret, BinaryOperator op,
                BinaryAccessType atype, int rl, int ru)
        {
@@ -1764,4 +1910,49 @@ public class LibMatrixBincell {
                        return safeBinaryScalar(_m1, _ret, _sop, _rl, _ru);
                }
        }
+       
+       private static class UncellTask implements Callable<Long> { //worker of uncellOp's multi-threaded dense path for one row range
+               private final DenseBlock _a; //input block
+               private final DenseBlock _c; //output block (the same block as _a for in-place operations)
+               private final UnaryOperator _op;
+               private final int _rl; //first row, inclusive
+               private final int _ru; //last row, exclusive
+
+               protected UncellTask(DenseBlock a, DenseBlock c, UnaryOperator op, int rl, int ru ) {
+                       _a = a;
+                       _c = c;
+                       _op = op;
+                       _rl = rl;
+                       _ru = ru;
+               }
+               
+               @Override
+               public Long call() { //returns the number of non-zero output cells in rows [_rl, _ru)
+                       long nnz = 0;
+                       /* fast dense-dense operations: rows [_rl, _ru) lie in one contiguous array,
+                        * so cells pos(_rl) .. pos(_ru)-1 are processed in a single flat loop;
+                        * _a's offsets are reused for _c, so _c must share _a's layout */
+                       if(_a.isContiguous(_rl, _ru)) {
+                               double[] avals = _a.values(_rl);
+                               double[] cvals = _c.values(_rl);
+                               int start = _a.pos(_rl), end = _a.pos(_ru);
+                               for( int i=start; i<end; i++ ) {
+                                       cvals[i] = _op.fn.execute(avals[i]);
+                                       nnz += (cvals[i] != 0) ? 1 : 0;
+                               }
+                       }
+                       //generic dense-dense, including large blocks (row-wise lookup of each row's array and offset)
+                       else {
+                               int clen = _a.getDim(1);
+                               for(int i=_rl; i<_ru; i++) {
+                                       double[] avals = _a.values(i);
+                                       double[] cvals = _c.values(i);
+                                       int pos = _a.pos(i);
+                                       for( int j=0; j<clen; j++ ) {
+                                               cvals[pos+j] = _op.fn.execute(avals[pos+j]);
+                                               nnz += (cvals[pos+j] != 0) ? 1 : 0;
+                                       }
+                               }
+                       }
+                       return nnz;
+               }
+       }
 }
diff --git 
a/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java 
b/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java
index ebd0ff88b0..6c8c6c9953 100644
--- a/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java
@@ -2859,34 +2859,8 @@ public class MatrixBlock extends MatrixValue implements 
CacheBlock, Externalizab
                        else
                                ret = 
LibMatrixAgg.cumaggregateUnaryMatrix(this, ret, op);
                }
-               else if(!sparse && !isEmptyBlock(false)
-                       && 
OptimizerUtils.isMaxLocalParallelism(op.getNumThreads())) {
-                       //note: we apply multi-threading in a best-effort 
manner here
-                       //only for expensive operators such as exp, log, 
sigmoid, because
-                       //otherwise allocation, read and write anyway dominates
-                       if (!op.isInplace() || isEmpty())
-                               ret.allocateDenseBlock(false);
-                       else
-                               ret = this;
-
-                       DenseBlock a = getDenseBlock();
-                       DenseBlock c = ret.getDenseBlock();
-                       for(int bi=0; bi<a.numBlocks(); bi++) {
-                               double[] avals = a.valuesAt(bi), cvals = 
c.valuesAt(bi);
-                               Arrays.parallelSetAll(cvals, i -> 
op.fn.execute(avals[i]));
-                       }
-                       ret.recomputeNonZeros();
-               }
-               else
-               {
-                       if (op.isInplace() && !isInSparseFormat() )
-                               ret = this;
-                       
-                       //default execute unary operations
-                       if(op.sparseSafe)
-                               sparseUnaryOperations(op, ret);
-                       else
-                               denseUnaryOperations(op, ret);
+               else {
+                       ret = LibMatrixBincell.uncellOp(this, ret, op);
                }
                
                //ensure empty results sparse representation 
@@ -2897,106 +2871,6 @@ public class MatrixBlock extends MatrixValue implements 
CacheBlock, Externalizab
                return ret;
        }
 
-       private void sparseUnaryOperations(UnaryOperator op, MatrixBlock ret) {
-               //early abort possible since sparse-safe
-               if( isEmptyBlock(false) )
-                       return;
-               
-               final int m = rlen;
-               final int n = clen;
-               
-               if( sparse && ret.sparse ) //SPARSE <- SPARSE
-               {
-                       ret.allocateSparseRowsBlock();
-                       SparseBlock a = sparseBlock;
-                       SparseBlock c = ret.sparseBlock;
-               
-                       long nnz = 0;
-                       for(int i=0; i<m; i++) {
-                               if( a.isEmpty(i) ) continue;
-                               
-                               int apos = a.pos(i);
-                               int alen = a.size(i);
-                               int[] aix = a.indexes(i);
-                               double[] avals = a.values(i);
-                               
-                               c.allocate(i, alen); //avoid repeated alloc
-                               for( int j=apos; j<apos+alen; j++ ) {
-                                       double val = op.fn.execute(avals[j]);
-                                       c.append(i, aix[j], val);
-                                       nnz += (val != 0) ? 1 : 0;
-                               }
-                       }
-                       ret.nonZeros = nnz;
-               }
-               else if( sparse ) //DENSE <- SPARSE
-               {
-                       ret.allocateDenseBlock(false);
-                       SparseBlock a = sparseBlock;
-                       DenseBlock c = ret.denseBlock;
-                       long nnz = (ret.nonZeros > 0) ?
-                               (long) m*n-a.size() : 0;
-                       for(int i=0; i<m; i++) {
-                               if( a.isEmpty(i) ) continue;
-                               int apos = a.pos(i);
-                               int alen = a.size(i);
-                               int[] aix = a.indexes(i);
-                               double[] avals = a.values(i);
-                               double[] cvals = c.values(i);
-                               int cix = c.pos(i);
-                               for( int j=apos; j<apos+alen; j++ ) {
-                                       double val = op.fn.execute(avals[j]);
-                                       cvals[cix + aix[j]] = val; 
-                                       nnz += (val != 0) ? 1 : 0;
-                               }
-                       }
-                       ret.nonZeros = nnz;
-               }
-               else //DENSE <- DENSE
-               {
-                       if( this != ret ) //!in-place
-                               ret.allocateDenseBlock(false);
-                       DenseBlock da = getDenseBlock();
-                       DenseBlock dc = ret.getDenseBlock();
-                       
-                       //unary op, incl nnz maintenance
-                       long nnz = 0;
-                       for( int bi=0; bi<da.numBlocks(); bi++ ) {
-                               double[] a = da.valuesAt(bi);
-                               double[] c = dc.valuesAt(bi);
-                               int len = da.size(bi);
-                               for( int i=0; i<len; i++ ) {
-                                       c[i] = op.fn.execute(a[i]);
-                                       nnz += (c[i] != 0) ? 1 : 0;
-                               }
-                       }
-                       ret.nonZeros = nnz;
-               }
-       }
-
-       private void denseUnaryOperations(UnaryOperator op, MatrixBlock ret) {
-               //prepare 0-value init (determine if unnecessarily 
sparse-unsafe)
-               double val0 = op.fn.execute(0d);
-               
-               final int m = rlen;
-               final int n = clen;
-               
-               //early abort possible if unnecessarily sparse unsafe
-               //(otherwise full init with val0, no need for computation)
-               if( isEmptyBlock(false) ) {
-                       if( val0 != 0 )
-                               ret.reset(m, n, val0);
-                       return;
-               }
-               
-               //redirection to sparse safe operation w/ init by val0
-               if( sparse && val0 != 0 ) {
-                       ret.reset(m, n, val0);
-                       ret.nonZeros = (long)m * n;
-               }
-               sparseUnaryOperations(op, ret);
-       }
-
        public final MatrixBlock binaryOperations(BinaryOperator op, 
MatrixValue thatValue){
                return binaryOperations(op, thatValue, null);
        }
diff --git a/src/main/java/org/apache/sysds/utils/stats/SparkStatistics.java 
b/src/main/java/org/apache/sysds/utils/stats/SparkStatistics.java
index e374512267..331634d1bd 100644
--- a/src/main/java/org/apache/sysds/utils/stats/SparkStatistics.java
+++ b/src/main/java/org/apache/sysds/utils/stats/SparkStatistics.java
@@ -21,7 +21,6 @@ package org.apache.sysds.utils.stats;
 
 import java.util.concurrent.atomic.LongAdder;
 
-import org.apache.sysds.hops.OptimizerUtils;
 import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
 
 public class SparkStatistics {

Reply via email to