[SYSTEMML-2400] Fix dynamic recompiler to preserve par constraints

Dynamic recompilation deep copies the hop DAG and applies - among other
things - various rewrites. If these rewrites create new hops (instead of
modifying existing hops in place), the new hops lose the max thread
constraints set by parfor and paramserv. On nodes with a large degree of
parallelism, this constitutes a serious issue because it leads to large
over-provisioning and thus reduced performance and potential OOMs. This
patch fixes the issue in the core of the recompiler by obtaining the
maximum thread constraint before the rewrites and reapplying this
constraint before lop construction.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/23df1484
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/23df1484
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/23df1484

Branch: refs/heads/master
Commit: 23df1484fa51a414169c6c1d9ed4e73c7e6160d0
Parents: 7b29464
Author: Matthias Boehm <mboe...@gmail.com>
Authored: Fri Jun 15 18:58:11 2018 -0700
Committer: Matthias Boehm <mboe...@gmail.com>
Committed: Fri Jun 15 18:58:47 2018 -0700

----------------------------------------------------------------------
 .../apache/sysml/hops/recompile/Recompiler.java | 50 ++++++++++++++++++--
 1 file changed, 45 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/23df1484/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java b/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java
index 0364724..a7df6a0 100644
--- a/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java
+++ b/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java
@@ -51,6 +51,7 @@ import org.apache.sysml.hops.HopsException;
 import org.apache.sysml.hops.IndexingOp;
 import org.apache.sysml.hops.LiteralOp;
 import org.apache.sysml.hops.MemoTable;
+import org.apache.sysml.hops.MultiThreadedHop;
 import org.apache.sysml.hops.OptimizerUtils;
 import org.apache.sysml.hops.UnaryOp;
 import org.apache.sysml.hops.codegen.SpoofCompiler;
@@ -310,6 +311,10 @@ public class Recompiler
                                rClearLops( hopRoot );
                }
                
+               // get max parallelism constraint, see below
+               Hop.resetVisitStatus(hops);
+               int maxK = rGetMaxParallelism(hops);
+               
                // replace scalar reads with literals 
                if( !inplace && replaceLit ) {
                        Hop.resetVisitStatus(hops);
@@ -366,6 +371,11 @@ public class Recompiler
                                (status==null || !status.isInitialCodegen()));
                }
                
+               // set max parallelism constraint to ensure compilation 
+               // incl rewrites does not lose these hop-lop constraints
+               Hop.resetVisitStatus(hops);
+               rSetMaxParallelism(hops, maxK);
+               
                // construct lops
                Dag<Lop> dag = new Dag<>();
                for( Hop hopRoot : hops ){
@@ -1404,21 +1414,51 @@ public class Recompiler
                LiteralReplacement.rReplaceLiterals(hop, vars, scalarsOnly);
        }
        
-       public static void rSetExecType( Hop hop, ExecType etype )
-       {
+       public static void rSetExecType( Hop hop, ExecType etype ) {
                if( hop.isVisited() )
                        return;
-               
                //update function names
                hop.setForcedExecType(etype);
-               
                if( hop.getInput() != null )
                        for( Hop c : hop.getInput() )
                                rSetExecType(c, etype);
-               
                hop.setVisited();
        }
        
+       public static int rGetMaxParallelism(List<Hop> hops) {
+               int ret = -1;
+               for( Hop c : hops )
+                       ret = Math.max(ret, rGetMaxParallelism(c));
+               return ret;
+       }
+       
+       public static int rGetMaxParallelism(Hop hop) {
+               if( hop.isVisited() )
+                       return -1;
+               //recursively process children and
+               int ret = rGetMaxParallelism(hop.getInput());
+               //obtain max num thread constraints
+               if( hop instanceof MultiThreadedHop )
+                       ret = Math.max(ret, ((MultiThreadedHop)hop).getMaxNumThreads());
+               hop.setVisited();
+               return ret;
+       }
+       
+       public static void rSetMaxParallelism(List<Hop> hops, int k) {
+               for( Hop c : hops )
+                       rSetMaxParallelism(c, k);
+       }
+       
+       public static void rSetMaxParallelism(Hop hop, int k) {
+               if( hop.isVisited() )
+                       return;
+               //recursively process children
+               rSetMaxParallelism(hop.getInput(), k);
+               //set max num thread constraint
+               if( hop instanceof MultiThreadedHop )
+                       ((MultiThreadedHop)hop).setMaxNumThreads(k);
+               hop.setVisited();
+       }
 
        /**
         * Returns true iff (1) all instruction are reblock instructions and (2) all

Reply via email to