This is an automated email from the ASF dual-hosted git repository.
baunsgaard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new b6adff8ae5 [MINOR] CompressedMatrixBlock parallel nonzero count
b6adff8ae5 is described below
commit b6adff8ae5fa2cc29fbede25e16b8b4c81c33c7c
Author: Sebastian Baunsgaard <[email protected]>
AuthorDate: Mon Jan 6 15:22:52 2025 +0100
[MINOR] CompressedMatrixBlock parallel nonzero count
---
.../runtime/compress/CompressedMatrixBlock.java | 31 ++++++++++++++++++++++
1 file changed, 31 insertions(+)
diff --git a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java
index 68cfc6f983..e74e6c12f7 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java
@@ -28,6 +28,7 @@ import java.lang.ref.SoftReference;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.apache.commons.lang3.NotImplementedException;
@@ -88,6 +89,7 @@ import org.apache.sysds.runtime.matrix.operators.ReorgOperator;
import org.apache.sysds.runtime.matrix.operators.ScalarOperator;
import org.apache.sysds.runtime.matrix.operators.TernaryOperator;
import org.apache.sysds.runtime.matrix.operators.UnaryOperator;
+import org.apache.sysds.runtime.util.CommonThreadPool;
import org.apache.sysds.runtime.util.IndexRange;
import org.apache.sysds.utils.DMLCompressionStatistics;
import org.apache.sysds.utils.stats.InfrastructureAnalyzer;
@@ -319,6 +321,35 @@ public class CompressedMatrixBlock extends MatrixBlock {
return nonZeros;
}
+	/**
+	 * Recompute the number of non-zero values of this compressed block in parallel, submitting one counting task per
+	 * column group to a thread pool and summing the per-group results.
+	 *
+	 * Falls back to the single-threaded {@link #recomputeNonZeros()} when {@code k <= 1}, when the column groups are
+	 * overlapping (presumably because per-group counts cannot simply be summed for overlapping groups — confirm), or
+	 * when there is at most one column group, so there is nothing to parallelize.
+	 *
+	 * If the resulting count is zero, the block is reallocated into a single empty column group.
+	 *
+	 * @param k the degree of parallelism requested for the thread pool
+	 * @return the recomputed number of non-zeros, which is also stored in {@code nonZeros}
+	 * @throws DMLRuntimeException if any counting task fails or the calling thread is interrupted while waiting
+	 */
+	@Override
+	public long recomputeNonZeros(int k) {
+		if(k <= 1 || isOverlapping() || _colGroups.size() <= 1)
+			return recomputeNonZeros();
+
+		final ExecutorService pool = CommonThreadPool.get(k);
+		try {
+			// One task per column group; each group counts its own non-zeros over all rows.
+			final List<Future<Long>> tasks = new ArrayList<>(_colGroups.size());
+			for(AColGroup g : _colGroups)
+				tasks.add(pool.submit(() -> g.getNumberNonZeros(rlen)));
+
+			long nnz = 0;
+			for(Future<Long> t : tasks)
+				nnz += t.get();
+			nonZeros = nnz;
+		}
+		catch(InterruptedException e) {
+			// Restore the interrupt flag so code further up the stack can still observe the interruption.
+			Thread.currentThread().interrupt();
+			throw new DMLRuntimeException("Failed to count non zeros", e);
+		}
+		catch(Exception e) {
+			throw new DMLRuntimeException("Failed to count non zeros", e);
+		}
+		finally {
+			pool.shutdown();
+		}
+
+		if(nonZeros == 0) // If there are no nonzeros, reallocate into a single empty column group.
+			allocateColGroup(ColGroupEmpty.create(getNumColumns()));
+
+		return nonZeros;
+	}
+
@Override
public long recomputeNonZeros(int rl, int ru) {
throw new NotImplementedException();