[SYSTEMML-2400] Fix dynamic recompiler to preserve par constraints Dynamic recompilation deep copies the hop DAG and applies - among other things - various rewrites. If these rewrites create new hops (instead of modifying existing hops in place), the new hops lose the max thread constraints set by parfor and paramserv. On nodes with a large degree of parallelism, this constitutes a serious issue because it leads to substantial over-provisioning and thus reduced performance and potential OOMs. This patch fixes the issue in the core of the recompiler by obtaining the maximum thread constraint before the rewrites and reapplying this max constraint before lop construction.
Project: http://git-wip-us.apache.org/repos/asf/systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/23df1484 Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/23df1484 Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/23df1484 Branch: refs/heads/master Commit: 23df1484fa51a414169c6c1d9ed4e73c7e6160d0 Parents: 7b29464 Author: Matthias Boehm <mboe...@gmail.com> Authored: Fri Jun 15 18:58:11 2018 -0700 Committer: Matthias Boehm <mboe...@gmail.com> Committed: Fri Jun 15 18:58:47 2018 -0700 ---------------------------------------------------------------------- .../apache/sysml/hops/recompile/Recompiler.java | 50 ++++++++++++++++++-- 1 file changed, 45 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/systemml/blob/23df1484/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java b/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java index 0364724..a7df6a0 100644 --- a/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java +++ b/src/main/java/org/apache/sysml/hops/recompile/Recompiler.java @@ -51,6 +51,7 @@ import org.apache.sysml.hops.HopsException; import org.apache.sysml.hops.IndexingOp; import org.apache.sysml.hops.LiteralOp; import org.apache.sysml.hops.MemoTable; +import org.apache.sysml.hops.MultiThreadedHop; import org.apache.sysml.hops.OptimizerUtils; import org.apache.sysml.hops.UnaryOp; import org.apache.sysml.hops.codegen.SpoofCompiler; @@ -310,6 +311,10 @@ public class Recompiler rClearLops( hopRoot ); } + // get max parallelism constraint, see below + Hop.resetVisitStatus(hops); + int maxK = rGetMaxParallelism(hops); + // replace scalar reads with literals if( !inplace && replaceLit ) { Hop.resetVisitStatus(hops); @@ -366,6 +371,11 @@ public class Recompiler 
(status==null || !status.isInitialCodegen())); } + // set max parallelism constraint to ensure compilation + // incl rewrites does not lose these hop-lop constraints + Hop.resetVisitStatus(hops); + rSetMaxParallelism(hops, maxK); + // construct lops Dag<Lop> dag = new Dag<>(); for( Hop hopRoot : hops ){ @@ -1404,21 +1414,51 @@ public class Recompiler LiteralReplacement.rReplaceLiterals(hop, vars, scalarsOnly); } - public static void rSetExecType( Hop hop, ExecType etype ) - { + public static void rSetExecType( Hop hop, ExecType etype ) { if( hop.isVisited() ) return; - //update function names hop.setForcedExecType(etype); - if( hop.getInput() != null ) for( Hop c : hop.getInput() ) rSetExecType(c, etype); - hop.setVisited(); } + public static int rGetMaxParallelism(List<Hop> hops) { + int ret = -1; + for( Hop c : hops ) + ret = Math.max(ret, rGetMaxParallelism(c)); + return ret; + } + + public static int rGetMaxParallelism(Hop hop) { + if( hop.isVisited() ) + return -1; + //recursively process children and + int ret = rGetMaxParallelism(hop.getInput()); + //obtain max num thread constraints + if( hop instanceof MultiThreadedHop ) + ret = Math.max(ret, ((MultiThreadedHop)hop).getMaxNumThreads()); + hop.setVisited(); + return ret; + } + + public static void rSetMaxParallelism(List<Hop> hops, int k) { + for( Hop c : hops ) + rSetMaxParallelism(c, k); + } + + public static void rSetMaxParallelism(Hop hop, int k) { + if( hop.isVisited() ) + return; + //recursively process children + rSetMaxParallelism(hop.getInput(), k); + //set max num thread constraint + if( hop instanceof MultiThreadedHop ) + ((MultiThreadedHop)hop).setMaxNumThreads(k); + hop.setVisited(); + } /** * Returns true iff (1) all instruction are reblock instructions and (2) all