Repository: incubator-systemml
Updated Branches:
  refs/heads/master 39f75ca06 -> 95be80c5b


[SYSTEMML-1310] Extended parfor block partitioning (support in dpesp)

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/1aac97ee
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/1aac97ee
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/1aac97ee

Branch: refs/heads/master
Commit: 1aac97ee1b25537278f292d89b4a7c4ab1355c2e
Parents: 39f75ca
Author: Matthias Boehm <mboe...@gmail.com>
Authored: Tue Mar 21 19:40:39 2017 -0700
Committer: Matthias Boehm <mboe...@gmail.com>
Committed: Tue Mar 21 19:40:39 2017 -0700

----------------------------------------------------------------------
 .../controlprogram/ParForProgramBlock.java      | 13 +++++
 .../parfor/RemoteDPParForSpark.java             |  7 +--
 .../parfor/RemoteDPParForSparkWorker.java       | 14 +++++-
 .../parfor/opt/OptimizerConstrained.java        |  5 +-
 .../parfor/opt/OptimizerRuleBased.java          | 21 +++++++-
 .../ParForBlockwiseDataPartitioningTest.java    | 53 +++++++++++++-------
 6 files changed, 89 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1aac97ee/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java 
b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
index a6b8e0e..eec84b6 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
@@ -103,6 +103,7 @@ import org.apache.sysml.runtime.instructions.cp.IntObject;
 import org.apache.sysml.runtime.instructions.cp.StringObject;
 import org.apache.sysml.runtime.instructions.cp.VariableCPInstruction;
 import org.apache.sysml.runtime.io.IOUtilFunctions;
+import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.data.OutputInfo;
 import org.apache.sysml.runtime.util.UtilFunctions;
 import org.apache.sysml.utils.Statistics;
@@ -214,6 +215,18 @@ public class ParForProgramBlock extends ForProgramBlock
                        return _dpf == PDataPartitionFormat.COLUMN_BLOCK_WISE_N 
                                || _dpf == 
PDataPartitionFormat.ROW_BLOCK_WISE_N;
                }
+               public long getNumParts(MatrixCharacteristics mc) {
+                       switch( _dpf ) {
+                               case ROW_WISE: return mc.getRows();
+                               case ROW_BLOCK_WISE: return 
mc.getNumRowBlocks();
+                               case ROW_BLOCK_WISE_N: return 
(long)Math.ceil((double)mc.getRows()/_N);
+                               case COLUMN_WISE: return mc.getCols();
+                               case COLUMN_BLOCK_WISE: return 
mc.getNumColBlocks();
+                               case COLUMN_BLOCK_WISE_N: return 
(long)Math.ceil((double)mc.getCols()/_N);
+                               default:
+                                       throw new RuntimeException("Unsupported 
partition format: "+_dpf);
+                       }
+               }
        }
        
        public enum PDataPartitioner {

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1aac97ee/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java
index a612a3e..3246cb7 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java
@@ -92,8 +92,7 @@ public class RemoteDPParForSpark
 
                //compute number of reducers (to avoid OOMs and reduce memory 
pressure)
                int numParts = SparkUtils.getNumPreferredPartitions(mc, in);
-               int numParts2 = 
(int)((dpf._dpf==PDataPartitionFormat.ROW_BLOCK_WISE) ? mc.getRows() : 
mc.getCols()); 
-               int numReducers2 = Math.max(numReducers, Math.min(numParts, 
numParts2));
+               int numReducers2 = Math.max(numReducers, Math.min(numParts, 
(int)dpf.getNumParts(mc)));
                
                //core parfor datapartition-execute (w/ or w/o shuffle, 
depending on data characteristics)
                RemoteDPParForSparkWorker efun = new 
RemoteDPParForSparkWorker(program, clsMap, 
@@ -177,7 +176,9 @@ public class RemoteDPParForSpark
        private static boolean requiresGrouping(PartitionFormat dpf, 
MatrixObject mo) {
                MatrixCharacteristics mc = mo.getMatrixCharacteristics();
                return ((dpf == PartitionFormat.ROW_WISE && 
mc.getNumColBlocks() > 1)
-                               || (dpf == PartitionFormat.COLUMN_WISE && 
mc.getNumRowBlocks() > 1))
+                               || (dpf == PartitionFormat.COLUMN_WISE && 
mc.getNumRowBlocks() > 1)
+                               || (dpf._dpf == 
PDataPartitionFormat.ROW_BLOCK_WISE_N && mc.getNumColBlocks() > 1)
+                               || (dpf._dpf == 
PDataPartitionFormat.COLUMN_BLOCK_WISE_N && mc.getNumRowBlocks() > 1))
                        && !hasInputDataSet(dpf, mo);
        }
        

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1aac97ee/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
index 0d368e8..bcf5652 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
@@ -87,8 +87,18 @@ public class RemoteDPParForSparkWorker extends ParWorker 
implements PairFlatMapF
                _aIters = aiters;
                
                //setup matrix block partition meta data
