Repository: systemml Updated Branches: refs/heads/master 912c65506 -> ba73291c9
[SYSTEMML-1879] Performance parfor remote spark (reuse shared inputs). In parfor remote spark jobs, each worker is initialized with its own deserialized symbol table, which causes redundant reads of shared inputs in each parfor worker and is unnecessarily memory-inefficient. This patch introduces a principled approach to reusing shared inputs, where we reuse all variables except for result variables and partitioned matrices. By simply using common instances of matrix objects, the sharing happens automatically through the buffer pool, similar to local parfor execution, and without additional pinned memory requirements. On the perftest scenario MSVM 1M x 1K, sparse with 150 classes and 25 iterations, the end-to-end runtime (including read and spark context creation) improved from 94s to 72s. Project: http://git-wip-us.apache.org/repos/asf/systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/2c57cf77 Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/2c57cf77 Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/2c57cf77 Branch: refs/heads/master Commit: 2c57cf779e3b1a583ee4062bbd268f592537ef05 Parents: 912c655 Author: Matthias Boehm <mboe...@gmail.com> Authored: Fri Sep 1 20:29:49 2017 -0700 Committer: Matthias Boehm <mboe...@gmail.com> Committed: Fri Sep 1 20:31:08 2017 -0700 ---------------------------------------------------------------------- .../org/apache/sysml/parser/DMLTranslator.java | 14 +- .../controlprogram/LocalVariableMap.java | 5 + .../controlprogram/ParForProgramBlock.java | 464 ++++++++----------- .../context/ExecutionContext.java | 45 +- .../parfor/CachedReuseVariables.java | 59 +++ .../controlprogram/parfor/ProgramConverter.java | 8 +- .../parfor/RemoteParForSpark.java | 32 +- .../parfor/RemoteParForSparkWorker.java | 33 +- .../parfor/TaskPartitionerFactoring.java | 33 +- 9 files changed, 356 insertions(+), 337 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/systemml/blob/2c57cf77/src/main/java/org/apache/sysml/parser/DMLTranslator.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/parser/DMLTranslator.java b/src/main/java/org/apache/sysml/parser/DMLTranslator.java index e6b8590..f44c0a4 100644 --- a/src/main/java/org/apache/sysml/parser/DMLTranslator.java +++ b/src/main/java/org/apache/sysml/parser/DMLTranslator.java @@ -617,12 +617,11 @@ public class DMLTranslator ForProgramBlock rtpb = null; IterablePredicate iterPred = fsb.getIterPredicate(); - if( sb instanceof ParForStatementBlock ) - { + if( sb instanceof ParForStatementBlock ) { sbName = "ParForStatementBlock"; - rtpb = new ParForProgramBlock(prog, iterPred.getIterVar().getName(), iterPred.getParForParams()); + rtpb = new ParForProgramBlock(prog, iterPred.getIterVar().getName(), + iterPred.getParForParams(), ((ParForStatementBlock)sb).getResultVariables()); ParForProgramBlock pfrtpb = (ParForProgramBlock)rtpb; - pfrtpb.setResultVariables( ((ParForStatementBlock)sb).getResultVariables() ); pfrtpb.setStatementBlock((ParForStatementBlock)sb); //used for optimization and creating unscoped variables } else {//ForStatementBlock @@ -636,8 +635,8 @@ public class DMLTranslator // process the body of the for statement block if (fsb.getNumStatements() > 1){ - LOG.error(fsb.printBlockErrorLocation() + " " + sbName + " should have 1 statement" ); - throw new LopsException(fsb.printBlockErrorLocation() + " " + sbName + " should have 1 statement" ); + LOG.error(fsb.printBlockErrorLocation() + " " + sbName + " should have 1 statement" ); + throw new LopsException(fsb.printBlockErrorLocation() + " " + sbName + " should have 1 statement" ); } ForStatement fs = (ForStatement)fsb.getStatement(0); for (StatementBlock sblock : fs.getBody()){ @@ -653,9 +652,6 @@ public class DMLTranslator retPB = rtpb; - //post processing for generating missing instructions - //retPB = 
verifyAndCorrectProgramBlock(sb.liveIn(), sb.liveOut(), sb._kill, retPB); - // add statement block retPB.setStatementBlock(sb); http://git-wip-us.apache.org/repos/asf/systemml/blob/2c57cf77/src/main/java/org/apache/sysml/runtime/controlprogram/LocalVariableMap.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/LocalVariableMap.java b/src/main/java/org/apache/sysml/runtime/controlprogram/LocalVariableMap.java index 94d85ad..7e59951 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/LocalVariableMap.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/LocalVariableMap.java @@ -91,6 +91,11 @@ public class LocalVariableMap implements Cloneable localMap.clear(); } + public void removeAllIn(Set<String> blacklist) { + localMap.entrySet().removeIf( + e -> blacklist.contains(e.getKey())); + } + public void removeAllNotIn(Set<String> blacklist) { localMap.entrySet().removeIf( e -> !blacklist.contains(e.getKey())); http://git-wip-us.apache.org/repos/asf/systemml/blob/2c57cf77/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java index ce8bbed..b393cd1 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java @@ -125,22 +125,22 @@ public class ParForProgramBlock extends ForProgramBlock { // execution modes public enum PExecMode { - LOCAL, //local (master) multi-core execution mode - REMOTE_MR, //remote (MR cluster) execution mode - REMOTE_MR_DP, //remote (MR cluster) execution mode, fused with data partitioning - REMOTE_SPARK, //remote (Spark cluster) execution mode + LOCAL, //local 
(master) multi-core execution mode + REMOTE_MR, //remote (MR cluster) execution mode + REMOTE_MR_DP, //remote (MR cluster) execution mode, fused with data partitioning + REMOTE_SPARK, //remote (Spark cluster) execution mode REMOTE_SPARK_DP,//remote (Spark cluster) execution mode, fused with data partitioning UNSPECIFIED } // task partitioner public enum PTaskPartitioner { - FIXED, //fixed-sized task partitioner, uses tasksize - NAIVE, //naive task partitioner (tasksize=1) - STATIC, //static task partitioner (numIterations/numThreads) - FACTORING, //factoring task partitioner - FACTORING_CMIN, //constrained factoring task partitioner, uses tasksize as min constraint - FACTORING_CMAX, //constrained factoring task partitioner, uses tasksize as max constraint + FIXED, //fixed-sized task partitioner, uses tasksize + NAIVE, //naive task partitioner (tasksize=1) + STATIC, //static task partitioner (numIterations/numThreads) + FACTORING, //factoring task partitioner + FACTORING_CMIN, //constrained factoring task partitioner, uses tasksize as min constraint + FACTORING_CMAX, //constrained factoring task partitioner, uses tasksize as max constraint UNSPECIFIED } @@ -259,10 +259,10 @@ public class ParForProgramBlock extends ForProgramBlock } public enum PDataPartitioner { - NONE, // no data partitioning - LOCAL, // local file based partition split on master node - REMOTE_MR, // remote partition split using a reblock MR job - REMOTE_SPARK, // remote partition split using a spark job + NONE, // no data partitioning + LOCAL, // local file based partition split on master node + REMOTE_MR, // remote partition split using a reblock MR job + REMOTE_SPARK, // remote partition split using a spark job UNSPECIFIED, } @@ -277,21 +277,21 @@ public class ParForProgramBlock extends ForProgramBlock //optimizer public enum POptMode{ - NONE, //no optimization, use defaults and specified parameters - RULEBASED, //rule-based rewritings with memory constraints - CONSTRAINED, //same as rule-based 
but with given params as constraints - HEURISTIC, //smae as rule-based but with time-based cost estimates + NONE, //no optimization, use defaults and specified parameters + RULEBASED, //rule-based rewritings with memory constraints + CONSTRAINED, //same as rule-based but with given params as constraints + HEURISTIC, //same as rule-based but with time-based cost estimates } - + // internal parameters - public static final boolean OPTIMIZE = true; // run all automatic optimizations on top-level parfor + public static final boolean OPTIMIZE = true; // run all automatic optimizations on top-level parfor public static final boolean USE_PB_CACHE = false; // reuse copied program blocks whenever possible, not there can be issues related to recompile - public static boolean USE_RANGE_TASKS_IF_USEFUL = true; // use range tasks whenever size>3, false, otherwise wrong split order in remote - public static final boolean USE_STREAMING_TASK_CREATION = true; // start working while still creating tasks, prevents blocking due to too small task queue - public static final boolean ALLOW_NESTED_PARALLELISM = true; // if not, transparently change parfor to for on program conversions (local,remote) - public static boolean ALLOW_REUSE_MR_JVMS = true; // potential benefits: less setup costs per task, NOTE> cannot be used MR4490 in Hadoop 1.0.3, still not fixed in 1.1.1 + public static final boolean USE_RANGE_TASKS_IF_USEFUL = true; // use range tasks whenever size>3, false, otherwise wrong split order in remote + public static final boolean USE_STREAMING_TASK_CREATION = true; // start working while still creating tasks, prevents blocking due to too small task queue + public static final boolean ALLOW_NESTED_PARALLELISM = true; // if not, transparently change parfor to for on program conversions (local,remote) + public static boolean ALLOW_REUSE_MR_JVMS = true; // potential benefits: less setup costs per task, NOTE> cannot be used MR4490 in Hadoop 1.0.3, still not fixed in 1.1.1 public 
static boolean ALLOW_REUSE_MR_PAR_WORKER = ALLOW_REUSE_MR_JVMS; //potential benefits: less initialization, reuse in-memory objects and result consolidation! - public static final boolean USE_PARALLEL_RESULT_MERGE = false; // if result merge is run in parallel or serial + public static final boolean USE_PARALLEL_RESULT_MERGE = false; // if result merge is run in parallel or serial public static final boolean USE_PARALLEL_RESULT_MERGE_REMOTE = true; // if remote result merge should be run in parallel for multiple result vars public static final boolean ALLOW_DATA_COLOCATION = true; public static final boolean CREATE_UNSCOPED_RESULTVARS = true; @@ -299,7 +299,7 @@ public class ParForProgramBlock extends ForProgramBlock public static final int WRITE_REPLICATION_FACTOR = 1; public static final int MAX_RETRYS_ON_ERROR = 1; public static final boolean FORCE_CP_ON_REMOTE_MR = true; // compile body to CP if exec type forced to MR - public static final boolean LIVEVAR_AWARE_EXPORT = true; //export only read variables according to live variable analysis + public static final boolean LIVEVAR_AWARE_EXPORT = true; // export only read variables according to live variable analysis public static final boolean RESET_RECOMPILATION_FLAGs = true; public static final String PARFOR_FNAME_PREFIX = "/parfor/"; @@ -311,69 +311,61 @@ public class ParForProgramBlock extends ForProgramBlock public static final String PARFOR_COUNTER_GROUP_NAME = "SystemML ParFOR Counters"; // static ID generator sequences - private static IDSequence _pfIDSeq = null; - private static IDSequence _pwIDSeq = null; + private final static IDSequence _pfIDSeq = new IDSequence(); + private final static IDSequence _pwIDSeq = new IDSequence(); // runtime parameters - protected HashMap<String,String> _params = null; - protected int _numThreads = -1; - protected PTaskPartitioner _taskPartitioner = null; - protected long _taskSize = -1; + protected final HashMap<String,String> _params; + protected final boolean _monitor; + 
protected final Level _optLogLevel; + protected int _numThreads = -1; + protected long _taskSize = -1; + protected PTaskPartitioner _taskPartitioner = null; protected PDataPartitioner _dataPartitioner = null; - protected PResultMerge _resultMerge = null; - protected PExecMode _execMode = null; - protected POptMode _optMode = null; - protected boolean _monitor = false; - protected Level _optLogLevel = null; - + protected PResultMerge _resultMerge = null; + protected PExecMode _execMode = null; + protected POptMode _optMode = null; //specifics used for optimization - protected long _numIterations = -1; + protected long _numIterations = -1; //specifics used for data partitioning protected LocalVariableMap _variablesDPOriginal = null; - protected LocalVariableMap _variablesDPReuse = null; - protected String _colocatedDPMatrix = null; - protected boolean _tSparseCol = false; - protected int _replicationDP = WRITE_REPLICATION_FACTOR; - protected int _replicationExport = -1; + protected LocalVariableMap _variablesDPReuse = null; + protected String _colocatedDPMatrix = null; + protected boolean _tSparseCol = false; + protected int _replicationDP = WRITE_REPLICATION_FACTOR; + protected int _replicationExport = -1; //specifics used for result partitioning - protected boolean _jvmReuse = true; + protected boolean _jvmReuse = true; //specifics used for recompilation - protected double _oldMemoryBudget = -1; - protected double _recompileMemoryBudget = -1; + protected double _oldMemoryBudget = -1; + protected double _recompileMemoryBudget = -1; //specifics for caching - protected boolean _enableCPCaching = true; - protected boolean _enableRuntimePiggybacking = false; + protected boolean _enableCPCaching = true; + protected boolean _enableRuntimePiggybacking = false; //specifics for spark protected Collection<String> _variablesRP = null; protected Collection<String> _variablesECache = null; // program block meta data - protected long _ID = -1; - protected int _IDPrefix = -1; - 
protected ArrayList<String> _resultVars = null; - protected IDSequence _resultVarsIDSeq = null; - protected IDSequence _dpVarsIDSeq = null; - protected boolean _monitorReport = false; - protected boolean _hasFunctions = true; + protected final ArrayList<String> _resultVars; + protected final IDSequence _resultVarsIDSeq; + protected final IDSequence _dpVarsIDSeq; + protected final boolean _hasFunctions; + + protected long _ID = -1; + protected int _IDPrefix = -1; + protected boolean _monitorReport = false; // local parworker data - protected long[] _pwIDs = null; protected HashMap<Long,ArrayList<ProgramBlock>> _pbcache = null; + protected long[] _pwIDs = null; - - static - { - //init static ID sequence generators - _pfIDSeq = new IDSequence(); - _pwIDSeq = new IDSequence(); - } - - public ParForProgramBlock(Program prog, String iterPredVar, HashMap<String,String> params) + public ParForProgramBlock(Program prog, String iterPredVar, HashMap<String,String> params, ArrayList<String> resultVars) throws DMLRuntimeException { - this( -1, prog, iterPredVar, params); + this( -1, prog, iterPredVar, params, resultVars); } /** @@ -384,9 +376,10 @@ public class ParForProgramBlock extends ForProgramBlock * @param prog runtime program * @param iterPredVars ? 
* @param params map of parameters + * @param resultVars list of result variable names * @throws DMLRuntimeException if DMLRuntimeException occurs */ - public ParForProgramBlock(int ID, Program prog, String iterPredVar, HashMap<String,String> params) + public ParForProgramBlock(int ID, Program prog, String iterPredVar, HashMap<String,String> params, ArrayList<String> resultVars) { super(prog, iterPredVar); @@ -395,6 +388,7 @@ public class ParForProgramBlock extends ForProgramBlock //ID generation and setting setParForProgramBlockIDs( ID ); + _resultVars = resultVars; _resultVarsIDSeq = new IDSequence(); _dpVarsIDSeq = new IDSequence(); @@ -407,18 +401,18 @@ public class ParForProgramBlock extends ForProgramBlock _dataPartitioner = PDataPartitioner.valueOf( getParForParam(ParForStatementBlock.DATA_PARTITIONER) ); _resultMerge = PResultMerge.valueOf( getParForParam(ParForStatementBlock.RESULT_MERGE) ); _execMode = PExecMode.valueOf( getParForParam(ParForStatementBlock.EXEC_MODE) ); - _optMode = POptMode.valueOf( getParForParam(ParForStatementBlock.OPT_MODE) ); + _optMode = POptMode.valueOf( getParForParam(ParForStatementBlock.OPT_MODE) ); _optLogLevel = Level.toLevel( getParForParam(ParForStatementBlock.OPT_LOG) ); _monitor = (Integer.parseInt(getParForParam(ParForStatementBlock.PROFILE) ) == 1); } catch(Exception ex) { throw new RuntimeException("Error parsing specified ParFOR parameters.",ex); } - + //reset the internal opt mode if optimization globally disabled. if( !OPTIMIZE ) _optMode = POptMode.NONE; - + _variablesDPOriginal = new LocalVariableMap(); _variablesDPReuse = new LocalVariableMap(); @@ -453,19 +447,14 @@ public class ParForProgramBlock extends ForProgramBlock public String getParForParam(String key) { String tmp = getParForParams().get(key); - return (tmp == null) ? null : + return (tmp == null) ? 
null : UtilFunctions.unquote(tmp).toUpperCase(); } - public ArrayList<String> getResultVariables() - { + public ArrayList<String> getResultVariables() { return _resultVars; } - public void setResultVariables(ArrayList<String> resultVars) { - _resultVars = resultVars; - } - public void disableOptimization() { _optMode = POptMode.NONE; } @@ -574,7 +563,7 @@ public class ParForProgramBlock extends ForProgramBlock ALLOW_REUSE_MR_PAR_WORKER = ALLOW_REUSE_MR_JVMS; } - @Override + @Override public void execute(ExecutionContext ec) throws DMLRuntimeException { @@ -621,7 +610,7 @@ public class ParForProgramBlock extends ForProgramBlock if( _monitor ) StatisticMonitor.putPFStat(_ID, Stat.PARFOR_INIT_DATA_T, time.stop()); - + // initialize iter var to form value IntObject iterVar = new IntObject(_iterPredVar, from.getLongValue() ); @@ -630,8 +619,7 @@ public class ParForProgramBlock extends ForProgramBlock /////// LOG.trace("EXECUTE PARFOR ID = "+_ID+" with mode = "+_execMode+", numThreads = "+_numThreads+", taskpartitioner = "+_taskPartitioner); - if( _monitor ) - { + if( _monitor ) { StatisticMonitor.putPFStat(_ID, Stat.PARFOR_NUMTHREADS, _numThreads); StatisticMonitor.putPFStat(_ID, Stat.PARFOR_TASKSIZE, _taskSize); StatisticMonitor.putPFStat(_ID, Stat.PARFOR_TASKPARTITIONER, _taskPartitioner.ordinal()); @@ -644,13 +632,13 @@ public class ParForProgramBlock extends ForProgramBlock HashMap<String, Boolean> varState = ec.pinVariables(varList); try - { + { switch( _execMode ) { case LOCAL: //create parworkers as local threads executeLocalParFor(ec, iterVar, from, to, incr); break; - + case REMOTE_MR: // create parworkers as MR tasks (one job per parfor) executeRemoteMRParFor(ec, iterVar, from, to, incr); break; @@ -669,10 +657,9 @@ public class ParForProgramBlock extends ForProgramBlock default: throw new DMLRuntimeException("Undefined execution mode: '"+_execMode+"'."); - } + } } - catch(Exception ex) - { + catch(Exception ex) { throw new DMLRuntimeException("PARFOR: Failed 
to execute loop in parallel.",ex); } @@ -688,8 +675,7 @@ public class ParForProgramBlock extends ForProgramBlock //ensure that subsequent program blocks never see partitioned data (invalid plans!) //we can replace those variables, because partitioning only applied for read-only matrices - for( String var : _variablesDPOriginal.keySet() ) - { + for( String var : _variablesDPOriginal.keySet() ) { //cleanup partitioned matrix (if not reused) if( !_variablesDPReuse.keySet().contains(var) ) VariableCPInstruction.processRemoveVariableInstruction(ec, var); @@ -704,19 +690,19 @@ public class ParForProgramBlock extends ForProgramBlock //print profiling report (only if top-level parfor because otherwise in parallel context) if( _monitorReport ) - LOG.info("\n"+StatisticMonitor.createReport()); + LOG.info("\n"+StatisticMonitor.createReport()); //reset flags/modifications made by optimizer //TODO reset of hop parallelism constraint (e.g., ba+*) for( String dpvar : _variablesDPOriginal.keySet() ) //release forced exectypes - ProgramRecompiler.rFindAndRecompileIndexingHOP(sb, this, dpvar, ec, false); + ProgramRecompiler.rFindAndRecompileIndexingHOP(sb, this, dpvar, ec, false); //release forced exectypes for fused dp/exec if( _execMode == PExecMode.REMOTE_MR_DP || _execMode == PExecMode.REMOTE_SPARK_DP ) ProgramRecompiler.rFindAndRecompileIndexingHOP(sb, this, _colocatedDPMatrix, ec, false); resetOptimizerFlags(); //after release, deletes dp_varnames //execute exit instructions (usually empty) - executeInstructions(_exitInstructions, ec); + executeInstructions(_exitInstructions, ec); } @@ -814,8 +800,7 @@ public class ParForProgramBlock extends ForProgramBlock if( _monitor ) StatisticMonitor.putPFStat(_ID, Stat.PARFOR_WAIT_EXEC_T, time.stop()); - - + // Step 4) collecting results from each parallel worker //obtain results and cleanup other intermediates before result merge LocalVariableMap [] localVariables = new LocalVariableMap [_numThreads]; @@ -862,16 +847,15 @@ public class 
ParForProgramBlock extends ForProgramBlock StatisticMonitor.putPFStat(_ID, Stat.PARFOR_NUMITERS, numExecutedIterations); } } - } + } private void executeRemoteMRParFor( ExecutionContext ec, IntObject itervar, IntObject from, IntObject to, IntObject incr ) throws DMLRuntimeException, IOException { /* Step 0) check and recompile MR inst * Step 1) serialize child PB and inst - * Step 2) create tasks - * serialize tasks - * Step 3) submit MR Jobs and wait for results + * Step 2) create and serialize tasks + * Step 3) submit MR Jobs and wait for results * Step 4) collect results from each parallel worker */ @@ -884,10 +868,10 @@ public class ParForProgramBlock extends ForProgramBlock //tid = 0 because replaced in remote parworker flagForced = checkMRAndRecompileToCP(0); } - + // Step 1) init parallel workers (serialize PBs) - // NOTES: each mapper changes filenames with regard to his ID as we submit a single job, - // cannot reuse serialized string, since variables are serialized as well. + // NOTES: each mapper changes filenames with regard to his ID as we submit a single + // job, cannot reuse serialized string, since variables are serialized as well. 
ParForBody body = new ParForBody( _childBlocks, _resultVars, ec ); String program = ProgramConverter.serializeParForBody( body ); @@ -902,64 +886,60 @@ public class ParForProgramBlock extends ForProgramBlock long numIterations = partitioner.getNumIterations(); int maxDigits = (int)Math.log10(to.getLongValue()) + 1; long numCreatedTasks = -1; - if( USE_STREAMING_TASK_CREATION ) - { + if( USE_STREAMING_TASK_CREATION ) { LocalTaskQueue<Task> queue = new LocalTaskQueue<Task>(); - + //put tasks into queue and start writing to taskFile numCreatedTasks = partitioner.createTasks(queue); - taskFile = writeTasksToFile( taskFile, queue, maxDigits ); + taskFile = writeTasksToFile( taskFile, queue, maxDigits ); } else { //sequentially create tasks and write to disk List<Task> tasks = partitioner.createTasks(); - numCreatedTasks = tasks.size(); - taskFile = writeTasksToFile( taskFile, tasks, maxDigits ); + numCreatedTasks = tasks.size(); + taskFile = writeTasksToFile( taskFile, tasks, maxDigits ); } - + if( _monitor ) StatisticMonitor.putPFStat(_ID, Stat.PARFOR_INIT_TASKS_T, time.stop()); //write matrices to HDFS exportMatricesToHDFS(ec); - + // Step 3) submit MR job (wait for finished work) MatrixObject colocatedDPMatrixObj = (_colocatedDPMatrix!=null)? 
ec.getMatrixObject(_colocatedDPMatrix) : null; - RemoteParForJobReturn ret = RemoteParForMR.runJob(_ID, program, taskFile, resultFile, colocatedDPMatrixObj, _enableCPCaching, - _numThreads, WRITE_REPLICATION_FACTOR, MAX_RETRYS_ON_ERROR, getMinMemory(ec), - (ALLOW_REUSE_MR_JVMS & _jvmReuse) ); + RemoteParForJobReturn ret = RemoteParForMR.runJob(_ID, program, taskFile, resultFile, + colocatedDPMatrixObj, _enableCPCaching,_numThreads, WRITE_REPLICATION_FACTOR, MAX_RETRYS_ON_ERROR, + getMinMemory(ec), (ALLOW_REUSE_MR_JVMS & _jvmReuse) ); if( _monitor ) StatisticMonitor.putPFStat(_ID, Stat.PARFOR_WAIT_EXEC_T, time.stop()); - - + // Step 4) collecting results from each parallel worker int numExecutedTasks = ret.getNumExecutedTasks(); int numExecutedIterations = ret.getNumExecutedIterations(); //consolidate results into global symbol table - consolidateAndCheckResults( ec, numIterations, numCreatedTasks, numExecutedIterations , numExecutedTasks, - ret.getVariables() ); + consolidateAndCheckResults( ec, numIterations, numCreatedTasks, + numExecutedIterations , numExecutedTasks, ret.getVariables() ); if( flagForced ) //see step 0 releaseForcedRecompile(0); - if( _monitor ) - { + if( _monitor ) { StatisticMonitor.putPFStat(_ID, Stat.PARFOR_WAIT_RESULTS_T, time.stop()); StatisticMonitor.putPFStat(_ID, Stat.PARFOR_NUMTASKS, numExecutedTasks); StatisticMonitor.putPFStat(_ID, Stat.PARFOR_NUMITERS, numExecutedIterations); - } - } + } + } private void executeRemoteMRParForDP( ExecutionContext ec, IntObject itervar, IntObject from, IntObject to, IntObject incr ) throws DMLRuntimeException, IOException { /* Step 0) check and recompile MR inst * Step 1) serialize child PB and inst - * Step 2) create tasks - * serialize tasks - * Step 3) submit MR Jobs and wait for results + * Step 2) create and serialize tasks + * Step 3) submit MR Jobs and wait for results * Step 4) collect results from each parallel worker */ @@ -975,8 +955,8 @@ public class ParForProgramBlock extends 
ForProgramBlock inputMatrix.setPartitioned(inputDPF._dpf, inputDPF._N); //mark matrix var as partitioned // Step 2) init parallel workers (serialize PBs) - // NOTES: each mapper changes filenames with regard to his ID as we submit a single job, - // cannot reuse serialized string, since variables are serialized as well. + // NOTES: each mapper changes filenames with regard to his ID as we submit a single + // job, cannot reuse serialized string, since variables are serialized as well. ParForBody body = new ParForBody( _childBlocks, _resultVars, ec ); String program = ProgramConverter.serializeParForBody( body ); @@ -988,41 +968,40 @@ public class ParForProgramBlock extends ForProgramBlock String resultFile = constructResultFileName(); long numIterations = partitioner.getNumIterations(); long numCreatedTasks = numIterations;//partitioner.createTasks().size(); - + if( _monitor ) StatisticMonitor.putPFStat(_ID, Stat.PARFOR_INIT_TASKS_T, time.stop()); //write matrices to HDFS exportMatricesToHDFS(ec); - + // Step 4) submit MR job (wait for finished work) - OutputInfo inputOI = ((inputMatrix.getSparsity()<0.1 && inputDPF==PartitionFormat.COLUMN_WISE)|| - (inputMatrix.getSparsity()<0.001 && inputDPF==PartitionFormat.ROW_WISE))? - OutputInfo.BinaryCellOutputInfo : OutputInfo.BinaryBlockOutputInfo; + OutputInfo inputOI = ((inputMatrix.getSparsity()<0.1 && inputDPF==PartitionFormat.COLUMN_WISE) + || (inputMatrix.getSparsity()<0.001 && inputDPF==PartitionFormat.ROW_WISE)) ? 
+ OutputInfo.BinaryCellOutputInfo : OutputInfo.BinaryBlockOutputInfo; RemoteParForJobReturn ret = RemoteDPParForMR.runJob(_ID, itervar.getName(), _colocatedDPMatrix, program, resultFile, - inputMatrix, inputDPF, inputOI, _tSparseCol, _enableCPCaching, _numThreads, _replicationDP ); + inputMatrix, inputDPF, inputOI, _tSparseCol, _enableCPCaching, _numThreads, _replicationDP ); - if( _monitor ) + if( _monitor ) StatisticMonitor.putPFStat(_ID, Stat.PARFOR_WAIT_EXEC_T, time.stop()); - + // Step 5) collecting results from each parallel worker int numExecutedTasks = ret.getNumExecutedTasks(); int numExecutedIterations = ret.getNumExecutedIterations(); //consolidate results into global symbol table - consolidateAndCheckResults( ec, numIterations, numCreatedTasks, numExecutedIterations, numExecutedTasks, - ret.getVariables() ); + consolidateAndCheckResults( ec, numIterations, numCreatedTasks, + numExecutedIterations, numExecutedTasks, ret.getVariables() ); if( flagForced ) //see step 0 releaseForcedRecompile(0); inputMatrix.unsetPartitioned(); - if( _monitor ) - { + if( _monitor ) { StatisticMonitor.putPFStat(_ID, Stat.PARFOR_WAIT_RESULTS_T, time.stop()); StatisticMonitor.putPFStat(_ID, Stat.PARFOR_NUMTASKS, numExecutedTasks); StatisticMonitor.putPFStat(_ID, Stat.PARFOR_NUMITERS, numExecutedIterations); - } + } } private void executeRemoteSparkParFor(ExecutionContext ec, IntObject itervar, IntObject from, IntObject to, IntObject incr) @@ -1032,15 +1011,15 @@ public class ParForProgramBlock extends ForProgramBlock // Step 0) check and compile to CP (if forced remote parfor) boolean flagForced = false; - if( FORCE_CP_ON_REMOTE_MR && (_optMode == POptMode.NONE || (_optMode == POptMode.CONSTRAINED && _execMode==PExecMode.REMOTE_SPARK)) ) - { + if( FORCE_CP_ON_REMOTE_MR && (_optMode == POptMode.NONE + || (_optMode == POptMode.CONSTRAINED && _execMode==PExecMode.REMOTE_SPARK)) ) { //tid = 0 because replaced in remote parworker flagForced = checkMRAndRecompileToCP(0); } // Step 
1) init parallel workers (serialize PBs) - // NOTES: each mapper changes filenames with regard to his ID as we submit a single job, - // cannot reuse serialized string, since variables are serialized as well. + // NOTES: each mapper changes filenames with regard to his ID as we submit a single + // job, cannot reuse serialized string, since variables are serialized as well. ParForBody body = new ParForBody(_childBlocks, _resultVars, ec); HashMap<String, byte[]> clsMap = new HashMap<String, byte[]>(); String program = ProgramConverter.serializeParForBody(body, clsMap); @@ -1055,37 +1034,35 @@ public class ParForProgramBlock extends ForProgramBlock //sequentially create tasks as input to parfor job List<Task> tasks = partitioner.createTasks(); long numCreatedTasks = tasks.size(); - + if( _monitor ) StatisticMonitor.putPFStat(_ID, Stat.PARFOR_INIT_TASKS_T, time.stop()); //write matrices to HDFS exportMatricesToHDFS(ec); - + // Step 3) submit Spark parfor job (no lazy evaluation, since collect on result) //MatrixObject colocatedDPMatrixObj = (_colocatedDPMatrix!=null)? 
(MatrixObject)ec.getVariable(_colocatedDPMatrix) : null; RemoteParForJobReturn ret = RemoteParForSpark.runJob(_ID, program, clsMap, tasks, ec, _enableCPCaching, _numThreads); if( _monitor ) StatisticMonitor.putPFStat(_ID, Stat.PARFOR_WAIT_EXEC_T, time.stop()); - - + // Step 4) collecting results from each parallel worker int numExecutedTasks = ret.getNumExecutedTasks(); int numExecutedIterations = ret.getNumExecutedIterations(); //consolidate results into global symbol table - consolidateAndCheckResults( ec, numIterations, numCreatedTasks, numExecutedIterations , numExecutedTasks, - ret.getVariables() ); + consolidateAndCheckResults( ec, numIterations, numCreatedTasks, + numExecutedIterations , numExecutedTasks, ret.getVariables() ); if( flagForced ) //see step 0 releaseForcedRecompile(0); - if( _monitor ) - { + if( _monitor ) { StatisticMonitor.putPFStat(_ID, Stat.PARFOR_WAIT_RESULTS_T, time.stop()); StatisticMonitor.putPFStat(_ID, Stat.PARFOR_NUMTASKS, numExecutedTasks); StatisticMonitor.putPFStat(_ID, Stat.PARFOR_NUMITERS, numExecutedIterations); - } + } } private void executeRemoteSparkParForDP( ExecutionContext ec, IntObject itervar, IntObject from, IntObject to, IntObject incr ) @@ -1096,15 +1073,15 @@ public class ParForProgramBlock extends ForProgramBlock // Step 0) check and compile to CP (if forced remote parfor) boolean flagForced = checkMRAndRecompileToCP(0); - // Step 1) prepare partitioned input matrix (needs to happen before serializing the progam) + // Step 1) prepare partitioned input matrix (needs to happen before serializing the program) ParForStatementBlock sb = (ParForStatementBlock) getStatementBlock(); MatrixObject inputMatrix = ec.getMatrixObject(_colocatedDPMatrix ); PartitionFormat inputDPF = sb.determineDataPartitionFormat( _colocatedDPMatrix ); - inputMatrix.setPartitioned(inputDPF._dpf, inputDPF._N); //mark matrix var as partitioned + inputMatrix.setPartitioned(inputDPF._dpf, inputDPF._N); //mark matrix var as partitioned // Step 2) 
init parallel workers (serialize PBs) - // NOTES: each mapper changes filenames with regard to his ID as we submit a single job, - // cannot reuse serialized string, since variables are serialized as well. + // NOTES: each mapper changes filenames with regard to his ID as we submit a single + // job, cannot reuse serialized string, since variables are serialized as well. ParForBody body = new ParForBody( _childBlocks, _resultVars, ec ); HashMap<String, byte[]> clsMap = new HashMap<String, byte[]>(); String program = ProgramConverter.serializeParForBody( body, clsMap ); @@ -1117,7 +1094,7 @@ public class ParForProgramBlock extends ForProgramBlock String resultFile = constructResultFileName(); long numIterations = partitioner.getNumIterations(); long numCreatedTasks = numIterations;//partitioner.createTasks().size(); - + if( _monitor ) StatisticMonitor.putPFStat(_ID, Stat.PARFOR_INIT_TASKS_T, time.stop()); @@ -1126,34 +1103,30 @@ public class ParForProgramBlock extends ForProgramBlock // Step 4) submit MR job (wait for finished work) //TODO runtime support for binary cell partitioning - //OutputInfo inputOI = ((inputMatrix.getSparsity()<0.1 && inputDPF==PDataPartitionFormat.COLUMN_WISE)|| - // (inputMatrix.getSparsity()<0.001 && inputDPF==PDataPartitionFormat.ROW_WISE))? 
- // OutputInfo.BinaryCellOutputInfo : OutputInfo.BinaryBlockOutputInfo; OutputInfo inputOI = OutputInfo.BinaryBlockOutputInfo; - RemoteParForJobReturn ret = RemoteDPParForSpark.runJob(_ID, itervar.getName(), _colocatedDPMatrix, program, clsMap, - resultFile, inputMatrix, ec, inputDPF, inputOI, _tSparseCol, _enableCPCaching, _numThreads ); + RemoteParForJobReturn ret = RemoteDPParForSpark.runJob(_ID, itervar.getName(), _colocatedDPMatrix, program, + clsMap, resultFile, inputMatrix, ec, inputDPF, inputOI, _tSparseCol, _enableCPCaching, _numThreads ); if( _monitor ) StatisticMonitor.putPFStat(_ID, Stat.PARFOR_WAIT_EXEC_T, time.stop()); - + // Step 5) collecting results from each parallel worker int numExecutedTasks = ret.getNumExecutedTasks(); int numExecutedIterations = ret.getNumExecutedIterations(); //consolidate results into global symbol table - consolidateAndCheckResults( ec, numIterations, numCreatedTasks, numExecutedIterations, numExecutedTasks, - ret.getVariables() ); + consolidateAndCheckResults( ec, numIterations, numCreatedTasks, + numExecutedIterations, numExecutedTasks, ret.getVariables() ); if( flagForced ) //see step 0 releaseForcedRecompile(0); inputMatrix.unsetPartitioned(); - if( _monitor ) - { + if( _monitor ) { StatisticMonitor.putPFStat(_ID, Stat.PARFOR_WAIT_RESULTS_T, time.stop()); StatisticMonitor.putPFStat(_ID, Stat.PARFOR_NUMTASKS, numExecutedTasks); StatisticMonitor.putPFStat(_ID, Stat.PARFOR_NUMITERS, numExecutedIterations); - } + } } private void handleDataPartitioning( ExecutionContext ec ) @@ -1161,7 +1134,7 @@ public class ParForProgramBlock extends ForProgramBlock { PDataPartitioner dataPartitioner = _dataPartitioner; if( dataPartitioner != PDataPartitioner.NONE ) - { + { ParForStatementBlock sb = (ParForStatementBlock) getStatementBlock(); if( sb == null ) throw new DMLRuntimeException("ParFor statement block required for reasoning about data partitioning."); @@ -1171,19 +1144,20 @@ public class ParForProgramBlock extends 
ForProgramBlock { Data dat = ec.getVariable(var); //skip non-existing input matrices (which are due to unknown sizes marked for - //partitioning but typically related branches are never executed) + //partitioning but typically related branches are never executed) if( dat != null && dat instanceof MatrixObject ) { MatrixObject moVar = (MatrixObject) dat; //unpartitioned input PartitionFormat dpf = sb.determineDataPartitionFormat( var ); - LOG.trace("PARFOR ID = "+_ID+", Partitioning read-only input variable "+var+" (format="+dpf+", mode="+_dataPartitioner+")"); + LOG.trace("PARFOR ID = "+_ID+", Partitioning read-only input variable " + + var + " (format="+dpf+", mode="+_dataPartitioner+")"); if( dpf != PartitionFormat.NONE ) { if( dataPartitioner != PDataPartitioner.REMOTE_SPARK && dpf.isBlockwise() ) { LOG.warn("PARFOR ID = "+_ID+", Switching data partitioner from " + dataPartitioner + - " to " + PDataPartitioner.REMOTE_SPARK.name()+" for blockwise-n partitioning."); + " to " + PDataPartitioner.REMOTE_SPARK.name()+" for blockwise-n partitioning."); dataPartitioner = PDataPartitioner.REMOTE_SPARK; } @@ -1214,9 +1188,8 @@ public class ParForProgramBlock extends ForProgramBlock //store original and partitioned matrix (for reuse if applicable) _variablesDPOriginal.put(var, moVar); - if( ALLOW_REUSE_PARTITION_VARS - && ProgramRecompiler.isApplicableForReuseVariable(sb.getDMLProg(), sb, var) ) - { + if( ALLOW_REUSE_PARTITION_VARS + && ProgramRecompiler.isApplicableForReuseVariable(sb.getDMLProg(), sb, var) ) { _variablesDPReuse.put(var, dpdatNew); } @@ -1233,7 +1206,6 @@ public class ParForProgramBlock extends ForProgramBlock if( OptimizerUtils.isSparkExecutionMode() && _variablesRP != null && !_variablesRP.isEmpty() ) { SparkExecutionContext sec = (SparkExecutionContext) ec; - for( String var : _variablesRP ) sec.repartitionAndCacheMatrixObject(var); } @@ -1245,7 +1217,6 @@ public class ParForProgramBlock extends ForProgramBlock if( OptimizerUtils.isSparkExecutionMode() 
&& _variablesECache != null && !_variablesECache.isEmpty() ) { SparkExecutionContext sec = (SparkExecutionContext) ec; - for( String var : _variablesECache ) sec.cacheMatrixObject(var); } @@ -1262,8 +1233,7 @@ public class ParForProgramBlock extends ForProgramBlock private void cleanWorkerResultVariables(ExecutionContext ec, MatrixObject out, MatrixObject[] in) throws DMLRuntimeException { - for( MatrixObject tmp : in ) - { + for( MatrixObject tmp : in ) { //check for empty inputs (no iterations executed) if( tmp != null && tmp != out ) ec.cleanupMatrixObject(tmp); @@ -1299,8 +1269,7 @@ public class ParForProgramBlock extends ForProgramBlock switch( datatype ) { case SCALAR: - switch( valuetype ) - { + switch( valuetype ) { case BOOLEAN: dataObj = new BooleanObject(var,false); break; case INT: dataObj = new IntObject(var,-1); break; case DOUBLE: dataObj = new DoubleObject(var,-1d); break; @@ -1408,20 +1377,17 @@ public class ParForProgramBlock extends ForProgramBlock HashSet<String> fnNames = new HashSet<String>(); if( USE_PB_CACHE ) { - if( _pbcache.containsKey(pwID) ) - { + if( _pbcache.containsKey(pwID) ) { cpChildBlocks = _pbcache.get(pwID); } - else - { + else { cpChildBlocks = ProgramConverter.rcreateDeepCopyProgramBlocks(_childBlocks, pwID, _IDPrefix, new HashSet<String>(), fnNames, false, false); _pbcache.put(pwID, cpChildBlocks); } } - else - { + else { cpChildBlocks = ProgramConverter.rcreateDeepCopyProgramBlocks(_childBlocks, pwID, _IDPrefix, new HashSet<String>(), fnNames, false, false); - } + } //deep copy execution context (including prepare parfor update-in-place) ExecutionContext cpEc = ProgramConverter.createDeepCopyExecutionContext(ec); @@ -1443,8 +1409,7 @@ public class ParForProgramBlock extends ForProgramBlock pw = new LocalParWorker( pwID, queue, body, cconf, MAX_RETRYS_ON_ERROR, _monitor ); pw.setFunctionNames(fnNames); } - catch(Exception ex) - { + catch(Exception ex) { throw new DMLRuntimeException(ex); } @@ -1519,7 +1484,7 @@ public class 
ParForProgramBlock extends ForProgramBlock int maxNumRed = InfrastructureAnalyzer.getRemoteParallelReduceTasks(); //correction max number of reducers on yarn clusters if( InfrastructureAnalyzer.isYarnEnabled() ) - maxNumRed = (int)Math.max( maxNumRed, YarnClusterAnalyzer.getNumCores()/2 ); + maxNumRed = (int)Math.max( maxNumRed, YarnClusterAnalyzer.getNumCores()/2 ); int numRed = Math.min(numReducers,maxNumRed); //create data partitioner @@ -1530,12 +1495,12 @@ public class ParForProgramBlock extends ForProgramBlock break; case REMOTE_MR: dp = new DataPartitionerRemoteMR( dpf, _ID, numRed, - _replicationDP, ALLOW_REUSE_MR_JVMS, false ); + _replicationDP, ALLOW_REUSE_MR_JVMS, false ); break; case REMOTE_SPARK: dp = new DataPartitionerRemoteSpark( dpf, ec, numRed, - _replicationDP, false ); - break; + _replicationDP, false ); + break; default: throw new DMLRuntimeException("Unknown data partitioner: '" +dataPartitioner.name()+"'."); } @@ -1557,18 +1522,18 @@ public class ParForProgramBlock extends ForProgramBlock else { int numReducers = ConfigurationManager.getNumReducers(); maxMap = InfrastructureAnalyzer.getRemoteParallelMapTasks(); - maxRed = Math.min(numReducers, + maxRed = Math.min(numReducers, InfrastructureAnalyzer.getRemoteParallelReduceTasks()); //correction max number of reducers on yarn clusters - if( InfrastructureAnalyzer.isYarnEnabled() ) { - maxMap = (int)Math.max( maxMap, YarnClusterAnalyzer.getNumCores() ); - maxRed = (int)Math.max( maxRed, YarnClusterAnalyzer.getNumCores()/2 ); + if( InfrastructureAnalyzer.isYarnEnabled() ) { + maxMap = (int)Math.max( maxMap, YarnClusterAnalyzer.getNumCores() ); + maxRed = (int)Math.max( maxRed, YarnClusterAnalyzer.getNumCores()/2 ); } } int numMap = Math.max(_numThreads, maxMap); int numRed = maxRed; - //create result merge implementation + //create result merge implementation switch( prm ) { case LOCAL_MEM: @@ -1581,10 +1546,9 @@ public class ParForProgramBlock extends ForProgramBlock rm = new 
ResultMergeLocalAutomatic( out, in, fname ); break; case REMOTE_MR: - rm = new ResultMergeRemoteMR( out, in, fname, _ID, numMap, numRed, - WRITE_REPLICATION_FACTOR, - MAX_RETRYS_ON_ERROR, - ALLOW_REUSE_MR_JVMS ); + rm = new ResultMergeRemoteMR( out, in, fname, + _ID, numMap, numRed, WRITE_REPLICATION_FACTOR, + MAX_RETRYS_ON_ERROR, ALLOW_REUSE_MR_JVMS ); break; case REMOTE_SPARK: rm = new ResultMergeRemoteSpark( out, in, fname, ec, numMap, numRed ); @@ -1628,8 +1592,8 @@ public class ParForProgramBlock extends ForProgramBlock private void releaseForcedRecompile(long tid) throws DMLRuntimeException { - HashSet<String> fnStack = new HashSet<String>(); - Recompiler.recompileProgramBlockHierarchy2Forced(_childBlocks, tid, fnStack, null); + Recompiler.recompileProgramBlockHierarchy2Forced( + _childBlocks, tid, new HashSet<String>(), null); } private String writeTasksToFile(String fname, List<Task> tasks, int maxDigits) @@ -1641,17 +1605,15 @@ public class ParForProgramBlock extends ForProgramBlock Path path = new Path(fname); FileSystem fs = IOUtilFunctions.getFileSystem(path); br = new BufferedWriter(new OutputStreamWriter(fs.create(path,true))); - + boolean flagFirst = true; //workaround for keeping gen order - for( Task t : tasks ) - { + for( Task t : tasks ) { br.write( createTaskFileLine( t, maxDigits, flagFirst ) ); if( flagFirst ) flagFirst = false; } } - catch(Exception ex) - { + catch(Exception ex) { throw new DMLRuntimeException("Error writing tasks to taskfile "+fname, ex); } finally { @@ -1670,18 +1632,16 @@ public class ParForProgramBlock extends ForProgramBlock Path path = new Path( fname ); FileSystem fs = IOUtilFunctions.getFileSystem(path); br = new BufferedWriter(new OutputStreamWriter(fs.create(path,true))); - + Task t = null; boolean flagFirst = true; //workaround for keeping gen order - while( (t = queue.dequeueTask()) != LocalTaskQueue.NO_MORE_TASKS ) - { + while( (t = queue.dequeueTask()) != LocalTaskQueue.NO_MORE_TASKS ) { br.write( 
createTaskFileLine( t, maxDigits, flagFirst ) ); if( flagFirst ) flagFirst = false; } } - catch(Exception ex) - { + catch(Exception ex) { throw new DMLRuntimeException("Error writing tasks to taskfile "+fname, ex); } finally { @@ -1691,11 +1651,9 @@ public class ParForProgramBlock extends ForProgramBlock return fname; } - private String createTaskFileLine( Task t, int maxDigits, boolean flagFirst ) - { + private String createTaskFileLine( Task t, int maxDigits, boolean flagFirst ) { //always pad to max digits in order to preserve task order - String ret = t.toCompactString(maxDigits) + (flagFirst?" ":"") + "\n"; - return ret; + return t.toCompactString(maxDigits) + (flagFirst?" ":"") + "\n"; } private void consolidateAndCheckResults(ExecutionContext ec, long expIters, long expTasks, long numIters, long numTasks, LocalVariableMap [] results) @@ -1752,20 +1710,20 @@ public class ParForProgramBlock extends ForProgramBlock MatrixObject out = (MatrixObject) dat; MatrixObject[] in = new MatrixObject[ results.length ]; for( int i=0; i< results.length; i++ ) - in[i] = (MatrixObject) results[i].get( var ); + in[i] = (MatrixObject) results[i].get( var ); String fname = constructResultMergeFileName(); ResultMerge rm = createResultMerge(_resultMerge, out, in, fname, ec); MatrixObject outNew = null; if( USE_PARALLEL_RESULT_MERGE ) outNew = rm.executeParallelMerge( _numThreads ); else - outNew = rm.executeSerialMerge(); + outNew = rm.executeSerialMerge(); //cleanup existing var Data exdata = ec.removeVariable(var); if( exdata != null && exdata != outNew && exdata instanceof MatrixObject ) ec.cleanupMatrixObject((MatrixObject)exdata); - + //cleanup of intermediate result variables cleanWorkerResultVariables( ec, out, in ); @@ -1796,21 +1754,18 @@ public class ParForProgramBlock extends ForProgramBlock * * @return true if ? 
*/ - private boolean checkParallelRemoteResultMerge() - { - return (USE_PARALLEL_RESULT_MERGE_REMOTE - && _resultVars.size() > 1 - && ( _resultMerge == PResultMerge.REMOTE_MR - ||_resultMerge == PResultMerge.REMOTE_SPARK) ); + private boolean checkParallelRemoteResultMerge() { + return (USE_PARALLEL_RESULT_MERGE_REMOTE && _resultVars.size() > 1 + && ( _resultMerge == PResultMerge.REMOTE_MR + ||_resultMerge == PResultMerge.REMOTE_SPARK) ); } - private void setParForProgramBlockIDs(int IDPrefix) - { + private void setParForProgramBlockIDs(int IDPrefix) { _IDPrefix = IDPrefix; if( _IDPrefix == -1 ) //not specified _ID = _pfIDSeq.getNextID(); //generated new ID else //remote case (further nested parfors are all in one JVM) - _ID = IDHandler.concatIntIDsToLong(_IDPrefix, (int)_pfIDSeq.getNextID()); + _ID = IDHandler.concatIntIDsToLong(_IDPrefix, (int)_pfIDSeq.getNextID()); } /** @@ -1826,8 +1781,7 @@ public class ParForProgramBlock extends ForProgramBlock _pwIDs = new long[ _numThreads ]; - for( int i=0; i<_numThreads; i++ ) - { + for( int i=0; i<_numThreads; i++ ) { if(_IDPrefix == -1) _pwIDs[i] = _pwIDSeq.getNextID(); else @@ -1838,8 +1792,7 @@ public class ParForProgramBlock extends ForProgramBlock } } - private long computeNumIterations( IntObject from, IntObject to, IntObject incr ) - { + private long computeNumIterations( IntObject from, IntObject to, IntObject incr ) { return (long)Math.ceil(((double)(to.getLongValue() - from.getLongValue() + 1)) / incr.getLongValue()); } @@ -1849,10 +1802,8 @@ public class ParForProgramBlock extends ForProgramBlock * * @return task file name */ - private String constructTaskFileName() - { + private String constructTaskFileName() { String scratchSpaceLoc = ConfigurationManager.getScratchSpace(); - StringBuilder sb = new StringBuilder(); sb.append(scratchSpaceLoc); sb.append(Lop.FILE_SEPARATOR); @@ -1869,10 +1820,8 @@ public class ParForProgramBlock extends ForProgramBlock * * @return result file name */ - private String 
constructResultFileName() - { + private String constructResultFileName() { String scratchSpaceLoc = ConfigurationManager.getScratchSpace(); - StringBuilder sb = new StringBuilder(); sb.append(scratchSpaceLoc); sb.append(Lop.FILE_SEPARATOR); @@ -1880,13 +1829,11 @@ public class ParForProgramBlock extends ForProgramBlock sb.append(DMLScript.getUUID()); sb.append(PARFOR_MR_RESULT_TMP_FNAME.replaceAll("%ID%", String.valueOf(_ID))); - return sb.toString(); + return sb.toString(); } - private String constructResultMergeFileName() - { + private String constructResultMergeFileName() { String scratchSpaceLoc = ConfigurationManager.getScratchSpace(); - String fname = PARFOR_MR_RESULTMERGE_FNAME; fname = fname.replaceAll("%ID%", String.valueOf(_ID)); //replace workerID fname = fname.replaceAll("%VAR%", String.valueOf(_resultVarsIDSeq.getNextID())); @@ -1898,13 +1845,11 @@ public class ParForProgramBlock extends ForProgramBlock sb.append(DMLScript.getUUID()); sb.append(fname); - return sb.toString(); + return sb.toString(); } - private String constructDataPartitionsFileName() - { + private String constructDataPartitionsFileName() { String scratchSpaceLoc = ConfigurationManager.getScratchSpace(); - String fname = PARFOR_DATAPARTITIONS_FNAME; fname = fname.replaceAll("%ID%", String.valueOf(_ID)); //replace workerID fname = fname.replaceAll("%VAR%", String.valueOf(_dpVarsIDSeq.getNextID())); @@ -1916,7 +1861,7 @@ public class ParForProgramBlock extends ForProgramBlock sb.append(DMLScript.getUUID()); sb.append(fname); - return sb.toString(); + return sb.toString(); } private long getMinMemory(ExecutionContext ec) @@ -1946,10 +1891,8 @@ public class ParForProgramBlock extends ForProgramBlock return ret; } - private void setMemoryBudget() - { - if( _recompileMemoryBudget > 0 ) - { + private void setMemoryBudget() { + if( _recompileMemoryBudget > 0 ) { // store old budget for reset after exec _oldMemoryBudget = (double)InfrastructureAnalyzer.getLocalMaxMemory(); @@ -1959,16 +1902,12 
@@ public class ParForProgramBlock extends ForProgramBlock } } - private void resetMemoryBudget() - { + private void resetMemoryBudget() { if( _recompileMemoryBudget > 0 ) - { InfrastructureAnalyzer.setLocalMaxMemory((long)_oldMemoryBudget); - } } - private void resetOptimizerFlags() - { + private void resetOptimizerFlags() { //reset all state that was set but is not guaranteed to be overwritten by optimizer _variablesDPOriginal.removeAll(); _colocatedDPMatrix = null; @@ -2021,7 +1960,7 @@ public class ParForProgramBlock extends ForProgramBlock MatrixObject[] in = new MatrixObject[ _refVars.length ]; for( int i=0; i< _refVars.length; i++ ) - in[i] = (MatrixObject) _refVars[i].get( varname ); + in[i] = (MatrixObject) _refVars[i].get( varname ); String fname = constructResultMergeFileName(); ResultMerge rm = createResultMerge(_resultMerge, out, in, fname, _ec); @@ -2029,7 +1968,7 @@ public class ParForProgramBlock extends ForProgramBlock if( USE_PARALLEL_RESULT_MERGE ) outNew = rm.executeParallelMerge( _numThreads ); else - outNew = rm.executeSerialMerge(); + outNew = rm.executeSerialMerge(); synchronized( _ec.getVariables() ){ _ec.getVariables().put( varname, outNew); @@ -2051,5 +1990,4 @@ public class ParForProgramBlock extends ForProgramBlock public String printBlockErrorLocation(){ return "ERROR: Runtime error in parfor program block generated from parfor statement block between lines " + _beginLine + " and " + _endLine + " -- "; } - -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/systemml/blob/2c57cf77/src/main/java/org/apache/sysml/runtime/controlprogram/context/ExecutionContext.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/context/ExecutionContext.java b/src/main/java/org/apache/sysml/runtime/controlprogram/context/ExecutionContext.java index 0bd73d1..59d2589 100644 --- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/context/ExecutionContext.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/context/ExecutionContext.java @@ -544,23 +544,18 @@ public class ExecutionContext { for( String var : varList ) { Data dat = _variables.get(var); - if( dat instanceof MatrixObject ) - { + if( dat instanceof MatrixObject ) { MatrixObject mo = (MatrixObject)dat; varsState.put( var, mo.isCleanupEnabled() ); - //System.out.println("pre-pin "+var+" ("+mo.isCleanupEnabled()+")"); } } //step 2) pin variables - for( String var : varList ) - { + for( String var : varList ) { Data dat = _variables.get(var); - if( dat instanceof MatrixObject ) - { + if( dat instanceof MatrixObject ) { MatrixObject mo = (MatrixObject)dat; mo.enableCleanup(false); - //System.out.println("pin "+var); } } @@ -583,11 +578,8 @@ public class ExecutionContext { * @param varList variable list * @param varsState variable state */ - public void unpinVariables(ArrayList<String> varList, HashMap<String,Boolean> varsState) - { - for( String var : varList) - { - //System.out.println("unpin "+var+" ("+varsState.get(var)+")"); + public void unpinVariables(ArrayList<String> varList, HashMap<String,Boolean> varsState) { + for( String var : varList) { Data dat = _variables.get(var); if( dat instanceof MatrixObject ) ((MatrixObject)dat).enableCleanup(varsState.get(var)); @@ -597,15 +589,28 @@ public class ExecutionContext { /** * NOTE: No order guaranteed, so keep same list for pin and unpin. * - * @return variable list as strings + * @return list of all variable names. */ - public ArrayList<String> getVarList() - { - ArrayList<String> varlist = new ArrayList<String>(); - varlist.addAll(_variables.keySet()); - return varlist; + public ArrayList<String> getVarList() { + return new ArrayList<>(_variables.keySet()); } - + + /** + * NOTE: No order guaranteed, so keep same list for pin and unpin. + * + * @return list of all variable names of partitioned matrices. 
+ */ + public ArrayList<String> getVarListPartitioned() { + ArrayList<String> ret = new ArrayList<>(); + for( String var : _variables.keySet() ) { + Data dat = _variables.get(var); + if( dat instanceof MatrixObject + && ((MatrixObject)dat).isPartitioned() ) + ret.add(var); + } + return ret; + } + public void cleanupMatrixObject(MatrixObject mo) throws DMLRuntimeException { http://git-wip-us.apache.org/repos/asf/systemml/blob/2c57cf77/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/CachedReuseVariables.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/CachedReuseVariables.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/CachedReuseVariables.java new file mode 100644 index 0000000..48ef883 --- /dev/null +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/CachedReuseVariables.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.sysml.runtime.controlprogram.parfor; + +import java.lang.ref.SoftReference; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; + +import org.apache.sysml.runtime.controlprogram.LocalVariableMap; + +public class CachedReuseVariables +{ + private final HashMap<Long, SoftReference<LocalVariableMap>> _data; + + public CachedReuseVariables() { + _data = new HashMap<>(); + } + + public synchronized void reuseVariables(long pfid, LocalVariableMap vars, Collection<String> blacklist) { + //check for existing reuse map + LocalVariableMap tmp = null; + if( _data.containsKey(pfid) ) + tmp = _data.get(pfid).get(); + + //build reuse map if not created yet or evicted + if( tmp == null ) { + tmp = new LocalVariableMap(vars); + tmp.removeAllIn(new HashSet<>(blacklist)); + _data.put(pfid, new SoftReference<>(tmp)); + } + //reuse existing reuse map + else { + for( String varName : tmp.keySet() ) + vars.put(varName, tmp.get(varName)); + } + } + + public synchronized void clearVariables(long pfid) { + _data.remove(pfid); + } +} http://git-wip-us.apache.org/repos/asf/systemml/blob/2c57cf77/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java index 8dc86d4..6d0cd3a 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java @@ -328,16 +328,15 @@ public class ProgramConverter ParForProgramBlock tmpPB = null; if( IDPrefix == -1 ) //still on master node - tmpPB = new ParForProgramBlock(prog,pfpb.getIterVar(), pfpb.getParForParams()); + tmpPB = new ParForProgramBlock(prog,pfpb.getIterVar(), pfpb.getParForParams(), 
pfpb.getResultVariables()); else //child of remote ParWorker at any level - tmpPB = new ParForProgramBlock(IDPrefix, prog, pfpb.getIterVar(), pfpb.getParForParams()); + tmpPB = new ParForProgramBlock(IDPrefix, prog, pfpb.getIterVar(), pfpb.getParForParams(), pfpb.getResultVariables()); tmpPB.setStatementBlock( createForStatementBlockCopy( (ForStatementBlock) pfpb.getStatementBlock(), pid, plain, forceDeepCopy) ); tmpPB.setThreadID(pid); tmpPB.disableOptimization(); //already done in top-level parfor tmpPB.disableMonitorReport(); //already done in top-level parfor - tmpPB.setResultVariables( pfpb.getResultVariables() ); tmpPB.setFromInstructions( createDeepCopyInstructionSet(pfpb.getFromInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true) ); tmpPB.setToInstructions( createDeepCopyInstructionSet(pfpb.getToInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true) ); @@ -1514,9 +1513,8 @@ public class ProgramConverter //program blocks //reset id to preinit state, replaced during exec ArrayList<ProgramBlock> pbs = rParseProgramBlocks(st.nextToken(), prog, 0); - ParForProgramBlock pfpb = new ParForProgramBlock(id, prog, iterVar, params); + ParForProgramBlock pfpb = new ParForProgramBlock(id, prog, iterVar, params, resultVars); pfpb.disableOptimization(); //already done in top-level parfor - pfpb.setResultVariables(resultVars); pfpb.setFromInstructions(from); pfpb.setToInstructions(to); pfpb.setIncrementInstructions(incr); http://git-wip-us.apache.org/repos/asf/systemml/blob/2c57cf77/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSpark.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSpark.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSpark.java index 49ac9db..10b44a2 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSpark.java +++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSpark.java @@ -34,6 +34,8 @@ import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.controlprogram.LocalVariableMap; import org.apache.sysml.runtime.controlprogram.context.ExecutionContext; import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext; +import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer; +import org.apache.sysml.runtime.controlprogram.parfor.util.IDSequence; import org.apache.sysml.utils.Statistics; /** @@ -51,12 +53,14 @@ import org.apache.sysml.utils.Statistics; */ public class RemoteParForSpark { - protected static final Log LOG = LogFactory.getLog(RemoteParForSpark.class.getName()); - - public static RemoteParForJobReturn runJob(long pfid, String program, HashMap<String, byte[]> clsMap, + + //globally unique id for parfor spark job instances (unique across spark contexts) + private static final IDSequence _jobID = new IDSequence(); + + public static RemoteParForJobReturn runJob(long pfid, String prog, HashMap<String, byte[]> clsMap, List<Task> tasks, ExecutionContext ec, boolean cpCaching, int numMappers) - throws DMLRuntimeException + throws DMLRuntimeException { String jobname = "ParFor-ESP"; long t0 = DMLScript.STATISTICS ? 
System.nanoTime() : 0; @@ -68,13 +72,16 @@ public class RemoteParForSpark LongAccumulator aTasks = sc.sc().longAccumulator("tasks"); LongAccumulator aIters = sc.sc().longAccumulator("iterations"); + //reset cached shared inputs for correctness in local mode + long jobid = _jobID.getNextID(); + if( InfrastructureAnalyzer.isLocalMode() ) + RemoteParForSparkWorker.cleanupCachedVariables(jobid); + //run remote_spark parfor job //(w/o lazy evaluation to fit existing parfor framework, e.g., result merge) - RemoteParForSparkWorker func = new RemoteParForSparkWorker(program, clsMap, cpCaching, aTasks, aIters); - List<Tuple2<Long,String>> out = sc - .parallelize(tasks, tasks.size()) //create rdd of parfor tasks - .flatMapToPair(func) //execute parfor tasks - .collect(); //get output handles + List<Tuple2<Long,String>> out = sc.parallelize(tasks, tasks.size()) //create rdd of parfor tasks + .flatMapToPair(new RemoteParForSparkWorker(jobid, prog, clsMap, cpCaching, aTasks, aIters)) + .collect(); //execute and get output handles //de-serialize results LocalVariableMap[] results = RemoteParForUtils.getResults(out, LOG); @@ -85,11 +92,10 @@ public class RemoteParForSpark RemoteParForJobReturn ret = new RemoteParForJobReturn(true, numTasks, numIters, results); //maintain statistics - Statistics.incrementNoOfCompiledSPInst(); - Statistics.incrementNoOfExecutedSPInst(); - if( DMLScript.STATISTICS ){ + Statistics.incrementNoOfCompiledSPInst(); + Statistics.incrementNoOfExecutedSPInst(); + if( DMLScript.STATISTICS ) Statistics.maintainCPHeavyHitters(jobname, System.nanoTime()-t0); - } return ret; } http://git-wip-us.apache.org/repos/asf/systemml/blob/2c57cf77/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java index e1410da..033d398 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java @@ -21,10 +21,12 @@ package org.apache.sysml.runtime.controlprogram.parfor; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.Iterator; import java.util.Map.Entry; +import org.apache.commons.collections.CollectionUtils; import org.apache.spark.TaskContext; import org.apache.spark.api.java.function.PairFlatMapFunction; import org.apache.spark.util.LongAccumulator; @@ -40,8 +42,11 @@ import scala.Tuple2; public class RemoteParForSparkWorker extends ParWorker implements PairFlatMapFunction<Task, Long, String> { private static final long serialVersionUID = -3254950138084272296L; - - private final String _prog; + + private static final CachedReuseVariables reuseVars = new CachedReuseVariables(); + + private final long _jobid; + private final String _prog; private final HashMap<String, byte[]> _clsMap; private boolean _initialized = false; private boolean _caching = true; @@ -49,9 +54,10 @@ public class RemoteParForSparkWorker extends ParWorker implements PairFlatMapFun private final LongAccumulator _aTasks; private final LongAccumulator _aIters; - public RemoteParForSparkWorker(String program, HashMap<String, byte[]> clsMap, boolean cpCaching, LongAccumulator atasks, LongAccumulator aiters) + public RemoteParForSparkWorker(long jobid, String program, HashMap<String, byte[]> clsMap, boolean cpCaching, LongAccumulator atasks, LongAccumulator aiters) throws DMLRuntimeException { + _jobid = jobid; _prog = program; _clsMap = clsMap; _initialized = false; @@ -68,7 +74,7 @@ public class RemoteParForSparkWorker extends ParWorker implements PairFlatMapFun { //lazy parworker initialization if( 
!_initialized ) - configureWorker( TaskContext.get().taskAttemptId() ); + configureWorker(TaskContext.get().taskAttemptId()); //execute a single task long numIter = getExecutedIterations(); @@ -88,10 +94,11 @@ public class RemoteParForSparkWorker extends ParWorker implements PairFlatMapFun return ret.iterator(); } - private void configureWorker( long ID ) + @SuppressWarnings("unchecked") + private void configureWorker(long taskID) throws DMLRuntimeException, IOException { - _workerID = ID; + _workerID = taskID; //initialize codegen class cache (before program parsing) synchronized( CodegenUtils.class ) { @@ -106,7 +113,13 @@ public class RemoteParForSparkWorker extends ParWorker implements PairFlatMapFun _resultVars = body.getResultVarNames(); _numTasks = 0; _numIters = 0; - + + //reuse shared inputs (to read shared inputs once per process instead of once per core; + //we reuse everything except result variables and partitioned input matrices) + _ec.pinVariables(_ec.getVarList()); //avoid cleanup of shared inputs + Collection<String> blacklist = CollectionUtils.union(_resultVars, _ec.getVarListPartitioned()); + reuseVars.reuseVariables(_jobid, _ec.getVariables(), blacklist); + //init and register-cleanup of buffer pool (in parfor spark, multiple tasks might //share the process-local, i.e., per executor, buffer pool; hence we synchronize //the initialization and immediately register the created directory for cleanup @@ -121,7 +134,7 @@ public class RemoteParForSparkWorker extends ParWorker implements PairFlatMapFun CacheableData.cacheEvictionLocalFilePrefix +"_" + _workerID; //register entire working dir for delete on shutdown RemoteParForUtils.cleanupWorkingDirectoriesOnShutdown(); - } + } } //ensure that resultvar files are not removed @@ -134,4 +147,8 @@ public class RemoteParForSparkWorker extends ParWorker implements PairFlatMapFun //mark as initialized _initialized = true; } + + public static void cleanupCachedVariables(long pfid) { + 
reuseVars.clearVariables(pfid); + } } http://git-wip-us.apache.org/repos/asf/systemml/blob/2c57cf77/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/TaskPartitionerFactoring.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/TaskPartitionerFactoring.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/TaskPartitionerFactoring.java index d202e07..7dd25bf 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/TaskPartitionerFactoring.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/TaskPartitionerFactoring.java @@ -57,14 +57,14 @@ public class TaskPartitionerFactoring extends TaskPartitioner { LinkedList<Task> tasks = new LinkedList<Task>(); - long lFrom = _fromVal.getLongValue(); - long lTo = _toVal.getLongValue(); - long lIncr = _incrVal.getLongValue(); + long lFrom = _fromVal.getLongValue(); + long lTo = _toVal.getLongValue(); + long lIncr = _incrVal.getLongValue(); int P = _numThreads; // number of parallel workers - long N = _numIter; // total number of iterations - long R = N; // remaining number of iterations - long K = -1; // next _numThreads task sizes + long N = _numIter; // total number of iterations + long R = N; // remaining number of iterations + long K = -1; // next _numThreads task sizes TaskType type = null; // type of iterations: range tasks (similar to run-length encoding) make only sense if taskSize>3 for( long i = lFrom; i<=lTo; ) @@ -73,7 +73,7 @@ public class TaskPartitionerFactoring extends TaskPartitioner R -= (K * P); type = (ParForProgramBlock.USE_RANGE_TASKS_IF_USEFUL && K>3 ) ? 
- TaskType.RANGE : TaskType.SET; + TaskType.RANGE : TaskType.SET; //for each logical processor for( int j=0; j<P; j++ ) @@ -86,16 +86,12 @@ public class TaskPartitionerFactoring extends TaskPartitioner tasks.addLast(lTask); // add iterations to task - if( type == TaskType.SET ) - { + if( type == TaskType.SET ) { //value based tasks for( long k=0; k<K && i<=lTo; k++, i+=lIncr ) - { - lTask.addIteration(new IntObject(_iterVarName, i)); - } + lTask.addIteration(new IntObject(_iterVarName, i)); } - else - { + else { //determine end of task long to = Math.min( i+(K-1)*lIncr, lTo ); @@ -103,7 +99,6 @@ public class TaskPartitionerFactoring extends TaskPartitioner lTask.addIteration(new IntObject(_iterVarName, i)); //from lTask.addIteration(new IntObject(_iterVarName, to)); //to lTask.addIteration(new IntObject(_iterVarName, lIncr)); //increment - i = to + lIncr; } } @@ -122,11 +117,11 @@ public class TaskPartitionerFactoring extends TaskPartitioner long lTo = _toVal.getLongValue(); long lIncr = _incrVal.getLongValue(); - int P = _numThreads; // number of parallel workers + int P = _numThreads; // number of parallel workers long N = _numIter; // total number of iterations - long R = N; // remaining number of iterations - long K = -1; //next _numThreads task sizes - TaskType type = null; // type of iterations: range tasks (similar to run-length encoding) make only sense if taskSize>3 + long R = N; // remaining number of iterations + long K = -1; //next _numThreads task sizes + TaskType type = null; // type of iterations: range tasks (similar to run-length encoding) make only sense if taskSize>3 try {