This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git

commit e03ef9691d111eca82e8c8bdb0373cd40bdc361e
Author: Matthias Boehm <[email protected]>
AuthorDate: Sat Sep 4 22:19:24 2021 +0200

    [SYSTEMDS-3122] Fix parfor degree of parallelism w/ eval functions
    
    Assume the following special-case (but increasingly common) scenario of
    three functions fun1, fun2, fun3, where fun1 might be, for example,
    hyper-parameter tuning with unknown models/functions. There was an issue
    where the parfor optimizer set the degree of parallelism to 112, and
    then tried to set all reachable program blocks and functions to a DOP
    of 1. However, because it encountered an eval with an unknown function
    call, it recompiled all existing functions (including fun1) to a DOP of 1
    and thus destroyed its own optimization decisions. This patch now
    properly fixes (i.e., freezes) these decisions (for a tree of nested
    parfor loops) when recompiling eval functions.
    
    function fun1()
      parfor(i in 1:n)
        eval("fun2", X, y)
    
    function fun2()
      fun3()
    
    function fun3()
      X = X + 1
    
    For the topk-cleaning pipeline enumeration, run up to the hyper-parameter
    tuning for the dirty baseline accuracy, this patch improved the
    end-to-end runtime from 51s to 11s.
---
 .../sysds/runtime/controlprogram/ParForProgramBlock.java      | 11 ++++++++++-
 .../runtime/controlprogram/paramserv/ParamservUtils.java      |  6 ++++--
 .../runtime/controlprogram/parfor/opt/OptimizerRuleBased.java | 11 +++++++++--
 3 files changed, 23 insertions(+), 5 deletions(-)

diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java 
b/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java
index 42ab8bc..a289218 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/ParForProgramBlock.java
@@ -320,6 +320,7 @@ public class ParForProgramBlock extends ForProgramBlock
        protected final boolean _monitor;
        protected final Level _optLogLevel;
        protected int _numThreads = -1;
+       protected boolean _fixedDOP = false; //guard for numThreads
        protected long _taskSize = -1;
        protected PTaskPartitioner _taskPartitioner = null;
        protected PDataPartitioner _dataPartitioner = null;
@@ -471,6 +472,14 @@ public class ParForProgramBlock extends ForProgramBlock
                _params.put(ParForStatementBlock.PAR, 
String.valueOf(_numThreads)); //kept up-to-date for copies
                setLocalParWorkerIDs();
        }
+       
+       public boolean isDegreeOfParallelismFixed() {
+               return _fixedDOP;
+       }
+       
+       public void setDegreeOfParallelismFixed(boolean flag) {
+               _fixedDOP = flag;
+       }
 
        public void setCPCaching(boolean flag) {
                _enableCPCaching = flag;
@@ -1187,7 +1196,7 @@ public class ParForProgramBlock extends ForProgramBlock
                try
                {
                        //create deep copies of required elements child blocks
-                       ArrayList<ProgramBlock> cpChildBlocks = null;   
+                       ArrayList<ProgramBlock> cpChildBlocks = null;
                        HashSet<String> fnNames = new HashSet<>();
                        if( USE_PB_CACHE )
                        {
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/ParamservUtils.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/ParamservUtils.java
index da1e9f7..b25c7df 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/ParamservUtils.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/paramserv/ParamservUtils.java
@@ -309,8 +309,10 @@ public class ParamservUtils {
                for (ProgramBlock pb : pbs) {
                        if (pb instanceof ParForProgramBlock) {
                                ParForProgramBlock pfpb = (ParForProgramBlock) 
pb;
-                               pfpb.setDegreeOfParallelism(k);
-                               recompiled |= 
rAssignParallelismAndRecompile(pfpb.getChildBlocks(), 1, recompiled, 
forceExecTypeCP);
+                               if( !pfpb.isDegreeOfParallelismFixed() ) {
+                                       pfpb.setDegreeOfParallelism(k);
+                                       recompiled |= 
rAssignParallelismAndRecompile(pfpb.getChildBlocks(), 1, recompiled, 
forceExecTypeCP);
+                               }
                        } else if (pb instanceof ForProgramBlock) {
                                recompiled |= 
rAssignParallelismAndRecompile(((ForProgramBlock) pb).getChildBlocks(), k, 
recompiled, forceExecTypeCP);
                        } else if (pb instanceof WhileProgramBlock) {
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
index cf2c091..b2c82c1 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
@@ -1178,12 +1178,14 @@ public class OptimizerRuleBased extends Optimizer {
                        
                        //set parfor degree of parallelism
                        pfpb.setDegreeOfParallelism(parforK);
+                       pfpb.setDegreeOfParallelismFixed(true);
                        n.setK(parforK);
                        
                        //distribute remaining parallelism 
                        int remainParforK = getRemainingParallelismParFor(kMax, 
parforK);
                        int remainOpsK = getRemainingParallelismOps(_lkmaxCP, 
parforK);
                        rAssignRemainingParallelism( n, remainParforK, 
remainOpsK );
+                       pfpb.setDegreeOfParallelismFixed(false);
                }
                else // ExecType.MR/ExecType.SPARK
                {
@@ -1212,7 +1214,9 @@ public class OptimizerRuleBased extends Optimizer {
                                kMax = 1;
                        
                        //distribute remaining parallelism and recompile 
parallel instructions
+                       pfpb.setDegreeOfParallelismFixed(true);
                        rAssignRemainingParallelism( n, kMax, 1 );
+                       pfpb.setDegreeOfParallelismFixed(false);
                }
                
                _numEvaluatedPlans++;
@@ -1247,14 +1251,15 @@ public class OptimizerRuleBased extends Optimizer {
                                        //set parfor degree of parallelism
                                        long id = c.getID();
                                        c.setK(tmpK);
-                                       ParForProgramBlock pfpb = 
(ParForProgramBlock) 
-                                               _plan.getMappedProgramBlock(id);
+                                       ParForProgramBlock pfpb = 
(ParForProgramBlock) _plan.getMappedProgramBlock(id);
                                        pfpb.setDegreeOfParallelism(tmpK);
                                        
                                        //distribute remaining parallelism
                                        int remainParforK = 
getRemainingParallelismParFor(parforK, tmpK);
                                        int remainOpsK = 
getRemainingParallelismOps(opsK, tmpK);
+                                       pfpb.setDegreeOfParallelismFixed(true);
                                        rAssignRemainingParallelism(c, 
remainParforK, remainOpsK);
+                                       pfpb.setDegreeOfParallelismFixed(false);
                                }
                                else if( c.getNodeType() == NodeType.HOP )
                                {
@@ -1278,6 +1283,8 @@ public class OptimizerRuleBased extends Optimizer {
                                        }
                                        
                                        //if parfor contains eval call, make 
unoptimized functions single-threaded
+                                       //(parent parfor program blocks have 
been frozen such that the following
+                                       //recompilation of all possible 
functions does not reset the DOP to 1)
                                        if( HopRewriteUtils.isNary(h, 
OpOpN.EVAL) ) {
                                                ProgramBlock pb = 
_plan.getMappedProgramBlock(n.getID());
                                                
pb.getProgram().getFunctionProgramBlocks(false)

Reply via email to