Repository: incubator-systemml
Updated Branches:
  refs/heads/master 2a1eb4c9b -> bda96a8e8


[SYSTEMML-1551] New multi-threaded column-wise rexpand operations

This patch introduces a multi-threaded runtime for the internal
parameterized built-in function rexpand (specifically, column expansion),
along with the necessary compiler modifications. The runtime improvements
are moderate for both dense and sparse outputs, with speedups ranging from
1.6x to 2x due to better exploitation of write bandwidth (dense) and
latency hiding (sparse).
 

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/bda96a8e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/bda96a8e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/bda96a8e

Branch: refs/heads/master
Commit: bda96a8e8690f8821c476d5370c0d39e41da19a2
Parents: 2a1eb4c
Author: Matthias Boehm <mboe...@gmail.com>
Authored: Fri Apr 21 00:08:09 2017 -0700
Committer: Matthias Boehm <mboe...@gmail.com>
Committed: Fri Apr 21 00:08:09 2017 -0700

----------------------------------------------------------------------
 .../sysml/hops/ParameterizedBuiltinOp.java      |  3 +-
 .../sysml/hops/rewrite/HopRewriteUtils.java     | 14 ++--
 .../apache/sysml/lops/ParameterizedBuiltin.java | 21 ++++-
 .../runtime/compress/CompressedMatrixBlock.java |  4 +-
 .../parfor/opt/OptimizerRuleBased.java          |  3 +-
 .../cp/ParameterizedBuiltinCPInstruction.java   |  5 +-
 .../runtime/matrix/data/LibMatrixReorg.java     | 87 +++++++++++++++++---
 .../sysml/runtime/matrix/data/MatrixBlock.java  |  4 +-
 8 files changed, 116 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/bda96a8e/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java 
