This is an automated email from the ASF dual-hosted git repository.

arnabp20 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new 9316ae2  [SYSTEMDS-3279] Parallel allocation for transformencode
9316ae2 is described below

commit 9316ae2c1c0f238d1c529ef8078020b3c89b44d9
Author: arnabp <[email protected]>
AuthorDate: Wed Jan 26 15:11:21 2022 +0100

    [SYSTEMDS-3279] Parallel allocation for transformencode
    
    This patch adds an optimization to make allocation parallel to
    the Build phase for dummycoding followed by binning and
    dummycoding followed by feature hashing. We can derive the
    number of output columns (#bins, K) even before the build
    phase completes. Moreover, this patch fixes a bug to allow
    dummycoding followed by feature hashing.
---
 .../transform/encode/ColumnEncoderComposite.java   | 10 ++++-
 .../transform/encode/MultiColumnEncoder.java       | 48 ++++++++++++++++------
 .../apache/sysds/runtime/util/DependencyTask.java  |  6 ++-
 .../sysds/runtime/util/DependencyThreadPool.java   |  2 +-
 .../TransformFrameEncodeMultithreadedTest.java     | 17 +++++++-
 .../datasets/homes3/homes.tfspec_hash_dummy.json   |  2 +
 6 files changed, 67 insertions(+), 18 deletions(-)

diff --git 
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderComposite.java
 
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderComposite.java
index a4a9563..b7c162b 100644
--- 
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderComposite.java
+++ 
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderComposite.java
@@ -161,13 +161,19 @@ public class ColumnEncoderComposite extends ColumnEncoder 
{
                        }
                        tasks.addAll(t);
                }
+
                List<List<? extends Callable<?>>> dep = new 
ArrayList<>(Collections.nCopies(tasks.size(), null));
                DependencyThreadPool.createDependencyList(tasks, depMap, dep);
+               // If DC is required, add an UpdateDC task to update the 
domain size as the last task
+               // Only for RC build, UpdateDC must depend on the Build task; 
others can be independent.
                if(hasEncoder(ColumnEncoderDummycode.class)) {
                        tasks.add(DependencyThreadPool.createDependencyTask(new 
ColumnCompositeUpdateDCTask(this)));
-                       dep.add(tasks.subList(tasks.size() - 2, tasks.size() - 
1));
+                       if (_columnEncoders.get(0) instanceof 
ColumnEncoderRecode) {
+                               dep.add(tasks.subList(tasks.size() - 2, 
tasks.size() - 1));
+                               return 
DependencyThreadPool.createDependencyTasks(tasks, dep);
+                       }
                }
-               return DependencyThreadPool.createDependencyTasks(tasks, dep);
+               return DependencyThreadPool.createDependencyTasks(tasks, null);
        }
 
        @Override
diff --git 
a/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
 
