Repository: incubator-systemml Updated Branches: refs/heads/master 0cc4d23e5 -> 53ba37ecc
[SYSTEMML-993] Improved converters dataframe-matrix and csv-matrix This patch avoids unnecessary re-allocations of sparse rows during append. Even with an estimated nnz, each row still required log N re-allocations. Now, we pre-compute the nnz per row and allocate the sparse row once, which reduces GC pressure. For a 100K x 64K csv-to-matrix scenario, this improved performance from 330s to 306s (despite string-double parsing and shuffle). Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/11fde8e9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/11fde8e9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/11fde8e9 Branch: refs/heads/master Commit: 11fde8e9c07efefb6f9e440bd5defb85578696a8 Parents: 0cc4d23 Author: Matthias Boehm <mbo...@us.ibm.com> Authored: Fri Sep 30 00:19:27 2016 -0700 Committer: Matthias Boehm <mbo...@us.ibm.com> Committed: Fri Sep 30 00:19:27 2016 -0700 ---------------------------------------------------------------------- .../spark/utils/RDDConverterUtils.java | 93 ++++++++++++++------ .../sysml/runtime/io/IOUtilFunctions.java | 32 +++++++ 2 files changed, 96 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/11fde8e9/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java index 38ebd7e..29c7a44 100644 --- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java +++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java @@ -383,6 +383,42 @@ public class 
RDDConverterUtils return partsize/rowsize/blksz < MatrixBlock.SPARSITY_TURN_POINT; } + /** + * + * @param vect + * @param isVector + * @return + */ + private static int countNnz(Object vect, boolean isVector, int off) { + if( isVector ) //note: numNonzeros scans entries but handles sparse/dense + return ((Vector) vect).numNonzeros(); + else + return countNnz(vect, isVector, off, ((Row)vect).length()-off); + } + + /** + * + * @param vect + * @param isVector + * @param pos + * @param len + * @return + */ + private static int countNnz(Object vect, boolean isVector, int pos, int len ) { + int lnnz = 0; + if( isVector ) { + Vector vec = (Vector) vect; + for( int i=pos; i<pos+len; i++ ) + lnnz += (vec.apply(i) != 0) ? 1 : 0; + } + else { //row + Row row = (Row) vect; + for( int i=pos; i<pos+len; i++ ) + lnnz += UtilFunctions.isNonZero(row.get(i)) ? 1 : 0; + } + return lnnz; + } + ///////////////////////////////// // BINARYBLOCK-SPECIFIC FUNCTIONS @@ -621,11 +657,7 @@ public class RDDConverterUtils String[] cols = IOUtilFunctions.split(line, _delim); //determine number of non-zeros of row (w/o string parsing) - long lnnz = 0; - for( String col : cols ) { - lnnz += (!col.isEmpty() && !col.equals("0") - && !col.equals("0.0")) ? 
1 : 0; - } + int lnnz = IOUtilFunctions.countNnz(cols); //update counters _aNnz.add( (double)lnnz ); @@ -708,7 +740,12 @@ public class RDDConverterUtils boolean emptyFound = false; for( int cix=1, pix=0; cix<=ncblks; cix++ ) { - int lclen = (int)UtilFunctions.computeBlockSize(_clen, cix, _bclen); + int lclen = (int)UtilFunctions.computeBlockSize(_clen, cix, _bclen); + if( mb[cix-1].isInSparseFormat() ) { + //allocate row once (avoid re-allocations) + int lnnz = IOUtilFunctions.countNnz(parts, pix, lclen); + mb[cix-1].getSparseBlock().allocate(pos, lnnz); + } for( int j=0; j<lclen; j++ ) { String part = parts[pix++]; emptyFound |= part.isEmpty() && !_fill; @@ -739,7 +776,8 @@ public class RDDConverterUtils for( int cix=1; cix<=ncblks; cix++ ) { int lclen = (int)UtilFunctions.computeBlockSize(_clen, cix, _bclen); ix[cix-1] = new MatrixIndexes(rix, cix); - mb[cix-1] = new MatrixBlock(lrlen, lclen, _sparse, (int)(lrlen*lclen*_sparsity)); + mb[cix-1] = new MatrixBlock(lrlen, lclen, _sparse, (int)(lrlen*lclen*_sparsity)); + mb[cix-1].allocateDenseOrSparseBlock(); } } @@ -943,18 +981,22 @@ public class RDDConverterUtils //process row data int off = _containsID ? 1: 0; - if( _isVector ) { - Vector vect = (Vector) tmp._1().get(off); - for( int cix=1, pix=0; cix<=ncblks; cix++ ) { - int lclen = (int)UtilFunctions.computeBlockSize(_clen, cix, _bclen); + Object obj = _isVector ? 
tmp._1().get(off) : tmp._1(); + for( int cix=1, pix=_isVector?0:off; cix<=ncblks; cix++ ) { + int lclen = (int)UtilFunctions.computeBlockSize(_clen, cix, _bclen); + //allocate sparse row once (avoid re-allocations) + if( mb[cix-1].isInSparseFormat() ) { + int lnnz = countNnz(obj, _isVector, pix, lclen); + mb[cix-1].getSparseBlock().allocate(pos, lnnz); + } + //append data to matrix blocks + if( _isVector ) { + Vector vect = (Vector) obj; for( int j=0; j<lclen; j++ ) - mb[cix-1].appendValue(pos, j, vect.apply(pix++)); - } - } - else { //row - Row row = tmp._1(); - for( int cix=1, pix=off; cix<=ncblks; cix++ ) { - int lclen = (int)UtilFunctions.computeBlockSize(_clen, cix, _bclen); + mb[cix-1].appendValue(pos, j, vect.apply(pix++)); + } + else { //row + Row row = (Row) obj; for( int j=0; j<lclen; j++ ) mb[cix-1].appendValue(pos, j, UtilFunctions.getDouble(row.get(pix++))); } @@ -979,6 +1021,7 @@ public class RDDConverterUtils int lclen = (int)UtilFunctions.computeBlockSize(_clen, cix, _bclen); ix[cix-1] = new MatrixIndexes(rix, cix); mb[cix-1] = new MatrixBlock(lrlen, lclen, _sparse,(int)(lrlen*lclen*_sparsity)); + mb[cix-1].allocateDenseOrSparseBlock(); } } @@ -1016,17 +1059,9 @@ public class RDDConverterUtils public Row call(Row arg0) throws Exception { //determine number of non-zeros of row int off = _containsID ? 1 : 0; - long lnnz = 0; - if( _isVector ) { - //note: numNonzeros scans entries but handles sparse/dense - Vector vec = (Vector) arg0.get(off); - lnnz += vec.numNonzeros(); - } - else { //row - for(int i=off; i<arg0.length(); i++) - lnnz += UtilFunctions.isNonZero(arg0.get(i)) ? 1 : 0; - } - + Object vect = _isVector ? 
arg0.get(off) : arg0; + int lnnz = countNnz(vect, _isVector, off); + //update counters _aNnz.add( (double)lnnz ); return arg0; http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/11fde8e9/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java b/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java index 7327796..b2d25ec 100644 --- a/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java +++ b/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java @@ -285,6 +285,38 @@ public class IOUtilFunctions } /** + * Returns the number of non-zero entries but avoids the expensive + * string to double parsing. This function is guaranteed to never + * underestimate. + * + * @param cols + * @return + */ + public static int countNnz(String[] cols) { + return countNnz(cols, 0, cols.length); + } + + /** + * Returns the number of non-zero entries but avoids the expensive + * string to double parsing. This function is guaranteed to never + * underestimate. + * + * @param cols + * @param pos + * @param len + * @return + */ + public static int countNnz(String[] cols, int pos, int len) { + int lnnz = 0; + for( int i=pos; i<pos+len; i++ ) { + String col = cols[i]; + lnnz += (!col.isEmpty() && !col.equals("0") + && !col.equals("0.0")) ? 1 : 0; + } + return lnnz; + } + + /** * Returns the serialized size in bytes of the given string value, * following the modified UTF-8 specification as used by Java's * DataInput/DataOutput.