This is an automated email from the ASF dual-hosted git repository.

arnabp20 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new b9f4686  [SYSTEMDS-3245] CSR sparse support for transformapply
b9f4686 is described below

commit b9f4686e2153ff44e41378d76301a6d24ffd1666
Author: arnabp <[email protected]>
AuthorDate: Mon Dec 13 09:53:41 2021 +0100

    [SYSTEMDS-3245] CSR sparse support for transformapply
    
    This patch adds support for CSR sparse output in
    transformapply. CSR is more efficient for the memory-bound apply
    phase. Multi-threaded apply is now 30% faster when dummycoding
    100 columns, each with 5M rows and 100K distinct values.
---
 .../runtime/transform/encode/ColumnEncoder.java    | 18 +++++-
 .../transform/encode/ColumnEncoderDummycode.java   | 41 ++++++++++----
 .../transform/encode/ColumnEncoderPassThrough.java | 23 +++++++-
 .../transform/encode/MultiColumnEncoder.java       | 64 ++++++++++++++--------
 4 files changed, 107 insertions(+), 39 deletions(-)

diff --git 
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoder.java 
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoder.java
index f69da6f..9db3772 100644
--- a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoder.java
+++ b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoder.java
@@ -41,6 +41,8 @@ import org.apache.sysds.conf.ConfigurationManager;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.controlprogram.caching.CacheBlock;
 import org.apache.sysds.runtime.data.SparseRowVector;
+import org.apache.sysds.runtime.data.SparseBlock;
+import org.apache.sysds.runtime.data.SparseBlockCSR;
 import org.apache.sysds.runtime.matrix.data.FrameBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.util.DependencyTask;
@@ -129,6 +131,7 @@ public abstract class ColumnEncoder implements Encoder, 
Comparable<ColumnEncoder
        }*/
 
        protected void applySparse(CacheBlock in, MatrixBlock out, int 
outputCol, int rowStart, int blk){
+               boolean mcsr = MatrixBlock.DEFAULT_SPARSEBLOCK == 
SparseBlock.Type.MCSR;
                int index = _colID - 1;
                // Apply loop tiling to exploit CPU caches
                double[] codes = getCodeCol(in, rowStart, blk);
@@ -137,9 +140,18 @@ public abstract class ColumnEncoder implements Encoder, 
Comparable<ColumnEncoder
                for(int i = rowStart; i < rowEnd; i+=B) {
                        int lim = Math.min(i+B, rowEnd);
                        for (int ii=i; ii<lim; ii++) {
-                               SparseRowVector row = (SparseRowVector) 
out.getSparseBlock().get(ii);
-                               row.values()[index] = codes[ii-rowStart];
-                               row.indexes()[index] = outputCol;
+                               if (mcsr) {
+                                       SparseRowVector row = (SparseRowVector) 
out.getSparseBlock().get(ii);
+                                       row.values()[index] = 
codes[ii-rowStart];
+                                       row.indexes()[index] = outputCol;
+                               }
+                               else { //csr
+                                       // Manually fill the column-indexes and 
values array
+                                       SparseBlockCSR csrblock = 
(SparseBlockCSR)out.getSparseBlock();
+                                       int rptr[] = csrblock.rowPointers();
+                                       csrblock.indexes()[rptr[ii]+index] = 
outputCol;
+                                       csrblock.values()[rptr[ii]+index] = 
codes[ii-rowStart];
+                               }
                        }
                }
        }
diff --git 
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderDummycode.java
 
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderDummycode.java
index a5670ae..63cf86c 100644
--- 
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderDummycode.java
+++ 
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderDummycode.java
@@ -32,6 +32,8 @@ import java.util.Set;
 import org.apache.sysds.api.DMLScript;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.controlprogram.caching.CacheBlock;
+import org.apache.sysds.runtime.data.SparseBlock;
+import org.apache.sysds.runtime.data.SparseBlockCSR;
 import org.apache.sysds.runtime.matrix.data.FrameBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.util.DependencyTask;
@@ -85,6 +87,7 @@ public class ColumnEncoderDummycode extends ColumnEncoder {
                        throw new DMLRuntimeException("ColumnEncoderDummycode 
called with: " + in.getClass().getSimpleName() +
                                        " and not MatrixBlock");
                }
