Repository: incubator-systemml
Updated Branches:
  refs/heads/master 2d2196d84 -> eb35b2c90


[SYSTEMML-1466] Fix rdd status handling for exports and writes, tests

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/eb35b2c9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/eb35b2c9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/eb35b2c9

Branch: refs/heads/master
Commit: eb35b2c90cbacc674793ca4aed4583273d00fa87
Parents: 2d2196d
Author: Matthias Boehm <mboe...@gmail.com>
Authored: Tue Apr 25 15:03:49 2017 -0700
Committer: Matthias Boehm <mboe...@gmail.com>
Committed: Tue Apr 25 16:50:50 2017 -0700

----------------------------------------------------------------------
 .../controlprogram/caching/CacheableData.java   | 11 +++--
 .../controlprogram/caching/MatrixObject.java    |  3 ++
 .../instructions/spark/data/RDDObject.java      | 11 ++++-
 .../mlcontext/MLContextParforDatasetTest.java   | 43 ++++++++++++++++----
 4 files changed, 57 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/eb35b2c9/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java
index 904eb87..c1a024a 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheableData.java
@@ -204,7 +204,7 @@ public abstract class CacheableData<T extends CacheBlock> extends Data
                _uniqueID = (int)_seq.getNextID();              
                _cacheStatus = CacheStatus.EMPTY;
                _numReadThreads = 0;
