Repository: incubator-systemml
Updated Branches:
  refs/heads/master 8a6b3856c -> 0a4d563e4


[SYSTEMML-643] Fix mr/spark pmm compilation (row meta data), sample test

So far, the output meta data of pmm (e.g., for removeEmpty(diag(V))%*%X)
was always computed as nrow(M), which is incorrect in the case where
removeEmpty(diag) was never materialized. On Spark, runtime meta data
management mitigated the problem. This patch fixes the issue for both
MR and Spark by computing the output number of rows as (1) nrow(M) if
removeEmpty(diag) is materialized, and (2) max(V) otherwise.
Furthermore, this change introduces sample.dml into our applications
test suite (including data and meta data checks).
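
For illustration, a minimal DML sketch of the affected pattern (the variable
names, sizes, and seeds below are hypothetical, not taken from this commit). It
shows why the number of output rows cannot, in general, be read off the
removeEmpty(diag(V)) intermediate, which may never be materialized:

  # hypothetical instance of the pattern removeEmpty(diag(V)) %*% X
  X = rand(rows=1000, cols=10, seed=7);                   # data matrix
  V = ppred(rand(rows=1000, cols=1, seed=3), 0.5, "<");   # 0/1 row-selection vector
  P = removeEmpty(target=diag(V), margin="rows");         # permutation matrix
  Y = P %*% X;                                            # candidate for the pmm operator described above
  # correct output meta data: nrow(Y) == sum(V) selected rows, ncol(Y) == ncol(X)
  print("dims: " + nrow(Y) + " x " + ncol(Y));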

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/0a4d563e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/0a4d563e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/0a4d563e

Branch: refs/heads/master
Commit: 0a4d563e4c97ecbfd681ad04db43dcf3ec3d9243
Parents: 8a6b385
Author: Matthias Boehm <mbo...@us.ibm.com>
Authored: Thu Apr 21 21:35:06 2016 -0700
Committer: Matthias Boehm <mbo...@us.ibm.com>
Committed: Fri Apr 22 09:52:13 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/sysml/hops/AggBinaryOp.java |  68 +++++---
 .../test/integration/AutomatedTestBase.java     |  23 ++-
 .../applications/parfor/ParForSampleTest.java   | 162 +++++++++++++++++++
 .../applications/parfor/parfor_sample.dml       |  69 ++++++++
 4 files changed, 290 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/0a4d563e/src/main/java/org/apache/sysml/hops/AggBinaryOp.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/AggBinaryOp.java b/src/main/java/org/apache/sysml/hops/AggBinaryOp.java
index 02d32ff..8a8cffc 100644
--- a/src/main/java/org/apache/sysml/hops/AggBinaryOp.java
+++ b/src/main/java/org/apache/sysml/hops/AggBinaryOp.java
@@ -901,8 +901,12 @@ public class AggBinaryOp extends Hop implements MultiThreadedHop
                long brlen = pmInput.getRowsInBlock();
                long bclen = pmInput.getColsInBlock();
                
-               //a) full permutation matrix input (potentially without empty block materialized)
                Lop lpmInput = pmInput.constructLops();
+               Hop nrow = null;
+               double mestPM = OptimizerUtils.estimateSize(pmInput.getDim1(), 1);
+               ExecType etVect = (mestPM>OptimizerUtils.getLocalMemBudget())?ExecType.MR:ExecType.CP;
+               
+               //a) full permutation matrix input (potentially without empty block materialized)
                if( pmInput.getDim2() != 1 ) //not a vector
                {
                        //compute condensed permutation matrix vector input
@@ -930,20 +934,28 @@ public class AggBinaryOp extends Hop implements MultiThreadedHop
                        //mult.computeMemEstimate(memo); //select exec type
                        HopRewriteUtils.copyLineNumbers(this, mult);
                        
-                       lpmInput = mult.constructLops();
+                       //compute NROW target via nrow(m)
+                       nrow = HopRewriteUtils.createValueHop(pmInput, true);
+                       HopRewriteUtils.setOutputBlocksizes(nrow, 0, 0);
+                       nrow.setForcedExecType(ExecType.CP);
+                       HopRewriteUtils.copyLineNumbers(this, nrow);
                        
+                       lpmInput = mult.constructLops();
                        HopRewriteUtils.removeChildReference(pmInput, transpose);
                }
+               else //input vector
+               {
+                       //compute NROW target via max(v)
+                       nrow = HopRewriteUtils.createAggUnaryOp(pmInput, AggOp.MAX, Direction.RowCol); 
+                       HopRewriteUtils.setOutputBlocksizes(nrow, 0, 0);
+                       nrow.setForcedExecType(etVect);
+                       HopRewriteUtils.copyLineNumbers(this, nrow);
+               }
                
                //b) condensed permutation matrix vector input (target rows)
-               Hop nrow = HopRewriteUtils.createValueHop(pmInput, true); //NROW
-               HopRewriteUtils.setOutputBlocksizes(nrow, 0, 0);
-               nrow.setForcedExecType(ExecType.CP);
-               HopRewriteUtils.copyLineNumbers(this, nrow);
-               Lop lnrow = nrow.constructLops();
-               
                _outputEmptyBlocks = !OptimizerUtils.allowsToFilterEmptyBlockOutputs(this); 
-               PMMJ pmm = new PMMJ(lpmInput, rightInput.constructLops(), lnrow, getDataType(), getValueType(), false, _outputEmptyBlocks, ExecType.SPARK);
+               PMMJ pmm = new PMMJ(lpmInput, rightInput.constructLops(), nrow.constructLops(), 
+                               getDataType(), getValueType(), false, _outputEmptyBlocks, ExecType.SPARK);
                setOutputDimensions(pmm);
                setLineNumbers(pmm);
                setLops(pmm);
@@ -1309,8 +1321,12 @@ public class AggBinaryOp extends Hop implements MultiThreadedHop
                long brlen = pmInput.getRowsInBlock();
                long bclen = pmInput.getColsInBlock();
                
-               //a) full permutation matrix input (potentially without empty block materialized)
                Lop lpmInput = pmInput.constructLops();
+               Hop nrow = null;
+               double mestPM = OptimizerUtils.estimateSize(pmInput.getDim1(), 1);
+               ExecType etVect = (mestPM>OptimizerUtils.getLocalMemBudget())?ExecType.MR:ExecType.CP;
+               
+               //a) full permutation matrix input (potentially without empty block materialized)
                if( pmInput.getDim2() != 1 ) //not a vector
                {
                        //compute condensed permutation matrix vector input
@@ -1335,31 +1351,36 @@ public class AggBinaryOp extends Hop implements MultiThreadedHop
                        HopRewriteUtils.setOutputBlocksizes(mult, brlen, bclen); 
                        mult.refreshSizeInformation();
                        mult.setForcedExecType(ExecType.MR);
-                       //mult.computeMemEstimate(memo); //select exec type
                        HopRewriteUtils.copyLineNumbers(this, mult);
                        
-                       lpmInput = mult.constructLops();
-                       
+                       //compute NROW target via nrow(m)
+                       nrow = HopRewriteUtils.createValueHop(pmInput, true);
+                       HopRewriteUtils.setOutputBlocksizes(nrow, 0, 0);
+                       nrow.setForcedExecType(ExecType.CP);
+                       HopRewriteUtils.copyLineNumbers(this, nrow);
+                               
+                       lpmInput = mult.constructLops();                        
                        HopRewriteUtils.removeChildReference(pmInput, transpose);
                }
+               else //input vector
+               {
+                       //compute NROW target via max(v)
+                       nrow = HopRewriteUtils.createAggUnaryOp(pmInput, AggOp.MAX, Direction.RowCol); 
+                       HopRewriteUtils.setOutputBlocksizes(nrow, 0, 0);
+                       nrow.setForcedExecType(etVect);
+                       HopRewriteUtils.copyLineNumbers(this, nrow);
+               }
                
-               //b) condensed permutation matrix vector input (target rows)
-               Hop nrow = HopRewriteUtils.createValueHop(pmInput, true); //NROW
-               HopRewriteUtils.setOutputBlocksizes(nrow, 0, 0);
-               nrow.setForcedExecType(ExecType.CP);
-               HopRewriteUtils.copyLineNumbers(this, nrow);
-               Lop lnrow = nrow.constructLops();
-               
+               //b) condensed permutation matrix vector input (target rows)
                boolean needPart = !pmInput.dimsKnown() || pmInput.getDim1() > DistributedCacheInput.PARTITION_SIZE;
-               double mestPM = OptimizerUtils.estimateSize(pmInput.getDim1(), 1);
                if( needPart ){ //requires partitioning
-                       lpmInput = new DataPartition(lpmInput, DataType.MATRIX, ValueType.DOUBLE, (mestPM>OptimizerUtils.getLocalMemBudget())?ExecType.MR:ExecType.CP, PDataPartitionFormat.ROW_BLOCK_WISE_N);
+                       lpmInput = new DataPartition(lpmInput, DataType.MATRIX, ValueType.DOUBLE, etVect, PDataPartitionFormat.ROW_BLOCK_WISE_N);
                        lpmInput.getOutputParameters().setDimensions(pmInput.getDim1(), 1, getRowsInBlock(), getColsInBlock(), pmInput.getDim1());
                        setLineNumbers(lpmInput);       
                }
                
                _outputEmptyBlocks = !OptimizerUtils.allowsToFilterEmptyBlockOutputs(this); 
-               PMMJ pmm = new PMMJ(lpmInput, rightInput.constructLops(), lnrow, getDataType(), getValueType(), needPart, _outputEmptyBlocks, ExecType.MR);
+               PMMJ pmm = new PMMJ(lpmInput, rightInput.constructLops(), nrow.constructLops(), getDataType(), getValueType(), needPart, _outputEmptyBlocks, ExecType.MR);
                pmm.getOutputParameters().setDimensions(getDim1(), getDim2(), getRowsInBlock(), getColsInBlock(), getNnz());
                setLineNumbers(pmm);
                
@@ -1367,7 +1388,6 @@ public class AggBinaryOp extends Hop implements MultiThreadedHop
                aggregate.getOutputParameters().setDimensions(getDim1(), getDim2(), getRowsInBlock(), getColsInBlock(), getNnz());
                aggregate.setupCorrectionLocation(CorrectionLocationType.NONE); // aggregation uses kahanSum but the inputs do not have correction values
                setLineNumbers(aggregate);
-               
                setLops(aggregate);
                
                HopRewriteUtils.removeChildReference(pmInput, nrow);            

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/0a4d563e/src/test/java/org/apache/sysml/test/integration/AutomatedTestBase.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/AutomatedTestBase.java b/src/test/java/org/apache/sysml/test/integration/AutomatedTestBase.java
index f044774..eb87197 100644
--- a/src/test/java/org/apache/sysml/test/integration/AutomatedTestBase.java
+++ b/src/test/java/org/apache/sysml/test/integration/AutomatedTestBase.java
@@ -720,20 +720,27 @@ public abstract class AutomatedTestBase
         * @param fileName
         * @param mc
         */
-       public static void checkDMLMetaDataFile(String fileName, MatrixCharacteristics mc)
+       public static void checkDMLMetaDataFile(String fileName, MatrixCharacteristics mc) {
+               MatrixCharacteristics rmc = readDMLMetaDataFile(fileName);
+               Assert.assertEquals(mc.getRows(), rmc.getRows());
+               Assert.assertEquals(mc.getCols(), rmc.getCols());
+       }
+       
+       /**
+        * 
+        * @param fileName
+        * @return
+        */
+       public static MatrixCharacteristics readDMLMetaDataFile(String fileName)
        {
-               try
-               {
+               try {
                        String fname = baseDirectory + OUTPUT_DIR + fileName +".mtd";
                        JSONObject meta = new DataExpression().readMetadataFile(fname, false);
                        long rlen = Long.parseLong(meta.get(DataExpression.READROWPARAM).toString());
                        long clen = Long.parseLong(meta.get(DataExpression.READCOLPARAM).toString());
-                       
-                       Assert.assertEquals(mc.getRows(), rlen);
-                       Assert.assertEquals(mc.getCols(), clen);
+                       return new MatrixCharacteristics(rlen, clen, -1, -1);
                }
-               catch(Exception ex)
-               {
+               catch(Exception ex) {
                        throw new RuntimeException(ex);
                }
        }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/0a4d563e/src/test/java/org/apache/sysml/test/integration/applications/parfor/ParForSampleTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/applications/parfor/ParForSampleTest.java b/src/test/java/org/apache/sysml/test/integration/applications/parfor/ParForSampleTest.java
new file mode 100644
index 0000000..5b36acd
--- /dev/null
+++ b/src/test/java/org/apache/sysml/test/integration/applications/parfor/ParForSampleTest.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.test.integration.applications.parfor;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map.Entry;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.apache.sysml.api.DMLScript;
+import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
+import org.apache.sysml.lops.LopProperties.ExecType;
+import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
+import org.apache.sysml.runtime.matrix.data.MatrixValue.CellIndex;
+import org.apache.sysml.test.integration.AutomatedTestBase;
+import org.apache.sysml.test.integration.TestConfiguration;
+import org.apache.sysml.test.utils.TestUtils;
+
+/**
+ * 
+ * 
+ */
+public class ParForSampleTest extends AutomatedTestBase 
+{      
+       private final static String TEST_NAME = "parfor_sample";
+       private final static String TEST_DIR = "applications/parfor/";
+       private final static String TEST_CLASS_DIR = TEST_DIR + ParForSampleTest.class.getSimpleName() + "/";
+       
+       private final static int rows = 2298;
+       private final static int cols = 1123;
+
+       private final static double sparsity1 = 0.73;
+       private final static double sparsity2 = 0.25;
+       
+       
+       @Override
+       public void setUp() {
+               TestUtils.clearAssertionInformation();
+               addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] {"B1","B2"}) );
+       }
+       
+       @Test
+       public void testParForSampleDenseCP() {
+               runParForSampleTest(false, ExecType.CP);
+       }
+       
+       @Test
+       public void testParForSampleSparseCP() {
+               runParForSampleTest(true, ExecType.CP);
+       }
+       
+       @Test
+       public void testParForSampleDenseMR() {
+               runParForSampleTest(false, ExecType.MR);
+       }
+       
+       @Test
+       public void testParForSampleSparseMR() {
+               runParForSampleTest(true, ExecType.MR);
+       }
+       
+       @Test
+       public void testParForSampleDenseSpark() {
+               runParForSampleTest(false, ExecType.SPARK);
+       }
+       
+       @Test
+       public void testParForSampleSparseSpark() {
+               runParForSampleTest(true, ExecType.SPARK);
+       }
+               
+       /**
+        * 
+        * @param sparse
+        * @param et
+        */
+       @SuppressWarnings({ "unchecked" })
+       private void runParForSampleTest( boolean sparse, ExecType et )
+       {
+               RUNTIME_PLATFORM platformOld = rtplatform;
+               switch( et ){
+                       case MR: rtplatform = RUNTIME_PLATFORM.HADOOP; break;
+                       case SPARK: rtplatform = RUNTIME_PLATFORM.SPARK; break;
+                       default: rtplatform = RUNTIME_PLATFORM.HYBRID; break;
+               }
+       
+               boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
+               if( rtplatform == RUNTIME_PLATFORM.SPARK )
+                       DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+
+
+               try
+               {
+                       //invocation arguments
+                       TestConfiguration config = getTestConfiguration(TEST_NAME);
+                       config.addVariable("rows", rows);
+                       config.addVariable("cols", cols);
+                       loadTestConfiguration(config);
+                       
+                       fullDMLScriptName = SCRIPT_DIR + TEST_DIR + TEST_NAME + ".dml";
+                       programArgs = new String[]{"-explain","-args", input("A"), "0.8 0.2", output("B")};
+                       
+                       //generate input data + sequence in first column
+                       double[][] A = getRandomMatrix(rows, cols, -1, 1, sparse?sparsity2:sparsity1, 7); 
+                       for( int i=0; i<A.length; i++ )
+                               A[i][0] = (i+1);
+                       writeInputMatrixWithMTD("A", A, false);
+                       
+                       //run test case
+                       runTest(true, false, null, -1);
+                       
+                       //read result data and meta data
+                       HashMap<CellIndex, Double> B1 = readDMLMatrixFromHDFS("B1");
+                       HashMap<CellIndex, Double> B2 = readDMLMatrixFromHDFS("B2");
+                       MatrixCharacteristics B1mc = readDMLMetaDataFile("B1"); 
+                       MatrixCharacteristics B2mc = readDMLMetaDataFile("B2"); 
+                       
+                       //compare meta data
+                       Assert.assertEquals(new Long(rows), new Long(B1mc.getRows()+B2mc.getRows())); //join full coverage rows
+                       Assert.assertEquals(new Long(cols), new Long(B1mc.getCols())); //full coverage cols
+                       Assert.assertEquals(new Long(cols), new Long(B2mc.getCols())); //full coverage cols
+                       Assert.assertNotEquals(new Long(rows), new Long(B1mc.getRows())); //no sample contains all rows
+                       Assert.assertNotEquals(new Long(rows), new Long(B2mc.getRows())); //no sample contains all rows
+                               
+                       //compare data
+                       HashSet<Integer> probe = new HashSet<Integer>(rows);
+                       for( int i=0; i<rows; i++ )
+                               probe.add(i+1);
+                       for( HashMap<CellIndex, Double> B : new HashMap[]{ B1, B2 } )
+                               for( Entry<CellIndex,Double> e : B.entrySet() )
+                                       if( e.getKey().column == 1 ) {
+                                               boolean flag = probe.remove(e.getValue().intValue());
+                                               Assert.assertTrue("Wrong return value for "+e.getKey()+": "+e.getValue(), flag);
+                                       }
+                                       }
+               }
+               finally
+               {
+                       rtplatform = platformOld;
+                       DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/0a4d563e/src/test/scripts/applications/parfor/parfor_sample.dml
----------------------------------------------------------------------
diff --git a/src/test/scripts/applications/parfor/parfor_sample.dml b/src/test/scripts/applications/parfor/parfor_sample.dml
new file mode 100644
index 0000000..98ac395
--- /dev/null
+++ b/src/test/scripts/applications/parfor/parfor_sample.dml
@@ -0,0 +1,69 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+# Randomly sample data (without replacement) into disjoint subsets.
+# The sizes of the subsets are specified in terms of fractions, stored
+# as a 1-column vector in a separate input file (see parameter 'sv'). 
+#
+# Parameters:
+#    X    : (input) input data set: filename of input data set
+#    sv   : (input) splitting vector: filename of 1-column vector with
+#           fractions. sum(sv) must be less than or equal to 1
+#               e.g. sv = [0.2]: Draw a 20% simple random sample
+#                    without replacement.
+#               e.g. sv = [0.25,0.25,0.25,0.25]: Randomly split data
+#                    into 4 approximately equal-sized disjoint subsets.
+#               e.g. sv = [0.5,0.3,0.2]: Randomly split data into 3
+#                    disjoint subsets that contain roughly 50%, 30%
+#                    and 20% of original data, respectively.
+#    O    : (output) output folder name. The output subsets are stored
+#           in subfolders named by consecutive integers: $O/1, $O/2,
+#           ..., $O/#subsets
+#    ofmt : (output, default "binary") format of output file. Other
+#           valid options are: "csv" and "text" 
+#
+# Example:
+#   printf "0.8\n0.2" | hadoop fs -put - /tmp/sv.csv
+#   echo '{"data_type": "matrix", "value_type": "double", "rows": 2, "cols": 1, "format": "csv"}' | hadoop fs -put - /tmp/sv.csv.mtd
+#   hadoop jar SystemML.jar -f ./scripts/utils/sample.dml -nvargs X=/tmp/X.mtx sv=/tmp/sv.csv O=/tmp/Out ofmt=csv
+
+# set defaults
+ofmt = ifdef($ofmt, "text");
+
+# Read inputs
+X = read($1);                    # X: dataset
+sv = matrix($2, rows=2, cols=1); # sv: splitting fraction vector
+
+# Construct sampling lower/upper bounds for samples using prefix sum
+R = rand(rows=nrow(X), cols=1, min=0.0, max=1.0, pdf = "uniform");
+svLowBnd = cumsum(sv) - sv;
+svUpBnd = cumsum(sv);
+
+# Construct sampling matrix SM, and apply to create samples
+parfor ( i in 1:nrow(sv))
+{
+  T1 = ppred(R, as.scalar(svUpBnd[i,1]), "<=");
+  T2 = ppred(R, as.scalar(svLowBnd[i,1]), ">");
+  SM = T1 * T2; 
+  P = removeEmpty(target=diag(SM), margin="rows");
+  iX = P %*% X;
+  write (iX, $3 + i, format=ofmt);
+}
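
As a quick worked example of the prefix-sum bounds above (using the splitting
vector sv = [0.8, 0.2] that the new test passes via its second argument; the
numbers are only for illustration):

  cumsum(sv)                 = [0.8, 1.0]
  svLowBnd = cumsum(sv) - sv = [0.0, 0.8]
  svUpBnd  = cumsum(sv)      = [0.8, 1.0]

Subset 1 keeps the rows whose uniform random value R falls in (0.0, 0.8], and
subset 2 those in (0.8, 1.0]; the intervals are disjoint and cover (0, 1], so each
row of X ends up in exactly one subset, which is what the meta data checks in
ParForSampleTest assert (B1 rows + B2 rows == rows, and neither subset holds all rows).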
