This is an automated email from the ASF dual-hosted git repository.

baunsgaard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/master by this push:
     new 51c1147  [SYSTEMDS-2748] TSMM Compressed Optimize
51c1147 is described below

commit 51c1147a5b6aff7bca6d6bcfc0dd60b3fcafbffb
Author: baunsgaard <[email protected]>
AuthorDate: Wed Feb 3 10:20:41 2021 +0100

    [SYSTEMDS-2748] TSMM Compressed Optimize
    
    This commit optimizes the transpose-self matrix multiplication (TSMM)
    in compressed space.
    There are a few key upgrades:
    
    - We avoid calculating the lower triangle of the output
      (~50% improvement)
    - The parallelization scheme gives each task an equal number of
      elements. Since only the upper triangle has to be calculated, each
      task is assigned a row together with its reflected row, mirrored
      around the middle row of the matrix.
      (no trailing tasks, giving full CPU utilization)
    - The diagonal is calculated without decompression, since the
      diagonal values can be computed by simply multiplying the
      dictionary entries.
      This makes tsmm on single-colGroup matrices many times faster than
      normal tsmm.
      (exploits co-coding)
---
 scripts/builtin/scale.dml                          |  10 +-
 .../compress/CompressedMatrixBlockFactory.java     |   6 +-
 .../runtime/compress/colgroup/ADictionary.java     |   4 +-
 .../sysds/runtime/compress/colgroup/ColGroup.java  |  13 +-
 .../runtime/compress/colgroup/ColGroupConst.java   |   5 -
 .../runtime/compress/colgroup/ColGroupDDC.java     |   5 +-
 .../runtime/compress/colgroup/ColGroupDDC1.java    |   1 -
 .../runtime/compress/colgroup/ColGroupDDC2.java    |   1 -
 .../runtime/compress/colgroup/ColGroupOLE.java     | 233 +++++++++++----------
 .../runtime/compress/colgroup/ColGroupRLE.java     |   6 +-
 .../runtime/compress/colgroup/ColGroupValue.java   |  20 +-
 .../runtime/compress/colgroup/Dictionary.java      |  24 ++-
 .../runtime/compress/colgroup/QDictionary.java     |  97 +++++----
 .../sysds/runtime/compress/lib/LibCompAgg.java     | 136 ++++++------
 .../sysds/runtime/compress/lib/LibLeftMultBy.java  | 226 ++++++++++++++------
 .../sysds/runtime/compress/lib/LibRightMultBy.java |  23 +-
 .../sysds/runtime/compress/lib/LibScalar.java      |   4 +-
 .../sysds/runtime/matrix/data/LibCommonsMath.java  |   2 +
 .../compress/AbstractCompressedUnaryTests.java     |  10 +-
 19 files changed, 484 insertions(+), 342 deletions(-)

diff --git a/scripts/builtin/scale.dml b/scripts/builtin/scale.dml
index 013515f..3ec4a54 100644
--- a/scripts/builtin/scale.dml
+++ b/scripts/builtin/scale.dml
@@ -50,12 +50,14 @@ m_scale = function(Matrix[Double] X, Boolean center, 
Boolean scale)
 
     ScaleFactor = sqrt(colSums(X^2)/(N-1))
 
-    # Replace entries in the scale factor that are 0 with 1.
-    # To avoid division by 0, introducing NaN to the ouput.
+    # Replace entries in the scale factor that are 0 or NaN with 1,
+    # to avoid division by 0 or NaN introducing NaN into the output.
+    ScaleFactor = replace(target=ScaleFactor,
+      pattern=NaN, replacement=1);
     ScaleFactor = replace(target=ScaleFactor,
       pattern=0, replacement=1);