-               _gpuObjects = new HashMap<>();
+               _gpuObjects = new HashMap<GPUContext, GPUObject>();
        }
        
        /**
@@ -835,14 +835,19 @@ public abstract class CacheableData<T extends CacheBlock> extends Data
                                throw new CacheException ("Export to " + fName 
+ " failed.", e);
                        }
                }
-               else if( getRDDHandle()!=null && //pending rdd operation
-                               !getRDDHandle().allowsShortCircuitRead() )
+               else if( getRDDHandle()!=null && getRDDHandle().isPending()
+                       && !getRDDHandle().isHDFSFile() 
+                       && !getRDDHandle().allowsShortCircuitRead() )
                {
                        //CASE 3: pending rdd operation (other than checkpoints)
                        try
                        {
+                               //write matrix or frame
                                writeBlobFromRDDtoHDFS(getRDDHandle(), fName, 
outputFormat);
                                writeMetaData( fName, outputFormat, 
formatProperties );
+
+                               //update rdd status
+                               getRDDHandle().setPending(false);
                        }
                        catch (Exception e) {
                                throw new CacheException ("Export to " + fName 
+ " failed.", e);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/eb35b2c9/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java
index 4e560c8..94bdb2d 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/MatrixObject.java
@@ -488,8 +488,11 @@ public class MatrixObject extends CacheableData<MatrixBlock>
                                if( 
!MapReduceTool.existsFileOnHDFS(_hdfsFileName) ) { //prevent overwrite existing 
file
                                        long newnnz = 
SparkExecutionContext.writeRDDtoHDFS(lrdd, _hdfsFileName, iimd.getOutputInfo());
                                        ((MatrixDimensionsMetaData) 
_metaData).getMatrixCharacteristics().setNonZeros(newnnz);
+                                       ((RDDObject)rdd).setPending(false); 
//mark rdd as non-pending (for export)
                                        ((RDDObject)rdd).setHDFSFile(true); 
//mark rdd as hdfs file (for restore)
                                        writeStatus.setValue(true);         
//mark for no cache-write on read
+                                       //note: the flag hdfsFile is actually 
not entirely correct because we still hold an rdd 
+                                       //reference to the input not to an rdd 
of the hdfs file but the resulting behavior is correct
                                }
                                mb = readBlobFromHDFS(_hdfsFileName);
                        }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/eb35b2c9/src/main/java/org/apache/sysml/runtime/instructions/spark/data/RDDObject.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/RDDObject.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/RDDObject.java
index 67f97a3..0a52323 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/RDDObject.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/RDDObject.java
@@ -29,7 +29,8 @@ public class RDDObject extends LineageObject
        private boolean _checkpointed = false; //created via checkpoint 
instruction
        private boolean _hdfsfile = false;     //created from hdfs file
        private String  _hdfsFname = null;     //hdfs filename, if created from 
hdfs.  
-       private boolean _parRDD = false;
+       private boolean _parRDD = false;       //is a parallelized rdd at driver
+       private boolean _pending = true;       //is a pending rdd operation
        
        public RDDObject( JavaPairRDD<?,?> rddvar, String varName) {
                super(varName);
@@ -72,6 +73,14 @@ public class RDDObject extends LineageObject
                return _parRDD; 
        }
        
+       public void setPending(boolean flag) {
+               _pending = flag;
+       }
+       
+       public boolean isPending() {
+               return _pending;
+       }
+       
 
        /**
         * Indicates if rdd is an hdfs file or a checkpoint over an hdfs file;

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/eb35b2c9/src/test/java/org/apache/sysml/test/integration/mlcontext/MLContextParforDatasetTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/mlcontext/MLContextParforDatasetTest.java b/src/test/java/org/apache/sysml/test/integration/mlcontext/MLContextParforDatasetTest.java
index 41b8d16..36e7990 100644
--- a/src/test/java/org/apache/sysml/test/integration/mlcontext/MLContextParforDatasetTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/mlcontext/MLContextParforDatasetTest.java
@@ -80,25 +80,45 @@ public class MLContextParforDatasetTest extends AutomatedTestBase
 
        @Test
        public void testParforDatasetVector() {
-               runMLContextParforDatasetTest(true, false);
+               runMLContextParforDatasetTest(true, false, false);
        }
        
        @Test
        public void testParforDatasetRow() {
-               runMLContextParforDatasetTest(false, false);
+               runMLContextParforDatasetTest(false, false, false);
        }
        
        @Test
        public void testParforDatasetVectorUnkownDims() {
-               runMLContextParforDatasetTest(true, true);
+               runMLContextParforDatasetTest(true, true, false);
        }
        
        @Test
        public void testParforDatasetRowUnknownDims() {
-               runMLContextParforDatasetTest(false, true);
+               runMLContextParforDatasetTest(false, true, false);
        }
        
-       private void runMLContextParforDatasetTest(boolean vector, boolean 
unknownDims) 
+       @Test
+       public void testParforDatasetVectorMulti() {
+               runMLContextParforDatasetTest(true, false, true);
+       }
+       
+       @Test
+       public void testParforDatasetRowMulti() {
+               runMLContextParforDatasetTest(false, false, true);
+       }
+       
+       @Test
+       public void testParforDatasetVectorUnkownDimsMulti() {
+               runMLContextParforDatasetTest(true, true, true);
+       }
+       
+       @Test
+       public void testParforDatasetRowUnknownDimsMulti() {
+               runMLContextParforDatasetTest(false, true, true);
+       }
+       
+       private void runMLContextParforDatasetTest(boolean vector, boolean 
unknownDims, boolean multiInputs) 
        {
                //modify memory budget to trigger fused datapartition-execute
                long oldmem = InfrastructureAnalyzer.getLocalMaxMemory();
@@ -119,21 +139,30 @@ public class MLContextParforDatasetTest extends AutomatedTestBase
                        MatrixMetadata mm = new MatrixMetadata(vector ? 
MatrixFormat.DF_VECTOR_WITH_INDEX : MatrixFormat.DF_DOUBLES_WITH_INDEX);
                        mm.setMatrixCharacteristics(mc2);
                        
-                       String s = "v = matrix(0, rows=nrow(X), cols=1)"
+                       String s1 = "v = matrix(0, rows=nrow(X), cols=1)"
                                        + "parfor(i in 1:nrow(X), log=DEBUG) {"
                                        + "   v[i, ] = sum(X[i, ]);"
                                        + "}"
                                        + "r = sum(v);";
+                       String s2 = "v = matrix(0, rows=nrow(X), cols=1)"
+                                       +"Y = X;"
+                                       + "parfor(i in 1:nrow(X), log=DEBUG) {"
+                                       + "   v[i, ] = sum(X[i, ]+Y[i, ]);"
+                                       + "}"
+                                       + "r = sum(v);";
+                       String s = multiInputs ? s2 : s1;
+                       
                        Script script = dml(s).in("X", df, mm).out("r");
                        MLResults results = ml.execute(script);
                        
                        //compare aggregation results
                        double sum1 = results.getDouble("r");
-                       double sum2 = mbA.sum();
+                       double sum2 = mbA.sum() * (multiInputs ? 2 : 1);
                        
                        TestUtils.compareScalars(sum2, sum1, 0.000001);
                }
                catch(Exception ex) {
+                       ex.printStackTrace();
                        throw new RuntimeException(ex);
                }
                finally {

Reply via email to