Repository: incubator-systemml
Updated Branches:
  refs/heads/master 0cc4d23e5 -> 53ba37ecc


[SYSTEMML-993] Improved dataframe-to-matrix and csv-to-matrix converters

This patch avoids unnecessary re-allocations of sparse rows during
append. Even with an estimated nnz, appending still required log N
re-allocations per row. Now, we pre-compute the nnz per row and
allocate each sparse row once, which reduces GC pressure. On a
100K x 64K csv-to-matrix scenario, this improved performance from
330s to 306s (despite the remaining costs of string-to-double parsing
and shuffle).

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/11fde8e9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/11fde8e9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/11fde8e9

Branch: refs/heads/master
Commit: 11fde8e9c07efefb6f9e440bd5defb85578696a8
Parents: 0cc4d23
Author: Matthias Boehm <mbo...@us.ibm.com>
Authored: Fri Sep 30 00:19:27 2016 -0700
Committer: Matthias Boehm <mbo...@us.ibm.com>
Committed: Fri Sep 30 00:19:27 2016 -0700

----------------------------------------------------------------------
 .../spark/utils/RDDConverterUtils.java          | 93 ++++++++++++++------
 .../sysml/runtime/io/IOUtilFunctions.java       | 32 +++++++
 2 files changed, 96 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/11fde8e9/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
index 38ebd7e..29c7a44 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/utils/RDDConverterUtils.java
@@ -383,6 +383,42 @@ public class RDDConverterUtils
                return partsize/rowsize/blksz < MatrixBlock.SPARSITY_TURN_POINT;
        }
        
+       /**
+        * Counts the non-zero entries of a single input row, given either
+        * as a Vector or as a Row.
+        * 
+        * @param vect row data: a Vector if isVector is true, otherwise a Row
+        * @param isVector true if vect is a Vector, false if it is a Row
+        * @param off (Row only) index of the first data column, e.g., 1 to
+        *        skip a leading ID column; not used on the Vector path
+        * @return number of non-zero entries
+        */
+       private static int countNnz(Object vect, boolean isVector, int off) {
+               if( isVector ) //note: numNonzeros scans entries but handles 
sparse/dense
+                       return ((Vector) vect).numNonzeros();
+               else //row: scan all columns from off to the end of the row
+                       return countNnz(vect, isVector, off, 
((Row)vect).length()-off);
+       }
+       
+       /**
+        * Counts the non-zero entries in the index range [pos, pos+len) of a
+        * single input row, given either as a Vector or as a Row; e.g., used
+        * to size a sparse row segment once before appending its values.
+        * 
+        * @param vect row data: a Vector if isVector is true, otherwise a Row
+        * @param isVector true if vect is a Vector, false if it is a Row
+        * @param pos index of the first entry to inspect (inclusive)
+        * @param len number of entries to inspect
+        * @return number of non-zero entries in the given range
+        */
+       private static int countNnz(Object vect, boolean isVector, int pos, int 
len ) {
+               int lnnz = 0;
+               if( isVector ) {
+                       //per-index access via apply(i), since only a sub-range
+                       //is counted (whole vectors use numNonzeros() instead)
+                       Vector vec = (Vector) vect;
+                       for( int i=pos; i<pos+len; i++ )
+                               lnnz += (vec.apply(i) != 0) ? 1 : 0;
+               }
+               else { //row
+                       //non-zero test on the raw cell object is delegated to UtilFunctions
+                       Row row = (Row) vect;
+                       for( int i=pos; i<pos+len; i++ )
+                               lnnz += UtilFunctions.isNonZero(row.get(i)) ? 1 
: 0;
+               }
+               return lnnz;
+       }
+       
        /////////////////////////////////
        // BINARYBLOCK-SPECIFIC FUNCTIONS
 
@@ -621,11 +657,7 @@ public class RDDConverterUtils
                        String[] cols = IOUtilFunctions.split(line, _delim);
                        
                        //determine number of non-zeros of row (w/o string 
