This is an automated email from the ASF dual-hosted git repository.

baunsgaard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/master by this push:
     new 40327ea  [SYSTEMDS-2800] Aggressive Compression Rewrite
40327ea is described below

commit 40327ea061ca41d6417b49829d54e3744c26ef38
Author: baunsgaard <[email protected]>
AuthorDate: Mon Jan 18 17:59:04 2021 +0100

    [SYSTEMDS-2800] Aggressive Compression Rewrite
    
    This commit changes the COST-based approach of the rewrite rules for
    compression to an "aggressive" compression approach.
    The basic idea is to introduce many compression instructions into the
    computation graph, such that there is a compression instruction at
    every location that has the potential to:
    
    - Compress well from an uncompressed representation
    - Restructure an already compressed matrix into a better compression
    
    For the second case, operations such as less-than, which produce a
    matrix containing only 0 and 1, have high potential for better
    restructuring. In general, if the input matrix of the less-than
    operation is not compressed, the rewrite will still try to compress the
    result according to case 1. So regardless of which case applies, the
    introduced instruction belongs in the same place.
    
    For this aggressive compression instruction to be executed, the object
    to compress has to meet a few criteria:
    
    - At least 3 "supported compressed operations" have to be executed on
    the object, or the matrix has to be used in a loop.
    - Among the compressed operations used, there should not be many
    "supported but slow operations".
    - The matrix to compress has to have at least 1000 rows and fewer than
    100 columns, or a row-to-column ratio (nrow/ncol) of at least 1000.
    
    Closes #1164
---
 src/main/java/org/apache/sysds/common/Types.java   |  10 +-
 .../apache/sysds/conf/ConfigurationManager.java    |   6 +
 src/main/java/org/apache/sysds/conf/DMLConfig.java |   2 +-
 src/main/java/org/apache/sysds/hops/Hop.java       |  73 ++--
 .../java/org/apache/sysds/hops/OptimizerUtils.java |  22 ++
 .../apache/sysds/hops/rewrite/HopRewriteUtils.java |   6 +
 .../hops/rewrite/RewriteCompressedReblock.java     | 430 ++++++++++++---------
 .../runtime/compress/colgroup/Dictionary.java      |   2 +-
 .../sysds/runtime/compress/lib/LibRightMultBy.java |   8 +-
 .../sysds/runtime/compress/lib/LibScalar.java      |   7 +-
 .../java/org/apache/sysds/utils/Statistics.java    |   3 +-
 .../functions/compress/compressInstruction.java    |  46 ++-
 ...uction.java => compressInstructionRewrite.java} |  73 +++-
 .../compress/compressInstruction/compress_01.dml   |   2 +-
 .../compress/compressInstruction/compress_02.dml   |   2 +-
 .../compress/compressInstruction/compress_07.dml   |  58 +++
 ...{compress_01.dml => compress_07_noCompress.dml} |  34 +-
 .../SystemDS-config-compress-cost.xml              |  24 ++
 .../compress_01.dml                                |   5 +-
 .../compress_02.dml                                |   7 +-
 .../compress_03.dml}                               |  11 +-
 .../compress_04.dml}                               |   7 +-
 .../compress_05.dml}                               |   9 +-
 .../compress_06.dml}                               |   9 +-
 .../compress_07.dml}                               |  35 +-
 .../compress_08.dml}                               |  15 +-
 26 files changed, 614 insertions(+), 292 deletions(-)

diff --git a/src/main/java/org/apache/sysds/common/Types.java 
b/src/main/java/org/apache/sysds/common/Types.java
index a6a3ee1..ed651b9 100644
--- a/src/main/java/org/apache/sysds/common/Types.java
+++ b/src/main/java/org/apache/sysds/common/Types.java
@@ -19,9 +19,10 @@
 
 package org.apache.sysds.common;
 
+import java.util.Arrays;
+
 import org.apache.sysds.runtime.DMLRuntimeException;
 
-import edu.emory.mathcs.backport.java.util.Arrays;
 
 public class Types
 {
@@ -236,9 +237,6 @@ public class Types
                                case CUMPROD:         return "ucum*";
                                case CUMSUM:          return "ucumk+";
                                case CUMSUMPROD:      return "ucumk+*";
-                               case COLNAMES:        return "colnames";
-                               case COMPRESS:        return "compress";
-                               case DECOMPRESS:      return "decompress";
                                case DETECTSCHEMA:    return "detectSchema";
                                case MULT2:           return "*2";
                                case NOT:             return "!";
@@ -262,9 +260,11 @@ public class Types
                                case "ucum*":   return CUMPROD;
                                case "ucumk+":  return CUMSUM;
                                case "ucumk+*": return CUMSUMPROD;
+                               case "detectSchema":    return DETECTSCHEMA;
                                case "*2":      return MULT2;
                                case "!":       return NOT;
                                case "^2":      return POW2;
+                               case "typeOf":          return TYPEOF;
                                default:        return 
valueOf(opcode.toUpperCase());
                        }
                }
@@ -549,7 +549,7 @@ public class Types
                        }
                        catch(Exception ex) {
                                throw new DMLRuntimeException("Unknown file 
format: "+fmt
-                                       + " (valid values: 
"+Arrays.toString(FileFormat.values())+")");
+                                       + " (valid values: " + 
Arrays.toString(FileFormat.values())+")");
                        }
                }
        }
diff --git a/src/main/java/org/apache/sysds/conf/ConfigurationManager.java 
b/src/main/java/org/apache/sysds/conf/ConfigurationManager.java
index aea8514..5a3667c 100644
--- a/src/main/java/org/apache/sysds/conf/ConfigurationManager.java
+++ b/src/main/java/org/apache/sysds/conf/ConfigurationManager.java
@@ -21,6 +21,7 @@ package org.apache.sysds.conf;
 
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.sysds.conf.CompilerConfig.ConfigType;
+import org.apache.sysds.lops.Compression.CompressConfig;
 
 
 
@@ -178,6 +179,11 @@ public class ConfigurationManager
                return (getDMLConfig().getBooleanValue(DMLConfig.CODEGEN)
                        || getCompilerConfigFlag(ConfigType.CODEGEN_ENABLED));
        }
+
+       public static boolean isCompressionEnabled(){
+               CompressConfig compress = 
CompressConfig.valueOf(getDMLConfig().getTextValue(DMLConfig.COMPRESSED_LINALG).toUpperCase());
+               return compress.isEnabled();
+       }
        
        ///////////////////////////////////////
        // Thread-local classes
diff --git a/src/main/java/org/apache/sysds/conf/DMLConfig.java 
b/src/main/java/org/apache/sysds/conf/DMLConfig.java
index a9da811..aaf4988 100644
--- a/src/main/java/org/apache/sysds/conf/DMLConfig.java
+++ b/src/main/java/org/apache/sysds/conf/DMLConfig.java
@@ -67,7 +67,7 @@ public class DMLConfig
        public static final String DEFAULT_BLOCK_SIZE   = 
"sysds.defaultblocksize";
        public static final String CP_PARALLEL_OPS      = 
"sysds.cp.parallel.ops";
        public static final String CP_PARALLEL_IO       = 
"sysds.cp.parallel.io";
-       public static final String COMPRESSED_LINALG    = 
"sysds.compressed.linalg"; //auto, cost, true, false
+       public static final String COMPRESSED_LINALG    = 
"sysds.compressed.linalg"; // auto, cost, true, false
        public static final String COMPRESSED_LOSSY     = 
"sysds.compressed.lossy";
        public static final String COMPRESSED_VALID_COMPRESSIONS = 
"sysds.compressed.valid.compressions";
        public static final String COMPRESSED_OVERLAPPING = 
"sysds.compressed.overlapping"; // true, false
diff --git a/src/main/java/org/apache/sysds/hops/Hop.java 
b/src/main/java/org/apache/sysds/hops/Hop.java
index bb0960d..1eb319f 100644
--- a/src/main/java/org/apache/sysds/hops/Hop.java
+++ b/src/main/java/org/apache/sysds/hops/Hop.java
@@ -377,55 +377,39 @@ public abstract class Hop implements ParseInfo {
                }       
        }
 
