This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new ce379232c2 [SYSTEMDS-3761] Performance parallel reader textcell / matrixmarket
ce379232c2 is described below

commit ce379232c2786b437acaa298de4c06b9b2ff8ffd
Author: Matthias Boehm <mboe...@gmail.com>
AuthorDate: Wed Sep 4 15:28:06 2024 +0200

    [SYSTEMDS-3761] Performance parallel reader textcell / matrixmarket
    
    The multi-threaded reader for the textcell/mm text formats (IJV cells)
    has four stages:
    (1) determining the nnz per row,
    (2) preallocating the sparse rows (so no reallocation is needed),
    (3) reading the cells (through cell buffers) with synchronized buffer flushes, and
    (4) setting the nnz and sorting the sparse rows.
    
    This patch improves performance by eliminating unnecessary thread
    contention during the buffer flushes of step (3). When reading into
    an MCSR representation, we now synchronize on individual rows instead
    of on the full sparse block.
    
    When reading the vas_stokes_4M.mtx dataset (4382246 rows, 4382246 cols,
    131577616 nnz), this patch reduced the end-to-end read time from
    17.6s to 7.5s.
---
 .../sysds/runtime/io/ReaderTextCellParallel.java   | 38 +++++++++++++++-------
 1 file changed, 26 insertions(+), 12 deletions(-)

diff --git 
a/src/main/java/org/apache/sysds/runtime/io/ReaderTextCellParallel.java 
b/src/main/java/org/apache/sysds/runtime/io/ReaderTextCellParallel.java
index b9cfd852fd..0e2dbc857f 100644
--- a/src/main/java/org/apache/sysds/runtime/io/ReaderTextCellParallel.java
+++ b/src/main/java/org/apache/sysds/runtime/io/ReaderTextCellParallel.java
@@ -41,6 +41,7 @@ import org.apache.sysds.common.Types.FileFormat;
 import org.apache.sysds.hops.OptimizerUtils;
 import org.apache.sysds.runtime.data.DenseBlock;
 import org.apache.sysds.runtime.data.SparseBlock;
+import org.apache.sysds.runtime.data.SparseBlockMCSR;
 import org.apache.sysds.runtime.matrix.data.IJV;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.util.CommonThreadPool;
@@ -131,7 +132,6 @@ public class ReaderTextCellParallel extends ReaderTextCell
                        dest.setNonZeros( lnnz );
                        if( dest.isInSparseFormat() ) 
                                sortSparseRowsParallel(dest, rlen, _numThreads, 
pool);
-                       
                } 
                catch (Exception e) {
                        throw new IOException("Threadpool issue, while parallel 
read.", e);
@@ -218,17 +218,11 @@ public class ReaderTextCellParallel extends ReaderTextCell
                                                if( _mmProps != null && 
_mmProps.isSymmetric() && !cell.onDiag() )
                                                        
buff.addCell(cell.getJ(), cell.getI(), cell.getV());
                                                //flush if needed (<=n-1 to 
allow symmetric mm, where 2 values are added)
-                                               if( 
buff.size()>=CellBuffer.CAPACITY-1 ) 
-                                                       synchronized( _dest ){ 
//sparse requires lock
-                                                               lnnz += 
buff.size();
-                                                               
buff.flushCellBufferToSparseBlock(sblock);
-                                                       }
+                                               if( 
buff.size()>=CellBuffer.CAPACITY-1 )
+                                                       lnnz += 
flushBufferToSparseBlock(buff, sblock);
                                        }
                                        //final buffer flush 
-                                       synchronized( _dest ){ //sparse 
requires lock
-                                               lnnz += buff.size();
-                                               
buff.flushCellBufferToSparseBlock(sblock);
-                                       }
+                                       lnnz += flushBufferToSparseBlock(buff, 
sblock);
                                } 
                                else { //DENSE<-value
                                        DenseBlock a = _dest.getDenseBlock();
@@ -252,6 +246,15 @@ public class ReaderTextCellParallel extends ReaderTextCell
                        
                        return lnnz;
                }
+               
+               private static long flushBufferToSparseBlock(CellBuffer buff, 
SparseBlock dest) {
+                       int ret = buff.size();
+                       if( dest instanceof SparseBlockMCSR ) //row 
synchronization
+                               
buff.flushCellBufferToSparseBlockMCSR((SparseBlockMCSR)dest);
+                       else //full block synchronization
+                               buff.flushCellBufferToSparseBlock(dest);
+                       return ret;
+               }
        }
        
        public static class CountNnzTask implements Callable<Void> {
@@ -346,8 +349,19 @@ public class ReaderTextCellParallel extends ReaderTextCell
                }
                
                public void flushCellBufferToSparseBlock( SparseBlock dest ) {
-                       for( int i=0; i<=_pos; i++ )
-                               dest.append(_rlen[i], _clen[i], _vals!=null ? 
_vals[i] : 1);
+                       synchronized (dest) {
+                               for( int i=0; i<=_pos; i++ )
+                                       dest.append(_rlen[i], _clen[i], 
_vals!=null ? _vals[i] : 1);
+                       }
+                       reset();
+               }
+               
+               public void flushCellBufferToSparseBlockMCSR( SparseBlockMCSR 
dest ) {
+                       for( int i=0; i<=_pos; i++ ) {
+                               synchronized(dest.get(_rlen[i])) {
+                                       dest.append(_rlen[i], _clen[i], 
_vals!=null ? _vals[i] : 1);
+                               }
+                       }
                        reset();
                }
                

Reply via email to