parsing)
-                       long lnnz = 0;
-                       for( String col : cols ) {
-                               lnnz += (!col.isEmpty() && !col.equals("0") 
-                                               && !col.equals("0.0")) ? 1 : 0;
-                       }
+                       int lnnz = IOUtilFunctions.countNnz(cols);
                        
                        //update counters
                        _aNnz.add( (double)lnnz );
@@ -708,7 +740,12 @@ public class RDDConverterUtils
                                boolean emptyFound = false;
                                for( int cix=1, pix=0; cix<=ncblks; cix++ ) 
                                {
-                                       int lclen = 
(int)UtilFunctions.computeBlockSize(_clen, cix, _bclen);                        
    
+                                       int lclen = 
(int)UtilFunctions.computeBlockSize(_clen, cix, _bclen);
+                                       if( mb[cix-1].isInSparseFormat() ) {
+                                               //allocate row once (avoid 
re-allocations)
+                                               int lnnz = 
IOUtilFunctions.countNnz(parts, pix, lclen);
+                                               
mb[cix-1].getSparseBlock().allocate(pos, lnnz);
+                                       }
                                        for( int j=0; j<lclen; j++ ) {
                                                String part = parts[pix++];
                                                emptyFound |= part.isEmpty() && 
!_fill;
@@ -739,7 +776,8 @@ public class RDDConverterUtils
                        for( int cix=1; cix<=ncblks; cix++ ) {
                                int lclen = 
(int)UtilFunctions.computeBlockSize(_clen, cix, _bclen);                        
    
                                ix[cix-1] = new MatrixIndexes(rix, cix);
-                               mb[cix-1] = new MatrixBlock(lrlen, lclen, 
_sparse, (int)(lrlen*lclen*_sparsity));               
+                               mb[cix-1] = new MatrixBlock(lrlen, lclen, 
_sparse, (int)(lrlen*lclen*_sparsity));
+                               mb[cix-1].allocateDenseOrSparseBlock();
                        }
                }
                
@@ -943,18 +981,22 @@ public class RDDConverterUtils
                                
                                //process row data
                                int off = _containsID ? 1: 0;
-                               if( _isVector ) {
-                                       Vector vect = (Vector) 
tmp._1().get(off);
-                                       for( int cix=1, pix=0; cix<=ncblks; 
cix++ ) {
-                                               int lclen = 
(int)UtilFunctions.computeBlockSize(_clen, cix, _bclen);                        
    
+                               Object obj = _isVector ? tmp._1().get(off) : 
tmp._1();
+                               for( int cix=1, pix=_isVector?0:off; 
cix<=ncblks; cix++ ) {
+                                       int lclen = 
(int)UtilFunctions.computeBlockSize(_clen, cix, _bclen);                        
    
+                                       //allocate sparse row once (avoid 
re-allocations)
+                                       if( mb[cix-1].isInSparseFormat() ) {
+                                               int lnnz = countNnz(obj, 
_isVector, pix, lclen);
+                                               
mb[cix-1].getSparseBlock().allocate(pos, lnnz);
+                                       }
+                                       //append data to matrix blocks
+                                       if( _isVector ) {
+                                               Vector vect = (Vector) obj;
                                                for( int j=0; j<lclen; j++ )
-                                                       
mb[cix-1].appendValue(pos, j, vect.apply(pix++));
-                                       }       
-                               }
-                               else { //row
-                                       Row row = tmp._1();
-                                       for( int cix=1, pix=off; cix<=ncblks; 
cix++ ) {
-                                               int lclen = 
(int)UtilFunctions.computeBlockSize(_clen, cix, _bclen);                        
    
+                                                       
mb[cix-1].appendValue(pos, j, vect.apply(pix++));       
+                                       }
+                                       else { //row
+                                               Row row = (Row) obj;
                                                for( int j=0; j<lclen; j++ )
                                                        
mb[cix-1].appendValue(pos, j, UtilFunctions.getDouble(row.get(pix++)));
                                        }
@@ -979,6 +1021,7 @@ public class RDDConverterUtils
                                int lclen = 
(int)UtilFunctions.computeBlockSize(_clen, cix, _bclen);                        
    
                                ix[cix-1] = new MatrixIndexes(rix, cix);
                                mb[cix-1] = new MatrixBlock(lrlen, lclen, 
_sparse,(int)(lrlen*lclen*_sparsity));
+                               mb[cix-1].allocateDenseOrSparseBlock();
                        }
                }
                