b/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java
index 1d6828c..74542f4 100644
--- a/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java
+++ b/src/main/java/org/apache/sysml/hops/ParameterizedBuiltinOp.java
@@ -784,8 +784,9 @@ public class ParameterizedBuiltinOp extends Hop implements 
MultiThreadedHop
        {
                if( et == ExecType.CP || et == ExecType.SPARK )
                {
+                       int k = OptimizerUtils.getConstrainedNumThreads( 
_maxNumThreads );
                        ParameterizedBuiltin pbilop = new 
ParameterizedBuiltin(inputlops, 
-                                       HopsParameterizedBuiltinLops.get(_op), 
getDataType(), getValueType(), et);
+                                       HopsParameterizedBuiltinLops.get(_op), 
getDataType(), getValueType(), et, k);
                        setOutputDimensions(pbilop);
                        setLineNumbers(pbilop);
                        setLops(pbilop);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/bda96a8e/src/main/java/org/apache/sysml/hops/rewrite/HopRewriteUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/rewrite/HopRewriteUtils.java 
b/src/main/java/org/apache/sysml/hops/rewrite/HopRewriteUtils.java
index f3baec1..f7be8b9 100644
--- a/src/main/java/org/apache/sysml/hops/rewrite/HopRewriteUtils.java
+++ b/src/main/java/org/apache/sysml/hops/rewrite/HopRewriteUtils.java
@@ -1048,23 +1048,27 @@ public class HopRewriteUtils
        //////////////////////////////////////
        // utils for lookup tables
        
-       public static boolean isValidOp( AggOp input, AggOp[] validTab ) {
+       public static boolean isValidOp( AggOp input, AggOp... validTab ) {
                return ArrayUtils.contains(validTab, input);
        }
        
-       public static boolean isValidOp( OpOp1 input, OpOp1[] validTab ) {
+       public static boolean isValidOp( OpOp1 input, OpOp1... validTab ) {
                return ArrayUtils.contains(validTab, input);
        }
        
-       public static boolean isValidOp( OpOp2 input, OpOp2[] validTab ) {
+       public static boolean isValidOp( OpOp2 input, OpOp2... validTab ) {
                return ArrayUtils.contains(validTab, input);
        }
        
-       public static boolean isValidOp( ReOrgOp input, ReOrgOp[] validTab ) {
+       public static boolean isValidOp( ReOrgOp input, ReOrgOp... validTab ) {
                return ArrayUtils.contains(validTab, input);
        }
        
-       public static int getValidOpPos( OpOp2 input, OpOp2[] validTab ) {
+       public static boolean isValidOp( ParamBuiltinOp input, 
ParamBuiltinOp... validTab ) {
+               return ArrayUtils.contains(validTab, input);
+       }
+       
+       public static int getValidOpPos( OpOp2 input, OpOp2... validTab ) {
                return ArrayUtils.indexOf(validTab, input);
        }
        

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/bda96a8e/src/main/java/org/apache/sysml/lops/ParameterizedBuiltin.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/lops/ParameterizedBuiltin.java 
b/src/main/java/org/apache/sysml/lops/ParameterizedBuiltin.java
index 81593b8..fdaf3c5 100644
--- a/src/main/java/org/apache/sysml/lops/ParameterizedBuiltin.java
+++ b/src/main/java/org/apache/sysml/lops/ParameterizedBuiltin.java
@@ -48,7 +48,16 @@ public class ParameterizedBuiltin extends Lop
        private HashMap<String, Lop> _inputParams;
        private boolean _bRmEmptyBC;
 
+       //cp-specific parameters
+       private int _numThreads = 1;
+       
        public ParameterizedBuiltin(HashMap<String, Lop> paramLops, 
OperationTypes op, DataType dt, ValueType vt, ExecType et) 
+                       throws HopsException 
+       {
+               this(paramLops, op, dt, vt, et, 1);
+       }
+       
+       public ParameterizedBuiltin(HashMap<String, Lop> paramLops, 
OperationTypes op, DataType dt, ValueType vt, ExecType et, int k) 
                throws HopsException 
        {
                super(Lop.Type.ParameterizedBuiltin, dt, vt);
@@ -60,6 +69,7 @@ public class ParameterizedBuiltin extends Lop
                }
                
                _inputParams = paramLops;
+               _numThreads = k;
                
                boolean breaksAlignment = false;
                boolean aligner = false;
@@ -229,8 +239,15 @@ public class ParameterizedBuiltin extends Lop
                        sb.append( _bRmEmptyBC );
                        sb.append(OPERAND_DELIMITOR);
                }
-
-               sb.append(this.prepOutputOperand(output));
+               
+               if( getExecType()==ExecType.CP && _operation == 
OperationTypes.REXPAND ) {
+                       sb.append( "k" );
+                       sb.append( Lop.NAME_VALUE_SEPARATOR );
+                       sb.append( _numThreads );       
+                       sb.append(OPERAND_DELIMITOR);
+               }
+               
+               sb.append(prepOutputOperand(output));
                
                return sb.toString();
        }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/bda96a8e/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java 
b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
index d7c66f1..fc59065 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
@@ -2007,11 +2007,11 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
 
        @Override
        public MatrixBlock rexpandOperations(MatrixBlock ret, double max,
-                       boolean rows, boolean cast, boolean ignore)
+                       boolean rows, boolean cast, boolean ignore, int k)
                        throws DMLRuntimeException {
                printDecompressWarning("rexpandOperations");
                MatrixBlock tmp = isCompressed() ? decompress() : this;
-               return tmp.rexpandOperations(ret, max, rows, cast, ignore);
+               return tmp.rexpandOperations(ret, max, rows, cast, ignore, k);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/bda96a8e/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
index 0ff7a31..851e193 100644
--- 
a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
+++ 
b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
@@ -1359,7 +1359,8 @@ public class OptimizerRuleBased extends Optimizer
                                        if(    
ConfigurationManager.isParallelMatrixOperations() 
                                                && h instanceof 
MultiThreadedHop //abop, datagenop, qop, paramop
                                                && !( h instanceof 
ParameterizedBuiltinOp //only paramop-grpagg
-                                                        && 
((ParameterizedBuiltinOp)h).getOp()!=ParamBuiltinOp.GROUPEDAGG)
+                                                        && 
!HopRewriteUtils.isValidOp(((ParameterizedBuiltinOp)h).getOp(), 
+                                                               
ParamBuiltinOp.GROUPEDAGG, ParamBuiltinOp.REXPAND))
                                                && !( h instanceof UnaryOp 
//only unaryop-cumulativeagg
                                                         && 
!((UnaryOp)h).isCumulativeUnaryOperation() )
                                                && !( h instanceof ReorgOp 
//only reorgop-transpose

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/bda96a8e/src/main/java/org/apache/sysml/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java
 
b/src/main/java/org/apache/sysml/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java
index 7b1fb57..a31fe94 100644
--- 
a/src/main/java/org/apache/sysml/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java
+++ 
b/src/main/java/org/apache/sysml/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java
@@ -57,7 +57,6 @@ public class ParameterizedBuiltinCPInstruction extends 
ComputationCPInstruction
        private static final String TOSTRING_SEPARATOR = " ";
        private static final String TOSTRING_LINESEPARATOR = "\n";
        
-       
        private int arity;
        protected HashMap<String,String> params;
        
@@ -248,7 +247,9 @@ public class ParameterizedBuiltinCPInstruction extends 
ComputationCPInstruction
                        boolean dirVal = params.get("dir").equals("rows");
                        boolean cast = Boolean.parseBoolean(params.get("cast"));
                        boolean ignore = 
Boolean.parseBoolean(params.get("ignore"));
-                       MatrixBlock ret = (MatrixBlock) 
target.rexpandOperations(new MatrixBlock(), maxVal, dirVal, cast, ignore);
+                       int numThreads = Integer.parseInt(params.get("k"));
+                       MatrixBlock ret = (MatrixBlock) 
target.rexpandOperations(
+                               new MatrixBlock(), maxVal, dirVal, cast, 
ignore, numThreads);
                        
                        //release locks
                        ec.setMatrixOutput(output.getName(), ret);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/bda96a8e/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java 
b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java
index 9f45590..edf69c1 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java
@@ -645,7 +645,7 @@ public class LibMatrixReorg
         * @return output matrix
         * @throws DMLRuntimeException if DMLRuntimeException occurs
         */
-       public static MatrixBlock rexpand(MatrixBlock in, MatrixBlock ret, 
double max, boolean rows, boolean cast, boolean ignore) 
+       public static MatrixBlock rexpand(MatrixBlock in, MatrixBlock ret, 
double max, boolean rows, boolean cast, boolean ignore, int k) 
                throws DMLRuntimeException
        {
                //prepare parameters
@@ -669,7 +669,7 @@ public class LibMatrixReorg
                if( rows )
                        return rexpandRows(in, ret, lmax, cast, ignore);
                else //cols
-                       return rexpandColumns(in, ret, lmax, cast, ignore);
+                       return rexpandColumns(in, ret, lmax, cast, ignore, k);
        }
 
        /**
@@ -694,7 +694,7 @@ public class LibMatrixReorg
                
                //execute rexpand operations incl sanity checks
                //TODO more robust (memory efficient) implementation w/o tmp 
block
-               MatrixBlock tmp = rexpand(in, new MatrixBlock(), max, rows, 
cast, ignore);
+               MatrixBlock tmp = rexpand(in, new MatrixBlock(), max, rows, 
cast, ignore, 1);
                
                //prepare outputs blocks (slice tmp block into output blocks ) 
                if( rows ) //expanded vertically
@@ -1909,7 +1909,7 @@ public class LibMatrixReorg
                return ret;
        }
 
-       private static MatrixBlock rexpandColumns(MatrixBlock in, MatrixBlock 
ret, int max, boolean cast, boolean ignore) 
+       private static MatrixBlock rexpandColumns(MatrixBlock in, MatrixBlock 
ret, int max, boolean cast, boolean ignore, int k) 
                throws DMLRuntimeException
        {
                //set meta data
@@ -1918,10 +1918,43 @@ public class LibMatrixReorg
                final long nnz = in.nonZeros;
                boolean sp = MatrixBlock.evalSparseFormatInMemory(rlen, clen, 
nnz);
                ret.reset(rlen, clen, sp);
+               ret.allocateDenseOrSparseBlock();
+               
+               //execute rexpand columns
+               long rnnz = 0; //real nnz (due to cutoff max)
+               if( k <= 1 || in.getNumRows() <= PAR_NUMCELL_THRESHOLD ) {
+                       rnnz = rexpandColumns(in, ret, max, cast, ignore, 0, 
rlen);
+               }
+               else {
+                       try {
+                               ExecutorService pool = 
Executors.newFixedThreadPool( k );
+                               ArrayList<RExpandColsTask> tasks = new 
ArrayList<RExpandColsTask>();
+                               int blklen = (int)(Math.ceil((double)rlen/k/8));
+                               for( int i=0; i<8*k & i*blklen<rlen; i++ )
+                                       tasks.add(new RExpandColsTask(in, ret, 
+                                               max, cast, ignore, i*blklen, 
Math.min((i+1)*blklen, rlen)));
+                               List<Future<Long>> taskret = 
pool.invokeAll(tasks);     
+                               pool.shutdown();
+                               for( Future<Long> task : taskret )
+                                       rnnz += task.get();
+                       }
+                       catch(Exception ex) {
+                               throw new DMLRuntimeException(ex);
+                       }
+               }
+               
+               //post-processing
+               ret.setNonZeros(rnnz);
                
+               return ret;
+       }
+
+       private static long rexpandColumns(MatrixBlock in, MatrixBlock ret, int 
max, boolean cast, boolean ignore, int rl, int ru) 
+               throws DMLRuntimeException
+       {
                //expand input horizontally (input vector likely dense 
                //but generic implementation for general case)
-               for( int i=0; i<rlen; i++ )
+               for( int i=rl; i<ru; i++ )
                {
                        //get value and cast if necessary (table)
                        double val = in.quickGetValue(i, 0);
@@ -1931,15 +1964,23 @@ public class LibMatrixReorg
                        //handle invalid values if not to be ignored
                        if( !ignore && val<=0 )
                                throw new DMLRuntimeException("Invalid input 
value <= 0 for ignore=false: "+val);
-                               
+                       
                        //set expanded value if matching
-                       if( val == Math.floor(val) && val >= 1 && val <= max )
-                               ret.appendValue(i, (int)(val-1), 1);
+                       if( val == Math.floor(val) && val >= 1 && val <= max ) {
+                               //update target without global nnz maintenance
+                               if( ret.isInSparseFormat() ) {
+                                       ret.sparseBlock.allocate(i, 1);
+                                       ret.sparseBlock.append(i, (int)(val-1), 
1);
+                               }
+                               else
+                                       ret.setValueDenseUnsafe(i, 
(int)(val-1), 1);
+                       }
                }
                
-               return ret;
+               //recompute nnz of partition
+               return ret.recomputeNonZeros(rl, ru-1, 0, 
ret.getNumColumns()-1);
        }
-
+       
        private static void copyColVector( MatrixBlock in, int ixin, double[] 
tmp, int[] tmpi, int len)
        {
                //copy value array from input matrix
@@ -2145,4 +2186,30 @@ public class LibMatrixReorg
                        return countNnzPerColumn(_in, _rl, _ru);
                }
        }
+       
+       private static class RExpandColsTask implements Callable<Long>
+       {
+               private final MatrixBlock _in;
+               private final MatrixBlock _out;
+               private final int _max;
+               private final boolean _cast;
+               private final boolean _ignore;
+               private final int _rl;
+               private final int _ru;
+
+               protected RExpandColsTask(MatrixBlock in, MatrixBlock out, int 
max, boolean cast, boolean ignore, int rl, int ru) {
+                       _in = in;
+                       _out = out;
+                       _max = max;
+                       _cast = cast;
+                       _ignore = ignore;
+                       _rl = rl;
+                       _ru = ru;
+               }
+               
+               @Override
+               public Long call() throws DMLRuntimeException {
+                       return rexpandColumns(_in, _out, _max, _cast, _ignore, 
_rl, _ru);
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/bda96a8e/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java 
b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
index 0ed64e3..2aa1bd3 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
@@ -5067,11 +5067,11 @@ public class MatrixBlock extends MatrixValue implements 
CacheBlock, Externalizab
                return removeEmptyOperations(ret, rows, null);
        }
 
-       public MatrixBlock rexpandOperations( MatrixBlock ret, double max, 
boolean rows, boolean cast, boolean ignore )
+       public MatrixBlock rexpandOperations( MatrixBlock ret, double max, 
boolean rows, boolean cast, boolean ignore, int k )
                throws DMLRuntimeException 
        {       
                MatrixBlock result = checkType(ret);
-               return LibMatrixReorg.rexpand(this, result, max, rows, cast, 
ignore);
+               return LibMatrixReorg.rexpand(this, result, max, rows, cast, 
ignore, k);
        }
        
        

Reply via email to