Repository: systemml
Updated Branches:
  refs/heads/master 912c65506 -> ba73291c9


[SYSTEMML-1879] Performance parfor remote spark (reuse shared inputs)

In parfor remote spark jobs, each worker is initialized with its own
deserialized symbol table, which causes redundant reads of shared inputs
in each parfor worker and wastes memory unnecessarily. This
patch introduces a principled approach to reusing shared inputs, where
we reuse all variables except for result variables and partitioned
matrices. By simply using common instances of matrix objects, the
sharing happens automatically through the bufferpool similar to local
parfor execution and without additional pinned memory requirements. On
the perftest scenario MSVM 1M x 1K, sparse with 150 classes and 25
iterations, the end-to-end runtime (including read and spark context
creation) improved from 94s to 72s.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/2c57cf77
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/2c57cf77
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/2c57cf77

Branch: refs/heads/master
Commit: 2c57cf779e3b1a583ee4062bbd268f592537ef05
Parents: 912c655
Author: Matthias Boehm <mboe...@gmail.com>
Authored: Fri Sep 1 20:29:49 2017 -0700
Committer: Matthias Boehm <mboe...@gmail.com>
Committed: Fri Sep 1 20:31:08 2017 -0700

----------------------------------------------------------------------
 .../org/apache/sysml/parser/DMLTranslator.java  |  14 +-
 .../controlprogram/LocalVariableMap.java        |   5 +
 .../controlprogram/ParForProgramBlock.java      | 464 ++++++++-----------
 .../context/ExecutionContext.java               |  45 +-
 .../parfor/CachedReuseVariables.java            |  59 +++
 .../controlprogram/parfor/ProgramConverter.java |   8 +-
 .../parfor/RemoteParForSpark.java               |  32 +-
 .../parfor/RemoteParForSparkWorker.java         |  33 +-
 .../parfor/TaskPartitionerFactoring.java        |  33 +-
 9 files changed, 356 insertions(+), 337 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/2c57cf77/src/main/java/org/apache/sysml/parser/DMLTranslator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/parser/DMLTranslator.java 
b/src/main/java/org/apache/sysml/parser/DMLTranslator.java
index e6b8590..f44c0a4 100644
--- a/src/main/java/org/apache/sysml/parser/DMLTranslator.java
+++ b/src/main/java/org/apache/sysml/parser/DMLTranslator.java
@@ -617,12 +617,11 @@ public class DMLTranslator
                        ForProgramBlock rtpb = null;
                        IterablePredicate iterPred = fsb.getIterPredicate();
                        