@@ -1016,17 +1059,9 @@ public class RDDConverterUtils
                public Row call(Row arg0) throws Exception {
                        //determine number of non-zeros of row
                        int off = _containsID ? 1 : 0;
-                       long lnnz = 0;
-                       if( _isVector ) {
-                               //note: numNonzeros scans entries but handles 
sparse/dense
-                               Vector vec = (Vector) arg0.get(off);
-                               lnnz += vec.numNonzeros();
-                       }
-                       else { //row
-                               for(int i=off; i<arg0.length(); i++)
-                                       lnnz += 
UtilFunctions.isNonZero(arg0.get(i)) ? 1 : 0;
-                       }
-               
+                       Object vect = _isVector ? arg0.get(off) : arg0;
+                       int lnnz = countNnz(vect, _isVector, off);
+                       
                        //update counters
                        _aNnz.add( (double)lnnz );
                        return arg0;

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/11fde8e9/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java b/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java
index 7327796..b2d25ec 100644
--- a/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java
+++ b/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java
@@ -285,6 +285,38 @@ public class IOUtilFunctions
        }
        
        /**
+        * Returns the number of non-zero entries over all given tokens but
+        * avoids the expensive string to double parsing; equivalent to
+        * {@link #countNnz(String[], int, int)} over the full array.
+        * Any token other than "", "0", and "0.0" is counted as non-zero, so
+        * other textual zeros (e.g., "0.00" or "-0") lead to overestimates.
+        * 
+        * NOTE(review): empty tokens are counted as zero; if a caller replaces
+        * empty cells with a non-zero fill value, this count can underestimate.
+        * 
+        * @param cols column tokens of a delimited text line; must not be null
+        *        nor contain null entries
+        * @return number of tokens that are not "", "0", or "0.0"
+        */
+       public static int countNnz(String[] cols) {
+               return countNnz(cols, 0, cols.length);
+       }
+       
+       /**
+        * Returns the number of non-zero entries in the token range
+        * [pos, pos+len) but avoids the expensive string to double parsing.
+        * Any token other than "", "0", and "0.0" is counted as non-zero, so
+        * other textual zeros (e.g., "0.00" or "-0") lead to overestimates.
+        * 
+        * NOTE(review): empty tokens are counted as zero; if a caller replaces
+        * empty cells with a non-zero fill value, this count can underestimate.
+        * 
+        * @param cols column tokens of a delimited text line; the inspected
+        *        range must not contain null entries
+        * @param pos index of the first token to inspect (inclusive)
+        * @param len number of tokens to inspect; requires pos+len <= cols.length
+        * @return number of tokens in the range that are not "", "0", or "0.0"
+        */
+       public static int countNnz(String[] cols, int pos, int len) {
+               int lnnz = 0;
+               for( int i=pos; i<pos+len; i++ ) {
+                       String col = cols[i];
+                       //cheap textual zero test instead of parsing to double
+                       lnnz += (!col.isEmpty() && !col.equals("0") 
+                                       && !col.equals("0.0")) ? 1 : 0;
+               }
+               return lnnz;
+       }
+       
+       /**
         * Returns the serialized size in bytes of the given string value,
         * following the modified UTF-8 specification as used by Java's
         * DataInput/DataOutput.

Reply via email to