This is an automated email from the ASF dual-hosted git repository.
mboehm7 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new ce379232c2 [SYSTEMDS-3761] Performance parallel reader textcell /
matrixmarket
ce379232c2 is described below
commit ce379232c2786b437acaa298de4c06b9b2ff8ffd
Author: Matthias Boehm <[email protected]>
AuthorDate: Wed Sep 4 15:28:06 2024 +0200
[SYSTEMDS-3761] Performance parallel reader textcell / matrixmarket
The multi-threaded reader for the textcell and MatrixMarket (mm) text
formats (IJV cells) has four stages:
(1) determining the number of non-zeros (nnz) per row,
(2) preallocating the sparse rows (so no reallocation is needed),
(3) reading the cells (through cell buffers) with synchronized buffer
flushes, and
(4) setting the nnz and sorting the sparse rows.
This patch improves performance by eliminating unnecessary thread
contention during the buffer flushes of step (3). When reading into an
MCSR representation, flushes now synchronize on individual sparse rows
instead of on the entire sparse block. Other sparse block formats still
lock the full block during a flush.
When reading the vas_stokes_4M.mtx dataset (4382246 rows, 4382246
columns, 131577616 nnz), this patch reduced the end-to-end read time
from 17.6s to 7.5s.
---
.../sysds/runtime/io/ReaderTextCellParallel.java | 38 +++++++++++++++-------
1 file changed, 26 insertions(+), 12 deletions(-)
diff --git
a/src/main/java/org/apache/sysds/runtime/io/ReaderTextCellParallel.java
b/src/main/java/org/apache/sysds/runtime/io/ReaderTextCellParallel.java
index b9cfd852fd..0e2dbc857f 100644
--- a/src/main/java/org/apache/sysds/runtime/io/ReaderTextCellParallel.java
+++ b/src/main/java/org/apache/sysds/runtime/io/ReaderTextCellParallel.java
@@ -41,6 +41,7 @@ import org.apache.sysds.common.Types.FileFormat;
import org.apache.sysds.hops.OptimizerUtils;
import org.apache.sysds.runtime.data.DenseBlock;
import org.apache.sysds.runtime.data.SparseBlock;
+import org.apache.sysds.runtime.data.SparseBlockMCSR;
import org.apache.sysds.runtime.matrix.data.IJV;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.util.CommonThreadPool;
@@ -131,7 +132,6 @@ public class ReaderTextCellParallel extends ReaderTextCell
dest.setNonZeros( lnnz );
if( dest.isInSparseFormat() )
sortSparseRowsParallel(dest, rlen, _numThreads,
pool);
-
}
catch (Exception e) {
throw new IOException("Threadpool issue, while parallel
read.", e);
@@ -218,17 +218,11 @@ public class ReaderTextCellParallel extends ReaderTextCell
if( _mmProps != null &&
_mmProps.isSymmetric() && !cell.onDiag() )
buff.addCell(cell.getJ(), cell.getI(), cell.getV());
//flush if needed (<=n-1 to
allow symmetric mm, where 2 values are added)
- if(
buff.size()>=CellBuffer.CAPACITY-1 )
- synchronized( _dest ){
//sparse requires lock
- lnnz +=
buff.size();
-
buff.flushCellBufferToSparseBlock(sblock);
- }
+ if(
buff.size()>=CellBuffer.CAPACITY-1 )
+ lnnz +=
flushBufferToSparseBlock(buff, sblock);
}
//final buffer flush
- synchronized( _dest ){ //sparse
requires lock
- lnnz += buff.size();
-
buff.flushCellBufferToSparseBlock(sblock);
- }
+ lnnz += flushBufferToSparseBlock(buff,
sblock);
}
else { //DENSE<-value
DenseBlock a = _dest.getDenseBlock();
@@ -252,6 +246,15 @@ public class ReaderTextCellParallel extends ReaderTextCell
return lnnz;
}
+
+ private static long flushBufferToSparseBlock(CellBuffer buff,
SparseBlock dest) {
+ int ret = buff.size();
+ if( dest instanceof SparseBlockMCSR ) //row
synchronization
+
buff.flushCellBufferToSparseBlockMCSR((SparseBlockMCSR)dest);
+ else //full block synchronization
+ buff.flushCellBufferToSparseBlock(dest);
+ return ret;
+ }
}
public static class CountNnzTask implements Callable<Void> {
@@ -346,8 +349,19 @@ public class ReaderTextCellParallel extends ReaderTextCell
}
public void flushCellBufferToSparseBlock( SparseBlock dest ) {
- for( int i=0; i<=_pos; i++ )
- dest.append(_rlen[i], _clen[i], _vals!=null ?
_vals[i] : 1);
+ synchronized (dest) {
+ for( int i=0; i<=_pos; i++ )
+ dest.append(_rlen[i], _clen[i],
_vals!=null ? _vals[i] : 1);
+ }
+ reset();
+ }
+
+ public void flushCellBufferToSparseBlockMCSR( SparseBlockMCSR
dest ) {
+ for( int i=0; i<=_pos; i++ ) {
+ synchronized(dest.get(_rlen[i])) {
+ dest.append(_rlen[i], _clen[i],
_vals!=null ? _vals[i] : 1);
+ }
+ }
reset();
}