-                       if( sb instanceof ParForStatementBlock )
-                       {
+                       if( sb instanceof ParForStatementBlock ) {
                                sbName = "ParForStatementBlock";
-                               rtpb = new ParForProgramBlock(prog, 
iterPred.getIterVar().getName(), iterPred.getParForParams());
+                               rtpb = new ParForProgramBlock(prog, 
iterPred.getIterVar().getName(),
+                                       iterPred.getParForParams(), 
((ParForStatementBlock)sb).getResultVariables());
                                ParForProgramBlock pfrtpb = 
(ParForProgramBlock)rtpb;
-                               pfrtpb.setResultVariables( 
((ParForStatementBlock)sb).getResultVariables() );
                                
pfrtpb.setStatementBlock((ParForStatementBlock)sb); //used for optimization and 
creating unscoped variables
                        }
                        else {//ForStatementBlock
@@ -636,8 +635,8 @@ public class DMLTranslator
                        
                        // process the body of the for statement block
                        if (fsb.getNumStatements() > 1){
-                               LOG.error(fsb.printBlockErrorLocation() + " "  
+ sbName + " should have 1 statement" );
-                               throw new 
LopsException(fsb.printBlockErrorLocation() + " "  + sbName + " should have 1 
statement" );
+                               LOG.error(fsb.printBlockErrorLocation() + " " + 
sbName + " should have 1 statement" );
+                               throw new 
LopsException(fsb.printBlockErrorLocation() + " " + sbName + " should have 1 
statement" );
                        }
                        ForStatement fs = (ForStatement)fsb.getStatement(0);
                        for (StatementBlock sblock : fs.getBody()){
@@ -653,9 +652,6 @@ public class DMLTranslator
                        
                        retPB = rtpb;
                        
-                       //post processing for generating missing instructions
-                       //retPB = verifyAndCorrectProgramBlock(sb.liveIn(), 
sb.liveOut(), sb._kill, retPB);
-                       
                        // add statement block
                        retPB.setStatementBlock(sb);
                        

http://git-wip-us.apache.org/repos/asf/systemml/blob/2c57cf77/src/main/java/org/apache/sysml/runtime/controlprogram/LocalVariableMap.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/LocalVariableMap.java 
b/src/main/java/org/apache/sysml/runtime/controlprogram/LocalVariableMap.java
index 94d85ad..7e59951 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/LocalVariableMap.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/LocalVariableMap.java
@@ -91,6 +91,11 @@ public class LocalVariableMap implements Cloneable
                localMap.clear();
        }
        
+       public void removeAllIn(Set<String> blacklist) {
+               localMap.entrySet().removeIf(
+                       e -> blacklist.contains(e.getKey()));
+       }
+       
        public void removeAllNotIn(Set<String> blacklist) {
                localMap.entrySet().removeIf(
                        e -> !blacklist.contains(e.getKey()));

http://git-wip-us.apache.org/repos/asf/systemml/blob/2c57cf77/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java 
b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
index ce8bbed..b393cd1 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
@@ -125,22 +125,22 @@ public class ParForProgramBlock extends ForProgramBlock
 {      
        // execution modes
        public enum PExecMode {
-               LOCAL,      //local (master) multi-core execution mode
-               REMOTE_MR,      //remote (MR cluster) execution mode
-               REMOTE_MR_DP,   //remote (MR cluster) execution mode, fused 
with data partitioning
-               REMOTE_SPARK,   //remote (Spark cluster) execution mode
+               LOCAL,          //local (master) multi-core execution mode
+               REMOTE_MR,      //remote (MR cluster) execution mode
+               REMOTE_MR_DP,   //remote (MR cluster) execution mode, fused 
with data partitioning
+               REMOTE_SPARK,   //remote (Spark cluster) execution mode
                REMOTE_SPARK_DP,//remote (Spark cluster) execution mode, fused 
with data partitioning
                UNSPECIFIED
        }
 
        // task partitioner
        public enum PTaskPartitioner {
-               FIXED,      //fixed-sized task partitioner, uses tasksize 
-               NAIVE,      //naive task partitioner (tasksize=1)
-               STATIC,     //static task partitioner (numIterations/numThreads)
-               FACTORING,  //factoring task partitioner  
-               FACTORING_CMIN,  //constrained factoring task partitioner, uses 
tasksize as min constraint
-               FACTORING_CMAX,  //constrained factoring task partitioner, uses 
tasksize as max constraint
+               FIXED,          //fixed-sized task partitioner, uses tasksize 
+               NAIVE,          //naive task partitioner (tasksize=1)
+               STATIC,         //static task partitioner 
(numIterations/numThreads)
+               FACTORING,      //factoring task partitioner  
+               FACTORING_CMIN, //constrained factoring task partitioner, uses 
tasksize as min constraint
+               FACTORING_CMAX, //constrained factoring task partitioner, uses 
tasksize as max constraint
                UNSPECIFIED
        }
        
@@ -259,10 +259,10 @@ public class ParForProgramBlock extends ForProgramBlock
        }
        
        public enum PDataPartitioner {
-               NONE,       // no data partitioning
-               LOCAL,      // local file based partition split on master node
-               REMOTE_MR,  // remote partition split using a reblock MR job 
-               REMOTE_SPARK, // remote partition split using a spark job
+               NONE,            // no data partitioning
+               LOCAL,           // local file based partition split on master 
node
+               REMOTE_MR,       // remote partition split using a reblock MR 
job 
+               REMOTE_SPARK,    // remote partition split using a spark job
                UNSPECIFIED, 
        }
 
@@ -277,21 +277,21 @@ public class ParForProgramBlock extends ForProgramBlock
        
        //optimizer
        public enum POptMode{
-               NONE,        //no optimization, use defaults and specified 
parameters
-               RULEBASED,   //rule-based rewritings with memory constraints 
-               CONSTRAINED, //same as rule-based but with given params as 
constraints
-               HEURISTIC,   //smae as rule-based but with time-based cost 
estimates
+               NONE,            //no optimization, use defaults and specified 
parameters
+               RULEBASED,       //rule-based rewritings with memory 
constraints 
+               CONSTRAINED,     //same as rule-based but with given params as 
constraints
+               HEURISTIC,       //same as rule-based but with time-based cost 
estimates
        }
-               
+       
        // internal parameters
-       public static final boolean OPTIMIZE                    = true; // run 
all automatic optimizations on top-level parfor
+       public static final boolean OPTIMIZE                    = true; // run 
all automatic optimizations on top-level parfor
        public static final boolean USE_PB_CACHE                = false; // 
reuse copied program blocks whenever possible, not there can be issues related 
to recompile
-       public static       boolean USE_RANGE_TASKS_IF_USEFUL   = true;         
// use range tasks whenever size>3, false, otherwise wrong split order in 
remote 
-       public static final boolean USE_STREAMING_TASK_CREATION = true;         
// start working while still creating tasks, prevents blocking due to too small 
task queue
-       public static final boolean ALLOW_NESTED_PARALLELISM    = true;    // 
if not, transparently change parfor to for on program conversions (local,remote)
-       public static       boolean ALLOW_REUSE_MR_JVMS         = true;    // 
potential benefits: less setup costs per task, NOTE> cannot be used MR4490 in 
Hadoop 1.0.3, still not fixed in 1.1.1
+       public static final boolean USE_RANGE_TASKS_IF_USEFUL   = true; // use 
range tasks whenever size>3, false, otherwise wrong split order in remote 
+       public static final boolean USE_STREAMING_TASK_CREATION = true; // 
start working while still creating tasks, prevents blocking due to too small 
task queue
+       public static final boolean ALLOW_NESTED_PARALLELISM    = true; // if 
not, transparently change parfor to for on program conversions (local,remote)
+       public static       boolean ALLOW_REUSE_MR_JVMS         = true; // 
potential benefits: less setup costs per task, NOTE> cannot be used MR4490 in 
Hadoop 1.0.3, still not fixed in 1.1.1
        public static       boolean ALLOW_REUSE_MR_PAR_WORKER   = 
ALLOW_REUSE_MR_JVMS; //potential benefits: less initialization, reuse in-memory 
objects and result consolidation!
-       public static final boolean USE_PARALLEL_RESULT_MERGE   = false;    // 
if result merge is run in parallel or serial 
+       public static final boolean USE_PARALLEL_RESULT_MERGE   = false; // if 
result merge is run in parallel or serial 
        public static final boolean USE_PARALLEL_RESULT_MERGE_REMOTE = true; // 
if remote result merge should be run in parallel for multiple result vars
        public static final boolean ALLOW_DATA_COLOCATION       = true;
        public static final boolean CREATE_UNSCOPED_RESULTVARS  = true;
@@ -299,7 +299,7 @@ public class ParForProgramBlock extends ForProgramBlock
        public static final int     WRITE_REPLICATION_FACTOR    = 1;
        public static final int     MAX_RETRYS_ON_ERROR         = 1;
        public static final boolean FORCE_CP_ON_REMOTE_MR       = true; // 
compile body to CP if exec type forced to MR
-       public static final boolean LIVEVAR_AWARE_EXPORT        = true; 
//export only read variables according to live variable analysis
+       public static final boolean LIVEVAR_AWARE_EXPORT        = true; // 
export only read variables according to live variable analysis
        public static final boolean RESET_RECOMPILATION_FLAGs   = true;
        
        public static final String PARFOR_FNAME_PREFIX          = "/parfor/"; 
@@ -311,69 +311,61 @@ public class ParForProgramBlock extends ForProgramBlock
        public static final String PARFOR_COUNTER_GROUP_NAME    = "SystemML 
ParFOR Counters";
        
        // static ID generator sequences
-       private static IDSequence   _pfIDSeq        = null;
-       private static IDSequence   _pwIDSeq        = null;
+       private final static IDSequence _pfIDSeq = new IDSequence();
+       private final static IDSequence _pwIDSeq = new IDSequence();
        
        // runtime parameters
-       protected HashMap<String,String> _params    = null;
-       protected int              _numThreads      = -1;
-       protected PTaskPartitioner _taskPartitioner = null; 
-       protected long             _taskSize        = -1;
+       protected final HashMap<String,String> _params;
+       protected final boolean _monitor;
+       protected final Level _optLogLevel;
+       protected int _numThreads = -1;
+       protected long _taskSize = -1;
+       protected PTaskPartitioner _taskPartitioner = null;
        protected PDataPartitioner _dataPartitioner = null;
-       protected PResultMerge     _resultMerge     = null;
-       protected PExecMode        _execMode        = null;
-       protected POptMode         _optMode         = null;
-       protected boolean          _monitor         = false;
-       protected Level            _optLogLevel     = null;
-       
+       protected PResultMerge _resultMerge = null;
+       protected PExecMode _execMode = null;
+       protected POptMode _optMode = null;
        
        //specifics used for optimization
-       protected long             _numIterations   = -1; 
+       protected long _numIterations = -1;
        
        //specifics used for data partitioning
        protected LocalVariableMap _variablesDPOriginal = null;
-       protected LocalVariableMap _variablesDPReuse    = null;
-       protected String           _colocatedDPMatrix   = null;
-       protected boolean          _tSparseCol          = false;
-       protected int              _replicationDP       = 
WRITE_REPLICATION_FACTOR;
-       protected int              _replicationExport   = -1;
+       protected LocalVariableMap _variablesDPReuse = null;
+       protected String _colocatedDPMatrix = null;
+       protected boolean _tSparseCol = false;
+       protected int _replicationDP = WRITE_REPLICATION_FACTOR;
+       protected int _replicationExport = -1;
        //specifics used for result partitioning
-       protected boolean          _jvmReuse            = true;
+       protected boolean _jvmReuse = true;
        //specifics used for recompilation 
-       protected double           _oldMemoryBudget = -1;
-       protected double           _recompileMemoryBudget = -1;
+       protected double _oldMemoryBudget = -1;
+       protected double _recompileMemoryBudget = -1;
        //specifics for caching
-       protected boolean          _enableCPCaching     = true;
-       protected boolean          _enableRuntimePiggybacking = false;
+       protected boolean _enableCPCaching = true;
+       protected boolean _enableRuntimePiggybacking = false;
        //specifics for spark 
        protected Collection<String> _variablesRP = null;
        protected Collection<String> _variablesECache = null;
        
        // program block meta data
-       protected long                _ID           = -1;
-       protected int                 _IDPrefix     = -1;
-       protected ArrayList<String>  _resultVars      = null;
-       protected IDSequence         _resultVarsIDSeq = null;
-       protected IDSequence         _dpVarsIDSeq     = null;
-       protected boolean            _monitorReport   = false;
-       protected boolean            _hasFunctions    = true;
+       protected final ArrayList<String> _resultVars;
+       protected final IDSequence _resultVarsIDSeq;
+       protected final IDSequence _dpVarsIDSeq;
+       protected final boolean _hasFunctions;
+       
+       protected long _ID = -1;
+       protected int _IDPrefix = -1;
+       protected boolean _monitorReport = false;
        
        // local parworker data
-       protected long[]                                            _pwIDs   = 
null;
        protected HashMap<Long,ArrayList<ProgramBlock>> _pbcache = null;
+       protected long[] _pwIDs = null;
        
-       
-       static
-       {
-               //init static ID sequence generators
-               _pfIDSeq = new IDSequence();
-               _pwIDSeq = new IDSequence();
-       }
-       
-       public ParForProgramBlock(Program prog, String iterPredVar, 
HashMap<String,String> params) 
+       public ParForProgramBlock(Program prog, String iterPredVar, 
HashMap<String,String> params, ArrayList<String> resultVars)
                throws DMLRuntimeException 
        {
-               this( -1, prog, iterPredVar, params);
+               this( -1, prog, iterPredVar, params, resultVars);
        }
        
        /**
@@ -384,9 +376,10 @@ public class ParForProgramBlock extends ForProgramBlock
         * @param prog runtime program
         * @param iterPredVars ?
         * @param params map of parameters
+        * @param resultVars list of result variable names
         * @throws DMLRuntimeException if DMLRuntimeException occurs
         */
-       public ParForProgramBlock(int ID, Program prog, String iterPredVar, 
HashMap<String,String> params) 
+       public ParForProgramBlock(int ID, Program prog, String iterPredVar, 
HashMap<String,String> params, ArrayList<String> resultVars) 
        {
                super(prog, iterPredVar);
 
@@ -395,6 +388,7 @@ public class ParForProgramBlock extends ForProgramBlock
                
                //ID generation and setting 
                setParForProgramBlockIDs( ID );
+               _resultVars = resultVars;
                _resultVarsIDSeq = new IDSequence();
                _dpVarsIDSeq = new IDSequence();
                
@@ -407,18 +401,18 @@ public class ParForProgramBlock extends ForProgramBlock
                        _dataPartitioner = PDataPartitioner.valueOf( 
getParForParam(ParForStatementBlock.DATA_PARTITIONER) );
                        _resultMerge     = PResultMerge.valueOf( 
getParForParam(ParForStatementBlock.RESULT_MERGE) );
                        _execMode        = PExecMode.valueOf( 
getParForParam(ParForStatementBlock.EXEC_MODE) );
-                       _optMode         = POptMode.valueOf( 
getParForParam(ParForStatementBlock.OPT_MODE) );           
+                       _optMode         = POptMode.valueOf( 
getParForParam(ParForStatementBlock.OPT_MODE) );
                        _optLogLevel     = Level.toLevel( 
getParForParam(ParForStatementBlock.OPT_LOG) );
                        _monitor         = 
(Integer.parseInt(getParForParam(ParForStatementBlock.PROFILE) ) == 1);
                }
                catch(Exception ex) {
                        throw new RuntimeException("Error parsing specified 
ParFOR parameters.",ex);
                }
-                       
+               
                //reset the internal opt mode if optimization globally disabled.
                if( !OPTIMIZE )
                        _optMode = POptMode.NONE;
-                       
+               
                _variablesDPOriginal = new LocalVariableMap();
                _variablesDPReuse = new LocalVariableMap();
                
@@ -453,19 +447,14 @@ public class ParForProgramBlock extends ForProgramBlock
        
        public String getParForParam(String key) {
                String tmp = getParForParams().get(key);
-               return (tmp == null) ? null :  
+               return (tmp == null) ? null :
                        UtilFunctions.unquote(tmp).toUpperCase();
        }
 
-       public ArrayList<String> getResultVariables()
-       {
+       public ArrayList<String> getResultVariables() {
                return _resultVars;
        }
        
-       public void setResultVariables(ArrayList<String> resultVars) {
-               _resultVars = resultVars;
-       }
-       
        public void disableOptimization() {
                _optMode = POptMode.NONE;
        }
@@ -574,7 +563,7 @@ public class ParForProgramBlock extends ForProgramBlock
                ALLOW_REUSE_MR_PAR_WORKER = ALLOW_REUSE_MR_JVMS;
        }
        
-       @Override       
+       @Override
        public void execute(ExecutionContext ec)
                throws DMLRuntimeException
        {
@@ -621,7 +610,7 @@ public class ParForProgramBlock extends ForProgramBlock
                
                if( _monitor ) 
                        StatisticMonitor.putPFStat(_ID, 
Stat.PARFOR_INIT_DATA_T, time.stop());
-                       
+               
                // initialize iter var to form value
                IntObject iterVar = new IntObject(_iterPredVar, 
from.getLongValue() );
                
@@ -630,8 +619,7 @@ public class ParForProgramBlock extends ForProgramBlock
                ///////
                LOG.trace("EXECUTE PARFOR ID = "+_ID+" with mode = 
"+_execMode+", numThreads = "+_numThreads+", taskpartitioner = 
"+_taskPartitioner);
                
-               if( _monitor )
-               {
+               if( _monitor ) {
                        StatisticMonitor.putPFStat(_ID, Stat.PARFOR_NUMTHREADS, 
     _numThreads);
                        StatisticMonitor.putPFStat(_ID, Stat.PARFOR_TASKSIZE,   
     _taskSize);
                        StatisticMonitor.putPFStat(_ID, 
Stat.PARFOR_TASKPARTITIONER, _taskPartitioner.ordinal());
@@ -644,13 +632,13 @@ public class ParForProgramBlock extends ForProgramBlock
                HashMap<String, Boolean> varState = ec.pinVariables(varList);
                
                try 
-               {               
+               {
                        switch( _execMode )
                        {
                                case LOCAL: //create parworkers as local threads
                                        executeLocalParFor(ec, iterVar, from, 
to, incr);
                                        break;
-                                       
+                               
                                case REMOTE_MR: // create parworkers as MR 
tasks (one job per parfor)
                                        executeRemoteMRParFor(ec, iterVar, 
from, to, incr);
                                        break;
@@ -669,10 +657,9 @@ public class ParForProgramBlock extends ForProgramBlock
                                
                                default:
                                        throw new 
DMLRuntimeException("Undefined execution mode: '"+_execMode+"'.");
-                       }       
+                       }
                }
-               catch(Exception ex)
-               {
+               catch(Exception ex) {
                        throw new DMLRuntimeException("PARFOR: Failed to 
execute loop in parallel.",ex);
                }
                
@@ -688,8 +675,7 @@ public class ParForProgramBlock extends ForProgramBlock
                
                //ensure that subsequent program blocks never see partitioned 
data (invalid plans!)
                //we can replace those variables, because partitioning only 
applied for read-only matrices
-               for( String var : _variablesDPOriginal.keySet() )
-               {
+               for( String var : _variablesDPOriginal.keySet() ) {
                        //cleanup partitioned matrix (if not reused)
                        if( !_variablesDPReuse.keySet().contains(var) )
                                
VariableCPInstruction.processRemoveVariableInstruction(ec, var); 
@@ -704,19 +690,19 @@ public class ParForProgramBlock extends ForProgramBlock
        
                //print profiling report (only if top-level parfor because 
otherwise in parallel context)
                if( _monitorReport )
-                   LOG.info("\n"+StatisticMonitor.createReport());
+                       LOG.info("\n"+StatisticMonitor.createReport());
                
                //reset flags/modifications made by optimizer
                //TODO reset of hop parallelism constraint (e.g., ba+*)
                for( String dpvar : _variablesDPOriginal.keySet() ) //release 
forced exectypes
-                   ProgramRecompiler.rFindAndRecompileIndexingHOP(sb, this, 
dpvar, ec, false);
+                       ProgramRecompiler.rFindAndRecompileIndexingHOP(sb, 
this, dpvar, ec, false);
                 //release forced exectypes for fused dp/exec
                if( _execMode == PExecMode.REMOTE_MR_DP || _execMode == 
PExecMode.REMOTE_SPARK_DP )
                        ProgramRecompiler.rFindAndRecompileIndexingHOP(sb, 
this, _colocatedDPMatrix, ec, false); 
                resetOptimizerFlags(); //after release, deletes dp_varnames
                
                //execute exit instructions (usually empty)
-               executeInstructions(_exitInstructions, ec);                     
+               executeInstructions(_exitInstructions, ec);
        }
 
 
@@ -814,8 +800,7 @@ public class ParForProgramBlock extends ForProgramBlock
                        
                        if( _monitor ) 
                                StatisticMonitor.putPFStat(_ID, 
Stat.PARFOR_WAIT_EXEC_T, time.stop());
-                               
-                               
+                       
                        // Step 4) collecting results from each parallel worker
                        //obtain results and cleanup other intermediates before 
result merge
                        LocalVariableMap [] localVariables = new 
LocalVariableMap [_numThreads]; 
@@ -862,16 +847,15 @@ public class ParForProgramBlock extends ForProgramBlock
                                StatisticMonitor.putPFStat(_ID, 
Stat.PARFOR_NUMITERS, numExecutedIterations);
                        }
                }
-       }       
+       }
 
        private void executeRemoteMRParFor( ExecutionContext ec, IntObject 
itervar, IntObject from, IntObject to, IntObject incr ) 
                throws DMLRuntimeException, IOException
        {
                /* Step 0) check and recompile MR inst
                 * Step 1) serialize child PB and inst
-                * Step 2) create tasks
-                *         serialize tasks
-                * Step 3) submit MR Jobs and wait for results                  
      
+                * Step 2) create and serialize tasks
+                * Step 3) submit MR Jobs and wait for results
                 * Step 4) collect results from each parallel worker
                 */
                
@@ -884,10 +868,10 @@ public class ParForProgramBlock extends ForProgramBlock
                        //tid = 0  because replaced in remote parworker
                        flagForced = checkMRAndRecompileToCP(0);
                }
-                       
+               
                // Step 1) init parallel workers (serialize PBs)
-               // NOTES: each mapper changes filenames with regard to his ID 
as we submit a single job,
-               //        cannot reuse serialized string, since variables are 
serialized as well.
+               // NOTES: each mapper changes filenames with regard to his ID 
as we submit a single
+               // job, cannot reuse serialized string, since variables are 
serialized as well.
                ParForBody body = new ParForBody( _childBlocks, _resultVars, ec 
);
                String program = ProgramConverter.serializeParForBody( body );
                
@@ -902,64 +886,60 @@ public class ParForProgramBlock extends ForProgramBlock
                long numIterations = partitioner.getNumIterations();
                int maxDigits = (int)Math.log10(to.getLongValue()) + 1;
                long numCreatedTasks = -1;
-               if( USE_STREAMING_TASK_CREATION )
-               {
+               if( USE_STREAMING_TASK_CREATION ) {
                        LocalTaskQueue<Task> queue = new LocalTaskQueue<Task>();
-
+                       
                        //put tasks into queue and start writing to taskFile
                        numCreatedTasks = partitioner.createTasks(queue);
-                       taskFile        = writeTasksToFile( taskFile, queue, 
maxDigits );                               
+                       taskFile = writeTasksToFile( taskFile, queue, maxDigits 
);
                }
                else
                {
                        //sequentially create tasks and write to disk
                        List<Task> tasks = partitioner.createTasks();
-                       numCreatedTasks  = tasks.size();
-                   taskFile         = writeTasksToFile( taskFile, tasks, 
maxDigits );                          
+                       numCreatedTasks = tasks.size();
+                       taskFile = writeTasksToFile( taskFile, tasks, maxDigits 
);
                }
-                               
+               
                if( _monitor )
                        StatisticMonitor.putPFStat(_ID, 
Stat.PARFOR_INIT_TASKS_T, time.stop());
                
                //write matrices to HDFS 
                exportMatricesToHDFS(ec);
-                               
+               
                // Step 3) submit MR job (wait for finished work)
                MatrixObject colocatedDPMatrixObj = (_colocatedDPMatrix!=null)? 
ec.getMatrixObject(_colocatedDPMatrix) : null;
-               RemoteParForJobReturn ret = RemoteParForMR.runJob(_ID, program, 
taskFile, resultFile, colocatedDPMatrixObj, _enableCPCaching,
-                                                                         
_numThreads, WRITE_REPLICATION_FACTOR, MAX_RETRYS_ON_ERROR, getMinMemory(ec),
-                                                                         
(ALLOW_REUSE_MR_JVMS & _jvmReuse) );
+               RemoteParForJobReturn ret = RemoteParForMR.runJob(_ID, program, 
taskFile, resultFile,
+                       colocatedDPMatrixObj, _enableCPCaching,_numThreads, 
WRITE_REPLICATION_FACTOR, MAX_RETRYS_ON_ERROR, 
+                       getMinMemory(ec), (ALLOW_REUSE_MR_JVMS & _jvmReuse) );
                
                if( _monitor ) 
                        StatisticMonitor.putPFStat(_ID, 
Stat.PARFOR_WAIT_EXEC_T, time.stop());
-                       
-                       
+               
                // Step 4) collecting results from each parallel worker
                int numExecutedTasks = ret.getNumExecutedTasks();
                int numExecutedIterations = ret.getNumExecutedIterations();
                
                //consolidate results into global symbol table
-               consolidateAndCheckResults( ec, numIterations, numCreatedTasks, 
numExecutedIterations , numExecutedTasks, 
-                                                   ret.getVariables() );
+               consolidateAndCheckResults( ec, numIterations, numCreatedTasks,
+                       numExecutedIterations , numExecutedTasks, 
ret.getVariables() );
                if( flagForced ) //see step 0
                        releaseForcedRecompile(0);
                
-               if( _monitor ) 
-               {
+               if( _monitor ) {
                        StatisticMonitor.putPFStat(_ID, 
Stat.PARFOR_WAIT_RESULTS_T, time.stop());
                        StatisticMonitor.putPFStat(_ID, Stat.PARFOR_NUMTASKS, 
numExecutedTasks);
                        StatisticMonitor.putPFStat(_ID, Stat.PARFOR_NUMITERS, 
numExecutedIterations);
-               }                       
-       }       
+               }
+       }
 
        private void executeRemoteMRParForDP( ExecutionContext ec, IntObject 
itervar, IntObject from, IntObject to, IntObject incr ) 
                throws DMLRuntimeException, IOException
        {
                /* Step 0) check and recompile MR inst
                 * Step 1) serialize child PB and inst
-                * Step 2) create tasks
-                *         serialize tasks
-                * Step 3) submit MR Jobs and wait for results                  
      
+                * Step 2) create and serialize tasks
+                * Step 3) submit MR Jobs and wait for results
                 * Step 4) collect results from each parallel worker
                 */
                
@@ -975,8 +955,8 @@ public class ParForProgramBlock extends ForProgramBlock
                inputMatrix.setPartitioned(inputDPF._dpf, inputDPF._N); //mark 
matrix var as partitioned  
                
                // Step 2) init parallel workers (serialize PBs)
-               // NOTES: each mapper changes filenames with regard to his ID 
as we submit a single job,
-               //        cannot reuse serialized string, since variables are 
serialized as well.
+               // NOTES: each mapper changes filenames with regard to his ID 
as we submit a single
+               // job, cannot reuse serialized string, since variables are 
serialized as well.
                ParForBody body = new ParForBody( _childBlocks, _resultVars, ec 
);
                String program = ProgramConverter.serializeParForBody( body );
                
@@ -988,41 +968,40 @@ public class ParForProgramBlock extends ForProgramBlock
                String resultFile = constructResultFileName();
                long numIterations = partitioner.getNumIterations();
                long numCreatedTasks = 
numIterations;//partitioner.createTasks().size();
-                                               
+               
                if( _monitor )
                        StatisticMonitor.putPFStat(_ID, 
Stat.PARFOR_INIT_TASKS_T, time.stop());
                
                //write matrices to HDFS 
                exportMatricesToHDFS(ec);
-                               
+               
                // Step 4) submit MR job (wait for finished work)
-               OutputInfo inputOI = ((inputMatrix.getSparsity()<0.1 && 
inputDPF==PartitionFormat.COLUMN_WISE)||
-                                             (inputMatrix.getSparsity()<0.001 
&& inputDPF==PartitionFormat.ROW_WISE))? 
-                                            OutputInfo.BinaryCellOutputInfo : 
OutputInfo.BinaryBlockOutputInfo;
+               OutputInfo inputOI = ((inputMatrix.getSparsity()<0.1 && 
inputDPF==PartitionFormat.COLUMN_WISE)
+                       || (inputMatrix.getSparsity()<0.001 && 
inputDPF==PartitionFormat.ROW_WISE)) ?
+                       OutputInfo.BinaryCellOutputInfo : 
OutputInfo.BinaryBlockOutputInfo;
                RemoteParForJobReturn ret = RemoteDPParForMR.runJob(_ID, 
itervar.getName(), _colocatedDPMatrix, program, resultFile, 
-                               inputMatrix, inputDPF, inputOI, _tSparseCol, 
_enableCPCaching, _numThreads, _replicationDP );
+                       inputMatrix, inputDPF, inputOI, _tSparseCol, 
_enableCPCaching, _numThreads, _replicationDP );
                
-               if( _monitor ) 
+               if( _monitor )
                        StatisticMonitor.putPFStat(_ID, 
Stat.PARFOR_WAIT_EXEC_T, time.stop());
-                       
+               
                // Step 5) collecting results from each parallel worker
                int numExecutedTasks = ret.getNumExecutedTasks();
                int numExecutedIterations = ret.getNumExecutedIterations();
                
                //consolidate results into global symbol table
-               consolidateAndCheckResults( ec, numIterations, numCreatedTasks, 
numExecutedIterations, numExecutedTasks, 
-                                                   ret.getVariables() );
+               consolidateAndCheckResults( ec, numIterations, numCreatedTasks,
+                       numExecutedIterations, numExecutedTasks, 
ret.getVariables() );
                
                if( flagForced ) //see step 0
                        releaseForcedRecompile(0);
                inputMatrix.unsetPartitioned();
                
-               if( _monitor ) 
-               {
+               if( _monitor ) {
                        StatisticMonitor.putPFStat(_ID, 
Stat.PARFOR_WAIT_RESULTS_T, time.stop());
                        StatisticMonitor.putPFStat(_ID, Stat.PARFOR_NUMTASKS, 
numExecutedTasks);
                        StatisticMonitor.putPFStat(_ID, Stat.PARFOR_NUMITERS, 
numExecutedIterations);
-               }                       
+               }
        }
 
        private void executeRemoteSparkParFor(ExecutionContext ec, IntObject 
itervar, IntObject from, IntObject to, IntObject incr) 
@@ -1032,15 +1011,15 @@ public class ParForProgramBlock extends ForProgramBlock
                
                // Step 0) check and compile to CP (if forced remote parfor)
                boolean flagForced = false;
-               if( FORCE_CP_ON_REMOTE_MR && (_optMode == POptMode.NONE || 
(_optMode == POptMode.CONSTRAINED && _execMode==PExecMode.REMOTE_SPARK)) )
-               {
+               if( FORCE_CP_ON_REMOTE_MR && (_optMode == POptMode.NONE 
+                       || (_optMode == POptMode.CONSTRAINED && 
_execMode==PExecMode.REMOTE_SPARK)) ) {
                        //tid = 0  because replaced in remote parworker
                        flagForced = checkMRAndRecompileToCP(0); 
                }
                        
                // Step 1) init parallel workers (serialize PBs)
-               // NOTES: each mapper changes filenames with regard to his ID 
as we submit a single job,
-               //        cannot reuse serialized string, since variables are 
serialized as well.
+               // NOTES: each mapper changes filenames with regard to his ID 
as we submit a single 
+               // job, cannot reuse serialized string, since variables are 
serialized as well.
                ParForBody body = new ParForBody(_childBlocks, _resultVars, ec);
                HashMap<String, byte[]> clsMap = new HashMap<String, byte[]>();
                String program = ProgramConverter.serializeParForBody(body, 
clsMap);
@@ -1055,37 +1034,35 @@ public class ParForProgramBlock extends ForProgramBlock
                //sequentially create tasks as input to parfor job
                List<Task> tasks = partitioner.createTasks();
                long numCreatedTasks = tasks.size();
-                               
+               
                if( _monitor )
                        StatisticMonitor.putPFStat(_ID, 
Stat.PARFOR_INIT_TASKS_T, time.stop());
                
                //write matrices to HDFS 
                exportMatricesToHDFS(ec);
-                               
+               
                // Step 3) submit Spark parfor job (no lazy evaluation, since 
collect on result)
                //MatrixObject colocatedDPMatrixObj = 
(_colocatedDPMatrix!=null)? (MatrixObject)ec.getVariable(_colocatedDPMatrix) : 
null;
                RemoteParForJobReturn ret = RemoteParForSpark.runJob(_ID, 
program, clsMap, tasks, ec, _enableCPCaching, _numThreads);
                
                if( _monitor ) 
                        StatisticMonitor.putPFStat(_ID, 
Stat.PARFOR_WAIT_EXEC_T, time.stop());
-                       
-                       
+               
                // Step 4) collecting results from each parallel worker
                int numExecutedTasks = ret.getNumExecutedTasks();
                int numExecutedIterations = ret.getNumExecutedIterations();
                
                //consolidate results into global symbol table
-               consolidateAndCheckResults( ec, numIterations, numCreatedTasks, 
numExecutedIterations , numExecutedTasks, 
-                                                   ret.getVariables() );
+               consolidateAndCheckResults( ec, numIterations, numCreatedTasks,
+                       numExecutedIterations , numExecutedTasks, 
ret.getVariables() );
                if( flagForced ) //see step 0
                        releaseForcedRecompile(0);
                
-               if( _monitor ) 
-               {
+               if( _monitor ) {
                        StatisticMonitor.putPFStat(_ID, 
Stat.PARFOR_WAIT_RESULTS_T, time.stop());
                        StatisticMonitor.putPFStat(_ID, Stat.PARFOR_NUMTASKS, 
numExecutedTasks);
                        StatisticMonitor.putPFStat(_ID, Stat.PARFOR_NUMITERS, 
numExecutedIterations);
-               }                       
+               }
        }
        
        private void executeRemoteSparkParForDP( ExecutionContext ec, IntObject 
itervar, IntObject from, IntObject to, IntObject incr ) 
@@ -1096,15 +1073,15 @@ public class ParForProgramBlock extends ForProgramBlock
                // Step 0) check and compile to CP (if forced remote parfor)
                boolean flagForced = checkMRAndRecompileToCP(0);
                
-               // Step 1) prepare partitioned input matrix (needs to happen 
before serializing the progam)
+               // Step 1) prepare partitioned input matrix (needs to happen 
before serializing the program)
                ParForStatementBlock sb = (ParForStatementBlock) 
getStatementBlock();
                MatrixObject inputMatrix = 
ec.getMatrixObject(_colocatedDPMatrix );
                PartitionFormat inputDPF = sb.determineDataPartitionFormat( 
_colocatedDPMatrix );
-               inputMatrix.setPartitioned(inputDPF._dpf, inputDPF._N); //mark 
matrix var as partitioned  
+               inputMatrix.setPartitioned(inputDPF._dpf, inputDPF._N); //mark 
matrix var as partitioned
                
                // Step 2) init parallel workers (serialize PBs)
-               // NOTES: each mapper changes filenames with regard to his ID 
as we submit a single job,
-               //        cannot reuse serialized string, since variables are 
serialized as well.
+               // NOTES: each mapper changes filenames with regard to his ID 
as we submit a single
+               // job, cannot reuse serialized string, since variables are 
serialized as well.
                ParForBody body = new ParForBody( _childBlocks, _resultVars, ec 
);
                HashMap<String, byte[]> clsMap = new HashMap<String, byte[]>(); 
                String program = ProgramConverter.serializeParForBody( body, 
clsMap );
@@ -1117,7 +1094,7 @@ public class ParForProgramBlock extends ForProgramBlock
                String resultFile = constructResultFileName();
                long numIterations = partitioner.getNumIterations();
                long numCreatedTasks = 
numIterations;//partitioner.createTasks().size();
-                                               
+               
                if( _monitor )
                        StatisticMonitor.putPFStat(_ID, 
Stat.PARFOR_INIT_TASKS_T, time.stop());
                
@@ -1126,34 +1103,30 @@ public class ParForProgramBlock extends ForProgramBlock
                
                // Step 4) submit MR job (wait for finished work)
                //TODO runtime support for binary cell partitioning 
-               //OutputInfo inputOI = ((inputMatrix.getSparsity()<0.1 && 
inputDPF==PDataPartitionFormat.COLUMN_WISE)||
-               //                            (inputMatrix.getSparsity()<0.001 
&& inputDPF==PDataPartitionFormat.ROW_WISE))? 
-               //                           OutputInfo.BinaryCellOutputInfo : 
OutputInfo.BinaryBlockOutputInfo;
                OutputInfo inputOI = OutputInfo.BinaryBlockOutputInfo;
