This is an automated email from the ASF dual-hosted git repository. mboehm7 pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push: new ce379232c2 [SYSTEMDS-3761] Performance parallel reader textcell / matrixmarket ce379232c2 is described below commit ce379232c2786b437acaa298de4c06b9b2ff8ffd Author: Matthias Boehm <mboe...@gmail.com> AuthorDate: Wed Sep 4 15:28:06 2024 +0200 [SYSTEMDS-3761] Performance parallel reader textcell / matrixmarket The multi-threaded reader for textcell/mm text formats (IJV cells) has four stages: (1) determining the nnz per row, (2) preallocating the sparse rows (no reallocation), (3) reading the cells (through cell buffers) with synchronized flushes, and (4) setting the nnz and sorting the sparse rows. This patch improves performance by eliminating unnecessary thread contention during the buffer flushes of step (3). If we read into an MCSR representation, we synchronize on individual rows instead of the full sparse block. On reading the vas_stokes_4M.mtx dataset (4382246 rows, 4382246 cols, 131577616 nnz), this patch improved the end-to-end read runtime from 17.6s to 7.5s.
--- .../sysds/runtime/io/ReaderTextCellParallel.java | 38 +++++++++++++++------- 1 file changed, 26 insertions(+), 12 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/io/ReaderTextCellParallel.java b/src/main/java/org/apache/sysds/runtime/io/ReaderTextCellParallel.java index b9cfd852fd..0e2dbc857f 100644 --- a/src/main/java/org/apache/sysds/runtime/io/ReaderTextCellParallel.java +++ b/src/main/java/org/apache/sysds/runtime/io/ReaderTextCellParallel.java @@ -41,6 +41,7 @@ import org.apache.sysds.common.Types.FileFormat; import org.apache.sysds.hops.OptimizerUtils; import org.apache.sysds.runtime.data.DenseBlock; import org.apache.sysds.runtime.data.SparseBlock; +import org.apache.sysds.runtime.data.SparseBlockMCSR; import org.apache.sysds.runtime.matrix.data.IJV; import org.apache.sysds.runtime.matrix.data.MatrixBlock; import org.apache.sysds.runtime.util.CommonThreadPool; @@ -131,7 +132,6 @@ public class ReaderTextCellParallel extends ReaderTextCell dest.setNonZeros( lnnz ); if( dest.isInSparseFormat() ) sortSparseRowsParallel(dest, rlen, _numThreads, pool); - } catch (Exception e) { throw new IOException("Threadpool issue, while parallel read.", e); @@ -218,17 +218,11 @@ public class ReaderTextCellParallel extends ReaderTextCell if( _mmProps != null && _mmProps.isSymmetric() && !cell.onDiag() ) buff.addCell(cell.getJ(), cell.getI(), cell.getV()); //flush if needed (<=n-1 to allow symmetric mm, where 2 values are added) - if( buff.size()>=CellBuffer.CAPACITY-1 ) - synchronized( _dest ){ //sparse requires lock - lnnz += buff.size(); - buff.flushCellBufferToSparseBlock(sblock); - } + if( buff.size()>=CellBuffer.CAPACITY-1 ) + lnnz += flushBufferToSparseBlock(buff, sblock); } //final buffer flush - synchronized( _dest ){ //sparse requires lock - lnnz += buff.size(); - buff.flushCellBufferToSparseBlock(sblock); - } + lnnz += flushBufferToSparseBlock(buff, sblock); } else { //DENSE<-value DenseBlock a = _dest.getDenseBlock(); @@ -252,6 +246,15 @@ public 
class ReaderTextCellParallel extends ReaderTextCell return lnnz; } + + private static long flushBufferToSparseBlock(CellBuffer buff, SparseBlock dest) { + int ret = buff.size(); + if( dest instanceof SparseBlockMCSR ) //row synchronization + buff.flushCellBufferToSparseBlockMCSR((SparseBlockMCSR)dest); + else //full block synchronization + buff.flushCellBufferToSparseBlock(dest); + return ret; + } } public static class CountNnzTask implements Callable<Void> { @@ -346,8 +349,19 @@ public class ReaderTextCellParallel extends ReaderTextCell } public void flushCellBufferToSparseBlock( SparseBlock dest ) { - for( int i=0; i<=_pos; i++ ) - dest.append(_rlen[i], _clen[i], _vals!=null ? _vals[i] : 1); + synchronized (dest) { + for( int i=0; i<=_pos; i++ ) + dest.append(_rlen[i], _clen[i], _vals!=null ? _vals[i] : 1); + } + reset(); + } + + public void flushCellBufferToSparseBlockMCSR( SparseBlockMCSR dest ) { + for( int i=0; i<=_pos; i++ ) { + synchronized(dest.get(_rlen[i])) { + dest.append(_rlen[i], _clen[i], _vals!=null ? _vals[i] : 1); + } + } reset(); }