[SYSTEMML-2504] In-place CP cumulative aggregates, incl compiler This patch adds an option for in-place CP cumulative aggregates because result allocation is the major bottleneck. As an initial compiler integration, we now compile in-place CP operations for the aggregation of partial aggregates in Spark cumsum because it guarantees validity.
Project: http://git-wip-us.apache.org/repos/asf/systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/25a10f41 Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/25a10f41 Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/25a10f41 Branch: refs/heads/master Commit: 25a10f412614235d8974f371a2bb07bc08c88cee Parents: 21b1a53 Author: Matthias Boehm <mboe...@gmail.com> Authored: Wed Dec 5 20:38:37 2018 +0100 Committer: Matthias Boehm <mboe...@gmail.com> Committed: Wed Dec 5 20:38:37 2018 +0100 ---------------------------------------------------------------------- .../java/org/apache/sysml/hops/UnaryOp.java | 10 +++++----- src/main/java/org/apache/sysml/lops/Unary.java | 7 +++++-- .../instructions/cp/UnaryCPInstruction.java | 5 +++-- .../sysml/runtime/matrix/data/LibMatrixAgg.java | 20 +++++++++++++++----- .../runtime/matrix/operators/UnaryOperator.java | 10 ++++++++-- 5 files changed, 36 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/systemml/blob/25a10f41/src/main/java/org/apache/sysml/hops/UnaryOp.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/hops/UnaryOp.java b/src/main/java/org/apache/sysml/hops/UnaryOp.java index d1110c3..4071d6f 100644 --- a/src/main/java/org/apache/sysml/hops/UnaryOp.java +++ b/src/main/java/org/apache/sysml/hops/UnaryOp.java @@ -170,7 +170,7 @@ public class UnaryOp extends MultiThreadedHop int k = isCumulativeUnaryOperation() || isExpensiveUnaryOperation() ? 
OptimizerUtils.getConstrainedNumThreads( _maxNumThreads ) : 1; Unary unary1 = new Unary(input.constructLops(), - HopsOpOp1LopsU.get(_op), getDataType(), getValueType(), et, k); + HopsOpOp1LopsU.get(_op), getDataType(), getValueType(), et, k, false); setOutputDimensions(unary1); setLineNumbers(unary1); setLops(unary1); @@ -404,15 +404,15 @@ public class UnaryOp extends MultiThreadedHop agg.getOutputParameters().setDimensions(rlenAgg, clen, brlen, bclen, -1); agg.setupCorrectionLocation(CorrectionLocationType.NONE); // aggregation uses kahanSum but the inputs do not have correction values setLineNumbers(agg); - TEMP = agg; + TEMP = agg; level++; force = false; //in case of unknowns, generate one level } //in-memory cum sum (of partial aggregates) if( TEMP.getOutputParameters().getNumRows()!=1 ) { - int k = OptimizerUtils.getConstrainedNumThreads( _maxNumThreads ); - Unary unary1 = new Unary( TEMP, HopsOpOp1LopsU.get(_op), DataType.MATRIX, ValueType.DOUBLE, ExecType.CP, k); + int k = OptimizerUtils.getConstrainedNumThreads( _maxNumThreads ); + Unary unary1 = new Unary( TEMP, HopsOpOp1LopsU.get(_op), DataType.MATRIX, ValueType.DOUBLE, ExecType.CP, k, true); unary1.getOutputParameters().setDimensions(TEMP.getOutputParameters().getNumRows(), clen, brlen, bclen, -1); setLineNumbers(unary1); TEMP = unary1; @@ -487,7 +487,7 @@ public class UnaryOp extends MultiThreadedHop //in-memory cum sum (of partial aggregates) if( TEMP.getOutputParameters().getNumRows()!=1 ){ int k = OptimizerUtils.getConstrainedNumThreads( _maxNumThreads ); - Unary unary1 = new Unary( TEMP, HopsOpOp1LopsU.get(_op), DataType.MATRIX, ValueType.DOUBLE, ExecType.CP, k); + Unary unary1 = new Unary( TEMP, HopsOpOp1LopsU.get(_op), DataType.MATRIX, ValueType.DOUBLE, ExecType.CP, k, true); unary1.getOutputParameters().setDimensions(TEMP.getOutputParameters().getNumRows(), clen, brlen, bclen, -1); setLineNumbers(unary1); TEMP = unary1; 
http://git-wip-us.apache.org/repos/asf/systemml/blob/25a10f41/src/main/java/org/apache/sysml/lops/Unary.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/lops/Unary.java b/src/main/java/org/apache/sysml/lops/Unary.java index c6f3151..f299603 100644 --- a/src/main/java/org/apache/sysml/lops/Unary.java +++ b/src/main/java/org/apache/sysml/lops/Unary.java @@ -53,7 +53,7 @@ public class Unary extends Lop //cp-specific parameters private int _numThreads = 1; - + private boolean _inplace = false; /** * Constructor to perform a unary operation with 2 inputs @@ -114,10 +114,11 @@ public class Unary extends Lop * @param et execution type * @param numThreads number of threads */ - public Unary(Lop input1, OperationTypes op, DataType dt, ValueType vt, ExecType et, int numThreads) { + public Unary(Lop input1, OperationTypes op, DataType dt, ValueType vt, ExecType et, int numThreads, boolean inplace) { super(Lop.Type.UNARY, dt, vt); init(input1, op, dt, vt, et); _numThreads = numThreads; + _inplace = inplace; } private void init(Lop input1, OperationTypes op, DataType dt, ValueType vt, ExecType et) { @@ -361,6 +362,8 @@ public class Unary extends Lop if( getExecType() == ExecType.CP && isMultiThreadedOp(operation) ) { sb.append( OPERAND_DELIMITOR ); sb.append( _numThreads ); + sb.append( OPERAND_DELIMITOR ); + sb.append( _inplace ); } return sb.toString(); http://git-wip-us.apache.org/repos/asf/systemml/blob/25a10f41/src/main/java/org/apache/sysml/runtime/instructions/cp/UnaryCPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/cp/UnaryCPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/cp/UnaryCPInstruction.java index 9f5d71e..dcf9647 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/cp/UnaryCPInstruction.java +++ 
b/src/main/java/org/apache/sysml/runtime/instructions/cp/UnaryCPInstruction.java @@ -56,14 +56,15 @@ public abstract class UnaryCPInstruction extends ComputationCPInstruction { ValueFunction func = null; //print or stop or cumulative aggregates - if( parts.length==4 ) { + if( parts.length==5 ) { opcode = parts[0]; in.split(parts[1]); out.split(parts[2]); func = Builtin.getBuiltinFnObject(opcode); if( Arrays.asList(new String[]{"ucumk+","ucum*","ucumk+*","ucummin","ucummax","exp","log","sigmoid"}).contains(opcode) ) - return new UnaryMatrixCPInstruction(new UnaryOperator(func,Integer.parseInt(parts[3])), in, out, opcode, str); + return new UnaryMatrixCPInstruction(new UnaryOperator(func, + Integer.parseInt(parts[3]),Boolean.parseBoolean(parts[4])), in, out, opcode, str); else return new UnaryScalarCPInstruction(null, in, out, opcode, str); } http://git-wip-us.apache.org/repos/asf/systemml/blob/25a10f41/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java index c817a26..5e785d9 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixAgg.java @@ -299,8 +299,13 @@ public class LibMatrixAgg } //allocate output arrays (if required) - out.reset(m2, n2, false); //always dense - out.allocateDenseBlock(); + if( !uop.isInplace() || in.isInSparseFormat() ) { + out.reset(m2, n2, false); //always dense + out.allocateDenseBlock(); + } + else { + out = in; + } //Timing time = new Timing(true); @@ -337,13 +342,18 @@ public class LibMatrixAgg //filter empty input blocks (incl special handling for sparse-unsafe operations) if( in.isEmptyBlock(false) ){ return aggregateUnaryMatrixEmpty(in, out, aggtype, null); - } + } //Timing time = new Timing(true); 
//allocate output arrays (if required) - out.reset(m2, n2, false); //always dense - out.allocateDenseBlock(); + if( !uop.isInplace() || in.isInSparseFormat() ) { + out.reset(m2, n2, false); //always dense + out.allocateDenseBlock(); + } + else { + out = in; + } //core multi-threaded unary aggregate computation //(currently: always parallelization over number of rows) http://git-wip-us.apache.org/repos/asf/systemml/blob/25a10f41/src/main/java/org/apache/sysml/runtime/matrix/operators/UnaryOperator.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/operators/UnaryOperator.java b/src/main/java/org/apache/sysml/runtime/matrix/operators/UnaryOperator.java index 8b3888e..b24ccf9 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/operators/UnaryOperator.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/operators/UnaryOperator.java @@ -29,12 +29,13 @@ public class UnaryOperator extends Operator public final ValueFunction fn; private final int k; //num threads + private final boolean inplace; public UnaryOperator(ValueFunction p) { - this(p, 1); //default single-threaded + this(p, 1, false); //default single-threaded } - public UnaryOperator(ValueFunction p, int numThreads) { + public UnaryOperator(ValueFunction p, int numThreads, boolean inPlace) { super(p instanceof Builtin && (((Builtin)p).bFunc==Builtin.BuiltinCode.SIN || ((Builtin)p).bFunc==Builtin.BuiltinCode.TAN // sinh and tanh are zero only at zero, else they are nnz @@ -44,9 +45,14 @@ public class UnaryOperator extends Operator || ((Builtin)p).bFunc==Builtin.BuiltinCode.LOG_NZ || ((Builtin)p).bFunc==Builtin.BuiltinCode.SIGN) ); fn = p; k = numThreads; + inplace = inPlace; } public int getNumThreads() { return k; } + + public boolean isInplace() { + return inplace; + } }