This is an automated email from the ASF dual-hosted git repository.

arnabp20 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new cce3cd5  [SYSTEMDS-3293] Find optimum #partitions for transformencode
cce3cd5 is described below

commit cce3cd516ed7182845feac9dd0202086029870f4
Author: arnabp <[email protected]>
AuthorDate: Sun Feb 13 20:25:23 2022 +0100

    [SYSTEMDS-3293] Find optimum #partitions for transformencode
    
    This patch introduces logic to automatically determine the
    number of row partitions for the build and apply phases.
    No. of build blocks = (2 * #physical cores)/#build encoders
    No. of apply blocks = (4 * #physical cores)/#apply encoders
---
 .../runtime/transform/encode/ColumnEncoder.java    | 10 +--
 .../transform/encode/ColumnEncoderComposite.java   | 19 +++--
 .../transform/encode/ColumnEncoderDummycode.java   |  2 +-
 .../transform/encode/ColumnEncoderFeatureHash.java |  2 +-
 .../transform/encode/ColumnEncoderPassThrough.java |  2 +-
 .../transform/encode/MultiColumnEncoder.java       | 95 ++++++++++++++++++----
 6 files changed, 103 insertions(+), 27 deletions(-)

diff --git 
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoder.java 
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoder.java
index a9f0c70..df673e9 100644
--- a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoder.java
+++ b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoder.java
@@ -55,7 +55,7 @@ import org.apache.sysds.utils.stats.TransformStatistics;
  */
 public abstract class ColumnEncoder implements Encoder, 
Comparable<ColumnEncoder> {
        protected static final Log LOG = 
LogFactory.getLog(ColumnEncoder.class.getName());
-       protected static final int APPLY_ROW_BLOCKS_PER_COLUMN = 1;
+       public static int APPLY_ROW_BLOCKS_PER_COLUMN = -1;
        public static int BUILD_ROW_BLOCKS_PER_COLUMN = -1;
        private static final long serialVersionUID = 2299156350718979064L;
        protected int _colID;
@@ -290,11 +290,11 @@ public abstract class ColumnEncoder implements Encoder, 
Comparable<ColumnEncoder
         * complete if all previous tasks are done. This is so that we can use 
the last task as a dependency for the whole
         * build, reducing unnecessary dependencies.
         */
-       public List<DependencyTask<?>> getBuildTasks(CacheBlock in) {
+       public List<DependencyTask<?>> getBuildTasks(CacheBlock in, int 
nBuildPartition) {
                List<Callable<Object>> tasks = new ArrayList<>();
                List<List<? extends Callable<?>>> dep = null;
                int nRows = in.getNumRows();
-               int[] blockSizes = getBlockSizes(nRows, 
getNumBuildRowPartitions());
+               int[] blockSizes = getBlockSizes(nRows, nBuildPartition);
                if(blockSizes.length == 1) {
                        tasks.add(getBuildTask(in));
                }
@@ -325,10 +325,10 @@ public abstract class ColumnEncoder implements Encoder, 
Comparable<ColumnEncoder
        }
 
 
-       public List<DependencyTask<?>> getApplyTasks(CacheBlock in, MatrixBlock 
out, int outputCol){
+       public List<DependencyTask<?>> getApplyTasks(CacheBlock in, MatrixBlock 
out, int nApplyPartitions, int outputCol) {
                List<Callable<Object>> tasks = new ArrayList<>();
                List<List<? extends Callable<?>>> dep = null;
-               int[] blockSizes = getBlockSizes(in.getNumRows(), 
getNumApplyRowPartitions());
+               int[] blockSizes = getBlockSizes(in.getNumRows(), 
nApplyPartitions);
                for(int startRow = 0, i = 0; i < blockSizes.length; 
startRow+=blockSizes[i], i++){
                        if(out.isInSparseFormat())
                                tasks.add(getSparseTask(in, out, outputCol, 
startRow, blockSizes[i]));
diff --git 
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderComposite.java
 
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderComposite.java
index 4eb57ba..ac2156e 100644
--- 
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderComposite.java
+++ 
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderComposite.java
@@ -106,17 +106,17 @@ public class ColumnEncoderComposite extends ColumnEncoder 
{
        }
 
        @Override
-       public List<DependencyTask<?>> getApplyTasks(CacheBlock in, MatrixBlock 
out, int outputCol) {
+       public List<DependencyTask<?>> getApplyTasks(CacheBlock in, MatrixBlock 
out, int nParition, int outputCol) {
                List<DependencyTask<?>> tasks = new ArrayList<>();
                List<Integer> sizes = new ArrayList<>();
                for(int i = 0; i < _columnEncoders.size(); i++) {
                        List<DependencyTask<?>> t;
                        if(i == 0) {
                                // 1. encoder writes data into MatrixBlock 
Column all others use this column for further encoding
-                               t = _columnEncoders.get(i).getApplyTasks(in, 
out, outputCol);
+                               t = _columnEncoders.get(i).getApplyTasks(in, 
out, nParition, outputCol);
                        }
                        else {
-                               t = _columnEncoders.get(i).getApplyTasks(out, 
out, outputCol);
+                               t = _columnEncoders.get(i).getApplyTasks(out, 
out, nParition, outputCol);
                        }
                        if(t == null)
                                continue;
@@ -143,11 +143,11 @@ public class ColumnEncoderComposite extends ColumnEncoder 
{
        }
 
        @Override
-       public List<DependencyTask<?>> getBuildTasks(CacheBlock in) {
+       public List<DependencyTask<?>> getBuildTasks(CacheBlock in, int 
nPartition) {
                List<DependencyTask<?>> tasks = new ArrayList<>();
                Map<Integer[], Integer[]> depMap = null;
                for(ColumnEncoder columnEncoder : _columnEncoders) {
-                       List<DependencyTask<?>> t = 
columnEncoder.getBuildTasks(in);
+                       List<DependencyTask<?>> t = 
columnEncoder.getBuildTasks(in, nPartition);
                        if(t == null)
                                continue;
                        // Linear execution between encoders so they can't be 
built in parallel
@@ -351,6 +351,15 @@ public class ColumnEncoderComposite extends ColumnEncoder {
                return _columnEncoders.stream().anyMatch(encoder -> 
encoder.getClass().equals(type));
        }
 
+       /**
+        * Returns whether this composite contains a recode, dummycode or bin
+        * encoder. Classes are matched exactly; subclasses are not counted.
+        * NOTE(review): ColumnEncoderDummycode.getBuildTasks returns null, yet it
+        * is counted here. Confirm this is intended.
+        *
+        * @return true if any contained encoder is exactly a ColumnEncoderRecode,
+        *         ColumnEncoderDummycode or ColumnEncoderBin
+        */
+       public boolean hasBuild() {
+               return _columnEncoders.stream().anyMatch(e ->
+                       e.getClass().equals(ColumnEncoderRecode.class)
+                       || e.getClass().equals(ColumnEncoderDummycode.class)
+                       || e.getClass().equals(ColumnEncoderBin.class));
+       }
+
        @Override
        public void shiftCol(int columnOffset) {
                super.shiftCol(columnOffset);
diff --git 
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderDummycode.java
 
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderDummycode.java
index e0efe53..3366329 100644
--- 
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderDummycode.java
+++ 
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderDummycode.java
@@ -65,7 +65,7 @@ public class ColumnEncoderDummycode extends ColumnEncoder {
        }
 
        @Override
-       public List<DependencyTask<?>> getBuildTasks(CacheBlock in) {
+       public List<DependencyTask<?>> getBuildTasks(CacheBlock in, int nPartition) {
+               // This encoder creates no build tasks of its own (null = none).
                return null;
        }
 
diff --git 
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderFeatureHash.java
 
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderFeatureHash.java
index 9445474..aa362fe 100644
--- 
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderFeatureHash.java
+++ 
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderFeatureHash.java
@@ -93,7 +93,7 @@ public class ColumnEncoderFeatureHash extends ColumnEncoder {
        }
 
        @Override
-       public List<DependencyTask<?>> getBuildTasks(CacheBlock in) {
+       public List<DependencyTask<?>> getBuildTasks(CacheBlock in, int nPartition) {
+               // This encoder creates no build tasks of its own (null = none).
                return null;
        }
 
diff --git 
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderPassThrough.java
 
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderPassThrough.java
index 2f5739f..a134dfd 100644
--- 
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderPassThrough.java
+++ 
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderPassThrough.java
@@ -53,7 +53,7 @@ public class ColumnEncoderPassThrough extends ColumnEncoder {
        }
 
        @Override
-       public List<DependencyTask<?>> getBuildTasks(CacheBlock in) {
+       public List<DependencyTask<?>> getBuildTasks(CacheBlock in, int nPartition) {
+               // This encoder creates no build tasks of its own (null = none).
                return null;
        }
 
diff --git 
a/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
 
b/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
index aa7f408..00c7962 100644
--- 
a/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
+++ 
b/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
@@ -74,6 +74,7 @@ public class MultiColumnEncoder implements Encoder {
        private EncoderOmit _legacyOmit = null;
        private int _colOffset = 0; // offset for federated Workers who are 
using subrange encoders
        private FrameBlock _meta = null;
+       private int[] _nPartitions = null;
 
        public MultiColumnEncoder(List<ColumnEncoderComposite> columnEncoders) {
                _columnEncoders = columnEncoders;
@@ -89,6 +90,7 @@ public class MultiColumnEncoder implements Encoder {
 
        public MatrixBlock encode(CacheBlock in, int k) {
                MatrixBlock out;
+               _nPartitions = getNumRowPartitions(in, k);
                try {
                        if(k > 1 && !MULTI_THREADED_STAGES && 
!hasLegacyEncoder()) {
                                out = new MatrixBlock();
@@ -155,7 +157,7 @@ public class MultiColumnEncoder implements Encoder {
 
                for(ColumnEncoderComposite e : _columnEncoders) {
                        // Create the build tasks
-                       List<DependencyTask<?>> buildTasks = 
e.getBuildTasks(in);
+                       List<DependencyTask<?>> buildTasks = 
e.getBuildTasks(in, _nPartitions[0]);
                        tasks.addAll(buildTasks);
                        if(buildTasks.size() > 0) {
                                // Check if any Build independent UpdateDC task 
(Bin+DC, FH+DC)
@@ -197,7 +199,7 @@ public class MultiColumnEncoder implements Encoder {
                        // Apply Task depends on InitOutputMatrixTask (output 
allocation)
                        depMap.put(new Integer[] {tasks.size(), tasks.size() + 
1},         //ApplyTask
                                        new Integer[] {0, 1});                  
                   //Allocation task (1st task)
-                       ApplyTasksWrapperTask applyTaskWrapper = new 
ApplyTasksWrapperTask(e, in, out, pool);
+                       ApplyTasksWrapperTask applyTaskWrapper = new 
ApplyTasksWrapperTask(e, in, out, _nPartitions[1], pool);
 
                        if(e.hasEncoder(ColumnEncoderDummycode.class)) {
                                // Allocation depends on build if DC is in the 
list.
@@ -244,6 +246,8 @@ public class MultiColumnEncoder implements Encoder {
        public void build(CacheBlock in, int k) {
                if(hasLegacyEncoder() && !(in instanceof FrameBlock))
                        throw new DMLRuntimeException("LegacyEncoders do not 
support non FrameBlock Inputs");
+               if(_nPartitions == null) //happens if this method is directly 
called from the tests
+                       _nPartitions = getNumRowPartitions(in, k);
                if(k > 1) {
                        buildMT(in, k);
                }
@@ -260,7 +264,7 @@ public class MultiColumnEncoder implements Encoder {
        private List<DependencyTask<?>> getBuildTasks(CacheBlock in) {
+               // Collect the build tasks of every column encoder; _nPartitions[0]
+               // is passed as the number of build row partitions. Assumes
+               // _nPartitions was initialized by encode() or build() - TODO confirm.
                List<DependencyTask<?>> tasks = new ArrayList<>();
                for(ColumnEncoderComposite columnEncoder : _columnEncoders) {
-                       tasks.addAll(columnEncoder.getBuildTasks(in));
+                       tasks.addAll(columnEncoder.getBuildTasks(in, 
_nPartitions[0]));
                }
                return tasks;
        }
@@ -337,11 +341,11 @@ public class MultiColumnEncoder implements Encoder {
                return out;
        }
 
-       private List<DependencyTask<?>> getApplyTasks(CacheBlock in, 
MatrixBlock out, int outputCol) {
+       private List<DependencyTask<?>> getApplyTasks(CacheBlock in, 
MatrixBlock out, int nPartition, int outputCol) {
                List<DependencyTask<?>> tasks = new ArrayList<>();
                int offset = outputCol;
                for(ColumnEncoderComposite e : _columnEncoders) {
-                       tasks.addAll(e.getApplyTasks(in, out, e._colID - 1 + 
offset));
+                       tasks.addAll(e.getApplyTasks(in, out, nPartition, 
e._colID - 1 + offset));
                        if(e.hasEncoder(ColumnEncoderDummycode.class))
                                offset += 
e.getEncoder(ColumnEncoderDummycode.class)._domainSize - 1;
                }
@@ -354,12 +358,12 @@ public class MultiColumnEncoder implements Encoder {
                        if(APPLY_ENCODER_SEPARATE_STAGES){
                                int offset = outputCol;
                                for (ColumnEncoderComposite e : 
_columnEncoders) {
-                                       
pool.submitAllAndWait(e.getApplyTasks(in, out, e._colID - 1 + offset));
+                                       
pool.submitAllAndWait(e.getApplyTasks(in, out, _nPartitions[1], e._colID - 1 + 
offset));
                                        if 
(e.hasEncoder(ColumnEncoderDummycode.class))
                                                offset += 
e.getEncoder(ColumnEncoderDummycode.class)._domainSize - 1;
                                }
                        }else{
-                               pool.submitAllAndWait(getApplyTasks(in, out, 
outputCol));
+                               pool.submitAllAndWait(getApplyTasks(in, out, 
_nPartitions[1], outputCol));
                        }
                }
                catch(ExecutionException | InterruptedException e) {
@@ -369,6 +373,57 @@ public class MultiColumnEncoder implements Encoder {
                pool.shutdown();
        }
 
+       /**
+        * Determines the number of row partitions for the build and apply phases.
+        * Values from the unit-test flags take precedence over the configuration,
+        * which takes precedence over the derived values. Every value is finally
+        * capped so that each partition keeps at least minNumRows rows, and
+        * unset entries default to 1.
+        *
+        * @param in input block to encode (its #rows and #columns drive the derivation)
+        * @param k  degree of parallelism; k == 1 forces a single partition each
+        * @return {#build partitions, #apply partitions}, each at least 1
+        */
+       private int[] getNumRowPartitions(CacheBlock in, int k) {
+               int[] numBlocks = new int[2];
+               if (k == 1) { //single-threaded
+                       numBlocks[0] = 1;
+                       numBlocks[1] = 1;
+                       return numBlocks;
+               }
+               // Read from global flags. These are set by the unit tests
+               if (ColumnEncoder.BUILD_ROW_BLOCKS_PER_COLUMN > 0)
+                       numBlocks[0] = ColumnEncoder.BUILD_ROW_BLOCKS_PER_COLUMN;
+               if (ColumnEncoder.APPLY_ROW_BLOCKS_PER_COLUMN > 0)
+                       numBlocks[1] = ColumnEncoder.APPLY_ROW_BLOCKS_PER_COLUMN;
+
+               // Read from the config file if set. These overwrite the derived values.
+               if (numBlocks[0] == 0 && ConfigurationManager.getParallelBuildBlocks() > 0)
+                       numBlocks[0] = ConfigurationManager.getParallelBuildBlocks();
+               if (numBlocks[1] == 0 && ConfigurationManager.getParallelApplyBlocks() > 0)
+                       numBlocks[1] = ConfigurationManager.getParallelApplyBlocks();
+
+               // Else, derive the optimum number of partitions
+               int nRow = in.getNumRows();
+               // NOTE(review): the thread count comes from the global transform
+               // setting, not from k
+               int nThread = OptimizerUtils.getTransformNumThreads(); //VCores
+               int minNumRows = 16000; //min rows per partition
+               // Count #Builds and #Applies (= #Col)
+               int nBuild = 0;
+               for (ColumnEncoderComposite e : _columnEncoders)
+                       if (e.hasBuild())
+                               nBuild++;
+               int nApply = in.getNumColumns();
+               // The > 0 guards avoid a float division by zero: the resulting
+               // Infinity would be rounded to Integer.MAX_VALUE partitions.
+               // #BuildBlocks = (2 * #PhysicalCores)/#build
+               if (numBlocks[0] == 0 && nBuild > 0 && nBuild < nThread)
+                       numBlocks[0] = Math.round(((float)nThread)/nBuild);
+               // #ApplyBlocks = (4 * #PhysicalCores)/#apply
+               if (numBlocks[1] == 0 && nApply > 0 && nApply < nThread*2)
+                       numBlocks[1] = Math.round(((float)nThread*2)/nApply);
+
+               // Cap #blocks so each partition has at least minNumRows rows (but
+               // keep at least one block), and default unset entries to 1.
+               int maxBlocks = Math.max(1, nRow / minNumRows);
+               for (int i=0; i<2; i++)
+                       numBlocks[i] = (numBlocks[i] == 0) ? 1 : Math.min(numBlocks[i], maxBlocks);
+
+               return numBlocks;
+       }
+
+
        private static void outputMatrixPreProcessing(MatrixBlock output, 
CacheBlock input, boolean hasDC) {
                long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
                if(output.isInSparseFormat()) {
@@ -426,7 +481,6 @@ public class MultiColumnEncoder implements Encoder {
        private void outputMatrixPostProcessing(MatrixBlock output){
                long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
                int k = OptimizerUtils.getTransformNumThreads();
-               ForkJoinPool myPool = new ForkJoinPool(k);
                if (k == 1) {
                        Set<Integer> indexSet = _columnEncoders.stream()
                                        
.map(ColumnEncoderComposite::getSparseRowsWZeros).flatMap(l -> {
@@ -441,14 +495,25 @@ public class MultiColumnEncoder implements Encoder {
                        }
                }
                else {
+                       ExecutorService myPool = CommonThreadPool.get(k);
                        try {
-                               Set<Integer> indexSet = 
_columnEncoders.stream().parallel()
+                               // Collect the row indices that need compaction
+                               Set<Integer> indexSet = myPool.submit(() ->
+                                       _columnEncoders.stream().parallel()
                                        
.map(ColumnEncoderComposite::getSparseRowsWZeros).flatMap(l -> {
                                                if(l == null)
                                                        return null;
                                                return l.stream();
-                                       }).collect(Collectors.toSet());
-                               
if(!indexSet.stream().parallel().allMatch(Objects::isNull)) {
+                                       }).collect(Collectors.toSet())
+                               ).get();
+
+                               // Check if the set is empty
+                               boolean emptySet = myPool.submit(() ->
+                                       
indexSet.stream().parallel().allMatch(Objects::isNull)
+                               ).get();
+
+                               // Concurrently compact the rows
+                               if (emptySet) {
                                        myPool.submit(() -> {
                                                
indexSet.stream().parallel().forEach(row -> {
                                                        
output.getSparseBlock().get(row).compact();
@@ -459,8 +524,8 @@ public class MultiColumnEncoder implements Encoder {
                        catch(Exception ex) {
                                throw new DMLRuntimeException(ex);
                        }
+                       myPool.shutdown();
                }
-               myPool.shutdown();
                output.recomputeNonZeros();
                if(DMLScript.STATISTICS)
                        
TransformStatistics.incOutMatrixPostProcessingTime(System.nanoTime()-t0);
@@ -929,20 +994,22 @@ public class MultiColumnEncoder implements Encoder {
                private final ColumnEncoder _encoder;
                private final MatrixBlock _out;
                private final CacheBlock _in;
+               private final int _nApplyPartition;
                private int _offset = -1; // offset dude to dummycoding in
                                                                        // 
previous columns needs to be updated by external task!
 
                private ApplyTasksWrapperTask(ColumnEncoder encoder, CacheBlock 
in, 
-                               MatrixBlock out, DependencyThreadPool pool) {
+                               MatrixBlock out, int nPart, 
DependencyThreadPool pool) {
                        super(pool);
                        _encoder = encoder;
                        _out = out;
                        _in = in;
+                       // Number of row partitions used when the wrapped apply tasks are created
+                       _nApplyPartition = nPart;
                }
 
                @Override
                public List<DependencyTask<?>> getWrappedTasks() {
+                       // _offset shifts the output column for dummycoded columns before
+                       // this one; per the field comment it is updated by an external task.
-                       return _encoder.getApplyTasks(_in, _out, 
_encoder._colID - 1 + _offset);
+                       return _encoder.getApplyTasks(_in, _out, 
_nApplyPartition, _encoder._colID - 1 + _offset);
                }
 
                @Override

Reply via email to