Repository: incubator-systemml Updated Branches: refs/heads/master 2d2196d84 -> eb35b2c90
[SYSTEMML-1466] Fix rdd status handling for exports and writes, tests Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/eb35b2c9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/eb35b2c9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/eb35b2c9 Branch: refs/heads/master Commit: eb35b2c90cbacc674793ca4aed4583273d00fa87 Parents: 2d2196d Author: Matthias Boehm <mboe...@gmail.com> Authored: Tue Apr 25 15:03:49 2017 -0700 Committer: Matthias Boehm <mboe...@gmail.com> Committed: Tue Apr 25 16:50:50 2017 -0700 ---------------------------------------------------------------------- .../controlprogram/caching/CacheableData.java | 11 +++-- .../controlprogram/caching/MatrixObject.java | 3 ++ .../instructions/spark/data/RDDObject.java | 11 ++++- .../mlcontext/MLContextParforDatasetTest.java | 43 ++++++++++++++++---- 4 files changed, 57 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/eb35b2c9/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java index 904eb87..c1a024a 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java @@ -204,7 +204,7 @@ public abstract class CacheableData<T extends CacheBlock> extends Data _uniqueID = (int)_seq.getNextID(); _cacheStatus = CacheStatus.EMPTY; _numReadThreads = 0; - _gpuObjects = new HashMap<>(); + _gpuObjects = new HashMap<GPUContext, GPUObject>(); } /** @@ -835,14 +835,19 @@ public abstract class CacheableData<T extends CacheBlock> extends Data throw new CacheException ("Export to " + fName + " failed.", e); } } - else if( getRDDHandle()!=null && //pending rdd operation - !getRDDHandle().allowsShortCircuitRead() ) + else if( getRDDHandle()!=null && getRDDHandle().isPending() + && !getRDDHandle().isHDFSFile() + && !getRDDHandle().allowsShortCircuitRead() ) { //CASE 3: pending rdd operation (other than checkpoints) try { + //write matrix or frame writeBlobFromRDDtoHDFS(getRDDHandle(), fName, outputFormat); writeMetaData( fName, outputFormat, formatProperties ); + + //update rdd status + getRDDHandle().setPending(false); } catch (Exception e) { throw new CacheException ("Export to " + fName + " failed.", e); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/eb35b2c9/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java index 4e560c8..94bdb2d 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java @@ -488,8 +488,11 @@ public class MatrixObject extends CacheableData<MatrixBlock> if( !MapReduceTool.existsFileOnHDFS(_hdfsFileName) ) { //prevent overwrite existing file long newnnz = SparkExecutionContext.writeRDDtoHDFS(lrdd, _hdfsFileName, iimd.getOutputInfo()); ((MatrixDimensionsMetaData) _metaData).getMatrixCharacteristics().setNonZeros(newnnz); + ((RDDObject)rdd).setPending(false); //mark rdd as non-pending (for export) ((RDDObject)rdd).setHDFSFile(true); //mark rdd as hdfs file (for restore) writeStatus.setValue(true); //mark for no cache-write on read + //note: the flag hdfsFile is actually not entirely correct because we still hold an rdd + //reference to the input not to an rdd of the hdfs file but the resulting behavior is correct } mb = readBlobFromHDFS(_hdfsFileName); } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/eb35b2c9/src/main/java/org/apache/sysml/runtime/instructions/spark/data/RDDObject.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/RDDObject.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/RDDObject.java index 67f97a3..0a52323 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/RDDObject.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/RDDObject.java @@ -29,7 +29,8 @@ public class RDDObject extends LineageObject private boolean _checkpointed = false; //created via checkpoint instruction private boolean _hdfsfile = false; //created from hdfs file private String _hdfsFname = null; //hdfs filename, if created from hdfs. - private boolean _parRDD = false; + private boolean _parRDD = false; //is a parallelized rdd at driver + private boolean _pending = true; //is a pending rdd operation public RDDObject( JavaPairRDD<?,?> rddvar, String varName) { super(varName); @@ -72,6 +73,14 @@ public class RDDObject extends LineageObject return _parRDD; } + public void setPending(boolean flag) { + _pending = flag; + } + + public boolean isPending() { + return _pending; + } + /** * Indicates if rdd is an hdfs file or a checkpoint over an hdfs file; http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/eb35b2c9/src/test/java/org/apache/sysml/test/integration/mlcontext/MLContextParforDatasetTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/sysml/test/integration/mlcontext/MLContextParforDatasetTest.java b/src/test/java/org/apache/sysml/test/integration/mlcontext/MLContextParforDatasetTest.java index 41b8d16..36e7990 100644 --- a/src/test/java/org/apache/sysml/test/integration/mlcontext/MLContextParforDatasetTest.java +++ b/src/test/java/org/apache/sysml/test/integration/mlcontext/MLContextParforDatasetTest.java @@ -80,25 +80,45 @@ public class MLContextParforDatasetTest extends AutomatedTestBase @Test public void testParforDatasetVector() { - runMLContextParforDatasetTest(true, false); + runMLContextParforDatasetTest(true, false, false); } @Test public void testParforDatasetRow() { - runMLContextParforDatasetTest(false, false); + runMLContextParforDatasetTest(false, false, false); } @Test public void testParforDatasetVectorUnkownDims() { - runMLContextParforDatasetTest(true, true); + runMLContextParforDatasetTest(true, true, false); } @Test public void testParforDatasetRowUnknownDims() { - runMLContextParforDatasetTest(false, true); + runMLContextParforDatasetTest(false, true, false); } - private void runMLContextParforDatasetTest(boolean vector, boolean unknownDims) + @Test + public void testParforDatasetVectorMulti() { + runMLContextParforDatasetTest(true, false, true); + } + + @Test + public void testParforDatasetRowMulti() { + runMLContextParforDatasetTest(false, false, true); + } + + @Test + public void testParforDatasetVectorUnkownDimsMulti() { + runMLContextParforDatasetTest(true, true, true); + } + + @Test + public void testParforDatasetRowUnknownDimsMulti() { + runMLContextParforDatasetTest(false, true, true); + } + + private void runMLContextParforDatasetTest(boolean vector, boolean unknownDims, boolean multiInputs) { //modify memory budget to trigger fused datapartition-execute long oldmem = InfrastructureAnalyzer.getLocalMaxMemory(); @@ -119,21 +139,30 @@ public class MLContextParforDatasetTest extends AutomatedTestBase MatrixMetadata mm = new MatrixMetadata(vector ? MatrixFormat.DF_VECTOR_WITH_INDEX : MatrixFormat.DF_DOUBLES_WITH_INDEX); mm.setMatrixCharacteristics(mc2); - String s = "v = matrix(0, rows=nrow(X), cols=1)" + String s1 = "v = matrix(0, rows=nrow(X), cols=1)" + "parfor(i in 1:nrow(X), log=DEBUG) {" + " v[i, ] = sum(X[i, ]);" + "}" + "r = sum(v);"; + String s2 = "v = matrix(0, rows=nrow(X), cols=1)" + +"Y = X;" + + "parfor(i in 1:nrow(X), log=DEBUG) {" + + " v[i, ] = sum(X[i, ]+Y[i, ]);" + + "}" + + "r = sum(v);"; + String s = multiInputs ? s2 : s1; + Script script = dml(s).in("X", df, mm).out("r"); MLResults results = ml.execute(script); //compare aggregation results double sum1 = results.getDouble("r"); - double sum2 = mbA.sum(); + double sum2 = mbA.sum() * (multiInputs ? 2 : 1); TestUtils.compareScalars(sum2, sum1, 0.000001); } catch(Exception ex) { + ex.printStackTrace(); throw new RuntimeException(ex); } finally {