-               _rlen = (dpf != PartitionFormat.ROW_WISE) ? (int)mc.getRows() : 
1;
-               _clen = (dpf != PartitionFormat.COLUMN_WISE) ? 
(int)mc.getCols() : 1;
+               switch( dpf._dpf ) {
+                       case ROW_WISE: 
+                               _rlen = (int)mc.getRows(); _clen = 1; break;
+                       case ROW_BLOCK_WISE_N:
+                               _rlen = dpf._N; _clen = (int)mc.getCols(); 
break;
+                       case COLUMN_BLOCK_WISE:
+                               _rlen = 1; _clen = (int)mc.getCols(); break;
+                       case COLUMN_BLOCK_WISE_N:
+                               _rlen = (int)mc.getRows(); _clen = dpf._N; 
break;
+                       default:
+                               throw new RuntimeException("Unsupported 
partition format: "+dpf._dpf.name());
+               }
                _brlen = mc.getRowsPerBlock();
                _bclen = mc.getColsPerBlock();
                _tSparseCol = tSparseCol;

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1aac97ee/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java
index 2e4b95b..235b927 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java
@@ -29,6 +29,7 @@ import org.apache.sysml.parser.ParForStatementBlock;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.LocalVariableMap;
 import org.apache.sysml.runtime.controlprogram.ParForProgramBlock;
+import 
org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat;
 import 
org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartitioner;
 import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PExecMode;
 import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.POptMode;
@@ -386,7 +387,9 @@ public class OptimizerConstrained extends OptimizerRuleBased
 
                        if( rIsAccessByIterationVariable(pn, moVarname, 
iterVarname) &&
                           ((moDpf==PartitionFormat.ROW_WISE && 
mo.getNumRows()==_N ) ||
-                               (moDpf==PartitionFormat.COLUMN_WISE && 
mo.getNumColumns()==_N)) )
+                               (moDpf==PartitionFormat.COLUMN_WISE && 
mo.getNumColumns()==_N) ||
+                               
(moDpf._dpf==PDataPartitionFormat.ROW_BLOCK_WISE_N && 
mo.getNumRows()<=_N*moDpf._N)||
+                               
(moDpf._dpf==PDataPartitionFormat.COLUMN_BLOCK_WISE_N && 
mo.getNumColumns()<=_N*moDpf._N)) )
                        {
                                int k = (int)Math.min(_N,_rk2);
 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1aac97ee/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
index 0e7170f..4003975 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
@@ -1524,7 +1524,9 @@ public class OptimizerRuleBased extends Optimizer
                        
                        if( rIsAccessByIterationVariable(pn, moVarname, 
iterVarname) &&
                           ((moDpf==PartitionFormat.ROW_WISE && 
mo.getNumRows()==_N ) ||
-                               (moDpf==PartitionFormat.COLUMN_WISE && 
mo.getNumColumns()==_N)) )
+                               (moDpf==PartitionFormat.COLUMN_WISE && 
mo.getNumColumns()==_N) ||
+                               
(moDpf._dpf==PDataPartitionFormat.ROW_BLOCK_WISE_N && 
mo.getNumRows()<=_N*moDpf._N)||
+                               
(moDpf._dpf==PDataPartitionFormat.COLUMN_BLOCK_WISE_N && 
mo.getNumColumns()<=_N*moDpf._N)) )
                        {
                                int k = (int)Math.min(_N,_rk2);
                                
@@ -1567,10 +1569,16 @@ public class OptimizerRuleBased extends Optimizer
                                        if( h.getInput().get(1) instanceof 
DataOp )
                                                indexAccess = 
h.getInput().get(1).getName();
                                        break;
+                               case ROW_BLOCK_WISE_N: //input 1 and 2 have 
same slope and var
+                                       indexAccess = 
rGetVarFromExpression(h.getInput().get(1));
+                                       break;
                                case COLUMN_WISE: //input 3 and 4 eq
                                        if( h.getInput().get(3) instanceof 
DataOp )
                                                indexAccess = 
h.getInput().get(3).getName();
                                        break;
+                               case COLUMN_BLOCK_WISE_N: //input 3 and 4 have 
same slope and var
+                                       indexAccess = 
rGetVarFromExpression(h.getInput().get(3));
+                                       break;
                                        
                                default:
                                        //do nothing
@@ -1583,6 +1591,17 @@ public class OptimizerRuleBased extends Optimizer
                return ret;
        }
        
