Repository: systemml Updated Branches: refs/heads/master bda61b600 -> 1a58946a0
[SYSTEMML-2503/04] Fix correctness in-place and broadcast cumagg ops This patch fixes correctness issues of in-place cumulative aggregate operations as well as the handling of lineage tracing on spark cumagg offset. In addition, the patch also includes a minor performance improvement that avoids unnecessary copying of offset vectors on cumagg offset operations. Project: http://git-wip-us.apache.org/repos/asf/systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/1a58946a Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/1a58946a Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/1a58946a Branch: refs/heads/master Commit: 1a58946a0a335ccae61d0cf3873a937467ae5544 Parents: bda61b6 Author: Matthias Boehm <mboe...@gmail.com> Authored: Sat Dec 8 13:40:33 2018 +0100 Committer: Matthias Boehm <mboe...@gmail.com> Committed: Sat Dec 8 13:40:33 2018 +0100 ---------------------------------------------------------------------- .../instructions/spark/CumulativeOffsetSPInstruction.java | 9 ++++++--- .../apache/sysml/runtime/matrix/data/LibMatrixAgg.java | 10 ++++++---- .../org/apache/sysml/runtime/matrix/data/MatrixBlock.java | 4 ++-- .../java/org/apache/sysml/runtime/util/DataConverter.java | 9 ++++++++- 4 files changed, 22 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/systemml/blob/1a58946a/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeOffsetSPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeOffsetSPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeOffsetSPInstruction.java index 1b26060..3dba53e 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeOffsetSPInstruction.java +++ 
b/src/main/java/org/apache/sysml/runtime/instructions/spark/CumulativeOffsetSPInstruction.java @@ -32,6 +32,7 @@ import scala.Tuple2; import org.apache.sysml.runtime.controlprogram.context.ExecutionContext; import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext; import org.apache.sysml.runtime.functionobjects.Builtin; +import org.apache.sysml.runtime.functionobjects.Builtin.BuiltinCode; import org.apache.sysml.runtime.instructions.InstructionUtils; import org.apache.sysml.runtime.instructions.cp.CPOperand; import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcast; @@ -94,8 +95,9 @@ public class CumulativeOffsetSPInstruction extends BinarySPInstruction { //get and join inputs JavaPairRDD<MatrixIndexes,MatrixBlock> inData = sec.getBinaryBlockRDDHandleForVariable(input1.getName()); JavaPairRDD<MatrixIndexes,Tuple2<MatrixBlock,MatrixBlock>> joined = null; + boolean broadcast = _broadcast && !SparkUtils.isHashPartitioned(inData); - if( _broadcast && !SparkUtils.isHashPartitioned(inData) ) { + if( broadcast ) { //broadcast offsets and broadcast join with data PartitionedBroadcast<MatrixBlock> inAgg = sec.getBroadcastForVariable(input2.getName()); joined = inData.mapToPair(new RDDCumSplitLookupFunction(inAgg,_initValue, rlen, brlen)); @@ -119,7 +121,7 @@ public class CumulativeOffsetSPInstruction extends BinarySPInstruction { updateUnaryOutputMatrixCharacteristics(sec); sec.setRDDHandleForVariable(output.getName(), out); sec.addLineageRDD(output.getName(), input1.getName()); - sec.addLineage(output.getName(), input2.getName(), _broadcast); + sec.addLineage(output.getName(), input2.getName(), broadcast); } private static class RDDCumSplitFunction implements PairFlatMapFunction<Tuple2<MatrixIndexes, MatrixBlock>, MatrixIndexes, MatrixBlock> @@ -229,7 +231,8 @@ public class CumulativeOffsetSPInstruction extends BinarySPInstruction { //blockwise cumagg computation, incl offset aggregation return 
LibMatrixAgg.cumaggregateUnaryMatrix(dblkIn, blkOut, _uop, - DataConverter.convertToDoubleVector(oblkIn)); + DataConverter.convertToDoubleVector(oblkIn, false, + ((Builtin)_uop.fn).bFunc == BuiltinCode.CUMSUM)); } } } http://git-wip-us.apache.org/repos/asf/systemml/blob/1a58946a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java index 5e785d9..ed7d8f1 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java @@ -294,14 +294,16 @@ public class LibMatrixAgg final int n2 = out.clen; //filter empty input blocks (incl special handling for sparse-unsafe operations) - if( in.isEmptyBlock(false) && (agg == null || aggtype == AggType.CUM_SUM_PROD ) ) { + if( in.isEmpty() && (agg == null || aggtype == AggType.CUM_SUM_PROD) ) { return aggregateUnaryMatrixEmpty(in, out, aggtype, null); } //allocate output arrays (if required) - if( !uop.isInplace() || in.isInSparseFormat() ) { + if( !uop.isInplace() || in.isInSparseFormat() || in.isEmpty() ) { out.reset(m2, n2, false); //always dense out.allocateDenseBlock(); + if( in.isEmpty() ) + in.allocateBlock(); } else { out = in; @@ -340,14 +342,14 @@ public class LibMatrixAgg final int mk = aggtype==AggType.CUM_KAHAN_SUM?2:1; //filter empty input blocks (incl special handling for sparse-unsafe operations) - if( in.isEmptyBlock(false) ){ + if( in.isEmpty() ){ return aggregateUnaryMatrixEmpty(in, out, aggtype, null); } //Timing time = new Timing(true); //allocate output arrays (if required) - if( !uop.isInplace() || in.isInSparseFormat() ) { + if( !uop.isInplace() || in.isInSparseFormat() || in.isEmpty() ) { out.reset(m2, n2, false); //always dense out.allocateDenseBlock(); } 
http://git-wip-us.apache.org/repos/asf/systemml/blob/1a58946a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java index 7e93598..d1f9ac9 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java @@ -2656,9 +2656,9 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab if( LibMatrixAgg.isSupportedUnaryOperator(op) ) { //e.g., cumsum/cumprod/cummin/cumax/cumsumprod if( op.getNumThreads() > 1 ) - LibMatrixAgg.cumaggregateUnaryMatrix(this, ret, op, op.getNumThreads()); + ret = LibMatrixAgg.cumaggregateUnaryMatrix(this, ret, op, op.getNumThreads()); else - LibMatrixAgg.cumaggregateUnaryMatrix(this, ret, op); + ret = LibMatrixAgg.cumaggregateUnaryMatrix(this, ret, op); } else if(!sparse && !isEmptyBlock(false) && OptimizerUtils.isMaxLocalParallelism(op.getNumThreads())) { http://git-wip-us.apache.org/repos/asf/systemml/blob/1a58946a/src/main/java/org/apache/sysml/runtime/util/DataConverter.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/util/DataConverter.java b/src/main/java/org/apache/sysml/runtime/util/DataConverter.java index f4d3fb5..0c99959 100644 --- a/src/main/java/org/apache/sysml/runtime/util/DataConverter.java +++ b/src/main/java/org/apache/sysml/runtime/util/DataConverter.java @@ -344,8 +344,15 @@ public class DataConverter return convertToDoubleVector(mb, true); } - public static double[] convertToDoubleVector( MatrixBlock mb, boolean deep ) + public static double[] convertToDoubleVector( MatrixBlock mb, boolean deep ) { + return convertToDoubleVector(mb, deep, false); + } + + public static double[] convertToDoubleVector( 
MatrixBlock mb, boolean deep, boolean allowNull ) { + if( mb.isEmpty() && allowNull ) + return null; + int rows = mb.getNumRows(); int cols = mb.getNumColumns(); double[] ret = (!mb.isInSparseFormat() && mb.isAllocated() && !deep) ?