This is an automated email from the ASF dual-hosted git repository.
arnabp20 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new aeefc07 [MINOR] Bug fixes in multi-threaded transformapply
aeefc07 is described below
commit aeefc077575532e433bd2384d56829b777cab596
Author: arnabp <[email protected]>
AuthorDate: Fri Feb 25 20:10:22 2022 +0100
[MINOR] Bug fixes in multi-threaded transformapply
---
.../cp/ParameterizedBuiltinCPInstruction.java | 3 ++-
.../sysds/runtime/matrix/data/MatrixBlock.java | 21 +++++++++++++++++++--
.../runtime/transform/encode/ColumnEncoder.java | 1 +
.../transform/encode/MultiColumnEncoder.java | 18 +++++++++++++-----
4 files changed, 35 insertions(+), 8 deletions(-)
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java
index be6c40a..02b1638 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/cp/ParameterizedBuiltinCPInstruction.java
@@ -25,6 +25,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.sysds.api.DMLScript;
import org.apache.sysds.common.Types.DataType;
import org.apache.sysds.common.Types.ValueType;
+import org.apache.sysds.hops.OptimizerUtils;
import org.apache.sysds.lops.Lop;
import org.apache.sysds.parser.ParameterizedBuiltinFunctionExpression;
import org.apache.sysds.parser.Statement;
@@ -303,7 +304,7 @@ public class ParameterizedBuiltinCPInstruction extends ComputationCPInstruction
// compute transformapply
MultiColumnEncoder encoder = EncoderFactory
.createEncoder(params.get("spec"), colNames,
data.getNumColumns(), meta);
- MatrixBlock mbout = encoder.apply(data);
+ MatrixBlock mbout = encoder.apply(data,
OptimizerUtils.getTransformNumThreads());
// release locks
ec.setMatrixOutput(output.getName(), mbout);
diff --git a/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java
index 2a8d74d..bc7f05f 100644
--- a/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java
@@ -503,7 +503,11 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
public final long setNonZeros(long nnz) {
return (nonZeros = nnz);
}
-
+
+ public final long setAllNonZeros() {
+ return (nonZeros = getLength());
+ }
+
public final double getSparsity() {
return OptimizerUtils.getSparsity(rlen, clen, nonZeros);
}
@@ -661,6 +665,17 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
}
}
+ public void denseSuperQuickSetValue(int r, int c, double v)
+ {
+ //early abort
+ if( denseBlock==null && v==0 )
+ return;
+
+ denseBlock.set(r, c, v);
+ if( v==0 )
+ nonZeros--;
+ }
+
public double quickGetValueThreadSafe(int r, int c) {
if(sparse) {
if(!(sparseBlock instanceof SparseBlockMCSR))
@@ -5464,12 +5479,13 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
double v2 = that.quickGetValue(i, 0);
maxCol = ctable.execute(i+1, v2, w, maxCol,
resultBlock);
}
-
+
//update meta data (initially unknown number of columns)
//note: nnz maintained in ctable (via quickset)
if(updateClen) {
resultBlock.clen = maxCol;
}
+
return resultBlock;
}
@@ -5613,6 +5629,7 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
////////
// Data Generation Methods
+
// (rand, sequence)
/**
diff --git a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoder.java b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoder.java
index df673e9..bb543d8 100644
--- a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoder.java
+++ b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoder.java
@@ -172,6 +172,7 @@ public abstract class ColumnEncoder implements Encoder, Comparable<ColumnEncoder
int lim = Math.min(i+B, rowEnd);
for (int ii=i; ii<lim; ii++)
out.quickSetValue(ii, outputCol,
codes[ii-rowStart]);
+ //out.denseSuperQuickSetValue(ii, outputCol,
codes[ii-rowStart]);
}
}
diff --git a/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java b/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
index 00c7962..019a140 100644
--- a/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
+++ b/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
@@ -246,7 +246,7 @@ public class MultiColumnEncoder implements Encoder {
public void build(CacheBlock in, int k) {
if(hasLegacyEncoder() && !(in instanceof FrameBlock))
throw new DMLRuntimeException("LegacyEncoders do not
support non FrameBlock Inputs");
- if(_nPartitions == null) //happens if this method is directly
called from the tests
+ if(_nPartitions == null) //happens if this method is directly
called
_nPartitions = getNumRowPartitions(in, k);
if(k > 1) {
buildMT(in, k);
@@ -294,6 +294,9 @@ public class MultiColumnEncoder implements Encoder {
}
public MatrixBlock apply(CacheBlock in, int k) {
+ // domain sizes are not updated if called from transformapply
+ for(ColumnEncoderComposite columnEncoder : _columnEncoders)
+ columnEncoder.updateAllDCEncoders();
int numCols = in.getNumColumns() + getNumExtraCols();
long estNNz = (long) in.getNumColumns() * (long)
in.getNumRows();
boolean sparse =
MatrixBlock.evalSparseFormatInMemory(in.getNumRows(), numCols, estNNz);
@@ -320,6 +323,8 @@ public class MultiColumnEncoder implements Encoder {
hasDC =
columnEncoder.hasEncoder(ColumnEncoderDummycode.class);
outputMatrixPreProcessing(out, in, hasDC);
if(k > 1) {
+ if(_nPartitions == null) //happens if this method is
directly called
+ _nPartitions = getNumRowPartitions(in, k);
applyMT(in, out, outputCol, k);
}
else {
@@ -403,11 +408,11 @@ public class MultiColumnEncoder implements Encoder {
nBuild++;
int nApply = in.getNumColumns();
// #BuildBlocks = (2 * #PhysicalCores)/#build
- if (numBlocks[0] == 0 && nBuild < nThread)
+ if (numBlocks[0] == 0 && nBuild > 0 && nBuild < nThread)
numBlocks[0] = Math.round(((float)nThread)/nBuild);
// #ApplyBlocks = (4 * #PhysicalCores)/#apply
- if (numBlocks[1] == 0 && nApply < nThread*2)
- numBlocks[1] = Math.round(((float)nThread*2)/nBuild);
+ if (numBlocks[1] == 0 && nApply > 0 && nApply < nThread*2)
+ numBlocks[1] = Math.round(((float)nThread*2)/nApply);
// Reduce #blocks if #rows per partition is too small
while (numBlocks[0] > 1 && nRow/numBlocks[0] < minNumRows)
@@ -469,8 +474,11 @@ public class MultiColumnEncoder implements Encoder {
output.setSparseBlock(csrblock);
}
}
- else //dense
+ else {
+ // Allocate dense block and set nnz to total #entries
output.allocateBlock();
+ //output.setAllNonZeros();
+ }
if(DMLScript.STATISTICS) {
LOG.debug("Elapsed time for allocation: "+ ((double)
System.nanoTime() - t0) / 1000000 + " ms");