Repository: incubator-systemml Updated Branches: refs/heads/master 8a6b3856c -> 0a4d563e4
[SYSTEMML-643] Fix mr/spark pmm compilation (row meta data), sample test So far the output meta data of pmm (e.g., for removeEmpty(diag(V))%*%X) was always computed as nrow(M) which is incorrect in the case where removeEmpty(diag) was never materialized. On Spark, runtime meta data management mitigated the problem. This patch fixes this issue for both mr/spark by computing the output number of rows as (1) nrow(M) if removeEmpty(diag) is materialized, and (2) max(V) otherwise. Furthermore, this change also introduces sample.dml into our application testsuite (incl data and meta data checks). Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/0a4d563e Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/0a4d563e Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/0a4d563e Branch: refs/heads/master Commit: 0a4d563e4c97ecbfd681ad04db43dcf3ec3d9243 Parents: 8a6b385 Author: Matthias Boehm <mbo...@us.ibm.com> Authored: Thu Apr 21 21:35:06 2016 -0700 Committer: Matthias Boehm <mbo...@us.ibm.com> Committed: Fri Apr 22 09:52:13 2016 -0700 ---------------------------------------------------------------------- .../java/org/apache/sysml/hops/AggBinaryOp.java | 68 +++++--- .../test/integration/AutomatedTestBase.java | 23 ++- .../applications/parfor/ParForSampleTest.java | 162 +++++++++++++++++++ .../applications/parfor/parfor_sample.dml | 69 ++++++++ 4 files changed, 290 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/0a4d563e/src/main/java/org/apache/sysml/hops/AggBinaryOp.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/hops/AggBinaryOp.java b/src/main/java/org/apache/sysml/hops/AggBinaryOp.java index 02d32ff..8a8cffc 100644 --- 
a/src/main/java/org/apache/sysml/hops/AggBinaryOp.java +++ b/src/main/java/org/apache/sysml/hops/AggBinaryOp.java @@ -901,8 +901,12 @@ public class AggBinaryOp extends Hop implements MultiThreadedHop long brlen = pmInput.getRowsInBlock(); long bclen = pmInput.getColsInBlock(); - //a) full permutation matrix input (potentially without empty block materialized) Lop lpmInput = pmInput.constructLops(); + Hop nrow = null; + double mestPM = OptimizerUtils.estimateSize(pmInput.getDim1(), 1); + ExecType etVect = (mestPM>OptimizerUtils.getLocalMemBudget())?ExecType.MR:ExecType.CP; + + //a) full permutation matrix input (potentially without empty block materialized) if( pmInput.getDim2() != 1 ) //not a vector { //compute condensed permutation matrix vector input @@ -930,20 +934,28 @@ public class AggBinaryOp extends Hop implements MultiThreadedHop //mult.computeMemEstimate(memo); //select exec type HopRewriteUtils.copyLineNumbers(this, mult); - lpmInput = mult.constructLops(); + //compute NROW target via nrow(m) + nrow = HopRewriteUtils.createValueHop(pmInput, true); + HopRewriteUtils.setOutputBlocksizes(nrow, 0, 0); + nrow.setForcedExecType(ExecType.CP); + HopRewriteUtils.copyLineNumbers(this, nrow); + lpmInput = mult.constructLops(); HopRewriteUtils.removeChildReference(pmInput, transpose); } + else //input vector + { + //compute NROW target via max(v) + nrow = HopRewriteUtils.createAggUnaryOp(pmInput, AggOp.MAX, Direction.RowCol); + HopRewriteUtils.setOutputBlocksizes(nrow, 0, 0); + nrow.setForcedExecType(etVect); + HopRewriteUtils.copyLineNumbers(this, nrow); + } //b) condensed permutation matrix vector input (target rows) - Hop nrow = HopRewriteUtils.createValueHop(pmInput, true); //NROW - HopRewriteUtils.setOutputBlocksizes(nrow, 0, 0); - nrow.setForcedExecType(ExecType.CP); - HopRewriteUtils.copyLineNumbers(this, nrow); - Lop lnrow = nrow.constructLops(); - _outputEmptyBlocks = !OptimizerUtils.allowsToFilterEmptyBlockOutputs(this); - PMMJ pmm = new PMMJ(lpmInput, 
rightInput.constructLops(), lnrow, getDataType(), getValueType(), false, _outputEmptyBlocks, ExecType.SPARK); + PMMJ pmm = new PMMJ(lpmInput, rightInput.constructLops(), nrow.constructLops(), + getDataType(), getValueType(), false, _outputEmptyBlocks, ExecType.SPARK); setOutputDimensions(pmm); setLineNumbers(pmm); setLops(pmm); @@ -1309,8 +1321,12 @@ public class AggBinaryOp extends Hop implements MultiThreadedHop long brlen = pmInput.getRowsInBlock(); long bclen = pmInput.getColsInBlock(); - //a) full permutation matrix input (potentially without empty block materialized) Lop lpmInput = pmInput.constructLops(); + Hop nrow = null; + double mestPM = OptimizerUtils.estimateSize(pmInput.getDim1(), 1); + ExecType etVect = (mestPM>OptimizerUtils.getLocalMemBudget())?ExecType.MR:ExecType.CP; + + //a) full permutation matrix input (potentially without empty block materialized) if( pmInput.getDim2() != 1 ) //not a vector { //compute condensed permutation matrix vector input @@ -1335,31 +1351,36 @@ public class AggBinaryOp extends Hop implements MultiThreadedHop HopRewriteUtils.setOutputBlocksizes(mult, brlen, bclen); mult.refreshSizeInformation(); mult.setForcedExecType(ExecType.MR); - //mult.computeMemEstimate(memo); //select exec type HopRewriteUtils.copyLineNumbers(this, mult); - lpmInput = mult.constructLops(); - + //compute NROW target via nrow(m) + nrow = HopRewriteUtils.createValueHop(pmInput, true); + HopRewriteUtils.setOutputBlocksizes(nrow, 0, 0); + nrow.setForcedExecType(ExecType.CP); + HopRewriteUtils.copyLineNumbers(this, nrow); + + lpmInput = mult.constructLops(); HopRewriteUtils.removeChildReference(pmInput, transpose); } + else //input vector + { + //compute NROW target via max(v) + nrow = HopRewriteUtils.createAggUnaryOp(pmInput, AggOp.MAX, Direction.RowCol); + HopRewriteUtils.setOutputBlocksizes(nrow, 0, 0); + nrow.setForcedExecType(etVect); + HopRewriteUtils.copyLineNumbers(this, nrow); + } - //b) condensed permutation matrix vector input (target rows) - 
Hop nrow = HopRewriteUtils.createValueHop(pmInput, true); //NROW - HopRewriteUtils.setOutputBlocksizes(nrow, 0, 0); - nrow.setForcedExecType(ExecType.CP); - HopRewriteUtils.copyLineNumbers(this, nrow); - Lop lnrow = nrow.constructLops(); - + //b) condensed permutation matrix vector input (target rows) boolean needPart = !pmInput.dimsKnown() || pmInput.getDim1() > DistributedCacheInput.PARTITION_SIZE; - double mestPM = OptimizerUtils.estimateSize(pmInput.getDim1(), 1); if( needPart ){ //requires partitioning - lpmInput = new DataPartition(lpmInput, DataType.MATRIX, ValueType.DOUBLE, (mestPM>OptimizerUtils.getLocalMemBudget())?ExecType.MR:ExecType.CP, PDataPartitionFormat.ROW_BLOCK_WISE_N); + lpmInput = new DataPartition(lpmInput, DataType.MATRIX, ValueType.DOUBLE, etVect, PDataPartitionFormat.ROW_BLOCK_WISE_N); lpmInput.getOutputParameters().setDimensions(pmInput.getDim1(), 1, getRowsInBlock(), getColsInBlock(), pmInput.getDim1()); setLineNumbers(lpmInput); } _outputEmptyBlocks = !OptimizerUtils.allowsToFilterEmptyBlockOutputs(this); - PMMJ pmm = new PMMJ(lpmInput, rightInput.constructLops(), lnrow, getDataType(), getValueType(), needPart, _outputEmptyBlocks, ExecType.MR); + PMMJ pmm = new PMMJ(lpmInput, rightInput.constructLops(), nrow.constructLops(), getDataType(), getValueType(), needPart, _outputEmptyBlocks, ExecType.MR); pmm.getOutputParameters().setDimensions(getDim1(), getDim2(), getRowsInBlock(), getColsInBlock(), getNnz()); setLineNumbers(pmm); @@ -1367,7 +1388,6 @@ public class AggBinaryOp extends Hop implements MultiThreadedHop aggregate.getOutputParameters().setDimensions(getDim1(), getDim2(), getRowsInBlock(), getColsInBlock(), getNnz()); aggregate.setupCorrectionLocation(CorrectionLocationType.NONE); // aggregation uses kahanSum but the inputs do not have correction values setLineNumbers(aggregate); - setLops(aggregate); HopRewriteUtils.removeChildReference(pmInput, nrow); 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/0a4d563e/src/test/java/org/apache/sysml/test/integration/AutomatedTestBase.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/sysml/test/integration/AutomatedTestBase.java b/src/test/java/org/apache/sysml/test/integration/AutomatedTestBase.java index f044774..eb87197 100644 --- a/src/test/java/org/apache/sysml/test/integration/AutomatedTestBase.java +++ b/src/test/java/org/apache/sysml/test/integration/AutomatedTestBase.java @@ -720,20 +720,27 @@ public abstract class AutomatedTestBase * @param fileName * @param mc */ - public static void checkDMLMetaDataFile(String fileName, MatrixCharacteristics mc) + public static void checkDMLMetaDataFile(String fileName, MatrixCharacteristics mc) { + MatrixCharacteristics rmc = readDMLMetaDataFile(fileName); + Assert.assertEquals(mc.getRows(), rmc.getRows()); + Assert.assertEquals(mc.getCols(), rmc.getCols()); + } + + /** + * + * @param fileName + * @return + */ + public static MatrixCharacteristics readDMLMetaDataFile(String fileName) { - try - { + try { String fname = baseDirectory + OUTPUT_DIR + fileName +".mtd"; JSONObject meta = new DataExpression().readMetadataFile(fname, false); long rlen = Long.parseLong(meta.get(DataExpression.READROWPARAM).toString()); long clen = Long.parseLong(meta.get(DataExpression.READCOLPARAM).toString()); - - Assert.assertEquals(mc.getRows(), rlen); - Assert.assertEquals(mc.getCols(), clen); + return new MatrixCharacteristics(rlen, clen, -1, -1); } - catch(Exception ex) - { + catch(Exception ex) { throw new RuntimeException(ex); } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/0a4d563e/src/test/java/org/apache/sysml/test/integration/applications/parfor/ParForSampleTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/sysml/test/integration/applications/parfor/ParForSampleTest.java 
b/src/test/java/org/apache/sysml/test/integration/applications/parfor/ParForSampleTest.java new file mode 100644 index 0000000..5b36acd --- /dev/null +++ b/src/test/java/org/apache/sysml/test/integration/applications/parfor/ParForSampleTest.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.sysml.test.integration.applications.parfor; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map.Entry; + +import org.junit.Assert; +import org.junit.Test; +import org.apache.sysml.api.DMLScript; +import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM; +import org.apache.sysml.lops.LopProperties.ExecType; +import org.apache.sysml.runtime.matrix.MatrixCharacteristics; +import org.apache.sysml.runtime.matrix.data.MatrixValue.CellIndex; +import org.apache.sysml.test.integration.AutomatedTestBase; +import org.apache.sysml.test.integration.TestConfiguration; +import org.apache.sysml.test.utils.TestUtils; + +/** + * + * + */ +public class ParForSampleTest extends AutomatedTestBase +{ + private final static String TEST_NAME = "parfor_sample"; + private final static String TEST_DIR = "applications/parfor/"; + private final static String TEST_CLASS_DIR = TEST_DIR + ParForSampleTest.class.getSimpleName() + "/"; + + private final static int rows = 2298; + private final static int cols = 1123; + + private final static double sparsity1 = 0.73; + private final static double sparsity2 = 0.25; + + + @Override + public void setUp() { + TestUtils.clearAssertionInformation(); + addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] {"B1","B2"}) ); + } + + @Test + public void testParForSampleDenseCP() { + runParForSampleTest(false, ExecType.CP); + } + + @Test + public void testParForSampleSparseCP() { + runParForSampleTest(true, ExecType.CP); + } + + @Test + public void testParForSampleDenseMR() { + runParForSampleTest(false, ExecType.MR); + } + + @Test + public void testParForSampleSparseMR() { + runParForSampleTest(true, ExecType.MR); + } + + @Test + public void testParForSampleDenseSpark() { + runParForSampleTest(false, ExecType.SPARK); + } + + @Test + public void testParForSampleSparseSpark() { + runParForSampleTest(true, ExecType.SPARK); + } + + /** + * + * @param outer + * @param instType 
+ * @param smallMem + * @param sparse + */ + @SuppressWarnings({ "unchecked" }) + private void runParForSampleTest( boolean sparse, ExecType et ) + { + RUNTIME_PLATFORM platformOld = rtplatform; + switch( et ){ + case MR: rtplatform = RUNTIME_PLATFORM.HADOOP; break; + case SPARK: rtplatform = RUNTIME_PLATFORM.SPARK; break; + default: rtplatform = RUNTIME_PLATFORM.HYBRID; break; + } + + boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG; + if( rtplatform == RUNTIME_PLATFORM.SPARK ) + DMLScript.USE_LOCAL_SPARK_CONFIG = true; + + + try + { + //invocation arguments + TestConfiguration config = getTestConfiguration(TEST_NAME); + config.addVariable("rows", rows); + config.addVariable("cols", cols); + loadTestConfiguration(config); + + fullDMLScriptName = SCRIPT_DIR + TEST_DIR + TEST_NAME + ".dml"; + programArgs = new String[]{"-explain","-args", input("A"), "0.8 0.2", output("B")}; + + //generate input data + sequence in first column + double[][] A = getRandomMatrix(rows, cols, -1, 1, sparse?sparsity2:sparsity1, 7); + for( int i=0; i<A.length; i++ ) + A[i][0] = (i+1); + writeInputMatrixWithMTD("A", A, false); + + //run test case + runTest(true, false, null, -1); + + //read result data and meta data + HashMap<CellIndex, Double> B1 = readDMLMatrixFromHDFS("B1"); + HashMap<CellIndex, Double> B2 = readDMLMatrixFromHDFS("B2"); + MatrixCharacteristics B1mc = readDMLMetaDataFile("B1"); + MatrixCharacteristics B2mc = readDMLMetaDataFile("B2"); + + //compare meta data + Assert.assertEquals(new Long(rows), new Long(B1mc.getRows()+B2mc.getRows())); //join full coverage rows + Assert.assertEquals(new Long(cols), new Long(B1mc.getCols())); //full coverage cols + Assert.assertEquals(new Long(cols), new Long(B2mc.getCols())); //full coverage cols + Assert.assertNotEquals(new Long(rows), new Long(B1mc.getRows())); //no sample contains all rows + Assert.assertNotEquals(new Long(rows), new Long(B2mc.getRows())); //no sample contains all rows + + //compare data + HashSet<Integer> 
probe = new HashSet<Integer>(rows); + for( int i=0; i<rows; i++ ) + probe.add(i+1); + for( HashMap<CellIndex, Double> B : new HashMap[]{ B1, B2 } ) + for( Entry<CellIndex,Double> e : B.entrySet() ) + if( e.getKey().column == 1 ) { + boolean flag = probe.remove(e.getValue().intValue()); + Assert.assertTrue("Wrong return value for "+e.getKey()+": "+e.getValue(), flag); + } + } + finally + { + rtplatform = platformOld; + DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/0a4d563e/src/test/scripts/applications/parfor/parfor_sample.dml ---------------------------------------------------------------------- diff --git a/src/test/scripts/applications/parfor/parfor_sample.dml b/src/test/scripts/applications/parfor/parfor_sample.dml new file mode 100644 index 0000000..98ac395 --- /dev/null +++ b/src/test/scripts/applications/parfor/parfor_sample.dml @@ -0,0 +1,69 @@ +#------------------------------------------------------------- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +#------------------------------------------------------------- + +# Randomly sample data (without replacement) into disjoint subsets. 
+# The sizes of the subsets are specified in terms of fractions, stored +# as a 1-column vector in a separate input file (see parameter 'sv'). +# +# Parameters: +# X : (input) input data set: filename of input data set +# sv : (input) splitting vector: filename of 1-column vector with +# fractions. sum(sv) must be less than or equal to 1 +# e.g. sv = [0.2]: Draw a 20% simple random sample +# without replacement. +# e.g. sv = [0.25,0.25,0.25,0.25]: Randomly split data +# into 4 approximately equal-sized disjoint subsets. +# e.g. sv = [0.5,0.3,0.2]: Randomly split data into 3 +# disjoint subsets that contain roughly 50%, 30% +# and 20% of original data, respectively. +# O : (output) output folder name. The output subsets are stored +# in subfolders named by consecutive integers: $O/1, $O/2, +# ..., $O/#subsets +# ofmt : (output, default "binary") format of output file. Other +# valid options are: "csv" and "text" +# +# Example: +# printf "0.8\n0.2" | hadoop fs -put - /tmp/sv.csv +# echo '{"data_type": "matrix", "value_type": "double", "rows": 2, "cols": 1, "format": "csv"}' | hadoop fs -put - /tmp/sv.csv.mtd +# hadoop jar SystemML.jar -f ./scripts/utils/sample.dml -nvargs X=/tmp/X.mtx sv=/tmp/sv.csv O=/tmp/Out ofmt=csv + +# set defaults +ofmt = ifdef($ofmt, "text"); + +# Read inputs +X = read($1); # X: dataset +sv = matrix($2, rows=2, cols=1); # sv: splitting fraction vector + +# Construct sampling lower/upper bounds for samples using prefix sum +R = rand(rows=nrow(X), cols=1, min=0.0, max=1.0, pdf = "uniform"); +svLowBnd = cumsum(sv) - sv; +svUpBnd = cumsum(sv); + +# Construct sampling matrix SM, and apply to create samples +parfor ( i in 1:nrow(sv)) +{ + T1 = ppred(R, as.scalar(svUpBnd[i,1]), "<="); + T2 = ppred(R, as.scalar(svLowBnd[i,1]), ">"); + SM = T1 * T2; + P = removeEmpty(target=diag(SM), margin="rows"); + iX = P %*% X; + write (iX, $3 + i, format=ofmt); +}