b/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
index 09ecf64..053c452 100644
--- 
a/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
+++ 
b/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
@@ -141,7 +141,7 @@ public class MultiColumnEncoder implements Encoder {
         * BuildTask:                   Build an encoder
         * ColumnCompositeUpdateDCTask: Update domain size of a DC encoder 
based on #distincts, #bins, K
         * ColumnMetaDataTask:          Fill up metadata of an encoder
-        * ApplyTasksWrapperTask:       Wrapper task for an Apply
+        * ApplyTasksWrapperTask:       Wrapper task for an Apply task
         * UpdateOutputColTask:         Set starting offsets of the DC columns
         */
        private List<DependencyTask<?>> getEncodeTasks(CacheBlock in, 
MatrixBlock out, DependencyThreadPool pool) {
@@ -150,6 +150,7 @@ public class MultiColumnEncoder implements Encoder {
                Map<Integer[], Integer[]> depMap = new HashMap<>();
                boolean hasDC = 
getColumnEncoders(ColumnEncoderDummycode.class).size() > 0;
                boolean applyOffsetDep = false;
+               boolean independentUpdateDC = false;
                _meta = new FrameBlock(in.getNumColumns(), ValueType.STRING);
                // Create the output and metadata allocation tasks
                tasks.add(DependencyThreadPool.createDependencyTask(new 
InitOutputMatrixTask(this, in, out)));
@@ -160,15 +161,36 @@ public class MultiColumnEncoder implements Encoder {
                        List<DependencyTask<?>> buildTasks = 
e.getBuildTasks(in);
                        tasks.addAll(buildTasks);
                        if(buildTasks.size() > 0) {
-                               // Apply Task depends on build completion task
-                               depMap.put(new Integer[] {tasks.size(), 
tasks.size() + 1},      //ApplyTask
-                                       new Integer[] {tasks.size() - 1, 
tasks.size()});            //BuildTask
-                               // getMetaDataTask depends on build completion
-                               depMap.put(new Integer[] {tasks.size() + 1, 
tasks.size() + 2}, //MetaDataTask
-                                       new Integer[] {tasks.size() - 1, 
tasks.size()});           //BuildTask
-                               // AllocMetaTask depends on the build 
completion tasks
-                               depMap.put(new Integer[] {1, 2},                
               //AllocMetaTask (2nd task)
-                                       new Integer[] {tasks.size() - 1, 
tasks.size()});           //BuildTask
+                               // Check if any Build independent UpdateDC task 
(Bin+DC, FH+DC)
+                               if (e.hasEncoder(ColumnEncoderDummycode.class) 
+                                       && buildTasks.size() > 1  //filter out 
FH
+                                       && 
!buildTasks.get(buildTasks.size()-2).hasDependency(buildTasks.get(buildTasks.size()-1)))
+                                               independentUpdateDC = true;
+                               
+                               // Independent UpdateDC task
+                               if (independentUpdateDC) {
+                                       // Apply Task depends on task prior to 
UpdateDC (Build/MergePartialBuild)
+                                       depMap.put(new Integer[] {tasks.size(), 
tasks.size() + 1},     //ApplyTask
+                                               new Integer[] {tasks.size() - 
2, tasks.size() - 1});       //BuildTask
+                                       // getMetaDataTask depends on task 
prior to UpdateDC 
+                                       depMap.put(new Integer[] {tasks.size() 
+ 1, tasks.size() + 2}, //MetaDataTask
+                                               new Integer[] {tasks.size() - 
2, tasks.size() - 1});       //BuildTask
+                               }
+                               else { 
+                                       // Apply Task depends on the last task 
(Build/MergePartial/UpdateDC)
+                                       depMap.put(new Integer[] {tasks.size(), 
tasks.size() + 1},     //ApplyTask
+                                               new Integer[] {tasks.size() - 
1, tasks.size()});           //Build/UpdateDC
+                                       // getMetaDataTask depends on build 
completion
+                                       depMap.put(new Integer[] {tasks.size() 
+ 1, tasks.size() + 2}, //MetaDataTask
+                                               new Integer[] {tasks.size() - 
1, tasks.size()});           //Build/UpdateDC
+                               }
+                               // AllocMetaTask never depends on the UpdateDC 
task
+                               if (e.hasEncoder(ColumnEncoderDummycode.class) 
&& buildTasks.size() > 1)
+                                       depMap.put(new Integer[] {1, 2},        
                       //AllocMetaTask (2nd task)
+                                               new Integer[] {tasks.size() - 
2, tasks.size()-1});         //BuildTask
+                               else
+                                       depMap.put(new Integer[] {1, 2},        
                       //AllocMetaTask (2nd task)
+                                               new Integer[] {tasks.size() - 
1, tasks.size()});           //BuildTask
                        }
 
                        // getMetaDataTask depends on AllocMeta task
@@ -356,8 +378,8 @@ public class MultiColumnEncoder implements Encoder {
                        if (MatrixBlock.DEFAULT_SPARSEBLOCK != 
SparseBlock.Type.CSR
                                        && MatrixBlock.DEFAULT_SPARSEBLOCK != 
SparseBlock.Type.MCSR)
                                throw new RuntimeException("Transformapply is 
only supported for MCSR and CSR output matrix");
-                       boolean mcsr = MatrixBlock.DEFAULT_SPARSEBLOCK == 
SparseBlock.Type.MCSR;
-                       mcsr = false; //force CSR for transformencode
+                       //boolean mcsr = MatrixBlock.DEFAULT_SPARSEBLOCK == 
SparseBlock.Type.MCSR;
+                       boolean mcsr = false; //force CSR for transformencode
                        if (mcsr) {
                                output.allocateBlock();
                                SparseBlock block = output.getSparseBlock();
@@ -461,7 +483,7 @@ public class MultiColumnEncoder implements Encoder {
                                columnEncoder.getMetaData(meta);
                }
 
-               //_columnEncoders.stream().parallel().forEach(columnEncoder -> 
+               //_columnEncoders.stream().parallel().forEach(columnEncoder ->
                //              columnEncoder.getMetaData(meta));
                if(_legacyOmit != null)
                        _legacyOmit.getMetaData(meta);
diff --git a/src/main/java/org/apache/sysds/runtime/util/DependencyTask.java 
b/src/main/java/org/apache/sysds/runtime/util/DependencyTask.java
index 17351a6..69c25fe 100644
--- a/src/main/java/org/apache/sysds/runtime/util/DependencyTask.java
+++ b/src/main/java/org/apache/sysds/runtime/util/DependencyTask.java
@@ -68,11 +68,15 @@ public class DependencyTask<E> implements 
Comparable<DependencyTask<?>>, Callabl
                        return isReady();
                }
        }
-
+       
        public void addDependent(DependencyTask<?> dependencyTask) {
                _dependantTasks.add(dependencyTask);
                dependencyTask._rdy += 1;
        }
+       
+       public boolean hasDependency (DependencyTask<?> dependencyTask) {
+               return _dependantTasks.contains(dependencyTask);
+       }
 
        @Override
        public E call() throws Exception {
diff --git 
a/src/main/java/org/apache/sysds/runtime/util/DependencyThreadPool.java 
b/src/main/java/org/apache/sysds/runtime/util/DependencyThreadPool.java
index 90d1dfc..1e8f99b 100644
--- a/src/main/java/org/apache/sysds/runtime/util/DependencyThreadPool.java
+++ b/src/main/java/org/apache/sysds/runtime/util/DependencyThreadPool.java
@@ -148,7 +148,7 @@ public class DependencyThreadPool {
                List<List<? extends Callable<?>>> dependencies) {
                if(dependencies != null && tasks.size() != dependencies.size())
                        throw new DMLRuntimeException(
-                               "Could not create DependencyTasks since the 
input array sizes are where mismatched");
+                               "Could not create DependencyTasks since the 
input array sizes are mismatching");
                List<DependencyTask<?>> ret = new ArrayList<>();
                Map<Callable<?>, DependencyTask<?>> map = new HashMap<>();
                for(Callable<?> task : tasks) {
diff --git 
a/src/test/java/org/apache/sysds/test/functions/transform/TransformFrameEncodeMultithreadedTest.java
 
b/src/test/java/org/apache/sysds/test/functions/transform/TransformFrameEncodeMultithreadedTest.java
index 560f934..ff24a99 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/transform/TransformFrameEncodeMultithreadedTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/transform/TransformFrameEncodeMultithreadedTest.java
@@ -56,13 +56,14 @@ public class TransformFrameEncodeMultithreadedTest extends 
AutomatedTestBase {
        private final static String SPEC7 = 
"homes3/homes.tfspec_binDummy.json"; // recode+dummy
        private final static String SPEC8 = "homes3/homes.tfspec_hash.json";
        private final static String SPEC9 = 
"homes3/homes.tfspec_hash_recode.json";
+       private final static String SPEC10 = 
"homes3/homes.tfspec_hash_dummy.json";
 
        private static final int[] BIN_col3 = new int[] {1, 4, 2, 3, 3, 2, 4};
        private static final int[] BIN_col8 = new int[] {1, 2, 2, 2, 2, 2, 3};
 
        public enum TransformType {
                RECODE, DUMMY, DUMMY_ALL, // to test sparse
-               RECODE_DUMMY, BIN, BIN_DUMMY, HASH, HASH_RECODE,
+               RECODE_DUMMY, BIN, BIN_DUMMY, HASH, HASH_RECODE, HASH_DUMMY
        }
 
        @Override
@@ -112,6 +113,11 @@ public class TransformFrameEncodeMultithreadedTest extends 
AutomatedTestBase {
        }
 
        @Test
+       public void testHomesHashDummyCodeNonStaged() {
+               runTransformTest(ExecMode.SINGLE_NODE, "csv", 
TransformType.HASH_DUMMY, false);
+       }
+
+       @Test
        public void testHomesRecodeStaged() {
                runTransformTest(ExecMode.SINGLE_NODE, "csv", 
TransformType.RECODE, true);
        }
@@ -151,6 +157,11 @@ public class TransformFrameEncodeMultithreadedTest extends 
AutomatedTestBase {
                runTransformTest(ExecMode.SINGLE_NODE, "csv", 
TransformType.HASH_RECODE, true);
        }
 
+       @Test
+       public void testHomesHashDummyCodeStaged() {
+               runTransformTest(ExecMode.SINGLE_NODE, "csv", 
TransformType.HASH_DUMMY, true);
+       }
+
        private void runTransformTest(ExecMode rt, String ofmt, TransformType 
type, boolean staged) {
 
                // set transform specification
@@ -189,6 +200,10 @@ public class TransformFrameEncodeMultithreadedTest extends 
AutomatedTestBase {
                                SPEC = SPEC9;
                                DATASET = DATASET1;
                                break;
+                       case HASH_DUMMY:
+                               SPEC = SPEC10;
+                               DATASET = DATASET1;
+                               break;
                }
 
                if(!ofmt.equals("csv"))
diff --git a/src/test/resources/datasets/homes3/homes.tfspec_hash_dummy.json 
b/src/test/resources/datasets/homes3/homes.tfspec_hash_dummy.json
new file mode 100644
index 0000000..631b3be
--- /dev/null
+++ b/src/test/resources/datasets/homes3/homes.tfspec_hash_dummy.json
@@ -0,0 +1,2 @@
+{
+    "ids": true, "hash": [ 1, 2, 7 ], "K": 100, "dummycode": [ 1, 2, 3, 6 ] }

Reply via email to