-       private void constructAndSetCompressionLopIfRequired() 
-       {
-               //determine execution type
-               ExecType et = ExecType.CP;
-               if( OptimizerUtils.isSparkExecutionMode() 
-                       && getDataType()!=DataType.SCALAR )
-               {
-                       //conditional checkpoint based on memory estimate in 
order to avoid unnecessary 
-                       //persist and unpersist calls (4x the memory budget is 
conservative)
-                       if(    OptimizerUtils.isHybridExecutionMode() 
-                               && 2*_outputMemEstimate < 
OptimizerUtils.getLocalMemBudget()
-                               || _etypeForced == ExecType.CP )
-                       {
-                               et = ExecType.CP;
-                       }
-                       else //default case
-                       {
-                               et = ExecType.SPARK;
-                       }
-               }
-
-               //add reblock lop to output if required
-               if( _requiresCompression )
-               {
-                       try
-                       {
-                               Lop compress = new Compression(getLops(), 
getDataType(), getValueType(), et);
-                               setOutputDimensions( compress );
-                               setLineNumbers( compress );
-                               setLops( compress );
-                       }
-                       catch( LopsException ex ) {
-                               throw new HopsException(ex);
-                       }
-               }
-
-               if( _requiresDeCompression ){
+       private void constructAndSetCompressionLopIfRequired() {
+               if(_requiresCompression ^ _requiresDeCompression){ // xor
+                       ExecType et = getExecutionModeForCompression();
+                       Lop compressionInstruction = null;
                        try{
-                               Lop decompress = new DeCompression(getLops(), 
getDataType(), getValueType(), et);
-                               setOutputDimensions(decompress);
-                               setLineNumbers(decompress);
-                               setLops(decompress);
+                               if( _requiresCompression ) 
+                                       compressionInstruction = new 
Compression(getLops(), getDataType(), getValueType(), et);
+                               else if( _requiresDeCompression )
+                                       compressionInstruction = new 
DeCompression(getLops(), getDataType(), getValueType(), et);
                        }
-                       catch(LopsException ex){
+                       catch (LopsException ex) {
                                throw new HopsException(ex);
                        }
+                       setOutputDimensions( compressionInstruction );
+                       setLineNumbers( compressionInstruction );
+                       setLops( compressionInstruction );
                }
        }
 
+       private ExecType getExecutionModeForCompression(){
+               ExecType et = ExecType.CP;
+               // conditional checkpoint based on memory estimate in order to 
avoid unnecessary 
+               // persist and unpersist calls (4x the memory budget is 
conservative)
+               if( OptimizerUtils.isSparkExecutionMode() && 
getDataType()!=DataType.SCALAR )
+                       if( OptimizerUtils.isHybridExecutionMode() 
+                               && 2 * _outputMemEstimate < 
OptimizerUtils.getLocalMemBudget()
+                               || _etypeForced == ExecType.CP )
+                               et = ExecType.CP;
+                       else 
+                               et = ExecType.SPARK;
+               return et;
+       }
+
        public static Lop createOffsetLop( Hop hop, boolean repCols ) {
                Lop offset = null;
                if( ConfigurationManager.isDynamicRecompilation() && 
hop.dimsKnown() ) {
@@ -810,6 +794,11 @@ public abstract class Hop implements ParseInfo {
        
        public abstract String getOpString();
 
+       @Override
+       public String toString(){
+               return super.getClass().getSimpleName() + "  " + getOpString();
+       }
+
        // 
========================================================================================
        // Design doc: Memory estimation of GPU
        // 1. Since not all operator are supported on GPU, isGPUEnabled 
indicates whether an operation 
diff --git a/src/main/java/org/apache/sysds/hops/OptimizerUtils.java 
b/src/main/java/org/apache/sysds/hops/OptimizerUtils.java
index c1335b1..2d94b59 100644
--- a/src/main/java/org/apache/sysds/hops/OptimizerUtils.java
+++ b/src/main/java/org/apache/sysds/hops/OptimizerUtils.java
@@ -681,6 +681,21 @@ public class OptimizerUtils
                double sp = getSparsity(rlen, clen, nnz);
                return estimatePartitionedSizeExactSparsity(rlen, clen, blen, 
sp);
        }
+
+       /**
+        * Estimates the footprint (in bytes) for a partitioned in-memory 
representation of a
+        * matrix with the hops dimensions and number of non-zeros nnz.
+        * 
+        * @param hop The hop to extract dimensions and nnz from
+        * @return the memory estimate
+        */
+       public static long estimatePartitionedSizeExactSparsity(Hop hop){
+               long rlen = hop.getDim1();
+               long clen = hop.getDim2();
+               int blen = hop.getBlocksize();
+               long nnz = hop.getNnz();
+               return  estimatePartitionedSizeExactSparsity(rlen, clen, blen, 
nnz);
+       }
        
        /**
         * Estimates the footprint (in bytes) for a partitioned in-memory 
representation of a
@@ -1154,6 +1169,13 @@ public class OptimizerUtils
                        Math.min(((double)nnz)/dim1/dim2, 1.0);
        }
 
+       public static double getSparsity(Hop hop){
+               long dim1 = hop.getDim1();
+               long dim2 = hop.getDim2();
+               long nnz = hop.getNnz();
+               return getSparsity(dim1, dim2, nnz);
+       }
+
        public static double getSparsity(long[] dims, long nnz) {
                double sparsity = nnz;
                for (long dim : dims) {
diff --git a/src/main/java/org/apache/sysds/hops/rewrite/HopRewriteUtils.java 
b/src/main/java/org/apache/sysds/hops/rewrite/HopRewriteUtils.java
index b1a8799..fc7a818 100644
--- a/src/main/java/org/apache/sysds/hops/rewrite/HopRewriteUtils.java
+++ b/src/main/java/org/apache/sysds/hops/rewrite/HopRewriteUtils.java
@@ -1118,6 +1118,12 @@ public class HopRewriteUtils
                        && hop.getInput().get(0).getDataType().isMatrix() && 
hop.getInput().get(1).getDataType().isMatrix()
                        && hop.getInput().get(0).dimsKnown() && 
hop.getInput().get(1).dimsKnown() && hop.getInput().get(1).getDim2() == 1;
        }
+
+       public static boolean isBinaryMatrixRowVectorOperation(Hop hop) {
+               return hop instanceof BinaryOp 
+                       && hop.getInput().get(0).getDataType().isMatrix() && 
hop.getInput().get(1).getDataType().isMatrix()
+                       && hop.getInput().get(0).dimsKnown() && 
hop.getInput().get(1).dimsKnown() && hop.getInput().get(1).getDim1() == 1;
+       }
        
        public static boolean isUnary(Hop hop, OpOp1 type) {
                return hop instanceof UnaryOp && ((UnaryOp)hop).getOp()==type;
diff --git 
a/src/main/java/org/apache/sysds/hops/rewrite/RewriteCompressedReblock.java 
b/src/main/java/org/apache/sysds/hops/rewrite/RewriteCompressedReblock.java
index c861663..624d424 100644
--- a/src/main/java/org/apache/sysds/hops/rewrite/RewriteCompressedReblock.java
+++ b/src/main/java/org/apache/sysds/hops/rewrite/RewriteCompressedReblock.java
@@ -24,8 +24,6 @@ import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.sysds.common.Types.AggOp;
 import org.apache.sysds.common.Types.OpOp1;
 import org.apache.sysds.common.Types.OpOp2;
@@ -37,7 +35,6 @@ import org.apache.sysds.hops.FunctionOp;
 import org.apache.sysds.hops.Hop;
 import org.apache.sysds.hops.OptimizerUtils;
 import org.apache.sysds.lops.Compression.CompressConfig;
-import org.apache.sysds.lops.MMTSJ.MMTSJType;
 import org.apache.sysds.parser.DMLProgram;
 import org.apache.sysds.parser.ForStatement;
 import org.apache.sysds.parser.ForStatementBlock;
@@ -53,12 +50,13 @@ import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 
 /**
  * Rule: Compressed Re block if config compressed.linalg is enabled, we inject 
compression directions after read of
- * matrices if number of rows is above 1000 and cols at least 1. In case of 
'auto' compression, we apply compression if
- * the data size is known to exceed aggregate cluster memory, the matrix is 
used in loops, and all operations are
- * supported over compressed matrices.
+ * matrices if number of rows is above 1000 and cols at least 1.
+ * 
+ * In case of 'auto' compression, we apply compression if the data size is 
known to exceed aggregate cluster memory, the
+ * matrix is used in loops, and all operations are supported over compressed 
matrices.
  */
 public class RewriteCompressedReblock extends StatementBlockRewriteRule {
-       private static final Log LOG = 
LogFactory.getLog(RewriteCompressedReblock.class.getName());
+       // private static final Log LOG = 
LogFactory.getLog(RewriteCompressedReblock.class.getName());
 
        private static final String TMP_PREFIX = "__cmtx";
 
@@ -99,6 +97,7 @@ public class RewriteCompressedReblock extends 
StatementBlockRewriteRule {
                // recursively process children
                for(Hop hi : hop.getInput())
                        injectCompressionDirective(hi, compress, prog);
+
                // check for compression conditions
                switch(compress) {
                        case TRUE:
@@ -106,11 +105,13 @@ public class RewriteCompressedReblock extends 
StatementBlockRewriteRule {
                                        hop.setRequiresCompression(true);
                                break;
                        case AUTO:
-                               if(satisfiesAutoCompressionCondition(hop, prog))
+                               if(OptimizerUtils.isSparkExecutionMode() && 
satisfiesAutoCompressionCondition(hop, prog))
                                        hop.setRequiresCompression(true);
+                               break;
                        case COST:
                                if(satisfiesCostCompressionCondition(hop, prog))
                                        hop.setRequiresCompression(true);
+                               break;
                        default:
                                break;
                }
@@ -122,8 +123,38 @@ public class RewriteCompressedReblock extends 
StatementBlockRewriteRule {
                hop.setVisited();
        }
 
+       private static boolean satisfiesSizeConstraintsForCompression(Hop hop) {
+               return hop.getDim2() >= 1 &&
+                       ((hop.getDim1() >= 1000 && hop.getDim2() < 100) || 
hop.getDim1() / hop.getDim2() >= 1000);
+       }
+
        private static boolean satisfiesCompressionCondition(Hop hop) {
-               return HopRewriteUtils.isData(hop, OpOpData.PERSISTENTREAD) && 
(hop.getDim1() >= 1000 && hop.getDim2() >= 1);
+               boolean satisfies = false;
+               if(satisfiesSizeConstraintsForCompression(hop))
+                       satisfies |= HopRewriteUtils.isData(hop, 
OpOpData.PERSISTENTREAD);
+
+               return satisfies;
+       }
+
+       private static boolean satisfiesAggressiveCompressionCondition(Hop hop) 
{
+               boolean satisfies = false;
+               if(satisfiesSizeConstraintsForCompression(hop)) {
+                       satisfies |= HopRewriteUtils.isData(hop, 
OpOpData.PERSISTENTREAD);
+                       satisfies |= HopRewriteUtils.isUnary(hop, OpOp1.ROUND, 
OpOp1.FLOOR, OpOp1.NOT, OpOp1.CEIL);
+                       satisfies |= HopRewriteUtils.isBinary(hop,
+                               OpOp2.EQUAL,
+                               OpOp2.NOTEQUAL,
+                               OpOp2.LESS,
+                               OpOp2.LESSEQUAL,
+                               OpOp2.GREATER,
+                               OpOp2.GREATEREQUAL,
+                               OpOp2.AND,
+                               OpOp2.OR,
+                               OpOp2.MODULUS);
+               }
+               if(LOG.isDebugEnabled() && satisfies)
+                       LOG.debug("Operation Satisfies: " + hop);
+               return satisfies;
        }
 
        private static boolean satisfiesDeCompressionCondition(Hop hop) {
@@ -131,226 +162,279 @@ public class RewriteCompressedReblock extends 
StatementBlockRewriteRule {
                return false;
        }
 
+       private static boolean outOfCore(Hop hop) {
+               double matrixPSize = 
OptimizerUtils.estimatePartitionedSizeExactSparsity(hop);
+               double cacheSize = 
SparkExecutionContext.getDataMemoryBudget(true, true);
+               return matrixPSize > cacheSize;
+       }
+
+       private static boolean ultraSparse(Hop hop) {
+               double sparsity = OptimizerUtils.getSparsity(hop);
+               return sparsity < MatrixBlock.ULTRA_SPARSITY_TURN_POINT;
+       }
+
        private static boolean satisfiesAutoCompressionCondition(Hop hop, 
DMLProgram prog) {
                // check for basic compression condition
-               if(!(satisfiesCompressionCondition(hop) && hop.getMemEstimate() 
>= OptimizerUtils.getLocalMemBudget() &&
-                       OptimizerUtils.isSparkExecutionMode()))
+               if(!(satisfiesCompressionCondition(hop) && hop.getMemEstimate() 
>= OptimizerUtils.getLocalMemBudget()))
                        return false;
 
-               // determine if data size exceeds aggregate cluster storage 
memory
-               double matrixPSize = OptimizerUtils
-                       .estimatePartitionedSizeExactSparsity(hop.getDim1(), 
hop.getDim2(), hop.getBlocksize(), hop.getNnz());
-               double cacheSize = 
SparkExecutionContext.getDataMemoryBudget(true, true);
-               boolean outOfCore = matrixPSize > cacheSize;
-
-               // determine if matrix is ultra sparse (and hence serialized)
-               double sparsity = OptimizerUtils.getSparsity(hop.getDim1(), 
hop.getDim2(), hop.getNnz());
-               boolean ultraSparse = sparsity < 
MatrixBlock.ULTRA_SPARSITY_TURN_POINT;
-
                // determine if all operations are supported over compressed 
matrices,
                // but conditionally only if all other conditions are met
-               if(hop.dimsKnown(true) && outOfCore && !ultraSparse) {
-                       // analyze program recursively, including called 
functions
-                       ProbeStatus status = new ProbeStatus(hop.getHopID(), 
prog);
-                       for(StatementBlock sb : prog.getStatementBlocks())
-                               rAnalyzeProgram(sb, status);
-
-                       // applicable if used in loop (amortized compressed 
costs),
-                       // no conditional updates in if-else branches
-                       // and all operations are applicable (no decompression 
costs)
-                       boolean ret = status.foundStart && status.usedInLoop && 
!status.condUpdate && !status.nonApplicable;
-                       if(LOG.isDebugEnabled()) {
-                               LOG.debug("Auto compression: " + ret + " 
(dimsKnown=" + hop.dimsKnown(true) + ", outOfCore=" + outOfCore
-                                       + ", !ultraSparse=" + !ultraSparse + ", 
foundStart=" + status.foundStart + ", usedInLoop="
-                                       + status.foundStart + ", !condUpdate=" 
+ !status.condUpdate + ", !nonApplicable="
-                                       + !status.nonApplicable + ")");
-                       }
-                       return ret;
-               }
-               else if(LOG.isDebugEnabled()) {
-                       LOG.debug("Auto compression: false (dimsKnown=" + 
hop.dimsKnown(true) + ", outOfCore=" + outOfCore
-                               + ", !ultraSparse=" + !ultraSparse + ")");
+               if(hop.dimsKnown(true) && outOfCore(hop) && !ultraSparse(hop)) {
+                       return analyseProgram(hop, 
prog).isValidAutoCompression();
                }
+
                return false;
        }
 
        private static boolean satisfiesCostCompressionCondition(Hop hop, 
DMLProgram prog) {
-               // TODO Cost compression Condition
-               return false;
+               return satisfiesAggressiveCompressionCondition(hop) && 
hop.dimsKnown(false) &&
+                       analyseProgram(hop, 
prog).isValidAggressiveCompression();
+
        }
 
-       private static void rAnalyzeProgram(StatementBlock sb, ProbeStatus 
status) {
-               if(sb instanceof FunctionStatementBlock) {
-                       FunctionStatementBlock fsb = (FunctionStatementBlock) 
sb;
-                       FunctionStatement fstmt = (FunctionStatement) 
fsb.getStatement(0);
-                       for(StatementBlock csb : fstmt.getBody())
-                               rAnalyzeProgram(csb, status);
-               }
-               else if(sb instanceof WhileStatementBlock) {
-                       WhileStatementBlock wsb = (WhileStatementBlock) sb;
-                       WhileStatement wstmt = (WhileStatement) 
wsb.getStatement(0);
-                       for(StatementBlock csb : wstmt.getBody())
-                               rAnalyzeProgram(csb, status);
-                       if(wsb.variablesRead().containsAnyName(status.compMtx))
-                               status.usedInLoop = true;
+       private static ProbeStatus analyseProgram(Hop hop, DMLProgram prog) {
+               ProbeStatus status = new ProbeStatus(hop.getHopID(), prog);
+               for(StatementBlock sb : prog.getStatementBlocks())
+                       status.rAnalyzeProgram(sb);
+               return status;
+       }
+
+       private static class ProbeStatus {
+               private final long startHopID;
+               private final DMLProgram prog;
+
+               private int numberCompressedOpsExecuted = 0;
+               private int numberDecompressedOpsExecuted = 0;
+               private int inefficientSupportedOpsExecuted = 0;
+
+               private boolean foundStart = false;
+               private boolean usedInLoop = false;
+               private boolean condUpdate = false;
+               private boolean nonApplicable = false;
+
+               private HashSet<String> procFn = new HashSet<>();
+               private HashSet<String> compMtx = new HashSet<>();
+
+               private ProbeStatus(long hopID, DMLProgram p) {
+                       startHopID = hopID;
+                       prog = p;
                }
-               else if(sb instanceof IfStatementBlock) {
-                       IfStatementBlock isb = (IfStatementBlock) sb;
-                       IfStatement istmt = (IfStatement) isb.getStatement(0);
-                       for(StatementBlock csb : istmt.getIfBody())
-                               rAnalyzeProgram(csb, status);
-                       for(StatementBlock csb : istmt.getElseBody())
-                               rAnalyzeProgram(csb, status);
-                       
if(isb.variablesUpdated().containsAnyName(status.compMtx))
-                               status.condUpdate = true;
+
+               private ProbeStatus(ProbeStatus status) {
+                       startHopID = status.startHopID;
+                       prog = status.prog;
+                       foundStart = status.foundStart;
+                       usedInLoop = status.usedInLoop;
+                       condUpdate = status.condUpdate;
+                       nonApplicable = status.nonApplicable;
+                       procFn.addAll(status.procFn);
                }
-               else if(sb instanceof ForStatementBlock) { // incl parfor
-                       ForStatementBlock fsb = (ForStatementBlock) sb;
-                       ForStatement fstmt = (ForStatement) fsb.getStatement(0);
-                       for(StatementBlock csb : fstmt.getBody())
-                               rAnalyzeProgram(csb, status);
-                       if(fsb.variablesRead().containsAnyName(status.compMtx))
-                               status.usedInLoop = true;
+
+               private void rAnalyzeProgram(StatementBlock sb) {
+                       if(sb instanceof FunctionStatementBlock) {
+                               FunctionStatementBlock fsb = 
(FunctionStatementBlock) sb;
+                               FunctionStatement fstmt = (FunctionStatement) 
fsb.getStatement(0);
+                               for(StatementBlock csb : fstmt.getBody())
+                                       rAnalyzeProgram(csb);
+                       }
+                       else if(sb instanceof WhileStatementBlock) {
+                               WhileStatementBlock wsb = (WhileStatementBlock) 
sb;
+                               WhileStatement wstmt = (WhileStatement) 
wsb.getStatement(0);
+                               for(StatementBlock csb : wstmt.getBody())
+                                       rAnalyzeProgram(csb);
+                               if(wsb.variablesRead().containsAnyName(compMtx))
+                                       usedInLoop = true;
+                       }
+                       else if(sb instanceof IfStatementBlock) {
+                               IfStatementBlock isb = (IfStatementBlock) sb;
+                               IfStatement istmt = (IfStatement) 
isb.getStatement(0);
+                               for(StatementBlock csb : istmt.getIfBody())
+                                       rAnalyzeProgram(csb);
+                               for(StatementBlock csb : istmt.getElseBody())
+                                       rAnalyzeProgram(csb);
+                               
if(isb.variablesUpdated().containsAnyName(compMtx))
+                                       condUpdate = true;
+                       }
+                       else if(sb instanceof ForStatementBlock) { // incl 
parfor
+                               ForStatementBlock fsb = (ForStatementBlock) sb;
+                               ForStatement fstmt = (ForStatement) 
fsb.getStatement(0);
+                               for(StatementBlock csb : fstmt.getBody())
+                                       rAnalyzeProgram(csb);
+                               if(fsb.variablesRead().containsAnyName(compMtx))
+                                       usedInLoop = true;
+                       }
+                       else if(sb.getHops() != null) { // generic (last-level)
+                               ArrayList<Hop> roots = sb.getHops();
+                               Hop.resetVisitStatus(roots);
+                               // process entire HOP DAG starting from the 
roots
+                               for(Hop root : roots)
+                                       rAnalyzeHopDag(root);
+                               // remove temporary variables
+                               compMtx.removeIf(n -> n.startsWith(TMP_PREFIX));
+                               Hop.resetVisitStatus(roots);
+                       }
                }
-               else if(sb.getHops() != null) { // generic (last-level)
-                       ArrayList<Hop> roots = sb.getHops();
-                       Hop.resetVisitStatus(roots);
-                       // process entire HOP DAG starting from the roots
-                       for(Hop root : roots)
-                               rAnalyzeHopDag(root, status);
-                       // remove temporary variables
-                       status.compMtx.removeIf(n -> n.startsWith(TMP_PREFIX));
-                       Hop.resetVisitStatus(roots);
+
+               /**
+                * Post-order walk of a HOP DAG (all inputs are processed before the hop itself) that propagates
+                * which hops/variables carry the compression candidate. Visited hops are skipped, so shared
+                * sub-DAGs are analyzed only once.
+                *
+                * @param current the hop to analyze
+                */
+               private void rAnalyzeHopDag(Hop current) {
+                       if(current.isVisited())
+                               return;
+
+                       // process children recursively
+                       for(Hop input : current.getInput())
+                               rAnalyzeHopDag(input);
+
+                       // handle source persistent read
+                       // (the hop whose ID equals startHopID seeds the set of compressed names)
+                       if(current.getHopID() == startHopID) {
+                               compMtx.add(getTmpName(current));
+                               foundStart = true;
+                       }
+
+                       // 1) handle transient reads and writes (name mapping)
+                       // a transient write of a compressed hop marks the written variable name as compressed
+                       if(HopRewriteUtils.isData(current, 
OpOpData.TRANSIENTWRITE) &&
+                               
compMtx.contains(getTmpName(current.getInput().get(0))))
+                               compMtx.add(current.getName());
+                       // reading a compressed variable back marks this read hop as compressed
+                       else if(HopRewriteUtils.isData(current, 
OpOpData.TRANSIENTREAD) && compMtx.contains(current.getName()))
+                               compMtx.add(getTmpName(current));
+                       // handle individual hops
+                       // (only hops consuming a compressed input are classified; function calls are analyzed separately)
+                       else if(hasCompressedInput(current)) {
+                               if(current instanceof FunctionOp)
+                                       handleFunctionOps(current);
+                               else
+                                       handleApplicableOps(current);
+                       }
+                       current.setVisited();
                }
-       }
 
-       private static void rAnalyzeHopDag(Hop current, ProbeStatus status) {
-               if(current.isVisited())
-                       return;
+               /**
+                * Tells whether at least one direct input of the given hop is tracked as compressed.
+                *
+                * @param hop the hop whose inputs are inspected
+                * @return true if some input's temporary name is contained in the compressed set
+                */
+               private boolean hasCompressedInput(Hop hop) {
+                       // Nothing tracked yet means no input can be compressed; skip the scan entirely.
+                       return !compMtx.isEmpty() && hop.getInput().stream()
+                               .anyMatch(input -> compMtx.contains(getTmpName(input)));
+               }
 
-               // process children recursively
-               for(Hop input : current.getInput())
-                       rAnalyzeHopDag(input, status);
+               /**
+                * Builds the synthetic name under which an intermediate hop is tracked: the temporary prefix
+                * followed by the hop's ID.
+                */
+               private static String getTmpName(Hop hop) {
+                       final long id = hop.getHopID();
+                       return TMP_PREFIX + id;
+               }
 
-               // handle source persistent read
-               if(current.getHopID() == status.startHopID) {
-                       status.compMtx.add(getTmpName(current));
-                       status.foundStart = true;
+               /** Returns whether the output of the given hop is tracked as compressed. */
+               private boolean isCompressed(Hop hop) {
+                       final String tmpName = getTmpName(hop);
+                       return compMtx.contains(tmpName);
                }
 
-               // handle individual hops
-               
-               // a) handle function calls
-               if(current instanceof FunctionOp && hasCompressedInput(current, 
status)) {
+               /**
+                * Analyzes a function call that receives a compressed input. Compressed arguments are mapped to
+                * the callee's input parameters, and the callee is analyzed once (memoized by function key). The
+                * callee's flags and operation counters are merged back into this status, and compressed callee
+                * outputs are mapped to the call's output variables.
+                *
+                * @param current the hop to analyze; the caller only dispatches FunctionOp hops here
+                */
+               private void handleFunctionOps(Hop current) {
                        // TODO handle of functions in a more fine-grained 
manner
                        // to cover special cases multiple calls where 
compressed
                        // inputs might occur for different input parameters
 
                        FunctionOp fop = (FunctionOp) current;
                        String fkey = fop.getFunctionKey();
-                       if(!status.procFn.contains(fkey)) {
+                       if(!procFn.contains(fkey)) {
                                // memoization to avoid redundant analysis and 
recursive calls
-                               status.procFn.add(fkey);
+                               procFn.add(fkey);
                                // map inputs to function inputs
-                               FunctionStatementBlock fsb = 
status.prog.getFunctionStatementBlock(fkey);
+                               FunctionStatementBlock fsb = 
prog.getFunctionStatementBlock(fkey);
                                FunctionStatement fstmt = (FunctionStatement) 
fsb.getStatement(0);
-                               ProbeStatus status2 = new ProbeStatus(status);
+                               ProbeStatus status2 = new ProbeStatus(this);
                                for(int i = 0; i < fop.getInput().size(); i++)
-                                       
if(status.compMtx.contains(getTmpName(fop.getInput().get(i))))
+                                       
if(compMtx.contains(getTmpName(fop.getInput().get(i))))
                                                
status2.compMtx.add(fstmt.getInputParams().get(i).getName());
                                // analyze function and merge meta info
-                               rAnalyzeProgram(fsb, status2);
-                               status.foundStart |= status2.foundStart;
-                               status.usedInLoop |= status2.usedInLoop;
-                               status.condUpdate |= status2.condUpdate;
-                               status.nonApplicable |= status2.nonApplicable;
+                               status2.rAnalyzeProgram(fsb);
+                               foundStart |= status2.foundStart;
+                               usedInLoop |= status2.usedInLoop;
+                               condUpdate |= status2.condUpdate;
+                               nonApplicable |= status2.nonApplicable;
+                               numberCompressedOpsExecuted += status2.numberCompressedOpsExecuted;
+                               numberDecompressedOpsExecuted += status2.numberDecompressedOpsExecuted;
+                               // Slow-but-supported ops inside the callee must count as well; otherwise
+                               // isValidAggressiveCompression under-reports them for code inside functions.
+                               // NOTE(review): like the two merges above, this assumes the ProbeStatus copy
+                               // constructor does not copy the counters (else they would be double counted) -- confirm.
+                               inefficientSupportedOpsExecuted += status2.inefficientSupportedOpsExecuted;
                                // map function outputs to outputs
                                String[] outputs = fop.getOutputVariableNames();
                                for(int i = 0; i < outputs.length; i++)
                                        
if(status2.compMtx.contains(fstmt.getOutputParams().get(i).getName()))
-                                               status.compMtx.add(outputs[i]);
+                                               compMtx.add(outputs[i]);
                        }
                }
 
-               // b) handle transient reads and writes (name mapping)
-               else if(HopRewriteUtils.isData(current, 
OpOpData.TRANSIENTWRITE) &&
-                       
status.compMtx.contains(getTmpName(current.getInput().get(0))))
-                       status.compMtx.add(current.getName());
-               else if(HopRewriteUtils.isData(current, OpOpData.TRANSIENTREAD) 
&& status.compMtx.contains(current.getName()))
-                       status.compMtx.add(getTmpName(current));
-
-               // c) handle applicable operations
-               else if(hasCompressedInput(current, status)) {
+               /**
+                * Classifies a non-function hop that has at least one compressed input. The hop is counted
+                * either as a supported compressed operation or as a decompressing one. Row-wise aggregates
+                * over many rows are additionally counted as inefficient. The hop itself is tracked as
+                * compressed when its output stays compressed.
+                *
+                * @param current the hop to classify
+                */
+               private void handleApplicableOps(Hop current) {
                        // Valid with uncompressed outputs
-                       // tsmm
-                       boolean compUCOut = (current instanceof AggBinaryOp && 
current.getDim2() <= current.getBlocksize() &&
-                               ((AggBinaryOp) current).checkTransposeSelf() == 
MMTSJType.LEFT);
+                       boolean compUCOut = false;
 
-                       // mvmm
-                       compUCOut |= (current instanceof AggBinaryOp && 
(current.getDim1() == 1 || current.getDim2() == 1));
-                       compUCOut |= 
(HopRewriteUtils.isTransposeOperation(current) && current.getParent().size() == 
1 &&
-                               current.getParent().get(0) instanceof 
AggBinaryOp &&
-                               (current.getParent().get(0).getDim1() == 1 || 
current.getParent().get(0).getDim2() == 1));
-                       compUCOut |= HopRewriteUtils.isAggUnaryOp(current, 
AggOp.SUM, AggOp.SUM_SQ, AggOp.MIN, AggOp.MAX);
+                       // Any matrix multiply (subsuming the former tsmm / matrix-vector special cases) and
+                       // binary ops with a column vector take compressed input and give uncompressed output.
+                       compUCOut |= (current instanceof AggBinaryOp);
+                       compUCOut |= HopRewriteUtils.isBinaryMatrixColVectorOperation(current);
+
+                       boolean isAggregate = HopRewriteUtils
+                               .isAggUnaryOp(current, AggOp.SUM, AggOp.SUM_SQ, AggOp.MIN, AggOp.MAX, AggOp.MEAN);
+
+                       // If the aggregation function is done row wise.
+                       if(isAggregate && current.getDim2() < 2 && current.getDim1() >= 1000)
+                               inefficientSupportedOpsExecuted++;
+
+                       compUCOut |= isAggregate;
 
                        // Valid compressed
+                       boolean compCOut = false;
+
                        // Compressed Output if the operation is Binary scalar
-                       boolean compCOut = 
HopRewriteUtils.isBinaryMatrixScalarOperation(current);
-                       // Compressed Output if the operation is right Matrix 
Multiply
-                       compCOut |= 
HopRewriteUtils.isBinaryMatrixMatrixOperation(current) &&
-                               isCompressed(current.getInput().get(0), status);
-                       // Compressed Output if the operation is binary.
+                       compCOut |= HopRewriteUtils.isBinaryMatrixScalarOperation(current);
+                       compCOut |= HopRewriteUtils.isBinaryMatrixRowVectorOperation(current);
+
+                       // Compressed output is possible through an overlapping matrix if the operation is a
+                       // right matrix multiply with the compressed matrix as left input.
+                       compCOut |= (current instanceof AggBinaryOp) && isCompressed(current.getInput().get(0));
+                       // an op producing compressed output is not also counted as an uncompressed-output op
+                       compUCOut = compCOut ? false : compUCOut;
+
+                       // Compressed Output if the operation is column bind.
                        compCOut |= HopRewriteUtils.isBinary(current, 
OpOp2.CBIND);
 
                        boolean metaOp = HopRewriteUtils.isUnary(current, 
OpOp1.NROW, OpOp1.NCOL);
-                       status.nonApplicable |= !(compUCOut || compCOut || 
metaOp);
-                       if(compCOut)
-                               status.compMtx.add(getTmpName(current));
-               }
+                       boolean applicable = compUCOut || compCOut || metaOp;
 
-               current.setVisited();
-       }
+                       if(applicable)
+                               numberCompressedOpsExecuted++;
+                       else {
+                               // Expected outcome of the heuristic, not a user-facing problem: log at debug level only.
+                               if(LOG.isDebugEnabled())
+                                       LOG.debug("Decompression op: " + current);
+                               numberDecompressedOpsExecuted++;
+                       }
 
-       private static String getTmpName(Hop hop) {
-               return TMP_PREFIX + hop.getHopID();
-       }
+                       nonApplicable |= !applicable;
 
-       private static boolean hasCompressedInput(Hop hop, ProbeStatus status) {
-               if(status.compMtx.isEmpty())
-                       return false;
-               for(Hop input : hop.getInput())
-                       if(status.compMtx.contains(getTmpName(input)))
-                               return true;
-               return false;
-       }
-
-       private static boolean isCompressed(Hop hop, ProbeStatus status) {
-               return status.compMtx.contains(getTmpName(hop));
-       }
+                       // the output stays compressed, so downstream consumers see a compressed input
+                       if(compCOut)
+                               compMtx.add(getTmpName(current));
+               }
 
-       private static class ProbeStatus {
-               private final long startHopID;
-               private final DMLProgram prog;
-               private boolean foundStart = false;
-               private boolean usedInLoop = false;
-               private boolean condUpdate = false;
-               private boolean nonApplicable = false;
-               private HashSet<String> procFn = new HashSet<>();
-               private HashSet<String> compMtx = new HashSet<>();
+               /**
+                * Checks the auto-compression criteria. The start hop must have been found and the candidate
+                * must be read inside a loop. It must not be updated inside an if-statement block, and every
+                * consuming operation must be applicable.
+                *
+                * @return true if all criteria hold
+                */
+               private boolean isValidAutoCompression() {
+                       final boolean reachedAndLooped = foundStart && usedInLoop;
+                       final boolean blocked = condUpdate || nonApplicable;
+                       return reachedAndLooped && !blocked;
+               }
 
-               public ProbeStatus(long hopID, DMLProgram p) {
-                       startHopID = hopID;
-                       prog = p;
+               /**
+                * Decides whether the aggressive rewrite keeps the compression for this candidate. All of the
+                * following must hold:
+                * - no decompressing operation was found;
+                * - inefficient-but-supported operations are outnumbered by supported compressed operations;
+                * - the candidate is used in a loop or by more than three compressed operations.
+                *
+                * @return true if the candidate qualifies for aggressive compression
+                */
+               private boolean isValidAggressiveCompression() {
+                       if(LOG.isDebugEnabled())
+                               LOG.debug(this.toString());
+                       // Any operation that forces decompression disqualifies the candidate.
+                       if(numberDecompressedOpsExecuted >= 1)
+                               return false;
+                       // Slow-but-supported operations must be outnumbered by compressed operations.
+                       if(inefficientSupportedOpsExecuted >= numberCompressedOpsExecuted)
+                               return false;
+                       // NOTE(review): the commit description says "at least 3" compressed operations, but
+                       // this requires more than 3 (i.e. at least 4) when not used in a loop -- confirm intent.
+                       return usedInLoop || numberCompressedOpsExecuted > 3;
                }
 
-               public ProbeStatus(ProbeStatus status) {
-                       startHopID = status.startHopID;
-                       prog = status.prog;
-                       foundStart = status.foundStart;
-                       usedInLoop = status.usedInLoop;
-                       condUpdate = status.condUpdate;
-                       nonApplicable = status.nonApplicable;
-                       procFn.addAll(status.procFn);
+               /** Multi-line summary of the probe state; emitted by the debug log in isValidAggressiveCompression. */
+               @Override
+               public String toString() {
+                       return new StringBuilder()
+                               .append("Compressed ProbeStatus : hopID =").append(startHopID)
+                               .append("\n CLA Ops         : ").append(numberCompressedOpsExecuted)
+                               .append("\n Decompress Ops  : ").append(numberDecompressedOpsExecuted)
+                               .append("\n Inefficient Ops : ").append(inefficientSupportedOpsExecuted)
+                               .append("\n foundStart ").append(foundStart)
+                               .append(" inLoop ").append(usedInLoop)
+                               .append(" condUpdate : ").append(condUpdate)
+                               .append(" nonApplicable : ").append(nonApplicable)
+                               .append("\n compressed Matrix: ").append(compMtx)
+                               .append("\n Prog Fn ").append(procFn)
+                               .toString();
                }
+
        }
 }
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/Dictionary.java 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/Dictionary.java
index d4346a5..570a46a 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/Dictionary.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/Dictionary.java
@@ -176,7 +176,7 @@ public class Dictionary extends ADictionary {
 
+       /**
+        * Returns {@code _values.length / nCol}, i.e. how many {@code nCol}-wide entries the flat value array holds.
+        *
+        * @param nCol number of columns per entry
+        * @return the entry count, or 0 if there are no values or {@code nCol} is 0 (guards the division by zero)
+        */
        @Override
        public int getNumberOfValues(int nCol) {
-               return (_values == null) ? 0 : _values.length / nCol;
+               return (_values == null || nCol == 0) ? 0 : _values.length / 
nCol;
        }
 
        @Override
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/lib/LibRightMultBy.java 
b/src/main/java/org/apache/sysds/runtime/compress/lib/LibRightMultBy.java
index aac6dcf..f8bfb40 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/lib/LibRightMultBy.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/lib/LibRightMultBy.java
@@ -273,7 +273,8 @@ public class LibRightMultBy {
                                ColGroupValue g = (ColGroupValue) 
colGroups.get(j);
                                Pair<int[], double[]> preAggregatedB = g
                                        .preaggValues(v.getRight()[j], that, 
g.getValues(), 0, that.getNumColumns(), that.getNumColumns());
-                               
retCg.add(g.copyAndSet(preAggregatedB.getLeft(), preAggregatedB.getRight()));
+                               if(preAggregatedB.getLeft().length > 0)
+                                       
retCg.add(g.copyAndSet(preAggregatedB.getLeft(), preAggregatedB.getRight()));
                        }
                }
                else {
@@ -284,8 +285,9 @@ public class LibRightMultBy {
 
                                for(int j = 0; j < colGroups.size(); j++) {
                                        Pair<int[], double[]> preAggregates = 
ag.get(j).get();
-                                       retCg.add(((ColGroupValue) 
colGroups.get(j)).copyAndSet(preAggregates.getLeft(),
-                                               preAggregates.getRight()));
+                                       if(preAggregates.getLeft().length > 0)
+                                               retCg.add(((ColGroupValue) 
colGroups.get(j)).copyAndSet(preAggregates.getLeft(),
+                                                       
preAggregates.getRight()));
                                }
                        }
                        catch(InterruptedException | ExecutionException e) {
diff --git a/src/main/java/org/apache/sysds/runtime/compress/lib/LibScalar.java 
b/src/main/java/org/apache/sysds/runtime/compress/lib/LibScalar.java
index 18c30ed..800c83b 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/lib/LibScalar.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/lib/LibScalar.java
@@ -82,9 +82,8 @@ public class LibScalar {
                else {
                        int threadsAvailable = (sop.getNumThreads() > 1) ? 
sop.getNumThreads() : OptimizerUtils
                                .getConstrainedNumThreads(-1);
-                       if(threadsAvailable > 1) {
+                       if(threadsAvailable > 1)
                                parallelScalarOperations(sop, colGroups, ret, 
threadsAvailable);
-                       }
                        else {
                                // Apply the operation to each of the column 
groups.
                                // Most implementations will only modify 
metadata.
@@ -103,11 +102,11 @@ public class LibScalar {
 
        }
 
-       private static CompressedMatrixBlock setupRet(CompressedMatrixBlock m1, 
MatrixValue result){
+       private static CompressedMatrixBlock setupRet(CompressedMatrixBlock m1, 
MatrixValue result) {
                CompressedMatrixBlock ret;
                if(result == null || !(result instanceof CompressedMatrixBlock))
                        ret = new CompressedMatrixBlock(m1.getNumRows(), 
m1.getNumColumns());
-               else{
+               else {
                        ret = (CompressedMatrixBlock) result;
                        ret.setNumColumns(m1.getNumColumns());
                        ret.setNumRows(m1.getNumRows());
diff --git a/src/main/java/org/apache/sysds/utils/Statistics.java 
b/src/main/java/org/apache/sysds/utils/Statistics.java
index ad52b10..b9059d9 100644
--- a/src/main/java/org/apache/sysds/utils/Statistics.java
+++ b/src/main/java/org/apache/sysds/utils/Statistics.java
@@ -34,7 +34,6 @@ import java.util.concurrent.atomic.LongAdder;
 
 import org.apache.sysds.api.DMLScript;
 import org.apache.sysds.conf.ConfigurationManager;
-import org.apache.sysds.conf.DMLConfig;
 import org.apache.sysds.hops.OptimizerUtils;
 import org.apache.sysds.runtime.controlprogram.caching.CacheStatistics;
 import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
@@ -1038,7 +1037,7 @@ public class Statistics
                                        federatedExecuteUDFCount.longValue() + 
".\n");
                        }
 
-                       if( 
ConfigurationManager.getDMLConfig().getTextValue(DMLConfig.COMPRESSED_LINALG).contains("true")){
+                       if(ConfigurationManager.isCompressionEnabled()){
                                DMLCompressionStatistics.display(sb);
                        }
 
diff --git 
a/src/test/java/org/apache/sysds/test/functions/compress/compressInstruction.java
 
b/src/test/java/org/apache/sysds/test/functions/compress/compressInstruction.java
index 8140df9..0e9e529 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/compress/compressInstruction.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/compress/compressInstruction.java
@@ -24,15 +24,15 @@ import static org.junit.Assert.assertTrue;
 import org.apache.sysds.common.Types;
 import org.apache.sysds.lops.LopProperties;
 import org.apache.sysds.lops.LopProperties.ExecType;
+import org.apache.sysds.runtime.controlprogram.parfor.stat.Timing;
 import org.apache.sysds.test.AutomatedTestBase;
 import org.apache.sysds.test.TestConfiguration;
 import org.apache.sysds.test.TestUtils;
-import org.apache.sysds.utils.DMLCompressionStatistics;
 import org.apache.sysds.utils.Statistics;
-import org.junit.Assert;
 import org.junit.Test;
 
 public class compressInstruction extends AutomatedTestBase {
+    // private static final Log LOG = 
LogFactory.getLog(compressInstruction.class.getName());
 
     protected String getTestClassDir() {
         return getTestDir() + this.getClass().getSimpleName() + "/";
@@ -47,11 +47,6 @@ public class compressInstruction extends AutomatedTestBase {
     }
 
     @Test
-    public void empty() {
-
-    }
-
-    @Test
     public void testCompressInstruction_01() {
         compressTest(1, 1000, 0.2, ExecType.CP, 0, 5, 0, 1, "01");
     }
@@ -61,6 +56,33 @@ public class compressInstruction extends AutomatedTestBase {
         compressTest(1, 1000, 0.2, ExecType.CP, 0, 5, 1, 1, "02");
     }
 
+    /**
+     * Runs compress_07.dml (10 columns, 10000 rows, sparsity 0.3), which calls compress/decompress explicitly.
+     * NOTE(review): this class's compressTest no longer asserts the expected (de)compression counts in this
+     * patch, so the last numeric arguments are currently unchecked -- restore the assertions or drop them.
+     */
+    @Test
+    public void testCompressInstruction_07() {
+        compressTest(10, 10000, 0.3, ExecType.CP, 0, 5, 1, 1, "07");
+    }
+
+    /**
+     * Manual timing comparison of compress_07.dml (CLA) against compress_07_noCompress.dml (ULA).
+     * Not annotated with @Test, so JUnit does not run it: it executes 21 script runs and prints timings to stdout.
+     * NOTE(review): requires a compress_07_noCompress.dml script in this test directory -- verify it exists.
+     */
+    public void testCompressInstruction_07_timeCompare() {
+        // initial small run (presumably warm-up); reset so it does not leak into the first timed run
+        compressTest(10, 1000, 0.3, ExecType.CP, 0, 5, 1, 1, "07");
+        Statistics.reset();
+
+        for(int i = 0; i < 10; i++) {
+            Timing time = new Timing(true);
+            compressTest(10, 100000, 1.0, ExecType.CP, 0, 5, 1, 1, "07");
+            System.out.println("CLA " + time.stop() + " ms.");
+            Statistics.reset();
+        }
+        for(int i = 0; i < 10; i++) {
+            Timing time = new Timing(true);
+            compressTest(10, 100000, 1.0, ExecType.CP, 0, 5, 0, 0, "07_noCompress");
+            System.out.println("ULA " + time.stop() + " ms.");
+            // reset like the CLA loop so both series start every run from the same statistics state
+            Statistics.reset();
+        }
+    }
+
     public void compressTest(int cols, int rows, double sparsity, 
LopProperties.ExecType instType, int min, int max,
         int decompressionCountExpected, int compressionCountsExpected, String 
name) {
 
@@ -73,16 +95,8 @@ public class compressInstruction extends AutomatedTestBase {
 
             programArgs = new String[] {"-stats", "100", "-nvargs", "cols=" + 
cols, "rows=" + rows,
                 "sparsity=" + sparsity, "min=" + min, "max= " + max};
-
             runTest(null);
-
-            int decompressCount = 0;
-            decompressCount += 
DMLCompressionStatistics.getDecompressionCount();
-            decompressCount += 
DMLCompressionStatistics.getDecompressionSTCount();
-            long compressionCount = 
Statistics.getCPHeavyHitterCount("compress");
-
-            Assert.assertEquals(compressionCount, compressionCountsExpected);
-            Assert.assertEquals(decompressionCountExpected, decompressCount);
+            // LOG.error(runTest(null));
 
         }
         catch(Exception e) {
diff --git 
a/src/test/java/org/apache/sysds/test/functions/compress/compressInstruction.java
 
b/src/test/java/org/apache/sysds/test/functions/compress/compressInstructionRewrite.java
similarity index 58%
copy from 
src/test/java/org/apache/sysds/test/functions/compress/compressInstruction.java
copy to 
src/test/java/org/apache/sysds/test/functions/compress/compressInstructionRewrite.java
index 8140df9..6b8ce83 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/compress/compressInstruction.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/compress/compressInstructionRewrite.java
@@ -21,6 +21,11 @@ package org.apache.sysds.test.functions.compress;
 
 import static org.junit.Assert.assertTrue;
 
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.sysds.common.Types;
 import org.apache.sysds.lops.LopProperties;
 import org.apache.sysds.lops.LopProperties.ExecType;
@@ -32,7 +37,11 @@ import org.apache.sysds.utils.Statistics;
 import org.junit.Assert;
 import org.junit.Test;
 
-public class compressInstruction extends AutomatedTestBase {
+public class compressInstructionRewrite extends AutomatedTestBase {
+    private static final Log LOG = LogFactory.getLog(compressInstructionRewrite.class.getName());
+
+    /** Name of the configuration file this test class runs with (see getConfigTemplateFile). */
+    private static final String TEST_CONF = "SystemDS-config-compress-cost.xml";
+    // Instance field (not static) because getTestDir() is an instance method.
+    private final File TEST_CONF_FILE = new File(SCRIPT_DIR + getTestDir(), TEST_CONF);
 
     protected String getTestClassDir() {
         return getTestDir() + this.getClass().getSimpleName() + "/";
@@ -43,22 +52,52 @@ public class compressInstruction extends AutomatedTestBase {
     }
 
     protected String getTestDir() {
-        return "functions/compress/compressInstruction/";
+        return "functions/compress/compressInstructionRewrite/";
     }
 
+    // Each case calls compressTest(cols, rows, sparsity, execType, min, max,
+    //   expectedDecompressions, expectedCompressions, name) and runs script compress_<name>.dml.
     @Test
-    public void empty() {
+    public void testCompressInstruction_01() {
+        compressTest(1, 1000, 0.2, ExecType.CP, 0, 5, 0, 0, "01");
+    }
 
+    @Test
+    public void testCompressInstruction_02() {
+        compressTest(1, 1000, 0.2, ExecType.CP, 0, 5, 0, 1, "02");
     }
 
     @Test
-    public void testCompressInstruction_01() {
-        compressTest(1, 1000, 0.2, ExecType.CP, 0, 5, 0, 1, "01");
+    // Same script as _02 with 999 rows: expects no compression (the rewrite requires at least 1000 rows).
+    public void testCompressInstruction_02_toSmallToCompress() {
+        compressTest(1, 999, 0.2, ExecType.CP, 0, 5, 0, 0, "02");
     }
 
     @Test
-    public void testCompressInstruction_02() {
-        compressTest(1, 1000, 0.2, ExecType.CP, 0, 5, 1, 1, "02");
+    public void testCompressInstruction_03() {
+        compressTest(1, 1000, 0.2, ExecType.CP, 0, 5, 0, 1, "03");
+    }
+
+    @Test
+    public void testCompressInstruction_04() {
+        compressTest(1, 1000, 0.2, ExecType.CP, 0, 5, 0, 0, "04");
+    }
+
+    @Test
+    public void testCompressInstruction_05() {
+        compressTest(3, 1000, 0.2, ExecType.CP, 0, 5, 0, 0, "05");
+    }
+
+    @Test
+    public void testCompressInstruction_06() {
+        compressTest(3, 1000, 0.2, ExecType.CP, 0, 5, 0, 1, "06");
+    }
+
+    @Test
+    public void testCompressInstruction_07() {
+        compressTest(6, 6000, 0.2, ExecType.CP, 0, 5, 0, 1, "07");
+    }
+
+    @Test
+    public void testCompressInstruction_08() {
+        compressTest(6, 6000, 0.2, ExecType.CP, 0, 5, 0, 1, "08");
     }
 
     public void compressTest(int cols, int rows, double sparsity, 
LopProperties.ExecType instType, int min, int max,
@@ -70,11 +109,13 @@ public class compressInstruction extends AutomatedTestBase 
{
             loadTestConfiguration(getTestConfiguration(getTestName()));
 
             fullDMLScriptName = SCRIPT_DIR + "/" + getTestDir() + "compress_" 
+ name + ".dml";
-
-            programArgs = new String[] {"-stats", "100", "-nvargs", "cols=" + 
cols, "rows=" + rows,
+            programArgs = new String[] {"-explain", "-stats", "100", 
"-nvargs", "cols=" + cols, "rows=" + rows,
                 "sparsity=" + sparsity, "min=" + min, "max= " + max};
 
-            runTest(null);
+            ByteArrayOutputStream stdout = runTest(null);
+
+            if(LOG.isDebugEnabled())
+                LOG.debug(stdout);
 
             int decompressCount = 0;
             decompressCount += 
DMLCompressionStatistics.getDecompressionCount();
@@ -83,7 +124,8 @@ public class compressInstruction extends AutomatedTestBase {
 
             Assert.assertEquals(compressionCount, compressionCountsExpected);
             Assert.assertEquals(decompressionCountExpected, decompressCount);
-
+            if(decompressionCountExpected > 0)
+                Assert.assertTrue(heavyHittersContainsString("decompress", 
decompressionCountExpected));
         }
         catch(Exception e) {
             e.printStackTrace();
@@ -100,4 +142,13 @@ public class compressInstruction extends AutomatedTestBase 
{
         addTestConfiguration(getTestName(), new 
TestConfiguration(getTestClassDir(), getTestName()));
     }
 
+    /**
+     * Override default configuration with custom test configuration to ensure scratch space and local temporary
+     * directory locations are also updated.
+     *
+     * @return the test-specific configuration file TEST_CONF, located in this test's script directory
+     */
+    @Override
+    protected File getConfigTemplateFile() {
+        return TEST_CONF_FILE;
+    }
+
 }
diff --git 
a/src/test/scripts/functions/compress/compressInstruction/compress_01.dml 
b/src/test/scripts/functions/compress/compressInstruction/compress_01.dml
index fa10d65..b507a7c 100644
--- a/src/test/scripts/functions/compress/compressInstruction/compress_01.dml
+++ b/src/test/scripts/functions/compress/compressInstruction/compress_01.dml
@@ -19,7 +19,7 @@
 #
 #-------------------------------------------------------------
 
-A = rand(rows=$rows, cols=$cols, sparsity=$sparsity, min=$min, max=$max)
+A = rand(rows=$rows, cols=$cols, sparsity=$sparsity, min=$min, max=$max, seed 
= 1412)
 A = round(A)
 A = compress(A)
 
diff --git 
a/src/test/scripts/functions/compress/compressInstruction/compress_02.dml 
b/src/test/scripts/functions/compress/compressInstruction/compress_02.dml
index 1d2dbda..9f114c6 100644
--- a/src/test/scripts/functions/compress/compressInstruction/compress_02.dml
+++ b/src/test/scripts/functions/compress/compressInstruction/compress_02.dml
@@ -19,7 +19,7 @@
 #
 #-------------------------------------------------------------
 
-A = rand(rows=$rows, cols=$cols, sparsity=$sparsity, min=$min, max=$max)
+A = rand(rows=$rows, cols=$cols, sparsity=$sparsity, min=$min, max=$max, seed 
= 1412)
 A = round(A)
 A = compress(A)
 A = decompress(A)
diff --git 
a/src/test/scripts/functions/compress/compressInstruction/compress_07.dml 
b/src/test/scripts/functions/compress/compressInstruction/compress_07.dml
new file mode 100644
index 0000000..12fd53d
--- /dev/null
+++ b/src/test/scripts/functions/compress/compressInstruction/compress_07.dml
@@ -0,0 +1,58 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+A = rand(rows=$rows, cols=$cols, sparsity=$sparsity, min=$min, max=$max, seed = 1412)
+X = round(A)
+# Should compress here because the operation in the loop is optimal for compressed ops.
+X = compress(X)
+
+num_centroids = 5
+C = rand(rows=num_centroids, cols=$cols, sparsity=1.0, min =$min, max=$max, seed = 1412)
+sumXsq = sum (X ^ 2);
+wcss = Inf
+
+for(i in 1:10)
+{
+    C_old = C;
+    D = -2 * (X %*% t(C)) + t(rowSums (C ^ 2))
+    # Should decompress D here because it is used twice by inefficient operations
+    # on the overlapping matrix D.
+    D = decompress(D)
+    # First inefficient operation: rowMins
+    minD = rowMins(D)
+
+    wcss_old = wcss;  # NOTE(review): wcss_old is never read afterwards
+    wcss = sumXsq + sum (minD);
+    print(wcss)
+    print(sum ((C - C_old) ^ 2) / num_centroids)  # NOTE(review): C_old == C here (assigned at loop start), so this prints 0 for finite C
+    # Find the closest centroid for each record
+    # Second inefficient operation: relational less-than-or-equal
+    P = D <= minD;
+    # If some records belong to multiple centroids, share them equally
+    P = P / rowSums (P);
+    # Compute the column normalization factor for P
+    P_denom = colSums (P);
+    # Compute new centroids as weighted averages over the records
+    C_new = (t(P) %*% X) / t(P_denom);
+
+    C_old = C  # NOTE(review): redundant, C_old is overwritten at the top of the next iteration and unused after the loop
+    C = C_new;
+}
diff --git 
a/src/test/scripts/functions/compress/compressInstruction/compress_01.dml 
b/src/test/scripts/functions/compress/compressInstruction/compress_07_noCompress.dml
similarity index 55%
copy from 
src/test/scripts/functions/compress/compressInstruction/compress_01.dml
copy to 
src/test/scripts/functions/compress/compressInstruction/compress_07_noCompress.dml
index fa10d65..6195801 100644
--- a/src/test/scripts/functions/compress/compressInstruction/compress_01.dml
+++ 
b/src/test/scripts/functions/compress/compressInstruction/compress_07_noCompress.dml
@@ -19,8 +19,34 @@
 #
 #-------------------------------------------------------------
 
-A = rand(rows=$rows, cols=$cols, sparsity=$sparsity, min=$min, max=$max)
-A = round(A)
-A = compress(A)
+A = rand(rows=$rows, cols=$cols, sparsity=$sparsity, min=$min, max=$max, seed = 1412)
+X = round(A)
 
-print(sum(A))
+# Same k-means style loop as compress_07.dml, but without explicit compress()/decompress() calls.
+num_centroids = 5
+C = rand(rows=num_centroids, cols=$cols, sparsity=1.0, min =$min, max=$max, seed = 1412)
+sumXsq = sum (X ^ 2);
+wcss = Inf
+
+for(i in 1:10)
+{
+    C_old = C;
+    D = -2 * (X %*% t(C)) + t(rowSums (C ^ 2))
+
+    minD = rowMins(D)
+    wcss_old = wcss;  # NOTE(review): wcss_old is never read afterwards
+    wcss = sumXsq + sum (minD);
+    print(wcss)
+    print(sum ((C - C_old) ^ 2) / num_centroids)  # NOTE(review): C_old == C here (assigned at loop start), so this prints 0 for finite C
+    # Find the closest centroid for each record
+    P = D <= minD;
+    # If some records belong to multiple centroids, share them equally
+    P = P / rowSums (P);
+    # Compute the column normalization factor for P
+    P_denom = colSums (P);
+    # Compute new centroids as weighted averages over the records
+    C_new = (t(P) %*% X) / t(P_denom);
+
+    C_old = C  # NOTE(review): redundant, C_old is overwritten at the top of the next iteration and unused after the loop
+    C = C_new;
+}
diff --git 
a/src/test/scripts/functions/compress/compressInstructionRewrite/SystemDS-config-compress-cost.xml
 
b/src/test/scripts/functions/compress/compressInstructionRewrite/SystemDS-config-compress-cost.xml
new file mode 100644
index 0000000..988774b
--- /dev/null
+++ 
b/src/test/scripts/functions/compress/compressInstructionRewrite/SystemDS-config-compress-cost.xml
@@ -0,0 +1,24 @@
+<!--
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+-->
+
+<root>
+       <sysds.compressed.linalg>cost</sysds.compressed.linalg>  <!-- presumably selects the cost-based compression rewrite (file name: compress-cost); confirm in DMLConfig -->
+       <sysds.cp.parallel.ops>true</sysds.cp.parallel.ops>  <!-- presumably enables multi-threaded CP operations -->
+       <sysds.scratch>cost_scratch_space</sysds.scratch>  <!-- scratch directory name specific to this configuration -->
+</root>
diff --git 
a/src/test/scripts/functions/compress/compressInstruction/compress_01.dml 
b/src/test/scripts/functions/compress/compressInstructionRewrite/compress_01.dml
similarity index 91%
copy from 
src/test/scripts/functions/compress/compressInstruction/compress_01.dml
copy to 
src/test/scripts/functions/compress/compressInstructionRewrite/compress_01.dml
index fa10d65..22c647d 100644
--- a/src/test/scripts/functions/compress/compressInstruction/compress_01.dml
+++ 
b/src/test/scripts/functions/compress/compressInstructionRewrite/compress_01.dml
@@ -19,8 +19,7 @@
 #
 #-------------------------------------------------------------
 
-A = rand(rows=$rows, cols=$cols, sparsity=$sparsity, min=$min, max=$max)
+A = rand(rows=$rows, cols=$cols, sparsity=$sparsity, min=$min, max=$max, seed =14)
+# Should not compress since the number of operations on A is too small (only round and sum).
 A = round(A)
-A = compress(A)
-
 print(sum(A))
diff --git 
a/src/test/scripts/functions/compress/compressInstruction/compress_02.dml 
b/src/test/scripts/functions/compress/compressInstructionRewrite/compress_02.dml
similarity index 90%
copy from 
src/test/scripts/functions/compress/compressInstruction/compress_02.dml
copy to 
src/test/scripts/functions/compress/compressInstructionRewrite/compress_02.dml
index 1d2dbda..4c77c99 100644
--- a/src/test/scripts/functions/compress/compressInstruction/compress_02.dml
+++ 
b/src/test/scripts/functions/compress/compressInstructionRewrite/compress_02.dml
@@ -19,8 +19,9 @@
 #
 #-------------------------------------------------------------
 
-A = rand(rows=$rows, cols=$cols, sparsity=$sparsity, min=$min, max=$max)
+A = rand(rows=$rows, cols=$cols, sparsity=$sparsity, min=$min, max=$max, seed =14)
 A = round(A)
-A = compress(A)
-A = decompress(A)
+# Should compress here because A is used in a loop.
+for(i in 1:10)
+    A = A * i
 print(sum(A))
diff --git 
a/src/test/scripts/functions/compress/compressInstruction/compress_01.dml 
b/src/test/scripts/functions/compress/compressInstructionRewrite/compress_03.dml
similarity index 82%
copy from 
src/test/scripts/functions/compress/compressInstruction/compress_01.dml
copy to 
src/test/scripts/functions/compress/compressInstructionRewrite/compress_03.dml
index fa10d65..fd0d3ff 100644
--- a/src/test/scripts/functions/compress/compressInstruction/compress_01.dml
+++ 
b/src/test/scripts/functions/compress/compressInstructionRewrite/compress_03.dml
@@ -19,8 +19,13 @@
 #
 #-------------------------------------------------------------
 
-A = rand(rows=$rows, cols=$cols, sparsity=$sparsity, min=$min, max=$max)
+A = rand(rows=$rows, cols=$cols, sparsity=$sparsity, min=$min, max=$max, seed =14)
 A = round(A)
-A = compress(A)
+# Should compress here because A is used more than 3 times by operations supported on compressed matrices.
+B = A + 1
+C = A * 4
+D = sum(A)
+E = max(A)
 
-print(sum(A))
+total = sum(B) + sum(C) + D + E
+print(total)
diff --git 
a/src/test/scripts/functions/compress/compressInstruction/compress_01.dml 
b/src/test/scripts/functions/compress/compressInstructionRewrite/compress_04.dml
similarity index 88%
copy from 
src/test/scripts/functions/compress/compressInstruction/compress_01.dml
copy to 
src/test/scripts/functions/compress/compressInstructionRewrite/compress_04.dml
index fa10d65..391ee7e 100644
--- a/src/test/scripts/functions/compress/compressInstruction/compress_01.dml
+++ 
b/src/test/scripts/functions/compress/compressInstructionRewrite/compress_04.dml
@@ -19,8 +19,9 @@
 #
 #-------------------------------------------------------------
 
-A = rand(rows=$rows, cols=$cols, sparsity=$sparsity, min=$min, max=$max)
+A = rand(rows=$rows, cols=$cols, sparsity=$sparsity, min=$min, max=$max, seed =14)
 A = round(A)
-A = compress(A)
-
+# Should not compress here because the operation on A in the loop, t(A), is not supported in compressed form.
+for(i in 1:10)
+    A = t(A) 
 print(sum(A))
diff --git 
a/src/test/scripts/functions/compress/compressInstruction/compress_01.dml 
b/src/test/scripts/functions/compress/compressInstructionRewrite/compress_05.dml
similarity index 85%
copy from 
src/test/scripts/functions/compress/compressInstruction/compress_01.dml
copy to 
src/test/scripts/functions/compress/compressInstructionRewrite/compress_05.dml
index fa10d65..de2c15d 100644
--- a/src/test/scripts/functions/compress/compressInstruction/compress_01.dml
+++ 
b/src/test/scripts/functions/compress/compressInstructionRewrite/compress_05.dml
@@ -19,8 +19,9 @@
 #
 #-------------------------------------------------------------
 
-A = rand(rows=$rows, cols=$cols, sparsity=$sparsity, min=$min, max=$max)
+A = rand(rows=$rows, cols=$cols, sparsity=$sparsity, min=$min, max=$max, seed =14)
 A = round(A)
-A = compress(A)
-
-print(sum(A))
+# Should not compress here because the operation in the loop (rowMaxs) is not optimal for compressed ops.
+for(i in 1:10)
+    res = rowMaxs(A) + i
+print(sum(res))
diff --git 
a/src/test/scripts/functions/compress/compressInstruction/compress_01.dml 
b/src/test/scripts/functions/compress/compressInstructionRewrite/compress_06.dml
similarity index 85%
copy from 
src/test/scripts/functions/compress/compressInstruction/compress_01.dml
copy to 
src/test/scripts/functions/compress/compressInstructionRewrite/compress_06.dml
index fa10d65..d970e0f 100644
--- a/src/test/scripts/functions/compress/compressInstruction/compress_01.dml
+++ 
b/src/test/scripts/functions/compress/compressInstructionRewrite/compress_06.dml
@@ -19,8 +19,9 @@
 #
 #-------------------------------------------------------------
 
-A = rand(rows=$rows, cols=$cols, sparsity=$sparsity, min=$min, max=$max)
+A = rand(rows=$rows, cols=$cols, sparsity=$sparsity, min=$min, max=$max, seed =14)
 A = round(A)
-A = compress(A)
-
-print(sum(A))
+# Should compress here because the operation in the loop (colSums) is optimal for compressed ops.
+for(i in 1:10)
+    res = colSums(A) + i
+print(sum(res))
diff --git 
a/src/test/scripts/functions/compress/compressInstruction/compress_01.dml 
b/src/test/scripts/functions/compress/compressInstructionRewrite/compress_07.dml
similarity index 53%
copy from 
src/test/scripts/functions/compress/compressInstruction/compress_01.dml
copy to 
src/test/scripts/functions/compress/compressInstructionRewrite/compress_07.dml
index fa10d65..14a74b1 100644
--- a/src/test/scripts/functions/compress/compressInstruction/compress_01.dml
+++ 
b/src/test/scripts/functions/compress/compressInstructionRewrite/compress_07.dml
@@ -19,8 +19,35 @@
 #
 #-------------------------------------------------------------
 
-A = rand(rows=$rows, cols=$cols, sparsity=$sparsity, min=$min, max=$max)
-A = round(A)
-A = compress(A)
+A = rand(rows=$rows, cols=$cols, sparsity=$sparsity, min=$min, max=$max, seed = 1412)
+X = round(A)
+# Should compress here because the operation in the loop is optimal for compressed ops.
 
-print(sum(A))
+num_centroids = 5
+C = rand(rows=num_centroids, cols=$cols, sparsity=1.0, min =$min, max=$max, seed = 1412)
+sumXsq = sum (X ^ 2);
+wcss = Inf
+
+for(i in 1:10)
+{
+    C_old = C;
+    D = -2 * (X %*% t(C)) + t(rowSums (C ^ 2))
+    
+    minD = rowMins(D)
+
+    wcss_old = wcss;  # NOTE(review): wcss_old is never read afterwards
+    wcss = sumXsq + sum (minD);
+    print(wcss)
+    print(sum ((C - C_old) ^ 2) / num_centroids)  # NOTE(review): C_old == C here (assigned at loop start), so this prints 0 for finite C
+    # Find the closest centroid for each record
+    P = D <= minD;
+    # If some records belong to multiple centroids, share them equally
+    P = P / rowSums (P);
+    # Compute the column normalization factor for P
+    P_denom = colSums (P);
+    # Compute new centroids as weighted averages over the records
+    C_new = (t(P) %*% X) / t(P_denom);
+
+    C_old = C  # NOTE(review): redundant, C_old is overwritten at the top of the next iteration and unused after the loop
+    C = C_new;
+}
diff --git 
a/src/test/scripts/functions/compress/compressInstruction/compress_01.dml 
b/src/test/scripts/functions/compress/compressInstructionRewrite/compress_08.dml
similarity index 75%
copy from 
src/test/scripts/functions/compress/compressInstruction/compress_01.dml
copy to 
src/test/scripts/functions/compress/compressInstructionRewrite/compress_08.dml
index fa10d65..a41b1dc 100644
--- a/src/test/scripts/functions/compress/compressInstruction/compress_01.dml
+++ 
b/src/test/scripts/functions/compress/compressInstructionRewrite/compress_08.dml
@@ -19,8 +19,15 @@
 #
 #-------------------------------------------------------------
 
-A = rand(rows=$rows, cols=$cols, sparsity=$sparsity, min=$min, max=$max)
-A = round(A)
-A = compress(A)
+A = rand(rows=$rows, cols=$cols, sparsity=$sparsity, min=$min, max=$max, seed = 1412)
+X = round(A)
+# Should compress here because the operation in the loop is optimal for compressed ops.
 
-print(sum(A))
+num_centroids = 5
+C = rand(rows=num_centroids, cols=$cols, sparsity=1.0, min =$min, max=$max, seed = 1412)
+
+for(i in 1:10)
+{
+    D = -2 * (X %*% t(C)) + t(rowSums (C ^ 2))
+    print(sum (D))
+}

Reply via email to