Repository: incubator-systemml
Updated Branches:
  refs/heads/master 39f75ca06 -> 95be80c5b
[SYSTEMML-1310] Extended parfor block partitioning (support in dpesp) Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/1aac97ee Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/1aac97ee Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/1aac97ee Branch: refs/heads/master Commit: 1aac97ee1b25537278f292d89b4a7c4ab1355c2e Parents: 39f75ca Author: Matthias Boehm <mboe...@gmail.com> Authored: Tue Mar 21 19:40:39 2017 -0700 Committer: Matthias Boehm <mboe...@gmail.com> Committed: Tue Mar 21 19:40:39 2017 -0700 ---------------------------------------------------------------------- .../controlprogram/ParForProgramBlock.java | 13 +++++ .../parfor/RemoteDPParForSpark.java | 7 +-- .../parfor/RemoteDPParForSparkWorker.java | 14 +++++- .../parfor/opt/OptimizerConstrained.java | 5 +- .../parfor/opt/OptimizerRuleBased.java | 21 +++++++- .../ParForBlockwiseDataPartitioningTest.java | 53 +++++++++++++------- 6 files changed, 89 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1aac97ee/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java index a6b8e0e..eec84b6 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java @@ -103,6 +103,7 @@ import org.apache.sysml.runtime.instructions.cp.IntObject; import org.apache.sysml.runtime.instructions.cp.StringObject; import org.apache.sysml.runtime.instructions.cp.VariableCPInstruction; import 
org.apache.sysml.runtime.io.IOUtilFunctions; +import org.apache.sysml.runtime.matrix.MatrixCharacteristics; import org.apache.sysml.runtime.matrix.data.OutputInfo; import org.apache.sysml.runtime.util.UtilFunctions; import org.apache.sysml.utils.Statistics; @@ -214,6 +215,18 @@ public class ParForProgramBlock extends ForProgramBlock return _dpf == PDataPartitionFormat.COLUMN_BLOCK_WISE_N || _dpf == PDataPartitionFormat.ROW_BLOCK_WISE_N; } + public long getNumParts(MatrixCharacteristics mc) { + switch( _dpf ) { + case ROW_WISE: return mc.getRows(); + case ROW_BLOCK_WISE: return mc.getNumRowBlocks(); + case ROW_BLOCK_WISE_N: return (long)Math.ceil((double)mc.getRows()/_N); + case COLUMN_WISE: return mc.getCols(); + case COLUMN_BLOCK_WISE: return mc.getNumColBlocks(); + case COLUMN_BLOCK_WISE_N: return (long)Math.ceil((double)mc.getCols()/_N); + default: + throw new RuntimeException("Unsupported partition format: "+_dpf); + } + } } public enum PDataPartitioner { http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1aac97ee/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java index a612a3e..3246cb7 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java @@ -92,8 +92,7 @@ public class RemoteDPParForSpark //compute number of reducers (to avoid OOMs and reduce memory pressure) int numParts = SparkUtils.getNumPreferredPartitions(mc, in); - int numParts2 = (int)((dpf._dpf==PDataPartitionFormat.ROW_BLOCK_WISE) ? 
mc.getRows() : mc.getCols()); - int numReducers2 = Math.max(numReducers, Math.min(numParts, numParts2)); + int numReducers2 = Math.max(numReducers, Math.min(numParts, (int)dpf.getNumParts(mc))); //core parfor datapartition-execute (w/ or w/o shuffle, depending on data characteristics) RemoteDPParForSparkWorker efun = new RemoteDPParForSparkWorker(program, clsMap, @@ -177,7 +176,9 @@ public class RemoteDPParForSpark private static boolean requiresGrouping(PartitionFormat dpf, MatrixObject mo) { MatrixCharacteristics mc = mo.getMatrixCharacteristics(); return ((dpf == PartitionFormat.ROW_WISE && mc.getNumColBlocks() > 1) - || (dpf == PartitionFormat.COLUMN_WISE && mc.getNumRowBlocks() > 1)) + || (dpf == PartitionFormat.COLUMN_WISE && mc.getNumRowBlocks() > 1) + || (dpf._dpf == PDataPartitionFormat.ROW_BLOCK_WISE_N && mc.getNumColBlocks() > 1) + || (dpf._dpf == PDataPartitionFormat.COLUMN_BLOCK_WISE_N && mc.getNumRowBlocks() > 1)) && !hasInputDataSet(dpf, mo); } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1aac97ee/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java index 0d368e8..bcf5652 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java @@ -87,8 +87,18 @@ public class RemoteDPParForSparkWorker extends ParWorker implements PairFlatMapF _aIters = aiters; //setup matrix block partition meta data - _rlen = (dpf != PartitionFormat.ROW_WISE) ? (int)mc.getRows() : 1; - _clen = (dpf != PartitionFormat.COLUMN_WISE) ? 
(int)mc.getCols() : 1; + switch( dpf._dpf ) { + case ROW_WISE: + _rlen = (int)mc.getRows(); _clen = 1; break; + case ROW_BLOCK_WISE_N: + _rlen = dpf._N; _clen = (int)mc.getCols(); break; + case COLUMN_BLOCK_WISE: + _rlen = 1; _clen = (int)mc.getCols(); break; + case COLUMN_BLOCK_WISE_N: + _rlen = (int)mc.getRows(); _clen = dpf._N; break; + default: + throw new RuntimeException("Unsupported partition format: "+dpf._dpf.name()); + } _brlen = mc.getRowsPerBlock(); _bclen = mc.getColsPerBlock(); _tSparseCol = tSparseCol; http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1aac97ee/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java index 2e4b95b..235b927 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java @@ -29,6 +29,7 @@ import org.apache.sysml.parser.ParForStatementBlock; import org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.controlprogram.LocalVariableMap; import org.apache.sysml.runtime.controlprogram.ParForProgramBlock; +import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat; import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartitioner; import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PExecMode; import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.POptMode; @@ -386,7 +387,9 @@ public class OptimizerConstrained extends OptimizerRuleBased if( rIsAccessByIterationVariable(pn, moVarname, iterVarname) && ((moDpf==PartitionFormat.ROW_WISE && mo.getNumRows()==_N ) || - 
(moDpf==PartitionFormat.COLUMN_WISE && mo.getNumColumns()==_N)) ) + (moDpf==PartitionFormat.COLUMN_WISE && mo.getNumColumns()==_N) || + (moDpf._dpf==PDataPartitionFormat.ROW_BLOCK_WISE_N && mo.getNumRows()<=_N*moDpf._N)|| + (moDpf._dpf==PDataPartitionFormat.COLUMN_BLOCK_WISE_N && mo.getNumColumns()<=_N*moDpf._N)) ) { int k = (int)Math.min(_N,_rk2); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1aac97ee/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java index 0e7170f..4003975 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java @@ -1524,7 +1524,9 @@ public class OptimizerRuleBased extends Optimizer if( rIsAccessByIterationVariable(pn, moVarname, iterVarname) && ((moDpf==PartitionFormat.ROW_WISE && mo.getNumRows()==_N ) || - (moDpf==PartitionFormat.COLUMN_WISE && mo.getNumColumns()==_N)) ) + (moDpf==PartitionFormat.COLUMN_WISE && mo.getNumColumns()==_N) || + (moDpf._dpf==PDataPartitionFormat.ROW_BLOCK_WISE_N && mo.getNumRows()<=_N*moDpf._N)|| + (moDpf._dpf==PDataPartitionFormat.COLUMN_BLOCK_WISE_N && mo.getNumColumns()<=_N*moDpf._N)) ) { int k = (int)Math.min(_N,_rk2); @@ -1567,10 +1569,16 @@ public class OptimizerRuleBased extends Optimizer if( h.getInput().get(1) instanceof DataOp ) indexAccess = h.getInput().get(1).getName(); break; + case ROW_BLOCK_WISE_N: //input 1 and 2 have same slope and var + indexAccess = rGetVarFromExpression(h.getInput().get(1)); + break; case COLUMN_WISE: //input 3 and 4 eq if( h.getInput().get(3) instanceof DataOp ) indexAccess = h.getInput().get(3).getName(); break; + case 
COLUMN_BLOCK_WISE_N: //input 3 and 4 have same slope and var + indexAccess = rGetVarFromExpression(h.getInput().get(3)); + break; default: //do nothing @@ -1583,6 +1591,17 @@ public class OptimizerRuleBased extends Optimizer return ret; } + private static String rGetVarFromExpression(Hop current) { + String var = null; + for( Hop c : current.getInput() ) { + var = rGetVarFromExpression(c); + if( var != null ) + return var; + } + return (current instanceof DataOp) ? + current.getName() : null; + } + /////// //REWRITE transpose sparse vector operations /// http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/1aac97ee/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForBlockwiseDataPartitioningTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForBlockwiseDataPartitioningTest.java b/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForBlockwiseDataPartitioningTest.java index a01d22e..29a6f97 100644 --- a/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForBlockwiseDataPartitioningTest.java +++ b/src/test/java/org/apache/sysml/test/integration/functions/parfor/ParForBlockwiseDataPartitioningTest.java @@ -21,6 +21,7 @@ package org.apache.sysml.test.integration.functions.parfor; import java.util.HashMap; +import org.junit.Assert; import org.junit.Test; import org.apache.sysml.api.DMLScript; @@ -68,17 +69,17 @@ public class ParForBlockwiseDataPartitioningTest extends AutomatedTestBase } @Test - public void testParForRowBlockPartitioningLocalRemoteSparkDense() { + public void testParForRowBlockPartitioningLocalRemoteDense() { runParForDataPartitioningTest(TEST_NAME1, PDataPartitioner.LOCAL, PExecMode.REMOTE_SPARK, false); } @Test - public void testParForRowBlockPartitioningRemoteSparkLocalDense() { + public void testParForRowBlockPartitioningRemoteLocalDense() { 
runParForDataPartitioningTest(TEST_NAME1, PDataPartitioner.REMOTE_SPARK, PExecMode.LOCAL, false); } @Test - public void testParForRowBlockPartitioningRemoteSparkRemoteDense() { + public void testParForRowBlockPartitioningRemoteRemoteDense() { runParForDataPartitioningTest(TEST_NAME1, PDataPartitioner.REMOTE_SPARK, PExecMode.REMOTE_SPARK, false); } @@ -88,17 +89,17 @@ public class ParForBlockwiseDataPartitioningTest extends AutomatedTestBase } @Test - public void testParForRowBlockPartitioningLocalRemoteSparkSparse() { + public void testParForRowBlockPartitioningLocalRemoteSparse() { runParForDataPartitioningTest(TEST_NAME1, PDataPartitioner.LOCAL, PExecMode.REMOTE_SPARK, true); } @Test - public void testParForRowBlockPartitioningRemoteSparkLocalSparse() { + public void testParForRowBlockPartitioningRemoteLocalSparse() { runParForDataPartitioningTest(TEST_NAME1, PDataPartitioner.REMOTE_SPARK, PExecMode.LOCAL, true); } @Test - public void testParForRowBlockPartitioningRemoteSparkRemoteSparse() { + public void testParForRowBlockPartitioningRemoteRemoteSparse() { runParForDataPartitioningTest(TEST_NAME1, PDataPartitioner.REMOTE_SPARK, PExecMode.REMOTE_SPARK, true); } @@ -108,17 +109,17 @@ public class ParForBlockwiseDataPartitioningTest extends AutomatedTestBase } @Test - public void testParForColBlockPartitioningLocalRemoteSparkDense() { + public void testParForColBlockPartitioningLocalRemoteDense() { runParForDataPartitioningTest(TEST_NAME2, PDataPartitioner.LOCAL, PExecMode.REMOTE_SPARK, false); } @Test - public void testParForColBlockPartitioningRemoteSparkLocalDense() { + public void testParForColBlockPartitioningRemoteLocalDense() { runParForDataPartitioningTest(TEST_NAME2, PDataPartitioner.REMOTE_SPARK, PExecMode.LOCAL, false); } @Test - public void testParForColBlockPartitioningRemoteSparkRemoteDense() { + public void testParForColBlockPartitioningRemoteRemoteDense() { runParForDataPartitioningTest(TEST_NAME2, PDataPartitioner.REMOTE_SPARK, 
PExecMode.REMOTE_SPARK, false); } @@ -128,39 +129,52 @@ public class ParForBlockwiseDataPartitioningTest extends AutomatedTestBase } @Test - public void testParForColBlockPartitioningLocalRemoteSparkSparse() { + public void testParForColBlockPartitioningLocalRemoteSparse() { runParForDataPartitioningTest(TEST_NAME2, PDataPartitioner.LOCAL, PExecMode.REMOTE_SPARK, true); } @Test - public void testParForColBlockPartitioningRemoteSparkLocalSparse() { + public void testParForColBlockPartitioningRemoteLocalSparse() { runParForDataPartitioningTest(TEST_NAME2, PDataPartitioner.REMOTE_SPARK, PExecMode.LOCAL, true); } @Test - public void testParForColBlockPartitioningRemoteSparkRemoteSparse() { + public void testParForColBlockPartitioningRemoteRemoteSparse() { runParForDataPartitioningTest(TEST_NAME2, PDataPartitioner.REMOTE_SPARK, PExecMode.REMOTE_SPARK, true); } + //fused data partition execute + + @Test + public void testParForRowBlockPartitioningRemoteRemoteFusedDense() { + runParForDataPartitioningTest(TEST_NAME1, PDataPartitioner.UNSPECIFIED, PExecMode.REMOTE_SPARK_DP, false); + } + + @Test + public void testParForColBlockPartitioningRemoteRemoteFusedDense() { + runParForDataPartitioningTest(TEST_NAME2, PDataPartitioner.UNSPECIFIED, PExecMode.REMOTE_SPARK_DP, false); + } + + //negative examples @Test - public void testParForRowBlockPartitioningRemoteSparkLocalSparseNegative() { + public void testParForRowBlockPartitioningRemoteLocalSparseNegative() { runParForDataPartitioningTest(TEST_NAME3, PDataPartitioner.REMOTE_SPARK, PExecMode.LOCAL, true); } @Test - public void testParForRowBlockPartitioningRemoteSparkRemoteSparseNegative() { + public void testParForRowBlockPartitioningRemoteRemoteSparseNegative() { runParForDataPartitioningTest(TEST_NAME3, PDataPartitioner.REMOTE_SPARK, PExecMode.REMOTE_SPARK, true); } @Test - public void testParForColBlockPartitioningRemoteSparkLocalSparseNegative() { + public void testParForColBlockPartitioningRemoteLocalSparseNegative() { 
runParForDataPartitioningTest(TEST_NAME4, PDataPartitioner.REMOTE_SPARK, PExecMode.LOCAL, true); } @Test - public void testParForColBlockPartitioningRemoteSparkRemoteSparseNegative() { + public void testParForColBlockPartitioningRemoteRemoteSparseNegative() { runParForDataPartitioningTest(TEST_NAME4, PDataPartitioner.REMOTE_SPARK, PExecMode.REMOTE_SPARK, true); } @@ -183,7 +197,7 @@ public class ParForBlockwiseDataPartitioningTest extends AutomatedTestBase String HOME = SCRIPT_DIR + TEST_DIR; fullDMLScriptName = HOME + testname + ".dml"; - programArgs = new String[]{"-args", input("V"), + programArgs = new String[]{"-stats", "-args", input("V"), partitioner.name(), mode.name(), output("R") }; fullRScriptName = HOME + testname + ".R"; @@ -204,6 +218,11 @@ public class ParForBlockwiseDataPartitioningTest extends AutomatedTestBase HashMap<CellIndex, Double> dmlfile = readDMLMatrixFromHDFS("R"); HashMap<CellIndex, Double> rfile = readRMatrixFromFS("Rout"); TestUtils.compareMatrices(dmlfile, rfile, eps, "DML", "R"); + + //test for correct plan + boolean pos = testname.equals(TEST_NAME1) || testname.equals(TEST_NAME2); + Assert.assertEquals(pos, heavyHittersContainsSubString("ParFor-DPSP") + || heavyHittersContainsSubString("ParFor-DPESP")); } finally {