This is an automated email from the ASF dual-hosted git repository.
mboehm7 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new 96d7f1fc0d [MINOR] Performance row aggregates (task-local nnz
maintenance)
96d7f1fc0d is described below
commit 96d7f1fc0d0b58dbf763141d336263b1d829767d
Author: Matthias Boehm <[email protected]>
AuthorDate: Thu Mar 28 17:11:09 2024 +0100
[MINOR] Performance row aggregates (task-local nnz maintenance)
This patch makes a minor performance improvement for row aggregates,
which matters when aggregating rows with few non-zeros. So far the number
of non-zeros of the output column vector was computed sequentially
after all aggregation tasks finished. Now, we compute (like in many
other kernels) the nnz in a task-local manner, avoiding an
unnecessarily large sequential fraction.
On two graph datasets, the performance of rowMaxs improved as follows:
* germany_osm.mtx (100x rowMaxs): 5.9s -> 5.3s
* europe_osm.mtx (100x rowMaxs): 26.3s -> 21.7s
---
.../org/apache/sysds/runtime/matrix/data/LibMatrixAgg.java | 13 +++++++++----
1 file changed, 9 insertions(+), 4 deletions(-)
diff --git
a/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixAgg.java
b/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixAgg.java
index 2efdc2a0f7..a9924c280a 100644
--- a/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixAgg.java
+++ b/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixAgg.java
@@ -296,13 +296,19 @@ public class LibMatrixAgg {
new RowAggTask(in, out, aggtype, uaop,
lb, lb+blklens.get(i)) :
new PartialAggTask(in, out, aggtype,
uaop, lb, lb+blklens.get(i)) );
}
- pool.invokeAll(tasks);
+ List<Future<Object>> rtasks = pool.invokeAll(tasks);
pool.shutdown();
//aggregate partial results
- if( !(uaop.indexFn instanceof ReduceCol) ) {
+ if( uaop.indexFn instanceof ReduceCol ) {
+ //error handling and nnz aggregation
+ out.setNonZeros(rtasks.stream()
+ .mapToLong(t ->
(long)UtilFunctions.getSafe(t)).sum());
+ }
+ else { //colAgg()/agg()
out.copy(((PartialAggTask)tasks.get(0)).getResult(), false); //for init
for( int i=1; i<tasks.size(); i++ )
aggregateFinalResult(uaop.aggOp, out,
((PartialAggTask)tasks.get(i)).getResult());
+ out.recomputeNonZeros();
}
}
catch(Exception ex) {
@@ -310,7 +316,6 @@ public class LibMatrixAgg {
}
//cleanup output and change representation (if necessary)
- out.recomputeNonZeros();
out.examSparsity();
//System.out.println("uagg k="+k+"
("+in.rlen+","+in.clen+","+in.sparse+") in "+time.stop()+"ms.");
@@ -3778,7 +3783,7 @@ public class LibMatrixAgg {
aggregateUnaryMatrixDense(_in, _ret, _aggtype,
_uaop.aggOp.increOp.fn, _uaop.indexFn, _rl, _ru);
else
aggregateUnaryMatrixSparse(_in, _ret, _aggtype,
_uaop.aggOp.increOp.fn, _uaop.indexFn, _rl, _ru);
- return null;
+ return _ret.recomputeNonZeros(_rl, _ru-1);
}
}