-               RemoteParForJobReturn ret = RemoteDPParForSpark.runJob(_ID, 
itervar.getName(), _colocatedDPMatrix, program, clsMap, 
-                               resultFile, inputMatrix, ec, inputDPF, inputOI, 
_tSparseCol, _enableCPCaching, _numThreads );
+               RemoteParForJobReturn ret = RemoteDPParForSpark.runJob(_ID, 
itervar.getName(), _colocatedDPMatrix, program,
+                       clsMap, resultFile, inputMatrix, ec, inputDPF, inputOI, 
_tSparseCol, _enableCPCaching, _numThreads );
                
                if( _monitor ) 
                        StatisticMonitor.putPFStat(_ID, 
Stat.PARFOR_WAIT_EXEC_T, time.stop());
-                       
+               
                // Step 5) collecting results from each parallel worker
                int numExecutedTasks = ret.getNumExecutedTasks();
                int numExecutedIterations = ret.getNumExecutedIterations();
                
                //consolidate results into global symbol table
-               consolidateAndCheckResults( ec, numIterations, numCreatedTasks, 
numExecutedIterations, numExecutedTasks, 
-                                                   ret.getVariables() );
+               consolidateAndCheckResults( ec, numIterations, numCreatedTasks,
+                       numExecutedIterations, numExecutedTasks, 
ret.getVariables() );
                
                if( flagForced ) //see step 0
                        releaseForcedRecompile(0);
                inputMatrix.unsetPartitioned();
                