-    
-    X = X/ScaleFactor
+
+    X = X / ScaleFactor
 
   }
   else{
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlockFactory.java
 
b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlockFactory.java
index 1017322..1f1fea0 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlockFactory.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlockFactory.java
@@ -225,10 +225,10 @@ public class CompressedMatrixBlockFactory {
                                        LOG.debug("--compression Hash 
collisions:" + DblArrayIntListHashMap.hashMissCount);
                                        DblArrayIntListHashMap.hashMissCount = 
0;
                                        break;
+                               // case 4:
+                               //      LOG.debug("--compression phase " + 
phase++ + " Share     : " + _stats.getLastTimePhase());
+                               //      break;
                                case 4:
-                                       LOG.debug("--compression phase " + 
phase++ + " Share     : " + _stats.getLastTimePhase());
-                                       break;
-                               case 5:
                                        LOG.debug("--num col groups: " + 
res.getColGroups().size());
                                        LOG.debug("--compression phase " + 
phase++ + " Cleanup   : " + _stats.getLastTimePhase());
                                        LOG.debug("--col groups types " + 
_stats.getGroupsTypesString());
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ADictionary.java 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ADictionary.java
index 1e07289..351d997 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ADictionary.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ADictionary.java
@@ -196,7 +196,9 @@ public abstract class ADictionary {
 
        protected abstract void colSum(double[] c, int[] counts, int[] 
colIndexes, KahanFunction kplus);
 
-       protected abstract double sum(int[] counts, int ncol, KahanFunction 
kplus);
+       protected abstract double sum(int[] counts, int ncol);
+
+       protected abstract double sumsq(int[] counts, int ncol);
 
        public abstract StringBuilder getString(StringBuilder sb, int 
colIndexes);
 
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroup.java 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroup.java
index 20e3804..e88f1d0 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroup.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroup.java
@@ -77,10 +77,10 @@ public abstract class ColGroup implements Serializable {
         * ColGroup Implementation Contains zero row. Note this is not if it 
contains a zero value. If false then the stored
         * values are filling the ColGroup making it a dense representation, 
that can be leveraged in operations.
         */
-       protected boolean _zeros;
+       protected boolean _zeros = false;
 
        /** boolean specifying if the column group is encoded lossy */
-       protected boolean _lossy;
+       protected boolean _lossy = false;
 
        /** Empty constructor, used for serializing into an empty new object of 
ColGroup. */
        protected ColGroup() {
@@ -278,6 +278,15 @@ public abstract class ColGroup implements Serializable {
                }
        }
 
+       public static void decompressColumnToArray(double[] target, int 
colIndex, List<ColGroup> colGroups){
+               for(ColGroup g: colGroups){
+                       int groupColIndex = Arrays.binarySearch(g._colIndexes, 
colIndex);
+                       if(groupColIndex >= 0){
+                               g.decompressColumnToBlock(target, 
groupColIndex, 0, g._numRows);
+                       }
+               }
+       }
+
        /**
         * Decompress part of the col groups into the target matrix block, this 
decompression maintain the number of non zeros.
         * 
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupConst.java 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupConst.java
index 09d3b8e..932426a 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupConst.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupConst.java
@@ -65,11 +65,6 @@ public class ColGroupConst extends ColGroupValue {
        }
 
        @Override
-       protected void computeSum(double[] c, KahanFunction kplus) {
-               c[0] += _dict.sum(getCounts(), _colIndexes.length, kplus);
-       }
-
-       @Override
        protected void computeRowSums(double[] c, KahanFunction kplus, int rl, 
int ru, boolean mean) {
 
                KahanPlus kplus2 = KahanPlus.getKahanPlusFnObject();
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC.java 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC.java
index f673c89..440236f 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC.java
@@ -180,10 +180,7 @@ public abstract class ColGroupDDC extends ColGroupValue {
                }
        }
 
-       @Override
-       protected void computeSum(double[] c, KahanFunction kplus) {
-               c[0] += _dict.sum(getCounts(), _colIndexes.length, kplus);
-       }
+
 
        @Override
        protected void computeColSums(double[] c, KahanFunction kplus) {
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC1.java 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC1.java
index 3a49512..27f8a30 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC1.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC1.java
@@ -60,7 +60,6 @@ public class ColGroupDDC1 extends ColGroupDDC {
                                zeroIx = numVals;
                        }
                        Arrays.fill(_data, (byte) zeroIx);
-                       _zeros = true;
                }
 
                // iterate over values and write dictionary codes
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC2.java 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC2.java
index 39b8bdb..10c6837 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC2.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC2.java
@@ -57,7 +57,6 @@ public class ColGroupDDC2 extends ColGroupDDC {
                                zeroIx = numVals;
                        }
                        Arrays.fill(_data, (char) zeroIx);
-                       _zeros = true;
                }
 
                // iterate over values and write dictionary codes
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupOLE.java 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupOLE.java
index f530ee1..f4a272f 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupOLE.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupOLE.java
@@ -62,6 +62,7 @@ public class ColGroupOLE extends ColGroupOffset {
                        lbitmaps[i] = 
genOffsetBitmap(ubm.getOffsetsList(i).extractValues(), ubm.getNumOffsets(i));
                        totalLen += lbitmaps[i].length;
                }
+
                // compact bitmaps to linearized representation
                createCompressedBitmaps(numVals, totalLen, lbitmaps);
 
@@ -514,64 +515,74 @@ public class ColGroupOLE extends ColGroupOffset {
        @Override
        public void leftMultByRowVector(double[] a, double[] c, int numVals, 
double[] values) {
                final int blksz = CompressionSettings.BITMAP_BLOCK_SZ;
-               final int numCols = getNumCols();
 
-               if(numVals >= 1 && _numRows > blksz) {
-                       // cache blocking config (see matrix-vector mult for 
explanation)
-                       final int blksz2 = 2 * 
CompressionSettings.BITMAP_BLOCK_SZ;
+               if(numVals >= 1 && _numRows > blksz)
+                       leftMultByRowVectorBlocking(a, c, numVals, values);
+               else
+                       leftMultByRowVectorNonBlocking(a, c, numVals, values);
 
-                       // step 1: prepare position and value arrays
+       }
 
-                       // current pos per OLs / output values
-                       int[] apos = allocIVector(numVals, true);
-                       double[] cvals = allocDVector(numVals, true);
+       private void leftMultByRowVectorBlocking(double[] a, double[] c, int 
numVals, double[] values) {
+               // cache blocking config (see matrix-vector mult for 
explanation)
+               final int blksz = CompressionSettings.BITMAP_BLOCK_SZ;
+               final int numCols = getNumCols();
+               final int blksz2 = 2 * blksz;
 
-                       // step 2: cache conscious matrix-vector via horizontal 
scans
-                       for(int ai = 0; ai < _numRows; ai += blksz2) {
-                               int aimax = Math.min(ai + blksz2, _numRows);
+               // step 1: prepare position and value arrays
 
-                               // horizontal segment scan, incl pos maintenance
-                               for(int k = 0; k < numVals; k++) {
-                                       int boff = _ptr[k];
-                                       int blen = len(k);
-                                       int bix = apos[k];
-                                       double vsum = 0;
+               // current pos per OLs / output values
+               int[] apos = allocIVector(numVals, true);
+               double[] cvals = allocDVector(numVals, true);
 
-                                       for(int ii = ai; ii < aimax && bix < 
blen; ii += blksz) {
-                                               // prepare length, start, and 
end pos
-                                               int len = _data[boff + bix];
-                                               int pos = boff + bix + 1;
+               // step 2: cache conscious matrix-vector via horizontal scans
+               for(int ai = 0; ai < _numRows; ai += blksz2) {
+                       int aimax = Math.min(ai + blksz2, _numRows);
 
-                                               // iterate over bitmap blocks 
and compute partial results (a[i]*1)
-                                               vsum += 
LinearAlgebraUtils.vectSum(a, _data, ii, pos, len);
-                                               bix += len + 1;
-                                       }
+                       // horizontal segment scan, incl pos maintenance
+                       for(int k = 0; k < numVals; k++) {
+                               int boff = _ptr[k];
+                               int blen = len(k);
+                               int bix = apos[k];
+                               double vsum = 0;
 
-                                       apos[k] = bix;
-                                       cvals[k] += vsum;
+                               for(int ii = ai; ii < aimax && bix < blen; ii 
+= blksz) {
+                                       // prepare length, start, and end pos
+                                       int len = _data[boff + bix];
+                                       int pos = boff + bix + 1;
+
+                                       // iterate over bitmap blocks and 
compute partial results (a[i]*1)
+                                       vsum += LinearAlgebraUtils.vectSum(a, 
_data, ii, pos, len);
+                                       bix += len + 1;
                                }
-                       }
 
-                       // step 3: scale partial results by values and write to 
global output
-                       for(int k = 0, valOff = 0; k < numVals; k++, valOff += 
numCols)
-                               for(int j = 0; j < numCols; j++)
-                                       c[_colIndexes[j]] += cvals[k] * 
values[valOff + j];
+                               apos[k] = bix;
+                               cvals[k] += vsum;
+                       }
                }
-               else {
-                       // iterate over all values and their bitmaps
-                       for(int k = 0, valOff = 0; k < numVals; k++, valOff += 
numCols) {
-                               int boff = _ptr[k];
-                               int blen = len(k);
 
-                               // iterate over bitmap blocks and add partial 
results
-                               double vsum = 0;
-                               for(int bix = 0, off = 0; bix < blen; bix += 
_data[boff + bix] + 1, off += blksz)
-                                       vsum += LinearAlgebraUtils.vectSum(a, 
_data, off, boff + bix + 1, _data[boff + bix]);
+               // step 3: scale partial results by values and write to global 
output
+               for(int k = 0, valOff = 0; k < numVals; k++, valOff += numCols)
+                       for(int j = 0; j < numCols; j++)
+                               c[_colIndexes[j]] += cvals[k] * values[valOff + 
j];
+       }
 
-                               // scale partial results by values and write 
results
-                               for(int j = 0; j < numCols; j++)
-                                       c[_colIndexes[j]] += vsum * 
values[valOff + j];
-                       }
+       private void leftMultByRowVectorNonBlocking(double[] a, double[] c, int 
numVals, double[] values) {
+               // iterate over all values and their bitmaps
+               final int blksz = CompressionSettings.BITMAP_BLOCK_SZ;
+               final int numCols = getNumCols();
+               for(int k = 0, valOff = 0; k < numVals; k++, valOff += numCols) 
{
+                       int boff = _ptr[k];
+                       int blen = len(k);
+
+                       // iterate over bitmap blocks and add partial results
+                       double vsum = 0;
+                       for(int bix = 0, off = 0; bix < blen; bix += _data[boff 
+ bix] + 1, off += blksz)
+                               vsum += LinearAlgebraUtils.vectSum(a, _data, 
off, boff + bix + 1, _data[boff + bix]);
+
+                       // scale partial results by values and write results
+                       for(int j = 0; j < numCols; j++)
+                               c[_colIndexes[j]] += vsum * values[valOff + j];
                }
        }
 
@@ -656,82 +667,92 @@ public class ColGroupOLE extends ColGroupOffset {
                double[] tmpA) {
                final int blksz = CompressionSettings.BITMAP_BLOCK_SZ;
                final int numVals = getNumValues();
+               if(numVals > 1 && _numRows > blksz)
+                       leftMultBySparseMatrixBlocking(sb, c, values, numRows, 
numCols, row, tmpA, numVals);
+               else
+                       leftMultBySparseMatrixNonBlock(sb, c, values, numRows, 
numCols, row, tmpA, numVals);
+
+       }
+
+       private void leftMultBySparseMatrixBlocking(SparseBlock sb, double[] c, 
double[] values, int numRows, int numCols,
+               int row, double[] tmpA, int numVals) {
+               final int blksz = CompressionSettings.BITMAP_BLOCK_SZ;
                int sparseEndIndex = sb.size(row) + sb.pos(row);
                int[] indexes = sb.indexes(row);
                double[] sparseV = sb.values(row);
-               if(numVals > 1 && _numRows > blksz) {
-
-                       // cache blocking config (see matrix-vector mult for 
explanation)
-                       final int blksz2 = 2 * 
CompressionSettings.BITMAP_BLOCK_SZ;
 
-                       // step 1: prepare position and value arrays
-                       int[] apos = allocIVector(numVals, true);
-                       double[] cvals = allocDVector(numVals, true);
-                       // step 2: cache conscious matrix-vector via horizontal 
scans
-                       int pI = sb.pos(row);
-                       for(int ai = 0; ai < _numRows; ai += blksz2) {
-                               int aimax = Math.min(ai + blksz2, _numRows);
-                               Arrays.fill(tmpA, 0);
-                               for(; pI < sparseEndIndex && indexes[pI] < 
aimax; pI++) {
-                                       if(indexes[pI] >= ai)
-                                               tmpA[indexes[pI] - ai] = 
sparseV[pI];
-                               }
-
-                               // horizontal segment scan, incl pos maintenance
-                               for(int k = 0; k < numVals; k++) {
-                                       int boff = _ptr[k];
-                                       int blen = len(k);
-                                       int bix = apos[k];
-                                       double vsum = 0;
-                                       for(int ii = ai; ii < aimax && bix < 
blen; ii += blksz) {
-                                               int len = _data[boff + bix];
-                                               int pos = boff + bix + 1;
-                                               int blockId = (ii / blksz) % 2;
-                                               vsum += 
LinearAlgebraUtils.vectSum(tmpA, _data, blockId * blksz, pos, len);
-                                               bix += len + 1;
-                                       }
-
-                                       apos[k] = bix;
-                                       cvals[k] += vsum;
-                               }
+               // cache blocking config (see matrix-vector mult for 
explanation)
+               final int blksz2 = 2 * CompressionSettings.BITMAP_BLOCK_SZ;
+
+               // step 1: prepare position and value arrays
+               int[] apos = allocIVector(numVals, true);
+               double[] cvals = allocDVector(numVals, true);
+               // step 2: cache conscious matrix-vector via horizontal scans
+               int pI = sb.pos(row);
+               for(int ai = 0; ai < _numRows; ai += blksz2) {
+                       int aimax = Math.min(ai + blksz2, _numRows);
+                       Arrays.fill(tmpA, 0);
+                       for(; pI < sparseEndIndex && indexes[pI] < aimax; pI++) 
{
+                               if(indexes[pI] >= ai)
+                                       tmpA[indexes[pI] - ai] = sparseV[pI];
                        }
 
-                       int offC = row * numCols;
-                       // step 3: scale partial results by values and write to 
global output
-                       for(int k = 0, valOff = 0; k < numVals; k++, valOff += 
_colIndexes.length)
-                               for(int j = 0; j < _colIndexes.length; j++) {
-                                       int colIx = _colIndexes[j] + offC;
-                                       c[colIx] += cvals[k] * values[valOff + 
j];
-                               }
-
-               }
-               else {
-                       for(int k = 0, valOff = 0; k < numVals; k++, valOff += 
_colIndexes.length) {
+                       // horizontal segment scan, incl pos maintenance
+                       for(int k = 0; k < numVals; k++) {
                                int boff = _ptr[k];
                                int blen = len(k);
+                               int bix = apos[k];
                                double vsum = 0;
-                               int pI = sb.pos(row);
-                               for(int bix = 0, off = 0; bix < blen; bix += 
_data[boff + bix] + 1, off += blksz) {
-                                       // blockId = off / blksz;
-                                       Arrays.fill(tmpA, 0);
-                                       for(; pI < sparseEndIndex && 
indexes[pI] < off + blksz; pI++) {
-                                               if(indexes[pI] >= off)
-                                                       tmpA[indexes[pI] - off] 
= sparseV[pI];
-                                       }
-                                       vsum += 
LinearAlgebraUtils.vectSum(tmpA, _data, 0, boff + bix + 1, _data[boff + bix]);
+                               for(int ii = ai; ii < aimax && bix < blen; ii 
+= blksz) {
+                                       int len = _data[boff + bix];
+                                       int pos = boff + bix + 1;
+                                       int blockId = (ii / blksz) % 2;
+                                       vsum += 
LinearAlgebraUtils.vectSum(tmpA, _data, blockId * blksz, pos, len);
+                                       bix += len + 1;
                                }
 
-                               for(int j = 0; j < _colIndexes.length; j++) {
-                                       int Voff = _colIndexes[j] + row * 
numCols;
-                                       c[Voff] += vsum * values[valOff + j];
-                               }
+                               apos[k] = bix;
+                               cvals[k] += vsum;
                        }
                }
+
+               int offC = row * numCols;
+               // step 3: scale partial results by values and write to global 
output
+               for(int k = 0, valOff = 0; k < numVals; k++, valOff += 
_colIndexes.length)
+                       for(int j = 0; j < _colIndexes.length; j++) {
+                               int colIx = _colIndexes[j] + offC;
+                               c[colIx] += cvals[k] * values[valOff + j];
+                       }
+
        }
 
-       @Override
-       protected final void computeSum(double[] c, KahanFunction kplus) {
-               c[0] += _dict.sum(getCounts(), _colIndexes.length, kplus);
+       private void leftMultBySparseMatrixNonBlock(SparseBlock sb, double[] c, 
double[] values, int numRows, int numCols,
+               int row, double[] tmpA, int numVals) {
+               final int blksz = CompressionSettings.BITMAP_BLOCK_SZ;
+               int sparseEndIndex = sb.size(row) + sb.pos(row);
+               int[] indexes = sb.indexes(row);
+               double[] sparseV = sb.values(row);
+
+               for(int k = 0, valOff = 0; k < numVals; k++, valOff += 
_colIndexes.length) {
+                       int boff = _ptr[k];
+                       int blen = len(k);
+                       double vsum = 0;
+                       int pI = sb.pos(row);
+                       for(int bix = 0, off = 0; bix < blen; bix += _data[boff 
+ bix] + 1, off += blksz) {
+                               // blockId = off / blksz;
+                               Arrays.fill(tmpA, 0);
+                               for(; pI < sparseEndIndex && indexes[pI] < off 
+ blksz; pI++) {
+                                       if(indexes[pI] >= off)
+                                               tmpA[indexes[pI] - off] = 
sparseV[pI];
+                               }
+                               vsum += LinearAlgebraUtils.vectSum(tmpA, _data, 
0, boff + bix + 1, _data[boff + bix]);
+                       }
+
+                       for(int j = 0; j < _colIndexes.length; j++) {
+                               int Voff = _colIndexes[j] + row * numCols;
+                               c[Voff] += vsum * values[valOff + j];
+                       }
+               }
        }
 
        @Override
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupRLE.java 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupRLE.java
index 5a81fa2..df8160e 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupRLE.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupRLE.java
@@ -64,6 +64,7 @@ public class ColGroupRLE extends ColGroupOffset {
                        totalLen += lbitmaps[k].length;
                }
 
+
                // compact bitmaps to linearized representation
                createCompressedBitmaps(numVals, totalLen, lbitmaps);
        }
@@ -727,10 +728,7 @@ public class ColGroupRLE extends ColGroupOffset {
                return new ColGroupRLE(_colIndexes, _numRows, false, rvalues, 
rbitmaps, rbitmapOffs, getCachedCounts());
        }
 
-       @Override
-       protected final void computeSum(double[] c, KahanFunction kplus) {
-               c[0] += _dict.sum(getCounts(), _colIndexes.length, kplus);
-       }
+
 
        @Override
        protected final void computeRowSums(double[] c, KahanFunction kplus, 
int rl, int ru, boolean mean) {
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupValue.java 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupValue.java
index 7d53ac2..3f3db1a 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupValue.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupValue.java
@@ -82,13 +82,13 @@ public abstract class ColGroupValue extends ColGroup 
implements Cloneable {
         */
        protected ColGroupValue(int[] colIndices, int numRows, ABitmap ubm, 
CompressionSettings cs) {
                super(colIndices, numRows);
-               _lossy = false;
-               _zeros = ubm.containsZero();
+               
+               _zeros = ubm.getNumOffsets() < (long) numRows;
 
                // sort values by frequency, if requested
-               if(cs.sortValuesByLength && numRows > 
CompressionSettings.BITMAP_BLOCK_SZ) {
+               if(cs.sortValuesByLength && numRows > 
CompressionSettings.BITMAP_BLOCK_SZ)
                        ubm.sortValuesByFrequency();
-               }
+               
                switch(ubm.getType()) {
                        case Full:
                                _dict = new Dictionary(((Bitmap) 
ubm).getValues());
@@ -516,8 +516,10 @@ public abstract class ColGroupValue extends ColGroup 
implements Cloneable {
                Pair<int[], double[]> p = memPool.get();
 
                // sanity check for missing setup
-               if(p == null)
+               if(p == null){
+                       // LOG.error("No Cached Int Vector");
                        return new int[len + 1];
+               }
 
                if(p.getKey().length < len) {
                        setupThreadLocalMemory(len);
@@ -596,7 +598,13 @@ public abstract class ColGroupValue extends ColGroup 
implements Cloneable {
 
        public abstract int[] getCounts(int rl, int ru, int[] out);
 
-       protected abstract void computeSum(double[] c, KahanFunction kplus);
+       protected void computeSum(double[] c, KahanFunction kplus){
+
+               if(kplus instanceof KahanPlusSq)
+                       c[0] += _dict.sumsq(getCounts(), _colIndexes.length);
+               else
+                       c[0] += _dict.sum(getCounts(), _colIndexes.length);
+       }       
 
        protected abstract void computeRowSums(double[] c, KahanFunction kplus, 
int rl, int ru, boolean mean);
 
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/Dictionary.java 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/Dictionary.java
index a74be56..f41ba93 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/Dictionary.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/Dictionary.java
@@ -234,18 +234,34 @@ public class Dictionary extends ADictionary {
        }
 
        @Override
-       protected double sum(int[] counts, int ncol, KahanFunction kplus) {
+       protected double sum(int[] counts, int ncol) {
                if(_values == null)
                        return 0;
-               KahanObject kbuff = new KahanObject(0, 0);
+               double out = 0;
+               int valOff = 0;
+               for(int k = 0; k < _values.length / ncol; k++) {
+                       int countK = counts[k];
+                       for(int j = 0; j < ncol; j++) {
+                               out +=  getValue(valOff++) *  countK;
+                       }
+               }
+               return out;
+       }
+
+       @Override
+       protected double sumsq(int[] counts, int ncol) {
+               if(_values == null)
+                       return 0;
+               double out = 0;
                int valOff = 0;
                for(int k = 0; k < _values.length / ncol; k++) {
                        int countK = counts[k];
                        for(int j = 0; j < ncol; j++) {
-                               kplus.execute3(kbuff, getValue(valOff++), 
countK);
+                               double val = getValue(valOff++);
+                               out +=  val * val  * countK;
                        }
                }
-               return kbuff._sum;
+               return out;
        }
 
        @Override
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/QDictionary.java 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/QDictionary.java
index 43ab8a6..e054aa7 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/QDictionary.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/QDictionary.java
@@ -24,6 +24,7 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Arrays;
 
+import org.apache.commons.lang.NotImplementedException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.sysds.runtime.compress.utils.BitmapLossy;
@@ -62,7 +63,7 @@ public class QDictionary extends ADictionary {
 
        @Override
        public double[] getValues() {
-               if(_values == null){
+               if(_values == null) {
                        return new double[0];
                }
                double[] res = new double[_values.length];
@@ -186,15 +187,16 @@ public class QDictionary extends ADictionary {
 
        @Override
        public QDictionary applyBinaryRowOp(ValueFunction fn, double[] v, 
boolean sparseSafe, int[] colIndexes) {
-       
-               if (_values == null){
-                       if (sparseSafe){
+
+               if(_values == null) {
+                       if(sparseSafe) {
                                return new QDictionary(null, 1);
-                       } else{
+                       }
+                       else {
                                _values = new byte[0];
                        }
                }
-                       
+
                double[] temp = sparseSafe ? new double[_values.length] : new 
double[_values.length + colIndexes.length];
                double max = Math.abs(fn.execute(0, v[0]));
                final int colL = colIndexes.length;
@@ -207,7 +209,7 @@ public class QDictionary extends ADictionary {
                        }
                }
                if(!sparseSafe)
-                       for(; i <size() + colL; i++) {
+                       for(; i < size() + colL; i++) {
                                temp[i] = fn.execute(0, v[colIndexes[i % 
colL]]);
                                double absTemp = Math.abs(temp[i]);
                                if(absTemp > max) {
@@ -225,7 +227,7 @@ public class QDictionary extends ADictionary {
        }
 
        @Override
-       public int size(){
+       public int size() {
                return _values == null ? 0 : _values.length;
        }
 
@@ -273,7 +275,7 @@ public class QDictionary extends ADictionary {
                if(nrColumns == 1 && kplus instanceof KahanPlus)
                        return getValues(); // shallow copy of values
 
-               final int numVals =  getNumberOfValues(nrColumns);
+               final int numVals = getNumberOfValues(nrColumns);
                double[] ret = ColGroupValue.allocDVector(numVals, false);
                for(int k = 0; k < numVals; k++) {
                        ret[k] = sumRow(k, kplus, nrColumns);
@@ -284,9 +286,10 @@ public class QDictionary extends ADictionary {
 
        @Override
        protected double sumRow(int k, KahanFunction kplus, int nrColumns) {
-               if (_values == null) return 0;
+               if(_values == null)
+                       return 0;
                int valOff = k * nrColumns;
-               
+
                if(kplus instanceof KahanPlus) {
                        int res = 0;
                        for(int i = 0; i < nrColumns; i++) {
@@ -336,48 +339,54 @@ public class QDictionary extends ADictionary {
        }
 
        @Override
-       protected double sum(int[] counts, int ncol, KahanFunction kplus) {
-               if(!(kplus instanceof KahanPlusSq)) {
-                       int sum = 0;
-                       int valOff = 0;
-                       for(int k = 0; k < getNumberOfValues(ncol); k++) {
-                               int countK = counts[k];
-                               for(int j = 0; j < ncol; j++) {
-                                       sum += countK * getValueByte(valOff++);
-                               }
-                       }
-                       return sum * _scale;
-               }
-               else {
-                       KahanObject kbuff = new KahanObject(0, 0);
-                       int valOff = 0;
-                       for(int k = 0; k < getNumberOfValues(ncol); k++) {
-                               int countK = counts[k];
-                               for(int j = 0; j < ncol; j++) {
-                                       kplus.execute3(kbuff, 
getValue(valOff++), countK);
-                               }
-                       }
-                       return kbuff._sum;
-               }
+       protected double sum(int[] counts, int ncol) {
+               throw new NotImplementedException("Not Implemented");
+               // if(!(kplus instanceof KahanPlusSq)) {
+               // int sum = 0;
+               // int valOff = 0;
+               // for(int k = 0; k < getNumberOfValues(ncol); k++) {
+               // int countK = counts[k];
+               // for(int j = 0; j < ncol; j++) {
+               // sum += countK * getValueByte(valOff++);
+               // }
+               // }
+               // return sum * _scale;
+               // }
+               // else {
+               // KahanObject kbuff = new KahanObject(0, 0);
+               // int valOff = 0;
+               // for(int k = 0; k < getNumberOfValues(ncol); k++) {
+               // int countK = counts[k];
+               // for(int j = 0; j < ncol; j++) {
+               // kplus.execute3(kbuff, getValue(valOff++), countK);
+               // }
+               // }
+               // return kbuff._sum;
+               // }
+       }
+
+       @Override
+       protected double sumsq(int[] counts, int ncol) {
+               throw new NotImplementedException("Not Implemented");
        }
 
        @Override
-       protected void addMaxAndMin(double[] ret, int[] colIndexes){
+       protected void addMaxAndMin(double[] ret, int[] colIndexes) {
                byte[] mins = new byte[colIndexes.length];
                byte[] maxs = new byte[colIndexes.length];
-               for(int i = 0; i < colIndexes.length; i++){
+               for(int i = 0; i < colIndexes.length; i++) {
                        mins[i] = _values[i];
                        maxs[i] = _values[i];
                }
-               for(int i = colIndexes.length; i < _values.length; i++){
+               for(int i = colIndexes.length; i < _values.length; i++) {
                        int idx = i % colIndexes.length;
                        mins[idx] = (byte) Math.min(_values[i], mins[idx]);
                        maxs[idx] = (byte) Math.max(_values[i], maxs[idx]);
                }
-               for(int i = 0; i < colIndexes.length; i ++){
-                       int idy = colIndexes[i]*2;
+               for(int i = 0; i < colIndexes.length; i++) {
+                       int idy = colIndexes[i] * 2;
                        ret[idy] += mins[i] * _scale;
-                       ret[idy+1] += maxs[i] * _scale;
+                       ret[idy + 1] += maxs[i] * _scale;
                }
        }
 
@@ -389,19 +398,19 @@ public class QDictionary extends ADictionary {
                return sb;
        }
 
-       public Dictionary makeDoubleDictionary(){
+       public Dictionary makeDoubleDictionary() {
                double[] doubleValues = getValues();
                return new Dictionary(doubleValues);
        }
 
-       public ADictionary sliceOutColumnRange(int idxStart, int idxEnd, int 
previousNumberOfColumns){
+       public ADictionary sliceOutColumnRange(int idxStart, int idxEnd, int 
previousNumberOfColumns) {
                int numberTuples = getNumberOfValues(previousNumberOfColumns);
                int tupleLengthAfter = idxEnd - idxStart;
                byte[] newDictValues = new byte[tupleLengthAfter * 
numberTuples];
                int orgOffset = idxStart;
                int targetOffset = 0;
-               for(int v = 0; v < numberTuples; v++){
-                       for(int c = 0; c< tupleLengthAfter; c++, orgOffset++, 
targetOffset++){
+               for(int v = 0; v < numberTuples; v++) {
+                       for(int c = 0; c < tupleLengthAfter; c++, orgOffset++, 
targetOffset++) {
                                newDictValues[targetOffset] = 
_values[orgOffset];
                        }
                        orgOffset += previousNumberOfColumns - idxEnd + 
idxStart;
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/lib/LibCompAgg.java 
b/src/main/java/org/apache/sysds/runtime/compress/lib/LibCompAgg.java
index b5015de..585b7ad 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/lib/LibCompAgg.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/lib/LibCompAgg.java
@@ -55,7 +55,8 @@ public class LibCompAgg {
 
     // private static final Log LOG = 
LogFactory.getLog(LibCompAgg.class.getName());
 
-    private static final long MIN_PAR_AGG_THRESHOLD = 8 * 1024 * 1024;
+    // private static final long MIN_PAR_AGG_THRESHOLD = 8 * 1024 * 1024;
+    private static final long MIN_PAR_AGG_THRESHOLD = 8;
 
     private static ThreadLocal<MatrixBlock> memPool = new 
ThreadLocal<MatrixBlock>() {
         @Override
@@ -66,7 +67,6 @@ public class LibCompAgg {
 
     public static MatrixBlock aggregateUnary(CompressedMatrixBlock 
inputMatrix, MatrixBlock outputMatrix,
         AggregateUnaryOperator op, int blen, MatrixIndexes indexesIn, boolean 
inCP) {
-
         if(inputMatrix.getColGroups() != null) {
             fillStart(outputMatrix, op);
 
@@ -114,7 +114,7 @@ public class LibCompAgg {
     }
 
     private static void tryToAggregateInParallel(CompressedMatrixBlock m1, 
MatrixBlock ret, AggregateUnaryOperator op) {
-        int k = getNumberOfThreadsToUse(op);
+        int k = op.getNumThreads();
         if(k == 1)
             aggregateUnaryOperations(op, m1.getColGroups(), ret, 0, 
m1.getNumRows(), m1.getNumColumns());
         else
@@ -125,64 +125,62 @@ public class LibCompAgg {
     private static void aggregateInParallel(CompressedMatrixBlock m1, 
MatrixBlock ret, AggregateUnaryOperator op,
         int k) {
 
-        List<List<ColGroup>> grpParts = 
createTaskPartitionNotIncludingUncompressable(m1.getColGroups(), k);
-
         ExecutorService pool = CommonThreadPool.get(op.getNumThreads());
         ArrayList<UnaryAggregateTask> tasks = new ArrayList<>();
-
         try {
             // compute all compressed column groups
-            if(op.indexFn instanceof ReduceCol && grpParts.size() > 0) {
+            if(op.indexFn instanceof ReduceCol) {
                 final int blkz = CompressionSettings.BITMAP_BLOCK_SZ;
-                int blklen = Math.min((int) Math.ceil((double) m1.getNumRows() 
/ op.getNumThreads()), blkz / 2);
+                int blklen = Math.max((int) Math.ceil((double) m1.getNumRows() 
/ (op.getNumThreads() * 2)), blkz);
                 blklen += (blklen % blkz != 0) ? blkz - blklen % blkz : 0;
-                for(int i = 0; i < op.getNumThreads() & i * blklen < 
m1.getNumRows(); i++) {
-                    tasks.add(new UnaryAggregateTask(grpParts.get(0), ret, i * 
blklen,
+
+                for(int i = 0; i < op.getNumThreads() & i * blklen < 
m1.getNumRows(); i++)
+                    tasks.add(new UnaryAggregateTask(m1.getColGroups(), ret, i 
* blklen,
                         Math.min((i + 1) * blklen, m1.getNumRows()), op, 
m1.getNumColumns()));
 
-                }
             }
-            else
-                for(List<ColGroup> grp : grpParts) {
-                    if(grp != null)
-                        tasks.add(new UnaryAggregateTask(grp, ret, 0, 
m1.getNumRows(), op, m1.getNumColumns()));
-                }
-            List<Future<MatrixBlock>> rtasks = pool.invokeAll(tasks);
+            else {
+                List<List<ColGroup>> grpParts = 
createTaskPartitionNotIncludingUncompressable(m1.getColGroups(), k);
+                for(List<ColGroup> grp : grpParts)
+                    tasks.add(new UnaryAggregateTask(grp, ret, 0, 
m1.getNumRows(), op, m1.getNumColumns()));
+            }
+
+            List<Future<MatrixBlock>> futures = pool.invokeAll(tasks);
             pool.shutdown();
 
             // aggregate partial results
-            if(op.indexFn instanceof ReduceAll) {
-                if(op.aggOp.increOp.fn instanceof KahanFunction) {
-                    KahanObject kbuff = new KahanObject(ret.quickGetValue(0, 
0), 0);
-                    for(Future<MatrixBlock> rtask : rtasks) {
-                        double tmp = rtask.get().quickGetValue(0, 0);
-                        ((KahanFunction) op.aggOp.increOp.fn).execute2(kbuff, 
tmp);
-                    }
-                    ret.quickSetValue(0, 0, kbuff._sum);
-                }
-                else if(op.aggOp.increOp.fn instanceof Mean) {
-                    double val = ret.quickGetValue(0, 0);
-                    for(Future<MatrixBlock> rtask : rtasks) {
-                        double tmp = rtask.get().quickGetValue(0, 0);
-                        val = val + tmp;
-                    }
-                    ret.quickSetValue(0, 0, val);
-                }
-                else {
-                    double val = ret.quickGetValue(0, 0);
-                    for(Future<MatrixBlock> rtask : rtasks) {
-                        double tmp = rtask.get().quickGetValue(0, 0);
-                        val = op.aggOp.increOp.fn.execute(val, tmp);
-                    }
-                    ret.quickSetValue(0, 0, val);
-                }
-            }
+            if(op.indexFn instanceof ReduceAll)
+                if(op.aggOp.increOp.fn instanceof Builtin)
+                    aggregateResults(ret, futures, op);
+                else
+                    sumResults(ret, futures);
         }
         catch(InterruptedException | ExecutionException e) {
             throw new DMLRuntimeException(e);
         }
     }
 
+    private static void sumResults(MatrixBlock ret, List<Future<MatrixBlock>> 
futures)
+        throws InterruptedException, ExecutionException {
+        double val = ret.quickGetValue(0, 0);
+        for(Future<MatrixBlock> rtask : futures) {
+            double tmp = rtask.get().quickGetValue(0, 0);
+            val = val + tmp;
+        }
+        ret.quickSetValue(0, 0, val);
+
+    }
+
+    private static void aggregateResults(MatrixBlock ret, 
List<Future<MatrixBlock>> futures, AggregateUnaryOperator op)
+        throws InterruptedException, ExecutionException {
+        double val = ret.quickGetValue(0, 0);
+        for(Future<MatrixBlock> rtask : futures) {
+            double tmp = rtask.get().quickGetValue(0, 0);
+            val = op.aggOp.increOp.fn.execute(val, tmp);
+        }
+        ret.quickSetValue(0, 0, val);
+    }
+
     private static void aggregateSingleThreaded(CompressedMatrixBlock m1, 
MatrixBlock ret, AggregateUnaryOperator op) {
         aggregateUnaryOperations(op, m1.getColGroups(), ret, 0, 
m1.getNumRows(), m1.getNumColumns());
     }
@@ -220,7 +218,6 @@ public class LibCompAgg {
 
     private static void aggregateUnaryOverlapping(CompressedMatrixBlock m1, 
MatrixBlock ret, AggregateUnaryOperator op,
         MatrixIndexes indexesIn, boolean inCP) {
-
         try {
             List<Future<MatrixBlock>> rtasks = 
generateUnaryAggregateOverlappingFutures(m1, ret, op);
             reduceOverlappingFutures(rtasks, ret, op);
@@ -322,36 +319,37 @@ public class LibCompAgg {
         return grpParts;
     }
 
-    private static int getNumberOfThreadsToUse(AggregateUnaryOperator op) {
-        return (op.indexFn instanceof ReduceCol) ? 1 : op.getNumThreads();
+    private static void aggregateUnaryOperations(AggregateUnaryOperator op, 
List<ColGroup> groups, MatrixBlock ret,
+        int rl, int ru, int numColumns) {
+        if(op.indexFn instanceof ReduceCol && op.aggOp.increOp.fn instanceof 
Builtin)
+            aggregateUnaryBuiltinRowOperation(op, groups, ret, rl, ru, 
numColumns);
+        else
+            aggregateUnaryNormalOperation(op, groups, ret, rl, ru, numColumns);
     }
 
-    private static void aggregateUnaryOperations(AggregateUnaryOperator op, 
List<ColGroup> groups, MatrixBlock ret,
+    private static void aggregateUnaryNormalOperation(AggregateUnaryOperator 
op, List<ColGroup> groups, MatrixBlock ret,
         int rl, int ru, int numColumns) {
+        for(ColGroup grp : groups)
+            grp.unaryAggregateOperations(op, ret, rl, ru);
+
+    }
 
-        int[] rnnz = (op.indexFn instanceof ReduceCol && op.aggOp.increOp.fn 
instanceof Builtin) ? new int[ru -
-            rl] : null;
+    private static void 
aggregateUnaryBuiltinRowOperation(AggregateUnaryOperator op, List<ColGroup> 
groups,
+        MatrixBlock ret, int rl, int ru, int numColumns) {
+
+        int[] rnnz = new int[ru - rl];
         int numberDenseColumns = 0;
         for(ColGroup grp : groups) {
-            if(grp != null && !(grp instanceof ColGroupUncompressed)) {
-                grp.unaryAggregateOperations(op, ret, rl, ru);
-                if(grp.isDense()) {
-                    numberDenseColumns += grp.getNumCols();
-                }
-                else if(op.indexFn instanceof ReduceCol && op.aggOp.increOp.fn 
instanceof Builtin) {
-                    grp.countNonZerosPerRow(rnnz, rl, ru);
-                }
-            }
+            grp.unaryAggregateOperations(op, ret, rl, ru);
+            if(grp.isDense())
+                numberDenseColumns += grp.getNumCols();
+            else
+                grp.countNonZerosPerRow(rnnz, rl, ru);
         }
 
-        if(op.indexFn instanceof ReduceCol && op.aggOp.increOp.fn instanceof 
Builtin) {
-            for(int row = rl; row < ru; row++) {
-                if(rnnz[row] + numberDenseColumns < numColumns) {
-                    ret.quickSetValue(row, 0, 
op.aggOp.increOp.fn.execute(ret.quickGetValue(row, 0), 0.0));
-                }
-            }
-
-        }
+        for(int row = rl; row < ru; row++)
+            if(rnnz[row] + numberDenseColumns < numColumns)
+                ret.quickSetValue(row, 0, 
op.aggOp.increOp.fn.execute(ret.quickGetValue(row, 0), 0.0));
 
     }
 
@@ -391,7 +389,7 @@ public class LibCompAgg {
             _numColumns = numColumns;
 
             if(_op.indexFn instanceof ReduceAll) { // sum
-                _ret = new MatrixBlock(ret.getNumRows(), ret.getNumColumns(), 
false);
+                _ret = new MatrixBlock(1, 1, false);
                 _ret.allocateDenseBlock();
                 if(_op.aggOp.increOp.fn instanceof Builtin)
                     System.arraycopy(ret.getDenseBlockValues(),
@@ -400,9 +398,9 @@ public class LibCompAgg {
                         0,
                         ret.getNumRows() * ret.getNumColumns());
             }
-            else { // colSums
+            else // colSums / rowSums
                 _ret = ret;
-            }
+
         }
 
         @Override
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/lib/LibLeftMultBy.java 
b/src/main/java/org/apache/sysds/runtime/compress/lib/LibLeftMultBy.java
index 0d360ff..5314dba 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/lib/LibLeftMultBy.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/lib/LibLeftMultBy.java
@@ -27,6 +27,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 
+import org.apache.commons.lang.NotImplementedException;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -37,7 +38,6 @@ import org.apache.sysds.runtime.compress.colgroup.ColGroup;
 import org.apache.sysds.runtime.compress.colgroup.ColGroupOLE;
 import org.apache.sysds.runtime.compress.colgroup.ColGroupUncompressed;
 import org.apache.sysds.runtime.compress.colgroup.ColGroupValue;
-import org.apache.sysds.runtime.compress.utils.LinearAlgebraUtils;
 import org.apache.sysds.runtime.data.DenseBlock;
 import org.apache.sysds.runtime.data.SparseBlock;
 import org.apache.sysds.runtime.functionobjects.SwapIndex;
@@ -49,7 +49,7 @@ import org.apache.sysds.runtime.util.CommonThreadPool;
 public class LibLeftMultBy {
        private static final Log LOG = 
LogFactory.getLog(LibLeftMultBy.class.getName());
 
-       private static ThreadLocal<double[]> memPoolOLE = new 
ThreadLocal<double[]>() {
+       private static ThreadLocal<double[]> memPoolLeftMult = new 
ThreadLocal<double[]>() {
                @Override
                protected double[] initialValue() {
                        return null;
@@ -97,26 +97,68 @@ public class LibLeftMultBy {
 
        public static void leftMultByTransposeSelf(List<ColGroup> groups, 
MatrixBlock result, int k, int numColumns,
                Pair<Integer, int[]> v, boolean overlapping) {
+               if(!overlapping)
+                       leftMultBySelfDiagonalColGroup(groups, result, 
numColumns, v);
 
-               if(k <= 1) {
-                       leftMultByTransposeSelfOverlapping(groups, result, v, 
0, numColumns, overlapping);
-               }
-               else {
-                       try {
-                               ExecutorService pool = CommonThreadPool.get(k);
-                               ArrayList<MatrixMultTransposeTaskOverlapping> 
tasks = new ArrayList<>();
-                               int blklen = (int) (Math.ceil((double) 
numColumns / k));
-                               for(int i = 0; i * blklen < numColumns; i++)
-                                       tasks.add(new 
MatrixMultTransposeTaskOverlapping(groups, result, i * blklen,
-                                               Math.min((i + 1) * blklen, 
numColumns), v, overlapping));
-                               List<Future<Object>> ret = 
pool.invokeAll(tasks);
-                               for(Future<Object> tret : ret)
-                                       tret.get(); // check for errors
-                               pool.shutdown();
-                       }
-                       catch(InterruptedException | ExecutionException e) {
-                               throw new DMLRuntimeException(e);
+               if(groups.size() > 1)
+                       if(k <= 1)
+                               leftMultByTransposeSelf(groups, result, v, 0, 
numColumns, 0, numColumns, overlapping);
+                       else
+                               try {
+                                       ExecutorService pool = 
CommonThreadPool.get(k);
+                                       
ArrayList<MatrixMultTransposeReflectedTask> tasks = new ArrayList<>();
+                                       int odd = numColumns % 2;
+                                       for(int i = 0; i < numColumns / 2 + 
odd; i++) {
+                                               // if(i < numColumns / 4 && 
numColumns > 100) {
+                                               // int halfRemainingInRow = (i 
- numColumns) / 2;
+                                               tasks.add(new 
MatrixMultTransposeReflectedTask(groups, result, i, i + 1, i, numColumns, v,
+                                                       overlapping));
+                                               // tasks.add(new 
MatrixMultTransposeReflectedTask(groups, result, i, i + 1, i +
+                                               // halfRemainingInRow,
+                                               // numColumns, v, overlapping));
+                                               // }
+                                               // else
+                                               // tasks.add(
+                                               // new 
MatrixMultTransposeReflectedTask(groups, result, i, i + 1, i, numColumns, v,
+                                               // overlapping));
+                                       }
+
+                                       List<Future<Object>> ret = 
pool.invokeAll(tasks);
+                                       for(Future<Object> tret : ret)
+                                               tret.get();
+                                       pool.shutdown();
+                               }
+                               catch(InterruptedException | ExecutionException 
e) {
+                                       throw new DMLRuntimeException(e);
+                               }
+
+       }
+
+       private static void leftMultBySelfDiagonalColGroup(List<ColGroup> 
groups, MatrixBlock result, int numColumns,
+               Pair<Integer, int[]> v) {
+               double[] outValues = result.getDenseBlockValues();
+               for(ColGroup g : groups) {
+                       if(g instanceof ColGroupValue) {
+                               ColGroupValue gv = (ColGroupValue) g;
+                               int[] counts = gv.getCounts();
+                               double[] values = gv.getValues();
+                               int[] columns = gv.getColIndices();
+
+                               for(int i = 0; i < columns.length; i++) {
+                                       int y = columns[i];
+                                       for(int j = 0; j < columns.length; j++) 
{
+                                               int x = columns[j];
+                                               for(int h = 0; h < 
gv.getValues().length / columns.length; h++) {
+                                                       double a = values[h * 
columns.length + i];
+                                                       double b = values[h * 
columns.length + j];
+                                                       outValues[x + y * 
numColumns] += a * b * counts[h];
+                                               }
+                                       }
+                               }
                        }
+                       else
+                               throw new NotImplementedException("Not 
Implemented diagonal on non ColGroupValue type.");
+
                }
        }
 
@@ -272,26 +314,17 @@ public class LibLeftMultBy {
                return ret;
        }
 
-       private static MatrixBlock leftMultByVectorTranspose(List<ColGroup> 
colGroups, MatrixBlock vector,
-               MatrixBlock result, boolean doTranspose, boolean allocTmp, 
Pair<Integer, int[]> v, boolean overlap) {
-
-               MatrixBlock rowVector = vector;
-
-               // Note that transpose here is a metadata operation since the 
input is a vector.
-               if(doTranspose) {
-                       rowVector = new MatrixBlock(1, vector.getNumRows(), 
false);
-                       LibMatrixReorg.transpose(vector, rowVector);
-               }
-
-               // initialize and allocate the result
-               result.reset();
-               result.allocateDenseBlock();
-
+       private static double[] leftMultSelfVectorTranspose(List<ColGroup> 
colGroups, double[] rowVector, double[] result,
+               int cl, int cu, int j) {
+               // j is the current decompressed rowVector.
+               Arrays.fill(result, 0);
                for(ColGroup grp : colGroups) {
-                       
grp.leftMultByRowVector(rowVector.getDenseBlockValues(), 
result.getDenseBlockValues());
-               }
+                       int[] columns = grp.getColIndices();
+                       if(columns[columns.length - 1] >= cl && columns[0] < cu)
+                               if(Arrays.binarySearch(columns, j) < 0) // if 
the colGroup is not it self.
+                                       grp.leftMultByRowVector(rowVector, 
result);
 
-               result.recomputeNonZeros();
+               }
 
                return result;
        }
@@ -360,7 +393,7 @@ public class LibLeftMultBy {
                                pool.shutdown();
                                for(Future<Object> future : futures)
                                        future.get();
-                               memPoolOLE.remove();
+                               memPoolLeftMult.remove();
                        }
                        catch(InterruptedException | ExecutionException e) {
                                throw new DMLRuntimeException(e);
@@ -371,33 +404,68 @@ public class LibLeftMultBy {
 
        }
 
-       private static void leftMultByTransposeSelfOverlapping(List<ColGroup> 
groups, MatrixBlock result,
-               Pair<Integer, int[]> v, int cl, int cu, boolean overlapping) {
+       private static void leftMultByTransposeSelf(List<ColGroup> groups, 
MatrixBlock result, Pair<Integer, int[]> v,
+               int rl, int ru, int cl, int cu, boolean overlapping) {
+               if(overlapping)
+                       leftMultByTransposeSelfOverlapping(groups, result, v, 
rl, ru, cl, cu);
+               else
+                       leftMultByTransposeSelfNormal(groups, result, v, rl, 
ru, cl, cu);
+       }
+
+       private static void leftMultByTransposeSelfNormal(List<ColGroup> 
groups, MatrixBlock result, Pair<Integer, int[]> v,
+               int rl, int ru, int cl, int cu) {
                // It should be possible to get better performance exploiting 
if the matrix is not overlapping.
                // TODO: exploit specfic column groups (DDC most likely) to 
gain better performance.
-               // Idea multiplying with one self simply use count of values, 
and then
                // calculate : count * v^2
 
-               final int numRows = groups.get(0).getNumRows();
-
                // preallocated dense tmp matrix blocks
-               MatrixBlock lhs = new MatrixBlock(1, numRows, false);
-               MatrixBlock tmpret = new MatrixBlock(1, result.getNumColumns(), 
false);
-               lhs.allocateDenseBlock();
-               tmpret.allocateDenseBlock();
 
-               // setup memory pool for reuse
+               final int numRows = groups.get(0).getNumRows();
+               double[] lhs = memPoolLeftMult.get() != null ? 
memPoolLeftMult.get(): new double[numRows];
+               double[] tmpret = new double[result.getNumColumns()];
+
+               for(int j = rl; j < ru; j++) {
+                       if(!rowIsDone(result.getDenseBlockValues(), 
result.getNumColumns(), j)) {
 
-               for(int j = cl; j < cu; j++) {
-                       ColGroup.decompressColumnToBlock(lhs, j, groups);
-                       if(!lhs.isEmptyBlock(false)) {
-                               leftMultByVectorTranspose(groups, lhs, tmpret, 
false, true, v, overlapping);
-                               
LinearAlgebraUtils.copyNonZerosToUpperTriangle(result, tmpret, j);
+                               ColGroup.decompressColumnToArray(lhs, j, 
groups);
+                               leftMultSelfVectorTranspose(groups, lhs, 
tmpret, Math.max(cl, rl), cu, j);
+                               copyToUpperTriangle(result
+                                       .getDenseBlockValues(), tmpret, j, 
result.getNumColumns(), Math.max(cl, rl), cu);
+                               Arrays.fill(lhs, 0);
                        }
-                       lhs.reset();
+               }
+       }
+
+       private static boolean rowIsDone(double[] values, int nrColumns, int 
row) {
+               int offset = nrColumns * row + row;
+               for(int i = row; i < nrColumns; i++) {
+                       if(values[offset++] == 0.0)
+                               return false;
+               }
+               return true;
+       }
+
+       private static void copyToUpperTriangle(double[] ret, double[] tmp, int 
row, int totalRows, int cl, int cu) {
+               int offOut = row * totalRows;
+               for(int i = cl; i < cu; i++) {
+                       ret[offOut + i] += tmp[i];
+               }
+       }
+
+       private static void leftMultByTransposeSelfOverlapping(List<ColGroup> 
groups, MatrixBlock result,
+               Pair<Integer, int[]> v, int rl, int ru, int cl, int cu) {
+
+               final int numRows = groups.get(0).getNumRows();
+               double[] lhs = new double[numRows];
+               double[] tmpret = new double[result.getNumColumns()];
+
+               for(int j = rl; j < ru; j++) {
+                       ColGroup.decompressColumnToArray(lhs, j, groups);
+                       leftMultSelfVectorTranspose(groups, lhs, tmpret, 
Math.max(cl, rl), cu, -1);
+                       copyToUpperTriangle(result.getDenseBlockValues(), 
tmpret, j, result.getNumColumns(), Math.max(cl, rl), cu);
+                       Arrays.fill(lhs, 0);
                }
 
-               // post processing
        }
 
        private static void 
leftMultByCompressedTransposeRowSection(List<ColGroup> thisGroups, 
List<ColGroup> thatGroups,
@@ -610,7 +678,7 @@ public class LibLeftMultBy {
                @Override
                public Object call() {
                        // Temporary Array to store 2 * block size in
-                       double[] tmpA = memPoolOLE.get();
+                       double[] tmpA = memPoolLeftMult.get();
                        if(tmpA == null) {
                                if(_groups != null) {
                                        tmpA = new 
double[Math.min(CompressionSettings.BITMAP_BLOCK_SZ * 2, 
_groups.get(0).getNumRows())];
@@ -618,6 +686,7 @@ public class LibLeftMultBy {
                                else {
                                        tmpA = new 
double[Math.min(CompressionSettings.BITMAP_BLOCK_SZ * 2, _group.getNumRows())];
                                }
+                               memPoolLeftMult.set(tmpA);
                        }
                        else {
                                Arrays.fill(tmpA, 0.0);
@@ -653,20 +722,30 @@ public class LibLeftMultBy {
                }
        }
 
-       private static class MatrixMultTransposeTaskOverlapping implements 
Callable<Object> {
+       /**
+        * "Reflected" means that this task is executed on the specified range 
rl to ru AND on its reflection in the
+        * bottom half of the matrix. This gives each task an equal amount of 
work, because TSMM only requires us to
+        * calculate the upper half of the output matrix. If the number of 
columns is odd, the first column does not get
+        * extra work.
+        */
+       private static class MatrixMultTransposeReflectedTask implements 
Callable<Object> {
                private final List<ColGroup> _groups;
                private final MatrixBlock _ret;
-               private final int _gl;
-               private final int _gu;
+               private int _cl;
+               private int _cu;
+               private int _rl;
+               private int _ru;
                private final Pair<Integer, int[]> _v;
                private final boolean _overlapping;
 
-               protected MatrixMultTransposeTaskOverlapping(List<ColGroup> 
groups, MatrixBlock ret, int gl, int gu,
-                       Pair<Integer, int[]> v, boolean overlapping) {
+               protected MatrixMultTransposeReflectedTask(List<ColGroup> 
groups, MatrixBlock ret, int rl, int ru, int cl,
+                       int cu, Pair<Integer, int[]> v, boolean overlapping) {
                        _groups = groups;
                        _ret = ret;
-                       _gl = gl;
-                       _gu = gu;
+                       _cl = cl;
+                       _cu = cu;
+                       _rl = rl;
+                       _ru = ru;
                        _v = v;
                        _overlapping = overlapping;
                }
@@ -674,8 +753,23 @@ public class LibLeftMultBy {
                @Override
                public Object call() {
                        ColGroupValue.setupThreadLocalMemory(_v.getLeft() + 1);
-                       leftMultByTransposeSelfOverlapping(_groups, _ret, _v, 
_gl, _gu, _overlapping);
-                       return null;
+                       leftMultByTransposeSelf(_groups, _ret, _v, _rl, _ru, 
_cl, _cu, _overlapping);
+                       int nCol = _ret.getNumColumns();
+                       double[] tmpA = memPoolLeftMult.get();
+                       if(tmpA == null) {
+                               tmpA = new double[_groups.get(0).getNumRows()];
+                               memPoolLeftMult.set(tmpA);
+                       }
+                       else
+                               Arrays.fill(tmpA, 0);
+                       if(nCol % 2 == 1) {
+                               _rl = _rl == 0 ? 0 : _rl - 1;
+                               _ru = _ru - 1;
+                       }
+                       if(_rl != _ru)
+                               leftMultByTransposeSelf(_groups, _ret, _v, nCol 
- _ru, nCol - _rl, _cl, _cu, _overlapping);
+
+                       return null;
                }
        }
 
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/lib/LibRightMultBy.java 
b/src/main/java/org/apache/sysds/runtime/compress/lib/LibRightMultBy.java
index f8bfb40..5d0f84c 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/lib/LibRightMultBy.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/lib/LibRightMultBy.java
@@ -57,17 +57,16 @@ public class LibRightMultBy {
        public static MatrixBlock rightMultByMatrix(List<ColGroup> colGroups, 
MatrixBlock that, MatrixBlock ret, int k,
                Pair<Integer, int[]> v, boolean allowOverlap) {
 
-               if(that instanceof CompressedMatrixBlock) {
+               if(that instanceof CompressedMatrixBlock)
                        LOG.info("Decompression Right matrix");
-               }
+               
                that = that instanceof CompressedMatrixBlock ? 
((CompressedMatrixBlock) that).decompress(k) : that;
 
-               if(allowOverlappingOutput(colGroups, allowOverlap)) {
+               if(allowOverlappingOutput(colGroups, allowOverlap)) 
                        return rightMultByMatrixOverlapping(colGroups, that, 
ret, k, v);
-               }
-               else {
+               else 
                        return rightMultByMatrixNonOverlapping(colGroups, that, 
ret, k, v);
-               }
+               
        }
 
        private static boolean allowOverlappingOutput(List<ColGroup> colGroups, 
boolean allowOverlap) {
@@ -118,7 +117,6 @@ public class LibRightMultBy {
                ret.setNumColumns(cl);
                ret.setNumRows(rl);
                CompressedMatrixBlock retC = (CompressedMatrixBlock) ret;
-               retC.setOverlapping(true);
                ret = rightMultByMatrixCompressed(colGroups, that, retC, k, v);
                return ret;
        }
@@ -260,12 +258,11 @@ public class LibRightMultBy {
        private static MatrixBlock rightMultByMatrixCompressed(List<ColGroup> 
colGroups, MatrixBlock that,
                CompressedMatrixBlock ret, int k, Pair<Integer, int[]> v) {
 
-               for(ColGroup grp : colGroups) {
-                       if(grp instanceof ColGroupUncompressed) {
+               for(ColGroup grp : colGroups) 
+                       if(grp instanceof ColGroupUncompressed) 
                                throw new DMLCompressionException(
                                        "Right Mult by dense with compressed 
output is not efficient to do with uncompressed Compressed ColGroups and 
therefore not supported.");
-                       }
-               }
+                       
 
                List<ColGroup> retCg = new ArrayList<>();
                if(k == 1) {
@@ -295,9 +292,9 @@ public class LibRightMultBy {
                        }
                }
                ret.allocateColGroupList(retCg);
-               ret.setOverlapping(true);
+               if(retCg.size() > 1)
+                       ret.setOverlapping(true);
                ret.setNonZeros(-1);
-
                return ret;
        }
 
diff --git a/src/main/java/org/apache/sysds/runtime/compress/lib/LibScalar.java 
b/src/main/java/org/apache/sysds/runtime/compress/lib/LibScalar.java
index 800c83b..1bf18eb 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/lib/LibScalar.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/lib/LibScalar.java
@@ -60,7 +60,7 @@ public class LibScalar {
                        return 
LibRelationalOp.overlappingRelativeRelationalOperation(sop, m1);
                }
 
-               if(isInValidForCompressedOutput(m1, sop)) {
+               if(isInvalidForCompressedOutput(m1, sop)) {
                        LOG.warn("scalar overlapping not supported for op: " + 
sop.fn);
                        MatrixBlock m1d = m1.decompress(sop.getNumThreads());
                        return m1d.scalarOperations(sop, result);
@@ -142,7 +142,7 @@ public class LibScalar {
                return newColGroups;
        }
 
-       private static boolean 
isInValidForCompressedOutput(CompressedMatrixBlock m1, ScalarOperator sop) {
+       private static boolean 
isInvalidForCompressedOutput(CompressedMatrixBlock m1, ScalarOperator sop) {
                return m1.isOverlapping() &&
                        (!(sop.fn instanceof Multiply || (sop.fn instanceof 
Divide && sop instanceof RightScalarOperator) ||
                                sop.fn instanceof Plus || sop.fn instanceof 
Minus));
diff --git 
a/src/main/java/org/apache/sysds/runtime/matrix/data/LibCommonsMath.java 
b/src/main/java/org/apache/sysds/runtime/matrix/data/LibCommonsMath.java
index c822259..8283352 100644
--- a/src/main/java/org/apache/sysds/runtime/matrix/data/LibCommonsMath.java
+++ b/src/main/java/org/apache/sysds/runtime/matrix/data/LibCommonsMath.java
@@ -40,6 +40,8 @@ import org.apache.sysds.runtime.util.DataConverter;
  */
 public class LibCommonsMath 
 {
+       // private static final Log LOG = 
LogFactory.getLog(LibCommonsMath.class.getName());
+
        static final double RELATIVE_SYMMETRY_THRESHOLD = 1e-6;
 
        private LibCommonsMath() {
diff --git 
a/src/test/java/org/apache/sysds/test/component/compress/AbstractCompressedUnaryTests.java
 
b/src/test/java/org/apache/sysds/test/component/compress/AbstractCompressedUnaryTests.java
index 2d895aa..c98de00 100644
--- 
a/src/test/java/org/apache/sysds/test/component/compress/AbstractCompressedUnaryTests.java
+++ 
b/src/test/java/org/apache/sysds/test/component/compress/AbstractCompressedUnaryTests.java
@@ -170,16 +170,12 @@ public abstract class AbstractCompressedUnaryTests 
extends CompressedTestBase {
                        MatrixBlock ret1 = mb.aggregateUnaryOperations(auop, 
new MatrixBlock(), Math.max(rows, cols), null, true);
                        // matrix-vector compressed
                        MatrixBlock ret2 = cmb.aggregateUnaryOperations(auop, 
new MatrixBlock(), Math.max(rows, cols), null, true);
-                       // LOG.error(cmb);
+                       // LOG.error(ret1);
+                       // LOG.error(ret2);
                        // compare result with input
                        double[][] d1 = 
DataConverter.convertToDoubleMatrix(ret1);
                        double[][] d2 = 
DataConverter.convertToDoubleMatrix(ret2);
-                       // for(double[] row : d1) {
-                       // LOG.error(Arrays.toString(row));
-                       // }
-                       // for(double[] row : d2) {
-                       // LOG.error(Arrays.toString(row));
-                       // }
+
                        int dim1 = (aggType == AggType.ROWSUMS || aggType == 
AggType.ROWSUMSSQ || aggType == AggType.ROWMAXS ||
                                aggType == AggType.ROWMINS || aggType == 
AggType.ROWMEAN) ? rows : 1;
                        int dim2 = (aggType == AggType.COLSUMS || aggType == 
AggType.COLSUMSSQ || aggType == AggType.COLMAXS ||

Reply via email to