This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/master by this push:
     new 6321dbb  [SYSTEMDS-2641] Improved slice finding algorithm (script, spark ops)
6321dbb is described below

commit 6321dbb61e9593e49c59318f3d9cb38a8b0e9c2a
Author: Matthias Boehm <[email protected]>
AuthorDate: Tue Feb 2 19:22:59 2021 +0100

    [SYSTEMDS-2641] Improved slice finding algorithm (script, spark ops)
    
    This patch extends the slice finding algorithm for improved performance,
    especially over large distributed matrices with millions of features.
    Furthermore, it includes various fixes and improvements that were made
    along the way.
    
    * Slice finder algorithm extension (pruning of unnecessary columns,
    avoiding ultra-sparse mm via simplified deduplication; see the sketch
    after this change list)
    
    * Propagate no-empty-blocks meta data to avoid inconsistent treatment
    w.r.t. the number of partitions
    
    * Checkpoint (avoid unnecessary repartitioning, nnz analysis)
    
    * Improved empty block filtering in spark cpmm instructions
    
    * Consistent handling of spark ctable and rexpand post-processing
    
    * Fix spark removeEmpty instruction parsing (wrong broadcast flag
    leading to unnecessary replication)
    
    * Fix csv tests (NPEs w/o output buffering, formatting)
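    
    For illustration only (not part of the patch): a minimal DML sketch of
    the deduplication rewrite in slicefinder.dml. Selecting one
    representative row per duplicate group via table/rowIndexMax avoids the
    ultra-sparse matrix multiply (Dedup %*% P); toy data, with the names
    Dedup/P mirroring the script.
    
        # 3 dedup groups over 4 candidate rows; rows 1 and 3 are duplicates
        Dedup = matrix("1 0 1 0 0 1 0 0 0 0 0 1", rows=3, cols=4);
        P = matrix("1 0 1 0 1 1 1 0 1 0 1 0", rows=4, cols=3);
        # old: group-wise OR via ultra-sparse matrix multiply
        P1 = (Dedup %*% P) != 0;
        # new: 0/1 indicator of one representative row per group, then row filter
        DeI = table(rowIndexMax(Dedup), 1, nrow(P), 1);
        P2 = removeEmpty(target=P, margin="rows", select=DeI);
        print(toString(P1 == P2)); # identical because duplicates are equal rows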
---
 scripts/builtin/slicefinder.dml                    | 31 +++++++----
 .../apache/sysds/hops/ParameterizedBuiltinOp.java  | 11 ++--
 .../spark/CheckpointSPInstruction.java             | 12 ++---
 .../instructions/spark/CpmmSPInstruction.java      |  5 +-
 .../instructions/spark/CtableSPInstruction.java    |  8 +--
 .../spark/ParameterizedBuiltinSPInstruction.java   | 10 ++--
 .../instructions/spark/utils/SparkUtils.java       | 16 +++++-
 .../sysds/runtime/meta/DataCharacteristics.java    | 13 ++++-
 .../functions/builtin/BuiltinSliceFinderTest.java  | 62 ++++++++++++++++++----
 .../sysds/test/functions/io/csv/CSVTestBase.java   | 20 +++----
 .../sysds/test/functions/io/csv/ReadCSVTest.java   | 40 +++++++-------
 src/test/scripts/functions/builtin/slicefinder.dml |  5 +-
 12 files changed, 156 insertions(+), 77 deletions(-)

diff --git a/scripts/builtin/slicefinder.dml b/scripts/builtin/slicefinder.dml
index 9c49718..a6b8bfd 100644
--- a/scripts/builtin/slicefinder.dml
+++ b/scripts/builtin/slicefinder.dml
@@ -29,6 +29,8 @@
 # tpEval    flag for task-parallel slice evaluation, 
 #           otherwise data-parallel
 # tpBlksz   block size for task-parallel execution (num slices) 
+# selFeat   flag for removing one-hot-encoded features that don't satisfy 
+#           the initial minimum-support constraint and/or have zero error 
 # verbose   flag for verbose debug output 
 # ------------------------------------------------------------
 # TK        top-k slices (k x ncol(X) if successful) 
@@ -36,9 +38,9 @@
 # D         debug matrix, populated with enumeration stats if verbose
 # ------------------------------------------------------------
 
-m_slicefinder = function(Matrix[Double] X, Matrix[Double] e,
-    Integer k = 4, Integer maxL = 0, Integer minSup = 32, Double alpha = 0.5,
-    Boolean tpEval = TRUE, Integer tpBlksz = 16, Boolean verbose = FALSE)
+m_slicefinder = function(Matrix[Double] X, Matrix[Double] e, Int k = 4, 
+    Int maxL = 0, Int minSup = 32, Double alpha = 0.5, Boolean tpEval = TRUE, 
+    Int tpBlksz = 16, Boolean selFeat = FALSE, Boolean verbose = FALSE)
   return(Matrix[Double] TK, Matrix[Double] TKC, Matrix[Double] D)
 {
   # init debug matrix: levelID, enumerated S, valid S, TKmax, TKmin
@@ -58,7 +60,7 @@ m_slicefinder = function(Matrix[Double] X, Matrix[Double] e,
   # initialize statistics and basic slices
   n2 = ncol(X2);     # one-hot encoded features
   eAvg = sum(e) / m; # average error
-  [S, R] = createAndScoreBasicSlices(X2, e, eAvg, minSup, alpha, verbose); 
+  [S, R, selCols] = createAndScoreBasicSlices(X2, e, eAvg, minSup, alpha, verbose); 
 
   # initialize top-k
   [TK, TKC] = maintainTopK(S, R, matrix(0,0,n2), matrix(0,0,4), k, minSup);
@@ -69,6 +71,10 @@ m_slicefinder = function(Matrix[Double] X, Matrix[Double] e,
     D = rbind(D, t(as.matrix(list(1, n2, nrow(S), maxsc, minsc))));
   }
 
+  # reduced dataset to relevant attributes (minSup, err>0), S reduced on-the-fly
+  if( selFeat )
+    X2 = removeEmpty(target=X2, margin="cols", select=t(selCols));
+
   # lattice enumeration w/ size/error pruning, one iteration per level
   # termination condition (max #feature levels)
   maxL = ifelse(maxL<=0, n, maxL)
@@ -79,6 +85,9 @@ m_slicefinder = function(Matrix[Double] X, Matrix[Double] e,
     # enumerate candidate join pairs, incl size/error pruning 
     nrS = nrow(S);
    S = getPairedCandidates(S, R, TK, TKC, k, level, eAvg, minSup, alpha, n2, foffb, foffe); 
+    S2 = S;
+    if(selFeat)
+      S2 = removeEmpty(target=S, margin="cols", select=t(selCols));
 
     if(verbose) {
       print("\nSliceFinder: level "+level+":")
@@ -92,11 +101,11 @@ m_slicefinder = function(Matrix[Double] X, Matrix[Double] e,
       parfor( i in 1:ceil(nrow(S)/tpBlksz), check=0 ) {
         beg = (i-1)*tpBlksz + 1; 
         end = min(i*tpBlksz, nrow(R));
-        R[beg:end,] = evalSlice(X2, e, eAvg, t(S[beg:end,]), level, alpha);
+        R[beg:end,] = evalSlice(X2, e, eAvg, t(S2[beg:end,]), level, alpha);
       }
     }
     else { # data-parallel
-      R = evalSlice(X2, e, eAvg, t(S), level, alpha);
+      R = evalSlice(X2, e, eAvg, t(S2), level, alpha);
     }
     
     # maintain top-k after evaluation
@@ -121,7 +130,7 @@ m_slicefinder = function(Matrix[Double] X, Matrix[Double] e,
 
 createAndScoreBasicSlices = function(Matrix[Double] X2, Matrix[Double] e, 
     Double eAvg, Double minSup, Double alpha, Boolean verbose)
-  return(Matrix[Double] S, Matrix[Double] R)
+  return(Matrix[Double] S, Matrix[Double] R, Matrix[Double] selCols)
 {
   n2 = ncol(X2);
   cCnts = t(colSums(X2));    # column counts
@@ -287,11 +296,13 @@ getPairedCandidates = function(Matrix[Double] S, Matrix[Double] R,
     fParents = (numParents == level);
 
     # apply all pruning 
-    map = map * (fSizes & fScores & fParents);
+    fall = (fSizes & fScores & fParents);
     
     # deduplication of join outputs
-    Dedup = removeEmpty(target=map, margin="rows") != 0
-    P = (Dedup %*% P) != 0
+    Dedup = removeEmpty(target=map, margin="rows", select=fall) != 0
+    #P = (Dedup %*% P) != 0, replaced by below (easier sparsity propagation)
+    DeI = table(rowIndexMax(Dedup), 1, nrow(P), 1);
+    P = removeEmpty(target=P, margin="rows", select=DeI);
   }
 }
 
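For illustration only (not from the patch): the selFeat path above prunes
one-hot-encoded columns with removeEmpty and a 0/1 selection vector. A
minimal standalone DML sketch with toy data, reusing the selCols name from
the script:

    X2 = matrix("1 0 0 1 0 1 0 0 1 0 0 1", rows=3, cols=4); # one-hot features
    selCols = matrix("1 1 0 1", rows=4, cols=1); # e.g., columns meeting minSup
    # keep selected columns; select expects a row vector for margin="cols"
    X2r = removeEmpty(target=X2, margin="cols", select=t(selCols));
    print(toString(X2r)); # 3 x 3, column 3 dropped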
diff --git a/src/main/java/org/apache/sysds/hops/ParameterizedBuiltinOp.java b/src/main/java/org/apache/sysds/hops/ParameterizedBuiltinOp.java
index 568d5c7..298fd6a 100644
--- a/src/main/java/org/apache/sysds/hops/ParameterizedBuiltinOp.java
+++ b/src/main/java/org/apache/sysds/hops/ParameterizedBuiltinOp.java
@@ -973,16 +973,19 @@ public class ParameterizedBuiltinOp extends MultiThreadedHop {
         */
        private boolean isRemoveEmptyBcSP() // TODO find if 2 x size needed. 
        {
-               Hop input = getInput().get(0);
-               
                //note: both cases (partitioned matrix, and sorted double array), require to
                //fit the broadcast twice into the local memory budget. Also, the memory 
                //constraint only needs to take the rhs into account because the output is 
                //guaranteed to be an aggregate of <=16KB
                
+               Hop input = getInput().get(0);
+               Hop margin = getParameterHop("margin");
+               boolean col = (margin instanceof LiteralOp) ?
+                       ((LiteralOp)margin).getStringValue().equals("cols") : false;
+               
                double size = input.dimsKnown() ? 
-                       OptimizerUtils.estimateSize(input.getDim1(), 1) : //dims known and estimate fits
-                       input.getOutputMemEstimate();                     //dims unknown but worst-case estimate fits
+                       OptimizerUtils.estimateSize(col?input.getDim2():input.getDim1(), 1) : 
+                       input.getOutputMemEstimate();
                
                return OptimizerUtils.checkSparkBroadcastMemoryBudget(size);
        }
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/CheckpointSPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/CheckpointSPInstruction.java
index 7475c4f..a273da8 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/spark/CheckpointSPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/CheckpointSPInstruction.java
@@ -111,11 +111,11 @@ public class CheckpointSPInstruction extends UnarySPInstruction {
                {
                        //determine need for coalesce or repartition, and csr conversion
                        int numPartitions = SparkUtils.getNumPreferredPartitions(mcIn, in);
-                       boolean coalesce = ( 1.2*numPartitions < in.getNumPartitions()
+                       boolean coalesce = ( 1.4 * numPartitions < in.getNumPartitions()
                                && !SparkUtils.isHashPartitioned(in) && in.getNumPartitions()
                                > SparkExecutionContext.getDefaultParallelism(true));
                        boolean repartition = mcIn.dimsKnown(true) && mcIn.isUltraSparse()
-                               && numPartitions > in.getNumPartitions();
+                               && numPartitions > 1.4 * in.getNumPartitions();
                        boolean mcsr2csr = input1.getDataType()==DataType.MATRIX 
                                && OptimizerUtils.checkSparseBlockCSRConversion(mcIn)
                                && !_level.equals(Checkpoint.SER_STORAGE_LEVEL);
@@ -153,11 +153,9 @@ public class CheckpointSPInstruction extends UnarySPInstruction {
                        //actual checkpoint into given storage level
                        out = out.persist( _level );
                        
-                       //trigger nnz computation for datasets that are forced to spark by their dimensions
-                       //(larger than MAX_INT) to handle ultra-sparse data sets during recompilation because
-                       //otherwise these their nnz would never be evaluated due to lazy evaluation in spark
-                       if( input1.isMatrix() && mcIn.dimsKnown() 
-                               && !mcIn.dimsKnown(true) && !OptimizerUtils.isValidCPDimensions(mcIn) ) {
+                       //trigger nnz computation for all cached (i.e., read-only) datasets 
+                       //otherwise their nnz would never be evaluated due to lazy evaluation in spark
+                       if( input1.isMatrix() && mcIn.dimsKnown() && !mcIn.dimsKnown(true) ) {
                                mcIn.setNonZeros(SparkUtils.getNonZeros((JavaPairRDD<MatrixIndexes,MatrixBlock>)out));
                        }
                }
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/CpmmSPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/CpmmSPInstruction.java
index ab98af3..d5c0225 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/spark/CpmmSPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/CpmmSPInstruction.java
@@ -88,7 +88,8 @@ public class CpmmSPInstruction extends BinarySPInstruction {
                DataCharacteristics mc1 = sec.getDataCharacteristics(input1.getName());
                DataCharacteristics mc2 = sec.getDataCharacteristics(input2.getName());
                
-               if( !_outputEmptyBlocks || _aggtype == SparkAggType.SINGLE_BLOCK ) {
+               if( !_outputEmptyBlocks || _aggtype == SparkAggType.SINGLE_BLOCK
+                       || mc1.isNoEmptyBlocks() || mc2.isNoEmptyBlocks() ) {
                        //prune empty blocks of ultra-sparse matrices
                        in1 = in1.filter(new FilterNonEmptyBlocksFunction());
                        in2 = in2.filter(new FilterNonEmptyBlocksFunction());
@@ -133,7 +134,7 @@ public class CpmmSPInstruction extends BinarySPInstruction {
                                sec.setMatrixOutput(output.getName(), out2);
                        }
                        else { //DEFAULT: MULTI_BLOCK
-                               if( !_outputEmptyBlocks )
+                               if( !_outputEmptyBlocks || mc1.isNoEmptyBlocks() || mc2.isNoEmptyBlocks() )
                                        out = out.filter(new FilterNonEmptyBlocksFunction());
                                out = RDDAggregateUtils.sumByKeyStable(out, false);
                                
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/CtableSPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/CtableSPInstruction.java
index ec8d637..038a55c 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/spark/CtableSPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/CtableSPInstruction.java
@@ -24,7 +24,6 @@ import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.sysds.common.Types.ValueType;
-import org.apache.sysds.hops.OptimizerUtils;
 import org.apache.sysds.lops.Ctable;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
@@ -132,6 +131,7 @@ public class CtableSPInstruction extends ComputationSPInstruction {
                }
                mcOut.set(dim1, dim2, mc1.getBlocksize());
                mcOut.setNonZerosBound(mc1.getLength()); //vector or matrix
+               mcOut.setNoEmptyBlocks(!_outputEmptyBlocks);
                if( !mcOut.dimsKnown() )
                        throw new DMLRuntimeException("Unknown ctable output 
dimensions: "+mcOut);
                
@@ -192,11 +192,7 @@ public class CtableSPInstruction extends ComputationSPInstruction {
                        sec.addLineageRDD(output.getName(), input3.getName());
                
                //post-processing to obtain sparsity of ultra-sparse outputs
-               long memUB = OptimizerUtils.estimateSizeExactSparsity(
-                       mcOut.getRows(), mcOut.getCols(), mcOut.getNonZerosBound());
-               if( !OptimizerUtils.exceedsCachingThreshold(mcOut.getCols(), memUB) //< mem budget
-                       && memUB < OptimizerUtils.estimateSizeExactSparsity(mcOut))
-                       sec.getMatrixObject(output).acquireReadAndRelease();
+               SparkUtils.postprocessUltraSparseOutput(sec.getMatrixObject(output), mcOut);
        }
 
        private static class CTableFunction implements PairFlatMapFunction<Iterator<Tuple2<MatrixIndexes, MatrixBlock[]>>, MatrixIndexes, MatrixBlock> 
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
index e5e6b1c..cee035b 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/ParameterizedBuiltinSPInstruction.java
@@ -154,8 +154,8 @@ public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction
                        } 
                        else if (opcode.equalsIgnoreCase("rmempty")) {
                                func = ParameterizedBuiltin.getParameterizedBuiltinFnObject(opcode);
-                               return new ParameterizedBuiltinSPInstruction(new SimpleOperator(func), paramsMap, out, opcode, str,
-                                               parts.length > 6 ? Boolean.parseBoolean(parts[5]) : false);
+                               return new ParameterizedBuiltinSPInstruction(new SimpleOperator(func),
+                                       paramsMap, out, opcode, str, Boolean.parseBoolean(parts[parts.length-2]));
                        }
                        else if (opcode.equalsIgnoreCase("rexpand")
                                || opcode.equalsIgnoreCase("replace")
@@ -428,9 +428,13 @@ public class ParameterizedBuiltinSPInstruction extends ComputationSPInstruction
                        sec.setRDDHandleForVariable(output.getName(), out);
                        sec.addLineageRDD(output.getName(), rddInVar);
                        
-                       //update output statistics (required for correctness)
+                       //update output statistics (required for correctness, nnz unknown due to cut-off)
                        DataCharacteristics mcOut = sec.getDataCharacteristics(output.getName());
                        mcOut.set(dirRows?lmaxVal:mcIn.getRows(), dirRows?mcIn.getRows():lmaxVal, (int)blen, -1);
+                       mcOut.setNonZerosBound(mcIn.getRows());
+                       
+                       //post-processing to obtain sparsity of ultra-sparse outputs
+                       SparkUtils.postprocessUltraSparseOutput(sec.getMatrixObject(output), mcOut);
                }
                else if ( opcode.equalsIgnoreCase("transformapply") ) 
                {
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/utils/SparkUtils.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/utils/SparkUtils.java
index 4aeac25..a6a1c15 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/spark/utils/SparkUtils.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/utils/SparkUtils.java
@@ -28,6 +28,7 @@ import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.storage.StorageLevel;
 import org.apache.sysds.hops.OptimizerUtils;
 import org.apache.sysds.lops.Checkpoint;
+import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
 import org.apache.sysds.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
 import org.apache.sysds.runtime.data.IndexedTensorBlock;
@@ -129,7 +130,7 @@ public class SparkUtils
        }
 
        public static int getNumPreferredPartitions(DataCharacteristics dc) {
-               return getNumPreferredPartitions(dc, true);
+               return getNumPreferredPartitions(dc, !dc.isNoEmptyBlocks());
        }
        
        public static int getNumPreferredPartitions(DataCharacteristics dc, boolean outputEmptyBlocks) {
@@ -265,12 +266,25 @@ public class SparkUtils
                return ret;
        }
        
+       @SuppressWarnings("unchecked")
+       public static long getNonZeros(MatrixObject mo) {
+               return getNonZeros((JavaPairRDD<MatrixIndexes, MatrixBlock>)mo.getRDDHandle().getRDD());
+       }
+       
        public static long getNonZeros(JavaPairRDD<MatrixIndexes, MatrixBlock> input) {
                //note: avoid direct lambda expression due reduce unnecessary GC overhead
                return input.filter(new FilterNonEmptyBlocksFunction())
                        .values().mapPartitions(new RecomputeNnzFunction()).reduce((a,b)->a+b);
        }
 
+       public static void postprocessUltraSparseOutput(MatrixObject mo, DataCharacteristics mcOut) {
+               long memUB = OptimizerUtils.estimateSizeExactSparsity(
+                       mcOut.getRows(), mcOut.getCols(), mcOut.getNonZerosBound());
+               if( !OptimizerUtils.exceedsCachingThreshold(mcOut.getCols(), memUB) //< mem budget
+                       && memUB < OptimizerUtils.estimateSizeExactSparsity(mcOut))
+                       mo.acquireReadAndRelease();
+       }
+       
        private static class AnalyzeCellDataCharacteristics implements Function<Tuple2<MatrixIndexes,MatrixCell>, DataCharacteristics>
        {
                private static final long serialVersionUID = 8899395272683723008L;
diff --git a/src/main/java/org/apache/sysds/runtime/meta/DataCharacteristics.java b/src/main/java/org/apache/sysds/runtime/meta/DataCharacteristics.java
index a28d98d..17900cb 100644
--- a/src/main/java/org/apache/sysds/runtime/meta/DataCharacteristics.java
+++ b/src/main/java/org/apache/sysds/runtime/meta/DataCharacteristics.java
@@ -29,7 +29,8 @@ import java.io.Serializable;
 public abstract class DataCharacteristics implements Serializable {
        private static final long serialVersionUID = 3411056029517599342L;
 
-       protected int _blocksize;
+       protected int _blocksize;                 // squared block size
+       protected boolean _noEmptyBlocks = false; // does not materialize empty blocks
        
        public DataCharacteristics set(long nr, long nc, int blen) {
                throw new DMLRuntimeException("DataCharacteristics.set(long, 
long, int): should never get called in the base class");
@@ -79,6 +80,16 @@ public abstract class DataCharacteristics implements Serializable {
                _blocksize = blen;
                return this;
        }
+       
+
+       public DataCharacteristics setNoEmptyBlocks(boolean flag) {
+               _noEmptyBlocks = flag;
+               return this;
+       }
+       
+       public boolean isNoEmptyBlocks() {
+               return _noEmptyBlocks;
+       }
 
        public long getNumBlocks() {
                throw new DMLRuntimeException("DataCharacteristics.getNumBlocks(int): should never get called in the base class");
diff --git a/src/test/java/org/apache/sysds/test/functions/builtin/BuiltinSliceFinderTest.java b/src/test/java/org/apache/sysds/test/functions/builtin/BuiltinSliceFinderTest.java
index 2a0c5e9..3055a82 100644
--- a/src/test/java/org/apache/sysds/test/functions/builtin/BuiltinSliceFinderTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/builtin/BuiltinSliceFinderTest.java
@@ -59,42 +59,82 @@ public class BuiltinSliceFinderTest extends AutomatedTestBase
 
        @Test
        public void testTop4HybridDP() {
-               runSliceFinderTest(4, true, ExecMode.HYBRID);
+               runSliceFinderTest(4, true, false, ExecMode.HYBRID);
        }
        
        @Test
        public void testTop4SinglenodeDP() {
-               runSliceFinderTest(4, true, ExecMode.SINGLE_NODE);
+               runSliceFinderTest(4, true, false, ExecMode.SINGLE_NODE);
        }
        
        @Test
        public void testTop4HybridTP() {
-               runSliceFinderTest(4, false, ExecMode.HYBRID);
+               runSliceFinderTest(4, false, false, ExecMode.HYBRID);
        }
        
        @Test
        public void testTop4SinglenodeTP() {
-               runSliceFinderTest(4, false, ExecMode.SINGLE_NODE);
+               runSliceFinderTest(4, false, false, ExecMode.SINGLE_NODE);
        }
 
        @Test
        public void testTop10HybridDP() {
-               runSliceFinderTest(10, true, ExecMode.HYBRID);
+               runSliceFinderTest(10, true, false, ExecMode.HYBRID);
        }
        
        @Test
        public void testTop10SinglenodeDP() {
-               runSliceFinderTest(10, true, ExecMode.SINGLE_NODE);
+               runSliceFinderTest(10, true, false, ExecMode.SINGLE_NODE);
        }
        
        @Test
        public void testTop10HybridTP() {
-               runSliceFinderTest(10, false, ExecMode.HYBRID);
+               runSliceFinderTest(10, false, false, ExecMode.HYBRID);
        }
        
        @Test
        public void testTop10SinglenodeTP() {
-               runSliceFinderTest(10, false, ExecMode.SINGLE_NODE);
+               runSliceFinderTest(10, false, false, ExecMode.SINGLE_NODE);
+       }
+
+       @Test
+       public void testTop4HybridDPSel() {
+               runSliceFinderTest(4, true, true, ExecMode.HYBRID);
+       }
+       
+       @Test
+       public void testTop4SinglenodeDPSel() {
+               runSliceFinderTest(4, true, true, ExecMode.SINGLE_NODE);
+       }
+       
+       @Test
+       public void testTop4HybridTPSel() {
+               runSliceFinderTest(4, false, true, ExecMode.HYBRID);
+       }
+       
+       @Test
+       public void testTop4SinglenodeTPSel() {
+               runSliceFinderTest(4, false, true, ExecMode.SINGLE_NODE);
+       }
+
+       @Test
+       public void testTop10HybridDPSel() {
+               runSliceFinderTest(10, true, true, ExecMode.HYBRID);
+       }
+       
+       @Test
+       public void testTop10SinglenodeDPSel() {
+               runSliceFinderTest(10, true, true, ExecMode.SINGLE_NODE);
+       }
+       
+       @Test
+       public void testTop10HybridTPSel() {
+               runSliceFinderTest(10, false, true, ExecMode.HYBRID);
+       }
+       
+       @Test
+       public void testTop10SinglenodeTPSel() {
+               runSliceFinderTest(10, false, true, ExecMode.SINGLE_NODE);
        }
        
 //     @Test
@@ -102,7 +142,7 @@ public class BuiltinSliceFinderTest extends AutomatedTestBase
 //             runSliceFinderTest(10, false, ExecMode.SPARK);
 //     }
        
-       private void runSliceFinderTest(int K, boolean dp, ExecMode mode) {
+       private void runSliceFinderTest(int K, boolean dp, boolean selCols, ExecMode mode) {
                ExecMode platformOld = setExecMode(mode);
                loadTestConfiguration(getTestConfiguration(TEST_NAME));
                String HOME = SCRIPT_DIR + TEST_DIR;
@@ -124,8 +164,8 @@ public class BuiltinSliceFinderTest extends AutomatedTestBase
                        
                        //execute main test
                        fullDMLScriptName = HOME + TEST_NAME + ".dml";
-                       programArgs = new String[]{"-args", input("X"), input("e"),
-                               String.valueOf(K),String.valueOf(!dp).toUpperCase(),
+                       programArgs = new String[]{"-args", input("X"), input("e"), String.valueOf(K),
+                               String.valueOf(!dp).toUpperCase(), String.valueOf(selCols).toUpperCase(),
                                String.valueOf(VERBOSE).toUpperCase(), output("R")};
                        fullRScriptName = HOME + TEST_NAME + ".R";
                        rCmd = "Rscript" + " " + fullRScriptName + " " + inputDir() + " " + String.valueOf(K) 
diff --git a/src/test/java/org/apache/sysds/test/functions/io/csv/CSVTestBase.java b/src/test/java/org/apache/sysds/test/functions/io/csv/CSVTestBase.java
index 92da006..6112d39 100644
--- a/src/test/java/org/apache/sysds/test/functions/io/csv/CSVTestBase.java
+++ b/src/test/java/org/apache/sysds/test/functions/io/csv/CSVTestBase.java
@@ -25,17 +25,17 @@ import org.apache.sysds.test.AutomatedTestBase;
 import org.apache.sysds.test.TestConfiguration;
 
 public abstract class CSVTestBase extends AutomatedTestBase {
-    protected final static String TEST_DIR = "functions/io/csv/";
-    protected static final Log LOG = LogFactory.getLog(CSVTestBase.class.getName());
-    protected final static double eps = 1e-9;
+       protected final static String TEST_DIR = "functions/io/csv/";
+       protected static final Log LOG = LogFactory.getLog(CSVTestBase.class.getName());
+       protected final static double eps = 1e-9;
 
-    protected abstract String getTestClassDir();
+       protected abstract String getTestClassDir();
 
-    protected abstract String getTestName();
+       protected abstract String getTestName();
 
-    @Override
-    public void setUp() {
-        addTestConfiguration(getTestName(),
-            new TestConfiguration(getTestClassDir(), getTestName(), new String[] {"Rout"}));
-    }
+       @Override
+       public void setUp() {
+               addTestConfiguration(getTestName(),
+               new TestConfiguration(getTestClassDir(), getTestName(), new String[] {"Rout"}));
+       }
 }
diff --git a/src/test/java/org/apache/sysds/test/functions/io/csv/ReadCSVTest.java b/src/test/java/org/apache/sysds/test/functions/io/csv/ReadCSVTest.java
index 5476430..6e24adf 100644
--- a/src/test/java/org/apache/sysds/test/functions/io/csv/ReadCSVTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/io/csv/ReadCSVTest.java
@@ -33,25 +33,25 @@ public abstract class ReadCSVTest extends CSVTestBase {
                return "transfusion_" + getId();
        }
 
-        @Test
-        public void testCSV_Sequential_CP1() {
-               runCSVTest(getId(), ExecMode.SINGLE_NODE, false);
-        }
-
-        @Test
-        public void testCSV_Parallel_CP1() {
-               runCSVTest(getId(), ExecMode.SINGLE_NODE, true);
-        }
-
-        @Test
-        public void testCSV_Sequential_CP() {
-               runCSVTest(getId(), ExecMode.HYBRID, false);
-        }
-
-        @Test
-        public void testCSV_Parallel_CP() {
-               runCSVTest(getId(), ExecMode.HYBRID, true);
-        }
+       @Test
+       public void testCSV_Sequential_CP1() {
+               runCSVTest(getId(), ExecMode.SINGLE_NODE, false);
+       }
+
+       @Test
+       public void testCSV_Parallel_CP1() {
+               runCSVTest(getId(), ExecMode.SINGLE_NODE, true);
+       }
+
+       @Test
+       public void testCSV_Sequential_CP() {
+               runCSVTest(getId(), ExecMode.HYBRID, false);
+       }
+
+       @Test
+       public void testCSV_Parallel_CP() {
+               runCSVTest(getId(), ExecMode.HYBRID, true);
+       }
 
        @Test
        public void testCSV_SP() {
@@ -73,8 +73,8 @@ public abstract class ReadCSVTest extends CSVTestBase {
                        CompilerConfig.FLAG_PARREADWRITE_TEXT = parallel;
 
                        TestConfiguration config = getTestConfiguration(getTestName());
-
                        loadTestConfiguration(config);
+                       setOutputBuffering(true); //otherwise NPEs
 
                        String HOME = SCRIPT_DIR + TEST_DIR;
                        String inputMatrixNameNoExtension = HOME + INPUT_DIR + getInputCSVFileName();
diff --git a/src/test/scripts/functions/builtin/slicefinder.dml b/src/test/scripts/functions/builtin/slicefinder.dml
index 1027b17..cac1d3e 100644
--- a/src/test/scripts/functions/builtin/slicefinder.dml
+++ b/src/test/scripts/functions/builtin/slicefinder.dml
@@ -23,6 +23,7 @@ X = read($1);
 e = read($2);
 
 # call slice finding
-[TS,TR] = slicefinder(X=X, e=e, k=$3, alpha=0.95, minSup=4, tpEval=$4, verbose=$5);
+[TS,TR] = slicefinder(X=X, e=e, k=$3,
+  alpha=0.95, minSup=4, tpEval=$4, selFeat=$5, verbose=$6);
 
-write(TR, $6)
+write(TR, $7)

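For illustration only: invoking the extended builtin with the new selFeat
flag, mirroring the updated test script above (file names and parameter
values are hypothetical):

    X = read("X.csv", format="csv"); # feature matrix
    e = read("e.csv", format="csv"); # per-row error vector
    [TK, TKC] = slicefinder(X=X, e=e, k=4,
      alpha=0.95, minSup=32, tpEval=TRUE, selFeat=TRUE, verbose=FALSE);
    write(TK, "TK.csv", format="csv");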