This is an automated email from the ASF dual-hosted git repository.
mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/master by this push:
new 6321dbb [SYSTEMDS-2641] Improved slice finding algorithm (script, spark ops)
6321dbb is described below
commit 6321dbb61e9593e49c59318f3d9cb38a8b0e9c2a
Author: Matthias Boehm <[email protected]>
AuthorDate: Tue Feb 2 19:22:59 2021 +0100
[SYSTEMDS-2641] Improved slice finding algorithm (script, spark ops)
This patch extends the slice finding algorithm for improved performance,
especially over large, distributed matrices with millions of features.
Furthermore, it includes various fixes and improvements that were
encountered along the way.
* Slice finder algorithm extension (pruning of unnecessary columns,
  avoiding ultra-sparse matrix multiplications via simplified deduplication)
* Propagate no-empty-blocks metadata to avoid inconsistent treatment
  w.r.t. the number of partitions
* Checkpoint improvements (avoid unnecessary repartitioning, nnz analysis)
* Improved empty block filtering in spark cpmm instructions
* Consistent handling of spark ctable and rexpand post-processing
* Fix spark removeEmpty instruction parsing (wrong broadcast flag
leading to unnecessary replication)
* Fix csv tests (NPEs w/o output buffering, formatting)
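For illustration, the extended builtin can be invoked as in the updated
test script (a minimal sketch; the file names are hypothetical
placeholders, and feature selection is enabled via the new selFeat flag):

  X = read("X");  # one-hot encoded feature matrix (placeholder input)
  e = read("e");  # per-row error vector of a trained model (placeholder)
  [TS, TR] = slicefinder(X=X, e=e, k=4,
    alpha=0.95, minSup=4, tpEval=TRUE, selFeat=TRUE, verbose=FALSE);
  write(TR, "R");

With selFeat=TRUE, one-hot-encoded features that violate the initial
minimum-support constraint or have zero error are removed up front, which
shrinks the encoded dataset and candidate slices before evaluation.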
---
scripts/builtin/slicefinder.dml | 31 +++++++----
.../apache/sysds/hops/ParameterizedBuiltinOp.java | 11 ++--
.../spark/CheckpointSPInstruction.java | 12 ++---
.../instructions/spark/CpmmSPInstruction.java | 5 +-
.../instructions/spark/CtableSPInstruction.java | 8 +--
.../spark/ParameterizedBuiltinSPInstruction.java | 10 ++--
.../instructions/spark/utils/SparkUtils.java | 16 +++++-
.../sysds/runtime/meta/DataCharacteristics.java | 13 ++++-
.../functions/builtin/BuiltinSliceFinderTest.java | 62 ++++++++++++++++++----
.../sysds/test/functions/io/csv/CSVTestBase.java | 20 +++----
.../sysds/test/functions/io/csv/ReadCSVTest.java | 40 +++++++-------
src/test/scripts/functions/builtin/slicefinder.dml | 5 +-
12 files changed, 156 insertions(+), 77 deletions(-)
diff --git a/scripts/builtin/slicefinder.dml b/scripts/builtin/slicefinder.dml
index 9c49718..a6b8bfd 100644
--- a/scripts/builtin/slicefinder.dml
+++ b/scripts/builtin/slicefinder.dml
@@ -29,6 +29,8 @@
# tpEval flag for task-parallel slice evaluation,
# otherwise data-parallel
# tpBlksz block size for task-parallel execution (num slices)
+# selFeat flag for removing one-hot-encoded features that don't satisfy
+# the initial minimum-support constraint and/or have zero error
# verbose flag for verbose debug output
# ------------------------------------------------------------
# TK top-k slices (k x ncol(X) if successful)
@@ -36,9 +38,9 @@
# D debug matrix, populated with enumeration stats if verbose
# ------------------------------------------------------------
-m_slicefinder = function(Matrix[Double] X, Matrix[Double] e,
- Integer k = 4, Integer maxL = 0, Integer minSup = 32, Double alpha = 0.5,
- Boolean tpEval = TRUE, Integer tpBlksz = 16, Boolean verbose = FALSE)
+m_slicefinder = function(Matrix[Double] X, Matrix[Double] e, Int k = 4,
+ Int maxL = 0, Int minSup = 32, Double alpha = 0.5, Boolean tpEval = TRUE,
+ Int tpBlksz = 16, Boolean selFeat = FALSE, Boolean verbose = FALSE)
return(Matrix[Double] TK, Matrix[Double] TKC, Matrix[Double] D)
{
# init debug matrix: levelID, enumerated S, valid S, TKmax, TKmin
@@ -58,7 +60,7 @@ m_slicefinder = function(Matrix[Double] X, Matrix[Double] e,
# initialize statistics and basic slices
n2 = ncol(X2); # one-hot encoded features
eAvg = sum(e) / m; # average error
- [S, R] = createAndScoreBasicSlices(X2, e, eAvg, minSup, alpha, verbose);
+ [S, R, selCols] = createAndScoreBasicSlices(X2, e, eAvg, minSup, alpha, verbose);
# initialize top-k
[TK, TKC] = maintainTopK(S, R, matrix(0,0,n2), matrix(0,0,4), k, minSup);
@@ -69,6 +71,10 @@ m_slicefinder = function(Matrix[Double] X, Matrix[Double] e,
D = rbind(D, t(as.matrix(list(1, n2, nrow(S), maxsc, minsc))));
}
+ # reduced dataset to relevant attributes (minSup, err>0), S reduced on-the-fly
+ if( selFeat )
+ X2 = removeEmpty(target=X2, margin="cols", select=t(selCols));
+
# lattice enumeration w/ size/error pruning, one iteration per level
# termination condition (max #feature levels)
maxL = ifelse(maxL<=0, n, maxL)
@@ -79,6 +85,9 @@ m_slicefinder = function(Matrix[Double] X, Matrix[Double] e,
# enumerate candidate join pairs, incl size/error pruning
nrS = nrow(S);
S = getPairedCandidates(S, R, TK, TKC, k, level, eAvg, minSup, alpha, n2, foffb, foffe);
+ S2 = S;
+ if(selFeat)
+ S2 = removeEmpty(target=S, margin="cols", select=t(selCols));
if(verbose) {
print("\nSliceFinder: level "+level+":")
@@ -92,11 +101,11 @@ m_slicefinder = function(Matrix[Double] X, Matrix[Double] e,
parfor( i in 1:ceil(nrow(S)/tpBlksz), check=0 ) {
beg = (i-1)*tpBlksz + 1;
end = min(i*tpBlksz, nrow(R));
- R[beg:end,] = evalSlice(X2, e, eAvg, t(S[beg:end,]), level, alpha);
+ R[beg:end,] = evalSlice(X2, e, eAvg, t(S2[beg:end,]), level, alpha);
}
}
else { # data-parallel
- R = evalSlice(X2, e, eAvg, t(S), level, alpha);
+ R = evalSlice(X2, e, eAvg, t(S2), level, alpha);
}
# maintain top-k after evaluation
@@ -121,7 +130,7 @@ m_slicefinder = function(Matrix[Double] X, Matrix[Double] e,
createAndScoreBasicSlices = function(Matrix[Double] X2, Matrix[Double] e,
Double eAvg, Double minSup, Double alpha, Boolean verbose)
- return(Matrix[Double] S, Matrix[Double] R)
+ return(Matrix[Double] S, Matrix[Double] R, Matrix[Double] selCols)
{
n2 = ncol(X2);
cCnts = t(colSums(X2)); # column counts
@@ -287,11 +296,13 @@ getPairedCandidates = function(Matrix[Double] S, Matrix[Double] R,
fParents = (numParents == level);
# apply all pruning
- map = map * (fSizes & fScores & fParents);
+ fall = (fSizes & fScores & fParents);
# deduplication of join outputs
- Dedup = removeEmpty(target=map, margin="rows") != 0
- P = (Dedup %*% P) != 0
+ Dedup = removeEmpty(target=map, margin="rows", select=fall) != 0
+ #P = (Dedup %*% P) != 0, replaced by below (easier sparsity propagation)
+ DeI = table(rowIndexMax(Dedup), 1, nrow(P), 1);
+ P = removeEmpty(target=P, margin="rows", select=DeI);
}
}
diff --git a/src/main/java/org/apache/sysds/hops/ParameterizedBuiltinOp.java b/src/main/java/org/apache/sysds/hops/ParameterizedBuiltinOp.java
index 568d5c7..298fd6a 100644
--- a/src/main/java/org/apache/sysds/hops/ParameterizedBuiltinOp.java
+++ b/src/main/java/org/apache/sysds/hops/ParameterizedBuiltinOp.java
@@ -973,16 +973,19 @@ public class ParameterizedBuiltinOp extends MultiThreadedHop {
*/
private boolean isRemoveEmptyBcSP() // TODO find if 2 x size needed.
{
- Hop input = getInput().get(0);
-
//note: both cases (partitioned matrix, and sorted double array), require to
//fit the broadcast twice into the local memory budget. Also, the memory
//constraint only needs to take the rhs into account because the output is
//guaranteed to be an aggregate of <=16KB
+ Hop input = getInput().get(0);
+ Hop margin = getParameterHop("margin");
+ boolean col = (margin instanceof LiteralOp) ?
+ ((LiteralOp)margin).getStringValue().equals("cols") : false;
+
double size = input.dimsKnown() ?
- OptimizerUtils.estimateSize(input.getDim1(), 1) : //dims known and estimate fits
- input.getOutputMemEstimate(); //dims unknown but worst-case estimate fits
+ OptimizerUtils.estimateSize(col?input.getDim2():input.getDim1(), 1) :
+ input.getOutputMemEstimate();
return OptimizerUtils.checkSparkBroadcastMemoryBudget(size);
}
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/CheckpointSPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/CheckpointSPInstruction.java
index 7475c4f..a273da8 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/spark/CheckpointSPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/CheckpointSPInstruction.java
@@ -111,11 +111,11 @@ public class CheckpointSPInstruction extends UnarySPInstruction {
{
//determine need for coalesce or repartition, and csr conversion
int numPartitions = SparkUtils.getNumPreferredPartitions(mcIn, in);
- boolean coalesce = ( 1.2*numPartitions < in.getNumPartitions()
+ boolean coalesce = ( 1.4 * numPartitions < in.getNumPartitions()
&& !SparkUtils.isHashPartitioned(in) && in.getNumPartitions()
> SparkExecutionContext.getDefaultParallelism(true));
boolean repartition = mcIn.dimsKnown(true) && mcIn.isUltraSparse()
- && numPartitions > in.getNumPartitions();
+ && numPartitions > 1.4 * in.getNumPartitions();
boolean mcsr2csr = input1.getDataType()==DataType.MATRIX
&& OptimizerUtils.checkSparseBlockCSRConversion(mcIn)
&& !_level.equals(Checkpoint.SER_STORAGE_LEVEL);
@@ -153,11 +153,9 @@ public class CheckpointSPInstruction extends UnarySPInstruction {
//actual checkpoint into given storage level
out = out.persist( _level );
- //trigger nnz computation for datasets that are forced to spark by their dimensions
- //(larger than MAX_INT) to handle ultra-sparse data sets during recompilation because
- //otherwise these their nnz would never be evaluated due to lazy evaluation in spark
- if( input1.isMatrix() && mcIn.dimsKnown()
- && !mcIn.dimsKnown(true) && !OptimizerUtils.isValidCPDimensions(mcIn) ) {
+ //trigger nnz computation for all cached (i.e., read-only) datasets
+ //otherwise their nnz would never be evaluated due to lazy evaluation in spark
+ if( input1.isMatrix() && mcIn.dimsKnown() && !mcIn.dimsKnown(true) ) {
mcIn.setNonZeros(SparkUtils.getNonZeros((JavaPairRDD<MatrixIndexes,MatrixBlock>)out));
}
}
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/CpmmSPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/CpmmSPInstruction.java
index ab98af3..d5c0225 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/spark/CpmmSPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/CpmmSPInstruction.java
@@ -88,7 +88,8 @@ public class CpmmSPInstruction extends BinarySPInstruction {
DataCharacteristics mc1 = sec.getDataCharacteristics(input1.getName());
DataCharacteristics mc2 = sec.getDataCharacteristics(input2.getName());
- if( !_outputEmptyBlocks || _aggtype == SparkAggType.SINGLE_BLOCK ) {
+ if( !_outputEmptyBlocks || _aggtype == SparkAggType.SINGLE_BLOCK
+ || mc1.isNoEmptyBlocks() || mc2.isNoEmptyBlocks() ) {
//prune empty blocks of ultra-sparse matrices
in1 = in1.filter(new FilterNonEmptyBlocksFunction());
in2 = in2.filter(new FilterNonEmptyBlocksFunction());
@@ -133,7 +134,7 @@ public class CpmmSPInstruction extends BinarySPInstruction {
sec.setMatrixOutput(output.getName(), out2);
}
else { //DEFAULT: MULTI_BLOCK
- if( !_outputEmptyBlocks )
+ if( !_outputEmptyBlocks || mc1.isNoEmptyBlocks() || mc2.isNoEmptyBlocks() )
out = out.filter(new FilterNonEmptyBlocksFunction());
out = RDDAggregateUtils.sumByKeyStable(out, false);
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/CtableSPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/CtableSPInstruction.java
index ec8d637..038a55c 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/spark/CtableSPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/CtableSPInstruction.java
@@ -24,7 +24,6 @@ import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.sysds.common.Types.ValueType;
-import org.apache.sysds.hops.OptimizerUtils;
import org.apache.sysds.lops.Ctable;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
@@ -132,6 +131,7 @@ public class CtableSPInstruction extends ComputationSPInstruction {
}
mcOut.set(dim1, dim2, mc1.getBlocksize());
mcOut.setNonZerosBound(mc1.getLength()); //vector or matrix
+ mcOut.setNoEmptyBlocks(!_outputEmptyBlocks);
if( !mcOut.dimsKnown() )
throw new DMLRuntimeException("Unknown ctable output dimensions: "+mcOut);
@@ -192,11 +192,7 @@ public class CtableSPInstruction extends ComputationSPInstruction {
sec.addLineageRDD(output.getName(), input3.getName());
//post-processing to obtain sparsity of ultra-sparse outputs
- long memUB = OptimizerUtils.estimateSizeExactSparsity(
- mcOut.getRows(), mcOut.getCols(), mcOut.getNonZerosBound());
- if( !OptimizerUtils.exceedsCachingThreshold(mcOut.getCols(), memUB) //< mem budget
- && memUB < OptimizerUtils.estimateSizeExactSparsity(mcOut))
- sec.getMatrixObject(output).acquireReadAndRelease();
+ SparkUtils.postprocessUltraSparseOutput(sec.getMatrixObject(output), mcOut);
}
private static class CTableFunction implements PairFlatMapFunction<Iterator<Tuple2<MatrixIndexes, MatrixBlock[]>>, MatrixIndexes, MatrixBlock>
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
index e5e6b1c..cee035b 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
@@ -154,8 +154,8 @@ public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction
}
else if (opcode.equalsIgnoreCase("rmempty")) {
func = ParameterizedBuiltin.getParameterizedBuiltinFnObject(opcode);
- return new ParameterizedBuiltinSPInstruction(new SimpleOperator(func), paramsMap, out, opcode, str,
- parts.length > 6 ? Boolean.parseBoolean(parts[5]) : false);
+ return new ParameterizedBuiltinSPInstruction(new SimpleOperator(func),
+ paramsMap, out, opcode, str, Boolean.parseBoolean(parts[parts.length-2]));
}
else if (opcode.equalsIgnoreCase("rexpand")
|| opcode.equalsIgnoreCase("replace")
@@ -428,9 +428,13 @@ public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction
sec.setRDDHandleForVariable(output.getName(), out);
sec.addLineageRDD(output.getName(), rddInVar);
- //update output statistics (required for correctness)
+ //update output statistics (required for correctness, nnz unknown due to cut-off)
DataCharacteristics mcOut = sec.getDataCharacteristics(output.getName());
mcOut.set(dirRows?lmaxVal:mcIn.getRows(), dirRows?mcIn.getRows():lmaxVal, (int)blen, -1);
+ mcOut.setNonZerosBound(mcIn.getRows());
+
+ //post-processing to obtain sparsity of ultra-sparse outputs
+ SparkUtils.postprocessUltraSparseOutput(sec.getMatrixObject(output), mcOut);
}
else if ( opcode.equalsIgnoreCase("transformapply") )
{
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/utils/SparkUtils.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/utils/SparkUtils.java
index 4aeac25..a6a1c15 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/spark/utils/SparkUtils.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/utils/SparkUtils.java
@@ -28,6 +28,7 @@ import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.storage.StorageLevel;
import org.apache.sysds.hops.OptimizerUtils;
import org.apache.sysds.lops.Checkpoint;
+import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
import org.apache.sysds.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
import org.apache.sysds.runtime.data.IndexedTensorBlock;
@@ -129,7 +130,7 @@ public class SparkUtils
}
public static int getNumPreferredPartitions(DataCharacteristics dc) {
- return getNumPreferredPartitions(dc, true);
+ return getNumPreferredPartitions(dc, !dc.isNoEmptyBlocks());
}
public static int getNumPreferredPartitions(DataCharacteristics dc, boolean outputEmptyBlocks) {
@@ -265,12 +266,25 @@ public class SparkUtils
return ret;
}
+ @SuppressWarnings("unchecked")
+ public static long getNonZeros(MatrixObject mo) {
+ return getNonZeros((JavaPairRDD<MatrixIndexes, MatrixBlock>)mo.getRDDHandle().getRDD());
+ }
+
public static long getNonZeros(JavaPairRDD<MatrixIndexes, MatrixBlock> input) {
//note: avoid direct lambda expression due reduce unnecessary GC overhead
return input.filter(new FilterNonEmptyBlocksFunction())
.values().mapPartitions(new RecomputeNnzFunction()).reduce((a,b)->a+b);
}
+ public static void postprocessUltraSparseOutput(MatrixObject mo, DataCharacteristics mcOut) {
+ long memUB = OptimizerUtils.estimateSizeExactSparsity(
+ mcOut.getRows(), mcOut.getCols(), mcOut.getNonZerosBound());
+ if( !OptimizerUtils.exceedsCachingThreshold(mcOut.getCols(), memUB) //< mem budget
+ && memUB < OptimizerUtils.estimateSizeExactSparsity(mcOut))
+ mo.acquireReadAndRelease();
+ }
+
private static class AnalyzeCellDataCharacteristics implements Function<Tuple2<MatrixIndexes,MatrixCell>, DataCharacteristics>
{
private static final long serialVersionUID = 8899395272683723008L;
diff --git a/src/main/java/org/apache/sysds/runtime/meta/DataCharacteristics.java b/src/main/java/org/apache/sysds/runtime/meta/DataCharacteristics.java
index a28d98d..17900cb 100644
--- a/src/main/java/org/apache/sysds/runtime/meta/DataCharacteristics.java
+++ b/src/main/java/org/apache/sysds/runtime/meta/DataCharacteristics.java
@@ -29,7 +29,8 @@ import java.io.Serializable;
public abstract class DataCharacteristics implements Serializable {
private static final long serialVersionUID = 3411056029517599342L;
- protected int _blocksize;
+ protected int _blocksize; // squared block size
+ protected boolean _noEmptyBlocks = false; // does not materialize empty blocks
public DataCharacteristics set(long nr, long nc, int blen) {
throw new DMLRuntimeException("DataCharacteristics.set(long, long, int): should never get called in the base class");
@@ -79,6 +80,16 @@ public abstract class DataCharacteristics implements Serializable {
_blocksize = blen;
return this;
}
+
+
+ public DataCharacteristics setNoEmptyBlocks(boolean flag) {
+ _noEmptyBlocks = flag;
+ return this;
+ }
+
+ public boolean isNoEmptyBlocks() {
+ return _noEmptyBlocks;
+ }
public long getNumBlocks() {
throw new DMLRuntimeException("DataCharacteristics.getNumBlocks(int): should never get called in the base class");
diff --git a/src/test/java/org/apache/sysds/test/functions/builtin/BuiltinSliceFinderTest.java b/src/test/java/org/apache/sysds/test/functions/builtin/BuiltinSliceFinderTest.java
index 2a0c5e9..3055a82 100644
--- a/src/test/java/org/apache/sysds/test/functions/builtin/BuiltinSliceFinderTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/builtin/BuiltinSliceFinderTest.java
@@ -59,42 +59,82 @@ public class BuiltinSliceFinderTest extends AutomatedTestBase
@Test
public void testTop4HybridDP() {
- runSliceFinderTest(4, true, ExecMode.HYBRID);
+ runSliceFinderTest(4, true, false, ExecMode.HYBRID);
}
@Test
public void testTop4SinglenodeDP() {
- runSliceFinderTest(4, true, ExecMode.SINGLE_NODE);
+ runSliceFinderTest(4, true, false, ExecMode.SINGLE_NODE);
}
@Test
public void testTop4HybridTP() {
- runSliceFinderTest(4, false, ExecMode.HYBRID);
+ runSliceFinderTest(4, false, false, ExecMode.HYBRID);
}
@Test
public void testTop4SinglenodeTP() {
- runSliceFinderTest(4, false, ExecMode.SINGLE_NODE);
+ runSliceFinderTest(4, false, false, ExecMode.SINGLE_NODE);
}
@Test
public void testTop10HybridDP() {
- runSliceFinderTest(10, true, ExecMode.HYBRID);
+ runSliceFinderTest(10, true, false, ExecMode.HYBRID);
}
@Test
public void testTop10SinglenodeDP() {
- runSliceFinderTest(10, true, ExecMode.SINGLE_NODE);
+ runSliceFinderTest(10, true, false, ExecMode.SINGLE_NODE);
}
@Test
public void testTop10HybridTP() {
- runSliceFinderTest(10, false, ExecMode.HYBRID);
+ runSliceFinderTest(10, false, false, ExecMode.HYBRID);
}
@Test
public void testTop10SinglenodeTP() {
- runSliceFinderTest(10, false, ExecMode.SINGLE_NODE);
+ runSliceFinderTest(10, false, false, ExecMode.SINGLE_NODE);
+ }
+
+ @Test
+ public void testTop4HybridDPSel() {
+ runSliceFinderTest(4, true, true, ExecMode.HYBRID);
+ }
+
+ @Test
+ public void testTop4SinglenodeDPSel() {
+ runSliceFinderTest(4, true, true, ExecMode.SINGLE_NODE);
+ }
+
+ @Test
+ public void testTop4HybridTPSel() {
+ runSliceFinderTest(4, false, true, ExecMode.HYBRID);
+ }
+
+ @Test
+ public void testTop4SinglenodeTPSel() {
+ runSliceFinderTest(4, false, true, ExecMode.SINGLE_NODE);
+ }
+
+ @Test
+ public void testTop10HybridDPSel() {
+ runSliceFinderTest(10, true, true, ExecMode.HYBRID);
+ }
+
+ @Test
+ public void testTop10SinglenodeDPSel() {
+ runSliceFinderTest(10, true, true, ExecMode.SINGLE_NODE);
+ }
+
+ @Test
+ public void testTop10HybridTPSel() {
+ runSliceFinderTest(10, false, true, ExecMode.HYBRID);
+ }
+
+ @Test
+ public void testTop10SinglenodeTPSel() {
+ runSliceFinderTest(10, false, true, ExecMode.SINGLE_NODE);
}
// @Test
@@ -102,7 +142,7 @@ public class BuiltinSliceFinderTest extends AutomatedTestBase
// runSliceFinderTest(10, false, ExecMode.SPARK);
// }
- private void runSliceFinderTest(int K, boolean dp, ExecMode mode) {
+ private void runSliceFinderTest(int K, boolean dp, boolean selCols, ExecMode mode) {
ExecMode platformOld = setExecMode(mode);
loadTestConfiguration(getTestConfiguration(TEST_NAME));
String HOME = SCRIPT_DIR + TEST_DIR;
@@ -124,8 +164,8 @@ public class BuiltinSliceFinderTest extends AutomatedTestBase
//execute main test
fullDMLScriptName = HOME + TEST_NAME + ".dml";
- programArgs = new String[]{"-args", input("X"), input("e"),
- String.valueOf(K),String.valueOf(!dp).toUpperCase(),
+ programArgs = new String[]{"-args", input("X"), input("e"), String.valueOf(K),
+ String.valueOf(!dp).toUpperCase(), String.valueOf(selCols).toUpperCase(),
String.valueOf(VERBOSE).toUpperCase(), output("R")};
fullRScriptName = HOME + TEST_NAME + ".R";
rCmd = "Rscript" + " " + fullRScriptName + " " + inputDir() + " " + String.valueOf(K)
diff --git a/src/test/java/org/apache/sysds/test/functions/io/csv/CSVTestBase.java b/src/test/java/org/apache/sysds/test/functions/io/csv/CSVTestBase.java
index 92da006..6112d39 100644
--- a/src/test/java/org/apache/sysds/test/functions/io/csv/CSVTestBase.java
+++ b/src/test/java/org/apache/sysds/test/functions/io/csv/CSVTestBase.java
@@ -25,17 +25,17 @@ import org.apache.sysds.test.AutomatedTestBase;
import org.apache.sysds.test.TestConfiguration;
public abstract class CSVTestBase extends AutomatedTestBase {
- protected final static String TEST_DIR = "functions/io/csv/";
- protected static final Log LOG = LogFactory.getLog(CSVTestBase.class.getName());
- protected final static double eps = 1e-9;
+ protected final static String TEST_DIR = "functions/io/csv/";
+ protected static final Log LOG = LogFactory.getLog(CSVTestBase.class.getName());
+ protected final static double eps = 1e-9;
- protected abstract String getTestClassDir();
+ protected abstract String getTestClassDir();
- protected abstract String getTestName();
+ protected abstract String getTestName();
- @Override
- public void setUp() {
- addTestConfiguration(getTestName(),
- new TestConfiguration(getTestClassDir(), getTestName(), new String[] {"Rout"}));
- }
+ @Override
+ public void setUp() {
+ addTestConfiguration(getTestName(),
+ new TestConfiguration(getTestClassDir(), getTestName(), new String[] {"Rout"}));
+ }
}
diff --git a/src/test/java/org/apache/sysds/test/functions/io/csv/ReadCSVTest.java b/src/test/java/org/apache/sysds/test/functions/io/csv/ReadCSVTest.java
index 5476430..6e24adf 100644
--- a/src/test/java/org/apache/sysds/test/functions/io/csv/ReadCSVTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/io/csv/ReadCSVTest.java
@@ -33,25 +33,25 @@ public abstract class ReadCSVTest extends CSVTestBase {
return "transfusion_" + getId();
}
- @Test
- public void testCSV_Sequential_CP1() {
- runCSVTest(getId(), ExecMode.SINGLE_NODE, false);
- }
-
- @Test
- public void testCSV_Parallel_CP1() {
- runCSVTest(getId(), ExecMode.SINGLE_NODE, true);
- }
-
- @Test
- public void testCSV_Sequential_CP() {
- runCSVTest(getId(), ExecMode.HYBRID, false);
- }
-
- @Test
- public void testCSV_Parallel_CP() {
- runCSVTest(getId(), ExecMode.HYBRID, true);
- }
+ @Test
+ public void testCSV_Sequential_CP1() {
+ runCSVTest(getId(), ExecMode.SINGLE_NODE, false);
+ }
+
+ @Test
+ public void testCSV_Parallel_CP1() {
+ runCSVTest(getId(), ExecMode.SINGLE_NODE, true);
+ }
+
+ @Test
+ public void testCSV_Sequential_CP() {
+ runCSVTest(getId(), ExecMode.HYBRID, false);
+ }
+
+ @Test
+ public void testCSV_Parallel_CP() {
+ runCSVTest(getId(), ExecMode.HYBRID, true);
+ }
@Test
public void testCSV_SP() {
@@ -73,8 +73,8 @@ public abstract class ReadCSVTest extends CSVTestBase {
CompilerConfig.FLAG_PARREADWRITE_TEXT = parallel;
TestConfiguration config = getTestConfiguration(getTestName());
-
loadTestConfiguration(config);
+ setOutputBuffering(true); //otherwise NPEs
String HOME = SCRIPT_DIR + TEST_DIR;
String inputMatrixNameNoExtension = HOME + INPUT_DIR + getInputCSVFileName();
diff --git a/src/test/scripts/functions/builtin/slicefinder.dml b/src/test/scripts/functions/builtin/slicefinder.dml
index 1027b17..cac1d3e 100644
--- a/src/test/scripts/functions/builtin/slicefinder.dml
+++ b/src/test/scripts/functions/builtin/slicefinder.dml
@@ -23,6 +23,7 @@ X = read($1);
e = read($2);
# call slice finding
-[TS,TR] = slicefinder(X=X, e=e, k=$3, alpha=0.95, minSup=4, tpEval=$4, verbose=$5);
+[TS,TR] = slicefinder(X=X, e=e, k=$3,
+ alpha=0.95, minSup=4, tpEval=$4, selFeat=$5, verbose=$6);
-write(TR, $6)
+write(TR, $7)