This is an automated email from the ASF dual-hosted git repository.
arnabp20 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/master by this push:
new cdff113 [SYSTEMDS-2972] Transformencode sparse improvements
cdff113 is described below
commit cdff113648371b2dace5905dc0eb81ebf486f094
Author: Lukas Erlbacher <[email protected]>
AuthorDate: Wed Sep 8 19:58:50 2021 +0200
[SYSTEMDS-2972] Transformencode sparse improvements
This PR introduces the sparse implementations for the transform encoders.
Furthermore, this adds support for row partitioning, statistics, and
debug logging. Now we can enable multithreaded transform via a flag
in the config file, "sysds.parallel.encode".
Closes #1388, closes #1383
---
.../apache/sysds/conf/ConfigurationManager.java | 4 +
src/main/java/org/apache/sysds/conf/DMLConfig.java | 4 +-
.../java/org/apache/sysds/hops/OptimizerUtils.java | 18 +++
.../runtime/compress/CompressedMatrixBlock.java | 5 -
...ltiReturnParameterizedBuiltinCPInstruction.java | 4 +-
.../sysds/runtime/matrix/data/MatrixBlock.java | 35 +----
.../runtime/transform/encode/ColumnEncoder.java | 141 +++++++++++++-----
.../runtime/transform/encode/ColumnEncoderBin.java | 90 ++++++++++--
.../transform/encode/ColumnEncoderComposite.java | 26 ++--
.../transform/encode/ColumnEncoderDummycode.java | 115 +++++++--------
.../transform/encode/ColumnEncoderFeatureHash.java | 64 +++++++--
.../transform/encode/ColumnEncoderPassThrough.java | 90 ++++++++++--
.../transform/encode/ColumnEncoderRecode.java | 81 +++++++++--
.../runtime/transform/encode/EncoderFactory.java | 26 ++--
.../runtime/transform/encode/EncoderMVImpute.java | 36 +++--
.../runtime/transform/encode/EncoderOmit.java | 22 +--
.../transform/encode/MultiColumnEncoder.java | 91 ++++++++----
.../apache/sysds/runtime/util/DependencyTask.java | 24 +++-
.../sysds/runtime/util/DependencyThreadPool.java | 12 +-
.../apache/sysds/runtime/util/UtilFunctions.java | 9 ++
.../java/org/apache/sysds/utils/Statistics.java | 159 +++++++++++++++++++--
.../mt/TransformFrameBuildMultithreadedTest.java | 2 +
.../mt/TransformFrameEncodeMultithreadedTest.java | 4 +-
.../datasets/homes3/homes.tfspec_dummy_all.json | 1 -
.../datasets/homes3/homes.tfspec_dummy_sparse.json | 1 +
25 files changed, 805 insertions(+), 259 deletions(-)
diff --git a/src/main/java/org/apache/sysds/conf/ConfigurationManager.java
b/src/main/java/org/apache/sysds/conf/ConfigurationManager.java
index 5a3667c..93654b7 100644
--- a/src/main/java/org/apache/sysds/conf/ConfigurationManager.java
+++ b/src/main/java/org/apache/sysds/conf/ConfigurationManager.java
@@ -171,6 +171,10 @@ public class ConfigurationManager
return
getCompilerConfigFlag(ConfigType.PARALLEL_CP_MATRIX_OPERATIONS);
}
+ public static boolean isParallelTransform() {
+ return
getDMLConfig().getBooleanValue(DMLConfig.PARALLEL_ENCODE);
+ }
+
public static boolean isParallelParFor() {
return
getCompilerConfigFlag(ConfigType.PARALLEL_LOCAL_OR_REMOTE_PARFOR);
}
diff --git a/src/main/java/org/apache/sysds/conf/DMLConfig.java
b/src/main/java/org/apache/sysds/conf/DMLConfig.java
index 0b7692b..db59505 100644
--- a/src/main/java/org/apache/sysds/conf/DMLConfig.java
+++ b/src/main/java/org/apache/sysds/conf/DMLConfig.java
@@ -67,6 +67,7 @@ public class DMLConfig
public static final String DEFAULT_BLOCK_SIZE =
"sysds.defaultblocksize";
public static final String CP_PARALLEL_OPS =
"sysds.cp.parallel.ops";
public static final String CP_PARALLEL_IO =
"sysds.cp.parallel.io";
+ public static final String PARALLEL_ENCODE =
"sysds.parallel.encode"; // boolean: enable multi-threaded transformencode and
apply
public static final String COMPRESSED_LINALG =
"sysds.compressed.linalg";
public static final String COMPRESSED_LOSSY =
"sysds.compressed.lossy";
public static final String COMPRESSED_VALID_COMPRESSIONS =
"sysds.compressed.valid.compressions";
@@ -125,6 +126,7 @@ public class DMLConfig
_defaultVals.put(DEFAULT_BLOCK_SIZE,
String.valueOf(OptimizerUtils.DEFAULT_BLOCKSIZE) );
_defaultVals.put(CP_PARALLEL_OPS, "true" );
_defaultVals.put(CP_PARALLEL_IO, "true" );
+ _defaultVals.put(PARALLEL_ENCODE, "false" );
_defaultVals.put(COMPRESSED_LINALG,
Compression.CompressConfig.FALSE.name() );
_defaultVals.put(COMPRESSED_LOSSY, "false" );
_defaultVals.put(COMPRESSED_VALID_COMPRESSIONS, "SDC,DDC");
@@ -398,7 +400,7 @@ public class DMLConfig
public String getConfigInfo() {
String[] tmpConfig = new String[] {
LOCAL_TMP_DIR,SCRATCH_SPACE,OPTIMIZATION_LEVEL,
DEFAULT_BLOCK_SIZE,
- CP_PARALLEL_OPS, CP_PARALLEL_IO, NATIVE_BLAS,
NATIVE_BLAS_DIR,
+ CP_PARALLEL_OPS, CP_PARALLEL_IO, PARALLEL_ENCODE,
NATIVE_BLAS, NATIVE_BLAS_DIR,
COMPRESSED_LINALG, COMPRESSED_LOSSY,
COMPRESSED_VALID_COMPRESSIONS, COMPRESSED_OVERLAPPING,
COMPRESSED_SAMPLING_RATIO, COMPRESSED_COCODE,
COMPRESSED_TRANSPOSE,
CODEGEN, CODEGEN_API, CODEGEN_COMPILER,
CODEGEN_OPTIMIZER, CODEGEN_PLANCACHE, CODEGEN_LITERALS,
diff --git a/src/main/java/org/apache/sysds/hops/OptimizerUtils.java
b/src/main/java/org/apache/sysds/hops/OptimizerUtils.java
index 01769c7..d63b2cb 100644
--- a/src/main/java/org/apache/sysds/hops/OptimizerUtils.java
+++ b/src/main/java/org/apache/sysds/hops/OptimizerUtils.java
@@ -1008,6 +1008,24 @@ public class OptimizerUtils
return ret;
}
+
+ public static int getTransformNumThreads(int maxNumThreads)
+ {
+ //by default max local parallelism (vcores)
+ int ret = InfrastructureAnalyzer.getLocalParallelism();
+
+ //apply external max constraint (e.g., set by parfor or other
rewrites)
+ if( maxNumThreads > 0 ) {
+ ret = Math.min(ret, maxNumThreads);
+ }
+
+ //check if enabled in config.xml
+ if( !ConfigurationManager.isParallelTransform() ) {
+ ret = 1;
+ }
+
+ return ret;
+ }
public static Level getDefaultLogLevel() {
Level log = Logger.getRootLogger().getLevel();
diff --git
a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java
b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java
index 2c205f3..d374d3c 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java
@@ -1345,11 +1345,6 @@ public class CompressedMatrixBlock extends MatrixBlock {
}
@Override
- public void quickSetValueThreadSafe(int r, int c, double v) {
- throw new DMLCompressionException("Thread safe execution does
not work on Compressed Matrix");
- }
-
- @Override
public double quickGetValueThreadSafe(int r, int c) {
throw new DMLCompressionException("Thread safe execution does
not work on Compressed Matrix");
}
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/cp/MultiReturnParameterizedBuiltinCPInstruction.java
b/src/main/java/org/apache/sysds/runtime/instructions/cp/MultiReturnParameterizedBuiltinCPInstruction.java
index b6e0d97..800aa51 100644
---
a/src/main/java/org/apache/sysds/runtime/instructions/cp/MultiReturnParameterizedBuiltinCPInstruction.java
+++
b/src/main/java/org/apache/sysds/runtime/instructions/cp/MultiReturnParameterizedBuiltinCPInstruction.java
@@ -25,6 +25,7 @@ import java.util.List;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.sysds.common.Types.DataType;
import org.apache.sysds.common.Types.ValueType;
+import org.apache.sysds.hops.OptimizerUtils;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
import org.apache.sysds.runtime.instructions.InstructionUtils;
@@ -85,7 +86,8 @@ public class MultiReturnParameterizedBuiltinCPInstruction
extends ComputationCPI
// execute block transform encode
MultiColumnEncoder encoder = EncoderFactory.createEncoder(spec,
colnames, fin.getNumColumns(), null);
- MatrixBlock data = encoder.encode(fin); // build and apply
+ // TODO: Assign #threads in compiler and pass via the
instruction string
+ MatrixBlock data = encoder.encode(fin,
OptimizerUtils.getTransformNumThreads(-1)); // build and apply
FrameBlock meta = encoder.getMetaData(new
FrameBlock(fin.getNumColumns(), ValueType.STRING));
meta.setColumnNames(colnames);
diff --git
a/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java
b/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java
index 160f8e9..a86a878 100644
--- a/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java
@@ -117,7 +117,7 @@ import org.apache.sysds.utils.NativeHelper;
public class MatrixBlock extends MatrixValue implements CacheBlock,
Externalizable {
// private static final Log LOG =
LogFactory.getLog(MatrixBlock.class.getName());
-
+
private static final long serialVersionUID = 7319972089143154056L;
//sparsity nnz threshold, based on practical experiments on space
consumption and performance
@@ -654,27 +654,6 @@ public class MatrixBlock extends MatrixValue implements
CacheBlock, Externalizab
}
}
- /**
- * Thread save set.
- * Blocks need to be allocated, and in case of MCSR sparse, all rows
- * that are going to be accessed need to be allocated as well.
- *
- * @param r row
- * @param c column
- * @param v value
- */
- public void quickSetValueThreadSafe(int r, int c, double v) {
- if(sparse) {
- if(!(sparseBlock instanceof SparseBlockMCSR))
- throw new RuntimeException("Only MCSR Blocks
are supported for Multithreaded sparse set.");
- synchronized (sparseBlock.get(r)) {
- sparseBlock.set(r,c,v);
- }
- }
- else
- denseBlock.set(r,c,v);
- }
-
public double quickGetValueThreadSafe(int r, int c) {
if(sparse) {
if(!(sparseBlock instanceof SparseBlockMCSR))
@@ -976,7 +955,7 @@ public class MatrixBlock extends MatrixValue implements
CacheBlock, Externalizab
/**
* Wrapper method for single threaded reduceall-colSum of a matrix.
- *
+ *
* @return A new MatrixBlock containing the column sums of this matrix.
*/
public MatrixBlock colSum() {
@@ -986,7 +965,7 @@ public class MatrixBlock extends MatrixValue implements
CacheBlock, Externalizab
/**
* Wrapper method for single threaded reduceall-rowSum of a matrix.
- *
+ *
* @return A new MatrixBlock containing the row sums of this matrix.
*/
public MatrixBlock rowSum(){
@@ -1422,7 +1401,7 @@ public class MatrixBlock extends MatrixValue implements
CacheBlock, Externalizab
throw new RuntimeException( "Copy must not overwrite
itself!" );
if(that instanceof CompressedMatrixBlock)
that = CompressedMatrixBlock.getUncompressed(that,
"Copy not effecient into a MatrixBlock");
-
+
rlen=that.rlen;
clen=that.clen;
sparse=sp;
@@ -2935,7 +2914,7 @@ public class MatrixBlock extends MatrixValue implements
CacheBlock, Externalizab
LibMatrixBincell.isValidDimensionsBinary(this, that);
if(thatValue instanceof CompressedMatrixBlock)
return ((CompressedMatrixBlock)
thatValue).binaryOperationsLeft(op, this, result);
-
+
//compute output dimensions
boolean outer = (LibMatrixBincell.getBinaryAccessType(this,
that)
== BinaryAccessType.OUTER_VECTOR_VECTOR);
@@ -2980,7 +2959,7 @@ public class MatrixBlock extends MatrixValue implements
CacheBlock, Externalizab
m2 = ((CompressedMatrixBlock)
m2).getUncompressed("Ternay Operator arg2 " + op.fn.getClass().getSimpleName());
if(m3 instanceof CompressedMatrixBlock)
m3 = ((CompressedMatrixBlock)
m3).getUncompressed("Ternay Operator arg3 " + op.fn.getClass().getSimpleName());
-
+
//prepare inputs
final boolean s1 = (rlen==1 && clen==1);
final boolean s2 = (m2.rlen==1 && m2.clen==1);
@@ -3674,7 +3653,7 @@ public class MatrixBlock extends MatrixValue implements
CacheBlock, Externalizab
"Invalid nCol dimension for
append rbind: was " + in[i].clen + " should be: " + clen);
}
}
-
+
public static MatrixBlock naryOperations(Operator op, MatrixBlock[]
matrices, ScalarObject[] scalars, MatrixBlock ret) {
//note: currently only min max, plus supported and hence
specialized implementation
//prepare operator
diff --git
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoder.java
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoder.java
index 33bf452..e04eeff 100644
--- a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoder.java
+++ b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoder.java
@@ -20,6 +20,7 @@
package org.apache.sysds.runtime.transform.encode;
import static
org.apache.sysds.runtime.transform.encode.EncoderFactory.getEncoderType;
+import static org.apache.sysds.runtime.util.UtilFunctions.getBlockSizes;
import java.io.Externalizable;
import java.io.IOException;
@@ -45,6 +46,8 @@ import org.apache.sysds.runtime.util.DependencyThreadPool;
*/
public abstract class ColumnEncoder implements Externalizable, Encoder,
Comparable<ColumnEncoder> {
protected static final Log LOG =
LogFactory.getLog(ColumnEncoder.class.getName());
+ protected static final int APPLY_ROW_BLOCKS_PER_COLUMN = 1;
+ public static int BUILD_ROW_BLOCKS_PER_COLUMN = 1;
private static final long serialVersionUID = 2299156350718979064L;
protected int _colID;
@@ -52,7 +55,23 @@ public abstract class ColumnEncoder implements
Externalizable, Encoder, Comparab
_colID = colID;
}
- public abstract MatrixBlock apply(MatrixBlock in, MatrixBlock out, int
outputCol);
+ /**
+ * Apply Functions are only used in Single Threaded or Multi-Threaded
Dense context.
+ * That's why there is no regard for MT sparse!
+ *
+ * @param in Input Block
+ * @param out Output Matrix
+ * @param outputCol The output column for the given column
+ * @return same as out
+ *
+ */
+ public MatrixBlock apply(MatrixBlock in, MatrixBlock out, int
outputCol){
+ return apply(in, out, outputCol, 0, -1);
+ }
+
+ public MatrixBlock apply(FrameBlock in, MatrixBlock out, int outputCol){
+ return apply(in, out, outputCol, 0, -1);
+ }
public abstract MatrixBlock apply(MatrixBlock in, MatrixBlock out, int
outputCol, int rowStart, int blk);
@@ -172,18 +191,18 @@ public abstract class ColumnEncoder implements
Externalizable, Encoder, Comparab
* complete if all previous tasks are done. This is so that we can use
the last task as a dependency for the whole
* build, reducing unnecessary dependencies.
*/
- public List<DependencyTask<?>> getBuildTasks(FrameBlock in, int
blockSize) {
+ public List<DependencyTask<?>> getBuildTasks(FrameBlock in) {
List<Callable<Object>> tasks = new ArrayList<>();
List<List<? extends Callable<?>>> dep = null;
- if(blockSize <= 0 || blockSize >= in.getNumRows()) {
+ int nRows = in.getNumRows();
+ int[] blockSizes = getBlockSizes(nRows,
getNumBuildRowPartitions());
+ if(blockSizes.length == 1) {
tasks.add(getBuildTask(in));
}
else {
HashMap<Integer, Object> ret = new HashMap<>();
- for(int i = 0; i < in.getNumRows(); i = i + blockSize)
- tasks.add(getPartialBuildTask(in, i, blockSize,
ret));
- if(in.getNumRows() % blockSize != 0)
- tasks.add(getPartialBuildTask(in,
in.getNumRows() - in.getNumRows() % blockSize, -1, ret));
+ for(int startRow = 0, i = 0; i < blockSizes.length;
startRow+=blockSizes[i], i++)
+ tasks.add(getPartialBuildTask(in, startRow,
blockSizes[i], ret));
tasks.add(getPartialMergeBuildTask(ret));
dep = new ArrayList<>(Collections.nCopies(tasks.size()
- 1, null));
dep.add(tasks.subList(0, tasks.size() - 1));
@@ -198,24 +217,63 @@ public abstract class ColumnEncoder implements
Externalizable, Encoder, Comparab
public Callable<Object> getPartialBuildTask(FrameBlock in, int
startRow, int blockSize,
HashMap<Integer, Object> ret) {
throw new DMLRuntimeException(
- "Trying to get the PartialBuild task of an Encoder
which does not support " + "partial building");
+ "Trying to get the PartialBuild task of an Encoder
which does not support partial building");
}
public Callable<Object> getPartialMergeBuildTask(HashMap<Integer, ?>
ret) {
throw new DMLRuntimeException(
- "Trying to get the BuildMergeTask task of an Encoder
which does not support " + "partial building");
+ "Trying to get the BuildMergeTask task of an Encoder
which does not support partial building");
}
public List<DependencyTask<?>> getApplyTasks(FrameBlock in, MatrixBlock
out, int outputCol) {
- List<Callable<Object>> tasks = new ArrayList<>();
- tasks.add(new ColumnApplyTask(this, in, out, outputCol));
- return DependencyThreadPool.createDependencyTasks(tasks, null);
+ return getApplyTasks(in, null, out, outputCol);
}
public List<DependencyTask<?>> getApplyTasks(MatrixBlock in,
MatrixBlock out, int outputCol) {
+ return getApplyTasks(null, in, out, outputCol);
+ }
+
+ private List<DependencyTask<?>> getApplyTasks(FrameBlock inF,
MatrixBlock inM, MatrixBlock out, int outputCol){
List<Callable<Object>> tasks = new ArrayList<>();
- tasks.add(new ColumnApplyTask(this, in, out, outputCol));
- return DependencyThreadPool.createDependencyTasks(tasks, null);
+ List<List<? extends Callable<?>>> dep = null;
+ if ((inF != null && inM != null) || (inF == null && inM ==
null))
+ throw new DMLRuntimeException("getApplyTasks needs to
be called with either FrameBlock input " +
+ "or MatrixBlock input");
+ int nRows = inF == null ? inM.getNumRows() : inF.getNumRows();
+ int[] blockSizes = getBlockSizes(nRows,
getNumApplyRowPartitions());
+ for(int startRow = 0, i = 0; i < blockSizes.length;
startRow+=blockSizes[i], i++){
+ if(inF != null)
+ if(out.isInSparseFormat())
+ tasks.add(getSparseTask(inF, out,
outputCol, startRow, blockSizes[i]));
+ else
+ tasks.add(new ColumnApplyTask<>(this,
inF, out, outputCol, startRow, blockSizes[i]));
+ else
+ if(out.isInSparseFormat())
+ tasks.add(getSparseTask(inM, out, outputCol,
startRow, blockSizes[i]));
+ else
+ tasks.add(new ColumnApplyTask<>(this, inM, out,
outputCol, startRow, blockSizes[i]));
+ }
+ if(tasks.size() > 1){
+ dep = new ArrayList<>(Collections.nCopies(tasks.size(),
null));
+ tasks.add(() -> null); // Empty task as barrier
+ dep.add(tasks.subList(0, tasks.size()-1));
+ }
+
+ return DependencyThreadPool.createDependencyTasks(tasks, dep);
+ }
+
+ protected abstract ColumnApplyTask<? extends ColumnEncoder>
+ getSparseTask(FrameBlock in, MatrixBlock out, int
outputCol, int startRow, int blk);
+
+ protected abstract ColumnApplyTask<? extends ColumnEncoder>
+ getSparseTask(MatrixBlock in, MatrixBlock out, int
outputCol, int startRow, int blk);
+
+ protected int getNumApplyRowPartitions(){
+ return APPLY_ROW_BLOCKS_PER_COLUMN;
+ }
+
+ protected int getNumBuildRowPartitions(){
+ return BUILD_ROW_BLOCKS_PER_COLUMN;
}
public enum EncoderType {
@@ -226,39 +284,54 @@ public abstract class ColumnEncoder implements
Externalizable, Encoder, Comparab
* This is the base Task for each column apply. If no custom
"getApplyTasks" is implemented in an Encoder this task
* will be used.
*/
- private static class ColumnApplyTask implements Callable<Object> {
+ protected static class ColumnApplyTask<T extends ColumnEncoder>
implements Callable<Object> {
+
+ protected final T _encoder;
+ protected final FrameBlock _inputF;
+ protected final MatrixBlock _inputM;
+ protected final MatrixBlock _out;
+ protected final int _outputCol;
+ protected final int _startRow;
+ protected final int _blk;
+
+ protected ColumnApplyTask(T encoder, FrameBlock input,
MatrixBlock out, int outputCol){
+ this(encoder, input, out, outputCol, 0, -1);
+ }
- private final ColumnEncoder _encoder;
- private final FrameBlock _inputF;
- private final MatrixBlock _inputM;
- private final MatrixBlock _out;
- private final int _outputCol;
+ protected ColumnApplyTask(T encoder, MatrixBlock input,
MatrixBlock out, int outputCol){
+ this(encoder, input, out, outputCol, 0, -1);
+ }
- protected ColumnApplyTask(ColumnEncoder encoder, FrameBlock
input, MatrixBlock out, int outputCol) {
- _encoder = encoder;
- _inputF = input;
- _inputM = null;
- _out = out;
- _outputCol = outputCol;
+ protected ColumnApplyTask(T encoder, FrameBlock input,
MatrixBlock out, int outputCol, int startRow, int blk) {
+ this(encoder, input, null, out, outputCol, startRow,
blk);
}
- protected ColumnApplyTask(ColumnEncoder encoder, MatrixBlock
input, MatrixBlock out, int outputCol) {
+ protected ColumnApplyTask(T encoder, MatrixBlock input,
MatrixBlock out, int outputCol, int startRow, int blk) {
+ this(encoder, null, input, out, outputCol, startRow,
blk);
+ }
+ private ColumnApplyTask(T encoder, FrameBlock inputF,
MatrixBlock inputM, MatrixBlock out, int outputCol,
+ int startRow,
int blk){
_encoder = encoder;
- _inputM = input;
- _inputF = null;
+ _inputM = inputM;
+ _inputF = inputF;
_out = out;
_outputCol = outputCol;
+ _startRow = startRow;
+ _blk = blk;
}
@Override
- public Void call() throws Exception {
+ public Object call() throws Exception {
assert _outputCol >= 0;
- int _rowStart = 0;
- int _blk = -1;
+ if(_out.isInSparseFormat()){
+ // this is an issue since most sparse Tasks
modify the sparse structure so normal get and set calls are
+ // not possible.
+ throw new DMLRuntimeException("ColumnApplyTask
called although output is in sparse format.");
+ }
if(_inputF == null)
- _encoder.apply(_inputM, _out, _outputCol,
_rowStart, _blk);
+ _encoder.apply(_inputM, _out, _outputCol,
_startRow, _blk);
else
- _encoder.apply(_inputF, _out, _outputCol,
_rowStart, _blk);
+ _encoder.apply(_inputF, _out, _outputCol,
_startRow, _blk);
return null;
}
diff --git
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderBin.java
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderBin.java
index b4d4800..5736c1e 100644
---
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderBin.java
+++
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderBin.java
@@ -28,11 +28,15 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.concurrent.Callable;
+import org.apache.commons.lang3.NotImplementedException;
import org.apache.commons.lang3.tuple.MutableTriple;
+import org.apache.sysds.api.DMLScript;
import org.apache.sysds.lops.Lop;
+import org.apache.sysds.runtime.data.SparseRowVector;
import org.apache.sysds.runtime.matrix.data.FrameBlock;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.util.UtilFunctions;
+import org.apache.sysds.utils.Statistics;
public class ColumnEncoderBin extends ColumnEncoder {
public static final String MIN_PREFIX = "min";
@@ -84,10 +88,13 @@ public class ColumnEncoderBin extends ColumnEncoder {
@Override
public void build(FrameBlock in) {
+ long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
if(!isApplicable())
return;
double[] pairMinMax = getMinMaxOfCol(in, _colID, 0, -1);
computeBins(pairMinMax[0], pairMinMax[1]);
+ if(DMLScript.STATISTICS)
+
Statistics.incTransformBinningBuildTime(System.nanoTime()-t0);
}
private static double[] getMinMaxOfCol(FrameBlock in, int colID, int
startRow, int blockSize) {
@@ -118,6 +125,8 @@ public class ColumnEncoderBin extends ColumnEncoder {
return new BinMergePartialBuildTask(this, ret);
}
+
+
public void computeBins(double min, double max) {
// ensure allocated internal transformation metadata
if(_binMins == null || _binMaxs == null) {
@@ -146,39 +155,47 @@ public class ColumnEncoderBin extends ColumnEncoder {
}
@Override
- public MatrixBlock apply(FrameBlock in, MatrixBlock out, int outputCol)
{
- return apply(in, out, outputCol, 0, -1);
- }
-
- @Override
- public MatrixBlock apply(MatrixBlock in, MatrixBlock out, int
outputCol) {
- return apply(in, out, outputCol, 0, -1);
- }
-
- @Override
public MatrixBlock apply(FrameBlock in, MatrixBlock out, int outputCol,
int rowStart, int blk) {
+ long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
for(int i = rowStart; i < getEndIndex(in.getNumRows(),
rowStart, blk); i++) {
double inVal =
UtilFunctions.objectToDouble(in.getSchema()[_colID - 1], in.get(i, _colID - 1));
int ix = Arrays.binarySearch(_binMaxs, inVal);
int binID = ((ix < 0) ? Math.abs(ix + 1) : ix) + 1;
- out.quickSetValueThreadSafe(i, outputCol, binID);
+ out.quickSetValue(i, outputCol, binID);
}
+ if (DMLScript.STATISTICS)
+
Statistics.incTransformBinningApplyTime(System.nanoTime()-t0);
return out;
}
@Override
public MatrixBlock apply(MatrixBlock in, MatrixBlock out, int
outputCol, int rowStart, int blk) {
+ long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
int end = getEndIndex(in.getNumRows(), rowStart, blk);
for(int i = rowStart; i < end; i++) {
double inVal = in.quickGetValueThreadSafe(i, _colID -
1);
int ix = Arrays.binarySearch(_binMaxs, inVal);
int binID = ((ix < 0) ? Math.abs(ix + 1) : ix) + 1;
- out.quickSetValueThreadSafe(i, outputCol, binID);
+ out.quickSetValue(i, outputCol, binID);
}
+ if (DMLScript.STATISTICS)
+
Statistics.incTransformBinningApplyTime(System.nanoTime()-t0);
return out;
}
@Override
+ protected ColumnApplyTask<? extends ColumnEncoder>
+ getSparseTask(FrameBlock in, MatrixBlock out, int outputCol,
int startRow, int blk) {
+ return new BinSparseApplyTask(this, in, out, outputCol);
+ }
+
+ @Override
+ protected ColumnApplyTask<? extends ColumnEncoder>
+ getSparseTask(MatrixBlock in, MatrixBlock out, int outputCol,
int startRow, int blk) {
+ throw new NotImplementedException("Sparse Binning for
MatrixBlocks not jet implemented");
+ }
+
+ @Override
public void mergeAt(ColumnEncoder other) {
if(other instanceof ColumnEncoderBin) {
ColumnEncoderBin otherBin = (ColumnEncoderBin) other;
@@ -264,6 +281,43 @@ public class ColumnEncoderBin extends ColumnEncoder {
}
}
+ private static class BinSparseApplyTask extends
ColumnApplyTask<ColumnEncoderBin> {
+
+ public BinSparseApplyTask(ColumnEncoderBin encoder, FrameBlock
input,
+ MatrixBlock out, int outputCol, int startRow,
int blk) {
+ super(encoder, input, out, outputCol, startRow, blk);
+ }
+
+ private BinSparseApplyTask(ColumnEncoderBin encoder, FrameBlock
input, MatrixBlock out, int outputCol) {
+ super(encoder, input, out, outputCol);
+ }
+
+ public Object call() throws Exception {
+ long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
+ int index = _encoder._colID - 1;
+ if(_out.getSparseBlock() == null)
+ return null;
+ assert _inputF != null;
+ for(int r = _startRow; r <
getEndIndex(_inputF.getNumRows(), _startRow, _blk); r++) {
+ SparseRowVector row = (SparseRowVector)
_out.getSparseBlock().get(r);
+ double inVal =
UtilFunctions.objectToDouble(_inputF.getSchema()[index], _inputF.get(r, index));
+ int ix = Arrays.binarySearch(_encoder._binMaxs,
inVal);
+ int binID = ((ix < 0) ? Math.abs(ix + 1) : ix)
+ 1;
+ row.values()[index] = binID;
+ row.indexes()[index] = _outputCol;
+ }
+ if(DMLScript.STATISTICS)
+
Statistics.incTransformBinningApplyTime(System.nanoTime()-t0);
+ return null;
+ }
+
+ @Override
+ public String toString() {
+ return getClass().getSimpleName() + "<ColId: " +
_encoder._colID + ">";
+ }
+
+ }
+
private static class BinPartialBuildTask implements Callable<Object> {
private final FrameBlock _input;
@@ -284,7 +338,13 @@ public class ColumnEncoderBin extends ColumnEncoder {
@Override
public double[] call() throws Exception {
- _partialMinMax.put(_startRow, getMinMaxOfCol(_input,
_colID, _startRow, _blockSize));
+ long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
+ double[] minMax = getMinMaxOfCol(_input, _colID,
_startRow, _blockSize);
+ synchronized (_partialMinMax){
+ _partialMinMax.put(_startRow, minMax);
+ }
+ if (DMLScript.STATISTICS)
+
Statistics.incTransformBinningBuildTime(System.nanoTime()-t0);
return null;
}
@@ -306,6 +366,7 @@ public class ColumnEncoderBin extends ColumnEncoder {
@Override
public Object call() throws Exception {
+ long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
double min = Double.POSITIVE_INFINITY;
double max = Double.NEGATIVE_INFINITY;
for(Object minMax : _partialMaps.values()) {
@@ -313,6 +374,9 @@ public class ColumnEncoderBin extends ColumnEncoder {
max = Math.max(max, ((double[]) minMax)[1]);
}
_encoder.computeBins(min, max);
+
+ if(DMLScript.STATISTICS)
+
Statistics.incTransformBinningBuildTime(System.nanoTime()-t0);
return null;
}
diff --git
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderComposite.java
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderComposite.java
index 54b8795..f7611c3 100644
---
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderComposite.java
+++
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderComposite.java
@@ -139,11 +139,23 @@ public class ColumnEncoderComposite extends ColumnEncoder
{
}
@Override
- public List<DependencyTask<?>> getBuildTasks(FrameBlock in, int
blockSize) {
+ protected ColumnApplyTask<? extends ColumnEncoder>
+ getSparseTask(MatrixBlock in, MatrixBlock out, int outputCol,
int startRow, int blk) {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ protected ColumnApplyTask<? extends ColumnEncoder>
+ getSparseTask(FrameBlock in, MatrixBlock out, int outputCol,
int startRow, int blk) {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public List<DependencyTask<?>> getBuildTasks(FrameBlock in) {
List<DependencyTask<?>> tasks = new ArrayList<>();
Map<Integer[], Integer[]> depMap = null;
for(ColumnEncoder columnEncoder : _columnEncoders) {
- List<DependencyTask<?>> t =
columnEncoder.getBuildTasks(in, blockSize);
+ List<DependencyTask<?>> t =
columnEncoder.getBuildTasks(in);
if(t == null)
continue;
// Linear execution between encoders so they can't be
built in parallel
@@ -179,16 +191,6 @@ public class ColumnEncoderComposite extends ColumnEncoder {
}
@Override
- public MatrixBlock apply(FrameBlock in, MatrixBlock out, int outputCol)
{
- return apply(in, out, outputCol, 0, -1);
- }
-
- @Override
- public MatrixBlock apply(MatrixBlock in, MatrixBlock out, int
outputCol) {
- return apply(in, out, outputCol, 0, -1);
- }
-
- @Override
public MatrixBlock apply(FrameBlock in, MatrixBlock out, int outputCol,
int rowStart, int blk) {
try {
for(int i = 0; i < _columnEncoders.size(); i++) {
diff --git
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderDummycode.java
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderDummycode.java
index 25b2eb9..1047f54 100644
---
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderDummycode.java
+++
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderDummycode.java
@@ -24,17 +24,15 @@ import static
org.apache.sysds.runtime.util.UtilFunctions.getEndIndex;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
-import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
-import java.util.concurrent.Callable;
+import org.apache.sysds.api.DMLScript;
import org.apache.sysds.runtime.DMLRuntimeException;
-import org.apache.sysds.runtime.data.SparseRowVector;
import org.apache.sysds.runtime.matrix.data.FrameBlock;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.util.DependencyTask;
-import org.apache.sysds.runtime.util.DependencyThreadPool;
+import org.apache.sysds.utils.Statistics;
public class ColumnEncoderDummycode extends ColumnEncoder {
private static final long serialVersionUID = 5832130477659116489L;
@@ -60,26 +58,18 @@ public class ColumnEncoderDummycode extends ColumnEncoder {
}
@Override
- public List<DependencyTask<?>> getBuildTasks(FrameBlock in, int
blockSize) {
+ public List<DependencyTask<?>> getBuildTasks(FrameBlock in) {
return null;
}
@Override
- public MatrixBlock apply(FrameBlock in, MatrixBlock out, int outputCol)
{
- return apply(in, out, outputCol, 0, -1);
- }
-
- public MatrixBlock apply(MatrixBlock in, MatrixBlock out, int
outputCol) {
- return apply(in, out, outputCol, 0, -1);
- }
-
- @Override
public MatrixBlock apply(FrameBlock in, MatrixBlock out, int outputCol,
int rowStart, int blk) {
throw new DMLRuntimeException("Called DummyCoder with
FrameBlock");
}
@Override
public MatrixBlock apply(MatrixBlock in, MatrixBlock out, int
outputCol, int rowStart, int blk) {
+ long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
// Out Matrix should already be correct size!
// append dummy coded or unchanged values to output
for(int i = rowStart; i < getEndIndex(in.getNumRows(),
rowStart, blk); i++) {
@@ -89,20 +79,25 @@ public class ColumnEncoderDummycode extends ColumnEncoder {
int nCol = outputCol + (int) val - 1;
// Setting value to 0 first in case of sparse so the
row vector does not need to be resized
if(nCol != outputCol)
- out.quickSetValueThreadSafe(i, outputCol, 0);
- out.quickSetValueThreadSafe(i, nCol, 1);
+ out.quickSetValue(i, outputCol, 0);
+ out.quickSetValue(i, nCol, 1);
}
+ if (DMLScript.STATISTICS)
+
Statistics.incTransformDummyCodeApplyTime(System.nanoTime()-t0);
return out;
}
+
+ @Override
+ protected ColumnApplyTask<? extends ColumnEncoder>
+ getSparseTask(MatrixBlock in, MatrixBlock out, int outputCol,
int startRow, int blk) {
+ return new DummycodeSparseApplyTask(this, in, out, outputCol,
startRow, blk);
+ }
+
@Override
- public List<DependencyTask<?>> getApplyTasks(MatrixBlock in,
MatrixBlock out, int outputCol) {
- List<Callable<Object>> tasks = new ArrayList<>();
- if(out.isInSparseFormat())
- tasks.add(new DummycodeSparseApplyTask(this, in, out,
outputCol));
- else
- return super.getApplyTasks(in, out, outputCol);
- return DependencyThreadPool.createDependencyTasks(tasks, null);
+ protected ColumnApplyTask<? extends ColumnEncoder>
+ getSparseTask(FrameBlock in, MatrixBlock out, int outputCol,
int startRow, int blk) {
+ throw new DMLRuntimeException("Called DummyCoder with
FrameBlock");
}
@Override
@@ -139,6 +134,7 @@ public class ColumnEncoderDummycode extends ColumnEncoder {
if(distinct != -1) {
_domainSize = distinct;
+ LOG.debug("DummyCoder for column: " + _colID +
" has domain size: " + _domainSize);
}
}
}
@@ -188,48 +184,47 @@ public class ColumnEncoderDummycode extends ColumnEncoder
{
return _domainSize;
}
- private static class DummycodeSparseApplyTask implements
Callable<Object> {
- private final ColumnEncoderDummycode _encoder;
- private final MatrixBlock _input;
- private final MatrixBlock _out;
- private final int _outputCol;
+ private static class DummycodeSparseApplyTask extends
ColumnApplyTask<ColumnEncoderDummycode> {
+
+ protected DummycodeSparseApplyTask(ColumnEncoderDummycode
encoder, MatrixBlock input,
+ MatrixBlock out, int outputCol) {
+ super(encoder, input, out, outputCol);
+ }
- private DummycodeSparseApplyTask(ColumnEncoderDummycode
encoder, MatrixBlock input, MatrixBlock out,
- int outputCol) {
- _encoder = encoder;
- _input = input;
- _out = out;
- _outputCol = outputCol;
+ protected DummycodeSparseApplyTask(ColumnEncoderDummycode
encoder, MatrixBlock input,
+ MatrixBlock out, int outputCol, int startRow,
int blk) {
+ super(encoder, input, out, outputCol, startRow, blk);
}
public Object call() throws Exception {
- for(int r = 0; r < _input.getNumRows(); r++) {
- if(_out.getSparseBlock() == null)
- return null;
- synchronized(_out.getSparseBlock().get(r)) {
- // Since the recoded values are already
offset in the output matrix (same as input at this point)
- // the dummycoding only needs to offset
them within their column domain. Which means that the
- // indexes in the SparseRowVector do
not need to be sorted anymore and can be updated directly.
- //
- // Input: Output:
- //
- // 1 | 0 | 2 | 0 1 | 0 | 0 | 1
- // 2 | 0 | 1 | 0 ===> 0 | 1 | 1 | 0
- // 1 | 0 | 2 | 0 1 | 0 | 0 | 1
- // 1 | 0 | 1 | 0 1 | 0 | 1 | 0
- //
- // Example SparseRowVector Internals
(1. row):
- //
- // indexes = [0,2] ===> indexes = [0,3]
- // values = [1,2] values = [1,1]
- int index = ((SparseRowVector)
_out.getSparseBlock().get(r)).getIndex(_outputCol);
- double val =
_out.getSparseBlock().get(r).values()[index];
- int nCol = _outputCol + (int) val - 1;
-
-
_out.getSparseBlock().get(r).indexes()[index] = nCol;
-
_out.getSparseBlock().get(r).values()[index] = 1;
- }
+ long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
+ assert _inputM != null;
+ if(_out.getSparseBlock() == null)
+ return null;
+ for(int r = _startRow; r <
getEndIndex(_inputM.getNumRows(), _startRow, _blk); r++) {
+ // Since the recoded values are already offset
in the output matrix (same as input at this point)
+ // the dummycoding only needs to offset them
within their column domain. Which means that the
+ // indexes in the SparseRowVector do not need
to be sorted anymore and can be updated directly.
+ //
+ // Input: Output:
+ //
+ // 1 | 0 | 2 | 0 1 | 0 | 0 | 1
+ // 2 | 0 | 1 | 0 ===> 0 | 1 | 1 | 0
+ // 1 | 0 | 2 | 0 1 | 0 | 0 | 1
+ // 1 | 0 | 1 | 0 1 | 0 | 1 | 0
+ //
+ // Example SparseRowVector Internals (1. row):
+ //
+ // indexes = [0,2] ===> indexes = [0,3]
+ // values = [1,2] values = [1,1]
+ int index = _encoder._colID - 1;
+ double val =
_out.getSparseBlock().get(r).values()[index];
+ int nCol = _outputCol + (int) val - 1;
+ _out.getSparseBlock().get(r).indexes()[index] =
nCol;
+ _out.getSparseBlock().get(r).values()[index] =
1;
}
+ if (DMLScript.STATISTICS)
+
Statistics.incTransformDummyCodeApplyTime(System.nanoTime()-t0);
return null;
}
diff --git
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderFeatureHash.java
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderFeatureHash.java
index 1c74ae5..d30d8dc 100644
---
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderFeatureHash.java
+++
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderFeatureHash.java
@@ -26,11 +26,15 @@ import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.List;
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.sysds.api.DMLScript;
import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.data.SparseRowVector;
import org.apache.sysds.runtime.matrix.data.FrameBlock;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.util.DependencyTask;
import org.apache.sysds.runtime.util.UtilFunctions;
+import org.apache.sysds.utils.Statistics;
/**
* Class used for feature hashing transformation of frames.
@@ -56,7 +60,7 @@ public class ColumnEncoderFeatureHash extends ColumnEncoder {
}
private long getCode(String key) {
- return key.hashCode() % _K;
+ return (key.hashCode() % _K) + 1;
}
@Override
@@ -65,22 +69,25 @@ public class ColumnEncoderFeatureHash extends ColumnEncoder
{
}
@Override
- public List<DependencyTask<?>> getBuildTasks(FrameBlock in, int
blockSize) {
+ public List<DependencyTask<?>> getBuildTasks(FrameBlock in) {
return null;
}
@Override
- public MatrixBlock apply(FrameBlock in, MatrixBlock out, int outputCol)
{
- return apply(in, out, outputCol, 0, -1);
+ protected ColumnApplyTask<? extends ColumnEncoder>
+ getSparseTask(FrameBlock in, MatrixBlock out, int outputCol,
int startRow, int blk) {
+ return new FeatureHashSparseApplyTask(this, in, out, outputCol,
startRow, blk);
}
@Override
- public MatrixBlock apply(MatrixBlock in, MatrixBlock out, int
outputCol) {
- return apply(in, out, outputCol, 0, -1);
+ protected ColumnApplyTask<? extends ColumnEncoder>
+ getSparseTask(MatrixBlock in, MatrixBlock out, int outputCol,
int startRow, int blk) {
+ throw new NotImplementedException("Sparse FeatureHashing for
MatrixBlocks not yet implemented");
}
@Override
public MatrixBlock apply(FrameBlock in, MatrixBlock out, int outputCol,
int rowStart, int blk) {
+ long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
// apply feature hashing column wise
for(int i = rowStart; i < getEndIndex(in.getNumRows(),
rowStart, blk); i++) {
Object okey = in.get(i, _colID - 1);
@@ -88,21 +95,26 @@ public class ColumnEncoderFeatureHash extends ColumnEncoder
{
if(key == null)
throw new DMLRuntimeException("Missing Value
encountered in input Frame for FeatureHash");
long code = getCode(key);
- out.quickSetValueThreadSafe(i, outputCol, (code >= 0) ?
code : Double.NaN);
+ out.quickSetValue(i, outputCol, (code >= 0) ? code :
Double.NaN);
}
+ if(DMLScript.STATISTICS)
+
Statistics.incTransformFeatureHashingApplyTime(System.nanoTime()-t0);
return out;
}
@Override
public MatrixBlock apply(MatrixBlock in, MatrixBlock out, int
outputCol, int rowStart, int blk) {
+ long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
int end = getEndIndex(in.getNumRows(), rowStart, blk);
// apply feature hashing column wise
for(int i = rowStart; i < end; i++) {
Object okey = in.quickGetValueThreadSafe(i, _colID - 1);
String key = okey.toString();
long code = getCode(key);
- out.quickSetValueThreadSafe(i, outputCol, (code >= 0) ?
code : Double.NaN);
+ out.quickSetValue(i, outputCol, (code >= 0) ? code :
Double.NaN);
}
+ if(DMLScript.STATISTICS)
+
Statistics.incTransformFeatureHashingApplyTime(System.nanoTime()-t0);
return out;
}
@@ -145,4 +157,40 @@ public class ColumnEncoderFeatureHash extends
ColumnEncoder {
super.readExternal(in);
_K = in.readLong();
}
+
+ public static class FeatureHashSparseApplyTask extends
ColumnApplyTask<ColumnEncoderFeatureHash>{
+
+ public FeatureHashSparseApplyTask(ColumnEncoderFeatureHash
encoder, FrameBlock input,
+ MatrixBlock out, int outputCol, int startRow,
int blk) {
+ super(encoder, input, out, outputCol, startRow, blk);
+ }
+
+ public FeatureHashSparseApplyTask(ColumnEncoderFeatureHash
encoder, FrameBlock input,
+ MatrixBlock out, int outputCol) {
+ super(encoder, input, out, outputCol);
+ }
+
+ @Override
+ public Object call() throws Exception {
+ if(_out.getSparseBlock() == null)
+ return null;
+ long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
+ int index = _encoder._colID - 1;
+ assert _inputF != null;
+ for(int r = _startRow; r <
getEndIndex(_inputF.getNumRows(), _startRow, _blk); r++){
+ SparseRowVector row = (SparseRowVector)
_out.getSparseBlock().get(r);
+ Object okey = _inputF.get(r, index);
+ String key = (okey != null) ? okey.toString() :
null;
+ if(key == null)
+ throw new DMLRuntimeException("Missing
Value encountered in input Frame for FeatureHash");
+ long code = _encoder.getCode(key);
+ row.values()[index] = code;
+ row.indexes()[index] = _outputCol;
+ }
+ if(DMLScript.STATISTICS)
+
Statistics.incTransformFeatureHashingApplyTime(System.nanoTime()-t0);
+ return null;
+ }
+ }
+
}
diff --git
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderPassThrough.java
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderPassThrough.java
index 7a8df24..5c7392c 100644
---
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderPassThrough.java
+++
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderPassThrough.java
@@ -21,16 +21,22 @@ package org.apache.sysds.runtime.transform.encode;
import static org.apache.sysds.runtime.util.UtilFunctions.getEndIndex;
+import java.util.ArrayList;
import java.util.List;
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.sysds.api.DMLScript;
import org.apache.sysds.common.Types.ValueType;
+import org.apache.sysds.runtime.data.SparseRowVector;
import org.apache.sysds.runtime.matrix.data.FrameBlock;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.util.DependencyTask;
import org.apache.sysds.runtime.util.UtilFunctions;
+import org.apache.sysds.utils.Statistics;
public class ColumnEncoderPassThrough extends ColumnEncoder {
private static final long serialVersionUID = -8473768154646831882L;
+ private List<Integer> sparseRowsWZeros = null;
protected ColumnEncoderPassThrough(int ptCols) {
super(ptCols); // 1-based
@@ -40,37 +46,46 @@ public class ColumnEncoderPassThrough extends ColumnEncoder
{
this(-1);
}
+ public List<Integer> getSparseRowsWZeros(){
+ return sparseRowsWZeros;
+ }
+
@Override
public void build(FrameBlock in) {
// do nothing
}
@Override
- public List<DependencyTask<?>> getBuildTasks(FrameBlock in, int
blockSize) {
+ public List<DependencyTask<?>> getBuildTasks(FrameBlock in) {
return null;
}
@Override
- public MatrixBlock apply(FrameBlock in, MatrixBlock out, int outputCol)
{
- return apply(in, out, outputCol, 0, -1);
+ protected ColumnApplyTask<? extends ColumnEncoder>
+ getSparseTask(FrameBlock in, MatrixBlock out, int outputCol,
int startRow, int blk) {
+ return new PassThroughSparseApplyTask(this, in, out, outputCol,
startRow, blk);
}
@Override
- public MatrixBlock apply(MatrixBlock in, MatrixBlock out, int
outputCol) {
- return apply(in, out, outputCol, 0, -1);
+ protected ColumnApplyTask<? extends ColumnEncoder>
+ getSparseTask(MatrixBlock in, MatrixBlock out, int outputCol,
int startRow, int blk) {
+ throw new NotImplementedException("Sparse PassThrough for
MatrixBlocks not yet implemented");
}
@Override
public MatrixBlock apply(FrameBlock in, MatrixBlock out, int outputCol,
int rowStart, int blk) {
+ long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
int col = _colID - 1; // 1-based
ValueType vt = in.getSchema()[col];
for(int i = rowStart; i < getEndIndex(in.getNumRows(),
rowStart, blk); i++) {
Object val = in.get(i, col);
double v = (val == null ||
- (vt == ValueType.STRING &&
val.toString().isEmpty())) ? Double.NaN : UtilFunctions.objectToDouble(vt,
- val);
- out.quickSetValueThreadSafe(i, outputCol, v);
+ (vt == ValueType.STRING &&
val.toString().isEmpty()))
+ ? Double.NaN :
UtilFunctions.objectToDouble(vt, val);
+ out.quickSetValue(i, outputCol, v);
}
+ if(DMLScript.STATISTICS)
+
Statistics.incTransformPassThroughApplyTime(System.nanoTime()-t0);
return out;
}
@@ -79,12 +94,15 @@ public class ColumnEncoderPassThrough extends ColumnEncoder
{
// only transfer from in to out
if(in == out)
return out;
+ long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
int col = _colID - 1; // 1-based
int end = getEndIndex(in.getNumRows(), rowStart, blk);
for(int i = rowStart; i < end; i++) {
double val = in.quickGetValueThreadSafe(i, col);
- out.quickSetValueThreadSafe(i, outputCol, val);
+ out.quickSetValue(i, outputCol, val);
}
+ if(DMLScript.STATISTICS)
+
Statistics.incTransformPassThroughApplyTime(System.nanoTime()-t0);
return out;
}
@@ -106,4 +124,58 @@ public class ColumnEncoderPassThrough extends
ColumnEncoder {
public void initMetaData(FrameBlock meta) {
// do nothing
}
+
+ public static class PassThroughSparseApplyTask extends
ColumnApplyTask<ColumnEncoderPassThrough>{
+
+
+ protected PassThroughSparseApplyTask(ColumnEncoderPassThrough
encoder, FrameBlock input,
+ MatrixBlock out, int outputCol) {
+ super(encoder, input, out, outputCol);
+ }
+
+ protected PassThroughSparseApplyTask(ColumnEncoderPassThrough
encoder, FrameBlock input, MatrixBlock out,
+ int outputCol, int startRow, int blk) {
+ super(encoder, input, out, outputCol, startRow, blk);
+ }
+
+ @Override
+ public Object call() throws Exception {
+ if(_out.getSparseBlock() == null)
+ return null;
+ long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
+ int index = _encoder._colID - 1;
+ assert _inputF != null;
+ List<Integer> sparseRowsWZeros = null;
+ ValueType vt = _inputF.getSchema()[index];
+ for(int r = _startRow; r <
getEndIndex(_inputF.getNumRows(), _startRow, _blk); r++) {
+ Object val = _inputF.get(r, index);
+ double v = (val == null || (vt ==
ValueType.STRING && val.toString().isEmpty())) ?
+ Double.NaN :
UtilFunctions.objectToDouble(vt, val);
+ SparseRowVector row = (SparseRowVector)
_out.getSparseBlock().get(r);
+ if(v == 0) {
+ if(sparseRowsWZeros == null)
+ sparseRowsWZeros = new
ArrayList<>();
+ sparseRowsWZeros.add(r);
+ }
+ row.values()[index] = v;
+ row.indexes()[index] = _outputCol;
+ }
+ if(sparseRowsWZeros != null){
+ synchronized (_encoder){
+ if(_encoder.sparseRowsWZeros == null)
+ _encoder.sparseRowsWZeros = new
ArrayList<>();
+
_encoder.sparseRowsWZeros.addAll(sparseRowsWZeros);
+ }
+ }
+ if(DMLScript.STATISTICS)
+
Statistics.incTransformPassThroughApplyTime(System.nanoTime()-t0);
+ return null;
+ }
+
+ public String toString() {
+ return getClass().getSimpleName() + "<ColId: " +
_encoder._colID + ">";
+ }
+
+ }
+
}
diff --git
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderRecode.java
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderRecode.java
index fd18d86..e190d74 100644
---
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderRecode.java
+++
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderRecode.java
@@ -33,10 +33,13 @@ import java.util.Map.Entry;
import java.util.Objects;
import java.util.concurrent.Callable;
+import org.apache.sysds.api.DMLScript;
import org.apache.sysds.lops.Lop;
import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.data.SparseRowVector;
import org.apache.sysds.runtime.matrix.data.FrameBlock;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.utils.Statistics;
public class ColumnEncoderRecode extends ColumnEncoder {
private static final long serialVersionUID = 8213163881283341874L;
@@ -135,7 +138,11 @@ public class ColumnEncoderRecode extends ColumnEncoder {
public void build(FrameBlock in) {
if(!isApplicable())
return;
+ long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
makeRcdMap(in, _rcdMap, _colID, 0, in.getNumRows());
+ if(DMLScript.STATISTICS){
+
Statistics.incTransformRecodeBuildTime(System.nanoTime() - t0);
+ }
}
@Override
@@ -186,18 +193,17 @@ public class ColumnEncoderRecode extends ColumnEncoder {
}
@Override
- public MatrixBlock apply(FrameBlock in, MatrixBlock out, int outputCol)
{
- return apply(in, out, outputCol, 0, -1);
- }
-
- @Override
public MatrixBlock apply(FrameBlock in, MatrixBlock out, int outputCol,
int rowStart, int blk) {
+ long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
// FrameBlock is column Major and MatrixBlock row Major this
results in cache inefficiencies :(
for(int i = rowStart; i < getEndIndex(in.getNumRows(),
rowStart, blk); i++) {
Object okey = in.get(i, _colID - 1);
String key = (okey != null) ? okey.toString() : null;
long code = lookupRCDMap(key);
- out.quickSetValueThreadSafe(i, outputCol, (code >= 0) ?
code : Double.NaN);
+ out.quickSetValue(i, outputCol, (code >= 0) ? code :
Double.NaN);
+ }
+ if(DMLScript.STATISTICS){
+
Statistics.incTransformRecodeApplyTime(System.nanoTime() - t0);
}
return out;
}
@@ -209,9 +215,16 @@ public class ColumnEncoderRecode extends ColumnEncoder {
}
@Override
- public MatrixBlock apply(MatrixBlock in, MatrixBlock out, int
outputCol) {
- throw new DMLRuntimeException(
- "Recode called with MatrixBlock. Should not happen
since Recode is the first " + "encoder in the Stack");
+ protected ColumnApplyTask<? extends ColumnEncoder>
+ getSparseTask(FrameBlock in, MatrixBlock out, int outputCol,
int startRow, int blk){
+ return new RecodeSparseApplyTask(this, in ,out, outputCol,
startRow, blk);
+ }
+
+ @Override
+ protected ColumnApplyTask<? extends ColumnEncoder>
+ getSparseTask(MatrixBlock in, MatrixBlock out, int outputCol,
int startRow, int blk) {
+ throw new DMLRuntimeException("Recode called with MatrixBlock.
Should not happen since Recode is the first " +
+ "encoder in the Stack");
}
@Override
@@ -313,6 +326,48 @@ public class ColumnEncoderRecode extends ColumnEncoder {
return _rcdMap;
}
+ private static class RecodeSparseApplyTask extends
ColumnApplyTask<ColumnEncoderRecode>{
+
+ public RecodeSparseApplyTask(ColumnEncoderRecode encoder,
FrameBlock input, MatrixBlock out, int outputCol) {
+ super(encoder, input, out, outputCol);
+ }
+
+ protected RecodeSparseApplyTask(ColumnEncoderRecode encoder,
FrameBlock input, MatrixBlock out,
+ int outputCol, int startRow, int blk) {
+ super(encoder, input, out, outputCol, startRow, blk);
+ }
+
+ public Object call() throws Exception {
+ int index = _encoder._colID - 1;
+ long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
+ if(_out.getSparseBlock() == null)
+ return null;
+ assert _inputF != null;
+ for(int r = _startRow; r <
getEndIndex(_inputF.getNumRows(), _startRow, _blk); r++) {
+ SparseRowVector row = (SparseRowVector)
_out.getSparseBlock().get(r);
+ Object okey = _inputF.get(r, index);
+ String key = (okey != null) ? okey.toString() :
null;
+ long code = _encoder.lookupRCDMap(key);
+ double val = (code < 0) ? Double.NaN : code;
+ row.values()[index] = val;
+ row.indexes()[index] = _outputCol;
+ }
+ if(DMLScript.STATISTICS){
+
Statistics.incTransformRecodeApplyTime(System.nanoTime() - t0);
+ }
+ return null;
+ }
+
+ @Override
+ public String toString() {
+ String str = getClass().getSimpleName() + "<ColId: " +
_encoder._colID + ">";
+ if(_blk != -1)
+ str+= "<Sr: " + _startRow + ">";
+ return str;
+ }
+
+ }
+
private static class RecodePartialBuildTask implements Callable<Object>
{
private final FrameBlock _input;
@@ -332,11 +387,15 @@ public class ColumnEncoderRecode extends ColumnEncoder {
@Override
public HashMap<String, Long> call() throws Exception {
+ long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
HashMap<String, Long> partialMap = new HashMap<>();
makeRcdMap(_input, partialMap, _colID, _startRow,
_blockSize);
synchronized(_partialMaps) {
_partialMaps.put(_startRow, partialMap);
}
+ if(DMLScript.STATISTICS){
+
Statistics.incTransformRecodeBuildTime(System.nanoTime() - t0);
+ }
return null;
}
@@ -358,6 +417,7 @@ public class ColumnEncoderRecode extends ColumnEncoder {
@Override
public Object call() throws Exception {
+ long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
HashMap<String, Long> rcdMap = _encoder.getRcdMap();
_partialMaps.forEach((start_row, map) -> {
((HashMap<?, ?>) map).forEach((k, v) -> {
@@ -366,6 +426,9 @@ public class ColumnEncoderRecode extends ColumnEncoder {
});
});
_encoder._rcdMap = rcdMap;
+ if(DMLScript.STATISTICS){
+
Statistics.incTransformRecodeBuildTime(System.nanoTime() - t0);
+ }
return null;
}
diff --git
a/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderFactory.java
b/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderFactory.java
index 4b48d2a..012379a 100644
---
a/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderFactory.java
+++
b/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderFactory.java
@@ -19,16 +19,8 @@
package org.apache.sysds.runtime.transform.encode;
-import static org.apache.sysds.runtime.util.CollectionUtils.except;
-import static org.apache.sysds.runtime.util.CollectionUtils.unionDistinct;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map.Entry;
-
import org.apache.commons.lang.ArrayUtils;
+import org.apache.sysds.api.DMLScript;
import org.apache.sysds.common.Types.ValueType;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.matrix.data.FrameBlock;
@@ -36,9 +28,19 @@ import org.apache.sysds.runtime.transform.TfUtils.TfMethod;
import org.apache.sysds.runtime.transform.encode.ColumnEncoder.EncoderType;
import org.apache.sysds.runtime.transform.meta.TfMetaUtils;
import org.apache.sysds.runtime.util.UtilFunctions;
+import org.apache.sysds.utils.Statistics;
import org.apache.wink.json4j.JSONArray;
import org.apache.wink.json4j.JSONObject;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map.Entry;
+
+import static org.apache.sysds.runtime.util.CollectionUtils.except;
+import static org.apache.sysds.runtime.util.CollectionUtils.unionDistinct;
+
public class EncoderFactory {
public static MultiColumnEncoder createEncoder(String spec, String[]
colnames, int clen, FrameBlock meta) {
@@ -125,16 +127,22 @@ public class EncoderFactory {
}
// create composite decoder of all created encoders
for(Entry<Integer, List<ColumnEncoder>> listEntry :
colEncoders.entrySet()) {
+ if(DMLScript.STATISTICS)
+
Statistics.incTransformEncoderCount(listEntry.getValue().size());
lencoders.add(new
ColumnEncoderComposite(listEntry.getValue()));
}
encoder = new MultiColumnEncoder(lencoders);
if(!oIDs.isEmpty()) {
encoder.addReplaceLegacyEncoder(new
EncoderOmit(jSpec, colnames, schema.length, minCol, maxCol));
+ if(DMLScript.STATISTICS)
+ Statistics.incTransformEncoderCount(1);
}
if(!mvIDs.isEmpty()) {
EncoderMVImpute ma = new EncoderMVImpute(jSpec,
colnames, schema.length, minCol, maxCol);
ma.initRecodeIDList(rcIDs);
encoder.addReplaceLegacyEncoder(ma);
+ if(DMLScript.STATISTICS)
+ Statistics.incTransformEncoderCount(1);
}
// initialize meta data w/ robustness for superset of
cols
diff --git
a/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderMVImpute.java
b/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderMVImpute.java
index cda6b2a..f77e690 100644
---
a/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderMVImpute.java
+++
b/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderMVImpute.java
@@ -19,21 +19,8 @@
package org.apache.sysds.runtime.transform.encode;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.stream.Collectors;
-
import org.apache.commons.lang.ArrayUtils;
+import org.apache.sysds.api.DMLScript;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.functionobjects.KahanPlus;
import org.apache.sysds.runtime.functionobjects.Mean;
@@ -44,10 +31,25 @@ import org.apache.sysds.runtime.transform.TfUtils.TfMethod;
import org.apache.sysds.runtime.transform.meta.TfMetaUtils;
import org.apache.sysds.runtime.util.IndexRange;
import org.apache.sysds.runtime.util.UtilFunctions;
+import org.apache.sysds.utils.Statistics;
import org.apache.wink.json4j.JSONArray;
import org.apache.wink.json4j.JSONException;
import org.apache.wink.json4j.JSONObject;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.stream.Collectors;
+
public class EncoderMVImpute extends LegacyEncoder {
private static final long serialVersionUID = 9057868620144662194L;
// objects required to compute mean and variance of all non-missing
entries
@@ -173,6 +175,7 @@ public class EncoderMVImpute extends LegacyEncoder {
@Override
public void build(FrameBlock in) {
+ long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
try {
for(int j = 0; j < _colList.length; j++) {
int colID = _colList[j];
@@ -215,10 +218,13 @@ public class EncoderMVImpute extends LegacyEncoder {
catch(Exception ex) {
throw new RuntimeException(ex);
}
+ if(DMLScript.STATISTICS)
+
Statistics.incTransformImputeBuildTime(System.nanoTime()-t0);
}
@Override
public MatrixBlock apply(FrameBlock in, MatrixBlock out) {
+ long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
for(int i = 0; i < in.getNumRows(); i++) {
for(int j = 0; j < _colList.length; j++) {
int colID = _colList[j];
@@ -226,6 +232,8 @@ public class EncoderMVImpute extends LegacyEncoder {
out.quickSetValue(i, colID - 1,
Double.parseDouble(_replacementList[j]));
}
}
+ if(DMLScript.STATISTICS)
+
Statistics.incTransformImputeApplyTime(System.nanoTime()-t0);
return out;
}
diff --git
a/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderOmit.java
b/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderOmit.java
index db61fc1..c2f3b68 100644
--- a/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderOmit.java
+++ b/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderOmit.java
@@ -19,16 +19,9 @@
package org.apache.sysds.runtime.transform.encode;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Objects;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.sysds.api.DMLScript;
import org.apache.sysds.common.Types.ValueType;
import org.apache.sysds.runtime.matrix.data.FrameBlock;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
@@ -37,9 +30,18 @@ import org.apache.sysds.runtime.transform.TfUtils.TfMethod;
import org.apache.sysds.runtime.transform.meta.TfMetaUtils;
import org.apache.sysds.runtime.util.IndexRange;
import org.apache.sysds.runtime.util.UtilFunctions;
+import org.apache.sysds.utils.Statistics;
import org.apache.wink.json4j.JSONException;
import org.apache.wink.json4j.JSONObject;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
public class EncoderOmit extends LegacyEncoder {
/*
 * THIS CLASS IS ONLY FOR LEGACY SUPPORT!!! and will be phased out
slowly.
@@ -126,6 +128,7 @@ public class EncoderOmit extends LegacyEncoder {
public MatrixBlock apply(FrameBlock in, MatrixBlock out) {
// local rmRows for broadcasting encoder in spark
+ long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
boolean[] rmRows;
if(_federated)
rmRows = _rmRows;
@@ -148,7 +151,8 @@ public class EncoderOmit extends LegacyEncoder {
}
_rmRows = rmRows;
-
+ if(DMLScript.STATISTICS)
+
Statistics.incTransformOmitApplyTime(System.nanoTime()-t0);
return ret;
}
diff --git
a/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
b/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
index 6db63f5..73d33a9 100644
---
a/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
+++
b/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
@@ -28,6 +28,8 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
@@ -36,22 +38,31 @@ import java.util.stream.Collectors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.sysds.api.DMLScript;
import org.apache.sysds.common.Types;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.data.SparseBlock;
import org.apache.sysds.runtime.data.SparseBlockMCSR;
+import org.apache.sysds.runtime.data.SparseRowVector;
import org.apache.sysds.runtime.matrix.data.FrameBlock;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.util.DependencyTask;
import org.apache.sysds.runtime.util.DependencyThreadPool;
import org.apache.sysds.runtime.util.DependencyWrapperTask;
import org.apache.sysds.runtime.util.IndexRange;
+import org.apache.sysds.utils.Statistics;
public class MultiColumnEncoder implements Encoder {
protected static final Log LOG =
LogFactory.getLog(MultiColumnEncoder.class.getName());
private static final boolean MULTI_THREADED = true;
- public static boolean MULTI_THREADED_STAGES = true;
+ // If true build and apply separately by placing a synchronization
barrier
+ public static boolean MULTI_THREADED_STAGES = false;
+
+ // Only affects if MULTI_THREADED_STAGES is true
+ // if true apply tasks for each column will complete
+ // before the next will start.
+ public static boolean APPLY_ENCODER_SEPARATE_STAGES = false;
private List<ColumnEncoderComposite> _columnEncoders;
// These encoders are deprecated and will be phased out soon.
@@ -60,18 +71,6 @@ public class MultiColumnEncoder implements Encoder {
private int _colOffset = 0; // offset for federated Workers who are
using subrange encoders
private FrameBlock _meta = null;
- // TEMP CONSTANTS for testing only
- //private int APPLY_BLOCKSIZE = 0; // temp only for testing until
automatic calculation of block size
- public static int BUILD_BLOCKSIZE = 0;
-
- /*public void setApplyBlockSize(int blk) {
- APPLY_BLOCKSIZE = blk;
- }*/
-
- public void setBuildBlockSize(int blk) {
- BUILD_BLOCKSIZE = blk;
- }
-
public MultiColumnEncoder(List<ColumnEncoderComposite> columnEncoders) {
_columnEncoders = columnEncoders;
}
@@ -90,6 +89,7 @@ public class MultiColumnEncoder implements Encoder {
if(MULTI_THREADED && k > 1 && !MULTI_THREADED_STAGES &&
!hasLegacyEncoder()) {
out = new MatrixBlock();
DependencyThreadPool pool = new
DependencyThreadPool(k);
+ LOG.debug("Encoding with full DAG on " + k + "
Threads");
try {
pool.submitAllAndWait(getEncodeTasks(in, out, pool));
}
@@ -98,10 +98,10 @@ public class MultiColumnEncoder implements Encoder {
e.printStackTrace();
}
pool.shutdown();
- out.recomputeNonZeros();
- return out;
+ outputMatrixPostProcessing(out);
}
else {
+ LOG.debug("Encoding with staged approach on: "
+ k + " Threads");
build(in, k);
if(_legacyMVImpute != null) {
// These operations are redundant for
every encoder excluding the legacyMVImpute, the workaround to
@@ -127,9 +127,10 @@ public class MultiColumnEncoder implements Encoder {
List<DependencyTask<?>> applyTAgg = null;
Map<Integer[], Integer[]> depMap = new HashMap<>();
boolean hasDC =
getColumnEncoders(ColumnEncoderDummycode.class).size() > 0;
+ boolean applyOffsetDep = false;
tasks.add(DependencyThreadPool.createDependencyTask(new
InitOutputMatrixTask(this, in, out)));
for(ColumnEncoderComposite e : _columnEncoders) {
- List<DependencyTask<?>> buildTasks =
e.getBuildTasks(in, BUILD_BLOCKSIZE);
+ List<DependencyTask<?>> buildTasks =
e.getBuildTasks(in);
tasks.addAll(buildTasks);
if(buildTasks.size() > 0) {
@@ -152,11 +153,14 @@ public class MultiColumnEncoder implements Encoder {
// colUpdateTask can start when all domain
sizes, because it can now calculate the offsets for
// each column
depMap.put(new Integer[] {-2, -1}, new
Integer[] {tasks.size() - 1, tasks.size()});
+ buildTasks.forEach(t -> t.setPriority(5));
+ applyOffsetDep = true;
}
- if(hasDC) {
+ if(hasDC && applyOffsetDep) {
// Apply Task dependency to output col update
task (is last in list)
- // All ApplyTasks need to wait for this task so
they all have the correct offsets.
+ // All ApplyTasks need to wait for this task,
so they all have the correct offsets.
+ // This only applies to columns after the
first DC coder, since columns before it need no offset adjustment
depMap.put(new Integer[] {tasks.size(),
tasks.size() + 1}, new Integer[] {-2, -1});
applyTAgg = applyTAgg == null ? new
ArrayList<>() : applyTAgg;
@@ -195,7 +199,7 @@ public class MultiColumnEncoder implements Encoder {
private List<DependencyTask<?>> getBuildTasks(FrameBlock in) {
List<DependencyTask<?>> tasks = new ArrayList<>();
for(ColumnEncoderComposite columnEncoder : _columnEncoders) {
- tasks.addAll(columnEncoder.getBuildTasks(in,
BUILD_BLOCKSIZE));
+ tasks.addAll(columnEncoder.getBuildTasks(in));
}
return tasks;
}
@@ -241,23 +245,23 @@ public class MultiColumnEncoder implements Encoder {
if(in.getNumColumns() != numEncoders)
throw new DMLRuntimeException("Not every column in has
a CompositeEncoder. Please make sure every column "
+ "has a encoder or slice the input
accordingly");
- // Block allocation for MT access
- outputMatrixPreProcessing(out, in);
// TODO smart checks
if(MULTI_THREADED && k > 1) {
+ // Block allocation for MT access
+ outputMatrixPreProcessing(out, in);
applyMT(in, out, outputCol, k);
}
else {
int offset = outputCol;
for(ColumnEncoderComposite columnEncoder :
_columnEncoders) {
columnEncoder.apply(in, out,
columnEncoder._colID - 1 + offset);
-
if(columnEncoder.hasEncoder(ColumnEncoderDummycode.class))
+ if
(columnEncoder.hasEncoder(ColumnEncoderDummycode.class))
offset +=
columnEncoder.getEncoder(ColumnEncoderDummycode.class)._domainSize - 1;
}
}
// Recomputing NNZ since we access the block directly
// TODO set NNZ explicit count them in the encoders
- out.recomputeNonZeros();
+ outputMatrixPostProcessing(out);
if(_legacyOmit != null)
out = _legacyOmit.apply(in, out);
if(_legacyMVImpute != null)
@@ -280,16 +284,26 @@ public class MultiColumnEncoder implements Encoder {
private void applyMT(FrameBlock in, MatrixBlock out, int outputCol, int
k) {
DependencyThreadPool pool = new DependencyThreadPool(k);
try {
- pool.submitAllAndWait(getApplyTasks(in, out,
outputCol));
+ if(APPLY_ENCODER_SEPARATE_STAGES){
+ int offset = outputCol;
+ for (ColumnEncoderComposite e :
_columnEncoders) {
+
pool.submitAllAndWait(e.getApplyTasks(in, out, e._colID - 1 + offset));
+ if
(e.hasEncoder(ColumnEncoderDummycode.class))
+ offset +=
e.getEncoder(ColumnEncoderDummycode.class)._domainSize - 1;
+ }
+ }else{
+ pool.submitAllAndWait(getApplyTasks(in, out,
outputCol));
+ }
}
catch(ExecutionException | InterruptedException e) {
- LOG.error("MT Column encode failed");
+ LOG.error("MT Column apply failed");
e.printStackTrace();
}
pool.shutdown();
}
private static void outputMatrixPreProcessing(MatrixBlock output,
FrameBlock input) {
+ long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
output.allocateBlock();
if(output.isInSparseFormat()) {
SparseBlock block = output.getSparseBlock();
@@ -300,8 +314,33 @@ public class MultiColumnEncoder implements Encoder {
// allocate all sparse rows so MT sync can be
done.
// should be rare that rows have only 0
block.allocate(r, input.getNumColumns());
+ // Setting the size here makes it possible to
run all sparse apply tasks without any sync
+ // could become problematic if the input is
very sparse since we allocate the same size as the input
+ // should be fine in theory ;)
+
((SparseRowVector)block.get(r)).setSize(input.getNumColumns());
+ }
+ }
+ if(DMLScript.STATISTICS)
+
Statistics.incTransformOutMatrixPreProcessingTime(System.nanoTime()-t0);
+ }
+
+ private void outputMatrixPostProcessing(MatrixBlock output){
+ long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
+ Set<Integer> indexSet =
getColumnEncoders(ColumnEncoderPassThrough.class).stream()
+
.map(ColumnEncoderPassThrough::getSparseRowsWZeros).flatMap(l -> {
+ if(l == null)
+ return null;
+ return l.stream();
+ }).collect(Collectors.toSet());
+ if(!indexSet.stream().allMatch(Objects::isNull)){
+ for(Integer row : indexSet){
+ // TODO: Maybe MT in special cases when the
number of rows is large
+ output.getSparseBlock().get(row).compact();
}
}
+ output.recomputeNonZeros();
+ if(DMLScript.STATISTICS)
+
Statistics.incTransformOutMatrixPostProcessingTime(System.nanoTime()-t0);
}
@Override
@@ -660,7 +699,7 @@ public class MultiColumnEncoder implements Encoder {
}
/*
- * Currently not in use will be integrated in the future
+ * Currently not in use; will be integrated in the future
*/
@SuppressWarnings("unused")
private static class MultiColumnLegacyBuildTask implements
Callable<Object> {
diff --git a/src/main/java/org/apache/sysds/runtime/util/DependencyTask.java
b/src/main/java/org/apache/sysds/runtime/util/DependencyTask.java
index 5aff39b..17351a6 100644
--- a/src/main/java/org/apache/sysds/runtime/util/DependencyTask.java
+++ b/src/main/java/org/apache/sysds/runtime/util/DependencyTask.java
@@ -25,16 +25,20 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.sysds.runtime.DMLRuntimeException;
-public class DependencyTask<E> implements Callable<E> {
+public class DependencyTask<E> implements Comparable<DependencyTask<?>>,
Callable<E> {
public static final boolean ENABLE_DEBUG_DATA = false;
+ protected static final Log LOG =
LogFactory.getLog(DependencyTask.class.getName());
private final Callable<E> _task;
protected final List<DependencyTask<?>> _dependantTasks;
public List<DependencyTask<?>> _dependencyTasks = null; // only for
debugging
private CompletableFuture<Future<?>> _future;
private int _rdy = 0;
+ private Integer _priority = 0;
private ExecutorService _pool;
public DependencyTask(Callable<E> task, List<DependencyTask<?>>
dependantTasks) {
@@ -54,6 +58,10 @@ public class DependencyTask<E> implements Callable<E> {
return _rdy == 0;
}
+ public void setPriority(int priority) {
+ _priority = priority;
+ }
+
private boolean decrease() {
synchronized(this) {
_rdy -= 1;
@@ -68,7 +76,11 @@ public class DependencyTask<E> implements Callable<E> {
@Override
public E call() throws Exception {
+ LOG.debug("Executing Task: " + this);
+ long t0 = System.nanoTime();
E ret = _task.call();
+ LOG.debug("Finished Task: " + this + " in: " +
+ (String.format("%.3f",
(System.nanoTime()-t0)*1e-9)) + "sec.");
_dependantTasks.forEach(t -> {
if(t.decrease()) {
if(_pool == null)
@@ -79,4 +91,14 @@ public class DependencyTask<E> implements Callable<E> {
return ret;
}
+
+ @Override
+ public String toString(){
+ return _task.toString() + "<Prio: " + _priority + ">" +
"<Waiting: " + _dependantTasks.size() + ">";
+ }
+
+ @Override
+ public int compareTo(DependencyTask<?> task) {
+ return -1 * this._priority.compareTo(task._priority);
+ }
}
diff --git
a/src/main/java/org/apache/sysds/runtime/util/DependencyThreadPool.java
b/src/main/java/org/apache/sysds/runtime/util/DependencyThreadPool.java
index 4fdd63a..50675d6 100644
--- a/src/main/java/org/apache/sysds/runtime/util/DependencyThreadPool.java
+++ b/src/main/java/org/apache/sysds/runtime/util/DependencyThreadPool.java
@@ -19,7 +19,12 @@
package org.apache.sysds.runtime.util;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sysds.runtime.DMLRuntimeException;
+
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -31,11 +36,10 @@ import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import org.apache.sysds.runtime.DMLRuntimeException;
-
public class DependencyThreadPool {
+ protected static final Log LOG =
LogFactory.getLog(DependencyThreadPool.class.getName());
private final ExecutorService _pool;
public DependencyThreadPool(int k) {
@@ -50,6 +54,8 @@ public class DependencyThreadPool {
List<Future<Future<?>>> futures = new ArrayList<>();
List<Integer> rdyTasks = new ArrayList<>();
int i = 0;
+ // sort by priority
+ Collections.sort(dtasks);
for(DependencyTask<?> t : dtasks) {
CompletableFuture<Future<?>> f = new
CompletableFuture<>();
t.addPool(_pool);
@@ -63,6 +69,8 @@ public class DependencyThreadPool {
futures.add(f);
i++;
}
+ LOG.debug("Initial Starting tasks: \n\t" +
+ rdyTasks.stream().map(index ->
dtasks.get(index).toString()).collect(Collectors.joining("\n\t")));
// Two stages to avoid race condition!
for(Integer index : rdyTasks) {
synchronized(_pool) {
diff --git a/src/main/java/org/apache/sysds/runtime/util/UtilFunctions.java
b/src/main/java/org/apache/sysds/runtime/util/UtilFunctions.java
index de6d7d6..7431a82 100644
--- a/src/main/java/org/apache/sysds/runtime/util/UtilFunctions.java
+++ b/src/main/java/org/apache/sysds/runtime/util/UtilFunctions.java
@@ -989,6 +989,15 @@ public class UtilFunctions {
return (blockSize <= 0)? arrayLength: Math.min(arrayLength,
startIndex + blockSize);
}
+ public static int[] getBlockSizes(int num, int numBlocks){
+ int[] blockSizes = new int[numBlocks];
+ Arrays.fill(blockSizes, num/numBlocks);
+ for (int i = 0; i < num%numBlocks; i++){
+ blockSizes[i]++;
+ }
+ return blockSizes;
+ }
+
public static String[] splitRecodeEntry(String s) {
//forward to column encoder, as UtilFunctions available in map
context
return ColumnEncoderRecode.splitRecodeMapEntry(s);
diff --git a/src/main/java/org/apache/sysds/utils/Statistics.java
b/src/main/java/org/apache/sysds/utils/Statistics.java
index cacd2f7..d91d9c5 100644
--- a/src/main/java/org/apache/sysds/utils/Statistics.java
+++ b/src/main/java/org/apache/sysds/utils/Statistics.java
@@ -19,20 +19,6 @@
package org.apache.sysds.utils;
-import java.lang.management.CompilationMXBean;
-import java.lang.management.GarbageCollectorMXBean;
-import java.lang.management.ManagementFactory;
-import java.text.DecimalFormat;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.DoubleAdder;
-import java.util.concurrent.atomic.LongAdder;
-
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.sysds.api.DMLScript;
@@ -51,6 +37,20 @@ import
org.apache.sysds.runtime.lineage.LineageCacheConfig.ReuseCacheType;
import org.apache.sysds.runtime.lineage.LineageCacheStatistics;
import org.apache.sysds.runtime.privacy.CheckedConstraintsLog;
+import java.lang.management.CompilationMXBean;
+import java.lang.management.GarbageCollectorMXBean;
+import java.lang.management.ManagementFactory;
+import java.text.DecimalFormat;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.DoubleAdder;
+import java.util.concurrent.atomic.LongAdder;
+
/**
* This class captures all statistics.
*/
@@ -149,7 +149,28 @@ public class Statistics
private static final LongAdder lTotalUIPVar = new LongAdder();
private static final LongAdder lTotalLix = new LongAdder();
private static final LongAdder lTotalLixUIP = new LongAdder();
-
+
+ // Transformencode stats
+ private static final LongAdder transformEncoderCount = new LongAdder();
+
+ //private static final LongAdder transformBuildTime = new LongAdder();
+ private static final LongAdder transformRecodeBuildTime = new
LongAdder();
+ private static final LongAdder transformBinningBuildTime = new
LongAdder();
+ private static final LongAdder transformImputeBuildTime = new
LongAdder();
+
+ //private static final LongAdder transformApplyTime = new LongAdder();
+ private static final LongAdder transformRecodeApplyTime = new
LongAdder();
+ private static final LongAdder transformDummyCodeApplyTime = new
LongAdder();
+ private static final LongAdder transformPassThroughApplyTime = new
LongAdder();
+ private static final LongAdder transformFeatureHashingApplyTime = new
LongAdder();
+ private static final LongAdder transformBinningApplyTime = new
LongAdder();
+ private static final LongAdder transformOmitApplyTime = new LongAdder();
+ private static final LongAdder transformImputeApplyTime = new
LongAdder();
+
+
+ private static final LongAdder transformOutMatrixPreProcessingTime =
new LongAdder();
+ private static final LongAdder transformOutMatrixPostProcessingTime =
new LongAdder();
+
// Federated stats
private static final LongAdder federatedReadCount = new LongAdder();
private static final LongAdder federatedPutCount = new LongAdder();
@@ -649,6 +670,70 @@ public class Statistics
public static void accFedPSCommunicationTime(long t) {
fedPSCommunicationTime.add(t);}
+ public static void incTransformEncoderCount(long encoders){
+ transformEncoderCount.add(encoders);
+ }
+
+ public static void incTransformRecodeApplyTime(long t){
+ transformRecodeApplyTime.add(t);
+ }
+
+ public static void incTransformDummyCodeApplyTime(long t){
+ transformDummyCodeApplyTime.add(t);
+ }
+
+ public static void incTransformBinningApplyTime(long t){
+ transformBinningApplyTime.add(t);
+ }
+
+ public static void incTransformPassThroughApplyTime(long t){
+ transformPassThroughApplyTime.add(t);
+ }
+
+ public static void incTransformFeatureHashingApplyTime(long t){
+ transformFeatureHashingApplyTime.add(t);
+ }
+
+ public static void incTransformOmitApplyTime(long t) {
+ transformOmitApplyTime.add(t);
+ }
+
+ public static void incTransformImputeApplyTime(long t) {
+ transformImputeApplyTime.add(t);
+ }
+
+ public static void incTransformRecodeBuildTime(long t){
+ transformRecodeBuildTime.add(t);
+ }
+
+ public static void incTransformBinningBuildTime(long t){
+ transformBinningBuildTime.add(t);
+ }
+
+ public static void incTransformImputeBuildTime(long t) {
+ transformImputeBuildTime.add(t);
+ }
+
+ public static void incTransformOutMatrixPreProcessingTime(long t){
+ transformOutMatrixPreProcessingTime.add(t);
+ }
+
+ public static void incTransformOutMatrixPostProcessingTime(long t){
+ transformOutMatrixPostProcessingTime.add(t);
+ }
+
+ public static long getTransformEncodeBuildTime(){
+ return transformBinningBuildTime.longValue() +
transformImputeBuildTime.longValue() +
+ transformRecodeBuildTime.longValue();
+ }
+
+ public static long getTransformEncodeApplyTime(){
+ return transformDummyCodeApplyTime.longValue() +
transformBinningApplyTime.longValue() +
+ transformFeatureHashingApplyTime.longValue() +
transformPassThroughApplyTime.longValue() +
+ transformRecodeApplyTime.longValue() +
transformOmitApplyTime.longValue() +
+ transformImputeApplyTime.longValue();
+ }
+
public static String getCPHeavyHitterCode( Instruction inst )
{
String opcode = null;
@@ -1129,6 +1214,50 @@ public class Statistics
federatedExecuteInstructionCount.longValue() + "/" +
federatedExecuteUDFCount.longValue() +
".\n");
}
+ if( transformEncoderCount.longValue() > 0) {
+ //TODO: Cleanup and condense
+ sb.append("TransformEncode num.
encoders:\t").append(transformEncoderCount.longValue()).append("\n");
+ sb.append("TransformEncode build
time:\t").append(String.format("%.3f",
+
getTransformEncodeBuildTime()*1e-9)).append(" sec.\n");
+ if(transformRecodeBuildTime.longValue() > 0)
+ sb.append("\tRecode build
time:\t").append(String.format("%.3f",
+
transformRecodeBuildTime.longValue()*1e-9)).append(" sec.\n");
+ if(transformBinningBuildTime.longValue() > 0)
+ sb.append("\tBinning build
time:\t").append(String.format("%.3f",
+
transformBinningBuildTime.longValue()*1e-9)).append(" sec.\n");
+ if(transformImputeBuildTime.longValue() > 0)
+ sb.append("\tImpute build
time:\t").append(String.format("%.3f",
+
transformImputeBuildTime.longValue()*1e-9)).append(" sec.\n");
+
+ sb.append("TransformEncode apply
time:\t").append(String.format("%.3f",
+
getTransformEncodeApplyTime()*1e-9)).append(" sec.\n");
+ if(transformRecodeApplyTime.longValue() > 0)
+ sb.append("\tRecode apply
time:\t").append(String.format("%.3f",
+
transformRecodeApplyTime.longValue()*1e-9)).append(" sec.\n");
+ if(transformBinningApplyTime.longValue() > 0)
+ sb.append("\tBinning apply
time:\t").append(String.format("%.3f",
+
transformBinningApplyTime.longValue()*1e-9)).append(" sec.\n");
+ if(transformDummyCodeApplyTime.longValue() > 0)
+ sb.append("\tDummyCode apply
time:\t").append(String.format("%.3f",
+
transformDummyCodeApplyTime.longValue()*1e-9)).append(" sec.\n");
+ if(transformFeatureHashingApplyTime.longValue()
> 0)
+ sb.append("\tHashing apply
time:\t").append(String.format("%.3f",
+
transformFeatureHashingApplyTime.longValue()*1e-9)).append(" sec.\n");
+ if(transformPassThroughApplyTime.longValue() >
0)
+ sb.append("\tPassThrough apply
time:\t").append(String.format("%.3f",
+
transformPassThroughApplyTime.longValue()*1e-9)).append(" sec.\n");
+ if(transformOmitApplyTime.longValue() > 0)
+ sb.append("\tOmit apply
time:\t").append(String.format("%.3f",
+
transformOmitApplyTime.longValue()*1e-9)).append(" sec.\n");
+ if(transformImputeApplyTime.longValue() > 0)
+ sb.append("\tImpute apply
time:\t").append(String.format("%.3f",
+
transformImputeApplyTime.longValue()*1e-9)).append(" sec.\n");
+
+ sb.append("TransformEncode PreProc.
time:\t").append(String.format("%.3f",
+
transformOutMatrixPreProcessingTime.longValue()*1e-9)).append(" sec.\n");
+ sb.append("TransformEncode PostProc.
time:\t").append(String.format("%.3f",
+
transformOutMatrixPostProcessingTime.longValue()*1e-9)).append(" sec.\n");
+ }
if(ConfigurationManager.isCompressionEnabled()){
DMLCompressionStatistics.display(sb);
diff --git
a/src/test/java/org/apache/sysds/test/functions/transform/mt/TransformFrameBuildMultithreadedTest.java
b/src/test/java/org/apache/sysds/test/functions/transform/mt/TransformFrameBuildMultithreadedTest.java
index 8824b9d..b70571b 100644
---
a/src/test/java/org/apache/sysds/test/functions/transform/mt/TransformFrameBuildMultithreadedTest.java
+++
b/src/test/java/org/apache/sysds/test/functions/transform/mt/TransformFrameBuildMultithreadedTest.java
@@ -30,6 +30,7 @@ import org.apache.sysds.common.Types;
import org.apache.sysds.runtime.io.FileFormatPropertiesCSV;
import org.apache.sysds.runtime.io.FrameReaderFactory;
import org.apache.sysds.runtime.matrix.data.FrameBlock;
+import org.apache.sysds.runtime.transform.encode.ColumnEncoder;
import org.apache.sysds.runtime.transform.encode.ColumnEncoderBin;
import org.apache.sysds.runtime.transform.encode.ColumnEncoderRecode;
import org.apache.sysds.runtime.transform.encode.EncoderFactory;
@@ -173,6 +174,7 @@ public class TransformFrameBuildMultithreadedTest extends
AutomatedTestBase {
.readFrameFromHDFS(DATASET, -1L, -1L);
StringBuilder specSb = new StringBuilder();
Files.readAllLines(Paths.get(SPEC)).forEach(s ->
specSb.append(s).append("\n"));
+ ColumnEncoder.BUILD_ROW_BLOCKS_PER_COLUMN =
Math.max(blockSize, 1);
MultiColumnEncoder encoderS =
EncoderFactory.createEncoder(specSb.toString(), input.getColumnNames(),
input.getNumColumns(), null);
MultiColumnEncoder encoderM =
EncoderFactory.createEncoder(specSb.toString(), input.getColumnNames(),
diff --git
a/src/test/java/org/apache/sysds/test/functions/transform/mt/TransformFrameEncodeMultithreadedTest.java
b/src/test/java/org/apache/sysds/test/functions/transform/mt/TransformFrameEncodeMultithreadedTest.java
index 2156250..6679f36 100644
---
a/src/test/java/org/apache/sysds/test/functions/transform/mt/TransformFrameEncodeMultithreadedTest.java
+++
b/src/test/java/org/apache/sysds/test/functions/transform/mt/TransformFrameEncodeMultithreadedTest.java
@@ -48,7 +48,7 @@ public class TransformFrameEncodeMultithreadedTest extends
AutomatedTestBase {
private final static String DATASET1 = "homes3/homes.csv";
private final static String SPEC1 = "homes3/homes.tfspec_recode.json";
private final static String SPEC2 = "homes3/homes.tfspec_dummy.json";
- private final static String SPEC2all =
"homes3/homes.tfspec_dummy_all.json";
+ private final static String SPEC2sparse =
"homes3/homes.tfspec_dummy_sparse.json";
private final static String SPEC3 = "homes3/homes.tfspec_bin.json"; //
recode
private final static String SPEC6 =
"homes3/homes.tfspec_recode_dummy.json";
private final static String SPEC7 =
"homes3/homes.tfspec_binDummy.json"; // recode+dummy
@@ -164,7 +164,7 @@ public class TransformFrameEncodeMultithreadedTest extends
AutomatedTestBase {
DATASET = DATASET1;
break;
case DUMMY_ALL:
- SPEC = SPEC2all;
+ SPEC = SPEC2sparse;
DATASET = DATASET1;
break;
case BIN:
diff --git a/src/test/resources/datasets/homes3/homes.tfspec_dummy_all.json
b/src/test/resources/datasets/homes3/homes.tfspec_dummy_all.json
deleted file mode 100644
index 65b8fee..0000000
--- a/src/test/resources/datasets/homes3/homes.tfspec_dummy_all.json
+++ /dev/null
@@ -1 +0,0 @@
-{"ids": true, "dummycode": [ 2, 7, 1, 3, 4, 5, 6, 8, 9 ] }
\ No newline at end of file
diff --git a/src/test/resources/datasets/homes3/homes.tfspec_dummy_sparse.json
b/src/test/resources/datasets/homes3/homes.tfspec_dummy_sparse.json
new file mode 100644
index 0000000..ed48308
--- /dev/null
+++ b/src/test/resources/datasets/homes3/homes.tfspec_dummy_sparse.json
@@ -0,0 +1 @@
+{"ids": true, "dummycode": [ 2, 7, 1, 3, 4, 6, 8, 9 ] }
\ No newline at end of file