+               boolean mcsr = MatrixBlock.DEFAULT_SPARSEBLOCK == 
SparseBlock.Type.MCSR;
                Set<Integer> sparseRowsWZeros = null;
                int index = _colID - 1;
                for(int r = rowStart; r < getEndIndex(in.getNumRows(), 
rowStart, blk); r++) {
@@ -103,17 +106,35 @@ public class ColumnEncoderDummycode extends ColumnEncoder 
{
                        //
                        // indexes = [0,2] ===> indexes = [0,3]
                        // values = [1,2] values = [1,1]
-                       double val = 
out.getSparseBlock().get(r).values()[index];
-                       if(Double.isNaN(val)){
-                               if(sparseRowsWZeros == null)
-                                       sparseRowsWZeros = new HashSet<>();
-                               sparseRowsWZeros.add(r);
-                               out.getSparseBlock().get(r).values()[index] = 0;
-                               continue;
+                       if (mcsr) {
+                               double val = 
out.getSparseBlock().get(r).values()[index];
+                               if(Double.isNaN(val)){
+                                       if(sparseRowsWZeros == null)
+                                               sparseRowsWZeros = new 
HashSet<>();
+                                       sparseRowsWZeros.add(r);
+                                       
out.getSparseBlock().get(r).values()[index] = 0;
+                                       continue;
+                               }
+                               int nCol = outputCol + (int) val - 1;
+                               out.getSparseBlock().get(r).indexes()[index] = 
nCol;
+                               out.getSparseBlock().get(r).values()[index] = 1;
+                       }
+                       else { //csr
+                               SparseBlockCSR csrblock = 
(SparseBlockCSR)out.getSparseBlock();
+                               int rptr[] = csrblock.rowPointers();
+                               double val = csrblock.values()[rptr[r]+index];
+                               if(Double.isNaN(val)){
+                                       if(sparseRowsWZeros == null)
+                                               sparseRowsWZeros = new 
HashSet<>();
+                                       sparseRowsWZeros.add(r);
+                                       csrblock.values()[rptr[r]+index] = 0; 
//test
+                                       continue;
+                               }
+                               // Manually fill the column-indexes and values 
array
+                               int nCol = outputCol + (int) val - 1;
+                               csrblock.indexes()[rptr[r]+index] = nCol;
+                               csrblock.values()[rptr[r]+index] = 1;
                        }
-                       int nCol = outputCol + (int) val - 1;
-                       out.getSparseBlock().get(r).indexes()[index] = nCol;
-                       out.getSparseBlock().get(r).values()[index] = 1;
                }
                if(sparseRowsWZeros != null){
                        addSparseRowsWZeros(sparseRowsWZeros);
diff --git 
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderPassThrough.java
 
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderPassThrough.java
index 57405b0..f8b467d 100644
--- 
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderPassThrough.java
+++ 
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderPassThrough.java
@@ -27,6 +27,8 @@ import java.util.Set;
 
 import org.apache.sysds.api.DMLScript;
 import org.apache.sysds.runtime.controlprogram.caching.CacheBlock;
+import org.apache.sysds.runtime.data.SparseBlock;
+import org.apache.sysds.runtime.data.SparseBlockCSR;
 import org.apache.sysds.runtime.data.SparseRowVector;
 import org.apache.sysds.runtime.matrix.data.FrameBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
@@ -77,6 +79,7 @@ public class ColumnEncoderPassThrough extends ColumnEncoder {
 
        protected void applySparse(CacheBlock in, MatrixBlock out, int 
outputCol, int rowStart, int blk){
                Set<Integer> sparseRowsWZeros = null;
+               boolean mcsr = MatrixBlock.DEFAULT_SPARSEBLOCK == 
SparseBlock.Type.MCSR;
                int index = _colID - 1;
                // Apply loop tiling to exploit CPU caches
                double[] codes = getCodeCol(in, rowStart, blk);
@@ -86,14 +89,28 @@ public class ColumnEncoderPassThrough extends ColumnEncoder 
{
                        int lim = Math.min(i+B, rowEnd);
                        for (int ii=i; ii<lim; ii++) {
                                double v = codes[ii-rowStart];
-                               SparseRowVector row = (SparseRowVector) 
out.getSparseBlock().get(ii);
                                if(v == 0) {
                                        if(sparseRowsWZeros == null)
                                                sparseRowsWZeros = new 
HashSet<>();
                                        sparseRowsWZeros.add(ii);
                                }
-                               row.values()[index] = v;
-                               row.indexes()[index] = outputCol;
+                               if (mcsr) {
+                                       SparseRowVector row = (SparseRowVector) 
out.getSparseBlock().get(ii);
+                                       row.values()[index] = v;
+                                       row.indexes()[index] = outputCol;
+                               }
+                               else { //csr
+                                       if(v == 0) {
+                                               if(sparseRowsWZeros == null)
+                                                       sparseRowsWZeros = new 
HashSet<>();
+                                               sparseRowsWZeros.add(ii);
+                                       }
+                                       // Manually fill the column-indexes and 
values array
+                                       SparseBlockCSR csrblock = 
(SparseBlockCSR)out.getSparseBlock();
+                                       int rptr[] = csrblock.rowPointers();
+                                       csrblock.indexes()[rptr[ii]+index] = 
outputCol;
+                                       csrblock.values()[rptr[ii]+index] = 
codes[ii-rowStart];
+                               }
                        }
                }
                if(sparseRowsWZeros != null){
diff --git 
a/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
 
b/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
index 7206600..76301ce 100644
--- 
a/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
+++ 
b/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
@@ -48,7 +48,7 @@ import org.apache.sysds.hops.OptimizerUtils;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.controlprogram.caching.CacheBlock;
 import org.apache.sysds.runtime.data.SparseBlock;
-import org.apache.sysds.runtime.data.SparseBlockMCSR;
+import org.apache.sysds.runtime.data.SparseBlockCSR;
 import org.apache.sysds.runtime.data.SparseRowVector;
 import org.apache.sysds.runtime.matrix.data.FrameBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
@@ -325,33 +325,51 @@ public class MultiColumnEncoder implements Encoder {
 
        private static void outputMatrixPreProcessing(MatrixBlock output, 
CacheBlock input, boolean hasDC) {
                long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
-               output.allocateBlock();
                if(output.isInSparseFormat()) {
-                       SparseBlock block = output.getSparseBlock();
-                       if(!(block instanceof SparseBlockMCSR))
-                               throw new RuntimeException(
-                                       "Transform apply currently only 
supported for MCSR sparse and dense output Matrices");
-                       if (hasDC && OptimizerUtils.getTransformNumThreads()>1) 
{
-                               // DC forces a single threaded allocation after 
the build phase and
-                               // before the apply starts. Below code 
parallelizes sparse allocation.
-                               IntStream.range(0, output.getNumRows())
-                               .parallel().forEach(r -> {
-                                       block.allocate(r, 
input.getNumColumns());
-                                       
((SparseRowVector)block.get(r)).setSize(input.getNumColumns());
-                               });
+                       if (MatrixBlock.DEFAULT_SPARSEBLOCK != 
SparseBlock.Type.CSR
+                                       && MatrixBlock.DEFAULT_SPARSEBLOCK != 
SparseBlock.Type.MCSR)
+                               throw new RuntimeException("Transformapply is 
only supported for MCSR and CSR output matrix");
+                       boolean mcsr = MatrixBlock.DEFAULT_SPARSEBLOCK == 
SparseBlock.Type.MCSR;
+                       if (mcsr) {
+                               output.allocateBlock();
+                               SparseBlock block = output.getSparseBlock();
+                               if (hasDC && 
OptimizerUtils.getTransformNumThreads()>1) {
+                                       // DC forces a single threaded 
allocation after the build phase and
+                                       // before the apply starts. Below code 
parallelizes sparse allocation.
+                                       IntStream.range(0, output.getNumRows())
+                                       .parallel().forEach(r -> {
+                                               block.allocate(r, 
input.getNumColumns());
+                                               
((SparseRowVector)block.get(r)).setSize(input.getNumColumns());
+                                       });
+                               }
+                               else {
+                                       for(int r = 0; r < output.getNumRows(); 
r++) {
+                                               // allocate all sparse rows so 
MT sync can be done.
+                                               // should be rare that rows 
have only 0
+                                               block.allocate(r, 
input.getNumColumns());
+                                               // Setting the size here makes 
it possible to run all sparse apply tasks without any sync
+                                               // could become problematic if 
the input is very sparse since we allocate the same size as the input
+                                               // should be fine in theory ;)
+                                               
((SparseRowVector)block.get(r)).setSize(input.getNumColumns());
+                                       }
+                               }
                        }
-                       else {
-                               for(int r = 0; r < output.getNumRows(); r++) {
-                                       // allocate all sparse rows so MT sync 
can be done.
-                                       // should be rare that rows have only 0
-                                       block.allocate(r, 
input.getNumColumns());
-                                       // Setting the size here makes it 
possible to run all sparse apply tasks without any sync
-                                       // could become problematic if the 
input is very sparse since we allocate the same size as the input
-                                       // should be fine in theory ;)
-                                       
((SparseRowVector)block.get(r)).setSize(input.getNumColumns());
+                       else { //csr
+                               int size = output.getNumRows() * 
input.getNumColumns();
+                               SparseBlock csrblock = new 
SparseBlockCSR(output.getNumRows(), size, size);
+                               // Manually fill the row pointers based on 
nnzs/row (= #cols in the input)
+                               // Not using the set() methods to 1) avoid 
binary search and shifting, 
+                               // 2) reduce thread contentions on the arrays
+                               int[] rptr = 
((SparseBlockCSR)csrblock).rowPointers();
+                               for (int i=0; i<rptr.length-1; i++) { //TODO: 
parallelize
+                                       rptr[i+1] = rptr[i] + 
input.getNumColumns();
                                }
+                               output.setSparseBlock(csrblock);
                        }
                }
+               else //dense
+                       output.allocateBlock();
+
                if(DMLScript.STATISTICS) {
                        LOG.debug("Elapsed time for allocation: "+ ((double) 
System.nanoTime() - t0) / 1000000 + " ms");
                        
Statistics.incTransformOutMatrixPreProcessingTime(System.nanoTime()-t0);

Reply via email to