This is an automated email from the ASF dual-hosted git repository.
arnabp20 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new 79ec601 [SYSTEMDS-3242] Multithreaded allocation for transformencode
79ec601 is described below
commit 79ec6019843a037545aee7bd6495f91cbeb88a6e
Author: arnabp <[email protected]>
AuthorDate: Wed Dec 8 14:38:47 2021 +0100
[SYSTEMDS-3242] Multithreaded allocation for transformencode
This patch enables multi-threaded sparse target matrix allocation
for the transformencode apply phase when dummycoding is used and more than one transform thread is configured.
---
.../transform/encode/MultiColumnEncoder.java | 43 ++++++++++++++++------
1 file changed, 31 insertions(+), 12 deletions(-)
diff --git
a/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
b/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
index f593487..7206600 100644
---
a/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
+++
b/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
@@ -37,12 +37,14 @@ import java.util.concurrent.Future;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sysds.api.DMLScript;
import org.apache.sysds.common.Types;
import org.apache.sysds.conf.ConfigurationManager;
+import org.apache.sysds.hops.OptimizerUtils;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.controlprogram.caching.CacheBlock;
import org.apache.sysds.runtime.data.SparseBlock;
@@ -263,7 +265,10 @@ public class MultiColumnEncoder implements Encoder {
+ "has a encoder or slice the input
accordingly");
// TODO smart checks
// Block allocation for MT access
- outputMatrixPreProcessing(out, in);
+ boolean hasDC = false; // true if ANY column encoder contains a dummycode encoder
+ for(ColumnEncoderComposite columnEncoder : _columnEncoders)
+ hasDC |=
columnEncoder.hasEncoder(ColumnEncoderDummycode.class);
+ outputMatrixPreProcessing(out, in, hasDC);
if(k > 1) {
applyMT(in, out, outputCol, k);
}
@@ -318,7 +323,7 @@ public class MultiColumnEncoder implements Encoder {
pool.shutdown();
}
- private static void outputMatrixPreProcessing(MatrixBlock output,
CacheBlock input) {
+ private static void outputMatrixPreProcessing(MatrixBlock output,
CacheBlock input, boolean hasDC) {
long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
output.allocateBlock();
if(output.isInSparseFormat()) {
@@ -326,18 +331,31 @@ public class MultiColumnEncoder implements Encoder {
if(!(block instanceof SparseBlockMCSR))
throw new RuntimeException(
"Transform apply currently only
supported for MCSR sparse and dense output Matrices");
- for(int r = 0; r < output.getNumRows(); r++) {
- // allocate all sparse rows so MT sync can be
done.
- // should be rare that rows have only 0
- block.allocate(r, input.getNumColumns());
- // Setting the size here makes it possible to
run all sparse apply tasks without any sync
- // could become problematic if the input is
very sparse since we allocate the same size as the input
- // should be fine in theory ;)
-
((SparseRowVector)block.get(r)).setSize(input.getNumColumns());
+ if (hasDC && OptimizerUtils.getTransformNumThreads()>1)
{
+ // DC forces a single threaded allocation after
the build phase and
+ // before the apply starts. Below code
parallelizes sparse allocation.
+ IntStream.range(0, output.getNumRows())
+ .parallel().forEach(r -> {
+ block.allocate(r,
input.getNumColumns());
+
((SparseRowVector)block.get(r)).setSize(input.getNumColumns());
+ });
+ }
+ else {
+ for(int r = 0; r < output.getNumRows(); r++) {
+ // allocate all sparse rows so MT sync
can be done.
+ // should be rare that rows have only 0
+ block.allocate(r,
input.getNumColumns());
+ // Setting the size here makes it
possible to run all sparse apply tasks without any sync
+ // could become problematic if the
input is very sparse since we allocate the same size as the input
+ // should be fine in theory ;)
+
((SparseRowVector)block.get(r)).setSize(input.getNumColumns());
+ }
}
}
- if(DMLScript.STATISTICS)
+ if(DMLScript.STATISTICS) {
+ LOG.debug("Elapsed time for allocation: "+
(System.nanoTime() - t0) / 1e6 + " ms");
Statistics.incTransformOutMatrixPreProcessingTime(System.nanoTime()-t0);
+ }
}
private void outputMatrixPostProcessing(MatrixBlock output){
@@ -803,10 +821,11 @@ public class MultiColumnEncoder implements Encoder {
@Override
public Object call() throws Exception {
int numCols = _input.getNumColumns() +
_encoder.getNumExtraCols();
+ boolean hasDC =
_encoder.getColumnEncoders(ColumnEncoderDummycode.class).size() > 0;
long estNNz = (long) _input.getNumColumns() * (long)
_input.getNumRows();
boolean sparse =
MatrixBlock.evalSparseFormatInMemory(_input.getNumRows(), numCols, estNNz);
_output.reset(_input.getNumRows(), numCols, sparse,
estNNz);
- outputMatrixPreProcessing(_output, _input);
+ outputMatrixPreProcessing(_output, _input, hasDC);
return null;
}