+       private static String rGetVarFromExpression(Hop current) {
+               String var = null;
+               for( Hop c : current.getInput() ) {
+                       var = rGetVarFromExpression(c);
+                       if( var != null )
+                               return var;
+               }
+               return (current instanceof DataOp) ?
+                       current.getName() : null;
+       }
+       
        ///////
        //REWRITE transpose sparse vector operations
        ///

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1aac97ee/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForBlockwiseDataPartitioningTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForBlockwiseDataPartitioningTest.java
 
b/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForBlockwiseDataPartitioningTest.java
index a01d22e..29a6f97 100644
--- 
a/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForBlockwiseDataPartitioningTest.java
+++ 
b/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForBlockwiseDataPartitioningTest.java
@@ -21,6 +21,7 @@ package org.apache.sysml.test.integration.functions.parfor;
 
 import java.util.HashMap;
 
+import org.junit.Assert;
 import org.junit.Test;
 
 import org.apache.sysml.api.DMLScript;
@@ -68,17 +69,17 @@ public class ParForBlockwiseDataPartitioningTest extends 
AutomatedTestBase
        }
 
        @Test
-       public void testParForRowBlockPartitioningLocalRemoteSparkDense() {
+       public void testParForRowBlockPartitioningLocalRemoteDense() {
                runParForDataPartitioningTest(TEST_NAME1, 
PDataPartitioner.LOCAL, PExecMode.REMOTE_SPARK, false);
        }       
 
        @Test
-       public void testParForRowBlockPartitioningRemoteSparkLocalDense() {
+       public void testParForRowBlockPartitioningRemoteLocalDense() {
                runParForDataPartitioningTest(TEST_NAME1, 
PDataPartitioner.REMOTE_SPARK, PExecMode.LOCAL, false);
        }
 
        @Test
-       public void testParForRowBlockPartitioningRemoteSparkRemoteDense() {
+       public void testParForRowBlockPartitioningRemoteRemoteDense() {
                runParForDataPartitioningTest(TEST_NAME1, 
PDataPartitioner.REMOTE_SPARK, PExecMode.REMOTE_SPARK, false);
        }
 
@@ -88,17 +89,17 @@ public class ParForBlockwiseDataPartitioningTest extends 
AutomatedTestBase
        }
 
        @Test
-       public void testParForRowBlockPartitioningLocalRemoteSparkSparse() {
+       public void testParForRowBlockPartitioningLocalRemoteSparse() {
                runParForDataPartitioningTest(TEST_NAME1, 
PDataPartitioner.LOCAL, PExecMode.REMOTE_SPARK, true);
        }       
 
        @Test
-       public void testParForRowBlockPartitioningRemoteSparkLocalSparse() {
+       public void testParForRowBlockPartitioningRemoteLocalSparse() {
                runParForDataPartitioningTest(TEST_NAME1, 
PDataPartitioner.REMOTE_SPARK, PExecMode.LOCAL, true);
        }
 
        @Test
-       public void testParForRowBlockPartitioningRemoteSparkRemoteSparse() {
+       public void testParForRowBlockPartitioningRemoteRemoteSparse() {
                runParForDataPartitioningTest(TEST_NAME1, 
PDataPartitioner.REMOTE_SPARK, PExecMode.REMOTE_SPARK, true);
        }
 
@@ -108,17 +109,17 @@ public class ParForBlockwiseDataPartitioningTest extends 
AutomatedTestBase
        }
 
        @Test
-       public void testParForColBlockPartitioningLocalRemoteSparkDense() {
+       public void testParForColBlockPartitioningLocalRemoteDense() {
                runParForDataPartitioningTest(TEST_NAME2, 
PDataPartitioner.LOCAL, PExecMode.REMOTE_SPARK, false);
        }       
 
        @Test
-       public void testParForColBlockPartitioningRemoteSparkLocalDense() {
+       public void testParForColBlockPartitioningRemoteLocalDense() {
                runParForDataPartitioningTest(TEST_NAME2, 
PDataPartitioner.REMOTE_SPARK, PExecMode.LOCAL, false);
        }
 
        @Test
-       public void testParForColBlockPartitioningRemoteSparkRemoteDense() {
+       public void testParForColBlockPartitioningRemoteRemoteDense() {
                runParForDataPartitioningTest(TEST_NAME2, 
PDataPartitioner.REMOTE_SPARK, PExecMode.REMOTE_SPARK, false);
        }
 
@@ -128,39 +129,52 @@ public class ParForBlockwiseDataPartitioningTest extends 
AutomatedTestBase
        }
 
        @Test
-       public void testParForColBlockPartitioningLocalRemoteSparkSparse() {
+       public void testParForColBlockPartitioningLocalRemoteSparse() {
                runParForDataPartitioningTest(TEST_NAME2, 
PDataPartitioner.LOCAL, PExecMode.REMOTE_SPARK, true);
        }       
 
        @Test
-       public void testParForColBlockPartitioningRemoteSparkLocalSparse() {
+       public void testParForColBlockPartitioningRemoteLocalSparse() {
                runParForDataPartitioningTest(TEST_NAME2, 
PDataPartitioner.REMOTE_SPARK, PExecMode.LOCAL, true);
        }
 
        @Test
-       public void testParForColBlockPartitioningRemoteSparkRemoteSparse() {
+       public void testParForColBlockPartitioningRemoteRemoteSparse() {
                runParForDataPartitioningTest(TEST_NAME2, 
PDataPartitioner.REMOTE_SPARK, PExecMode.REMOTE_SPARK, true);
        }
        
+       //fused data partition execute
+       
+       @Test
+       public void testParForRowBlockPartitioningRemoteRemoteFusedDense() {
+               runParForDataPartitioningTest(TEST_NAME1, 
PDataPartitioner.UNSPECIFIED, PExecMode.REMOTE_SPARK_DP, false);
+       }
+
+       @Test
+       public void testParForColBlockPartitioningRemoteRemoteFusedDense() {
+               runParForDataPartitioningTest(TEST_NAME2, 
PDataPartitioner.UNSPECIFIED, PExecMode.REMOTE_SPARK_DP, false);
+       }
+
+       
        //negative examples
        
        @Test
-       public void 
testParForRowBlockPartitioningRemoteSparkLocalSparseNegative() {
+       public void testParForRowBlockPartitioningRemoteLocalSparseNegative() {
                runParForDataPartitioningTest(TEST_NAME3, 
PDataPartitioner.REMOTE_SPARK, PExecMode.LOCAL, true);
        }
        
        @Test
-       public void 
testParForRowBlockPartitioningRemoteSparkRemoteSparseNegative() {
+       public void testParForRowBlockPartitioningRemoteRemoteSparseNegative() {
                runParForDataPartitioningTest(TEST_NAME3, 
PDataPartitioner.REMOTE_SPARK, PExecMode.REMOTE_SPARK, true);
        }
        
        @Test
-       public void 
testParForColBlockPartitioningRemoteSparkLocalSparseNegative() {
+       public void testParForColBlockPartitioningRemoteLocalSparseNegative() {
                runParForDataPartitioningTest(TEST_NAME4, 
PDataPartitioner.REMOTE_SPARK, PExecMode.LOCAL, true);
        }
        
        @Test
-       public void 
testParForColBlockPartitioningRemoteSparkRemoteSparseNegative() {
+       public void testParForColBlockPartitioningRemoteRemoteSparseNegative() {
                runParForDataPartitioningTest(TEST_NAME4, 
PDataPartitioner.REMOTE_SPARK, PExecMode.REMOTE_SPARK, true);
        }
        
@@ -183,7 +197,7 @@ public class ParForBlockwiseDataPartitioningTest extends 
AutomatedTestBase
                        
                        String HOME = SCRIPT_DIR + TEST_DIR;
                        fullDMLScriptName = HOME + testname + ".dml";
-                       programArgs = new String[]{"-args", input("V"), 
+                       programArgs = new String[]{"-stats", "-args", 
input("V"), 
                                partitioner.name(), mode.name(), output("R") };
                        
                        fullRScriptName = HOME + testname + ".R";
@@ -204,6 +218,11 @@ public class ParForBlockwiseDataPartitioningTest extends 
AutomatedTestBase
                        HashMap<CellIndex, Double> dmlfile = 
readDMLMatrixFromHDFS("R");
                        HashMap<CellIndex, Double> rfile  = 
readRMatrixFromFS("Rout");
                        TestUtils.compareMatrices(dmlfile, rfile, eps, "DML", 
"R");
+                       
+                       //test for correct plan
+                       boolean pos = testname.equals(TEST_NAME1) || 
testname.equals(TEST_NAME2);
+                       Assert.assertEquals(pos, 
heavyHittersContainsSubString("ParFor-DPSP") 
+                                       || 
heavyHittersContainsSubString("ParFor-DPESP"));
                }
                finally
                {

Reply via email to