This is an automated email from the ASF dual-hosted git repository.

arnabp20 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new 79ec601  [SYSTEMDS-3242] Multithreaded allocation for transformencode
79ec601 is described below

commit 79ec6019843a037545aee7bd6495f91cbeb88a6e
Author: arnabp <[email protected]>
AuthorDate: Wed Dec 8 14:38:47 2021 +0100

    [SYSTEMDS-3242] Multithreaded allocation for transformencode
    
    This patch enables multi-threaded allocation of the sparse target matrix
    for the apply phase of transformencode.
---
 .../transform/encode/MultiColumnEncoder.java       | 43 ++++++++++++++++------
 1 file changed, 31 insertions(+), 12 deletions(-)

diff --git 
a/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
 
b/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
index f593487..7206600 100644
--- 
a/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
+++ 
b/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
@@ -37,12 +37,14 @@ import java.util.concurrent.Future;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.sysds.api.DMLScript;
 import org.apache.sysds.common.Types;
 import org.apache.sysds.conf.ConfigurationManager;
+import org.apache.sysds.hops.OptimizerUtils;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.controlprogram.caching.CacheBlock;
 import org.apache.sysds.runtime.data.SparseBlock;
@@ -263,7 +265,10 @@ public class MultiColumnEncoder implements Encoder {
                                + "has a encoder or slice the input 
accordingly");
                // TODO smart checks
                // Block allocation for MT access
-               outputMatrixPreProcessing(out, in);
+               boolean hasDC = false;
+               for(ColumnEncoderComposite columnEncoder : _columnEncoders)
+                       hasDC = 
columnEncoder.hasEncoder(ColumnEncoderDummycode.class);
+               outputMatrixPreProcessing(out, in, hasDC);
                if(k > 1) {
                        applyMT(in, out, outputCol, k);
                }
@@ -318,7 +323,7 @@ public class MultiColumnEncoder implements Encoder {
                pool.shutdown();
        }
 
-       private static void outputMatrixPreProcessing(MatrixBlock output, 
CacheBlock input) {
+       private static void outputMatrixPreProcessing(MatrixBlock output, 
CacheBlock input, boolean hasDC) {
                long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
                output.allocateBlock();
                if(output.isInSparseFormat()) {
@@ -326,18 +331,31 @@ public class MultiColumnEncoder implements Encoder {
                        if(!(block instanceof SparseBlockMCSR))
                                throw new RuntimeException(
                                        "Transform apply currently only 
supported for MCSR sparse and dense output Matrices");
-                       for(int r = 0; r < output.getNumRows(); r++) {
-                               // allocate all sparse rows so MT sync can be 
done.
-                               // should be rare that rows have only 0
-                               block.allocate(r, input.getNumColumns());
-                               // Setting the size here makes it possible to 
run all sparse apply tasks without any sync
-                               // could become problematic if the input is 
very sparse since we allocate the same size as the input
-                               // should be fine in theory ;)
-                               
((SparseRowVector)block.get(r)).setSize(input.getNumColumns());
+                       if (hasDC && OptimizerUtils.getTransformNumThreads()>1) 
{
+                               // DC forces a single threaded allocation after 
the build phase and
+                               // before the apply starts. Below code 
parallelizes sparse allocation.
+                               IntStream.range(0, output.getNumRows())
+                               .parallel().forEach(r -> {
+                                       block.allocate(r, 
input.getNumColumns());
+                                       
((SparseRowVector)block.get(r)).setSize(input.getNumColumns());
+                               });
+                       }
+                       else {
+                               for(int r = 0; r < output.getNumRows(); r++) {
+                                       // allocate all sparse rows so MT sync 
can be done.
+                                       // should be rare that rows have only 0
+                                       block.allocate(r, 
input.getNumColumns());
+                                       // Setting the size here makes it 
possible to run all sparse apply tasks without any sync
+                                       // could become problematic if the 
input is very sparse since we allocate the same size as the input
+                                       // should be fine in theory ;)
+                                       
((SparseRowVector)block.get(r)).setSize(input.getNumColumns());
+                               }
                        }
                }
-               if(DMLScript.STATISTICS)
+               if(DMLScript.STATISTICS) {
+                       LOG.debug("Elapsed time for allocation: "+ ((double) 
System.nanoTime() - t0) / 1000000 + " ms");
                        
Statistics.incTransformOutMatrixPreProcessingTime(System.nanoTime()-t0);
+               }
        }
 
        private void outputMatrixPostProcessing(MatrixBlock output){
@@ -803,10 +821,11 @@ public class MultiColumnEncoder implements Encoder {
                @Override
                public Object call() throws Exception {
                        int numCols = _input.getNumColumns() + 
_encoder.getNumExtraCols();
+                       boolean hasDC = 
_encoder.getColumnEncoders(ColumnEncoderDummycode.class).size() > 0;
                        long estNNz = (long) _input.getNumColumns() * (long) 
_input.getNumRows();
                        boolean sparse = 
MatrixBlock.evalSparseFormatInMemory(_input.getNumRows(), numCols, estNNz);
                        _output.reset(_input.getNumRows(), numCols, sparse, 
estNNz);
-                       outputMatrixPreProcessing(_output, _input);
+                       outputMatrixPreProcessing(_output, _input, hasDC);
                        return null;
                }
 

Reply via email to