This is an automated email from the ASF dual-hosted git repository.
mboehm7 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new b9eb4d8275 [SYSTEMDS-3688] Improved MatrixMarket parallel reader
performance
b9eb4d8275 is described below
commit b9eb4d8275ada2d3a9daa91481926ecc1d11ec8d
Author: Matthias Boehm <[email protected]>
AuthorDate: Thu Mar 28 15:42:52 2024 +0100
[SYSTEMDS-3688] Improved MatrixMarket parallel reader performance
This patch improves the performance of the existing parallel reader
for the matrix-market formats. The changes include:
* Parallel sparse row pre-allocation (according to counted row-nnz)
* Avoid buffer-value allocation for matrix-market pattern type (0/1)
* Tailored sparse block append from buffers
On a machine with 24 pcores / 48 vcores, this patch improved the
read performance for road-network graph data as follows:
* germany_osm.mtx (first read): 3.84s -> 2.94s
* germany_osm.mtx (avg 10xread): 2.97s -> 2.25s
* europe_osm.mtx (first read): 13.67s -> 12.09s
* europe_osm.mtx (avg 10xread): 10.36s -> 8.71s
---
.../apache/sysds/runtime/data/SparseBlockMCSR.java | 2 +-
.../sysds/runtime/io/ReaderTextCellParallel.java | 40 ++++++++++++++++------
2 files changed, 30 insertions(+), 12 deletions(-)
diff --git a/src/main/java/org/apache/sysds/runtime/data/SparseBlockMCSR.java
b/src/main/java/org/apache/sysds/runtime/data/SparseBlockMCSR.java
index 70e36bf4cd..025da10394 100644
--- a/src/main/java/org/apache/sysds/runtime/data/SparseBlockMCSR.java
+++ b/src/main/java/org/apache/sysds/runtime/data/SparseBlockMCSR.java
@@ -381,7 +381,7 @@ public class SparseBlockMCSR extends SparseBlock
if(v == 0)
return;
else if(_rows[r] == null)
- _rows[r] = new SparseRowScalar().append(c, v);
+ _rows[r] = new SparseRowScalar(c, v);
else
_rows[r] = _rows[r].append(c, v);
}
diff --git
a/src/main/java/org/apache/sysds/runtime/io/ReaderTextCellParallel.java
b/src/main/java/org/apache/sysds/runtime/io/ReaderTextCellParallel.java
index 23b6b738ca..9b80252040 100644
--- a/src/main/java/org/apache/sysds/runtime/io/ReaderTextCellParallel.java
+++ b/src/main/java/org/apache/sysds/runtime/io/ReaderTextCellParallel.java
@@ -26,6 +26,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
@@ -91,6 +92,7 @@ public class ReaderTextCellParallel extends ReaderTextCell
long len = HDFSTool.getFilesizeOnHDFS(path);
par = ( len < MIN_FILESIZE_MM ) ? 1: par;
}
+ final int par2 = par;
try
{
@@ -107,10 +109,13 @@ public class ReaderTextCellParallel extends ReaderTextCell
List<Future<Void>> rt1 = pool.invokeAll(tasks);
for( Future<Void> task : rt1 )
task.get();
+ //allocate sparse block and rows
SparseBlock sblock =
dest.allocateBlock().getSparseBlock();
- for( int i=0; i<rlen; i++ )
- if( rNnz[i] > 0 )
- sblock.allocate(i,
UtilFunctions.roundToNext(rNnz[i], 4));
+ List<Future<?>> rt1b = IntStream.range(0, par)
+ .mapToObj(i -> pool.submit(() ->
preallocateSparseRows(sblock, i, par2, rlen, rNnz)))
+ .collect(Collectors.toList());
+ for( Future<?> task : rt1b )
+ task.get();
}
//create and execute read tasks for all splits
@@ -135,6 +140,17 @@ public class ReaderTextCellParallel extends ReaderTextCell
throw new IOException("Threadpool issue, while parallel
read.", e);
}
}
+
+ private static void preallocateSparseRows(SparseBlock sblock, int i,
int par, long rlen, int[] rNnz) { //allocates rows of the i-th of par contiguous row ranges over [0,rlen)
+ int rl = (int) (i*rlen/par); //first row of range i (inclusive); i*rlen is evaluated as long
+ int ru = (int) Math.min((i+1)*rlen/par, rlen); //end of range i (exclusive), capped at rlen
+ for( int j=rl; j<ru; j++ ) { //rows with rNnz[j] <= 0 match neither branch and are not allocated
+ if( rNnz[j] > 1 ) //sparse row
+ sblock.allocate(j,
UtilFunctions.roundToNext(rNnz[j], 4)); //presumably rounds the row capacity up to a multiple of 4 - TODO confirm
+ else if( rNnz[j] == 1 ) //sparse scalar
+ sblock.allocate(j, 1); //requested capacity of exactly one entry
+ }
+ }
public static class ReadTask implements Callable<Long>
{
@@ -194,7 +210,8 @@ public class ReaderTextCellParallel extends ReaderTextCell
}
if( _sparse ) { //SPARSE<-value
- CellBuffer buff = new CellBuffer();
+ CellBuffer buff = new
CellBuffer(!(_mmProps!=null && _mmProps.isPatternField()));
+ SparseBlock sblock =
_dest.getSparseBlock();
while( reader.next(key, value) ) {
cell =
parseCell(value.toString(), st, cell, _mmProps);
buff.addCell(cell.getI(),
cell.getJ(), cell.getV());
@@ -203,13 +220,13 @@ public class ReaderTextCellParallel extends ReaderTextCell
if(
buff.size()>=CellBuffer.CAPACITY )
synchronized( _dest ){
//sparse requires lock
lnnz +=
buff.size();
-
buff.flushCellBufferToMatrixBlock(_dest);
+
buff.flushCellBufferToSparseBlock(sblock);
}
}
//final buffer flush
synchronized( _dest ){ //sparse
requires lock
lnnz += buff.size();
-
buff.flushCellBufferToMatrixBlock(_dest);
+
buff.flushCellBufferToSparseBlock(sblock);
}
}
else { //DENSE<-value
@@ -311,10 +328,10 @@ public class ReaderTextCellParallel extends ReaderTextCell
private double[] _vals;
private int _pos;
- public CellBuffer( ) {
+ public CellBuffer( boolean values ) {
_rlen = new int[CAPACITY];
_clen = new int[CAPACITY];
- _vals = new double[CAPACITY];
+ _vals = values ? new double[CAPACITY] : null;
_pos = -1;
}
@@ -323,12 +340,13 @@ public class ReaderTextCellParallel extends ReaderTextCell
_pos++;
_rlen[_pos] = rlen;
_clen[_pos] = clen;
- _vals[_pos] = val;
+ if(_vals != null)
+ _vals[_pos] = val;
}
- public void flushCellBufferToMatrixBlock( MatrixBlock dest ) {
+ public void flushCellBufferToSparseBlock( SparseBlock dest ) {
for( int i=0; i<=_pos; i++ )
- dest.appendValue(_rlen[i], _clen[i], _vals[i]);
+ dest.append(_rlen[i], _clen[i], _vals!=null ?
_vals[i] : 1);
reset();
}