-               if( _monitor ) 
-               {
+               if( _monitor ) {
                        StatisticMonitor.putPFStat(_ID, 
Stat.PARFOR_WAIT_RESULTS_T, time.stop());
                        StatisticMonitor.putPFStat(_ID, Stat.PARFOR_NUMTASKS, 
numExecutedTasks);
                        StatisticMonitor.putPFStat(_ID, Stat.PARFOR_NUMITERS, 
numExecutedIterations);
-               }                       
+               }
        }
 
        private void handleDataPartitioning( ExecutionContext ec ) 
@@ -1161,7 +1134,7 @@ public class ParForProgramBlock extends ForProgramBlock
        {
                PDataPartitioner dataPartitioner = _dataPartitioner;
                if( dataPartitioner != PDataPartitioner.NONE )
-               {                       
+               {
                        ParForStatementBlock sb = (ParForStatementBlock) 
getStatementBlock();
                        if( sb == null )
                                throw new DMLRuntimeException("ParFor statement 
block required for reasoning about data partitioning.");
@@ -1171,19 +1144,20 @@ public class ParForProgramBlock extends ForProgramBlock
                        {
                                Data dat = ec.getVariable(var);
                                //skip non-existing input matrices (which are 
due to unknown sizes marked for
-                               //partitioning but typically related branches 
are never executed)                               
+                               //partitioning but typically related branches 
are never executed)
                                if( dat != null && dat instanceof MatrixObject )
                                {
                                        MatrixObject moVar = (MatrixObject) 
dat; //unpartitioned input
                                        
                                        PartitionFormat dpf = 
sb.determineDataPartitionFormat( var );
-                                       LOG.trace("PARFOR ID = "+_ID+", 
Partitioning read-only input variable "+var+" (format="+dpf+", 
mode="+_dataPartitioner+")");
+                                       LOG.trace("PARFOR ID = "+_ID+", 
Partitioning read-only input variable "
+                                               + var + " (format="+dpf+", 
mode="+_dataPartitioner+")");
                                        
                                        if( dpf != PartitionFormat.NONE )
                                        {
                                                if( dataPartitioner != 
PDataPartitioner.REMOTE_SPARK && dpf.isBlockwise() ) {
                                                        LOG.warn("PARFOR ID = 
"+_ID+", Switching data partitioner from " + dataPartitioner + 
-                                                                       " to " 
+ PDataPartitioner.REMOTE_SPARK.name()+" for blockwise-n partitioning.");
+                                                               " to " + 
PDataPartitioner.REMOTE_SPARK.name()+" for blockwise-n partitioning.");
                                                        dataPartitioner = 
PDataPartitioner.REMOTE_SPARK;
                                                }
                                                
@@ -1214,9 +1188,8 @@ public class ParForProgramBlock extends ForProgramBlock
                                                
                                                //store original and 
partitioned matrix (for reuse if applicable)
                                                _variablesDPOriginal.put(var, 
moVar);
-                                               if(    
ALLOW_REUSE_PARTITION_VARS 
-                                                       && 
ProgramRecompiler.isApplicableForReuseVariable(sb.getDMLProg(), sb, var) ) 
-                                               {
+                                               if( ALLOW_REUSE_PARTITION_VARS 
+                                                       && 
ProgramRecompiler.isApplicableForReuseVariable(sb.getDMLProg(), sb, var) ) {
                                                        
_variablesDPReuse.put(var, dpdatNew);
                                                }
                                                
@@ -1233,7 +1206,6 @@ public class ParForProgramBlock extends ForProgramBlock
                if( OptimizerUtils.isSparkExecutionMode() &&
                        _variablesRP != null && !_variablesRP.isEmpty() ) {
                        SparkExecutionContext sec = (SparkExecutionContext) ec;
-                       
                        for( String var : _variablesRP )
                                sec.repartitionAndCacheMatrixObject(var);
                }
@@ -1245,7 +1217,6 @@ public class ParForProgramBlock extends ForProgramBlock
                if( OptimizerUtils.isSparkExecutionMode() &&
                        _variablesECache != null && !_variablesECache.isEmpty() 
) {
                        SparkExecutionContext sec = (SparkExecutionContext) ec;
-                       
                        for( String var : _variablesECache )
                                sec.cacheMatrixObject(var);
                }
@@ -1262,8 +1233,7 @@ public class ParForProgramBlock extends ForProgramBlock
        private void cleanWorkerResultVariables(ExecutionContext ec, 
MatrixObject out, MatrixObject[] in) 
                throws DMLRuntimeException
        {
-               for( MatrixObject tmp : in )
-               {
+               for( MatrixObject tmp : in ) {
                        //check for empty inputs (no iterations executed)
                        if( tmp != null && tmp != out )
                                ec.cleanupMatrixObject(tmp);
@@ -1299,8 +1269,7 @@ public class ParForProgramBlock extends ForProgramBlock
                                switch( datatype )
                                {
                                        case SCALAR:
-                                               switch( valuetype )
-                                               {
+                                               switch( valuetype ) {
                                                        case BOOLEAN: dataObj = 
new BooleanObject(var,false); break;
                                                        case INT:     dataObj = 
new IntObject(var,-1);        break;
                                                        case DOUBLE:  dataObj = 
new DoubleObject(var,-1d);    break;
@@ -1408,20 +1377,17 @@ public class ParForProgramBlock extends ForProgramBlock
                        HashSet<String> fnNames = new HashSet<String>();
                        if( USE_PB_CACHE )
                        {
-                               if( _pbcache.containsKey(pwID) )
-                               {
+                               if( _pbcache.containsKey(pwID) ) {
                                        cpChildBlocks = _pbcache.get(pwID);     
                                }
-                               else
-                               {
+                               else {
                                        cpChildBlocks = 
ProgramConverter.rcreateDeepCopyProgramBlocks(_childBlocks, pwID, _IDPrefix, 
new HashSet<String>(), fnNames, false, false); 
                                        _pbcache.put(pwID, cpChildBlocks);
                                }
                        }
-                       else
-                       {
+                       else {
                                cpChildBlocks = 
ProgramConverter.rcreateDeepCopyProgramBlocks(_childBlocks, pwID, _IDPrefix, 
new HashSet<String>(), fnNames, false, false); 
-                       }             
+                       }
                        
                        //deep copy execution context (including prepare parfor 
update-in-place)
                        ExecutionContext cpEc = 
ProgramConverter.createDeepCopyExecutionContext(ec);
@@ -1443,8 +1409,7 @@ public class ParForProgramBlock extends ForProgramBlock
                        pw = new LocalParWorker( pwID, queue, body, cconf, 
MAX_RETRYS_ON_ERROR, _monitor );
                        pw.setFunctionNames(fnNames);
                }
-               catch(Exception ex)
-               {
+               catch(Exception ex) {
                        throw new DMLRuntimeException(ex);
                }
                
@@ -1519,7 +1484,7 @@ public class ParForProgramBlock extends ForProgramBlock
                int maxNumRed = 
InfrastructureAnalyzer.getRemoteParallelReduceTasks();
                //correction max number of reducers on yarn clusters
                if( InfrastructureAnalyzer.isYarnEnabled() )
-                       maxNumRed = (int)Math.max( maxNumRed, 
YarnClusterAnalyzer.getNumCores()/2 );                            
+                       maxNumRed = (int)Math.max( maxNumRed, 
YarnClusterAnalyzer.getNumCores()/2 );
                int numRed = Math.min(numReducers,maxNumRed);
                
                //create data partitioner
@@ -1530,12 +1495,12 @@ public class ParForProgramBlock extends ForProgramBlock
                                break;
                        case REMOTE_MR:
                                dp = new DataPartitionerRemoteMR( dpf, _ID, 
numRed,
-                                               _replicationDP, 
ALLOW_REUSE_MR_JVMS, false );
+                                       _replicationDP, ALLOW_REUSE_MR_JVMS, 
false );
                                break;
                        case REMOTE_SPARK:
                                dp = new DataPartitionerRemoteSpark( dpf, ec, 
numRed,
-                                               _replicationDP, false );
-                               break;  
+                                       _replicationDP, false );
+                               break;
                        default:
                                throw new DMLRuntimeException("Unknown data 
partitioner: '" +dataPartitioner.name()+"'.");
                }
@@ -1557,18 +1522,18 @@ public class ParForProgramBlock extends ForProgramBlock
                else {
                        int numReducers = ConfigurationManager.getNumReducers();
                        maxMap = 
InfrastructureAnalyzer.getRemoteParallelMapTasks();
-                       maxRed = Math.min(numReducers, 
+                       maxRed = Math.min(numReducers,
                                
InfrastructureAnalyzer.getRemoteParallelReduceTasks());
                        //correction max number of reducers on yarn clusters
-                       if( InfrastructureAnalyzer.isYarnEnabled() ) {          
                        
-                               maxMap = (int)Math.max( maxMap, 
YarnClusterAnalyzer.getNumCores() );    
-                               maxRed = (int)Math.max( maxRed, 
YarnClusterAnalyzer.getNumCores()/2 );  
+                       if( InfrastructureAnalyzer.isYarnEnabled() ) {
+                               maxMap = (int)Math.max( maxMap, 
YarnClusterAnalyzer.getNumCores() );
+                               maxRed = (int)Math.max( maxRed, 
YarnClusterAnalyzer.getNumCores()/2 );
                        }
                }
                int numMap = Math.max(_numThreads, maxMap);
                int numRed = maxRed;
                
-               //create result merge implementation            
+               //create result merge implementation
                switch( prm )
                {
                        case LOCAL_MEM:
@@ -1581,10 +1546,9 @@ public class ParForProgramBlock extends ForProgramBlock
                                rm = new ResultMergeLocalAutomatic( out, in, 
fname );
                                break;
                        case REMOTE_MR:
-                               rm = new ResultMergeRemoteMR( out, in, fname, 
_ID, numMap, numRed,
-                                                                 
WRITE_REPLICATION_FACTOR, 
-                                                                 
MAX_RETRYS_ON_ERROR, 
-                                                                 
ALLOW_REUSE_MR_JVMS );
+                               rm = new ResultMergeRemoteMR( out, in, fname, 
+                                       _ID, numMap, numRed, 
WRITE_REPLICATION_FACTOR,
+                                       MAX_RETRYS_ON_ERROR, 
ALLOW_REUSE_MR_JVMS );
                                break;
                        case REMOTE_SPARK:
                                rm = new ResultMergeRemoteSpark( out, in, 
fname, ec, numMap, numRed );
@@ -1628,8 +1592,8 @@ public class ParForProgramBlock extends ForProgramBlock
        private void releaseForcedRecompile(long tid) 
                throws DMLRuntimeException
        {
-               HashSet<String> fnStack = new HashSet<String>();
-               Recompiler.recompileProgramBlockHierarchy2Forced(_childBlocks, 
tid, fnStack, null);
+               Recompiler.recompileProgramBlockHierarchy2Forced(
+                       _childBlocks, tid, new HashSet<String>(), null);
        }
 
        private String writeTasksToFile(String fname, List<Task> tasks, int 
maxDigits)
@@ -1641,17 +1605,15 @@ public class ParForProgramBlock extends ForProgramBlock
                        Path path = new Path(fname);
                        FileSystem fs = IOUtilFunctions.getFileSystem(path);
                        br = new BufferedWriter(new 
OutputStreamWriter(fs.create(path,true)));
-               
+                       
                        boolean flagFirst = true; //workaround for keeping gen 
order
-                       for( Task t : tasks )
-                       {
+                       for( Task t : tasks ) {
                                br.write( createTaskFileLine( t, maxDigits, 
flagFirst ) );
                                if( flagFirst )
                                        flagFirst = false;
                        }
                }
-               catch(Exception ex)
-               {
+               catch(Exception ex) {
                        throw new DMLRuntimeException("Error writing tasks to 
taskfile "+fname, ex);
                }
                finally {
@@ -1670,18 +1632,16 @@ public class ParForProgramBlock extends ForProgramBlock
                        Path path = new Path( fname );
                        FileSystem fs = IOUtilFunctions.getFileSystem(path);
                        br = new BufferedWriter(new 
OutputStreamWriter(fs.create(path,true)));
-               
+                       
                        Task t = null;
                        boolean flagFirst = true; //workaround for keeping gen 
order
-                       while( (t = queue.dequeueTask()) != 
LocalTaskQueue.NO_MORE_TASKS )
-                       {
+                       while( (t = queue.dequeueTask()) != 
LocalTaskQueue.NO_MORE_TASKS ) {
                                br.write( createTaskFileLine( t, maxDigits, 
flagFirst ) );
                                if( flagFirst )
                                        flagFirst = false;
                        }
                }
-               catch(Exception ex)
-               {
+               catch(Exception ex) {
                        throw new DMLRuntimeException("Error writing tasks to 
taskfile "+fname, ex);
                }
                finally {
@@ -1691,11 +1651,9 @@ public class ParForProgramBlock extends ForProgramBlock
                return fname;
        }
        
-       private String createTaskFileLine( Task t, int maxDigits, boolean 
flagFirst ) 
-       {
+       private String createTaskFileLine( Task t, int maxDigits, boolean 
flagFirst ) {
                //always pad to max digits in order to preserve task order      
-               String ret = t.toCompactString(maxDigits) + (flagFirst?" ":"") 
+ "\n";
-               return ret;
+               return t.toCompactString(maxDigits) + (flagFirst?" ":"") + "\n";
        }
 
        private void consolidateAndCheckResults(ExecutionContext ec, long 
expIters, long expTasks, long numIters, long numTasks, LocalVariableMap [] 
results) 
@@ -1752,20 +1710,20 @@ public class ParForProgramBlock extends ForProgramBlock
                                        MatrixObject out = (MatrixObject) dat;
                                        MatrixObject[] in = new MatrixObject[ 
results.length ];
                                        for( int i=0; i< results.length; i++ )
-                                               in[i] = (MatrixObject) 
results[i].get( var );                   
+                                               in[i] = (MatrixObject) 
results[i].get( var );
                                        String fname = 
constructResultMergeFileName();
                                        ResultMerge rm = 
createResultMerge(_resultMerge, out, in, fname, ec);
                                        MatrixObject outNew = null;
                                        if( USE_PARALLEL_RESULT_MERGE )
                                                outNew = 
rm.executeParallelMerge( _numThreads );
                                        else
-                                               outNew = 
rm.executeSerialMerge();               
+                                               outNew = 
rm.executeSerialMerge();
                                        
                                        //cleanup existing var
                                        Data exdata = ec.removeVariable(var);
                                        if( exdata != null && exdata != outNew 
&& exdata instanceof MatrixObject )
                                                
ec.cleanupMatrixObject((MatrixObject)exdata);
-                                                       
+                                       
                                        //cleanup of intermediate result 
variables
                                        cleanWorkerResultVariables( ec, out, in 
);
                                        
@@ -1796,21 +1754,18 @@ public class ParForProgramBlock extends ForProgramBlock
         * 
         * @return true if ?
         */
-       private boolean checkParallelRemoteResultMerge()
-       {
-               return (USE_PARALLEL_RESULT_MERGE_REMOTE 
-                           && _resultVars.size() > 1
-                           && ( _resultMerge == PResultMerge.REMOTE_MR
-                              ||_resultMerge == PResultMerge.REMOTE_SPARK) );
+       private boolean checkParallelRemoteResultMerge() {
+               return (USE_PARALLEL_RESULT_MERGE_REMOTE && _resultVars.size() 
> 1
+                       && ( _resultMerge == PResultMerge.REMOTE_MR 
+                               ||_resultMerge == PResultMerge.REMOTE_SPARK) );
        }
 
-       private void setParForProgramBlockIDs(int IDPrefix)
-       {
+       private void setParForProgramBlockIDs(int IDPrefix) {
                _IDPrefix = IDPrefix;
                if( _IDPrefix == -1 ) //not specified
                        _ID = _pfIDSeq.getNextID(); //generated new ID
                else //remote case (further nested parfors are all in one JVM)
-                   _ID = IDHandler.concatIntIDsToLong(_IDPrefix, 
(int)_pfIDSeq.getNextID());   
+                       _ID = IDHandler.concatIntIDsToLong(_IDPrefix, 
(int)_pfIDSeq.getNextID());       
        }
        
        /**
@@ -1826,8 +1781,7 @@ public class ParForProgramBlock extends ForProgramBlock
                        
                _pwIDs = new long[ _numThreads ];
                
-               for( int i=0; i<_numThreads; i++ )
-               {
+               for( int i=0; i<_numThreads; i++ ) {
                        if(_IDPrefix == -1)
                                _pwIDs[i] = _pwIDSeq.getNextID();
                        else
@@ -1838,8 +1792,7 @@ public class ParForProgramBlock extends ForProgramBlock
                }
        }
 
-       private long computeNumIterations( IntObject from, IntObject to, 
IntObject incr )
-       {
+       private long computeNumIterations( IntObject from, IntObject to, 
IntObject incr ) {
                return (long)Math.ceil(((double)(to.getLongValue() - 
from.getLongValue() + 1)) / incr.getLongValue()); 
        }
        
@@ -1849,10 +1802,8 @@ public class ParForProgramBlock extends ForProgramBlock
         * 
         * @return task file name
         */
-       private String constructTaskFileName()
-       {
+       private String constructTaskFileName() {
                String scratchSpaceLoc = ConfigurationManager.getScratchSpace();
-       
                StringBuilder sb = new StringBuilder();
                sb.append(scratchSpaceLoc);
                sb.append(Lop.FILE_SEPARATOR);
@@ -1869,10 +1820,8 @@ public class ParForProgramBlock extends ForProgramBlock
         * 
         * @return result file name
         */
-       private String constructResultFileName()
-       {
+       private String constructResultFileName() {
                String scratchSpaceLoc = ConfigurationManager.getScratchSpace();
-               
                StringBuilder sb = new StringBuilder();
                sb.append(scratchSpaceLoc);
                sb.append(Lop.FILE_SEPARATOR);
@@ -1880,13 +1829,11 @@ public class ParForProgramBlock extends ForProgramBlock
                sb.append(DMLScript.getUUID());
                sb.append(PARFOR_MR_RESULT_TMP_FNAME.replaceAll("%ID%", 
String.valueOf(_ID)));
                
-               return sb.toString();   
+               return sb.toString();
        }
 
-       private String constructResultMergeFileName()
-       {
+       private String constructResultMergeFileName() {
                String scratchSpaceLoc = ConfigurationManager.getScratchSpace();
-               
                String fname = PARFOR_MR_RESULTMERGE_FNAME;
                fname = fname.replaceAll("%ID%", String.valueOf(_ID)); 
//replace workerID
                fname = fname.replaceAll("%VAR%", 
String.valueOf(_resultVarsIDSeq.getNextID()));
@@ -1898,13 +1845,11 @@ public class ParForProgramBlock extends ForProgramBlock
                sb.append(DMLScript.getUUID());
                sb.append(fname);
                
-               return sb.toString();                   
+               return sb.toString();
        }
 
-       private String constructDataPartitionsFileName()
-       {
+       private String constructDataPartitionsFileName() {
                String scratchSpaceLoc = ConfigurationManager.getScratchSpace();
-               
                String fname = PARFOR_DATAPARTITIONS_FNAME;
                fname = fname.replaceAll("%ID%", String.valueOf(_ID)); 
//replace workerID
                fname = fname.replaceAll("%VAR%", 
String.valueOf(_dpVarsIDSeq.getNextID()));
@@ -1916,7 +1861,7 @@ public class ParForProgramBlock extends ForProgramBlock
                sb.append(DMLScript.getUUID());
                sb.append(fname);
                
-               return sb.toString();                   
+               return sb.toString();
        }
 
        private long getMinMemory(ExecutionContext ec)
@@ -1946,10 +1891,8 @@ public class ParForProgramBlock extends ForProgramBlock
                return ret;
        }
 
-       private void setMemoryBudget()
-       {
-               if( _recompileMemoryBudget > 0 )
-               {
+       private void setMemoryBudget() {
+               if( _recompileMemoryBudget > 0 ) {
                        // store old budget for reset after exec
                        _oldMemoryBudget = 
(double)InfrastructureAnalyzer.getLocalMaxMemory();
                        
@@ -1959,16 +1902,12 @@ public class ParForProgramBlock extends ForProgramBlock
                }
        }
        
-       private void resetMemoryBudget()
-       {
+       private void resetMemoryBudget() {
                if( _recompileMemoryBudget > 0 )
-               {
                        
InfrastructureAnalyzer.setLocalMaxMemory((long)_oldMemoryBudget);
-               }
        }
        
-       private void resetOptimizerFlags()
-       {
+       private void resetOptimizerFlags() {
                //reset all state that was set but is not guaranteed to be 
overwritten by optimizer
                _variablesDPOriginal.removeAll();
                _colocatedDPMatrix         = null;
@@ -2021,7 +1960,7 @@ public class ParForProgramBlock extends ForProgramBlock
                                        
                                        MatrixObject[] in = new MatrixObject[ 
_refVars.length ];
                                        for( int i=0; i< _refVars.length; i++ )
-                                               in[i] = (MatrixObject) 
_refVars[i].get( varname );                      
+                                               in[i] = (MatrixObject) 
_refVars[i].get( varname ); 
                                        String fname = 
constructResultMergeFileName();
                                
                                        ResultMerge rm = 
createResultMerge(_resultMerge, out, in, fname, _ec);
@@ -2029,7 +1968,7 @@ public class ParForProgramBlock extends ForProgramBlock
                                        if( USE_PARALLEL_RESULT_MERGE )
                                                outNew = 
rm.executeParallelMerge( _numThreads );
                                        else
-                                               outNew = 
rm.executeSerialMerge();       
+                                               outNew = 
rm.executeSerialMerge();
                                        
                                        synchronized( _ec.getVariables() ){
                                                _ec.getVariables().put( 
varname, outNew);
@@ -2051,5 +1990,4 @@ public class ParForProgramBlock extends ForProgramBlock
        public String printBlockErrorLocation(){
                return "ERROR: Runtime error in parfor program block generated 
from parfor statement block between lines " + _beginLine + " and " + _endLine + 
" -- ";
        }
-       
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/systemml/blob/2c57cf77/src/main/java/org/apache/sysml/runtime/controlprogram/context/ExecutionContext.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/context/ExecutionContext.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/context/ExecutionContext.java
index 0bd73d1..59d2589 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/context/ExecutionContext.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/context/ExecutionContext.java
@@ -544,23 +544,18 @@ public class ExecutionContext {
                for( String var : varList )
                {
                        Data dat = _variables.get(var);
-                       if( dat instanceof MatrixObject )
-                       {
+                       if( dat instanceof MatrixObject ) {
                                MatrixObject mo = (MatrixObject)dat;
                                varsState.put( var, mo.isCleanupEnabled() );
-                               //System.out.println("pre-pin "+var+" 
("+mo.isCleanupEnabled()+")");
                        }
                }
                
                //step 2) pin variables
-               for( String var : varList )
-               {
+               for( String var : varList ) {
                        Data dat = _variables.get(var);
-                       if( dat instanceof MatrixObject )
-                       {
+                       if( dat instanceof MatrixObject ) {
                                MatrixObject mo = (MatrixObject)dat;
                                mo.enableCleanup(false); 
-                               //System.out.println("pin "+var);
                        }
                }
                
@@ -583,11 +578,8 @@ public class ExecutionContext {
         * @param varList variable list
         * @param varsState variable state
         */
-       public void unpinVariables(ArrayList<String> varList, 
HashMap<String,Boolean> varsState)
-       {
-               for( String var : varList)
-               {
-                       //System.out.println("unpin "+var+" 
("+varsState.get(var)+")");
+       public void unpinVariables(ArrayList<String> varList, 
HashMap<String,Boolean> varsState) {
+               for( String var : varList) {
                        Data dat = _variables.get(var);
                        if( dat instanceof MatrixObject )
                                
((MatrixObject)dat).enableCleanup(varsState.get(var));
@@ -597,15 +589,28 @@ public class ExecutionContext {
        /**
         * NOTE: No order guaranteed, so keep same list for pin and unpin. 
         * 
-        * @return variable list as strings
+        * @return list of all variable names.
         */
-       public ArrayList<String> getVarList()
-       {
-               ArrayList<String> varlist = new ArrayList<String>();
-               varlist.addAll(_variables.keySet());    
-               return varlist;
+       public ArrayList<String> getVarList() {
+               return new ArrayList<>(_variables.keySet());
        }
-
+       
+       /**
+        * NOTE: No order guaranteed, so keep same list for pin and unpin. 
+        * 
+        * @return list of all variable names of partitioned matrices.
+        */
+       public ArrayList<String> getVarListPartitioned() {
+               ArrayList<String> ret = new ArrayList<>();
+               for( String var : _variables.keySet() ) {
+                       Data dat = _variables.get(var);
+                       if( dat instanceof MatrixObject 
+                               && ((MatrixObject)dat).isPartitioned() )
+                               ret.add(var);
+               }
+               return ret;
+       }
+       
        public void cleanupMatrixObject(MatrixObject mo)
                throws DMLRuntimeException 
        {

http://git-wip-us.apache.org/repos/asf/systemml/blob/2c57cf77/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/CachedReuseVariables.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/CachedReuseVariables.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/CachedReuseVariables.java
new file mode 100644
index 0000000..48ef883
--- /dev/null
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/CachedReuseVariables.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.controlprogram.parfor;
+
+import java.lang.ref.SoftReference;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+
+import org.apache.sysml.runtime.controlprogram.LocalVariableMap;
+
+public class CachedReuseVariables
+{
+       private final HashMap<Long, SoftReference<LocalVariableMap>> _data;
+       
+       public CachedReuseVariables() {
+               _data = new HashMap<>();
+       }
+       
+       public synchronized void reuseVariables(long pfid, LocalVariableMap 
vars, Collection<String> blacklist) {
+               //check for existing reuse map
+               LocalVariableMap tmp = null;
+               if( _data.containsKey(pfid) )
+                       tmp = _data.get(pfid).get();
+               
+               //build reuse map if not created yet or evicted
+               if( tmp == null ) {
+                       tmp = new LocalVariableMap(vars);
+                       tmp.removeAllIn(new HashSet<>(blacklist));
+                       _data.put(pfid, new SoftReference<>(tmp));
+               }
+               //reuse existing reuse map
+               else {
+                       for( String varName : tmp.keySet() )
+                               vars.put(varName, tmp.get(varName));
+               }
+       }
+
+       public synchronized void clearVariables(long pfid) {
+               _data.remove(pfid);
+       }
+}

http://git-wip-us.apache.org/repos/asf/systemml/blob/2c57cf77/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java
index 8dc86d4..6d0cd3a 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java
@@ -328,16 +328,15 @@ public class ProgramConverter
                ParForProgramBlock tmpPB = null;
                
                if( IDPrefix == -1 ) //still on master node
-                       tmpPB = new ParForProgramBlock(prog,pfpb.getIterVar(), 
pfpb.getParForParams());
+                       tmpPB = new ParForProgramBlock(prog,pfpb.getIterVar(), 
pfpb.getParForParams(), pfpb.getResultVariables());
                else //child of remote ParWorker at any level
-                       tmpPB = new ParForProgramBlock(IDPrefix, prog, 
pfpb.getIterVar(), pfpb.getParForParams());
+                       tmpPB = new ParForProgramBlock(IDPrefix, prog, 
pfpb.getIterVar(), pfpb.getParForParams(), pfpb.getResultVariables());
                
                tmpPB.setStatementBlock( createForStatementBlockCopy( 
(ForStatementBlock) pfpb.getStatementBlock(), pid, plain, forceDeepCopy) );
                tmpPB.setThreadID(pid);
                
                tmpPB.disableOptimization(); //already done in top-level parfor
                tmpPB.disableMonitorReport(); //already done in top-level parfor
-               tmpPB.setResultVariables( pfpb.getResultVariables() );
                
                tmpPB.setFromInstructions( 
createDeepCopyInstructionSet(pfpb.getFromInstructions(), pid, IDPrefix, prog, 
fnStack, fnCreated, plain, true) );
                tmpPB.setToInstructions( 
createDeepCopyInstructionSet(pfpb.getToInstructions(), pid, IDPrefix, prog, 
fnStack, fnCreated, plain, true) );
@@ -1514,9 +1513,8 @@ public class ProgramConverter
                //program blocks //reset id to preinit state, replaced during 
exec
                ArrayList<ProgramBlock> pbs = 
rParseProgramBlocks(st.nextToken(), prog, 0); 
                
-               ParForProgramBlock pfpb = new ParForProgramBlock(id, prog, 
iterVar, params);
+               ParForProgramBlock pfpb = new ParForProgramBlock(id, prog, 
iterVar, params, resultVars);
                pfpb.disableOptimization(); //already done in top-level parfor
-               pfpb.setResultVariables(resultVars);            
                pfpb.setFromInstructions(from);
                pfpb.setToInstructions(to);
                pfpb.setIncrementInstructions(incr);

http://git-wip-us.apache.org/repos/asf/systemml/blob/2c57cf77/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSpark.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSpark.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSpark.java
index 49ac9db..10b44a2 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSpark.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSpark.java
@@ -34,6 +34,8 @@ import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.LocalVariableMap;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
+import 
org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
+import org.apache.sysml.runtime.controlprogram.parfor.util.IDSequence;
 import org.apache.sysml.utils.Statistics;
 
 /**
@@ -51,12 +53,14 @@ import org.apache.sysml.utils.Statistics;
  */
 public class RemoteParForSpark 
 {
-       
        protected static final Log LOG = 
LogFactory.getLog(RemoteParForSpark.class.getName());
-
-       public static RemoteParForJobReturn runJob(long pfid, String program, 
HashMap<String, byte[]> clsMap, 
+       
+       //globally unique id for parfor spark job instances (unique across 
spark contexts)
+       private static final IDSequence _jobID = new IDSequence();
+       
+       public static RemoteParForJobReturn runJob(long pfid, String prog, 
HashMap<String, byte[]> clsMap, 
                        List<Task> tasks, ExecutionContext ec, boolean 
cpCaching, int numMappers) 
-               throws DMLRuntimeException  
+               throws DMLRuntimeException
        {
                String jobname = "ParFor-ESP";
                long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
@@ -68,13 +72,16 @@ public class RemoteParForSpark
                LongAccumulator aTasks = sc.sc().longAccumulator("tasks");
                LongAccumulator aIters = sc.sc().longAccumulator("iterations");
                
+               //reset cached shared inputs for correctness in local mode
+               long jobid = _jobID.getNextID();
+               if( InfrastructureAnalyzer.isLocalMode() )
+                       RemoteParForSparkWorker.cleanupCachedVariables(jobid);
+               
                //run remote_spark parfor job 
                //(w/o lazy evaluation to fit existing parfor framework, e.g., 
result merge)
-               RemoteParForSparkWorker func = new 
RemoteParForSparkWorker(program, clsMap, cpCaching, aTasks, aIters);
-               List<Tuple2<Long,String>> out = sc
-                               .parallelize(tasks, tasks.size()) //create rdd 
of parfor tasks
-                               .flatMapToPair(func)              //execute 
parfor tasks 
-                               .collect();                       //get output 
handles
+               List<Tuple2<Long,String>> out = sc.parallelize(tasks, 
tasks.size()) //create rdd of parfor tasks
+                       .flatMapToPair(new RemoteParForSparkWorker(jobid, prog, 
clsMap, cpCaching, aTasks, aIters))
+                       .collect(); //execute and get output handles
                
                //de-serialize results
                LocalVariableMap[] results = RemoteParForUtils.getResults(out, 
LOG);
@@ -85,11 +92,10 @@ public class RemoteParForSpark
                RemoteParForJobReturn ret = new RemoteParForJobReturn(true, 
numTasks, numIters, results);
                
                //maintain statistics
-           Statistics.incrementNoOfCompiledSPInst();
-           Statistics.incrementNoOfExecutedSPInst();
-           if( DMLScript.STATISTICS ){
+               Statistics.incrementNoOfCompiledSPInst();
+               Statistics.incrementNoOfExecutedSPInst();
+               if( DMLScript.STATISTICS )
                        Statistics.maintainCPHeavyHitters(jobname, 
System.nanoTime()-t0);
-               }
                
                return ret;
        }

http://git-wip-us.apache.org/repos/asf/systemml/blob/2c57cf77/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
index e1410da..033d398 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
@@ -21,10 +21,12 @@ package org.apache.sysml.runtime.controlprogram.parfor;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map.Entry;
 
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.spark.TaskContext;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.util.LongAccumulator;
@@ -40,8 +42,11 @@ import scala.Tuple2;
 public class RemoteParForSparkWorker extends ParWorker implements 
PairFlatMapFunction<Task, Long, String> 
 {
        private static final long serialVersionUID = -3254950138084272296L;
-
-       private final String  _prog;
+       
+       private static final CachedReuseVariables reuseVars = new 
CachedReuseVariables();
+       
+       private final long _jobid;
+       private final String _prog;
        private final HashMap<String, byte[]> _clsMap;
        private boolean _initialized = false;
        private boolean _caching = true;
@@ -49,9 +54,10 @@ public class RemoteParForSparkWorker extends ParWorker 
implements PairFlatMapFun
        private final LongAccumulator _aTasks;
        private final LongAccumulator _aIters;
        
-       public RemoteParForSparkWorker(String program, HashMap<String, byte[]> 
clsMap, boolean cpCaching, LongAccumulator atasks, LongAccumulator aiters) 
+       public RemoteParForSparkWorker(long jobid, String program, 
HashMap<String, byte[]> clsMap, boolean cpCaching, LongAccumulator atasks, 
LongAccumulator aiters) 
                throws DMLRuntimeException
        {
+               _jobid = jobid;
                _prog = program;
                _clsMap = clsMap;
                _initialized = false;
@@ -68,7 +74,7 @@ public class RemoteParForSparkWorker extends ParWorker 
implements PairFlatMapFun
        {
                //lazy parworker initialization
                if( !_initialized )
-                       configureWorker( TaskContext.get().taskAttemptId() );
+                       configureWorker(TaskContext.get().taskAttemptId());
                
                //execute a single task
                long numIter = getExecutedIterations();
@@ -88,10 +94,11 @@ public class RemoteParForSparkWorker extends ParWorker 
implements PairFlatMapFun
                return ret.iterator();
        }
 
-       private void configureWorker( long ID ) 
+       @SuppressWarnings("unchecked")
+       private void configureWorker(long taskID) 
                throws DMLRuntimeException, IOException
        {
-               _workerID = ID;
+               _workerID = taskID;
                
                //initialize codegen class cache (before program parsing)
                synchronized( CodegenUtils.class ) {
@@ -106,7 +113,13 @@ public class RemoteParForSparkWorker extends ParWorker 
implements PairFlatMapFun
                _resultVars  = body.getResultVarNames();
                _numTasks    = 0;
                _numIters    = 0;
-
+               
+               //reuse shared inputs (to read shared inputs once per process 
instead of once per core; 
+               //we reuse everything except result variables and partitioned 
input matrices)
+               _ec.pinVariables(_ec.getVarList()); //avoid cleanup of shared 
inputs
+               Collection<String> blacklist = 
CollectionUtils.union(_resultVars, _ec.getVarListPartitioned());
+               reuseVars.reuseVariables(_jobid, _ec.getVariables(), blacklist);
+               
                //init and register-cleanup of buffer pool (in parfor spark, 
multiple tasks might 
                //share the process-local, i.e., per executor, buffer pool; 
hence we synchronize 
                //the initialization and immediately register the created 
directory for cleanup
@@ -121,7 +134,7 @@ public class RemoteParForSparkWorker extends ParWorker 
implements PairFlatMapFun
                                                
CacheableData.cacheEvictionLocalFilePrefix +"_" + _workerID; 
                                //register entire working dir for delete on 
shutdown
                                
RemoteParForUtils.cleanupWorkingDirectoriesOnShutdown();
-                       }       
+                       }
                }
                
                //ensure that resultvar files are not removed
@@ -134,4 +147,8 @@ public class RemoteParForSparkWorker extends ParWorker 
implements PairFlatMapFun
                //mark as initialized
                _initialized = true;
        }
+       
+       public static void cleanupCachedVariables(long pfid) {
+               reuseVars.clearVariables(pfid);
+       }
 }

http://git-wip-us.apache.org/repos/asf/systemml/blob/2c57cf77/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/TaskPartitionerFactoring.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/TaskPartitionerFactoring.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/TaskPartitionerFactoring.java
index d202e07..7dd25bf 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/TaskPartitionerFactoring.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/TaskPartitionerFactoring.java
@@ -57,14 +57,14 @@ public class TaskPartitionerFactoring extends 
TaskPartitioner
        {
                LinkedList<Task> tasks = new LinkedList<Task>();
                
-               long lFrom  = _fromVal.getLongValue();
-               long lTo    = _toVal.getLongValue();
-               long lIncr  = _incrVal.getLongValue();
+               long lFrom = _fromVal.getLongValue();
+               long lTo = _toVal.getLongValue();
+               long lIncr = _incrVal.getLongValue();
                
                int P = _numThreads;  // number of parallel workers
-               long N = _numIter;     // total number of iterations
-               long R = N;            // remaining number of iterations
-               long K = -1;           // next _numThreads task sizes   
+               long N = _numIter;    // total number of iterations
+               long R = N;           // remaining number of iterations
+               long K = -1;          // next _numThreads task sizes
                TaskType type = null; // type of iterations: range tasks 
(similar to run-length encoding) make only sense if taskSize>3
                
                for( long i = lFrom; i<=lTo;  )
@@ -73,7 +73,7 @@ public class TaskPartitionerFactoring extends TaskPartitioner
                        R -= (K * P);
                        
                        type = (ParForProgramBlock.USE_RANGE_TASKS_IF_USEFUL && 
K>3 ) ? 
-                                          TaskType.RANGE : TaskType.SET;
+                               TaskType.RANGE : TaskType.SET;
                        
                        //for each logical processor
                        for( int j=0; j<P; j++ )
@@ -86,16 +86,12 @@ public class TaskPartitionerFactoring extends 
TaskPartitioner
                                tasks.addLast(lTask);
                                
                                // add iterations to task 
-                               if( type == TaskType.SET ) 
-                               {
+                               if( type == TaskType.SET ) {
                                        //value based tasks
                                        for( long k=0; k<K && i<=lTo; k++, 
i+=lIncr )
-                                       {
-                                               lTask.addIteration(new 
IntObject(_iterVarName, i));                             
-                                       }                               
+                                               lTask.addIteration(new 
IntObject(_iterVarName, i));
                                }
-                               else 
-                               {
+                               else {
                                        //determine end of task
                                        long to = Math.min( i+(K-1)*lIncr, lTo 
);
                                        
@@ -103,7 +99,6 @@ public class TaskPartitionerFactoring extends TaskPartitioner
                                        lTask.addIteration(new 
IntObject(_iterVarName, i));         //from
                                        lTask.addIteration(new 
IntObject(_iterVarName, to));    //to
                                        lTask.addIteration(new 
IntObject(_iterVarName, lIncr)); //increment
-                                       
                                        i = to + lIncr;
                                }
                        }
@@ -122,11 +117,11 @@ public class TaskPartitionerFactoring extends 
TaskPartitioner
                long lTo    = _toVal.getLongValue();
                long lIncr  = _incrVal.getLongValue();
                
-               int P = _numThreads;     // number of parallel workers
+               int P = _numThreads;   // number of parallel workers
                long N = _numIter;     // total number of iterations
-               long R = N;               // remaining number of iterations
-               long K = -1;              //next _numThreads task sizes 
-           TaskType type = null;    // type of iterations: range tasks 
(similar to run-length encoding) make only sense if taskSize>3
+               long R = N;            // remaining number of iterations
+               long K = -1;           //next _numThreads task sizes    
+           TaskType type = null;  // type of iterations: range tasks (similar 
to run-length encoding) make only sense if taskSize>3
                
                try
                {

Reply via email to