This is an automated email from the ASF dual-hosted git repository.
mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/master by this push:
new bfa2234 [SYSTEMDS-3121] Fix robustness concurrent parfor optimization
(in eval)
bfa2234 is described below
commit bfa2234064c360ac15c88efa1386e2b4bf38dd25
Author: Matthias Boehm <[email protected]>
AuthorDate: Fri Sep 3 20:26:49 2021 +0200
[SYSTEMDS-3121] Fix robustness concurrent parfor optimization (in eval)
This patch fixes the parfor optimization tree, which had historically
been extended with static access to a global mapping to programs and
operators. While normally only the top-level parfor optimizes the entire
(potentially nested parfor) body program, when primitives are invoked
through eval, multiple parfor optimizers can indeed run concurrently and
thus corrupt each other's mappings. We have now refactored these mappings
into plan-local data structures, which also enables additional
parallelization of cleaning primitives.
---
scripts/pipelines/scripts/utils.dml | 22 +-
.../runtime/controlprogram/parfor/opt/OptTree.java | 57 +++--
.../parfor/opt/OptTreeConverter.java | 246 ++++++++++-----------
.../parfor/opt/OptimizationWrapper.java | 21 +-
.../parfor/opt/OptimizerConstrained.java | 31 +--
.../parfor/opt/OptimizerRuleBased.java | 144 ++++++------
6 files changed, 245 insertions(+), 276 deletions(-)
diff --git a/scripts/pipelines/scripts/utils.dml
b/scripts/pipelines/scripts/utils.dml
index d2916f1..f6d3d01 100644
--- a/scripts/pipelines/scripts/utils.dml
+++ b/scripts/pipelines/scripts/utils.dml
@@ -286,11 +286,10 @@ topk_gridSearch = function(Matrix[Double] X,
Matrix[Double] y, Matrix[Double] Xt
}
else if(cv & train == "multiLogReg")
{
- for( i in 1:nrow(HP) ) {
+ parfor( i in 1:nrow(HP) ) {
# a) replace training arguments
# acc = utils::crossVML(X, y, cvk, HP[i]);
k = cvk
- accuracyMatrix = matrix(0, k, 1)
dataList = list()
testL = list()
data = order(target = cbind(y, X), by = 1, decreasing=FALSE,
index.return=FALSE)
@@ -319,8 +318,9 @@ topk_gridSearch = function(Matrix[Double] X, Matrix[Double]
y, Matrix[Double] Xt
fold_idxes[, 2] += ins_per_fold
}
- for(i in seq(1,k))
- {
+ cvbeta = matrix(0,1,numB);
+ cvloss = matrix(0,1,1);
+ for(i in seq(1,k)) {
[trainList, hold_out] = remove(dataList, i)
trainset = rbind(trainList)
testset = as.matrix(hold_out)
@@ -328,14 +328,15 @@ topk_gridSearch = function(Matrix[Double] X,
Matrix[Double] y, Matrix[Double] Xt
trainy = trainset[, 1]
testsetX = testset[, 2:ncol(testset)]
testsety = testset[, 1]
- beta = multiLogReg(X=trainX, Y=trainy, icpt=as.scalar(HP[1,1]),
reg=as.scalar(HP[1,2]), tol=as.scalar(HP[1,3]),
+ lbeta = multiLogReg(X=trainX, Y=trainy, icpt=as.scalar(HP[1,1]),
reg=as.scalar(HP[1,2]), tol=as.scalar(HP[1,3]),
maxi=as.scalar(HP[1,4]), maxii=50, verbose=FALSE);
- [prob, yhat, accuracy] = multiLogRegPredict(testsetX, beta, testsety,
FALSE)
- accuracyMatrix[i] = accuracy
+ [prob, yhat, accuracy] = multiLogRegPredict(testsetX, lbeta, testsety,
FALSE)
+ cvbeta += lbeta;
+ cvloss += as.matrix(accuracy);
}
-
- Rloss[i,] = mean(accuracyMatrix)
- }
+ Rbeta[i,] = cvbeta / k;
+ Rloss[i,] = cvloss / k;
+ }
}
# without cross-validation
else {
@@ -356,4 +357,3 @@ topk_gridSearch = function(Matrix[Double] X, Matrix[Double]
y, Matrix[Double] Xt
B = t(Rbeta[ix,]); # optimal model
opt = as.frame(HP[ix,]); # optimal hyper-parameters
}
-
diff --git
a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptTree.java
b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptTree.java
index bf14ff6..96ce36d 100644
---
a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptTree.java
+++
b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptTree.java
@@ -19,6 +19,8 @@
package org.apache.sysds.runtime.controlprogram.parfor.opt;
+import org.apache.sysds.hops.Hop;
+import org.apache.sysds.runtime.controlprogram.ProgramBlock;
import
org.apache.sysds.runtime.controlprogram.parfor.opt.Optimizer.PlanInputType;
/**
@@ -29,8 +31,9 @@ import
org.apache.sysds.runtime.controlprogram.parfor.opt.Optimizer.PlanInputTyp
*/
public class OptTree
{
+ private final OptTreePlanMappingAbstract _hlMap;
+ private final OptTreePlanMappingRuntime _rtMap;
-
//global contraints
private int _ck; //max constraint degree of parallelism
private double _cm; //max constraint memory consumption
@@ -39,51 +42,61 @@ public class OptTree
private PlanInputType _type = null;
private OptNode _root = null;
-
- public OptTree( int ck, double cm, OptNode node )
- {
- this( ck, cm, PlanInputType.RUNTIME_PLAN, node );
+ public OptTree( int ck, double cm, OptNode node ) {
+ this( ck, cm, PlanInputType.RUNTIME_PLAN, node, null, null);
}
- public OptTree( int ck, double cm, PlanInputType type, OptNode node )
- {
+ public OptTree( int ck, double cm, PlanInputType type, OptNode node,
+ OptTreePlanMappingAbstract hlMap, OptTreePlanMappingRuntime
rtMap) {
_ck = ck;
_cm = cm;
-
_type = type;
_root = node;
+ _hlMap = hlMap;
+ _rtMap = rtMap;
}
- ///////
- // getter and setter
+ public OptTreePlanMappingAbstract getAbstractPlanMapping() {
+ return _hlMap;
+ }
- public int getCK()
- {
+ public OptTreePlanMappingRuntime getRuntimePlanMapping() {
+ return _rtMap;
+ }
+
+ public Hop getMappedHop( long id ) {
+ return _hlMap.getMappedHop(id);
+ }
+
+ public Object[] getMappedProg( long id ) {
+ return _hlMap.getMappedProg(id);
+ }
+
+ public ProgramBlock getMappedProgramBlock(long id) {
+ return _hlMap.getMappedProgramBlock(id);
+ }
+
+ public int getCK() {
return _ck;
}
- public double getCM()
- {
+ public double getCM() {
return _cm;
}
- public PlanInputType getPlanInputType()
- {
+ public PlanInputType getPlanInputType() {
return _type;
}
- public void setPlanInputType( PlanInputType type )
- {
+ public void setPlanInputType( PlanInputType type ) {
_type = type;
}
- public OptNode getRoot()
- {
+ public OptNode getRoot() {
return _root;
}
- public void setRoot( OptNode n )
- {
+ public void setRoot( OptNode n ) {
_root = n;
}
diff --git
a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptTreeConverter.java
b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptTreeConverter.java
index 91a703a..296d40d 100644
---
a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptTreeConverter.java
+++
b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptTreeConverter.java
@@ -77,59 +77,46 @@ public class OptTreeConverter
{
//internal configuration flags
public static boolean INCLUDE_FUNCTIONS = true;
-
- //internal state
- private static OptTreePlanMappingAbstract _hlMap = null;
- private static OptTreePlanMappingRuntime _rtMap = null;
-
- static
- {
- _hlMap = new OptTreePlanMappingAbstract();
- _rtMap = new OptTreePlanMappingRuntime();
- }
-
+
public static OptTree createOptTree( int ck, double cm, PlanInputType
type, ParForStatementBlock pfsb, ParForProgramBlock pfpb, ExecutionContext ec )
{
- OptNode root = null;
- switch( type )
- {
- case ABSTRACT_PLAN:
- _hlMap.putRootProgram(pfsb.getDMLProg(),
pfpb.getProgram());
+ switch( type ) {
+ case ABSTRACT_PLAN: {
+ OptTreePlanMappingAbstract hlMap = new
OptTreePlanMappingAbstract();
+ hlMap.putRootProgram(pfsb.getDMLProg(),
pfpb.getProgram());
Set<String> memo = new HashSet<>();
- root = rCreateAbstractOptNode(pfsb, pfpb,
ec.getVariables(), true, memo);
+ OptNode root = rCreateAbstractOptNode(pfsb,
pfpb, ec.getVariables(), true, hlMap, memo);
root.checkAndCleanupRecursiveFunc(new
HashSet<String>()); //create consistency between recursive info
root.checkAndCleanupLeafNodes(); //prune
unnecessary nodes
- break;
- case RUNTIME_PLAN:
- root = rCreateOptNode( pfpb, ec.getVariables(),
true, true );
- break;
+ return new OptTree(ck, cm, type, root, hlMap,
null);
+ }
+ case RUNTIME_PLAN: {
+ OptTreePlanMappingRuntime rtMap = new
OptTreePlanMappingRuntime();
+ OptNode root = rCreateOptNode( pfpb,
ec.getVariables(), true, rtMap, true );
+ return new OptTree(ck, cm, type, root, null,
rtMap);
+ }
default:
throw new DMLRuntimeException("Optimizer plan
input type "+type+" not supported.");
}
-
- OptTree tree = new OptTree(ck, cm, type, root);
-
- return tree;
}
- public static OptTree createAbstractOptTree( int ck, double cm,
ParForStatementBlock pfsb, ParForProgramBlock pfpb, Set<String> memo,
ExecutionContext ec )
+ public static OptTree createAbstractOptTree( int ck, double cm,
ParForStatementBlock pfsb,
+ ParForProgramBlock pfpb, OptTreePlanMappingAbstract hlMap,
Set<String> memo, ExecutionContext ec )
{
OptTree tree = null;
OptNode root = null;
- try
- {
- root = rCreateAbstractOptNode( pfsb, pfpb,
ec.getVariables(), true, memo );
+ try {
+ root = rCreateAbstractOptNode( pfsb, pfpb,
ec.getVariables(), true, hlMap, memo );
tree = new OptTree(ck, cm, root);
}
- catch(HopsException he)
- {
- throw new DMLRuntimeException(he);
- }
-
+ catch(HopsException ex) {
+ throw new DMLRuntimeException(ex);
+ }
+
return tree;
}
- public static OptNode rCreateOptNode( ProgramBlock pb, LocalVariableMap
vars, boolean topLevel, boolean storeObjs )
+ public static OptNode rCreateOptNode( ProgramBlock pb, LocalVariableMap
vars, boolean topLevel, OptTreePlanMappingRuntime rtMap, boolean storeObjs )
{
OptNode node = null;
@@ -137,58 +124,58 @@ public class OptTreeConverter
IfProgramBlock ipb = (IfProgramBlock) pb;
node = new OptNode( NodeType.IF );
if(storeObjs)
- _rtMap.putMapping(ipb, node);
+ rtMap.putMapping(ipb, node);
node.setExecType(ExecType.CP);
//process if condition
OptNode ifn = new OptNode(NodeType.GENERIC);
- node.addChilds( createOptNodes( ipb.getPredicate(),
vars,storeObjs ) );
+ node.addChilds( createOptNodes( ipb.getPredicate(),
vars, rtMap, storeObjs ) );
node.addChild( ifn );
for( ProgramBlock lpb : ipb.getChildBlocksIfBody() )
- ifn.addChild( rCreateOptNode(lpb,vars,topLevel,
storeObjs) );
+ ifn.addChild( rCreateOptNode(lpb, vars,
topLevel, rtMap, storeObjs) );
//process else condition
if( ipb.getChildBlocksElseBody() != null &&
ipb.getChildBlocksElseBody().size()>0 ) {
OptNode efn = new OptNode(NodeType.GENERIC);
node.addChild( efn );
for( ProgramBlock lpb :
ipb.getChildBlocksElseBody() )
- efn.addChild(
rCreateOptNode(lpb,vars,topLevel, storeObjs) );
+ efn.addChild( rCreateOptNode(lpb, vars,
topLevel, rtMap, storeObjs) );
}
}
else if( pb instanceof WhileProgramBlock ) {
WhileProgramBlock wpb = (WhileProgramBlock) pb;
node = new OptNode( NodeType.WHILE );
if(storeObjs)
- _rtMap.putMapping(wpb, node);
+ rtMap.putMapping(wpb, node);
node.setExecType(ExecType.CP);
//process predicate instruction
- node.addChilds( createOptNodes( wpb.getPredicate(),
vars,storeObjs ) );
+ node.addChilds( createOptNodes( wpb.getPredicate(),
vars, rtMap, storeObjs ) );
//process body
for( ProgramBlock lpb : wpb.getChildBlocks() )
- node.addChild(
rCreateOptNode(lpb,vars,topLevel,storeObjs) );
+ node.addChild( rCreateOptNode(lpb, vars,
topLevel, rtMap, storeObjs) );
}
else if( pb instanceof ForProgramBlock && !(pb instanceof
ParForProgramBlock) ) {
ForProgramBlock fpb = (ForProgramBlock) pb;
node = new OptNode( NodeType.FOR );
if(storeObjs)
- _rtMap.putMapping(fpb, node);
+ rtMap.putMapping(fpb, node);
node.setExecType(ExecType.CP);
//determine number of iterations
long N = OptimizerUtils.getNumIterations(fpb, vars,
CostEstimator.FACTOR_NUM_ITERATIONS);
node.addParam(ParamType.NUM_ITERATIONS,
String.valueOf(N));
- node.addChilds( createOptNodes(
fpb.getFromInstructions(), vars,storeObjs ) );
- node.addChilds( createOptNodes(
fpb.getToInstructions(), vars,storeObjs ) );
- node.addChilds( createOptNodes(
fpb.getIncrementInstructions(), vars,storeObjs ) );
+ node.addChilds( createOptNodes(
fpb.getFromInstructions(), vars, rtMap, storeObjs ) );
+ node.addChilds( createOptNodes(
fpb.getToInstructions(), vars, rtMap, storeObjs ) );
+ node.addChilds( createOptNodes(
fpb.getIncrementInstructions(), vars, rtMap, storeObjs ) );
//process body
for( ProgramBlock lpb : fpb.getChildBlocks() )
- node.addChild(
rCreateOptNode(lpb,vars,topLevel,storeObjs) );
+ node.addChild( rCreateOptNode(lpb, vars,
topLevel, rtMap, storeObjs) );
}
else if( pb instanceof ParForProgramBlock ) {
ParForProgramBlock fpb = (ParForProgramBlock) pb;
node = new OptNode( NodeType.PARFOR );
if(storeObjs)
- _rtMap.putMapping(fpb, node);
+ rtMap.putMapping(fpb, node);
node.setK( fpb.getDegreeOfParallelism() );
long N = fpb.getNumIterations();
node.addParam(ParamType.NUM_ITERATIONS, (N!=-1) ?
@@ -201,20 +188,20 @@ public class OptTreeConverter
case REMOTE_SPARK:
case REMOTE_SPARK_DP:
node.setExecType(ExecType.SPARK);
- break;
+ break;
default:
node.setExecType(null);
}
if( !topLevel ) {
- node.addChilds( createOptNodes(
fpb.getFromInstructions(), vars, storeObjs ) );
- node.addChilds( createOptNodes(
fpb.getToInstructions(), vars, storeObjs ) );
- node.addChilds( createOptNodes(
fpb.getIncrementInstructions(), vars, storeObjs ) );
+ node.addChilds( createOptNodes(
fpb.getFromInstructions(), vars, rtMap, storeObjs ) );
+ node.addChilds( createOptNodes(
fpb.getToInstructions(), vars, rtMap, storeObjs ) );
+ node.addChilds( createOptNodes(
fpb.getIncrementInstructions(), vars, rtMap, storeObjs ) );
}
//process body
for( ProgramBlock lpb : fpb.getChildBlocks() )
- node.addChild(
rCreateOptNode(lpb,vars,false,storeObjs) );
+ node.addChild( rCreateOptNode(lpb, vars, false,
rtMap, storeObjs) );
//parameters, add required parameters
}
@@ -222,27 +209,27 @@ public class OptTreeConverter
BasicProgramBlock bpb = (BasicProgramBlock) pb;
node = new OptNode(NodeType.GENERIC);
if(storeObjs)
- _rtMap.putMapping(pb, node);
- node.addChilds( createOptNodes(bpb.getInstructions(),
vars, storeObjs) );
+ rtMap.putMapping(pb, node);
+ node.addChilds( createOptNodes(bpb.getInstructions(),
vars, rtMap, storeObjs) );
node.setExecType(ExecType.CP);
}
return node;
}
- public static ArrayList<OptNode> createOptNodes (ArrayList<Instruction>
instset, LocalVariableMap vars, boolean storeObjs) {
+ public static ArrayList<OptNode> createOptNodes (ArrayList<Instruction>
instset, LocalVariableMap vars, OptTreePlanMappingRuntime rtMap, boolean
storeObjs) {
ArrayList<OptNode> tmp = new ArrayList<>(instset.size());
for( Instruction inst : instset )
- tmp.add( createOptNode(inst,vars,storeObjs) );
+ tmp.add( createOptNode(inst, vars, rtMap, storeObjs) );
return tmp;
}
- public static OptNode createOptNode( Instruction inst, LocalVariableMap
vars, boolean storeObjs ) {
+ public static OptNode createOptNode( Instruction inst, LocalVariableMap
vars, OptTreePlanMappingRuntime rtMap, boolean storeObjs ) {
OptNode node = new OptNode(NodeType.INST);
String instStr = inst.toString();
String opstr = instStr.split(Instruction.OPERAND_DELIM)[1];
if(storeObjs)
- _rtMap.putMapping(inst, node);
+ rtMap.putMapping(inst, node);
node.addParam(ParamType.OPSTRING,opstr);
//exec type
@@ -262,7 +249,8 @@ public class OptTreeConverter
return node;
}
- public static OptNode rCreateAbstractOptNode( StatementBlock sb,
ProgramBlock pb, LocalVariableMap vars, boolean topLevel, Set<String> memo )
+ public static OptNode rCreateAbstractOptNode( StatementBlock sb,
ProgramBlock pb,
+ LocalVariableMap vars, boolean topLevel,
OptTreePlanMappingAbstract hlMap, Set<String> memo )
{
OptNode node = null;
@@ -273,17 +261,17 @@ public class OptTreeConverter
IfStatement is = (IfStatement) isb.getStatement(0);
node = new OptNode( NodeType.IF );
- _hlMap.putProgMapping(sb, pb, node);
+ hlMap.putProgMapping(sb, pb, node);
node.setExecType(ExecType.CP);
node.setLineNumbers(isb.getBeginLine(),
isb.getEndLine());
//handle predicate
isb.getPredicateHops().resetVisitStatus();
- node.addChilds( rCreateAbstractOptNodes(
isb.getPredicateHops(), vars, memo ) );
+ node.addChilds( rCreateAbstractOptNodes(
isb.getPredicateHops(), vars, hlMap, memo ) );
//process if branch
OptNode ifn = new OptNode(NodeType.GENERIC);
- _hlMap.putProgMapping(sb, pb, ifn);
+ hlMap.putProgMapping(sb, pb, ifn);
ifn.setExecType(ExecType.CP);
node.addChild( ifn );
int len = is.getIfBody().size();
@@ -291,13 +279,12 @@ public class OptTreeConverter
{
ProgramBlock lpb =
ipb.getChildBlocksIfBody().get(i);
StatementBlock lsb = is.getIfBody().get(i);
- ifn.addChild(
rCreateAbstractOptNode(lsb,lpb,vars,false, memo) );
+ ifn.addChild( rCreateAbstractOptNode(lsb, lpb,
vars, false, hlMap, memo) );
}
//process else branch
- if( ipb.getChildBlocksElseBody() != null )
- {
+ if( ipb.getChildBlocksElseBody() != null ) {
OptNode efn = new OptNode(NodeType.GENERIC);
- _hlMap.putProgMapping(sb, pb, efn);
+ hlMap.putProgMapping(sb, pb, efn);
efn.setExecType(ExecType.CP);
node.addChild( efn );
int len2 = is.getElseBody().size();
@@ -305,9 +292,9 @@ public class OptTreeConverter
{
ProgramBlock lpb =
ipb.getChildBlocksElseBody().get(i);
StatementBlock lsb =
is.getElseBody().get(i);
- efn.addChild(
rCreateAbstractOptNode(lsb,lpb,vars,false, memo) );
+ efn.addChild(
rCreateAbstractOptNode(lsb, lpb, vars, false, hlMap, memo) );
}
- }
+ }
}
else if( pb instanceof WhileProgramBlock && sb instanceof
WhileStatementBlock )
{
@@ -316,20 +303,20 @@ public class OptTreeConverter
WhileStatement ws = (WhileStatement)
wsb.getStatement(0);
node = new OptNode( NodeType.WHILE );
- _hlMap.putProgMapping(sb, pb, node);
+ hlMap.putProgMapping(sb, pb, node);
node.setExecType(ExecType.CP);
node.setLineNumbers(wsb.getBeginLine(),
wsb.getEndLine());
//handle predicate
wsb.getPredicateHops().resetVisitStatus();
- node.addChilds( rCreateAbstractOptNodes(
wsb.getPredicateHops(), vars, memo ) );
+ node.addChilds( rCreateAbstractOptNodes(
wsb.getPredicateHops(), vars, hlMap, memo ) );
//process body
int len = ws.getBody().size();
for( int i=0; i<wpb.getChildBlocks().size() && i<len;
i++ ) {
ProgramBlock lpb = wpb.getChildBlocks().get(i);
StatementBlock lsb = ws.getBody().get(i);
- node.addChild(
rCreateAbstractOptNode(lsb,lpb,vars,false, memo) );
+ node.addChild( rCreateAbstractOptNode(lsb, lpb,
vars, false, hlMap, memo) );
}
}
else if( pb instanceof ForProgramBlock && sb instanceof
ForStatementBlock && !(pb instanceof ParForProgramBlock) )
@@ -339,7 +326,7 @@ public class OptTreeConverter
ForStatement fs = (ForStatement) fsb.getStatement(0);
node = new OptNode( NodeType.FOR );
- _hlMap.putProgMapping(sb, pb, node);
+ hlMap.putProgMapping(sb, pb, node);
node.setExecType(ExecType.CP);
node.setLineNumbers(fsb.getBeginLine(),
fsb.getEndLine());
@@ -352,17 +339,17 @@ public class OptTreeConverter
fsb.getToHops().resetVisitStatus();
if( fsb.getIncrementHops()!=null )
fsb.getIncrementHops().resetVisitStatus();
- node.addChilds( rCreateAbstractOptNodes(
fsb.getFromHops(), vars, memo ) );
- node.addChilds( rCreateAbstractOptNodes(
fsb.getToHops(), vars, memo ) );
+ node.addChilds( rCreateAbstractOptNodes(
fsb.getFromHops(), vars, hlMap, memo ) );
+ node.addChilds( rCreateAbstractOptNodes(
fsb.getToHops(), vars, hlMap, memo ) );
if( fsb.getIncrementHops()!=null )
- node.addChilds( rCreateAbstractOptNodes(
fsb.getIncrementHops(), vars, memo ) );
+ node.addChilds( rCreateAbstractOptNodes(
fsb.getIncrementHops(), vars, hlMap, memo ) );
//process body
int len = fs.getBody().size();
for( int i=0; i<fpb.getChildBlocks().size() && i<len;
i++ ) {
ProgramBlock lpb = fpb.getChildBlocks().get(i);
StatementBlock lsb = fs.getBody().get(i);
- node.addChild(
rCreateAbstractOptNode(lsb,lpb,vars,false, memo) );
+ node.addChild( rCreateAbstractOptNode(lsb, lpb,
vars, false, hlMap, memo) );
}
}
else if( pb instanceof ParForProgramBlock && sb instanceof
ParForStatementBlock )
@@ -372,7 +359,7 @@ public class OptTreeConverter
ParForStatement fs = (ParForStatement)
fsb.getStatement(0);
node = new OptNode( NodeType.PARFOR );
node.setLineNumbers(fsb.getBeginLine(),
fsb.getEndLine());
- _hlMap.putProgMapping(sb, pb, node);
+ hlMap.putProgMapping(sb, pb, node);
node.setK( fpb.getDegreeOfParallelism() );
long N = fpb.getNumIterations();
node.addParam(ParamType.NUM_ITERATIONS, (N!=-1) ?
String.valueOf(N) :
@@ -395,10 +382,10 @@ public class OptTreeConverter
fsb.getToHops().resetVisitStatus();
if( fsb.getIncrementHops()!=null )
fsb.getIncrementHops().resetVisitStatus();
- node.addChilds( rCreateAbstractOptNodes(
fsb.getFromHops(), vars, memo ) );
- node.addChilds( rCreateAbstractOptNodes(
fsb.getToHops(), vars, memo ) );
+ node.addChilds( rCreateAbstractOptNodes(
fsb.getFromHops(), vars, hlMap, memo ) );
+ node.addChilds( rCreateAbstractOptNodes(
fsb.getToHops(), vars, hlMap, memo ) );
if( fsb.getIncrementHops()!=null )
- node.addChilds(
rCreateAbstractOptNodes( fsb.getIncrementHops(), vars, memo ) );
+ node.addChilds(
rCreateAbstractOptNodes( fsb.getIncrementHops(), vars, hlMap, memo ) );
}
//process body
@@ -406,7 +393,7 @@ public class OptTreeConverter
for( int i=0; i<fpb.getChildBlocks().size() && i<len;
i++ ) {
ProgramBlock lpb = fpb.getChildBlocks().get(i);
StatementBlock lsb = fs.getBody().get(i);
- node.addChild(
rCreateAbstractOptNode(lsb,lpb,vars,false, memo) );
+ node.addChild( rCreateAbstractOptNode(lsb, lpb,
vars, false, hlMap, memo) );
}
//parameters, add required parameters
@@ -422,8 +409,8 @@ public class OptTreeConverter
//process all hops
node = new OptNode(NodeType.GENERIC);
- _hlMap.putProgMapping(sb, pb, node);
- node.addChilds( createAbstractOptNodes(sb.getHops(),
vars, memo) );
+ hlMap.putProgMapping(sb, pb, node);
+ node.addChilds( createAbstractOptNodes(sb.getHops(),
vars, hlMap, memo) );
node.setExecType(ExecType.CP);
node.setLineNumbers(sb.getBeginLine(), sb.getEndLine());
@@ -441,7 +428,7 @@ public class OptTreeConverter
return node;
}
- public static ArrayList<OptNode> createAbstractOptNodes(ArrayList<Hop>
hops, LocalVariableMap vars, Set<String> memo ) {
+ public static ArrayList<OptNode> createAbstractOptNodes(ArrayList<Hop>
hops, LocalVariableMap vars, OptTreePlanMappingAbstract hlMap, Set<String> memo
) {
ArrayList<OptNode> ret = new ArrayList<>();
//reset all hops
@@ -450,12 +437,12 @@ public class OptTreeConverter
//created and add actual opt nodes
if( hops != null )
for( Hop hop : hops )
- ret.addAll(rCreateAbstractOptNodes(hop, vars,
memo));
+ ret.addAll(rCreateAbstractOptNodes(hop, vars,
hlMap, memo));
return ret;
}
- public static ArrayList<OptNode> rCreateAbstractOptNodes(Hop hop,
LocalVariableMap vars, Set<String> memo) {
+ public static ArrayList<OptNode> rCreateAbstractOptNodes(Hop hop,
LocalVariableMap vars, OptTreePlanMappingAbstract hlMap, Set<String> memo) {
ArrayList<OptNode> ret = new ArrayList<>();
ArrayList<Hop> in = hop.getInput();
@@ -488,9 +475,9 @@ public class OptTreeConverter
}
//assign node to return
- _hlMap.putHopMapping(hop, node);
+ hlMap.putHopMapping(hop, node);
ret.add(node);
- }
+ }
//process function calls
else if (hop instanceof FunctionOp && INCLUDE_FUNCTIONS )
{
@@ -498,10 +485,10 @@ public class OptTreeConverter
String fname = fhop.getFunctionName();
String fnspace = fhop.getFunctionNamespace();
String fKey = fhop.getFunctionKey();
- Object[] prog = _hlMap.getRootProgram();
+ Object[] prog = hlMap.getRootProgram();
OptNode node = new OptNode(NodeType.FUNCCALL);
- _hlMap.putHopMapping(fhop, node);
+ hlMap.putHopMapping(fhop, node);
node.setExecType(ExecType.CP);
node.addParam(ParamType.OPSTRING, fKey);
@@ -519,7 +506,7 @@ public class OptTreeConverter
for( int i=0;
i<fpb.getChildBlocks().size() && i<len; i++ ) {
ProgramBlock lpb =
fpb.getChildBlocks().get(i);
StatementBlock lsb =
fs.getBody().get(i);
- node.addChild(
rCreateAbstractOptNode(lsb, lpb, vars, false, memo) );
+ node.addChild(
rCreateAbstractOptNode(lsb, lpb, vars, false, hlMap, memo) );
}
memo.remove(fKey);
}
@@ -533,7 +520,7 @@ public class OptTreeConverter
if( in != null )
for( Hop hin : in )
if( !(hin instanceof DataOp || hin instanceof
LiteralOp ) ) //no need for opt nodes
- ret.addAll(rCreateAbstractOptNodes(hin,
vars, memo));
+ ret.addAll(rCreateAbstractOptNodes(hin,
vars, hlMap, memo));
hop.setVisited();
@@ -605,18 +592,14 @@ public class OptTreeConverter
|| inst instanceof
EvalNaryCPInstruction);
}
- public static void replaceProgramBlock(OptNode parent, OptNode n,
ProgramBlock pbOld, ProgramBlock pbNew, boolean rtMap) {
+ public static void replaceProgramBlock(OptNode parent, OptNode n,
ProgramBlock pbOld, ProgramBlock pbNew, OptTreePlanMappingAbstract hlMap) {
ProgramBlock pbParent = null;
- if( rtMap )
- pbParent = (ProgramBlock)_rtMap.getMappedObject(
parent.getID() );
- else {
- if( parent.getNodeType()==NodeType.FUNCCALL ) {
- FunctionOp fop = (FunctionOp)
_hlMap.getMappedHop(parent.getID());
- pbParent =
((Program)_hlMap.getRootProgram()[1]).getFunctionProgramBlock(fop.getFunctionNamespace(),
fop.getFunctionName());
- }
- else
- pbParent = (ProgramBlock)_hlMap.getMappedProg(
parent.getID() )[1];
+ if( parent.getNodeType()==NodeType.FUNCCALL ) {
+ FunctionOp fop = (FunctionOp)
hlMap.getMappedHop(parent.getID());
+ pbParent =
((Program)hlMap.getRootProgram()[1]).getFunctionProgramBlock(fop.getFunctionNamespace(),
fop.getFunctionName());
}
+ else
+ pbParent = (ProgramBlock) hlMap.getMappedProg(
parent.getID() )[1];
if( pbParent instanceof IfProgramBlock ) {
IfProgramBlock ipb = (IfProgramBlock) pbParent;
@@ -639,40 +622,41 @@ public class OptTreeConverter
throw new DMLRuntimeException("Optimizer doesn't
support "+pbParent.getClass().getName());
//update repository
- if( rtMap )
- _rtMap.replaceMapping(pbNew, n);
+ hlMap.replaceMapping(pbNew, n);
+ }
+
+ public static void replaceProgramBlock(OptNode parent, OptNode n,
ProgramBlock pbOld, ProgramBlock pbNew, OptTreePlanMappingRuntime rtMap) {
+ ProgramBlock pbParent = null;
+ pbParent = (ProgramBlock) rtMap.getMappedObject( parent.getID()
);
+
+ if( pbParent instanceof IfProgramBlock ) {
+ IfProgramBlock ipb = (IfProgramBlock) pbParent;
+ replaceProgramBlock( ipb.getChildBlocksIfBody(), pbOld,
pbNew );
+ replaceProgramBlock( ipb.getChildBlocksElseBody(),
pbOld, pbNew );
+ }
+ else if( pbParent instanceof WhileProgramBlock ) {
+ WhileProgramBlock wpb = (WhileProgramBlock) pbParent;
+ replaceProgramBlock( wpb.getChildBlocks(), pbOld, pbNew
);
+ }
+ else if( pbParent instanceof ForProgramBlock || pbParent
instanceof ParForProgramBlock ) {
+ ForProgramBlock fpb = (ForProgramBlock) pbParent;
+ replaceProgramBlock( fpb.getChildBlocks(), pbOld, pbNew
);
+ }
+ else if( pbParent instanceof FunctionProgramBlock ) {
+ FunctionProgramBlock fpb = (FunctionProgramBlock)
pbParent;
+ replaceProgramBlock( fpb.getChildBlocks(), pbOld, pbNew
);
+ }
else
- _hlMap.replaceMapping(pbNew, n);
+ throw new DMLRuntimeException("Optimizer doesn't
support "+pbParent.getClass().getName());
+
+ //update repository
+ rtMap.replaceMapping(pbNew, n);
}
- public static void replaceProgramBlock(List<ProgramBlock> pbs,
ProgramBlock pbOld, ProgramBlock pbNew)
- {
+ public static void replaceProgramBlock(List<ProgramBlock> pbs,
ProgramBlock pbOld, ProgramBlock pbNew) {
int len = pbs.size();
for( int i=0; i<len; i++ )
if( pbs.get(i) == pbOld )
pbs.set(i, pbNew);
}
-
-
-
- ///////////////////////////////
- // //
- // internal state management //
- // //
- ///////////////////////////////
-
-
- public static OptTreePlanMappingAbstract getAbstractPlanMapping()
- {
- return _hlMap;
- }
-
- public static void clear()
- {
- if( _hlMap != null )
- _hlMap.clear();
- if( _rtMap != null )
- _rtMap.clear();
- }
-
}
diff --git
a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptimizationWrapper.java
b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptimizationWrapper.java
index 7b41353..6188abb 100644
---
a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptimizationWrapper.java
+++
b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptimizationWrapper.java
@@ -139,12 +139,10 @@ public class OptimizationWrapper
ForStatement fs = (ForStatement) sb.getStatement(0);
//debug output before recompilation
- if( LOG.isDebugEnabled() )
- {
+ if( LOG.isDebugEnabled() ) {
try {
tree =
OptTreeConverter.createOptTree(ck, cm, opt.getPlanInputType(), sb, pb, ec);
LOG.debug("ParFOR Opt: Input plan
(before recompilation):\n" + tree.explain(false));
- OptTreeConverter.clear();
}
catch(Exception ex)
{
@@ -223,11 +221,11 @@ public class OptimizationWrapper
}
//create cost estimator
- CostEstimator est = createCostEstimator( cmtype,
ec.getVariables() );
+ CostEstimator est = createCostEstimator( cmtype, tree,
ec.getVariables() );
LOG.trace("ParFOR Opt: Created cost estimator ("+cmtype+")");
//core optimize
- opt.optimize( sb, pb, tree, est, ec );
+ opt.optimize(sb, pb, tree, est, ec);
LOG.debug("ParFOR Opt: Optimized plan (after optimization): \n"
+ tree.explain(false));
//assert plan correctness
@@ -246,9 +244,6 @@ public class OptimizationWrapper
if( DMLScript.STATISTICS )
Statistics.incrementParForOptimTime(ltime);
- //cleanup phase
- OptTreeConverter.clear();
-
//monitor stats
if( monitor ) {
StatisticMonitor.putPFStat( pb.getID() ,
Stat.OPT_OPTIMIZER, otype.ordinal());
@@ -267,14 +262,14 @@ public class OptimizationWrapper
}
}
- private static CostEstimator createCostEstimator( CostModelType cmtype,
LocalVariableMap vars ) {
+ private static CostEstimator createCostEstimator( CostModelType cmtype,
OptTree tree, LocalVariableMap vars ) {
switch( cmtype ) {
case STATIC_MEM_METRIC:
- return new CostEstimatorHops(
-
OptTreeConverter.getAbstractPlanMapping() );
+ return new CostEstimatorHops(
+ tree.getAbstractPlanMapping() );
case RUNTIME_METRICS:
- return new CostEstimatorRuntime(
-
OptTreeConverter.getAbstractPlanMapping(),
+ return new CostEstimatorRuntime(
+ tree.getAbstractPlanMapping(),
(LocalVariableMap)vars.clone() );
default:
throw new DMLRuntimeException("Undefined cost
model type: '"+cmtype+"'.");
diff --git
a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptimizerConstrained.java
b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptimizerConstrained.java
index ca71f43..c039f93 100644
---
a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptimizerConstrained.java
+++
b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptimizerConstrained.java
@@ -78,8 +78,10 @@ public class OptimizerConstrained extends OptimizerRuleBased
{
public boolean optimize(ParForStatementBlock sb, ParForProgramBlock pb,
OptTree plan, CostEstimator est, ExecutionContext ec)
{
LOG.debug("--- "+getOptMode()+" OPTIMIZER -------");
-
- OptNode pn = plan.getRoot();
+ _cost = est;
+ _plan = plan;
+
+ OptNode pn = _plan.getRoot();
//early abort for empty parfor body
if( pn.isLeaf() )
@@ -88,8 +90,6 @@ public class OptimizerConstrained extends OptimizerRuleBased {
//ANALYZE infrastructure properties
super.analyzeProblemAndInfrastructure( pn );
- _cost = est;
-
//debug and warnings output
LOG.debug(getOptMode()+" OPT: Optimize with
local_max_mem="+toMB(_lm)+" and remote_max_mem="+toMB(_rm)+")." );
if( _rnk<=0 || _rk<=0 )
@@ -222,8 +222,7 @@ public class OptimizerConstrained extends
OptimizerRuleBased {
// constraint awareness
if( !initPlan.equals(PDataPartitioner.UNSPECIFIED.name()) ) {
- ParForProgramBlock pfpb = (ParForProgramBlock)
OptTreeConverter
-
.getAbstractPlanMapping().getMappedProg(n.getID())[1];
+ ParForProgramBlock pfpb = (ParForProgramBlock)
_plan.getMappedProg(n.getID())[1];
pfpb.setDataPartitioner(PDataPartitioner.valueOf(initPlan));
LOG.debug(getOptMode()+" OPT: forced 'set data
partitioner' - result=" + initPlan );
}
@@ -244,8 +243,7 @@ public class OptimizerConstrained extends
OptimizerRuleBased {
// constraint awareness
if( n.getExecType() != null &&
ConfigurationManager.isParallelParFor() )
{
- ParForProgramBlock pfpb = (ParForProgramBlock)
OptTreeConverter
-
.getAbstractPlanMapping().getMappedProg(n.getID())[1];
+ ParForProgramBlock pfpb = (ParForProgramBlock)
_plan.getMappedProg(n.getID())[1];
PExecMode mode = PExecMode.LOCAL;
if (n.getExecType() == ExecType.SPARK) {
@@ -273,8 +271,7 @@ public class OptimizerConstrained extends
OptimizerRuleBased {
if( n.getK() > 0 && ConfigurationManager.isParallelParFor() )
{
//set parfor degree of parallelism
- ParForProgramBlock pfpb = (ParForProgramBlock)
OptTreeConverter
-
.getAbstractPlanMapping().getMappedProg(n.getID())[1];
+ ParForProgramBlock pfpb = (ParForProgramBlock)
_plan.getMappedProg(n.getID())[1];
pfpb.setDegreeOfParallelism(n.getK());
//distribute remaining parallelism
@@ -299,8 +296,7 @@ public class OptimizerConstrained extends OptimizerRuleBased {
// constraint awareness
if(
!pn.getParam(ParamType.TASK_PARTITIONER).equals(PTaskPartitioner.UNSPECIFIED.name())
)
{
- ParForProgramBlock pfpb = (ParForProgramBlock)
OptTreeConverter
-
.getAbstractPlanMapping().getMappedProg(pn.getID())[1];
+ ParForProgramBlock pfpb = (ParForProgramBlock)
_plan.getMappedProg(pn.getID())[1];
pfpb.setTaskPartitioner(PTaskPartitioner.valueOf(pn.getParam(ParamType.TASK_PARTITIONER)));
String tsExt = "";
if( pn.getParam(ParamType.TASK_SIZE)!=null )
@@ -329,8 +325,7 @@ public class OptimizerConstrained extends OptimizerRuleBased {
// constraint awareness
if(
!n.getParam(ParamType.RESULT_MERGE).equals(PResultMerge.UNSPECIFIED.name()) )
{
- ParForProgramBlock pfpb = (ParForProgramBlock)
OptTreeConverter
-
.getAbstractPlanMapping().getMappedProg(n.getID())[1];
+ ParForProgramBlock pfpb = (ParForProgramBlock)
_plan.getMappedProg(n.getID())[1];
pfpb.setResultMerge(PResultMerge.valueOf(n.getParam(ParamType.RESULT_MERGE)));
LOG.debug(getOptMode()+" OPT: force 'set result merge'
- result="+n.getParam(ParamType.RESULT_MERGE) );
}
@@ -347,8 +342,7 @@ public class OptimizerConstrained extends OptimizerRuleBased {
{
if(emode == PExecMode.REMOTE_SPARK_DP)
{
- ParForProgramBlock pfpb = (ParForProgramBlock)
OptTreeConverter
-
.getAbstractPlanMapping().getMappedProg(pn.getID())[1];
+ ParForProgramBlock pfpb = (ParForProgramBlock)
_plan.getMappedProg(pn.getID())[1];
//partitioned matrix
if( partitionedMatrices.size()<=0 ) {
@@ -382,9 +376,8 @@ public class OptimizerConstrained extends OptimizerRuleBased {
super.rewriteSetFusedDataPartitioningExecution(pn, M,
flagLIX, partitionedMatrices, vars);
}
- private static PExecMode getPExecMode( OptNode pn ) {
- ParForProgramBlock pfpb = (ParForProgramBlock) OptTreeConverter
-
.getAbstractPlanMapping().getMappedProg(pn.getID())[1];
+ private PExecMode getPExecMode( OptNode pn ) {
+ ParForProgramBlock pfpb = (ParForProgramBlock)
_plan.getMappedProg(pn.getID())[1];
return pfpb.getExecMode();
}
}
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
index c88f4d0..cf2c091 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
@@ -169,13 +169,13 @@ public class OptimizerRuleBased extends Optimizer {
protected double _rm2 = -1; //remote memory constraint (reducers)
protected CostEstimator _cost = null;
-
+ protected OptTree _plan = null;
+
@Override
public CostModelType getCostModelType() {
return CostModelType.STATIC_MEM_METRIC;
}
-
@Override
public PlanInputType getPlanInputType() {
return PlanInputType.ABSTRACT_PLAN;
@@ -197,7 +197,8 @@ public class OptimizerRuleBased extends Optimizer {
{
LOG.debug("--- "+getOptMode()+" OPTIMIZER -------");
- OptNode pn = plan.getRoot();
+ _plan = plan;
+ OptNode pn = _plan.getRoot();
//early abort for empty parfor body
if( pn.isLeaf() )
@@ -378,7 +379,7 @@ public class OptimizerRuleBased extends Optimizer {
//preparations
long id = n.getID();
- Object[] o =
OptTreeConverter.getAbstractPlanMapping().getMappedProg(id);
+ Object[] o = _plan.getMappedProg(id);
ParForStatementBlock pfsb = (ParForStatementBlock) o[0];
ParForProgramBlock pfpb = (ParForProgramBlock) o[1];
@@ -434,7 +435,7 @@ public class OptimizerRuleBased extends Optimizer {
else if( n.getNodeType()== NodeType.HOP
&&
n.getParam(ParamType.OPSTRING).equals(IndexingOp.OPSTRING) )
{
- Hop h =
OptTreeConverter.getAbstractPlanMapping().getMappedHop(n.getID());
+ Hop h = _plan.getMappedHop(n.getID());
String inMatrix = h.getInput().get(0).getName();
if( cand.containsKey(inMatrix) &&
h.getDataType().isMatrix() ) //Required: partitionable
{
@@ -563,7 +564,7 @@ public class OptimizerRuleBased extends Optimizer {
protected boolean rewriteSetResultPartitioning(OptNode n, double M,
LocalVariableMap vars) {
//preparations
long id = n.getID();
- Object[] o =
OptTreeConverter.getAbstractPlanMapping().getMappedProg(id);
+ Object[] o = _plan.getMappedProg(id);
ParForProgramBlock pfpb = (ParForProgramBlock) o[1];
//search for candidates
@@ -615,7 +616,7 @@ public class OptimizerRuleBased extends Optimizer {
Hop base = null;
if( ret ) {
- h =
OptTreeConverter.getAbstractPlanMapping().getMappedHop(n.getID());
+ h = _plan.getMappedHop(n.getID());
base = h.getInput().get(0);
//check result variable
@@ -716,16 +717,16 @@ public class OptimizerRuleBased extends Optimizer {
}
protected void recompileLIX( OptNode n, LocalVariableMap vars ) {
- Hop h =
OptTreeConverter.getAbstractPlanMapping().getMappedHop(n.getID());
+ Hop h = _plan.getMappedHop(n.getID());
//set forced exec type
h.setForcedExecType(Types.ExecType.CP);
n.setExecType(ExecType.CP);
//recompile parent pb
- long pid =
OptTreeConverter.getAbstractPlanMapping().getMappedParentID(n.getID());
- OptNode nParent =
OptTreeConverter.getAbstractPlanMapping().getOptNode(pid);
- Object[] o =
OptTreeConverter.getAbstractPlanMapping().getMappedProg(pid);
+ long pid =
_plan.getAbstractPlanMapping().getMappedParentID(n.getID());
+ OptNode nParent =
_plan.getAbstractPlanMapping().getOptNode(pid);
+ Object[] o = _plan.getMappedProg(pid);
StatementBlock sb = (StatementBlock) o[0];
BasicProgramBlock pb = (BasicProgramBlock) o[1];
@@ -750,7 +751,7 @@ public class OptimizerRuleBased extends Optimizer {
for( OptNode n : parent.getChilds() )
if( n.getParam(ParamType.DATA_PARTITION_FORMAT) != null
)
{
- Hop h =
OptTreeConverter.getAbstractPlanMapping().getMappedHop(n.getID());
+ Hop h = _plan.getMappedHop(n.getID());
estimates.put( h, h.getMemEstimate() );
}
return estimates;
@@ -828,8 +829,7 @@ public class OptimizerRuleBased extends Optimizer {
//actual programblock modification
long id = n.getID();
- ParForProgramBlock pfpb = (ParForProgramBlock) OptTreeConverter
-
.getAbstractPlanMapping().getMappedProg(id)[1];
+ ParForProgramBlock pfpb = (ParForProgramBlock)
_plan.getMappedProg(id)[1];
PExecMode mode = n.getExecType().toParForExecMode();
pfpb.setExecMode( mode );
@@ -862,7 +862,7 @@ public class OptimizerRuleBased extends Optimizer {
if( n.isLeaf() && et == getRemoteExecType() )
{
- Hop h =
OptTreeConverter.getAbstractPlanMapping().getMappedHop( n.getID() );
+ Hop h = _plan.getMappedHop( n.getID() );
if( h.getForcedExecType()!=Types.ExecType.SPARK
&& h.hasValidCPDimsAndSize() ) //integer dims
{
@@ -893,8 +893,7 @@ public class OptimizerRuleBased extends Optimizer {
//recompile program (actual programblock modification)
if( recompile && count<=0 )
LOG.warn("OPT: Forced set operations exec type 'CP',
but no operation requires recompile.");
- ParForProgramBlock pfpb = (ParForProgramBlock) OptTreeConverter
- .getAbstractPlanMapping().getMappedProg(pn.getID())[1];
+ ParForProgramBlock pfpb = (ParForProgramBlock)
_plan.getMappedProg(pn.getID())[1];
HashSet<String> fnStack = new HashSet<>();
Recompiler.recompileProgramBlockHierarchy2Forced(pfpb.getChildBlocks(), 0,
fnStack, Types.ExecType.CP);
@@ -937,8 +936,7 @@ public class OptimizerRuleBased extends Optimizer {
// on the partitioned matrix
boolean apply = false;
String varname = null;
- ParForProgramBlock pfpb = (ParForProgramBlock) OptTreeConverter
-
.getAbstractPlanMapping().getMappedProg(n.getID())[1];
+ ParForProgramBlock pfpb = (ParForProgramBlock)
_plan.getMappedProg(n.getID())[1];
//modify the runtime plan (apply true if at least one candidate)
if( apply )
@@ -959,7 +957,7 @@ public class OptimizerRuleBased extends Optimizer {
&& n.getParam(ParamType.DATA_PARTITION_FORMAT) !=
null )
{
PartitionFormat dpf =
PartitionFormat.valueOf(n.getParam(ParamType.DATA_PARTITION_FORMAT));
- Hop h =
OptTreeConverter.getAbstractPlanMapping().getMappedHop(n.getID());
+ Hop h = _plan.getMappedHop(n.getID());
String inMatrix = h.getInput().get(0).getName();
String indexAccess = null;
switch( dpf._dpf )
@@ -1004,8 +1002,7 @@ public class OptimizerRuleBased extends Optimizer {
double sizeReplicated = 0;
int replication = ParForProgramBlock.WRITE_REPLICATION_FACTOR;
- ParForProgramBlock pfpb = (ParForProgramBlock) OptTreeConverter
- .getAbstractPlanMapping().getMappedProg(n.getID())[1];
+ ParForProgramBlock pfpb = (ParForProgramBlock)
_plan.getMappedProg(n.getID())[1];
if(((n.getExecType()==ExecType.SPARK &&
n.getParam(ParamType.DATA_PARTITIONER).equals(PDataPartitioner.REMOTE_SPARK.name())))
&& n.hasNestedParallelism(false)
@@ -1068,8 +1065,7 @@ public class OptimizerRuleBased extends Optimizer {
boolean apply = false;
int replication = -1;
- ParForProgramBlock pfpb = (ParForProgramBlock) OptTreeConverter
- .getAbstractPlanMapping().getMappedProg(n.getID())[1];
+ ParForProgramBlock pfpb = (ParForProgramBlock)
_plan.getMappedProg(n.getID())[1];
//decide on the replication factor
if( n.getExecType()==getRemoteExecType() )
@@ -1104,7 +1100,7 @@ public class OptimizerRuleBased extends Optimizer {
double ret = 0;
if (n.isLeaf() && et != getRemoteExecType()) {
- Hop h =
OptTreeConverter.getAbstractPlanMapping().getMappedHop(n.getID());
+ Hop h = _plan.getMappedHop(n.getID());
if ( h.getForcedExecType() != Types.ExecType.SPARK) {
double mem =
_cost.getLeafNodeEstimate(TestMeasure.MEMORY_USAGE, n, Types.ExecType.CP);
if (mem >= OptimizerUtils.DEFAULT_SIZE) {
@@ -1134,7 +1130,7 @@ public class OptimizerRuleBased extends Optimizer {
long id = n.getID();
//special handling for different exec models (CP, MR, MR nested)
- Object[] map =
OptTreeConverter.getAbstractPlanMapping().getMappedProg(id);
+ Object[] map = _plan.getMappedProg(id);
ParForStatementBlock pfsb = (ParForStatementBlock)map[0];
ParForProgramBlock pfpb = (ParForProgramBlock)map[1];
@@ -1252,7 +1248,7 @@ public class OptimizerRuleBased extends Optimizer {
long id = c.getID();
c.setK(tmpK);
ParForProgramBlock pfpb =
(ParForProgramBlock)
-
OptTreeConverter.getAbstractPlanMapping().getMappedProgramBlock(id);
+ _plan.getMappedProgramBlock(id);
pfpb.setDegreeOfParallelism(tmpK);
//distribute remaining parallelism
@@ -1263,7 +1259,7 @@ public class OptimizerRuleBased extends Optimizer {
else if( c.getNodeType() == NodeType.HOP )
{
//set degree of parallelism for
multi-threaded leaf nodes
- Hop h =
OptTreeConverter.getAbstractPlanMapping().getMappedHop(c.getID());
+ Hop h = _plan.getMappedHop(c.getID());
if(
ConfigurationManager.isParallelMatrixOperations()
&& h instanceof MultiThreadedHop
&&
((MultiThreadedHop)h).isMultiThreadedOpType() )
@@ -1283,7 +1279,7 @@ public class OptimizerRuleBased extends Optimizer {
//if parfor contains eval call, make
unoptimized functions single-threaded
if( HopRewriteUtils.isNary(h,
OpOpN.EVAL) ) {
- ProgramBlock pb =
OptTreeConverter.getAbstractPlanMapping().getMappedProgramBlock(n.getID());
+ ProgramBlock pb =
_plan.getMappedProgramBlock(n.getID());
pb.getProgram().getFunctionProgramBlocks(false)
.forEach((fname,
fvalue) -> ParamservUtils.recompileProgramBlocks(1, fvalue.getChildBlocks()));
}
@@ -1296,7 +1292,7 @@ public class OptimizerRuleBased extends Optimizer {
if( recompileSB ) {
try {
//guaranteed to be a last-level block
(see hop change)
- ProgramBlock pb =
OptTreeConverter.getAbstractPlanMapping().getMappedProgramBlock(n.getID());
+ ProgramBlock pb =
_plan.getMappedProgramBlock(n.getID());
Recompiler.recompileProgramBlockInstructions(pb);
}
catch(Exception ex){
@@ -1367,8 +1363,7 @@ public class OptimizerRuleBased extends Optimizer {
long id = n.getID();
// modify rtprog
- ParForProgramBlock pfpb = (ParForProgramBlock) OptTreeConverter
- .getAbstractPlanMapping().getMappedProgramBlock(id);
+ ParForProgramBlock pfpb = (ParForProgramBlock)
_plan.getMappedProgramBlock(id);
pfpb.setTaskPartitioner(partitioner);
// modify plan
@@ -1435,8 +1430,7 @@ public class OptimizerRuleBased extends Optimizer {
&& partitioner!=null &&
partitioner.equals(REMOTE_DP.toString()) //MR/SP partitioning
&& partitionedMatrices.size()==1 ) //only one
partitioned matrix
{
- ParForProgramBlock pfpb = (ParForProgramBlock)
OptTreeConverter
-
.getAbstractPlanMapping().getMappedProg(pn.getID())[1];
+ ParForProgramBlock pfpb = (ParForProgramBlock)
_plan.getMappedProg(pn.getID())[1];
//partitioned matrix
String moVarname =
partitionedMatrices.keySet().iterator().next();
@@ -1480,7 +1474,7 @@ public class OptimizerRuleBased extends Optimizer {
&& n.getParam(ParamType.DATA_PARTITION_FORMAT) !=
null )
{
PartitionFormat dpf =
PartitionFormat.valueOf(n.getParam(ParamType.DATA_PARTITION_FORMAT));
- Hop h =
OptTreeConverter.getAbstractPlanMapping().getMappedHop(n.getID());
+ Hop h = _plan.getMappedHop(n.getID());
String inMatrix = h.getInput().get(0).getName();
String indexAccess = null;
switch( dpf._dpf )
@@ -1539,7 +1533,7 @@ public class OptimizerRuleBased extends Optimizer {
&&
n.getParam(ParamType.OPSTRING).equals(IndexingOp.OPSTRING)
&& n.getParam(ParamType.DATA_PARTITION_FORMAT) !=
null )
{
- Hop h =
OptTreeConverter.getAbstractPlanMapping().getMappedHop(n.getID());
+ Hop h = _plan.getMappedHop(n.getID());
String inMatrix = h.getInput().get(0).getName();
if( inMatrix.equals(varName) )
@@ -1568,8 +1562,7 @@ public class OptimizerRuleBased extends Optimizer {
boolean apply = false;
- ParForProgramBlock pfpb = (ParForProgramBlock) OptTreeConverter
- .getAbstractPlanMapping().getMappedProg(pn.getID())[1];
+ ParForProgramBlock pfpb = (ParForProgramBlock)
_plan.getMappedProg(pn.getID())[1];
//note currently we decide for all result vars jointly, i.e.,
//only if all fit pinned in remaining budget, we apply this
rewrite.
@@ -1627,7 +1620,7 @@ public class OptimizerRuleBased extends Optimizer {
ret &= rHasOnlyInPlaceSafeLeftIndexing( cn,
retVars );
}
else if( n.getNodeType()== NodeType.HOP) {
- Hop h =
OptTreeConverter.getAbstractPlanMapping().getMappedHop(n.getID());
+ Hop h = _plan.getMappedHop(n.getID());
if( h instanceof LeftIndexingOp &&
ResultVar.contains(retVars, h.getInput().get(0).getName() )
&& !retVars.stream().anyMatch(rvar ->
rvar._isAccum) )
ret &= (h.getParent().size()==1
@@ -1664,7 +1657,7 @@ public class OptimizerRuleBased extends Optimizer {
}
else if( n.getNodeType()== NodeType.HOP )
{
- Hop h =
OptTreeConverter.getAbstractPlanMapping().getMappedHop(n.getID());
+ Hop h = _plan.getMappedHop(n.getID());
if(
n.getParam(ParamType.OPSTRING).equals(IndexingOp.OPSTRING)
&& n.getParam(ParamType.DATA_PARTITION_FORMAT)
!= null ) {
//set during partitioning rewrite
@@ -1725,7 +1718,7 @@ public class OptimizerRuleBased extends Optimizer {
// }
// else if( n.getNodeType()== NodeType.HOP &&
n.getExecType()==ExecType.MR )
// {
-// Hop h =
OptTreeConverter.getAbstractPlanMapping().getMappedHop(n.getID());
+// Hop h = _plan.getMappedHop(n.getID());
// for( Hop ch : h.getInput() )
// {
// //note: we replaxed the contraint of
non-partitioned inputs for additional
@@ -1758,7 +1751,7 @@ public class OptimizerRuleBased extends Optimizer {
protected void rewriteInjectSparkLoopCheckpointing(OptNode n)
{
//get program blocks of root parfor
- Object[] progobj =
OptTreeConverter.getAbstractPlanMapping().getMappedProg(n.getID());
+ Object[] progobj = _plan.getMappedProg(n.getID());
ParForStatementBlock pfsb = (ParForStatementBlock)progobj[0];
ParForStatement fs = (ParForStatement) pfsb.getStatement(0);
ParForProgramBlock pfpb = (ParForProgramBlock)progobj[1];
@@ -1794,7 +1787,7 @@ public class OptimizerRuleBased extends Optimizer {
protected void rewriteInjectSparkRepartition(OptNode n,
LocalVariableMap vars)
{
//get program blocks of root parfor
- Object[] progobj =
OptTreeConverter.getAbstractPlanMapping().getMappedProg(n.getID());
+ Object[] progobj = _plan.getMappedProg(n.getID());
ParForStatementBlock pfsb = (ParForStatementBlock)progobj[0];
ParForProgramBlock pfpb = (ParForProgramBlock)progobj[1];
ArrayList<String> ret = new ArrayList<>();
@@ -1841,7 +1834,7 @@ public class OptimizerRuleBased extends Optimizer {
//collect zipmm inputs
if( n.getNodeType()==NodeType.HOP )
{
- Hop h =
OptTreeConverter.getAbstractPlanMapping().getMappedHop(n.getID());
+ Hop h = _plan.getMappedHop(n.getID());
if( h instanceof AggBinaryOp &&
(((AggBinaryOp)h).getMMultMethod()==MMultMethod.ZIPMM
||((AggBinaryOp)h).getMMultMethod()==MMultMethod.CPMM) )
{
@@ -1870,7 +1863,7 @@ public class OptimizerRuleBased extends Optimizer {
protected void rewriteSetSparkEagerRDDCaching(OptNode n,
LocalVariableMap vars)
{
//get program blocks of root parfor
- Object[] progobj =
OptTreeConverter.getAbstractPlanMapping().getMappedProg(n.getID());
+ Object[] progobj = _plan.getMappedProg(n.getID());
ParForStatementBlock pfsb = (ParForStatementBlock)progobj[0];
ParForProgramBlock pfpb = (ParForProgramBlock)progobj[1];
@@ -1919,8 +1912,7 @@ public class OptimizerRuleBased extends Optimizer {
protected void rewriteRemoveUnnecessaryCompareMatrix( OptNode n,
ExecutionContext ec )
{
- ParForProgramBlock pfpb = (ParForProgramBlock) OptTreeConverter
- .getAbstractPlanMapping().getMappedProg(n.getID())[1];
+ ParForProgramBlock pfpb = (ParForProgramBlock)
_plan.getMappedProg(n.getID())[1];
ArrayList<ResultVar> cleanedVars = new ArrayList<>();
ArrayList<ResultVar> resultVars = pfpb.getResultVariables();
@@ -1969,7 +1961,7 @@ public class OptimizerRuleBased extends Optimizer {
if( opStr==null || !opStr.equals(LeftIndexingOp.OPSTRING) )
return false;
- Hop h =
OptTreeConverter.getAbstractPlanMapping().getMappedHop(n.getID());
+ Hop h = _plan.getMappedHop(n.getID());
Hop base = h.getInput().get(0);
//check result variable
@@ -2009,7 +2001,7 @@ public class OptimizerRuleBased extends Optimizer {
boolean ret = false;
if( n.getNodeType()==NodeType.HOP ) {
- Hop h =
OptTreeConverter.getAbstractPlanMapping().getMappedHop(n.getID());
+ Hop h = _plan.getMappedHop(n.getID());
if( h instanceof IndexingOp && h.getInput().get(0)
instanceof DataOp
&& h.getInput().get(0).getName().equals(var) )
{
@@ -2030,8 +2022,7 @@ public class OptimizerRuleBased extends Optimizer {
///
protected void rewriteSetResultMerge( OptNode n, LocalVariableMap vars,
boolean inLocal ) {
- ParForProgramBlock pfpb = (ParForProgramBlock) OptTreeConverter
- .getAbstractPlanMapping().getMappedProg(n.getID())[1];
+ ParForProgramBlock pfpb = (ParForProgramBlock)
_plan.getMappedProg(n.getID())[1];
PResultMerge REMOTE = PResultMerge.REMOTE_SPARK;
PResultMerge ret = null;
@@ -2118,7 +2109,7 @@ public class OptimizerRuleBased extends Optimizer {
if( opName != null &&
opName.equals(LeftIndexingOp.OPSTRING)
&& n.getExecType() == getRemoteExecType() )
{
- LeftIndexingOp hop = (LeftIndexingOp)
OptTreeConverter.getAbstractPlanMapping().getMappedHop(n.getID());
+ LeftIndexingOp hop = (LeftIndexingOp)
_plan.getMappedHop(n.getID());
//check agains set of varname
String varName =
hop.getInput().get(0).getName();
if( ResultVar.contains(resultVars, varName) )
@@ -2218,7 +2209,7 @@ public class OptimizerRuleBased extends Optimizer {
//check opstring and exec type
if( opName.equals(LeftIndexingOp.OPSTRING) )
{
- LeftIndexingOp hop = (LeftIndexingOp)
OptTreeConverter.getAbstractPlanMapping().getMappedHop(n.getID());
+ LeftIndexingOp hop = (LeftIndexingOp)
_plan.getMappedHop(n.getID());
//check agains set of varname
String varName =
hop.getInput().get(0).getName();
if( ResultVar.contains(resultVars, varName) &&
vars.keySet().contains(varName) ) {
@@ -2282,8 +2273,7 @@ public class OptimizerRuleBased extends Optimizer {
newLocalMem = _lm / par;
//modify runtime plan
- ParForProgramBlock pfpb = (ParForProgramBlock)
OptTreeConverter
-
.getAbstractPlanMapping().getMappedProg(n.getID())[1];
+ ParForProgramBlock pfpb = (ParForProgramBlock)
_plan.getMappedProg(n.getID())[1];
pfpb.setRecompileMemoryBudget( newLocalMem );
}
@@ -2309,8 +2299,7 @@ public class OptimizerRuleBased extends Optimizer {
//unfold if necessary
try
{
- ParForProgramBlock pfpb = (ParForProgramBlock)
OptTreeConverter
-
.getAbstractPlanMapping().getMappedProg(n.getID())[1];
+ ParForProgramBlock pfpb = (ParForProgramBlock)
_plan.getMappedProg(n.getID())[1];
if( recPBs.contains(pfpb) )
rFindAndUnfoldRecursiveFunction(n,
pfpb, recPBs, vars);
}
@@ -2340,10 +2329,8 @@ public class OptimizerRuleBased extends Optimizer {
}
//add candidate program blocks
- if( recContext && n.getNodeType()==NodeType.PARFOR )
- {
- ParForProgramBlock pfpb = (ParForProgramBlock)
OptTreeConverter
-
.getAbstractPlanMapping().getMappedProg(n.getID())[1];
+ if( recContext && n.getNodeType()==NodeType.PARFOR ) {
+ ParForProgramBlock pfpb = (ParForProgramBlock)
_plan.getMappedProg(n.getID())[1];
cand.add(pfpb);
}
}
@@ -2363,7 +2350,7 @@ public class OptimizerRuleBased extends Optimizer {
String fnameNew = FUNCTION_UNFOLD_NAMEPREFIX +
fname;
//unfold function
- FunctionOp fop = (FunctionOp)
OptTreeConverter.getAbstractPlanMapping().getMappedHop(n.getID());
+ FunctionOp fop = (FunctionOp)
_plan.getMappedHop(n.getID());
Program prog = parfor.getProgram();
DMLProgram dmlprog =
parfor.getStatementBlock().getDMLProg();
FunctionProgramBlock fpb =
prog.getFunctionProgramBlock(fnamespace, fname);
@@ -2377,19 +2364,19 @@ public class OptimizerRuleBased extends Optimizer {
//recreate sub opttree
String fnameNewKey = fnamespace +
Program.KEY_DELIM + fnameNew;
OptNode nNew = new OptNode(NodeType.FUNCCALL);
-
OptTreeConverter.getAbstractPlanMapping().putHopMapping(fop, nNew);
+
_plan.getAbstractPlanMapping().putHopMapping(fop, nNew);
nNew.setExecType(ExecType.CP);
nNew.addParam(ParamType.OPSTRING, fnameNewKey);
- long parentID =
OptTreeConverter.getAbstractPlanMapping().getMappedParentID(n.getID());
-
OptTreeConverter.getAbstractPlanMapping().getOptNode(parentID).exchangeChild(n,
nNew);
+ long parentID =
_plan.getAbstractPlanMapping().getMappedParentID(n.getID());
+
_plan.getAbstractPlanMapping().getOptNode(parentID).exchangeChild(n, nNew);
HashSet<String> memo = new HashSet<>();
memo.add(fnameKey); //required if functionop
not shared (because not replaced yet)
memo.add(fnameNewKey); //requied if functionop
shared (indirectly replaced)
- for( int i=0; i<copyfpb.getChildBlocks().size()
/*&& i<len*/; i++ )
- {
+ for( int i=0; i<copyfpb.getChildBlocks().size()
/*&& i<len*/; i++ ) {
ProgramBlock lpb =
copyfpb.getChildBlocks().get(i);
StatementBlock lsb =
lpb.getStatementBlock();
- nNew.addChild(
OptTreeConverter.rCreateAbstractOptNode(lsb,lpb,vars,false, memo) );
+ nNew.addChild( OptTreeConverter
+ .rCreateAbstractOptNode(lsb,
lpb, vars, false, _plan.getAbstractPlanMapping(), memo) );
}
//compute delta for recPB set (use for removing
parfor)
@@ -2416,8 +2403,7 @@ public class OptimizerRuleBased extends Optimizer {
boolean ret = false;
if( n.getNodeType() == NodeType.PARFOR ) {
- ProgramBlock pfpb = OptTreeConverter
-
.getAbstractPlanMapping().getMappedProgramBlock(n.getID());
+ ProgramBlock pfpb =
_plan.getMappedProgramBlock(n.getID());
ret = (parfor == pfpb);
}
@@ -2435,8 +2421,7 @@ public class OptimizerRuleBased extends Optimizer {
//collect parfor
if( n.getNodeType()==NodeType.PARFOR )
{
- ParForProgramBlock pfpb = (ParForProgramBlock)
OptTreeConverter
-
.getAbstractPlanMapping().getMappedProgramBlock(n.getID());
+ ParForProgramBlock pfpb = (ParForProgramBlock)
_plan.getMappedProgramBlock(n.getID());
pbs.add(pfpb);
}
@@ -2452,7 +2437,7 @@ public class OptimizerRuleBased extends Optimizer {
{
if( n.getNodeType() == NodeType.FUNCCALL)
{
- FunctionOp fop = (FunctionOp)
OptTreeConverter.getAbstractPlanMapping().getMappedHop(n.getID());
+ FunctionOp fop = (FunctionOp)
_plan.getMappedHop(n.getID());
String[] names =
n.getParam(ParamType.OPSTRING).split(Program.KEY_DELIM);
String fnamespace = names[0];
@@ -2464,9 +2449,8 @@ public class OptimizerRuleBased extends Optimizer {
n.addParam(ParamType.OPSTRING,
DMLProgram.constructFunctionKey(fnamespace,newName));
//set instruction function name
- long parentID =
OptTreeConverter.getAbstractPlanMapping().getMappedParentID(n.getID());
- BasicProgramBlock pb = (BasicProgramBlock)
OptTreeConverter
-
.getAbstractPlanMapping().getMappedProg(parentID)[1];
+ long parentID =
_plan.getAbstractPlanMapping().getMappedParentID(n.getID());
+ BasicProgramBlock pb = (BasicProgramBlock)
_plan.getMappedProg(parentID)[1];
ArrayList<Instruction> instArr =
pb.getInstructions();
for( int i=0; i<instArr.size(); i++ ) {
@@ -2501,7 +2485,7 @@ public class OptimizerRuleBased extends Optimizer {
if( sub.getNodeType() == NodeType.PARFOR )
{
long id = sub.getID();
- Object[] progobj =
OptTreeConverter.getAbstractPlanMapping().getMappedProg(id);
+ Object[] progobj =
_plan.getMappedProg(id);
ParForStatementBlock pfsb =
(ParForStatementBlock)progobj[0];
ParForProgramBlock pfpb =
(ParForProgramBlock)progobj[1];
@@ -2512,7 +2496,7 @@ public class OptimizerRuleBased extends Optimizer {
ForProgramBlock fpb =
ProgramConverter.createShallowCopyForProgramBlock(pfpb, prog);
//replace parfor with for, and
update objectmapping
-
OptTreeConverter.replaceProgramBlock(n, sub, pfpb, fpb, false);
+
OptTreeConverter.replaceProgramBlock(n, sub, pfpb, fpb,
_plan.getAbstractPlanMapping());
//update link to statement block
fpb.setStatementBlock(pfsb);
@@ -2552,7 +2536,7 @@ public class OptimizerRuleBased extends Optimizer {
if( sub.getNodeType() == NodeType.PARFOR &&
sub.getK() == 1 )
{
long id = sub.getID();
- Object[] progobj =
OptTreeConverter.getAbstractPlanMapping().getMappedProg(id);
+ Object[] progobj =
_plan.getMappedProg(id);
ParForStatementBlock pfsb =
(ParForStatementBlock)progobj[0];
ParForProgramBlock pfpb =
(ParForProgramBlock)progobj[1];
@@ -2561,7 +2545,7 @@ public class OptimizerRuleBased extends Optimizer {
ForProgramBlock fpb =
ProgramConverter.createShallowCopyForProgramBlock(pfpb, prog);
//replace parfor with for, and update
objectmapping
- OptTreeConverter.replaceProgramBlock(n,
sub, pfpb, fpb, false);
+ OptTreeConverter.replaceProgramBlock(n,
sub, pfpb, fpb, _plan.getAbstractPlanMapping());
//update link to statement block
fpb.setStatementBlock(pfsb);