This is an automated email from the ASF dual-hosted git repository.

baunsgaard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new 779cb364d7 [MINOR] SDC Offsets cleanup
779cb364d7 is described below

commit 779cb364d7ff9b8a1a2f8bb4211158127e4630b3
Author: Sebastian Baunsgaard <[email protected]>
AuthorDate: Thu Aug 29 23:42:16 2024 +0200

    [MINOR] SDC Offsets cleanup
    
    This commit cleans up the AOffset used in SDC compression.
    
    Closes #2074
---
 .../runtime/compress/cocode/CoCodeHybrid.java      |  17 +-
 .../runtime/compress/colgroup/ColGroupFactory.java |  10 +-
 .../compress/colgroup/ColGroupSDCSingle.java       |  11 +-
 .../compress/colgroup/ColGroupSDCSingleZeros.java  |   2 +-
 .../compress/colgroup/ColGroupSDCZeros.java        |   2 +-
 .../compress/colgroup/mapping/AMapToData.java      |  34 +-
 .../compress/colgroup/mapping/MapToFactory.java    |  92 ++---
 .../runtime/compress/colgroup/offset/AOffset.java  | 416 +++++++++++++-------
 .../compress/colgroup/offset/OffsetChar.java       |   8 +-
 .../compress/colgroup/offset/OffsetEmpty.java      |  18 +-
 .../compress/colgroup/offset/OffsetFactory.java    |  16 +-
 .../compress/colgroup/offset/OffsetSingle.java     |   4 +-
 .../compress/colgroup/offset/OffsetTwo.java        |   6 +-
 .../compress/estim/encoding/DenseEncoding.java     |   8 +-
 .../org/apache/sysds/runtime/data/DenseBlock.java  |   2 +-
 .../compress/combine/CombineEncodings.java         |  24 +-
 .../estim/encoding/EncodeSampleCustom.java         |   4 +-
 .../compress/mapping/CustomMappingTest.java        |   2 +-
 .../compress/mapping/MappingPreAggregateTests.java |   2 +-
 .../component/compress/mapping/MappingTests.java   |   2 +-
 .../compress/mapping/MappingTestsResize.java       |   2 +-
 .../compress/offset/CustomOffsetTest.java          |  58 ++-
 .../component/compress/offset/LargeOffsetTest.java |  55 +++
 .../compress/offset/NegativeOffsetTest.java        |  93 +++++
 .../compress/offset/OffsetPreAggTests.java         | 350 +++++++++++++++++
 .../compress/offset/OffsetReverseTest.java         |  16 +-
 .../compress/offset/OffsetSingleTests.java         |  12 +-
 .../compress/offset/OffsetTestPreAggregate.java    | 427 ---------------------
 .../offset/OffsetTestPreAggregateSparse.java       | 160 --------
 .../component/compress/offset/OffsetTests.java     |  73 +++-
 .../component/frame/array/CustomArrayTests.java    |   4 +-
 31 files changed, 1053 insertions(+), 877 deletions(-)

diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeHybrid.java 
b/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeHybrid.java
index 4845f6aaa3..8a5bda995c 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeHybrid.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeHybrid.java
@@ -39,12 +39,13 @@ public class CoCodeHybrid extends AColumnCoCoder {
        protected CompressedSizeInfo coCodeColumns(CompressedSizeInfo colInfos, 
int k) {
                final int startSize = colInfos.getInfo().size();
                final int pqColumnThreashold = Math.max(128, 
(_sest.getNumColumns() / startSize) * 100);
-               LOG.error(pqColumnThreashold);
 
                if(startSize == 1)
                        return colInfos; // nothing to join when there only is 
one column
                else if(startSize <= 16) {// Greedy all compare all if small 
number of columns
-                       LOG.debug("Hybrid chose to do greedy CoCode because of 
few columns");
+                       
+                       if(LOG.isDebugEnabled())
+                               LOG.debug("Hybrid chose to do greedy CoCode 
because of few columns");
                        CoCodeGreedy gd = new CoCodeGreedy(_sest, _cest, _cs);
                        return colInfos.setInfo(gd.combine(colInfos.getInfo(), 
k));
                }
@@ -53,7 +54,8 @@ public class CoCodeHybrid extends AColumnCoCoder {
 
                        return colInfos.setInfo(pq.join(colInfos.getInfo(), 1, 
k));
                }
-               LOG.debug("Using Hybrid CoCode Strategy: ");
+               if(LOG.isDebugEnabled())
+                       LOG.debug("Using Hybrid CoCode Strategy: ");
 
                final int PriorityQueGoal = startSize / 5;
                if(PriorityQueGoal > 30) { // hybrid if there is a large number 
of columns to begin with
@@ -62,16 +64,19 @@ public class CoCodeHybrid extends AColumnCoCoder {
                        colInfos.setInfo(pq.join(colInfos.getInfo(), 
PriorityQueGoal, k));
                        final int pqSize = colInfos.getInfo().size();
 
-                       LOG.debug("Que based time: " + time.stop());
+                       if(LOG.isDebugEnabled())
+                               LOG.debug("Que based time: " + time.stop());
                        if(pqSize < PriorityQueGoal || (pqSize < startSize && 
_cest instanceof ComputationCostEstimator)) {
                                CoCodeGreedy gd = new CoCodeGreedy(_sest, 
_cest, _cs);
                                colInfos.setInfo(gd.combine(colInfos.getInfo(), 
k));
-                               LOG.debug("Greedy time:     " + time.stop());
+                               if(LOG.isDebugEnabled())
+                                       LOG.debug("Greedy time:     " + 
time.stop());
                        }
                        return colInfos;
                }
                else {
-                       LOG.debug("Using only Greedy based since Nr Column 
groups: " + startSize + " is not large enough");
+                       if(LOG.isDebugEnabled())
+                               LOG.debug("Using only Greedy based since Nr 
Column groups: " + startSize + " is not large enough");
                        CoCodeGreedy gd = new CoCodeGreedy(_sest, _cest, _cs);
                        colInfos.setInfo(gd.combine(colInfos.getInfo(), k));
                        return colInfos;
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupFactory.java 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupFactory.java
index de90d67fee..957308b32a 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupFactory.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupFactory.java
@@ -471,7 +471,7 @@ public class ColGroupFactory {
                        return new ColGroupEmpty(colIndexes);
                IDictionary dict = DictionaryFactory.create(map);
                final int nUnique = map.size();
-               final AMapToData resData = MapToFactory.resize(d, nUnique);
+               final AMapToData resData = d.resize( nUnique);
                return ColGroupDDC.create(colIndexes, dict, resData, null);
        }
 
@@ -498,7 +498,7 @@ public class ColGroupFactory {
                if(extra)
                        d.replace(fill, map.size());
                final int nUnique = map.size() + (extra ? 1 : 0);
-               final AMapToData resData = MapToFactory.resize(d, nUnique);
+               final AMapToData resData = d.resize(nUnique);
                return ColGroupDDC.create(colIndexes, dict, resData, null);
        }
 
@@ -659,7 +659,7 @@ public class ColGroupFactory {
                AInsertionSorter s = InsertionSorterFactory.create(rlen, 
offsets, cs.sdcSortType);
                AOffset indexes = OffsetFactory.createOffset(s.getIndexes());
                AMapToData data = s.getData();
-               data = MapToFactory.resize(data, 
dict.getNumberOfValues(colIndexes.size()));
+               data = data.resize(dict.getNumberOfValues(colIndexes.size()));
                return ColGroupSDCZeros.create(colIndexes, rlen, dict, indexes, 
data, null);
        }
 
@@ -671,7 +671,7 @@ public class ColGroupFactory {
                        cs.sdcSortType);
                AOffset indexes = OffsetFactory.createOffset(s.getIndexes());
                AMapToData _data = s.getData();
-               _data = MapToFactory.resize(_data, 
dict.getNumberOfValues(colIndexes.size()));
+               _data = _data.resize( 
dict.getNumberOfValues(colIndexes.size()));
                return ColGroupSDC.create(colIndexes, rlen, dict, defaultTuple, 
indexes, _data, null);
        }
 
@@ -775,7 +775,7 @@ public class ColGroupFactory {
                }
 
                IDictionary dict = DictionaryFactory.create(map, cols.size(), 
false, tupleSparsity);
-               data = MapToFactory.resize(data, map.size());
+               data = data.resize(map.size());
 
                AOffset offs = OffsetFactory.createOffset(offsetsInt);
                return ColGroupSDCZeros.create(cols, in.getNumColumns(), dict, 
offs, data, null);
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSDCSingle.java
 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSDCSingle.java
index a13150c12c..3e9b1deba2 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSDCSingle.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSDCSingle.java
@@ -25,18 +25,17 @@ import java.io.IOException;
 import java.util.Arrays;
 
 import org.apache.sysds.runtime.DMLRuntimeException;
-import org.apache.sysds.runtime.compress.colgroup.dictionary.IDictionary;
-import org.apache.sysds.runtime.compress.colgroup.dictionary.PlaceHolderDict;
 import org.apache.sysds.runtime.compress.DMLCompressionException;
 import org.apache.sysds.runtime.compress.colgroup.dictionary.Dictionary;
 import org.apache.sysds.runtime.compress.colgroup.dictionary.DictionaryFactory;
+import org.apache.sysds.runtime.compress.colgroup.dictionary.IDictionary;
+import org.apache.sysds.runtime.compress.colgroup.dictionary.PlaceHolderDict;
 import org.apache.sysds.runtime.compress.colgroup.indexes.ColIndexFactory;
 import org.apache.sysds.runtime.compress.colgroup.indexes.IColIndex;
 import org.apache.sysds.runtime.compress.colgroup.mapping.MapToZero;
 import org.apache.sysds.runtime.compress.colgroup.offset.AIterator;
 import org.apache.sysds.runtime.compress.colgroup.offset.AOffset;
 import 
org.apache.sysds.runtime.compress.colgroup.offset.AOffset.OffsetSliceInfo;
-import org.apache.sysds.runtime.compress.colgroup.offset.AOffsetIterator;
 import org.apache.sysds.runtime.compress.colgroup.offset.OffsetFactory;
 import org.apache.sysds.runtime.compress.cost.ComputationCostEstimator;
 import org.apache.sysds.runtime.compress.estim.encoding.EncodingFactory;
@@ -76,7 +75,7 @@ public class ColGroupSDCSingle extends ASDC {
                if(dict == null && allZero)
                        return new ColGroupEmpty(colIndexes);
                else if(dict == null && offsets.getSize() * 2 > numRows + 2) {
-                       AOffset rev = AOffset.reverse(numRows, offsets);
+                       AOffset rev = offsets.reverse(numRows);
                        return ColGroupSDCSingleZeros.create(colIndexes, 
numRows, Dictionary.create(defaultTuple), rev, cachedCounts);
                }
                else if(dict == null)
@@ -84,7 +83,7 @@ public class ColGroupSDCSingle extends ASDC {
                else if(allZero)
                        return ColGroupSDCSingleZeros.create(colIndexes, 
numRows, dict, offsets, cachedCounts);
                else if(offsets.getSize() * 2 > numRows + 2 && !(dict 
instanceof PlaceHolderDict)) {
-                       AOffset rev = AOffset.reverse(numRows, offsets);
+                       AOffset rev = offsets.reverse(numRows);
                        return new ColGroupSDCSingle(colIndexes, numRows, 
Dictionary.create(defaultTuple), dict.getValues(), rev,
                                null);
                }
@@ -110,7 +109,7 @@ public class ColGroupSDCSingle extends ASDC {
 
        @Override
        public double getIdx(int r, int colIdx) {
-               final AOffsetIterator it = _indexes.getOffsetIterator(r);
+               final AIterator it = _indexes.getIterator(r);
                if(it == null || it.value() != r)
                        return _defaultTuple[colIdx];
                else
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSDCSingleZeros.java
 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSDCSingleZeros.java
index 7a3309aafa..cc25e54b41 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSDCSingleZeros.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSDCSingleZeros.java
@@ -76,7 +76,7 @@ public class ColGroupSDCSingleZeros extends ASDCZero {
                if(dict == null)
                        return new ColGroupEmpty(colIndices);
                else if(offsets.getSize() * 2 > numRows + 2 && !(dict 
instanceof PlaceHolderDict)) {
-                       AOffset rev = AOffset.reverse(numRows, offsets);
+                       AOffset rev = offsets.reverse(numRows);
                        IDictionary empty = MatrixBlockDictionary.create(new 
MatrixBlock(1, colIndices.size(), true));
                        return ColGroupSDCSingle.create(colIndices, numRows, 
empty, dict.getValues(), rev, null);
                }
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSDCZeros.java
 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSDCZeros.java
index f4c7c6f615..38d88ef106 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSDCZeros.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSDCZeros.java
@@ -485,7 +485,7 @@ public class ColGroupSDCZeros extends ASDCZero implements 
IMapToDataGroup {
 
        @Override
        public void preAggregateDense(MatrixBlock m, double[] preAgg, int rl, 
int ru, int cl, int cu) {
-               _data.preAggregateDense(m, preAgg, rl, ru, cl, cu, _indexes);
+               _data.preAggregateDense(m.getDenseBlock(), preAgg, rl, ru, cl, 
cu, _indexes);
        }
 
        @Override
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/mapping/AMapToData.java
 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/mapping/AMapToData.java
index e68f191c6f..0765a158c5 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/mapping/AMapToData.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/mapping/AMapToData.java
@@ -95,15 +95,6 @@ public abstract class AMapToData implements Serializable {
         */
        public abstract int getIndex(int n);
 
-       /**
-        * Shortcut method to support Long objects, not really efficient but 
for the purpose of reusing code.
-        * 
-        * @param n The index to set.
-        * @param v The value to set.
-        */
-       public void set(int n, Long v) {
-               set(n, (int) (long) v);
-       }
 
        /**
         * Shortcut method to support Integer objects, not really efficient but 
for the purpose of reusing code.
@@ -333,7 +324,7 @@ public abstract class AMapToData implements Serializable {
         * @param cu      The column in m to end at (not inclusive)
         * @param indexes The Offset Indexes to iterate through
         */
-       public final void preAggregateDense(MatrixBlock m, double[] preAV, int 
rl, int ru, int cl, int cu, AOffset indexes) {
+       public final void preAggregateDense(DenseBlock m, double[] preAV, int 
rl, int ru, int cl, int cu, AOffset indexes) {
                indexes.preAggregateDenseMap(m, preAV, rl, ru, cl, cu, 
getUnique(), this);
        }
 
@@ -347,7 +338,7 @@ public abstract class AMapToData implements Serializable {
         * @param indexes The Offset Indexes to iterate through
         */
        public final void preAggregateSparse(SparseBlock sb, double[] preAV, 
int rl, int ru, AOffset indexes) {
-               indexes.preAggregateSparseMap(sb, preAV, rl, ru, getUnique(), 
this);
+               indexes.preAggSparseMap(sb, preAV, rl, ru, getUnique(), this);
        }
 
        /**
@@ -813,12 +804,19 @@ public abstract class AMapToData implements Serializable {
                copyInt(d.getData());
        }
 
+       /**
+        * Copy the values of the given array into this.
+        * 
+        * Note that this operation stops at the length of this AMapToData
+        * 
+        * Therefore the given d length can not be longer than this size.
+        * 
+        * @param d The array to copy
+        */
        public abstract void copyInt(int[] d);
 
        public abstract void copyBit(BitSet d);
 
-       // public abstract void copyBitLong(long[] d);
-
        public int getMax() {
                int m = -1;
                for(int i = 0; i < size(); i++) {
@@ -835,6 +833,16 @@ public abstract class AMapToData implements Serializable {
         */
        public abstract int getMaxPossible();
 
+       /**
+        * Reallocate the map, to a smaller instance if applicable. Note it 
does not change the length of the array, just the
+        * datatype.
+        * 
+        * Note that it returns the input if the input is the smallest 
representation that fits, otherwise it will return
+        * something that is smaller.
+        * 
+        * @param unique The number of tuples that should be supported in the 
resulting map
+        * @return The returned hopefully reduced map.
+        */
        public abstract AMapToData resize(int unique);
 
        /**
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/mapping/MapToFactory.java
 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/mapping/MapToFactory.java
index 1727bedbd9..970220d9cc 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/mapping/MapToFactory.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/mapping/MapToFactory.java
@@ -24,44 +24,43 @@ import java.io.IOException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.sysds.runtime.compress.DMLCompressionException;
-import org.apache.sysds.runtime.compress.bitmap.ABitmap;
-import org.apache.sysds.runtime.compress.utils.IntArrayList;
 
+/** Interface for the factory design pattern for construction all AMapToData. 
*/
 public interface MapToFactory {
        static final Log LOG = LogFactory.getLog(MapToFactory.class.getName());
 
+       /** The different supported types of mappings. */
        public enum MAP_TYPE {
                ZERO, BIT, UBYTE, BYTE, CHAR, CHAR_BYTE, INT;
        }
 
-       public static AMapToData create(int size, ABitmap ubm) {
-               if(ubm == null)
-                       return null;
-               return create(size, ubm.containsZero(), ubm.getOffsetList());
-       }
-
-       public static AMapToData create(int size, boolean zeros, IntArrayList[] 
values) {
-               AMapToData _data = create(size, values.length + (zeros ? 1 : 
0));
-
-               if(zeros)
-                       _data.fill(values.length);
-
-               for(int i = 0; i < values.length; i++) {
-                       final IntArrayList tmpList = values[i];
-                       final int sz = tmpList.size();
-                       for(int k = 0; k < sz; k++) {
-                               _data.set(tmpList.get(k), i);
-                       }
-               }
+       /**
+        * Construct a mapping with the given values contained. The size is the 
length of the int array given.
+        * 
+        * @param values  The values contained.
+        * @param nUnique The number of unique expected to be contained (is not 
verified.)
+        * @return An appropriate AMapToData
+        */
+       public static AMapToData create(int[] values, int nUnique) {
+               AMapToData _data = create(values.length, nUnique);
+               _data.copyInt(values);
                return _data;
        }
 
+       /**
+        * Construct a mapping with the given values contained.
+        * 
+        * Only copies the values from the array given until size.
+        * 
+        * @param size    The number of elements to take from the values array.
+        * @param values  The values contained.
+        * @param nUnique The number of unique expected to be contained (is not 
verified.)
+        * @return An appropriate AMapToData
+        */
        public static AMapToData create(int size, int[] values, int nUnique) {
                AMapToData _data = create(size, nUnique);
                _data.copyInt(values);
                return _data;
-
        }
 
        /**
@@ -88,7 +87,14 @@ public interface MapToFactory {
                        return new MapToInt(numTuples, size);
        }
 
-       public static AMapToData create(int size, MAP_TYPE t) {
+       /**
+        * Allocate a specific type of map. Note that once in use it is 
recommended to set the number of unique values.
+        * 
+        * @param size The size to allocate
+        * @param t    The mapping type.
+        * @return An AMapToData allocation
+        */
+       public static AMapToData create(final int size, final MAP_TYPE t) {
                switch(t) {
                        case ZERO:
                                return new MapToZero(size);
@@ -103,26 +109,11 @@ public interface MapToFactory {
                        case CHAR_BYTE:
                                return new MapToCharPByte(size);
                        case INT:
-                               return new MapToInt(size);
                        default:
-                               throw new DMLCompressionException("Unsupported 
type " + t);
+                               return new MapToInt(size);
                }
        }
 
-       /**
-        * Reshape the map, to a smaller instance if applicable.
-        * 
-        * Note that it returns the input if the input is the smallest 
representation that fits, otherwise it will return
-        * something that is smaller.
-        * 
-        * @param d         The Input mat to potentially reduce the size of.
-        * @param numTuples The number of tuples that should be in the 
resulting map
-        * @return The returned hopefully reduced map.
-        */
-       public static AMapToData resize(AMapToData d, int numTuples) {
-               return d.resize(numTuples);
-       }
-
        /**
         * Force the mapping into an other mapping type. This method is unsafe 
since if there is overflows in the
         * conversions, they are not handled. Also if the change is into the 
same type a new map is allocated anyway.
@@ -154,15 +145,20 @@ public interface MapToFactory {
                                ret = new MapToCharPByte(numTuples, size);
                                break;
                        case INT:
-                               ret = new MapToInt(numTuples, size);
-                               break;
                        default:
-                               throw new DMLCompressionException("Unsupported 
type of map " + t);
+                               ret = new MapToInt(numTuples, size);
                }
                ret.copy(d);
                return ret;
        }
 
+       /**
+        * Estimate the size in memory of a MapToFactory.
+        * 
+        * @param size      The size of the mapping
+        * @param numTuples The number of unique values to be supported by the 
mapping
+        * @return The size in number of bytes.
+        */
        public static long estimateInMemorySize(int size, int numTuples) {
                if(numTuples <= 1)
                        return MapToZero.getInMemorySize(size);
@@ -178,6 +174,13 @@ public interface MapToFactory {
                        return MapToInt.getInMemorySize(size);
        }
 
+       /**
+        * General interface to read in an AMapToData.
+        * 
+        * @param in The data input to read from
+        * @return The parsed AMapToData
+        * @throws IOException If there is complications or errors in reading.
+        */
        public static AMapToData readIn(DataInput in) throws IOException {
                MAP_TYPE t = MAP_TYPE.values()[in.readByte()];
                switch(t) {
@@ -194,9 +197,8 @@ public interface MapToFactory {
                        case CHAR_BYTE:
                                return MapToCharPByte.readFields(in);
                        case INT:
-                               return MapToInt.readFields(in);
                        default:
-                               throw new DMLCompressionException("unsupported 
type " + t);
+                               return MapToInt.readFields(in);
                }
        }
 }
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/offset/AOffset.java 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/offset/AOffset.java
index adefe49a52..6b5c37cf4d 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/offset/AOffset.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/offset/AOffset.java
@@ -28,41 +28,44 @@ import org.apache.commons.lang3.NotImplementedException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.compress.CompressedMatrixBlock;
 import org.apache.sysds.runtime.compress.DMLCompressionException;
 import org.apache.sysds.runtime.compress.colgroup.AOffsetsGroup;
 import org.apache.sysds.runtime.compress.colgroup.mapping.AMapToData;
+import org.apache.sysds.runtime.compress.colgroup.mapping.MapToByte;
+import org.apache.sysds.runtime.compress.colgroup.mapping.MapToChar;
+import org.apache.sysds.runtime.compress.colgroup.mapping.MapToUByte;
 import org.apache.sysds.runtime.compress.utils.IntArrayList;
 import org.apache.sysds.runtime.data.DenseBlock;
 import org.apache.sysds.runtime.data.SparseBlock;
-import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 
 /**
- * Offset list encoder interface.
- * 
+ * Offset list encoder abstract class.
+ * <p>
  * It is assumed that the input is in sorted order, all values are positive 
and there are no duplicates.
- * 
+ * </p>
+ * <p>
  * The no duplicate is important since 0 values are exploited to encode an 
offset of max representable value + 1. This
  * gives the ability to encode data, where the offsets are greater than the 
available highest value that can be
  * represented size.
+ * </p>
  */
 public abstract class AOffset implements Serializable {
        private static final long serialVersionUID = 6910025321078561338L;
 
        protected static final Log LOG = 
LogFactory.getLog(AOffset.class.getName());
 
+       /** Cached final empty slice to return in cases of empty slice returns 
to avoid object allocation */
+       protected static final OffsetSliceInfo EMPTY_SLICE = new 
OffsetSliceInfo(-1, -1, new OffsetEmpty());
+
        /** The skip list stride size, aka how many indexes skipped for each 
index. */
-       protected static final int skipStride = 1000;
+       protected static final int SKIP_STRIDE = 1000;
 
        /** SoftReference of the skip list to be dematerialized on memory 
pressure */
        private volatile SoftReference<OffsetCacheV2[]> skipList = null;
 
        /** Thread local cache for a single recently used Iterator, this is 
used for cache blocking */
-       private volatile ThreadLocal<OffsetCache> cacheRow = new 
ThreadLocal<>() {
-               @Override
-               protected OffsetCache initialValue() {
-                       return null;
-               }
-       };
+       private volatile ThreadLocal<OffsetCache> cacheRow = null;
 
        /**
         * Get an iterator of the offsets while also maintaining the data index 
pointer.
@@ -95,21 +98,35 @@ public abstract class AOffset implements Serializable {
                        return getIterator();
                else if(row > getOffsetToLast())
                        return null;
-               final OffsetCache c = getLength() < skipStride ? null : 
cacheRow.get();
+
+               final OffsetCache c;
+               if(getLength() < SKIP_STRIDE)
+                       c = null;
+               else if(cacheRow == null)
+                       c = null;
+               else
+                       c = cacheRow.get();
+
                if(c != null && c.row == row)
                        return c.it.clone();
-               else if(getLength() < skipStride)
+               else if(getLength() < SKIP_STRIDE)
                        return getIteratorSmallOffset(row);
                else
                        return getIteratorLargeOffset(row);
        }
 
-       private AIterator getIteratorSkipCache(int row){
+       /**
+        * Get an iterator that is pointing to a specific offset, this method 
skips looking at our cache of iterators.
+        * 
+        * @param row The row to look at
+        * @return The iterator associated with the row.
+        */
+       private AIterator getIteratorSkipCache(int row) {
                if(row <= getOffsetToFirst())
                        return getIterator();
                else if(row > getOffsetToLast())
                        return null;
-               else if(getLength() < skipStride)
+               else if(getLength() < SKIP_STRIDE)
                        return getIteratorSmallOffset(row);
                else
                        return getIteratorLargeOffset(row);
@@ -126,8 +143,12 @@ public abstract class AOffset implements Serializable {
                if(skipList == null || skipList.get() == null)
                        constructSkipList();
                final OffsetCacheV2[] skip = skipList.get();
+
+               // guaranteed not to go over limit of skip list.
                int idx = 0;
-               while(idx < skip.length && skip[idx] != null && skip[idx].row 
<= row)
+               while(idx < skip.length //
+                       && skip[idx] != null //
+                       && skip[idx].row <= row)
                        idx++;
 
                final AIterator it = idx == 0 ? getIterator() : 
getIteratorFromSkipList(skip[idx - 1]);
@@ -142,16 +163,16 @@ public abstract class AOffset implements Serializable {
 
                // not actual accurate but applicable.
                final int last = getOffsetToLast();
-               final int skipSize = last / skipStride + 1;
-               if(skipSize == 0)
-                       return;
+               final int skipSize = last / SKIP_STRIDE + 1;
+               if(skipSize == 1)
+                       return; // do not construct the skip if the size is 
less than SKIP_STRIDE
 
                final OffsetCacheV2[] skipListTmp = new OffsetCacheV2[skipSize];
                final AIterator it = getIterator();
 
                int skipListIdx = 0;
-               while(it.value() < last && skipListIdx < skipListTmp.length) {
-                       int next = skipListIdx * skipStride + skipStride;
+               while(it.value() < last) { // guaranteed not to go over 
skipListTmpLength
+                       int next = skipListIdx * SKIP_STRIDE + SKIP_STRIDE;
                        while(it.value() < next && it.value() < last)
                                it.next();
                        skipListTmp[skipListIdx++] = new 
OffsetCacheV2(it.value(), it.getDataIndex(), it.getOffsetsIndex());
@@ -160,18 +181,9 @@ public abstract class AOffset implements Serializable {
                skipList = new SoftReference<>(skipListTmp);
        }
 
-       /**
-        * Get an iterator that is pointing only at offsets from specific offset
-        * 
-        * @param row The row requested.
-        * @return AOffsetIterator that iterate through index only offset 
values.
-        */
-       public AOffsetIterator getOffsetIterator(int row) {
-               AOffsetIterator it = getOffsetIterator();
-               final int last = Math.min(row, getOffsetToLast());
-               while(it.value() < last)
-                       it.next();
-               return it;
+       public synchronized void clearSkipList() {
+               if(skipList != null)
+                       skipList.clear();
        }
 
        /**
@@ -181,9 +193,20 @@ public abstract class AOffset implements Serializable {
         * @param row The row index to cache the iterator as.
         */
        public void cacheIterator(AIterator it, int row) {
-               if(it == null || getLength() < skipStride)
+               if(it == null || getLength() < SKIP_STRIDE)
                        return;
-               cacheRow.set(new OffsetCache(it, row));
+
+               if(cacheRow == null) {
+                       cacheRow = new ThreadLocal<>() {
+                               @Override
+                               protected OffsetCache initialValue() {
+                                       return new OffsetCache(it, row);
+                               }
+                       };
+               }
+               else {
+                       cacheRow.set(new OffsetCache(it, row));
+               }
        }
 
        /**
@@ -232,7 +255,20 @@ public abstract class AOffset implements Serializable {
         */
        public abstract int getSize();
 
-       public final void preAggregateDenseMap(MatrixBlock m, double[] preAV, 
int rl, int ru, int cl, int cu, int nVal,
+       /**
+        * Pre aggregate the specified row and block range from a dense 
MatrixBlock to prepare for compressed multiplication.
+        * 
+        * @param db    The DenseBlock to extract values from.
+        * @param preAV The pre aggregate row linearized double array to put 
the values into.
+        * @param rl    The row lower to start from (this is referring to the 
left matrix of the multiplication)
+        * @param ru    The row upper to end at (not inclusive) (this is 
referring to the left matrix of the multiplication)
+        * @param cl    The column lower to start at (this is referring to the 
right matrix of the multiplication)
+        * @param cu    The column upper to end at (not inclusive) (this is 
referring to the right matrix of the
+        *              multiplication)
+        * @param nVal  The number of distinct values in the PreAV indicating 
number of columns in the Pre aggregate
+        * @param data  The mapping to column positions in the preAV
+        */
+       public final void preAggregateDenseMap(DenseBlock db, double[] preAV, 
int rl, int ru, int cl, int cu, int nVal,
                AMapToData data) {
                // multi row iterator.
                final AIterator it = getIterator(cl);
@@ -241,42 +277,87 @@ public abstract class AOffset implements Serializable {
                else if(it.offset > cu)
                        cacheIterator(it, cu); // cache this iterator.
                else if(rl == ru - 1) {
-                       final DenseBlock db = m.getDenseBlock();
                        final double[] mV = db.values(rl);
                        final int off = db.pos(rl);
                        // guaranteed contiguous.
-                       preAggregateDenseMapRow(mV, off, preAV, cu, nVal, data, 
it);
+                       preAggDenseMapSingleRow(mV, off, preAV, cu, nVal, data, 
it);
                }
                else {
-                       final DenseBlock db = m.getDenseBlock();
-                       preAggregateDenseMapRows(db, preAV, rl, ru, cl, cu, 
nVal, data, it);
+                       preAggDenseMapMultiRows(db, preAV, rl, ru, cl, cu, 
nVal, data, it);
                }
        }
 
-       protected final void preAggregateDenseMapRow(double[] mV, int off, 
double[] preAV, int cu, int nVal, AMapToData data,
+       private final void preAggDenseMapSingleRow(double[] mV, int off, 
double[] preAV, int cu, int nVal, AMapToData data,
                AIterator it) {
                final int last = getOffsetToLast();
                if(cu <= last)
-                       preAggregateDenseMapRowBellowEnd(mV, off, preAV, cu, 
nVal, data, it);
+                       preAggDenseMapRowBellowEnd(mV, off, preAV, cu, data, 
it);
                else
-                       preAggregateDenseMapRowEnd(mV, off, preAV, last, nVal, 
data, it);
+                       preAggDenseMapSingleRowEnd(mV, off, preAV, last, data, 
it);
        }
 
-       protected final void preAggregateDenseMapRowBellowEnd(final double[] 
mV, final int off, final double[] preAV, int cu,
-               final int nVal, final AMapToData data, final AIterator it) {
+       private final void preAggDenseMapRowBellowEnd(final double[] mV, final 
int off, final double[] preAV, int cu,
+               final AMapToData data, final AIterator it) {
+               // Increment and prepare iterator.
                it.offset += off;
                cu += off;
+               preAggDenseMapRowBE(mV, preAV, cu, data, it);
+               // Decrement iterator for next call.
+               it.offset -= off;
+               cu -= off;
+               cacheIterator(it, cu);
+       }
+
+       private final void preAggDenseMapRowBE(final double[] mV, final 
double[] preAV, final int cu, final AMapToData data,
+               final AIterator it) {
+               if(data instanceof MapToUByte)
+                       preAggDenseMapRowBE_UByte(mV, preAV, cu, (MapToUByte) 
data, it);
+               else if(data instanceof MapToByte)
+                       preAggDenseMapRowBE_Byte(mV, preAV, cu, (MapToByte) 
data, it);
+               else if(data instanceof MapToChar)
+                       preAggDenseMapRowBE_Char(mV, preAV, cu, (MapToChar) 
data, it);
+               else
+                       preAggDenseMapRowBE_Generic(mV, preAV, cu, data, it);
+       }
+
+       private final void preAggDenseMapRowBE_UByte(final double[] mV, final 
double[] preAV, final int cu,
+               final MapToUByte data, final AIterator it) {
+               // for JIT Compilation
                while(it.offset < cu) {
                        preAV[data.getIndex(it.getDataIndex())] += 
mV[it.offset];
                        it.next();
                }
-               it.offset -= off;
-               cu -= off;
-               cacheIterator(it, cu);
        }
 
-       protected final void preAggregateDenseMapRowEnd(final double[] mV, 
final int off, final double[] preAV,
-               final int last, final int nVal, final AMapToData data, final 
AIterator it) {
+       private final void preAggDenseMapRowBE_Byte(final double[] mV, final 
double[] preAV, final int cu,
+               final MapToByte data, final AIterator it) {
+               // for JIT Compilation
+               while(it.offset < cu) {
+                       preAV[data.getIndex(it.getDataIndex())] += 
mV[it.offset];
+                       it.next();
+               }
+       }
+
+       private final void preAggDenseMapRowBE_Char(final double[] mV, final 
double[] preAV, final int cu,
+               final MapToChar data, final AIterator it) {
+               // for JIT Compilation
+               while(it.offset < cu) {
+                       preAV[data.getIndex(it.getDataIndex())] += 
mV[it.offset];
+                       it.next();
+               }
+       }
+
+       private final void preAggDenseMapRowBE_Generic(final double[] mV, final 
double[] preAV, final int cu,
+               final AMapToData data, final AIterator it) {
+               // for JIT Compilation
+               while(it.offset < cu) {
+                       preAV[data.getIndex(it.getDataIndex())] += 
mV[it.offset];
+                       it.next();
+               }
+       }
+
+       private final void preAggDenseMapSingleRowEnd(final double[] mV, final 
int off, final double[] preAV, final int last,
+               final AMapToData data, final AIterator it) {
 
                while(it.offset < last) {
                        final int dx = it.getDataIndex();
@@ -286,17 +367,17 @@ public abstract class AOffset implements Serializable {
                preAV[data.getIndex(it.getDataIndex())] += mV[off + last];
        }
 
-       protected final void preAggregateDenseMapRows(DenseBlock db, double[] 
preAV, int rl, int ru, int cl, int cu,
-               int nVal, AMapToData data, AIterator it) {
+       private final void preAggDenseMapMultiRows(DenseBlock db, double[] 
preAV, int rl, int ru, int cl, int cu, int nVal,
+               AMapToData data, AIterator it) {
                if(!db.isContiguous())
                        throw new NotImplementedException("Not implemented 
support for preAggregate non contiguous dense matrix");
                else if(cu <= getOffsetToLast())
-                       preAggregateDenseMapRowsBelowEnd(db, preAV, rl, ru, cl, 
cu, nVal, data, it);
+                       preAggDenseMapMultiRowsBelowEnd(db, preAV, rl, ru, cl, 
cu, nVal, data, it);
                else
-                       preAggregateDenseMapRowsEnd(db, preAV, rl, ru, cl, cu, 
nVal, data, it);
+                       preAggDenseMapMultiRowsEnd(db, preAV, rl, ru, cl, cu, 
nVal, data, it);
        }
 
-       private void preAggregateDenseMapRowsBelowEnd(DenseBlock db, double[] 
preAV, int rl, int ru, int cl, int cu,
+       private final void preAggDenseMapMultiRowsBelowEnd(DenseBlock db, 
double[] preAV, int rl, int ru, int cl, int cu,
                int nVal, AMapToData data, AIterator it) {
                final double[] vals = db.values(rl);
                final int nCol = db.getCumODims(0);
@@ -311,8 +392,8 @@ public abstract class AOffset implements Serializable {
                cacheIterator(it, cu);
        }
 
-       private void preAggregateDenseMapRowsEnd(DenseBlock db, double[] preAV, 
int rl, int ru, int cl, int cu, int nVal,
-               AMapToData data, AIterator it) {
+       private final void preAggDenseMapMultiRowsEnd(DenseBlock db, double[] 
preAV, int rl, int ru, int cl, int cu,
+               int nVal, AMapToData data, AIterator it) {
                final double[] vals = db.values(rl);
                final int nCol = db.getCumODims(0);
                final int last = getOffsetToLast();
@@ -331,15 +412,15 @@ public abstract class AOffset implements Serializable {
                }
        }
 
-       public final void preAggregateSparseMap(SparseBlock sb, double[] preAV, 
int rl, int ru, int nVal, AMapToData data) {
+       public final void preAggSparseMap(SparseBlock sb, double[] preAV, int 
rl, int ru, int nVal, AMapToData data) {
                final AIterator it = getIterator();
                if(rl == ru - 1)
-                       preAggregateSparseMapRow(sb, preAV, rl, nVal, data, it);
+                       preAggSparseMapSingleRow(sb, preAV, rl, nVal, data, it);
                else
-                       preAggregateSparseMapRows(sb, preAV, rl, ru, nVal, 
data, it);
+                       preAggSparseMapMultipleRows(sb, preAV, rl, ru, nVal, 
data, it);
        }
 
-       private void preAggregateSparseMapRow(SparseBlock sb, double[] preAV, 
int r, int nVal, AMapToData data,
+       private final void preAggSparseMapSingleRow(SparseBlock sb, double[] 
preAV, int r, int nVal, AMapToData data,
                AIterator it) {
                if(sb.isEmpty(r))
                        return;
@@ -347,13 +428,13 @@ public abstract class AOffset implements Serializable {
                final int[] aix = sb.indexes(r);
                final int last = getOffsetToLast();
                if(aix[alen - 1] < last)
-                       preAggregateSparseMapRowBellowEnd(sb, preAV, r, nVal, 
data, it);
+                       preAggSparseMapRowBellowEnd(sb, preAV, r, nVal, data, 
it);
                else
-                       preAggregateSparseMapRowEnd(sb, preAV, r, nVal, data, 
it);
+                       preAggSparseMapRowEnd(sb, preAV, r, nVal, data, it);
        }
 
-       private final void preAggregateSparseMapRowBellowEnd(SparseBlock sb, 
double[] preAV, int r, int nVal,
-               AMapToData data, AIterator it) {
+       private final void preAggSparseMapRowBellowEnd(SparseBlock sb, double[] 
preAV, int r, int nVal, AMapToData data,
+               AIterator it) {
                int apos = sb.pos(r);
                final int alen = sb.size(r) + apos;
                final int[] aix = sb.indexes(r);
@@ -371,7 +452,7 @@ public abstract class AOffset implements Serializable {
                }
        }
 
-       private final void preAggregateSparseMapRowEnd(SparseBlock sb, double[] 
preAV, int r, int nVal, AMapToData data,
+       private final void preAggSparseMapRowEnd(SparseBlock sb, double[] 
preAV, int r, int nVal, AMapToData data,
                AIterator it) {
                int apos = sb.pos(r);
                final int alen = sb.size(r) + apos;
@@ -395,8 +476,8 @@ public abstract class AOffset implements Serializable {
                        preAV[data.getIndex(it.getDataIndex())] += avals[apos];
        }
 
-       private void preAggregateSparseMapRows(SparseBlock sb, double[] preAV, 
int rl, int ru, int nVal, AMapToData data,
-               AIterator it) {
+       private final void preAggSparseMapMultipleRows(final SparseBlock sb, 
final double[] preAV, final int rl,
+               final int ru, final int nVal, final AMapToData data, AIterator 
it) {
                int i = it.value();
                final int last = getOffsetToLast();
                final int[] aOffs = new int[ru - rl];
@@ -404,36 +485,29 @@ public abstract class AOffset implements Serializable {
                        aOffs[r - rl] = sb.pos(r);
 
                while(i < last) { // while we are not done iterating
-                       for(int r = rl; r < ru; r++) {
-                               if(sb.isEmpty(r))
-                                       continue;
-                               final int off = r - rl;
-                               int apos = aOffs[off]; // current offset
-                               final int alen = sb.size(r) + sb.pos(r);
-                               final int[] aix = sb.indexes(r);
-                               while(apos < alen && aix[apos] < i)// increment all pointers to offset
-                                       apos++;
-
-                               if(apos < alen && aix[apos] == i)
-                                       preAV[off * nVal + 
data.getIndex(it.getDataIndex())] += sb.values(r)[apos];
-                               aOffs[off] = apos;
-                       }
+                       preAggSparseMapRow(sb, preAV, rl, ru, nVal, 
data.getIndex(it.getDataIndex()), i, aOffs);
                        i = it.next();
                }
 
-               // process final element
+               preAggSparseMapRow(sb, preAV, rl, ru, nVal, 
data.getIndex(it.getDataIndex()), last, aOffs);
+       }
+
+       private final void preAggSparseMapRow(final SparseBlock sb, final 
double[] preAV, final int rl, final int ru,
+               final int nVal, final int dataIndex, final int i, final int[] 
aOffs) {
+
                for(int r = rl; r < ru; r++) {
                        if(sb.isEmpty(r))
                                continue;
                        final int off = r - rl;
-                       int apos = aOffs[off];
+                       int apos = aOffs[off]; // current offset
                        final int alen = sb.size(r) + sb.pos(r);
                        final int[] aix = sb.indexes(r);
-                       while(apos < alen && aix[apos] < last)
+                       final double[] avals = sb.values(r);
+                       while(apos < alen && aix[apos] < i)// increment all pointers to offset
                                apos++;
 
-                       if(apos < alen && aix[apos] == last)
-                               preAV[off * nVal + 
data.getIndex(it.getDataIndex())] += sb.values(r)[apos];
+                       if(apos < alen && aix[apos] == i)
+                               preAV[off * nVal + dataIndex] += avals[apos];
                        aOffs[off] = apos;
                }
        }
@@ -477,38 +551,62 @@ public abstract class AOffset implements Serializable {
         */
        public abstract int getLength();
 
+       /**
+        * Slice the offsets based on the specified range
+        * 
+        * @param l inclusive lower bound
+        * @param u exclusive upper bound
+        * @return The slice info containing the new slice.
+        */
        public OffsetSliceInfo slice(int l, int u) {
-               AIterator it = getIteratorSkipCache(l);
-               if(it == null || it.value() >= u)
-                       return new OffsetSliceInfo(-1, -1, new OffsetEmpty());
-               else if(l <= getOffsetToFirst() && u > getOffsetToLast()) {
+               final int first = getOffsetToFirst();
+               final int last = getOffsetToLast();
+               final int s = getSize();
+
+               if(l <= first && u > last) {
                        if(l == 0)
-                               return new OffsetSliceInfo(0, getSize(), this);
+                               return new OffsetSliceInfo(0, s, this);
                        else
-                               return new OffsetSliceInfo(0, getSize(), 
moveIndex(l));
+                               return new OffsetSliceInfo(0, s, moveIndex(l));
                }
+
+               final AIterator it = getIteratorSkipCache(l);
+
+               if(it == null || it.value() >= u)
+                       return EMPTY_SLICE;
+
+               if(u >= last) // If including the last do not iterate.
+                       return constructSliceReturn(l, u, it.getDataIndex(), s 
- 1, it.getOffsetsIndex(), getLength(), it.value(),
+                               last);
+               else // Have to iterate through until we find last.
+                       return genericSlice(l, u, it);
+       }
+
+       private OffsetSliceInfo genericSlice(int l, int u, AIterator it) {
+               // point at current one should be guaranteed inside the range.
                final int low = it.getDataIndex();
                final int lowOff = it.getOffsetsIndex();
                final int lowValue = it.value();
-
+               // set c
                int high = low;
                int highOff = lowOff;
                int highValue = lowValue;
-               if(u >= getOffsetToLast()) { // If including the last do not iterate.
-                       high = getSize() - 1;
-                       highOff = getLength();
-                       highValue = getOffsetToLast();
-               }
-               else { // Have to iterate through until we find last.
-                       while(it.value() < u) {
-                               // TODO add previous command that would allow us to simplify this loop.
-                               high = it.getDataIndex();
-                               highOff = it.getOffsetsIndex();
-                               highValue = it.value();
-                               it.next();
-                       }
+               while(it.value() < u) {
+                       // TODO add previous command that would allow us to simplify this loop.
+
+                       highValue = it.value();
+                       high = it.getDataIndex();
+                       highOff = it.getOffsetsIndex();
+
+                       it.next();
                }
-               
+
+               return constructSliceReturn(l, u, low, high, lowOff, highOff, 
lowValue, highValue);
+
+       }
+
+       private final OffsetSliceInfo constructSliceReturn(int l, int u, int 
low, int high, int lowOff, int highOff,
+               int lowValue, int highValue) {
                if(low == high)
                        return new OffsetSliceInfo(low, high + 1, new 
OffsetSingle(lowValue - l));
                else if(low + 1 == high)
@@ -574,21 +672,38 @@ public abstract class AOffset implements Serializable {
                        ss += s;
                }
 
-               try {
-                       return OffsetFactory.createOffset(r);
+               return OffsetFactory.createOffset(r);
+       }
+
+       /**
+        * Verify that the contained AOffset is a certain size, and not bigger 
when iterating though it.
+        * 
+        * @param size The max correct size.
+        */
+       public void verify(int size) {
+               AIterator it = getIterator();
+               if(it != null) {
+                       final int last = getOffsetToLast();
+                       if(it.getDataIndex() > size)
+                               throw new DMLCompressionException("Invalid 
index");
+                       while(it.value() < last) {
+                               it.next();
+                               if(it.getDataIndex() > size) // the last index is to high
+                                       throw new 
DMLCompressionException("Invalid index");
+                       }
                }
-               catch(Exception e) {
-                       throw new DMLCompressionException("failed to combine" + 
Arrays.toString(g) + " with S sizes: " + s);
+               else {
+                       if(size != 0)
+                               throw new DMLCompressionException("Invalid 
index");
                }
        }
 
        @Override
        public String toString() {
-               StringBuilder sb = new StringBuilder();
+               final StringBuilder sb = new StringBuilder();
                sb.append(this.getClass().getSimpleName());
                final AIterator it = getIterator();
                if(it != null) {
-                       int i = it.offset;
                        final int last = getOffsetToLast();
                        sb.append("[");
                        sb.append(it.offset);
@@ -596,30 +711,40 @@ public abstract class AOffset implements Serializable {
                                it.next();
                                sb.append(", ");
                                sb.append(it.offset);
-                               if(it.offset - i <= 0)
-                                       throw new 
DMLCompressionException("Invalid offset");
-                               else
-                                       i = it.offset;
                        }
                        sb.append("]");
+               }
+               if(CompressedMatrixBlock.debug) {
+                       if(cacheRow != null && cacheRow.get() != null) {
+                               sb.append("\nOffset CacheRow: ");
+                               sb.append(cacheRow.get().toString());
+                       }
+
+                       if(skipList != null && skipList.get() != null) {
+                               sb.append("\nSkipList:");
+                               sb.append(Arrays.toString(skipList.get()));
+                       }
 
-                       if(it.offset != last)
-                               throw new DMLCompressionException(
-                                       "Invalid iteration of offset when 
making string, the last offset is not equal to a iteration: "
-                                               + getOffsetToLast() + " String: 
" + sb.toString());
                }
                return sb.toString();
        }
 
-       public static AOffset reverse(int numRows, AOffset offsets) {
-               if(numRows < offsets.getOffsetToLast()) {
-                       throw new DMLRuntimeException(
-                               "Invalid number of rows for reverse: last: " + 
offsets.getOffsetToLast() + " numRows: " + numRows);
+       /**
+        * Reverse the locations of the offsets such that the inverse offsets 
are returned.
+        * 
+        * This means that for instance if the current offsets were 1, 3 and 5 
in a 5 long list, we return 0, 2 and 4.
+        * 
+        * @param numRows The total number of rows to be contained, This should 
be greater or equal to last.
+        * @return The reverse offsets.
+        */
+       public AOffset reverse(int numRows) {
+               final int last = getOffsetToLast();
+               if(numRows < last) {
+                       throw new DMLRuntimeException("Invalid number of rows 
for reverse: last: " + last + " numRows: " + numRows);
                }
 
-               int[] newOff = new int[numRows - offsets.getSize()];
-               final AOffsetIterator it = offsets.getOffsetIterator();
-               final int last = offsets.getOffsetToLast();
+               int[] newOff = new int[numRows - getSize()];
+               final AOffsetIterator it = getOffsetIterator();
                int i = 0;
                int j = 0;
 
@@ -635,13 +760,13 @@ public abstract class AOffset implements Serializable {
                while(i < numRows)
                        newOff[j++] = i++;
 
-               if(j != newOff.length)
-                       throw new DMLRuntimeException(
-                               "Not assigned all offsets ... something must be 
wrong:\n" + offsets + "\n" + Arrays.toString(newOff));
                return OffsetFactory.createOffset(newOff);
-
        }
 
+       /**
+        * Offset slice info containing the start and end index an offset that 
contains the slice, and an new AOffset
+        * containing only the sliced elements
+        */
        public static final class OffsetSliceInfo {
                public final int lIndex;
                public final int uIndex;
@@ -667,22 +792,27 @@ public abstract class AOffset implements Serializable {
 
        }
 
-       protected static class OffsetCache {
-               protected final AIterator it;
-               protected final int row;
+       private static class OffsetCache {
+               private final AIterator it;
+               private final int row;
 
-               protected OffsetCache(AIterator it, int row) {
+               private OffsetCache(AIterator it, int row) {
                        this.it = it;
                        this.row = row;
                }
+
+               @Override
+               public String toString() {
+                       return "r " + row + " i" + it + "\n";
+               }
        }
 
-       protected static class OffsetCacheV2 {
-               protected final int row;
-               protected final int offIndex;
-               protected final int dataIndex;
+       private static class OffsetCacheV2 {
+               private final int row;
+               private final int offIndex;
+               private final int dataIndex;
 
-               protected OffsetCacheV2(int row, int dataIndex, int offIndex) {
+               private OffsetCacheV2(int row, int dataIndex, int offIndex) {
                        this.row = row;
                        this.dataIndex = dataIndex;
                        this.offIndex = offIndex;
@@ -690,7 +820,7 @@ public abstract class AOffset implements Serializable {
 
                @Override
                public String toString() {
-                       return "r" + row + " d" + dataIndex + " o" + offIndex;
+                       return "r" + row + " d " + dataIndex + " o " + offIndex 
+ "\n";
                }
        }
 }
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/offset/OffsetChar.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/offset/OffsetChar.java
index 3f37ddac9c..159c73babc 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/offset/OffsetChar.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/offset/OffsetChar.java
@@ -26,7 +26,7 @@ import java.util.Arrays;
 import org.apache.sysds.runtime.compress.CompressedMatrixBlock;
 import org.apache.sysds.utils.MemoryEstimates;
 
-public class OffsetChar extends AOffset implements ISliceOffset{
+public class OffsetChar extends AOffset implements ISliceOffset {
 
        private static final long serialVersionUID = -1192266421395964882L;
        protected static final int maxV = Character.MAX_VALUE;
@@ -41,7 +41,7 @@ public class OffsetChar extends AOffset implements ISliceOffset{
                this.offsetToFirst = offsetToFirst;
                this.offsetToLast = offsetToLast;
                this.noZero = noZero;
-               if(CompressedMatrixBlock.debug){
+               if(CompressedMatrixBlock.debug) {
                        this.toString();
                }
        }
@@ -131,8 +131,8 @@ public class OffsetChar extends AOffset implements ISliceOffset{
 
                OffsetFactory.getNoZero(offsets);
                return new OffsetChar(offsets, offsetToFirst, offsetToLast, 
OffsetFactory.getNoZero(offsets));
-       }       
-       
+       }
+
        @Override
        public OffsetSliceInfo slice(int lowOff, int highOff, int lowValue, int 
highValue, int low, int high) {
                char[] newOffsets = Arrays.copyOfRange(offsets, lowOff, 
highOff);
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/offset/OffsetEmpty.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/offset/OffsetEmpty.java
index 67a6ad55d2..73264c8476 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/offset/OffsetEmpty.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/offset/OffsetEmpty.java
@@ -23,6 +23,8 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
+import org.apache.sysds.runtime.compress.DMLCompressionException;
+
 public class OffsetEmpty extends AOffset {
 
        private static final long serialVersionUID = -471610497392221790L;
@@ -55,14 +57,19 @@ public class OffsetEmpty extends AOffset {
                return new OffsetEmpty();
        }
 
+       @Override 
+       public AIterator getIterator(int row) {
+               return null;
+       }
+
        @Override
        public int getOffsetToFirst() {
-               return Integer.MAX_VALUE;
+               throw new DMLCompressionException("Invalid call");
        }
 
        @Override
        public int getOffsetToLast() {
-               return Integer.MAX_VALUE;
+               throw new DMLCompressionException("Invalid call");
        }
 
        @Override
@@ -86,7 +93,7 @@ public class OffsetEmpty extends AOffset {
 
        @Override
        public OffsetSliceInfo slice(int l, int u) {
-               return new OffsetSliceInfo(-1, -1, this);
+               return EMPTY_SLICE;
        }
 
        @Override
@@ -98,4 +105,9 @@ public class OffsetEmpty extends AOffset {
        public int getLength() {
                return 0;
        }
+
+       @Override 
+       public synchronized void constructSkipList() {
+               // do nothing.
+       }
 }
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/offset/OffsetFactory.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/offset/OffsetFactory.java
index 319b7ce89f..29b7bd0c86 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/offset/OffsetFactory.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/offset/OffsetFactory.java
@@ -25,6 +25,7 @@ import java.util.Arrays;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.sysds.runtime.compress.CompressedMatrixBlock;
 import org.apache.sysds.runtime.compress.DMLCompressionException;
 import org.apache.sysds.runtime.compress.utils.IntArrayList;
 
@@ -125,11 +126,16 @@ public final class OffsetFactory {
 
                        final long byteSize = 
OffsetByte.estimateInMemorySize(endLength + correctionByte);
                        final long charSize = 
OffsetChar.estimateInMemorySize(endLength + correctionChar);
-
+                       final AOffset ret;
                        if(byteSize < charSize)
-                               return createByte(indexes, apos, alen);
+                               ret = createByte(indexes, apos, alen);
                        else
-                               return createChar(indexes, apos, alen);
+                               ret = createChar(indexes, apos, alen);
+                       
+                       if(CompressedMatrixBlock.debug)
+                               ret.verify(alen - apos);
+
+                       return ret;
                }
                catch(Exception e) {
                        if(indexes == null)
@@ -245,7 +251,7 @@ public final class OffsetFactory {
                                final int nv = indexes[i];
                                final int offsetSize = nv - ov;
                                if(offsetSize <= 0)
-                                       throw new 
DMLCompressionException("invalid offset construction with negative sequences");
+                                       throw new 
DMLCompressionException("invalid offset construction with negative sequences 
Byte");
                                final byte mod = (byte) (offsetSize % mp1);
                                offsets[p++] = mod;
                                ov = nv;
@@ -304,7 +310,7 @@ public final class OffsetFactory {
                                final int nv = indexes[i];
                                final int offsetSize = (nv - ov);
                                if(offsetSize <= 0)
-                                       throw new 
DMLCompressionException("invalid offset construction with negative sequences");
+                                       throw new 
DMLCompressionException("invalid offset construction with negative sequences 
Char");
                                final int mod = offsetSize % mp1;
                                offsets[p++] = (char) (mod);
                                ov = nv;
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/offset/OffsetSingle.java
 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/offset/OffsetSingle.java
index d77fec4a25..98dab591bf 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/offset/OffsetSingle.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/offset/OffsetSingle.java
@@ -90,12 +90,10 @@ public class OffsetSingle extends AOffset {
 
        @Override
        public OffsetSliceInfo slice(int l, int u) {
-
                if(l <= off && u > off)
                        return new OffsetSliceInfo(0, 1, new OffsetSingle(off - 
l));
                else
-                       return new OffsetSliceInfo(-1, -1, new OffsetEmpty());
-
+                       return EMPTY_SLICE;
        }
 
        @Override
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/offset/OffsetTwo.java
 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/offset/OffsetTwo.java
index a76fc2112d..9c53e85e70 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/offset/OffsetTwo.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/offset/OffsetTwo.java
@@ -36,7 +36,7 @@ public class OffsetTwo extends AOffset {
                this.first = first;
                this.last = last;
                if(last <= first)
-                       throw new DMLCompressionException("Invalid offsets last 
should be greater than first");
+                       throw new DMLCompressionException("Invalid offsets last 
should be greater than first: " + first + "->" + last);
        }
 
        @Override
@@ -98,7 +98,7 @@ public class OffsetTwo extends AOffset {
        public OffsetSliceInfo slice(int l, int u) {
                if(l <= first) {
                        if(u < first)
-                               return new OffsetSliceInfo(-1, -1, new 
OffsetEmpty());
+                               return EMPTY_SLICE;
                        else if(u > last)
                                return new OffsetSliceInfo(0, 2, moveIndex(l));
                        else
@@ -107,7 +107,7 @@ public class OffsetTwo extends AOffset {
                else if(l <= last && u > last)
                        return new OffsetSliceInfo(1, 2, new OffsetSingle(last 
- l));
                else
-                       return new OffsetSliceInfo(-1, -1, new OffsetEmpty());
+                       return EMPTY_SLICE;
        }
 
        @Override
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/estim/encoding/DenseEncoding.java
 
b/src/main/java/org/apache/sysds/runtime/compress/estim/encoding/DenseEncoding.java
index 11e23000ba..8c612955aa 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/estim/encoding/DenseEncoding.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/estim/encoding/DenseEncoding.java
@@ -118,7 +118,7 @@ public class DenseEncoding extends AEncode {
                        else
                                ret.set(r, mv);
                }
-               return new ImmutablePair<>(new 
DenseEncoding(MapToFactory.resize(ret, m.size())), m);
+               return new ImmutablePair<>(new 
DenseEncoding(ret.resize(m.size())), m);
        }
 
        private final DenseEncoding combineSparseMapToData(final AMapToData 
ret, final int maxUnique, final int nVl) {
@@ -133,7 +133,7 @@ public class DenseEncoding extends AEncode {
                        ret.set(r, mv - 1);
                }
                // Potential iteration 3 of resize
-               return new DenseEncoding(MapToFactory.resize(ret, newUID - 1));
+               return new DenseEncoding(ret.resize(newUID - 1));
        }
 
        protected DenseEncoding combineDense(final DenseEncoding other) {
@@ -198,7 +198,7 @@ public class DenseEncoding extends AEncode {
 
                for(int r = 0; r < size; r++)
                        addValHashMap(lm.getIndex(r) + rm.getIndex(r) * nVL, r, 
m, ret);
-               return new DenseEncoding(MapToFactory.resize(ret, m.size()));
+               return new DenseEncoding(ret.resize(m.size()));
        }
 
        protected final DenseEncoding combineDenseWithMapToData(final 
AMapToData lm, final AMapToData rm, final int size,
@@ -206,7 +206,7 @@ public class DenseEncoding extends AEncode {
                int newUID = 1;
                for(int r = 0; r < size; r++)
                        newUID = addValMapToData(lm.getIndex(r) + 
rm.getIndex(r) * nVL, r, m, newUID, ret);
-               return new DenseEncoding(MapToFactory.resize(ret, newUID - 1));
+               return new DenseEncoding(ret.resize(newUID - 1));
        }
 
        protected static int addValMapToData(final int nv, final int r, final 
AMapToData map, int newId,
diff --git a/src/main/java/org/apache/sysds/runtime/data/DenseBlock.java 
b/src/main/java/org/apache/sysds/runtime/data/DenseBlock.java
index 8796717422..2f085537bd 100644
--- a/src/main/java/org/apache/sysds/runtime/data/DenseBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/data/DenseBlock.java
@@ -775,7 +775,7 @@ public abstract class DenseBlock implements Serializable, 
Block
        }
 
        public void fill(double value){
-               reset(_odims, value);
+               reset(_rlen, _odims, value);
        }
 
        @Override
diff --git 
a/src/test/java/org/apache/sysds/test/component/compress/combine/CombineEncodings.java
 
b/src/test/java/org/apache/sysds/test/component/compress/combine/CombineEncodings.java
index 5a3f66ca37..847e7ff1e7 100644
--- 
a/src/test/java/org/apache/sysds/test/component/compress/combine/CombineEncodings.java
+++ 
b/src/test/java/org/apache/sysds/test/component/compress/combine/CombineEncodings.java
@@ -37,52 +37,52 @@ public class CombineEncodings {
 
        @Test
        public void combineCustom() {
-               IEncode ae = new DenseEncoding(MapToFactory.create(10, new 
int[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, 10));
-               IEncode be = new DenseEncoding(MapToFactory.create(10, new 
int[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, 10));
+               IEncode ae = new DenseEncoding(MapToFactory.create(new int[] 
{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, 10));
+               IEncode be = new DenseEncoding(MapToFactory.create(new int[] 
{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, 10));
                Pair<IEncode, Map<Integer, Integer>> cec = 
ae.combineWithMap(be);
                IEncode ce = cec.getLeft();
                Map<Integer, Integer> cem = cec.getRight();
                assertTrue(cem.size() == 10);
                assertTrue(cem.size() == ce.getUnique());
-               assertTrue(ce.equals(new DenseEncoding(MapToFactory.create(10, 
new int[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, 10))));
+               assertTrue(ce.equals(new DenseEncoding(MapToFactory.create(new 
int[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, 10))));
 
        }
 
        @Test
        public void combineCustom2() {
-               IEncode ae = new DenseEncoding(MapToFactory.create(10, new 
int[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 8}, 10));
-               IEncode be = new DenseEncoding(MapToFactory.create(10, new 
int[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, 10));
+               IEncode ae = new DenseEncoding(MapToFactory.create(new int[] 
{0, 1, 2, 3, 4, 5, 6, 7, 8, 8}, 10));
+               IEncode be = new DenseEncoding(MapToFactory.create(new int[] 
{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, 10));
                Pair<IEncode, Map<Integer, Integer>> cec = 
ae.combineWithMap(be);
                IEncode ce = cec.getLeft();
                Map<Integer, Integer> cem = cec.getRight();
                assertTrue(cem.size() == 10);
                assertTrue(cem.size() == ce.getUnique());
-               assertTrue(ce.equals(new DenseEncoding(MapToFactory.create(10, 
new int[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, 10))));
+               assertTrue(ce.equals(new DenseEncoding(MapToFactory.create(new 
int[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, 10))));
 
        }
 
        @Test
        public void combineCustom3() {
-               IEncode ae = new DenseEncoding(MapToFactory.create(10, new 
int[] {0, 1, 2, 3, 4, 5, 6, 7, 7, 8}, 10));
-               IEncode be = new DenseEncoding(MapToFactory.create(10, new 
int[] {0, 1, 2, 3, 4, 5, 6, 7, 7, 9}, 10));
+               IEncode ae = new DenseEncoding(MapToFactory.create(new int[] 
{0, 1, 2, 3, 4, 5, 6, 7, 7, 8}, 10));
+               IEncode be = new DenseEncoding(MapToFactory.create(new int[] 
{0, 1, 2, 3, 4, 5, 6, 7, 7, 9}, 10));
                Pair<IEncode, Map<Integer, Integer>> cec = 
ae.combineWithMap(be);
                IEncode ce = cec.getLeft();
                Map<Integer, Integer> cem = cec.getRight();
                assertTrue(cem.size() == 9);
                assertTrue(cem.size() == ce.getUnique());
-               assertTrue(ce.equals(new DenseEncoding(MapToFactory.create(10, 
new int[] {0, 1, 2, 3, 4, 5, 6, 7, 7, 8}, 9))));
+               assertTrue(ce.equals(new DenseEncoding(MapToFactory.create(new 
int[] {0, 1, 2, 3, 4, 5, 6, 7, 7, 8}, 9))));
 
        }
 
        @Test
        public void combineCustom4() {
-               IEncode ae = new DenseEncoding(MapToFactory.create(10, new 
int[] {0, 1, 2, 3, 4, 5, 6, 7, 7, 0}, 10));
-               IEncode be = new DenseEncoding(MapToFactory.create(10, new 
int[] {0, 1, 2, 3, 4, 5, 6, 7, 7, 0}, 10));
+               IEncode ae = new DenseEncoding(MapToFactory.create(new int[] 
{0, 1, 2, 3, 4, 5, 6, 7, 7, 0}, 10));
+               IEncode be = new DenseEncoding(MapToFactory.create(new int[] 
{0, 1, 2, 3, 4, 5, 6, 7, 7, 0}, 10));
                Pair<IEncode, Map<Integer, Integer>> cec = 
ae.combineWithMap(be);
                IEncode ce = cec.getLeft();
                Map<Integer, Integer> cem = cec.getRight();
                assertTrue(cem.size() == 8);
                assertTrue(cem.size() == ce.getUnique());
-               assertTrue(ce.equals(new DenseEncoding(MapToFactory.create(10, 
new int[] {0, 1, 2, 3, 4, 5, 6, 7, 7, 0}, 8))));
+               assertTrue(ce.equals(new DenseEncoding(MapToFactory.create(new 
int[] {0, 1, 2, 3, 4, 5, 6, 7, 7, 0}, 8))));
        }
 }
diff --git 
a/src/test/java/org/apache/sysds/test/component/compress/estim/encoding/EncodeSampleCustom.java
 
b/src/test/java/org/apache/sysds/test/component/compress/estim/encoding/EncodeSampleCustom.java
index eab7bdc51f..6c72c4593c 100644
--- 
a/src/test/java/org/apache/sysds/test/component/compress/estim/encoding/EncodeSampleCustom.java
+++ 
b/src/test/java/org/apache/sysds/test/component/compress/estim/encoding/EncodeSampleCustom.java
@@ -56,8 +56,8 @@ public class EncodeSampleCustom {
                int[] d2 = 
readData("src/test/resources/component/compress/sample/s2.dat");
                int m1 = Arrays.stream(d1).max().getAsInt() + 1;
                int m2 = Arrays.stream(d2).max().getAsInt() + 1;
-               AMapToData dm1 = MapToFactory.create(d1.length, d1, m1);
-               AMapToData dm2 = MapToFactory.create(d2.length, d2, m2);
+               AMapToData dm1 = MapToFactory.create(d1, m1);
+               AMapToData dm2 = MapToFactory.create(d2, m2);
 
                DenseEncoding de1 = new DenseEncoding(dm1);
                DenseEncoding de2 = new DenseEncoding(dm2);
diff --git 
a/src/test/java/org/apache/sysds/test/component/compress/mapping/CustomMappingTest.java
 
b/src/test/java/org/apache/sysds/test/component/compress/mapping/CustomMappingTest.java
index de3dc291b3..8ab13cf9f3 100644
--- 
a/src/test/java/org/apache/sysds/test/component/compress/mapping/CustomMappingTest.java
+++ 
b/src/test/java/org/apache/sysds/test/component/compress/mapping/CustomMappingTest.java
@@ -37,7 +37,7 @@ public class CustomMappingTest {
                try {
 
                        CompressedMatrixBlock.debug = true;
-                       MapToFactory.create(data.length, data, 2);
+                       MapToFactory.create(data, 2);
                        MapToFactory.create(127, data, 2);
                }
                catch(RuntimeException e) {
diff --git 
a/src/test/java/org/apache/sysds/test/component/compress/mapping/MappingPreAggregateTests.java
 
b/src/test/java/org/apache/sysds/test/component/compress/mapping/MappingPreAggregateTests.java
index 1c2d105251..7f51c8ed02 100644
--- 
a/src/test/java/org/apache/sysds/test/component/compress/mapping/MappingPreAggregateTests.java
+++ 
b/src/test/java/org/apache/sysds/test/component/compress/mapping/MappingPreAggregateTests.java
@@ -265,7 +265,7 @@ public class MappingPreAggregateTests {
                                try {
                                        final int size = m.size();
                                        double[] pre = new 
double[m.getUnique()];
-                                       m.preAggregateDense(mb, pre, 0, 1, 0, 
size, o);
+                                       m.preAggregateDense(mb.getDenseBlock(), 
pre, 0, 1, 0, size, o);
                                        compareRes(preRef, pre, 0);
                                }
                                catch(Exception e) {
diff --git 
a/src/test/java/org/apache/sysds/test/component/compress/mapping/MappingTests.java
 
b/src/test/java/org/apache/sysds/test/component/compress/mapping/MappingTests.java
index 6d9a6f4828..c777a9bfd3 100644
--- 
a/src/test/java/org/apache/sysds/test/component/compress/mapping/MappingTests.java
+++ 
b/src/test/java/org/apache/sysds/test/component/compress/mapping/MappingTests.java
@@ -213,7 +213,7 @@ public class MappingTests {
        @Test
        public void resizeToSameSize() {
                // if we resize to same size return the same object!
-               AMapToData m_same = MapToFactory.resize(m, m.getUnique());
+               AMapToData m_same = m.resize( m.getUnique());
                assertEquals("Resize did not return the correct same objects", 
m_same, m);
        }
 
diff --git 
a/src/test/java/org/apache/sysds/test/component/compress/mapping/MappingTestsResize.java
 
b/src/test/java/org/apache/sysds/test/component/compress/mapping/MappingTestsResize.java
index 8b5ffa2b4b..3bcbb7ac04 100644
--- 
a/src/test/java/org/apache/sysds/test/component/compress/mapping/MappingTestsResize.java
+++ 
b/src/test/java/org/apache/sysds/test/component/compress/mapping/MappingTestsResize.java
@@ -73,7 +73,7 @@ public class MappingTestsResize {
 
        @Test
        public void resize() {
-               MappingTests.compare(MapToFactory.resize(m, 
getMaxSmaller(type)), m);
+               MappingTests.compare(m.resize(getMaxSmaller(type)), m);
        }
 
        private int getMaxSmaller(MAP_TYPE type) {
diff --git 
a/src/test/java/org/apache/sysds/test/component/compress/offset/CustomOffsetTest.java
 
b/src/test/java/org/apache/sysds/test/component/compress/offset/CustomOffsetTest.java
index f3fd30c90a..92736f1fd0 100644
--- 
a/src/test/java/org/apache/sysds/test/component/compress/offset/CustomOffsetTest.java
+++ 
b/src/test/java/org/apache/sysds/test/component/compress/offset/CustomOffsetTest.java
@@ -20,20 +20,76 @@
 package org.apache.sysds.test.component.compress.offset;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sysds.runtime.compress.CompressedMatrixBlock;
+import org.apache.sysds.runtime.compress.colgroup.offset.AIterator;
 import org.apache.sysds.runtime.compress.colgroup.offset.AOffset;
 import 
org.apache.sysds.runtime.compress.colgroup.offset.AOffset.OffsetSliceInfo;
 import org.apache.sysds.runtime.compress.colgroup.offset.OffsetFactory;
 import org.junit.Test;
 
 public class CustomOffsetTest {
+       protected static final Log LOG = 
LogFactory.getLog(CustomOffsetTest.class.getName());
 
        @Test
        public void sliceE() {
                AOffset a = OffsetFactory.createOffset(new int[] {441, 1299, 
14612, 16110, 18033, 18643, 18768, 25798, 32315});
 
                OffsetSliceInfo i = a.slice(1000, 2000);
-               System.out.println(a);
                assertEquals(OffsetFactory.createOffset(new int[] {299}), 
i.offsetSlice);
        }
+
+       @Test
+       public void catOffset() {
+               // test case for BWARE
+               int[] in = new int[] {17689, 37830, 44395, 57282, 67605, 72565, 
77890, 104114, 127762, 208612, 211534, 216942,
+                       223576, 239395, 245210, 265202, 269410, 301734, 302389, 
302679, 302769, 303286, 303331, 303920, 304125, 304365,
+                       304743, 306244, 306260, 306745, 307624, 307651, 309715, 
310232, 310270, 311177};
+
+               AOffset off = OffsetFactory.createOffset(in);
+               OffsetTests.compare(off, in);
+       }
+
+       @Test
+       public void catOffsetSlice() {
+               // test case for BWARE
+
+               int[] in = new int[] {17689, 37830, 44395, 57282, 67605, 72565, 
77890, 104114, 127762, 208612, 211534, 216942,
+                       223576, 239395, 245210, 265202, 269410, 301734, 302389, 
302679, 302769, 303286, 303331, 303920, 304125, 304365,
+                       304743, 306244, 306260, 306745, 307624, 307651, 309715, 
310232, 310270, 311177};
+
+               AOffset off = OffsetFactory.createOffset(in);
+               off.slice(112000, 128000); // check for no crash
+
+       }
+
+       @Test
+       public void printSlice() {
+               AOffset off = OffsetFactory.createOffset(OffsetTests.gen(100, 
10, 123452));
+               off.slice(10, 20).toString();
+       }
+
+       @Test
+       public void printSkipList() {
+               AOffset off = OffsetFactory.createOffset(OffsetTests.gen(1004, 
10, 123452));
+               CompressedMatrixBlock.debug = true;
+               off.constructSkipList();
+               String s = off.toString();
+               assertTrue(s.contains("SkipList"));
+       }
+
+       @Test
+       public void printCache() {
+               AOffset off = OffsetFactory.createOffset(OffsetTests.gen(1004, 
10, 123452));
+               CompressedMatrixBlock.debug = true;
+
+               AIterator it = off.getIterator(3000);
+
+               off.cacheIterator(it, 3000);
+               String s = off.toString();
+               assertTrue(s.contains("CacheRow"));
+       }
 }
diff --git 
a/src/test/java/org/apache/sysds/test/component/compress/offset/LargeOffsetTest.java
 
b/src/test/java/org/apache/sysds/test/component/compress/offset/LargeOffsetTest.java
index 4e6c934092..8956a5d43c 100644
--- 
a/src/test/java/org/apache/sysds/test/component/compress/offset/LargeOffsetTest.java
+++ 
b/src/test/java/org/apache/sysds/test/component/compress/offset/LargeOffsetTest.java
@@ -20,6 +20,8 @@
 package org.apache.sysds.test.component.compress.offset;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -57,6 +59,8 @@ public class LargeOffsetTest {
                                tests.add(new Object[] {gen(3030, 10, i), t});
                                tests.add(new Object[] {gen(3030, 300, i), t});
                                tests.add(new Object[] {gen(10000, 501, i), t});
+                               tests.add(new Object[] {gen(1000, 3000, i), t});
+                               tests.add(new Object[] {gen(1000, 10000, i), 
t});
                        }
                }
                return tests;
@@ -67,6 +71,7 @@ public class LargeOffsetTest {
                this.data = data;
                this.type = type;
                this.o = OffsetTestUtil.getOffset(data, type);
+               o.clearSkipList();
        }
 
        @Test
@@ -119,6 +124,56 @@ public class LargeOffsetTest {
                }
        }
 
+       @Test
+       public void slice() {
+               try {
+                       if(data.length > 50) {
+
+                               AOffset sl = o.slice(data[4], data[data.length 
- 30]).offsetSlice;
+
+                               AIterator it = sl.getIterator();
+                               assertEquals(0, it.value());
+                               for(int i = 4; i < data.length - 30; i++) {
+                                       if(it.getDataIndex() != i - 4) {
+                                               fail("not equivalent index 
reached: expected: " + (i - 4) + " got: " + it.getDataIndex());
+                                       }
+                                       if(data[i] - data[4] != it.value()) {
+                                               fail("not equivalent index 
expected: " + (data[i] - data[4]) + " but got " + it.value() + " at index "
+                                                       + it.getDataIndex() + " 
correct index is: " + (i - 4));
+                                       }
+                                       if(i == data.length - 31) { // stop 
here.
+                                               break;
+                                       }
+                                       it.next();
+                               }
+                       }
+               }
+               catch(Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       @Test
+       public void getSkipList() {
+               o.constructSkipList();
+               o.clearSkipList();
+               o.constructSkipList();
+       }
+
+       @Test
+       public void getIteratorClearSkipList() {
+               o.constructSkipList();
+               o.clearSkipList();
+               AIterator it = o.getIterator(data[10]);
+               assertEquals(data[10], it.value());
+               AIterator it2 = o.getIterator(data[10]);
+               assertTrue(it.equals(it2));
+
+               AIterator it3 = o.getIterator(data[data.length-2]);
+               assertEquals(data[data.length-2], it3.value());
+       }
+
        private static void compare(AIterator it, int[] data, int off) {
                for(; off < data.length; off++) {
                        assertEquals(data[off], it.value());
diff --git 
a/src/test/java/org/apache/sysds/test/component/compress/offset/NegativeOffsetTest.java
 
b/src/test/java/org/apache/sysds/test/component/compress/offset/NegativeOffsetTest.java
new file mode 100644
index 0000000000..b22f5c842e
--- /dev/null
+++ 
b/src/test/java/org/apache/sysds/test/component/compress/offset/NegativeOffsetTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.component.compress.offset;
+
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+
+import org.apache.sysds.runtime.compress.colgroup.offset.AOffset;
+import org.apache.sysds.runtime.compress.colgroup.offset.OffsetFactory;
+import 
org.apache.sysds.runtime.compress.colgroup.offset.OffsetFactory.OFF_TYPE_SPECIALIZATIONS;
+import org.junit.Test;
+
+public class NegativeOffsetTest {
+
+       @Test(expected = Exception.class)
+       public void incorrectConstruct() throws Exception{
+               ByteArrayOutputStream bos = new ByteArrayOutputStream();
+               DataOutputStream fos = new DataOutputStream(bos);
+               
+               fos.writeByte((byte)OFF_TYPE_SPECIALIZATIONS.BYTE.ordinal());
+               fos.writeInt(0);
+               fos.writeInt(2);
+               fos.writeInt(10);
+               fos.writeInt(2);
+
+               fos.writeByte(5);
+               fos.writeByte(4);
+
+               // Read the serialized bytes back in
+               ByteArrayInputStream bis = new 
ByteArrayInputStream(bos.toByteArray());
+               DataInputStream fis = new DataInputStream(bis);
+
+               AOffset n = OffsetFactory.readIn(fis);
+               n.verify(10);
+       }
+
+
+       @Test(expected = Exception.class)
+       public void notEmptyIteratorOnEmpty(){
+               AOffset a = OffsetFactory.createOffset(new int[]{});
+               AOffset spy = spy(a);
+               AOffset b = OffsetFactory.createOffset(new int[]{1,2,3});
+               when(spy.getIterator()).thenReturn(b.getIterator());
+               spy.verify(0);
+       }
+
+       @Test(expected = Exception.class)
+       public void verifyIncorrectSizeGivenOnEmpty(){
+               AOffset a = OffsetFactory.createOffset(new int[]{});
+               a.verify(2);
+       }
+
+       @Test(expected = Exception.class)
+       public void toManyElementsForVerify1(){
+               AOffset b = OffsetFactory.createOffset(new int[]{1,2,3,5});
+               b.verify(2);
+       }
+
+       @Test(expected = Exception.class)
+       public void toManyElementsForVerify2(){
+               AOffset b = OffsetFactory.createOffset(new int[]{1,2,3});
+               b.verify(1);
+       }
+
+       @Test(expected = Exception.class)
+       public void toManyElementsForVerify3(){
+               AOffset b = OffsetFactory.createOffset(new int[]{1,2,3,5});
+               b.verify(-1);// a negative size can never match; trivial but valid test.
+       }
+
+}
diff --git 
a/src/test/java/org/apache/sysds/test/component/compress/offset/OffsetPreAggTests.java
 
b/src/test/java/org/apache/sysds/test/component/compress/offset/OffsetPreAggTests.java
new file mode 100644
index 0000000000..cce35ecb33
--- /dev/null
+++ 
b/src/test/java/org/apache/sysds/test/component/compress/offset/OffsetPreAggTests.java
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.component.compress.offset;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import org.apache.sysds.runtime.compress.colgroup.mapping.AMapToData;
+import org.apache.sysds.runtime.compress.colgroup.mapping.MapToFactory;
+import 
org.apache.sysds.runtime.compress.colgroup.mapping.MapToFactory.MAP_TYPE;
+import org.apache.sysds.runtime.compress.colgroup.offset.AOffset;
+import org.apache.sysds.runtime.compress.colgroup.offset.OffsetFactory;
+import org.apache.sysds.runtime.data.DenseBlock;
+import org.apache.sysds.runtime.data.DenseBlockFactory;
+import org.junit.Test;
+
+public class OffsetPreAggTests {
+
+       static DenseBlock db = DenseBlockFactory.createDenseBlock(2, 5);
+       static{
+               for(int i = 0; i < 5; i++) {
+                       db.set(0, i, i);
+                       db.set(1, i, i);
+               }
+       }
+
+       @Test
+       public void preAggDense() {
+               try {
+
+                       AOffset off = OffsetFactory.createOffset(new int[] {0, 
1, 2, 3, 4});
+                       DenseBlock db = DenseBlockFactory.createDenseBlock(1, 
5);
+
+                       db.fill(1.0);
+
+                       for(MAP_TYPE t : MAP_TYPE.values()) {
+                               double[] preAV = new double[1];
+                               AMapToData data = MapToFactory.create(5, t);
+
+                               off.preAggregateDenseMap(db, preAV, 0, 1, 0, 5, 
1, data);
+
+                               assertEquals(5.0, preAV[0], 0.0);
+                       }
+               }
+               catch(Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       @Test
+       public void preAggDense2() {
+               try {
+
+                       AOffset off = OffsetFactory.createOffset(new int[] {0, 
1, 2, 3, 4});
+               
+
+                       for(MAP_TYPE t : MAP_TYPE.values()) {
+                               double[] preAV = new double[1];
+                               AMapToData data = MapToFactory.create(5, t);
+
+                               off.preAggregateDenseMap(db, preAV, 0, 1, 0, 5, 
1, data);
+
+                               assertEquals(10.0, preAV[0], 0.0);
+                       }
+               }
+               catch(Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       @Test
+       public void preAggDense3() {
+               try {
+
+                       AOffset off = OffsetFactory.createOffset(new int[] {0, 
1, 2, 4});
+               
+
+                       for(MAP_TYPE t : MAP_TYPE.values()) {
+                               double[] preAV = new double[1];
+                               AMapToData data = MapToFactory.create(5, t);
+
+                               off.preAggregateDenseMap(db, preAV, 0, 1, 0, 5, 
1, data);
+
+                               assertEquals(7.0, preAV[0], 0.0);
+                       }
+               }
+               catch(Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       @Test
+       public void preAggDense4() {
+               try {
+
+                       AOffset off = OffsetFactory.createOffset(new int[] {0, 
1, 2});
+               
+                       for(MAP_TYPE t : MAP_TYPE.values()) {
+                               double[] preAV = new double[1];
+                               AMapToData data = MapToFactory.create(5, t);
+
+                               off.preAggregateDenseMap(db, preAV, 0, 1, 0, 5, 
1, data);
+
+                               assertEquals(3.0, preAV[0], 0.0);
+                       }
+               }
+               catch(Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       @Test
+       public void preAggDense5() {
+               try {
+
+                       AOffset off = OffsetFactory.createOffset(new int[] {0, 
2});
+               
+
+                       for(MAP_TYPE t : MAP_TYPE.values()) {
+                               double[] preAV = new double[1];
+                               AMapToData data = MapToFactory.create(5, t);
+
+                               off.preAggregateDenseMap(db, preAV, 0, 1, 0, 5, 
1, data);
+
+                               assertEquals(2.0, preAV[0], 0.0);
+                       }
+               }
+               catch(Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       @Test
+       public void preAggDense6() {
+               try {
+
+                       AOffset off = OffsetFactory.createOffset(new int[] {0, 
2});
+               
+                       for(MAP_TYPE t : MAP_TYPE.values()) {
+                               double[] preAV = new double[1];
+                               AMapToData data = MapToFactory.create(5, t);
+
+                               off.preAggregateDenseMap(db, preAV, 1, 2, 0, 5, 
1, data);
+
+                               assertEquals(2.0, preAV[0], 0.0);
+                       }
+               }
+               catch(Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       @Test
+       public void preAggDense7() {
+               try {
+
+                       AOffset off = OffsetFactory.createOffset(new int[] {1, 
2, 3});
+               
+                       for(MAP_TYPE t : MAP_TYPE.values()) {
+                               double[] preAV = new double[1];
+                               AMapToData data = MapToFactory.create(5, t);
+
+                               off.preAggregateDenseMap(db, preAV, 1, 2, 4, 5, 
1, data);
+
+                               assertEquals(0.0, preAV[0], 0.0);
+                       }
+               }
+               catch(Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       @Test
+       public void preAggDense8() {
+               try {
+
+                       AOffset off = OffsetFactory.createOffset(new int[] {3});
+                       
+
+                       for(MAP_TYPE t : MAP_TYPE.values()) {
+                               double[] preAV = new double[1];
+                               AMapToData data = MapToFactory.create(5, t);
+
+                               off.preAggregateDenseMap(db, preAV, 1, 2, 0, 2, 
1, data);
+
+                               assertEquals(0.0, preAV[0], 0.0);
+                       }
+               }
+               catch(Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       @Test
+       public void preAggDense9() {
+               try {
+
+                       AOffset off = OffsetFactory.createOffset(new int[] {3});
+               
+                       for(MAP_TYPE t : MAP_TYPE.values()) {
+                               double[] preAV = new double[1];
+                               AMapToData data = MapToFactory.create(5, t);
+
+                               off.preAggregateDenseMap(db, preAV, 1, 2, 0, 3, 
1, data);
+
+                               assertEquals(0.0, preAV[0], 0.0);
+                       }
+               }
+               catch(Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+
+       @Test
+       public void preAggDense10() {
+               try {
+
+                       AOffset off = OffsetFactory.createOffset(new int[] 
{3,4,5});
+
+
+                       for(MAP_TYPE t : MAP_TYPE.values()) {
+                               double[] preAV = new double[1];
+                               AMapToData data = MapToFactory.create(5, t);
+
+                               off.preAggregateDenseMap(db, preAV, 1, 2, 0, 4, 
1, data);
+
+                               assertEquals(3.0 , preAV[0], 0.0);
+                       }
+               }
+               catch(Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       @Test
+       public void preAggDenseMultiRow1() {
+               try {
+
+                       AOffset off = OffsetFactory.createOffset(new int[] {3});
+               
+                       for(MAP_TYPE t : MAP_TYPE.values()) {
+                               double[] preAV = new double[2];
+                               AMapToData data = MapToFactory.create(5, t);
+
+                               off.preAggregateDenseMap(db, preAV, 0, 2, 0, 5, 
1, data);
+
+                               assertEquals(3.0, preAV[0], 0.0);
+                               assertEquals(3.0, preAV[1], 0.0);
+                       }
+               }
+               catch(Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       @Test
+       public void preAggDenseMultiRow2() {
+               try {
+
+                       AOffset off = OffsetFactory.createOffset(new int[] {2, 
3, 4});
+               
+
+                       for(MAP_TYPE t : MAP_TYPE.values()) {
+                               double[] preAV = new double[2];
+                               AMapToData data = MapToFactory.create(5, t);
+
+                               off.preAggregateDenseMap(db, preAV, 0, 2, 0, 5, 
1, data);
+
+                               assertEquals(2.0 + 3.0 + 4.0, preAV[0], 0.0);
+                               assertEquals(2.0 + 3.0 + 4.0, preAV[1], 0.0);
+                       }
+               }
+               catch(Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       @Test
+       public void preAggDenseMultiRow3() {
+               try {
+
+                       AOffset off = OffsetFactory.createOffset(new int[] {2, 
3, 4});
+               
+
+                       for(MAP_TYPE t : MAP_TYPE.values()) {
+                               double[] preAV = new double[2];
+                               AMapToData data = MapToFactory.create(5, t);
+
+                               off.preAggregateDenseMap(db, preAV, 0, 2, 0, 4, 
1, data);
+
+                               assertEquals(2.0 + 3.0, preAV[0], 0.0);
+                               assertEquals(2.0 + 3.0, preAV[1], 0.0);
+                       }
+               }
+               catch(Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       @Test(expected = Exception.class)
+       public void preAggDenseMultiRow4_nonContinuous() {
+
+               AOffset off = OffsetFactory.createOffset(new int[] {2, 3, 4});
+               
+               DenseBlock spy = spy(db);
+               when(spy.isContiguous()).thenReturn(false);
+
+               double[] preAV = new double[2];
+               AMapToData data = MapToFactory.create(5, MAP_TYPE.CHAR);
+
+               off.preAggregateDenseMap(spy, preAV, 0, 2, 0, 4, 1, data);
+
+               assertEquals(2.0 + 3.0, preAV[0], 0.0);
+               assertEquals(2.0 + 3.0, preAV[1], 0.0);
+
+       }
+}
diff --git 
a/src/test/java/org/apache/sysds/test/component/compress/offset/OffsetReverseTest.java
 
b/src/test/java/org/apache/sysds/test/component/compress/offset/OffsetReverseTest.java
index 23e63bd7ae..9533e7a22f 100644
--- 
a/src/test/java/org/apache/sysds/test/component/compress/offset/OffsetReverseTest.java
+++ 
b/src/test/java/org/apache/sysds/test/component/compress/offset/OffsetReverseTest.java
@@ -30,28 +30,28 @@ public class OffsetReverseTest {
        @Test
        public void reverse1() {
                AOffset off = OffsetFactory.createOffset(new int[] {1, 10, 13, 
14});
-               AOffset rev = AOffset.reverse(16, off);
+               AOffset rev = off.reverse(16);
                OffsetTests.compare(rev, new int[] {0, 2, 3, 4, 5, 6, 7, 8, 9, 
11, 12, 15});
        }
 
        @Test
        public void reverse2() {
                AOffset off = OffsetFactory.createOffset(new int[] {1, 10});
-               AOffset rev = AOffset.reverse(16, off);
+               AOffset rev = off.reverse(16);
                OffsetTests.compare(rev, new int[] {0, 2, 3, 4, 5, 6, 7, 8, 9, 
11, 12, 13, 14, 15});
        }
 
        @Test
        public void reverse3() {
                AOffset off = OffsetFactory.createOffset(new int[] {1});
-               AOffset rev = AOffset.reverse(16, off);
+               AOffset rev = off.reverse(16);
                OffsetTests.compare(rev, new int[] {0, 2, 3, 4, 5, 6, 7, 8, 9, 
10, 11, 12, 13, 14, 15});
        }
 
        @Test
        public void reverse4() {
                AOffset off = OffsetFactory.createOffset(new int[] {1, 10, 13, 
14, 15});
-               AOffset rev = AOffset.reverse(16, off);
+               AOffset rev = off.reverse(16);
                OffsetTests.compare(rev, new int[] {0, 2, 3, 4, 5, 6, 7, 8, 9, 
11, 12});
        }
 
@@ -59,7 +59,7 @@ public class OffsetReverseTest {
        public void reverse4_withCreateMethod() {
                int[] exp = new int[] {0, 2, 3, 4, 5, 6, 7, 8, 9, 11, 12};
                AOffset off = OffsetFactory.createOffset(create(exp, 16));
-               AOffset rev = AOffset.reverse(16, off);
+               AOffset rev = off.reverse(16);
                OffsetTests.compare(rev, exp);
        }
 
@@ -67,7 +67,7 @@ public class OffsetReverseTest {
        public void reverse1_withCreateMethod() {
                int[] exp = new int[] {0, 2, 3, 4, 5, 6, 7, 8, 9, 11, 12, 15};
                AOffset off = OffsetFactory.createOffset(create(exp, 16));
-               AOffset rev = AOffset.reverse(16, off);
+               AOffset rev = off.reverse(16);
                OffsetTests.compare(rev, exp);
        }
 
@@ -119,7 +119,7 @@ public class OffsetReverseTest {
                                        875, 877, 879, 880, 882, 883, 887, 889, 
891, 892, 894, 896, 897, 898, 899, 901, 902, 903, 905, 906, 908,
                                        911, 912, 913, 917, 919, 920, 922, 926, 
927, 931, 933, 935, 936, 938, 940, 941, 943, 944, 945, 946, 948,
                                        950, 953, 957, 959, 961, 967, 968, 974, 
979, 980, 982, 983, 984, 986, 989, 990, 991, 993, 995, 996, 997});
-               AOffset rev = AOffset.reverse(1000, off);
+               AOffset rev = off.reverse(1000);
                // System.out.println(off);
                // System.out.println(rev);
                assertEquals(0, rev.getOffsetIterator().value());
@@ -127,7 +127,7 @@ public class OffsetReverseTest {
 
        private void test(int[] missing, int max) {
                AOffset off = OffsetFactory.createOffset(create(missing, max));
-               AOffset rev = AOffset.reverse(max, off);
+               AOffset rev = off.reverse(max);
                OffsetTests.compare(rev, missing);
        }
 
diff --git 
a/src/test/java/org/apache/sysds/test/component/compress/offset/OffsetSingleTests.java
 
b/src/test/java/org/apache/sysds/test/component/compress/offset/OffsetSingleTests.java
index 537a445d35..f52631851b 100644
--- 
a/src/test/java/org/apache/sysds/test/component/compress/offset/OffsetSingleTests.java
+++ 
b/src/test/java/org/apache/sysds/test/component/compress/offset/OffsetSingleTests.java
@@ -19,7 +19,6 @@
 
 package org.apache.sysds.test.component.compress.offset;
 
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
@@ -134,10 +133,13 @@ public class OffsetSingleTests {
                OffsetTestUtil.getOffset(new int[] {1, 2, 3, 2}, OFF_TYPE.BYTE);
        }
 
-       @Test
+       @Test(expected = DMLCompressionException.class)
+       public void emptyOffsetToFirst() {
+               OffsetFactory.createOffset(new int[0]).getOffsetToFirst();
+       }
+
+       @Test(expected = DMLCompressionException.class)
        public void emptyOffsetToLast() {
-               int a = OffsetFactory.createOffset(new 
int[0]).getOffsetToFirst();
-               int b = OffsetFactory.createOffset(new 
int[0]).getOffsetToLast();
-               assertEquals(a, b);
+               OffsetFactory.createOffset(new int[0]).getOffsetToLast();
        }
 }
diff --git 
a/src/test/java/org/apache/sysds/test/component/compress/offset/OffsetTestPreAggregate.java
 
b/src/test/java/org/apache/sysds/test/component/compress/offset/OffsetTestPreAggregate.java
deleted file mode 100644
index 554b0e0ac3..0000000000
--- 
a/src/test/java/org/apache/sysds/test/component/compress/offset/OffsetTestPreAggregate.java
+++ /dev/null
@@ -1,427 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysds.test.component.compress.offset;
-
-import static org.junit.Assert.fail;
-
-import java.util.ArrayList;
-import java.util.Collection;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.commons.math3.util.Precision;
-import org.apache.sysds.runtime.compress.colgroup.offset.AOffset;
-import 
org.apache.sysds.runtime.compress.colgroup.offset.OffsetFactory.OFF_TYPE;
-import org.apache.sysds.runtime.matrix.data.MatrixBlock;
-import org.apache.sysds.test.TestUtils;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-@RunWith(value = Parameterized.class)
-public abstract class OffsetTestPreAggregate {
-       protected static final Log LOG = 
LogFactory.getLog(OffsetTestPreAggregate.class.getName());
-
-       protected static final double eps = 0.00001;
-
-       protected final int[] data;
-       protected final AOffset a;
-
-       protected final MatrixBlock leftM;
-
-       // sum of indexes row 1.
-       protected final double[] s;
-
-       @Parameters
-       public static Collection<Object[]> data() {
-               ArrayList<Object[]> tests = new ArrayList<>();
-               // It is assumed that the input is in sorted order, all values 
are positive and there are no duplicates.
-               // note that each tests allocate an matrix of two rows, and the 
last value length.
-               // therefore don't make it to large.
-               for(OFF_TYPE t : OFF_TYPE.values()) {
-                       tests.add(new Object[] {new int[] {1, 2}, t});
-                       tests.add(new Object[] {new int[] {2, 142}, t});
-                       tests.add(new Object[] {new int[] {142, 421}, t});
-                       tests.add(new Object[] {new int[] {1, 1023}, t});
-                       tests.add(new Object[] {new int[] {1023, 1024}, t});
-                       tests.add(new Object[] {new int[] {1023}, t});
-                       tests.add(new Object[] {new int[] {0, 1, 2, 3, 4, 5}, 
t});
-                       tests.add(new Object[] {new int[] {0}, t});
-                       tests.add(new Object[] {new int[] {500}, t});
-                       tests.add(new Object[] {new int[] {1442}, t});
-                       tests.add(new Object[] {new int[] {0, 256}, t});
-                       tests.add(new Object[] {new int[] {0, 254}, t});
-                       tests.add(new Object[] {new int[] {0, 256 * 2}, t});
-                       tests.add(new Object[] {new int[] {0, 255 * 2}, t});
-                       tests.add(new Object[] {new int[] {0, 254 * 2}, t});
-                       tests.add(new Object[] {new int[] {0, 510, 765}, t});
-                       tests.add(new Object[] {new int[] {0, 254 * 3}, t});
-                       tests.add(new Object[] {new int[] {0, 255, 255 * 2, 255 
* 3}, t});
-                       tests.add(new Object[] {new int[] {0, 255 * 2, 255 * 
3}, t});
-                       tests.add(new Object[] {new int[] {0, 255 * 2, 255 * 3, 
255 * 10}, t});
-                       tests.add(new Object[] {new int[] {0, 255 * 3}, t});
-                       tests.add(new Object[] {new int[] {0, 255 * 4}, t});
-                       tests.add(new Object[] {new int[] {0, 256 * 3}, t});
-                       tests.add(new Object[] {new int[] {255 * 3, 255 * 5}, 
t});
-                       tests.add(new Object[] {new int[] {0, 1, 2, 3, 255 * 4, 
1500}, t});
-                       tests.add(new Object[] {new int[] {0, 1, 2, 3, 4, 5}, 
t});
-                       tests.add(new Object[] {new int[] {0, 1, 2, 3, 4, 5, 
125, 142, 161, 1661, 2314}, t});
-                       tests.add(new Object[] {new int[] {51, 4251, 
Character.MAX_VALUE}, t});
-                       tests.add(new Object[] {new int[] {0, 
Character.MAX_VALUE + 10}, t});
-                       tests.add(new Object[] {new int[] {0, 
Character.MAX_VALUE + 10, (Character.MAX_VALUE + 10) * 2}, t});
-                       tests.add(new Object[] {new int[] {4, 6, 11, 14, 24, 
34, 38, 40, 43, 46, 47, 52, 53, 64, 69, 70, 71, 72, 76,
-                               77, 80, 83, 94, 109, 112, 114, 125, 133, 138, 
142, 144, 147, 158, 159, 162, 167, 171, 177, 186, 198, 203,
-                               204, 207, 209, 212, 219, 221, 224, 225, 227, 
229, 231, 234, 242, 252, 253, 255, 257, 263, 271, 276, 277,
-                               288, 296, 297, 300, 306, 313, 320, 321, 336, 
358, 365, 385, 387, 391, 397, 399, 408, 414, 416, 417, 419,
-                               425, 429, 438, 441, 445, 459, 477, 482, 483, 
499}, t});
-               }
-               return tests;
-       }
-
-       public OffsetTestPreAggregate(int[] data, OFF_TYPE type) {
-               this.data = data;
-               this.a = OffsetTestUtil.getOffset(data, type);
-               this.leftM = TestUtils.generateTestMatrixBlock(4, 
data[data.length - 1] + 100, -1, 100, 1.0, 1342);
-               this.s = sumIndexes();
-       }
-
-       @Test
-       public void preAggByteMapFirstRow() {
-               try {
-                       preAggMapRow(0);
-               }
-               catch(Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-
-       @Test
-       public void preAggByteMapSecondRow() {
-               try {
-                       preAggMapRow(1);
-               }
-               catch(Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-
-       protected abstract void preAggMapRow(int row);
-
-       protected void verifyPreAggMapRowByte(double[] preAV, int row) {
-
-               if(preAV[0] != s[row])
-                       fail("\nThe preaggregate result is not the sum! : " + 
preAV[0] + " vs " + s[row]);
-       }
-
-       @Test
-       public void preAggByteMapFirstRowAll1() {
-               try {
-                       preAggMapRowAll1(0);
-               }
-               catch(Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-
-       @Test
-       public void preAggByteMapSecondRowAll1() {
-               try {
-                       preAggMapRowAll1(1);
-               }
-               catch(Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-
-       protected abstract void preAggMapRowAll1(int row);
-
-       protected void verifyPreAggMapRowAllBytes1(double[] preAV, int row) {
-               if(preAV[0] != 0)
-                       fail("\naggregate to wrong index");
-               if(preAV[1] != s[row])
-                       fail("\nThe preaggregate result is not the sum! : " + 
preAV[0] + " vs " + s[row]);
-       }
-
-       @Test
-       public void preAggByteMapFirstRowOne1() {
-               try {
-                       preAggMapRowOne1(0);
-               }
-               catch(Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-
-       @Test
-       public void preAggByteMapSecondRowOne1() {
-               try {
-                       preAggMapRowOne1(1);
-               }
-               catch(Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-
-       protected abstract void preAggMapRowOne1(int row);
-
-       protected void verifyPreAggMapRowOne1(double[] preAV, int row) {
-               double v = leftM.get(row, data[1]);
-               if(preAV[1] != v)
-                       fail("\naggregate to wrong index");
-               if(!Precision.equals(preAV[0], s[row] - v, eps))
-                       fail("\nThe preaggregate result is not the sum! : " + 
preAV[0] + " vs " + (s[row] - v));
-       }
-
-       @Test
-       public abstract void preAggMapAllRowsOne1();
-
-       protected void verifyPreAggAllOne1(double[] preAV) {
-               double v1 = leftM.get(0, data[1]);
-               double v2 = leftM.get(1, data[1]);
-               if(preAV[1] != v1)
-                       fail("\naggregate to wrong index");
-               if(preAV[3] != v2)
-                       fail("\naggregate to wrong index");
-               if(!Precision.equals(preAV[0], s[0] - v1, eps))
-                       fail("\nThe preaggregate result is not the sum! : " + 
preAV[0] + " vs " + (s[0] - v1));
-               if(!Precision.equals(preAV[2], s[1] - v2, eps))
-                       fail("\nThe preaggregate result is not the sum! : " + 
preAV[2] + " vs " + (s[1] - v2));
-       }
-
-       @Test
-       public void preAggByteMapFirstSubOfRow() {
-               preAggMapSubOfRow(0);
-       }
-
-       @Test
-       public void preAggByteMapSecondSubOfRow() {
-               preAggMapSubOfRow(1);
-       }
-
-       protected abstract void preAggMapSubOfRow(int row);
-
-       protected void verifyPreAggMapSubOfRow(double[] preAV, int row) {
-               double v = leftM.get(row, data[1]);
-               double v2 = leftM.get(row, data[data.length - 1]);
-               if(preAV[1] != v)
-                       fail("\naggregate to wrong index");
-               if(!Precision.equals(preAV[0], s[row] - v - v2, eps))
-                       fail("\nThe preaggregate result is not the sum! : " + 
preAV[0] + " vs " + (s[row] - v - v2));
-       }
-
-       @Test
-       public void preAggByteMapFirstSubOfRowV2() {
-               preAggMapSubOfRowV2(0, 2);
-       }
-
-       @Test
-       public void preAggByteMapSecondSubOfRowV2() {
-               preAggMapSubOfRowV2(1, 2);
-       }
-
-       @Test
-       public void preAggByteMapFirstSubOfRowV2V2() {
-               preAggMapSubOfRowV2(0, 244);
-       }
-
-       @Test
-       public void preAggByteMapSecondSubOfRowV2V2() {
-               preAggMapSubOfRowV2(1, 244);
-       }
-
-       protected abstract void preAggMapSubOfRowV2(int row, int nVal);
-
-       protected void verifyPreAggMapSubOfRowV2(double[] preAV, int row) {
-               double v = leftM.get(row, data[1]);
-               double v2 = leftM.get(row, data[data.length - 1]) + 
leftM.get(row, data[data.length - 2]);
-               if(preAV[1] != v)
-                       fail("\naggregate to wrong index");
-               if(!Precision.equals(preAV[0], s[row] - v - v2, eps))
-                       fail("\nThe preaggregate result is not the sum! : " + 
preAV[0] + " vs " + (s[row] - v - v2));
-       }
-
-       @Test
-       public void preAggByteMapFirstOutOfRangeBefore() {
-               preAggMapOutOfRangeBefore(0);
-       }
-
-       @Test
-       public void preAggByteMapSecondOutOfRangeBefore() {
-               preAggMapOutOfRangeBefore(1);
-       }
-
-       protected abstract void preAggMapOutOfRangeBefore(int row);
-
-       @Test
-       public void preAggByteMapFirstOutOfRangeAfter() {
-               preAggMapOutOfRangeAfter(0);
-       }
-
-       @Test
-       public void preAggByteMapSecondOutOfRangeAfter() {
-               preAggMapOutOfRangeAfter(1);
-       }
-
-       protected abstract void preAggMapOutOfRangeAfter(int row);
-
-       @Test
-       public void multiRowPreAggRange01() {
-               if(data.length > 2) {
-                       double[] agg = multiRowPreAggRangeSafe(1, 3);
-                       compareMultiRowAgg(agg, 1, 3);
-               }
-       }
-
-       @Test
-       public void multiRowPreAggRange02() {
-               if(data.length > 2) {
-                       double[] agg = multiRowPreAggRangeSafe(2, 4);
-                       compareMultiRowAgg(agg, 2, 4);
-               }
-       }
-
-       @Test
-       public void multiRowPreAggRange03() {
-               if(data.length > 2) {
-                       double[] agg = multiRowPreAggRangeSafe(0, 4);
-                       compareMultiRowAgg(agg, 0, 4);
-               }
-       }
-
-       @Test
-       public void multiRowPreAggRange04() {
-               if(data.length > 2) {
-                       double[] agg = multiRowPreAggRangeSafe(0, 3);
-                       compareMultiRowAgg(agg, 0, 3);
-               }
-       }
-
-       protected double[] multiRowPreAggRangeSafe(int rl, int ru) {
-               try {
-                       return multiRowPreAggRange(rl, ru);
-               }
-               catch(Exception e) {
-                       e.printStackTrace();
-                       fail(e.toString());
-                       return null;
-               }
-       }
-
-       protected abstract double[] multiRowPreAggRange(int rl, int ru);
-
-       protected void compareMultiRowAgg(double[] agg, int rl, int ru) {
-               for(int r = rl, of = 0; r < ru; r++, of++) {
-                       double v = leftM.get(r, data[1]);
-
-                       if(agg[of * 2 + 1] != v)
-                               fail("\naggregate to wrong index");
-                       if(!Precision.equals(agg[of * 2], s[r] - v, eps))
-                               fail("\naggregate result is not sum minus 
value:" + agg[of * 2] + " vs " + (s[r] - v));
-               }
-       }
-
-       @Test
-       public void multiRowPreAggRangeBeforeLast01() {
-               try {
-                       if(data.length > 2) {
-                               double[] agg = multiRowPreAggRangeBeforeLast(1, 
3);
-                               compareMultiRowAggBeforeLast(agg, 1, 3);
-                       }
-               }
-               catch(Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-
-       @Test
-       public void multiRowPreAggRangeBeforeLast02() {
-               try {
-                       if(data.length > 2) {
-                               double[] agg = multiRowPreAggRangeBeforeLast(2, 
4);
-                               compareMultiRowAggBeforeLast(agg, 2, 4);
-                       }
-               }
-               catch(Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-
-       @Test
-       public void multiRowPreAggRangeBeforeLast03() {
-               try {
-                       if(data.length > 2) {
-                               double[] agg = multiRowPreAggRangeBeforeLast(0, 
4);
-                               compareMultiRowAggBeforeLast(agg, 0, 4);
-                       }
-               }
-               catch(Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-
-       @Test
-       public void multiRowPreAggRangeBeforeLast04() {
-               try {
-                       if(data.length > 2) {
-                               double[] agg = multiRowPreAggRangeBeforeLast(0, 
3);
-                               compareMultiRowAggBeforeLast(agg, 0, 3);
-                       }
-               }
-               catch(Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-
-       protected abstract double[] multiRowPreAggRangeBeforeLast(int rl, int 
ru);
-
-       protected void compareMultiRowAggBeforeLast(double[] agg, int rl, int 
ru) {
-               for(int r = rl, of = 0; r < ru; r++, of++) {
-                       double v = leftM.get(r, data[1]);
-                       double v2 = leftM.get(r, data[data.length - 1]);
-                       if(agg[of * 2 + 1] != v)
-                               fail("\naggregate to wrong index");
-                       if(!Precision.equals(agg[of * 2], s[r] - v - v2, eps))
-                               fail("\naggregate result is not sum minus 
value:" + agg[of * 2] + " vs " + (s[r] - v - v2));
-               }
-       }
-
-       private final double[] sumIndexes() {
-               double[] ret = new double[leftM.getNumRows()];
-               double[] lmv = leftM.getDenseBlockValues();
-               for(int j = 0; j < leftM.getNumRows(); j++) {
-                       final int off = j * leftM.getNumColumns();
-                       for(int i = 0; i < data.length; i++)
-                               ret[j] += lmv[data[i] + off];
-               }
-               return ret;
-       }
-
-}
diff --git 
a/src/test/java/org/apache/sysds/test/component/compress/offset/OffsetTestPreAggregateSparse.java
 
b/src/test/java/org/apache/sysds/test/component/compress/offset/OffsetTestPreAggregateSparse.java
deleted file mode 100644
index 1298db4f07..0000000000
--- 
a/src/test/java/org/apache/sysds/test/component/compress/offset/OffsetTestPreAggregateSparse.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysds.test.component.compress.offset;
-
-import static org.junit.Assert.fail;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-
-import org.apache.commons.lang3.NotImplementedException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.commons.math3.util.Precision;
-import org.apache.sysds.runtime.compress.colgroup.offset.AOffset;
-import 
org.apache.sysds.runtime.compress.colgroup.offset.OffsetFactory.OFF_TYPE;
-import org.apache.sysds.runtime.data.SparseBlock;
-import org.apache.sysds.runtime.matrix.data.MatrixBlock;
-import org.apache.sysds.test.TestUtils;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-@RunWith(value = Parameterized.class)
-public abstract class OffsetTestPreAggregateSparse {
-       protected static final Log LOG = 
LogFactory.getLog(OffsetTestPreAggregateSparse.class.getName());
-
-       protected static final double eps = 0.00001;
-
-       protected final int[] data;
-       protected final AOffset a;
-
-       protected final MatrixBlock leftM;
-
-       // sum of indexes row 1.
-       protected final double[] s;
-
-       @Parameters
-       public static Collection<Object[]> data() {
-               ArrayList<Object[]> tests = new ArrayList<>();
-               // It is assumed that the input is in sorted order, all values 
are positive and there are no duplicates.
-               // note that each tests allocate an matrix of two rows, and the 
last value length.
-               // therefore don't make it to large.
-               for(OFF_TYPE t : OFF_TYPE.values()) {
-                       tests.add(new Object[] {new int[] {1, 2}, t});
-                       tests.add(new Object[] {new int[] {2, 142}, t});
-                       tests.add(new Object[] {new int[] {142, 421}, t});
-                       tests.add(new Object[] {new int[] {1, 1023}, t});
-                       tests.add(new Object[] {new int[] {1023, 1024}, t});
-                       tests.add(new Object[] {new int[] {1023}, t});
-                       tests.add(new Object[] {new int[] {0, 1, 2, 3, 4, 5}, 
t});
-                       tests.add(new Object[] {new int[] {0}, t});
-                       tests.add(new Object[] {new int[] {500}, t});
-                       tests.add(new Object[] {new int[] {1442}, t});
-                       tests.add(new Object[] {new int[] {0, 256}, t});
-                       tests.add(new Object[] {new int[] {0, 254}, t});
-                       tests.add(new Object[] {new int[] {0, 256 * 2}, t});
-                       tests.add(new Object[] {new int[] {0, 255 * 2}, t});
-                       tests.add(new Object[] {new int[] {0, 254 * 2}, t});
-                       tests.add(new Object[] {new int[] {0, 510, 765}, t});
-                       tests.add(new Object[] {new int[] {0, 254 * 3}, t});
-                       tests.add(new Object[] {new int[] {0, 255, 255 * 2, 255 
* 3}, t});
-                       tests.add(new Object[] {new int[] {0, 255 * 2, 255 * 
3}, t});
-                       tests.add(new Object[] {new int[] {0, 255 * 2, 255 * 3, 
255 * 10}, t});
-                       tests.add(new Object[] {new int[] {0, 255 * 3}, t});
-                       tests.add(new Object[] {new int[] {0, 255 * 4}, t});
-                       tests.add(new Object[] {new int[] {0, 256 * 3}, t});
-                       tests.add(new Object[] {new int[] {255 * 3, 255 * 5}, 
t});
-                       tests.add(new Object[] {new int[] {0, 1, 2, 3, 255 * 4, 
1500}, t});
-                       tests.add(new Object[] {new int[] {0, 1, 2, 3, 4, 5}, 
t});
-                       tests.add(new Object[] {new int[] {0, 1, 2, 3, 4, 5, 
125, 142, 161, 1661, 2314}, t});
-                       tests.add(new Object[] {new int[] {4, 6, 11, 14, 24, 
34, 38, 40, 43, 46, 47, 52, 53, 64, 69, 70, 71, 72, 76,
-                               77, 80, 83, 94, 109, 112, 114, 125, 133, 138, 
142, 144, 147, 158, 159, 162, 167, 171, 177, 186, 198, 203,
-                               204, 207, 209, 212, 219, 221, 224, 225, 227, 
229, 231, 234, 242, 252, 253, 255, 257, 263, 271, 276, 277,
-                               288, 296, 297, 300, 306, 313, 320, 321, 336, 
358, 365, 385, 387, 391, 397, 399, 408, 414, 416, 417, 419,
-                               425, 429, 438, 441, 445, 459, 477, 482, 483, 
499}, t});
-               }
-               return tests;
-       }
-
-       public OffsetTestPreAggregateSparse(int[] data, OFF_TYPE type) {
-               this.data = data;
-               this.a = OffsetTestUtil.getOffset(data, type);
-               this.leftM = TestUtils.generateTestMatrixBlock(2, 
data[data.length - 1] + 100, 100, 200, 0.35, 23152);
-               this.s = sumIndexes();
-       }
-
-       @Test
-       public void preAggByteMapFirstRow() {
-               preAggMapRow(0);
-       }
-
-       @Test
-       public void preAggByteMapSecondRow() {
-               preAggMapRow(1);
-       }
-
-       protected abstract void preAggMapRow(int row);
-
-       protected void verifyPreAggMapRow(double[] preAV, int row) {
-               if(preAV[0] != s[row]) {
-                       fail("\nThe preaggregate result is not the sum! : " + 
a.getClass().getSimpleName() + "  " + preAV[0] + " vs "
-                               + s[row] + "\n full agg: " + 
Arrays.toString(preAV));
-               }
-       }
-
-       @Test(expected = NotImplementedException.class)
-       public abstract void preAggMapAllRows();
-
-       protected void verifyPreAggMapAllRow(double[] preAV) {
-               if(preAV[1] != 0)
-                       fail("\naggregate to wrong index");
-               if(preAV[3] != 0)
-                       fail("\naggregate to wrong index");
-               if(!Precision.equals(preAV[0], s[0], eps))
-                       fail("\nThe preaggregate result is not the sum!: " + 
preAV[0] + "vs" + s[0]);
-               if(!Precision.equals(preAV[2], s[1], eps))
-                       fail("\nThe preaggregate result is not the sum!: " + 
preAV[2] + "vs" + s[1]);
-       }
-
-       private final double[] sumIndexes() {
-               double[] ret = new double[leftM.getNumRows()];
-               SparseBlock sb = leftM.getSparseBlock();
-               for(int j = 0; j < leftM.getNumRows(); j++) {
-                       if(sb.isEmpty(j))
-                               continue;
-                       final int apos = sb.pos(j);
-                       final int alen = sb.size(j) + apos;
-                       final int[] aix = sb.indexes(j);
-                       final double[] avals = sb.values(j);
-                       int dx = 0;
-                       for(int i = apos; i < alen && dx < data.length; i++) {
-                               while(dx < data.length && data[dx] < aix[i])
-                                       dx++;
-                               if(dx < data.length && data[dx] == aix[i])
-                                       ret[j] += avals[i];
-                       }
-               }
-               return ret;
-       }
-
-}
diff --git 
a/src/test/java/org/apache/sysds/test/component/compress/offset/OffsetTests.java
 
b/src/test/java/org/apache/sysds/test/component/compress/offset/OffsetTests.java
index 73c26e1c26..2b48c5a853 100644
--- 
a/src/test/java/org/apache/sysds/test/component/compress/offset/OffsetTests.java
+++ 
b/src/test/java/org/apache/sysds/test/component/compress/offset/OffsetTests.java
@@ -124,12 +124,21 @@ public class OffsetTests {
                }
                tests.add(new Object[] {new int[] {Character.MAX_VALUE, 
Character.MAX_VALUE * 2}, OFF_TYPE.CHAR});
                tests.add(new Object[] {new int[] {0, Character.MAX_VALUE, 
Character.MAX_VALUE * 2}, OFF_TYPE.CHAR});
-               tests.add(new Object[] {new int[] {1, Character.MAX_VALUE * 2 + 
3, Character.MAX_VALUE * 4 + 4,
-                       Character.MAX_VALUE * 16 + 4}, OFF_TYPE.CHAR});
+               tests.add(new Object[] {
+                       new int[] {1, Character.MAX_VALUE * 2 + 3, 
Character.MAX_VALUE * 4 + 4, Character.MAX_VALUE * 16 + 4},
+                       OFF_TYPE.CHAR});
                return tests;
        }
 
-       private static int[] gen(int i, int j, int seed) {
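+       /**
+        * Generate a valid offset array for use in the tests.
+        * 
+        * @param i    The number of offsets to generate
+        * @param j    The bound on the random distance sampled between offsets
+        * @param seed The seed of the random number generator
+        * @return An array containing the generated valid offsets
+        */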
+       public static int[] gen(int i, int j, int seed) {
                int[] a = new int[i];
                Random r = new Random(seed);
                int o = r.nextInt(j);
@@ -269,13 +278,11 @@ public class OffsetTests {
                                switch(type) {
                                        case BYTE:
                                        case UBYTE:
-                                               final int correctionByte = 
OffsetFactory.correctionByte(data[data.length - 1] - data[0],
-                                                       data.length);
+                                               final int correctionByte = 
OffsetFactory.correctionByte(data[data.length - 1] - data[0], data.length);
                                                estimatedSize = 
OffsetByte.estimateInMemorySize(data.length + correctionByte);
                                                break;
                                        case CHAR:
-                                               final int correctionChar = 
OffsetFactory.correctionChar(data[data.length - 1] - data[0],
-                                                       data.length);
+                                               final int correctionChar = 
OffsetFactory.correctionChar(data[data.length - 1] - data[0], data.length);
                                                estimatedSize = 
OffsetChar.estimateInMemorySize(data.length + correctionChar);
                                                break;
                                        default:
@@ -568,11 +575,28 @@ public class OffsetTests {
                slice(100, 10000);
        }
 
+       @Test
+       public void verify() {
+               o.verify(o.getSize());
+       }
+
        @Test
        public void slice1to4() {
                slice(1, 4);
        }
 
+       @Test
+       public void slice() {
+               if(data.length > 1) {
+                       int n = data[data.length - 1];
+                       for(int i = 0; i < n && i < 100; i++) {
+                               for(int j = i; j < n + 1 && j < 100; j++) {
+                                       slice(i, j, false);
+                               }
+                       }
+               }
+       }
+
        @Test
        public void sliceAllSpecific() {
                if(data.length > 1)
@@ -580,13 +604,19 @@ public class OffsetTests {
        }
 
        private void slice(int l, int u) {
+               slice(l, u, false);
+       }
+
+       private void slice(int l, int u, boolean str) {
                try {
 
                        OffsetSliceInfo a = o.slice(l, u);
-                       a.offsetSlice.toString();
+                       if(str)
+                               a.offsetSlice.toString();
                        if(data.length > 0 && data[data.length - 1] > u) {
 
                                AIterator it = a.offsetSlice.getIterator();
+                               a.offsetSlice.verify(a.uIndex - a.lIndex);
                                int i = 0;
                                while(i < data.length && data[i] < l)
                                        i++;
@@ -601,7 +631,7 @@ public class OffsetTests {
                }
                catch(Exception e) {
                        e.printStackTrace();
-                       fail("Failed to slice first 100");
+                       fail("Failed to slice range: " + l + " -> " + u + " 
in:\n" + o);
                }
 
        }
@@ -676,8 +706,7 @@ public class OffsetTests {
        public void compareAppend_ot() {
                if(data.length > 0) {
                        final int ll = data[data.length - 1] + 1000;
-                       final AOffset r1 = o.appendN(new AOffsetsGroup[] {new 
Con(o), new Con(OffsetFactory.createOffset(data))},
-                               ll);
+                       final AOffset r1 = o.appendN(new AOffsetsGroup[] {new 
Con(o), new Con(OffsetFactory.createOffset(data))}, ll);
                        final AOffset r2 = o.append(o, ll);
                        compare(r1, r2);
                }
@@ -753,10 +782,28 @@ public class OffsetTests {
        }
 
        @Test
-       public void getLength(){
+       public void getLength() {
                assertTrue(o.getLength() + 1 >= data.length);
        }
 
+       @Test(expected = Exception.class)
+       public void invalidReverse() {
+               int last = o.getOffsetToLast();
+               o.reverse(last - 1);
+       }
+
+       @Test
+       public void constructSkipList() {
+               try {
+
+                       o.constructSkipList();
+                       o.constructSkipList();
+               }
+               catch(Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
 
        private void compareMoved(AOffset o, int[] v, int m) {
                AIterator i = o.getIterator();
@@ -770,7 +817,7 @@ public class OffsetTests {
                                        + " but was :" + o.toString());
                        for(int j = 1; j < v.length; j++) {
                                i.next();
-                               if(v[j] + m  != i.value())
+                               if(v[j] + m != i.value())
                                        fail("incorrect result using : " + 
o.getClass().getSimpleName() + " expected: " + Arrays.toString(v)
                                                + " but was :" + o.toString());
                        }
diff --git 
a/src/test/java/org/apache/sysds/test/component/frame/array/CustomArrayTests.java
 
b/src/test/java/org/apache/sysds/test/component/frame/array/CustomArrayTests.java
index 66e32fe6bc..642b3b1b84 100644
--- 
a/src/test/java/org/apache/sysds/test/component/frame/array/CustomArrayTests.java
+++ 
b/src/test/java/org/apache/sysds/test/component/frame/array/CustomArrayTests.java
@@ -873,7 +873,7 @@ public class CustomArrayTests {
                try {
                        Array<Long> a = null;
                        Array<Long> b = new DDCArray<>(new LongArray(new long[] 
{1, 2, 3, 4}), //
-                               MapToFactory.create(10, new int[] {0, 0, 0, 0, 
1, 1, 1, 2, 2, 3, 3}, 4));
+                               MapToFactory.create(new int[] {0, 0, 0, 0, 1, 
1, 1, 2, 2, 3, 3}, 4));
                        Array<Long> c = ArrayFactory.set(a, b, 10, 19, 20);
                        assertEquals((long) c.get(0), 0L);
                        assertEquals((long) c.get(10), 1L);
@@ -889,7 +889,7 @@ public class CustomArrayTests {
                try {
                        Array<Long> a = null;
                        Array<Long> b = new DDCArray<>(new OptionalArray<>(new 
Long[] {1L, 2L, 3L, 4L}), //
-                               MapToFactory.create(10, new int[] {0, 0, 0, 0, 
1, 1, 1, 2, 2, 3, 3}, 4));
+                               MapToFactory.create(new int[] {0, 0, 0, 0, 1, 
1, 1, 2, 2, 3, 3}, 4));
                        Array<Long> c = ArrayFactory.set(a, b, 10, 19, 20);
                        assertEquals(c.get(0), null);
                        assertEquals((long) c.get(10), 1L);

Reply via email to