Repository: incubator-systemml Updated Branches: refs/heads/master 2a1eb4c9b -> bda96a8e8
[SYSTEMML-1551] New multi-threaded column-wise rexpand operations This patch introduces a multi-threaded runtime for the internal parameterized built-in function rexpand, specifically column expansion, along with necessary compiler modifications. The runtime improvements are moderate for both dense and sparse, ranging from 1.6x to 2x due to better write bandwidth exploitation (dense) and latency hiding (sparse). Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/bda96a8e Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/bda96a8e Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/bda96a8e Branch: refs/heads/master Commit: bda96a8e8690f8821c476d5370c0d39e41da19a2 Parents: 2a1eb4c Author: Matthias Boehm <mboe...@gmail.com> Authored: Fri Apr 21 00:08:09 2017 -0700 Committer: Matthias Boehm <mboe...@gmail.com> Committed: Fri Apr 21 00:08:09 2017 -0700 ---------------------------------------------------------------------- .../sysml/hops/ParameterizedBuiltinOp.java | 3 +- .../sysml/hops/rewrite/HopRewriteUtils.java | 14 ++-- .../apache/sysml/lops/ParameterizedBuiltin.java | 21 ++++- .../runtime/compress/CompressedMatrixBlock.java | 4 +- .../parfor/opt/OptimizerRuleBased.java | 3 +- .../cp/ParameterizedBuiltinCPInstruction.java | 5 +- .../runtime/matrix/data/LibMatrixReorg.java | 87 +++++++++++++++++--- .../sysml/runtime/matrix/data/MatrixBlock.java | 4 +- 8 files changed, 116 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/bda96a8e/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java b/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java index 1d6828c..74542f4 
100644 --- a/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java +++ b/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java @@ -784,8 +784,9 @@ public class ParameterizedBuiltinOp extends Hop implements MultiThreadedHop { if( et == ExecType.CP || et == ExecType.SPARK ) { + int k = OptimizerUtils.getConstrainedNumThreads( _maxNumThreads ); ParameterizedBuiltin pbilop = new ParameterizedBuiltin(inputlops, - HopsParameterizedBuiltinLops.get(_op), getDataType(), getValueType(), et); + HopsParameterizedBuiltinLops.get(_op), getDataType(), getValueType(), et, k); setOutputDimensions(pbilop); setLineNumbers(pbilop); setLops(pbilop); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/bda96a8e/src/main/java/org/apache/sysml/hops/rewrite/HopRewriteUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/hops/rewrite/HopRewriteUtils.java b/src/main/java/org/apache/sysml/hops/rewrite/HopRewriteUtils.java index f3baec1..f7be8b9 100644 --- a/src/main/java/org/apache/sysml/hops/rewrite/HopRewriteUtils.java +++ b/src/main/java/org/apache/sysml/hops/rewrite/HopRewriteUtils.java @@ -1048,23 +1048,27 @@ public class HopRewriteUtils ////////////////////////////////////// // utils for lookup tables - public static boolean isValidOp( AggOp input, AggOp[] validTab ) { + public static boolean isValidOp( AggOp input, AggOp... validTab ) { return ArrayUtils.contains(validTab, input); } - public static boolean isValidOp( OpOp1 input, OpOp1[] validTab ) { + public static boolean isValidOp( OpOp1 input, OpOp1... validTab ) { return ArrayUtils.contains(validTab, input); } - public static boolean isValidOp( OpOp2 input, OpOp2[] validTab ) { + public static boolean isValidOp( OpOp2 input, OpOp2... validTab ) { return ArrayUtils.contains(validTab, input); } - public static boolean isValidOp( ReOrgOp input, ReOrgOp[] validTab ) { + public static boolean isValidOp( ReOrgOp input, ReOrgOp... 
validTab ) { return ArrayUtils.contains(validTab, input); } - public static int getValidOpPos( OpOp2 input, OpOp2[] validTab ) { + public static boolean isValidOp( ParamBuiltinOp input, ParamBuiltinOp... validTab ) { + return ArrayUtils.contains(validTab, input); + } + + public static int getValidOpPos( OpOp2 input, OpOp2... validTab ) { return ArrayUtils.indexOf(validTab, input); } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/bda96a8e/src/main/java/org/apache/sysml/lops/ParameterizedBuiltin.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/lops/ParameterizedBuiltin.java b/src/main/java/org/apache/sysml/lops/ParameterizedBuiltin.java index 81593b8..fdaf3c5 100644 --- a/src/main/java/org/apache/sysml/lops/ParameterizedBuiltin.java +++ b/src/main/java/org/apache/sysml/lops/ParameterizedBuiltin.java @@ -48,7 +48,16 @@ public class ParameterizedBuiltin extends Lop private HashMap<String, Lop> _inputParams; private boolean _bRmEmptyBC; + //cp-specific parameters + private int _numThreads = 1; + public ParameterizedBuiltin(HashMap<String, Lop> paramLops, OperationTypes op, DataType dt, ValueType vt, ExecType et) + throws HopsException + { + this(paramLops, op, dt, vt, et, 1); + } + + public ParameterizedBuiltin(HashMap<String, Lop> paramLops, OperationTypes op, DataType dt, ValueType vt, ExecType et, int k) throws HopsException { super(Lop.Type.ParameterizedBuiltin, dt, vt); @@ -60,6 +69,7 @@ public class ParameterizedBuiltin extends Lop } _inputParams = paramLops; + _numThreads = k; boolean breaksAlignment = false; boolean aligner = false; @@ -229,8 +239,15 @@ public class ParameterizedBuiltin extends Lop sb.append( _bRmEmptyBC ); sb.append(OPERAND_DELIMITOR); } - - sb.append(this.prepOutputOperand(output)); + + if( getExecType()==ExecType.CP && _operation == OperationTypes.REXPAND ) { + sb.append( "k" ); + sb.append( Lop.NAME_VALUE_SEPARATOR ); + sb.append( _numThreads ); + 
sb.append(OPERAND_DELIMITOR); + } + + sb.append(prepOutputOperand(output)); return sb.toString(); } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/bda96a8e/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java index d7c66f1..fc59065 100644 --- a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java +++ b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java @@ -2007,11 +2007,11 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable @Override public MatrixBlock rexpandOperations(MatrixBlock ret, double max, - boolean rows, boolean cast, boolean ignore) + boolean rows, boolean cast, boolean ignore, int k) throws DMLRuntimeException { printDecompressWarning("rexpandOperations"); MatrixBlock tmp = isCompressed() ? 
decompress() : this; - return tmp.rexpandOperations(ret, max, rows, cast, ignore); + return tmp.rexpandOperations(ret, max, rows, cast, ignore, k); } @Override http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/bda96a8e/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java index 0ff7a31..851e193 100644 --- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java +++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java @@ -1359,7 +1359,8 @@ public class OptimizerRuleBased extends Optimizer if( ConfigurationManager.isParallelMatrixOperations() && h instanceof MultiThreadedHop //abop, datagenop, qop, paramop && !( h instanceof ParameterizedBuiltinOp //only paramop-grpagg - && ((ParameterizedBuiltinOp)h).getOp()!=ParamBuiltinOp.GROUPEDAGG) + && !HopRewriteUtils.isValidOp(((ParameterizedBuiltinOp)h).getOp(), + ParamBuiltinOp.GROUPEDAGG, ParamBuiltinOp.REXPAND)) && !( h instanceof UnaryOp //only unaryop-cumulativeagg && !((UnaryOp)h).isCumulativeUnaryOperation() ) && !( h instanceof ReorgOp //only reorgop-transpose http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/bda96a8e/src/main/java/org/apache/sysml/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java index 7b1fb57..a31fe94 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java +++ 
b/src/main/java/org/apache/sysml/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java @@ -57,7 +57,6 @@ public class ParameterizedBuiltinCPInstruction extends ComputationCPInstruction private static final String TOSTRING_SEPARATOR = " "; private static final String TOSTRING_LINESEPARATOR = "\n"; - private int arity; protected HashMap<String,String> params; @@ -248,7 +247,9 @@ public class ParameterizedBuiltinCPInstruction extends ComputationCPInstruction boolean dirVal = params.get("dir").equals("rows"); boolean cast = Boolean.parseBoolean(params.get("cast")); boolean ignore = Boolean.parseBoolean(params.get("ignore")); - MatrixBlock ret = (MatrixBlock) target.rexpandOperations(new MatrixBlock(), maxVal, dirVal, cast, ignore); + int numThreads = Integer.parseInt(params.get("k")); + MatrixBlock ret = (MatrixBlock) target.rexpandOperations( + new MatrixBlock(), maxVal, dirVal, cast, ignore, numThreads); //release locks ec.setMatrixOutput(output.getName(), ret); http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/bda96a8e/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java index 9f45590..edf69c1 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java @@ -645,7 +645,7 @@ public class LibMatrixReorg * @return output matrix * @throws DMLRuntimeException if DMLRuntimeException occurs */ - public static MatrixBlock rexpand(MatrixBlock in, MatrixBlock ret, double max, boolean rows, boolean cast, boolean ignore) + public static MatrixBlock rexpand(MatrixBlock in, MatrixBlock ret, double max, boolean rows, boolean cast, boolean ignore, int k) throws DMLRuntimeException { //prepare parameters @@ -669,7 +669,7 
@@ public class LibMatrixReorg if( rows ) return rexpandRows(in, ret, lmax, cast, ignore); else //cols - return rexpandColumns(in, ret, lmax, cast, ignore); + return rexpandColumns(in, ret, lmax, cast, ignore, k); } /** @@ -694,7 +694,7 @@ public class LibMatrixReorg //execute rexpand operations incl sanity checks //TODO more robust (memory efficient) implementation w/o tmp block - MatrixBlock tmp = rexpand(in, new MatrixBlock(), max, rows, cast, ignore); + MatrixBlock tmp = rexpand(in, new MatrixBlock(), max, rows, cast, ignore, 1); //prepare outputs blocks (slice tmp block into output blocks ) if( rows ) //expanded vertically @@ -1909,7 +1909,7 @@ public class LibMatrixReorg return ret; } - private static MatrixBlock rexpandColumns(MatrixBlock in, MatrixBlock ret, int max, boolean cast, boolean ignore) + private static MatrixBlock rexpandColumns(MatrixBlock in, MatrixBlock ret, int max, boolean cast, boolean ignore, int k) throws DMLRuntimeException { //set meta data @@ -1918,10 +1918,43 @@ public class LibMatrixReorg final long nnz = in.nonZeros; boolean sp = MatrixBlock.evalSparseFormatInMemory(rlen, clen, nnz); ret.reset(rlen, clen, sp); + ret.allocateDenseOrSparseBlock(); + + //execute rexpand columns + long rnnz = 0; //real nnz (due to cutoff max) + if( k <= 1 || in.getNumRows() <= PAR_NUMCELL_THRESHOLD ) { + rnnz = rexpandColumns(in, ret, max, cast, ignore, 0, rlen); + } + else { + try { + ExecutorService pool = Executors.newFixedThreadPool( k ); + ArrayList<RExpandColsTask> tasks = new ArrayList<RExpandColsTask>(); + int blklen = (int)(Math.ceil((double)rlen/k/8)); + for( int i=0; i<8*k & i*blklen<rlen; i++ ) + tasks.add(new RExpandColsTask(in, ret, + max, cast, ignore, i*blklen, Math.min((i+1)*blklen, rlen))); + List<Future<Long>> taskret = pool.invokeAll(tasks); + pool.shutdown(); + for( Future<Long> task : taskret ) + rnnz += task.get(); + } + catch(Exception ex) { + throw new DMLRuntimeException(ex); + } + } + + //post-processing + 
ret.setNonZeros(rnnz); + return ret; + } + + private static long rexpandColumns(MatrixBlock in, MatrixBlock ret, int max, boolean cast, boolean ignore, int rl, int ru) + throws DMLRuntimeException + { //expand input horizontally (input vector likely dense //but generic implementation for general case) - for( int i=0; i<rlen; i++ ) + for( int i=rl; i<ru; i++ ) { //get value and cast if necessary (table) double val = in.quickGetValue(i, 0); @@ -1931,15 +1964,23 @@ public class LibMatrixReorg //handle invalid values if not to be ignored if( !ignore && val<=0 ) throw new DMLRuntimeException("Invalid input value <= 0 for ignore=false: "+val); - + //set expanded value if matching - if( val == Math.floor(val) && val >= 1 && val <= max ) - ret.appendValue(i, (int)(val-1), 1); + if( val == Math.floor(val) && val >= 1 && val <= max ) { + //update target without global nnz maintenance + if( ret.isInSparseFormat() ) { + ret.sparseBlock.allocate(i, 1); + ret.sparseBlock.append(i, (int)(val-1), 1); + } + else + ret.setValueDenseUnsafe(i, (int)(val-1), 1); + } } - return ret; + //recompute nnz of partition + return ret.recomputeNonZeros(rl, ru-1, 0, ret.getNumColumns()-1); } - + private static void copyColVector( MatrixBlock in, int ixin, double[] tmp, int[] tmpi, int len) { //copy value array from input matrix @@ -2145,4 +2186,30 @@ public class LibMatrixReorg return countNnzPerColumn(_in, _rl, _ru); } } + + private static class RExpandColsTask implements Callable<Long> + { + private final MatrixBlock _in; + private final MatrixBlock _out; + private final int _max; + private final boolean _cast; + private final boolean _ignore; + private final int _rl; + private final int _ru; + + protected RExpandColsTask(MatrixBlock in, MatrixBlock out, int max, boolean cast, boolean ignore, int rl, int ru) { + _in = in; + _out = out; + _max = max; + _cast = cast; + _ignore = ignore; + _rl = rl; + _ru = ru; + } + + @Override + public Long call() throws DMLRuntimeException { + return 
rexpandColumns(_in, _out, _max, _cast, _ignore, _rl, _ru); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/bda96a8e/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java index 0ed64e3..2aa1bd3 100644 --- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java +++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java @@ -5067,11 +5067,11 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab return removeEmptyOperations(ret, rows, null); } - public MatrixBlock rexpandOperations( MatrixBlock ret, double max, boolean rows, boolean cast, boolean ignore ) + public MatrixBlock rexpandOperations( MatrixBlock ret, double max, boolean rows, boolean cast, boolean ignore, int k ) throws DMLRuntimeException { MatrixBlock result = checkType(ret); - return LibMatrixReorg.rexpand(this, result, max, rows, cast, ignore); + return LibMatrixReorg.rexpand(this, result, max, rows, cast, ignore, k); }