This is an automated email from the ASF dual-hosted git repository.

janniklinde pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new f0af3ece02 [SYSTEMDS-3779] Add LZW ColGroup
f0af3ece02 is described below

commit f0af3ece02d2f6e0ea6f89667bd12b62184a9659
Author: fjobs <[email protected]>
AuthorDate: Tue Mar 17 15:17:39 2026 +0100

    [SYSTEMDS-3779] Add LZW ColGroup
    
    Closes #2398
    
    Co-authored-by: Luka Dekanozishvili <[email protected]>
    Co-authored-by: Annika Lehmann <[email protected]>
---
 .../sysds/runtime/compress/colgroup/AColGroup.java |    6 +-
 .../runtime/compress/colgroup/ColGroupDDC.java     |   15 +-
 .../runtime/compress/colgroup/ColGroupDDCLZW.java  | 1011 ++++++++++++++++
 .../runtime/compress/colgroup/ColGroupFactory.java |   12 +
 .../runtime/compress/colgroup/ColGroupIO.java      |    2 +
 .../compress/colgroup/scheme/DDCLZWScheme.java     |   38 +
 .../compress/colgroup/scheme/DDCLZWSchemeMC.java   |  222 ++++
 .../compress/colgroup/scheme/DDCLZWSchemeSC.java   |  360 ++++++
 .../compress/estim/CompressedSizeInfoColGroup.java |   11 +-
 .../colgroup/DDCLZW/ColGroupDDCLZWBenchmark.java   |  316 +++++
 .../colgroup/DDCLZW/ColGroupDDCLZWTest.java        | 1238 ++++++++++++++++++++
 .../colgroup/DDCLZW/ColGroupDDCLZWTestUtils.java   |   94 ++
 12 files changed, 3313 insertions(+), 12 deletions(-)

diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroup.java 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroup.java
index 003703f86a..fbe04c732e 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroup.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroup.java
@@ -65,10 +65,10 @@ public abstract class AColGroup implements Serializable {
 
        /** Public super types of compression ColGroups supported */
        public static enum CompressionType {
-               UNCOMPRESSED, RLE, OLE, DDC, CONST, EMPTY, SDC, SDCFOR, DDCFOR, 
DeltaDDC, LinearFunctional;
+               UNCOMPRESSED, RLE, OLE, DDC, CONST, EMPTY, SDC, SDCFOR, DDCFOR, 
DeltaDDC, DDCLZW, LinearFunctional;
 
                public boolean isDense() {
-                       return this == DDC || this == CONST || this == DDCFOR 
|| this == DDCFOR;
+                       return this == DDC || this == CONST || this == DDCFOR 
|| this == DDCFOR || this == DDCLZW;
                }
 
                public boolean isConst() {
@@ -86,7 +86,7 @@ public abstract class AColGroup implements Serializable {
         * Protected such that outside the ColGroup package it should be 
unknown which specific subtype is used.
         */
        protected static enum ColGroupType {
-               UNCOMPRESSED, RLE, OLE, DDC, CONST, EMPTY, SDC, SDCSingle, 
SDCSingleZeros, SDCZeros, SDCFOR, DDCFOR, DeltaDDC,
+               UNCOMPRESSED, RLE, OLE, DDC, CONST, EMPTY, SDC, SDCSingle, 
SDCSingleZeros, SDCZeros, SDCFOR, DDCFOR, DDCLZW, DeltaDDC,
                LinearFunctional;
        }
 
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC.java 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC.java
index ac4defcabd..a3fdf1fc89 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC.java
@@ -1112,13 +1112,13 @@ public class ColGroupDDC extends APreAgg implements 
IMapToDataGroup {
        public AColGroup convertToDeltaDDC() {
                int numCols = _colIndexes.size();
                int numRows = _data.size();
-               
+
                DblArrayCountHashMap map = new 
DblArrayCountHashMap(Math.max(numRows, 64));
                double[] rowDelta = new double[numCols];
                double[] prevRow = new double[numCols];
                DblArray dblArray = new DblArray(rowDelta);
                int[] rowToDictId = new int[numRows];
-               
+
                double[] dictVals = _dict.getValues();
 
                for(int i = 0; i < numRows; i++) {
@@ -1134,13 +1134,13 @@ public class ColGroupDDC extends APreAgg implements 
IMapToDataGroup {
                                        prevRow[j] = val;
                                }
                        }
-                       
+
                        rowToDictId[i] = map.increment(dblArray);
                }
-               
+
                if(map.size() == 0)
                        return new ColGroupEmpty(_colIndexes);
-               
+
                ACount<DblArray>[] vals = map.extractValues();
                final int nVals = vals.length;
                final double[] dictValues = new double[nVals * numCols];
@@ -1153,7 +1153,7 @@ public class ColGroupDDC extends APreAgg implements 
IMapToDataGroup {
                        oldIdToNewId[dac.id] = i;
                        idx += numCols;
                }
-               
+
                DeltaDictionary deltaDict = new DeltaDictionary(dictValues, 
numCols);
                AMapToData newData = MapToFactory.create(numRows, nVals);
                for(int i = 0; i < numRows; i++) {
@@ -1162,4 +1162,7 @@ public class ColGroupDDC extends APreAgg implements 
IMapToDataGroup {
                return ColGroupDeltaDDC.create(_colIndexes, deltaDict, newData, 
null);
        }
 
+       /**
+        * Convert this DDC column group into a DDCLZW column group.
+        *
+        * Delegates to ColGroupDDCLZW.create with this group's column indexes, dictionary and mapping. No cached counts
+        * are handed over (null is passed).
+        *
+        * @return The column group returned by ColGroupDDCLZW.create
+        */
+       public AColGroup convertToDDCLZW() {
+               return ColGroupDDCLZW.create(_colIndexes, _dict, _data, null);
+       }
 }
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDCLZW.java 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDCLZW.java
new file mode 100644
index 0000000000..8748f628bd
--- /dev/null
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDCLZW.java
@@ -0,0 +1,1011 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.compress.colgroup;
+
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.compress.CompressedMatrixBlock;
+import org.apache.sysds.runtime.compress.DMLCompressionException;
+import org.apache.sysds.runtime.compress.colgroup.ColGroupUtils.P;
+import org.apache.sysds.runtime.compress.colgroup.dictionary.Dictionary;
+import org.apache.sysds.runtime.compress.colgroup.dictionary.DictionaryFactory;
+import org.apache.sysds.runtime.compress.colgroup.dictionary.IDictionary;
+import 
org.apache.sysds.runtime.compress.colgroup.dictionary.MatrixBlockDictionary;
+import org.apache.sysds.runtime.compress.colgroup.indexes.ColIndexFactory;
+import org.apache.sysds.runtime.compress.colgroup.indexes.IColIndex;
+import org.apache.sysds.runtime.compress.colgroup.mapping.AMapToData;
+import org.apache.sysds.runtime.compress.colgroup.mapping.MapToFactory;
+import org.apache.sysds.runtime.compress.colgroup.scheme.DDCLZWScheme;
+import org.apache.sysds.runtime.compress.colgroup.scheme.ICLAScheme;
+import org.apache.sysds.runtime.compress.cost.ComputationCostEstimator;
+import org.apache.sysds.runtime.compress.estim.CompressedSizeInfoColGroup;
+import org.apache.sysds.runtime.compress.estim.EstimationFactors;
+import org.apache.sysds.runtime.compress.estim.encoding.IEncode;
+import org.apache.sysds.runtime.data.DenseBlock;
+import org.apache.sysds.runtime.data.SparseBlock;
+import org.apache.sysds.runtime.data.SparseBlockMCSR;
+import org.apache.sysds.runtime.functionobjects.Builtin;
+import org.apache.sysds.runtime.functionobjects.Minus;
+import org.apache.sysds.runtime.functionobjects.Plus;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.matrix.operators.BinaryOperator;
+import org.apache.sysds.runtime.matrix.operators.RightScalarOperator;
+import org.apache.sysds.runtime.matrix.operators.ScalarOperator;
+import org.apache.sysds.runtime.matrix.operators.UnaryOperator;
+import org.apache.sysds.utils.MemoryEstimates;
+import shaded.parquet.it.unimi.dsi.fastutil.ints.IntArrayList;
+import shaded.parquet.it.unimi.dsi.fastutil.longs.Long2IntLinkedOpenHashMap;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Stack;
+import java.util.HashMap;
+import java.util.NoSuchElementException;
+
+/**
+ * Class to encapsulate information about a column group that is encoded with dense dictionary encoding (DDC) and
+ * whose mapping vector is additionally LZW compressed.
+ *
+ * Idea:
+ * <ul>
+ * <li>DDCLZW stores the mapping vector exclusively in compressed form.</li>
+ * <li>No persistent MapToData cache is maintained.</li>
+ * <li>Sequential operations decode on-the-fly, while operations requiring random access explicitly materialize and
+ * fall back to DDC.</li>
+ * </ul>
+ */
+public class ColGroupDDCLZW extends APreAgg implements IMapToDataGroup {
+       private static final long serialVersionUID = -5769772089913918987L;
+
+       /**
+        * Stores the LZW-compressed representation of data mapping.
+        */
+       private final int[] _dataLZW;
+       private final int _nRows;
+       private final int _nUnique;
+
+       private ColGroupDDCLZW(IColIndex colIndexes, IDictionary dict, 
AMapToData data, int[] cachedCounts) {
+               super(colIndexes, dict, cachedCounts);
+               _nRows = data.size();
+               _nUnique = dict.getNumberOfValues(colIndexes.size());
+               _dataLZW = compress(data);
+
+               if(CompressedMatrixBlock.debug) {
+                       if(getNumValues() == 0)
+                               throw new DMLCompressionException("Invalid 
construction with empty dictionary");
+                       if(_nRows == 0)
+                               throw new DMLCompressionException("Invalid 
length of the data. is zero");
+                       if(data.getUnique() != 
dict.getNumberOfValues(colIndexes.size()))
+                               throw new DMLCompressionException(
+                                       "Invalid map to dict Map has:" + 
data.getUnique() + " while dict has " +
+                                               
dict.getNumberOfValues(colIndexes.size()));
+                       int[] c = getCounts();
+                       if(c.length != 
dict.getNumberOfValues(colIndexes.size()))
+                               throw new DMLCompressionException("Invalid DDC 
Construction");
+                       data.verify();
+               }
+       }
+
+       private ColGroupDDCLZW(IColIndex colIndexes, IDictionary dict, int[] 
dataLZW, int nRows, int nUnique,
+               int[] cachedCounts) {
+               super(colIndexes, dict, cachedCounts);
+               _dataLZW = dataLZW;
+               _nRows = nRows;
+               _nUnique = nUnique;
+
+               if(CompressedMatrixBlock.debug) {
+                       if(getNumValues() == 0)
+                               throw new DMLCompressionException("Invalid 
construction with empty dictionary");
+                       if(_nRows <= 0)
+                               throw new DMLCompressionException("Invalid 
length of the data. is zero");
+                       if(_nUnique != 
dict.getNumberOfValues(colIndexes.size()))
+                               throw new DMLCompressionException("Invalid map 
to dict Map has:" + _nUnique + " while dict has " +
+                                       
dict.getNumberOfValues(colIndexes.size()));
+                       int[] c = getCounts();
+                       if(c.length != 
dict.getNumberOfValues(colIndexes.size()))
+                               throw new DMLCompressionException("Invalid DDC 
Construction");
+               }
+       }
+
+       public static AColGroup create(IColIndex colIndexes, IDictionary dict, 
AMapToData data, int[] cachedCounts) {
+               if(dict == null)
+                       return new ColGroupEmpty(colIndexes);
+               else if(data.getUnique() == 1)
+                       return ColGroupConst.create(colIndexes, dict);
+               else
+                       return new ColGroupDDCLZW(colIndexes, dict, data, 
cachedCounts);
+       }
+
+       /**
+        * Compresses the given mapping using the Lempel-Ziv-Welch (LZW) algorithm. The alphabet consists of the dictionary
+        * indices {@code 0 .. data.getUnique()-1}; newly learned phrases receive codes starting at
+        * {@code data.getUnique()}.
+        *
+        * @param data The mapping to compress. Must not be null, must have at least one row, and every index must be in
+        *             the range {@code [0, data.getUnique())}.
+        * @return The LZW code sequence; it always contains at least one code.
+        * @throws IllegalArgumentException If the input data is null, has no rows, reports no unique values, or contains
+        *                                  a symbol (including the first one) outside the valid dictionary index range.
+        */
+       private static int[] compress(AMapToData data) {
+               if(data == null)
+                       throw new IllegalArgumentException("Invalid input: data is null");
+
+               final int nRows = data.size();
+               if(nRows <= 0)
+                       throw new IllegalArgumentException("Invalid input: data has no rows");
+
+               final int nUnique = data.getUnique();
+               if(nUnique <= 0)
+                       throw new IllegalArgumentException("Invalid input: data has no unique values");
+
+               // The first symbol seeds the current phrase; validate it like every other symbol so that an out-of-range
+               // value cannot be emitted as a literal (this also covers the single-row case).
+               int w = data.getIndex(0);
+               if(w < 0 || w >= nUnique)
+                       throw new IllegalArgumentException("Symbol out of range: " + w + " (nUnique=" + nUnique + ")");
+
+               if(nRows == 1)
+                       return new int[] {w};
+
+               // Maps the packed pair (prefix code, next symbol) to the code assigned to that phrase; -1 means unknown.
+               final Long2IntLinkedOpenHashMap dict = new Long2IntLinkedOpenHashMap(1 << 16);
+               dict.defaultReturnValue(-1);
+
+               final IntArrayList out = new IntArrayList(Math.max(16, nRows / 2));
+
+               int nextCode = nUnique;
+
+               for(int i = 1; i < nRows; i++) {
+                       final int k = data.getIndex(i);
+
+                       if(k < 0 || k >= nUnique)
+                               throw new IllegalArgumentException("Symbol out of range: " + k + " (nUnique=" + nUnique + ")");
+
+                       final long key = packKey(w, k);
+
+                       final int wk = dict.get(key);
+                       if(wk != -1) {
+                               // phrase w+k is already known: keep extending it
+                               w = wk;
+                       }
+                       else {
+                               // emit the longest known phrase, learn w+k, restart from k
+                               out.add(w);
+                               dict.put(key, nextCode++);
+                               w = k;
+                       }
+               }
+
+               // flush the phrase that was still being extended
+               out.add(w);
+               return out.toIntArray();
+       }
+
+       /**
+        * Decompresses LZW codes into a fully materialized mapping.
+        *
+        * @param codes   The LZW codes of the mapping
+        * @param nUnique The alphabet size (codes below this value are literals)
+        * @param nRows   The number of rows the codes are expected to decode to
+        * @return A newly allocated mapping with nRows entries
+        * @throws IllegalArgumentException If codes is null or empty, or if nUnique or nRows is not positive
+        * @throws IllegalStateException    If the code sequence is corrupt: a code is negative, refers to a phrase that
+        *                                  cannot exist yet, or the sequence does not decode to exactly nRows entries
+        */
+       private static AMapToData decompressFull(int[] codes, int nUnique, int nRows) {
+               if(codes == null)
+                       throw new IllegalArgumentException("codes is null");
+               if(codes.length == 0)
+                       throw new IllegalArgumentException("codes is empty");
+               if(nUnique <= 0)
+                       throw new IllegalArgumentException("Invalid alphabet size: " + nUnique);
+               if(nRows <= 0)
+                       throw new IllegalArgumentException("Invalid nRows: " + nRows);
+
+               // code -> packed (prefix code, last symbol) for every learned phrase
+               final Map<Integer, Long> dict = new HashMap<>();
+
+               final AMapToData out = MapToFactory.create(nRows, nUnique);
+               int outPos = 0;
+
+               int old = codes[0];
+               // No phrase has been learned yet, so the first code has to be a literal.
+               if(old < 0 || old >= nUnique)
+                       throw new IllegalStateException("Invalid LZW code " + old + " at position 0");
+               int[] oldPhrase = unpack(old, nUnique, dict);
+
+               for(int v : oldPhrase)
+                       out.set(outPos++, v);
+
+               int nextCode = nUnique;
+
+               for(int i = 1; i < codes.length; i++) {
+                       final int key = codes[i];
+
+                       // Learned codes occupy [nUnique, nextCode); the only code that may be used before it is known is
+                       // nextCode itself (the KwKwK case). Anything else indicates a corrupt stream.
+                       if(key < 0 || key > nextCode)
+                               throw new IllegalStateException("Invalid LZW code " + key + " at position " + i);
+
+                       final int[] next;
+                       if(key < nUnique || dict.containsKey(key))
+                               next = unpack(key, nUnique, dict);
+                       else
+                               next = packint(oldPhrase, oldPhrase[0]);
+
+                       // Fail with a descriptive error instead of writing past the end of the mapping.
+                       if(next.length > nRows - outPos)
+                               throw new IllegalStateException(
+                                       "Decompression length mismatch: got " + ((long) outPos + next.length) + " expected " + nRows);
+
+                       for(int v : next)
+                               out.set(outPos++, v);
+
+                       // learn: previous phrase + first symbol of the current phrase
+                       dict.put(nextCode++, packKey(old, next[0]));
+
+                       old = key;
+                       oldPhrase = next;
+               }
+
+               if(outPos != nRows)
+                       throw new IllegalStateException("Decompression length mismatch: got " + outPos + " expected " + nRows);
+
+               return out;
+       }
+
+       /**
+        * The LZWMappingIterator class is responsible for decoding and 
iterating through an LZW (Lempel-Ziv-Welch)
+        * compressed mapping. This iterator is primarily used for 
reconstructing symbols and phrases from the compressed
+        * mapping data, maintaining the internal state of the LZW 
decompression process.
+        *
+        * The decoding process maintains an LZW dictionary, tracks the current 
and previous phrases, handles new LZW codes,
+        * and provides methods to retrieve or skip mapping symbols.
+        */
+       private final class LZWMappingIterator {
+               private final Map<Integer, Long> dict = new HashMap<>();
+               private int lzwIndex = 0;
+               private int mapIndex = 0;
+               private int nextCode = _nUnique;
+               private int[] currentPhrase = null;
+               private int currentPhraseIndex = 0;
+               private int[] oldPhrase = null;
+               private int oldCode = -1;
+
+               LZWMappingIterator() {
+                       lzwIndex = 1;
+                       oldCode = _dataLZW[0];
+                       oldPhrase = unpack(oldCode, _nUnique, dict);
+                       currentPhrase = oldPhrase;
+                       currentPhraseIndex = 0;
+                       mapIndex = 0;
+               }
+
+               boolean hasNext() {
+                       return mapIndex < _nRows;
+               }
+
+               void skip(int k) {
+                       for(int i = 0; i < k; i++)
+                               next();
+               }
+
+               int next() {
+                       if(!hasNext())
+                               throw new NoSuchElementException();
+
+                       if(currentPhraseIndex < currentPhrase.length) {
+                               mapIndex++;
+                               return currentPhrase[currentPhraseIndex++];
+                       }
+
+                       if(lzwIndex >= _dataLZW.length)
+                               throw new IllegalStateException("Invalid LZW 
index: " + lzwIndex);
+
+                       final int key = _dataLZW[lzwIndex++];
+
+                       final int[] next;
+                       if(key < _nUnique || dict.containsKey(key)) {
+                               next = unpack(key, _nUnique, dict);
+                       }
+                       else {
+                               next = packint(oldPhrase, oldPhrase[0]);
+                       }
+
+                       dict.put(nextCode++, packKey(oldCode, next[0]));
+
+                       oldCode = key;
+                       oldPhrase = next;
+
+                       currentPhrase = next;
+                       currentPhraseIndex = 0;
+
+                       mapIndex++;
+                       return currentPhrase[currentPhraseIndex++];
+               }
+       }
+
+       /**
+        * Builds a packed 64-bit key from a prefix code and a next symbol, 
typically used in an LZW dictionary.
+        *
+        * @return a 64-bit packed key representing the (prefixCode, 
nextSymbol) pair
+        */
+       private static long packKey(int prefixCode, int nextSymbol) {
+               return (((long) prefixCode) << 32) | (nextSymbol & 0xffffffffL);
+       }
+
+       /**
+        * Extracts and returns the higher 32 bits of the given long key as an 
integer.
+        *
+        * @param key the long value from which the higher 32 bits will be 
unpacked
+        * @return the higher 32 bits of the given key as an integer
+        */
+       private static int unpackfirst(long key) {
+               return (int) (key >>> 32);
+       }
+
+       /**
+        * Extracts and returns the second component from the given long value 
key. This method assumes the key encodes the
+        * second component in its least significant bits.
+        *
+        * @param key the long value containing the encoded second component
+        * @return the extracted second component as an integer
+        */
+       private static int unpacksecond(long key) {
+               return (int) (key);
+       }
+
+       /**
+        * Creates a new array by appending the specified integer to the end of 
the given array.
+        *
+        * @param arr  the original array to which the integer will be added
+        * @param last the integer value to be appended to the end of the array
+        * @return a new array containing all elements of the original array 
followed by the specified integer
+        */
+       private static int[] packint(int[] arr, int last) {
+               int[] result = Arrays.copyOf(arr, arr.length + 1);
+               result[arr.length] = last;
+               return result;
+       }
+
+       /**
+        * Decodes a given code into an array of integers based on a dictionary 
mapping. If the code is less than the number
+        * of unique symbols, it directly returns the code as a single-element 
array. Otherwise, it iteratively unpacks the
+        * code using a dictionary until the base symbols are resolved.
+        *
+        * @param code    the encoded integer value to be unpacked
+        * @param nUnique the number of unique symbols; codes less than this 
are directly returned
+        * @param dict    a mapping of integer codes to packed values 
represented as {@code Long}, used for unpacking
+        * @return an array of integers representing the unpacked sequence for 
the input code
+        * @throws IllegalStateException if the provided code does not have a 
corresponding entry in the dictionary
+        */
+       private static int[] unpack(int code, int nUnique, Map<Integer, Long> 
dict) {
+               if(code < nUnique)
+                       return new int[] {code};
+
+               Stack<Integer> stack = new Stack<>();
+               int c = code;
+
+               while(c >= nUnique) {
+                       Long key = dict.get(c);
+                       if(key == null)
+                               throw new IllegalStateException("Missing 
dictionary entry for code: " + c);
+
+                       int symbol = unpacksecond(key);
+                       stack.push(symbol);
+                       c = unpackfirst(key);
+               }
+
+               stack.push(c);
+               int[] outarray = new int[stack.size()];
+               int i = 0;
+               while(!stack.isEmpty()) {
+                       outarray[i++] = stack.pop();
+               }
+               return outarray;
+       }
+
+       /**
+        * Converts this group into a DDC column group by fully decoding the LZW-compressed mapping. The column indexes,
+        * the dictionary and the counts of this group are passed on to ColGroupDDC.create.
+        *
+        * @return the column group returned by ColGroupDDC.create for the decoded mapping
+        */
+       public AColGroup convertToDDC() {
+               final AMapToData decoded = decompressFull(_dataLZW, _nUnique, _nRows);
+               return ColGroupDDC.create(_colIndexes, _dict, decoded, getCounts());
+       }
+
+       /**
+        * Reads a DDCLZW column group: column indexes, dictionary, then the row count, the alphabet size, the number of
+        * LZW codes and the codes themselves, each as a 4-byte int.
+        *
+        * @param in the input to read from
+        * @return the deserialized column group
+        * @throws IOException if reading fails or the stored number of LZW codes is not positive
+        */
+       public static ColGroupDDCLZW read(DataInput in) throws IOException {
+               final IColIndex colIndexes = ColIndexFactory.read(in);
+               final IDictionary dict = DictionaryFactory.read(in);
+
+               final int nRows = in.readInt();
+               final int nUnique = in.readInt();
+
+               final int len = in.readInt();
+               // The encoder always emits at least one code, and decoding unconditionally reads the first code, so an
+               // empty code array can only come from corrupt input.
+               if(len <= 0)
+                       throw new IOException("Invalid LZW data length: " + len);
+
+               final int[] dataLZW = new int[len];
+               for(int i = 0; i < len; i++)
+                       dataLZW[i] = in.readInt();
+
+               return new ColGroupDDCLZW(colIndexes, dict, dataLZW, nRows, nUnique, null);
+       }
+
+       /**
+        * Writes the parent's serialized form followed by the row count, the alphabet size, the number of LZW codes and
+        * the codes themselves, each as a 4-byte int.
+        */
+       @Override
+       public void write(DataOutput out) throws IOException {
+               super.write(out);
+               out.writeInt(_nRows);
+               out.writeInt(_nUnique);
+
+               final int nCodes = _dataLZW.length;
+               out.writeInt(nCodes);
+               for(int idx = 0; idx < nCodes; idx++)
+                       out.writeInt(_dataLZW[idx]);
+       }
+
+       /**
+        * Returns the value at row r and group-local column colIdx. The mapping is decoded sequentially from the first
+        * row up to r, so the cost grows with r.
+        */
+       @Override
+       public double getIdx(int r, int colIdx) {
+               if(r < 0 || r >= _nRows)
+                       throw new DMLRuntimeException("Row index out of bounds");
+
+               final int nCol = _colIndexes.size();
+               if(colIdx < 0 || colIdx >= nCol)
+                       throw new DMLRuntimeException("Column index out of bounds");
+
+               final LZWMappingIterator it = new LZWMappingIterator();
+               it.skip(r);
+               final int dictIdx = it.next();
+               return _dict.getValue(dictIdx, colIdx, nCol);
+       }
+
+       /** Reports the public compression type of this group, which is always CompressionType.DDCLZW. */
+       @Override
+       public CompressionType getCompType() {
+               return CompressionType.DDCLZW;
+       }
+
+       /** Reports the internal column group type of this group, which is always ColGroupType.DDCLZW. */
+       @Override
+       protected ColGroupType getColGroupType() {
+               return ColGroupType.DDCLZW;
+       }
+
+       /** Answered from the dictionary alone; the compressed mapping is not consulted. */
+       @Override
+       public boolean containsValue(double pattern) {
+               return _dict.containsValue(pattern);
+       }
+
+       /**
+        * Delegates to the estimator, passing nRows for both of its first two arguments together with the number of
+        * columns, the number of distinct values and the dictionary sparsity of this group.
+        */
+       @Override
+       public double getCost(ComputationCostEstimator e, int nRows) {
+               final int distinct = getNumValues();
+               final int cols = getNumCols();
+               final double dictSparsity = _dict.getSparsity();
+               return e.getCost(nRows, nRows, cols, distinct, dictSparsity);
+       }
+
+       /** Returns the scheme produced by DDCLZWScheme.create for this group. */
+       @Override
+       public ICLAScheme getCompressionScheme() {
+               return DDCLZWScheme.create(this);
+       }
+
+       /** Returns the number of rows encoded by this group. */
+       @Override
+       protected int numRowsToMultiply() {
+               return _nRows;
+       }
+
+       /**
+        * Creates a group with new column indexes and dictionary that shares this group's LZW code array (no copy) and
+        * reuses its row count, alphabet size and cached counts.
+        */
+       @Override
+       protected AColGroup copyAndSet(IColIndex colIndexes, IDictionary 
newDictionary) {
+               return new ColGroupDDCLZW(colIndexes, newDictionary, _dataLZW, 
_nRows, _nUnique, getCachedCounts());
+       }
+
+       /**
+        * Estimates the in-memory size as the parent's estimate plus the cost of the LZW code array plus a fixed 16
+        * bytes for the additional fields of this class.
+        */
+       @Override
+       public long estimateInMemorySize() {
+               final long base = super.estimateInMemorySize();
+
+               final long codes = _dataLZW == null ? 0L : (long) MemoryEstimates.intArrayCost(_dataLZW.length);
+
+               // NOTE(review): 16 bytes presumably covers the two int fields (_nRows, _nUnique) and the array reference;
+               // the array length itself is part of the array cost above -- confirm.
+               final long fields = 2L * Long.BYTES;
+               return base + codes + fields;
+       }
+
+       /**
+        * Returns the parent's size on disk plus three ints (row count, alphabet size, number of codes) plus one int
+        * per LZW code, matching what write emits.
+        */
+       @Override
+       public long getExactSizeOnDisk() {
+               final long header = 3L * Integer.BYTES;
+               final long codes = (long) _dataLZW.length * Integer.BYTES;
+               return super.getExactSizeOnDisk() + header + codes;
+       }
+
+       /** Decodes the complete mapping on every call; the result is not stored in this group. */
+       @Override
+       public AMapToData getMapToData() {
+               return decompressFull(_dataLZW, _nUnique, _nRows);
+       }
+
+       /**
+        * Two groups have the same index structure only if the other group is also a DDCLZW group and both refer to the
+        * very same code array instance (reference comparison, not content comparison).
+        */
+       @Override
+       public boolean sameIndexStructure(AColGroupCompressed that) {
+               if(!(that instanceof ColGroupDDCLZW))
+                       return false;
+               final ColGroupDDCLZW other = (ColGroupDDCLZW) that;
+               return other._dataLZW == this._dataLZW;
+       }
+
+       /** Aggregates over the dictionary only, starting from c; the compressed mapping is not consulted. */
+       @Override
+       protected double computeMxx(double c, Builtin builtin) {
+               return _dict.aggregate(c, builtin);
+       }
+
+       /** Aggregates per column over the dictionary only, writing into c; the compressed mapping is not consulted. */
+       @Override
+       protected void computeColMxx(double[] c, Builtin builtin) {
+               _dict.aggregateCols(c, builtin, _colIndexes);
+       }
+
+       /**
+        * Slices the rows [rl, ru) out of this group. The mapping is decoded sequentially from the first row, the
+        * requested range is written directly into a new mapping and that mapping is LZW-compressed again.
+        *
+        * @param rl the first row to include
+        * @param ru the row after the last row to include
+        * @return an empty column group if the range is empty, otherwise a new DDCLZW group sharing this dictionary
+        * @throws DMLRuntimeException if the range is invalid or slicing fails; the original error is kept as cause
+        */
+       @Override
+       public AColGroup sliceRows(int rl, int ru) {
+               try {
+                       // rl > ru is rejected explicitly instead of surfacing as a negative array size further down
+                       if(rl < 0 || ru > _nRows || rl > ru)
+                               throw new DMLRuntimeException("Invalid slice range: " + rl + " - " + ru);
+
+                       final int len = ru - rl;
+                       if(len == 0)
+                               return new ColGroupEmpty(_colIndexes);
+
+                       final LZWMappingIterator it = new LZWMappingIterator();
+                       it.skip(rl);
+
+                       // decode straight into the target mapping; no intermediate buffer is needed
+                       final AMapToData sliced = MapToFactory.create(len, _nUnique);
+                       for(int i = 0; i < len; i++)
+                               sliced.set(i, it.next());
+
+                       return new ColGroupDDCLZW(_colIndexes, _dict, sliced, null);
+               }
+               catch(Exception e) {
+                       throw new DMLRuntimeException("Failed to slice out sub part DDCLZW: " + rl + ", " + ru, e);
+               }
+       }
+
+       /**
+        * Adds the rows [rl, ru) of this group into a transposed dense block using a sparse dictionary. The mapping is
+        * decoded sequentially; the rows before rl are decoded and discarded.
+        */
+       @Override
+       protected void decompressToDenseBlockTransposedSparseDictionary(DenseBlock db, int rl, int ru, SparseBlock sb) {
+               final LZWMappingIterator it = new LZWMappingIterator();
+               it.skip(rl);
+
+               for(int row = rl; row < ru; row++) {
+                       final int dictRow = it.next();
+                       if(sb.isEmpty(dictRow))
+                               continue;
+
+                       final int start = sb.pos(dictRow);
+                       final int end = sb.size(dictRow) + start;
+                       final int[] aix = sb.indexes(dictRow);
+                       final double[] aval = sb.values(dictRow);
+                       for(int j = start; j < end; j++) {
+                               // in the transposed output each column of this group is an output row
+                               final int rowOut = _colIndexes.get(aix[j]);
+                               final double[] c = db.values(rowOut);
+                               final int off = db.pos(rowOut);
+                               c[off + row] += aval[j];
+                       }
+               }
+       }
+
+       @Override
+       protected void 
decompressToDenseBlockTransposedDenseDictionary(DenseBlock db, int rl, int ru, 
double[] dict) {
+               ColGroupDDC g = (ColGroupDDC) convertToDDC();
+               g.decompressToDenseBlockTransposedDenseDictionary(db, rl, ru, 
dict);
+
+       }
+
+       /**
+        * Appends all rows of this group to a transposed sparse block using a sparse dictionary. The output rows are
+        * allocated up front from the per-column counts reported by the dictionary, then the mapping is decoded
+        * sequentially.
+        */
+       @Override
+       protected void decompressToSparseBlockTransposedSparseDictionary(SparseBlockMCSR sbr, SparseBlock sb, int nColOut) {
+               final int[] colCounts = _dict.countNNZZeroColumns(getCounts());
+               final int nCol = _colIndexes.size();
+               for(int j = 0; j < nCol; j++)
+                       sbr.allocate(_colIndexes.get(j), colCounts[j]);
+
+               final LZWMappingIterator it = new LZWMappingIterator();
+
+               for(int row = 0; row < _nRows; row++) {
+                       final int dictRow = it.next();
+                       if(sb.isEmpty(dictRow))
+                               continue;
+
+                       final int start = sb.pos(dictRow);
+                       final int end = sb.size(dictRow) + start;
+                       final int[] aix = sb.indexes(dictRow);
+                       final double[] aval = sb.values(dictRow);
+
+                       for(int j = start; j < end; j++)
+                               sbr.append(_colIndexes.get(aix[j]), row, aval[j]);
+               }
+       }
+
+       @Override
+       protected void 
decompressToSparseBlockTransposedDenseDictionary(SparseBlockMCSR db, double[] 
dict, int nColOut) {
+               ColGroupDDC g = (ColGroupDDC) convertToDDC();
+               g.decompressToSparseBlockTransposedDenseDictionary(db, dict, 
nColOut);
+       }
+
+       /**
+        * Decompresses the rows [rl, ru) of this group into a dense block using a sparse dictionary, writing row r to
+        * output row r + offR with column offset offC. The mapping is decoded sequentially; the rows before rl are
+        * decoded and discarded.
+        */
+       @Override
+       protected void decompressToDenseBlockSparseDictionary(DenseBlock db, int rl, int ru, int offR, int offC,
+               SparseBlock sb) {
+               final LZWMappingIterator it = new LZWMappingIterator();
+               it.skip(rl);
+
+               for(int r = rl; r < ru; r++) {
+                       final int dictRow = it.next();
+                       if(sb.isEmpty(dictRow))
+                               continue;
+
+                       final int outRow = r + offR;
+                       final double[] c = db.values(outRow);
+                       final int off = db.pos(outRow) + offC;
+                       _colIndexes.decompressToDenseFromSparse(sb, dictRow, off, c);
+               }
+       }
+
+       /**
+        * Decompresses rows {@code rl} (inclusive) to {@code ru} (exclusive) of this group into a dense block, reading
+        * the values from a dense dictionary.
+        * <p>
+        * The mapping iterator always starts at row 0, so the first {@code rl} entries are consumed and discarded
+        * before any output is produced. Values are accumulated into the target with {@code +=}, as the other
+        * dense-block decompression loop of this class does, rather than overwriting what the target already holds.
+        *
+        * @param db     Target dense block
+        * @param rl     First group row to decompress
+        * @param ru     Group row to stop before
+        * @param offR   Offset added to the group row to obtain the target row
+        * @param offC   Offset added to the column index to obtain the target column
+        * @param values Dictionary values, {@code _colIndexes.size()} consecutive entries per dictionary id
+        */
+       @Override
+       protected void decompressToDenseBlockDenseDictionary(DenseBlock db, int rl, int ru, int offR, int offC,
+               double[] values) {
+               final int nCol = _colIndexes.size();
+               final LZWMappingIterator it = new LZWMappingIterator();
+
+               for(int i = 0; i < rl; i++) {
+                       it.next();
+               }
+
+               if(db.isContiguous() && nCol == db.getDim(1) && offC == 0) {
+                       // Fast path: one backing array and this group spans every target column.
+                       // NOTE(review): indexes the target by j directly, i.e. assumes _colIndexes.get(j) == j here.
+                       final int nColOut = db.getDim(1);
+                       final double[] c = db.values(0);
+
+                       for(int i = rl; i < ru; i++) {
+                               final int dictIdx = it.next();
+                               final int rowIndex = dictIdx * nCol;
+                               final int rowBaseOff = (i + offR) * nColOut;
+
+                               for(int j = 0; j < nCol; j++)
+                                       c[rowBaseOff + j] += values[rowIndex + j];
+                       }
+               }
+               else {
+                       // Generic path: resolve the backing array and position of every target row separately.
+                       for(int i = rl, offT = rl + offR; i < ru; i++, offT++) {
+                               final double[] c = db.values(offT);
+                               final int off = db.pos(offT) + offC;
+                               final int dictIdx = it.next();
+                               final int rowIndex = dictIdx * nCol;
+
+                               for(int j = 0; j < nCol; j++) {
+                                       final int colIdx = _colIndexes.get(j);
+                                       c[off + colIdx] += values[rowIndex + j];
+                               }
+                       }
+               }
+       }
+
+       /**
+        * Decompresses rows {@code rl} (inclusive) to {@code ru} (exclusive) into a sparse block, reading the values
+        * from a sparse dictionary. The first {@code rl} entries of the mapping iterator are consumed and discarded
+        * because the iterator always starts at row 0.
+        *
+        * @param ret  Target sparse block, appended to
+        * @param rl   First group row to decompress
+        * @param ru   Group row to stop before
+        * @param offR Offset added to the group row to obtain the target row
+        * @param offC Offset added to the column index to obtain the target column
+        * @param sb   Sparse dictionary, addressed by dictionary id
+        */
+       @Override
+       protected void decompressToSparseBlockSparseDictionary(SparseBlock ret, int rl, int ru, int offR, int offC,
+               SparseBlock sb) {
+               final LZWMappingIterator mapIt = new LZWMappingIterator();
+               for(int skip = rl; skip > 0; skip--)
+                       mapIt.next();
+
+               for(int r = rl; r < ru; r++) {
+                       final int outRow = r + offR;
+                       final int dictRow = mapIt.next();
+                       if(!sb.isEmpty(dictRow)) {
+                               final int begin = sb.pos(dictRow);
+                               final int end = begin + sb.size(dictRow);
+                               final int[] cols = sb.indexes(dictRow);
+                               final double[] vals = sb.values(dictRow);
+                               for(int k = begin; k < end; k++)
+                                       ret.append(outRow, offC + _colIndexes.get(cols[k]), vals[k]);
+                       }
+               }
+       }
+
+       @Override
+       protected void decompressToSparseBlockDenseDictionary(SparseBlock ret, 
int rl, int ru, int offR, int offC,
+               double[] values) {
+               decompressToSparseBlockDenseDictionary(ret, rl, ru, offR, offC, 
values, _colIndexes.size());
+       }
+
+       /**
+        * Decompresses rows {@code rl} (inclusive) to {@code ru} (exclusive) into a sparse block, reading the values
+        * from a dense dictionary. Every one of the {@code nCol} values of a row is appended, whatever its value.
+        *
+        * @param ret    Target sparse block, appended to
+        * @param rl     First group row to decompress
+        * @param ru     Group row to stop before
+        * @param offR   Offset added to the group row to obtain the target row
+        * @param offC   Offset added to the column index to obtain the target column
+        * @param values Dictionary values, {@code nCol} consecutive entries per dictionary id
+        * @param nCol   Number of values per dictionary id
+        */
+       protected void decompressToSparseBlockDenseDictionary(SparseBlock ret, int rl, int ru, int offR, int offC,
+               double[] values, int nCol) {
+               final LZWMappingIterator mapIt = new LZWMappingIterator();
+               // The iterator starts at row 0; discard the entries in front of rl.
+               for(int skip = rl; skip > 0; skip--)
+                       mapIt.next();
+
+               for(int r = rl; r < ru; r++) {
+                       final int outRow = r + offR;
+                       final int dictOff = mapIt.next() * nCol;
+                       for(int c = 0; c < nCol; c++)
+                               ret.append(outRow, _colIndexes.get(c) + offC, values[dictOff + c]);
+               }
+       }
+
+       /**
+        * Left matrix multiplication without pre-aggregation. There is no LZW-specific implementation: the group
+        * returned by {@code convertToDDC()} performs the operation with the arguments passed through unchanged.
+        */
+       @Override
+       public void leftMultByMatrixNoPreAgg(MatrixBlock matrix, MatrixBlock 
result, int rl, int ru, int cl, int cu) {
+               convertToDDC().leftMultByMatrixNoPreAgg(matrix, result, rl, ru, 
cl, cu); // Fallback to DDC.
+       }
+
+       /**
+        * Applies a scalar operator to this group. Only the dictionary changes; the LZW-compressed mapping, row count,
+        * unique count and cached counts are handed to the new group as they are.
+        *
+        * @param op Scalar operator to apply
+        * @return {@code this} when the operator is a plus or minus that maps 0 to 0, otherwise a new group
+        */
+       @Override
+       public AColGroup scalarOperation(ScalarOperator op) {
+               final boolean plusOrMinus = op.fn instanceof Plus || op.fn instanceof Minus;
+               if(plusOrMinus && op.executeScalar(0) == 0)
+                       return this;
+
+               final IDictionary changed = _dict.applyScalarOp(op);
+               return new ColGroupDDCLZW(_colIndexes, changed, _dataLZW, _nRows, _nUnique, getCachedCounts());
+       }
+
+       /**
+        * Applies a unary operator to the dictionary and returns a new group that reuses this group's column
+        * indexes, LZW-compressed mapping, row count, unique count and cached counts.
+        */
+       @Override
+       public AColGroup unaryOperation(UnaryOperator op) {
+               return new ColGroupDDCLZW(_colIndexes, _dict.applyUnaryOp(op), 
_dataLZW, _nRows, _nUnique, getCachedCounts());
+       }
+
+       /**
+        * Applies a binary row operation with {@code v} on the left-hand side. Only the dictionary is recomputed; the
+        * LZW-compressed mapping is reused by the returned group.
+        *
+        * @param op        Binary operator
+        * @param v         Row vector operand
+        * @param isRowSafe Not used by this implementation
+        * @return A new group holding the modified dictionary
+        */
+       @Override
+       public AColGroup binaryRowOpLeft(BinaryOperator op, double[] v, boolean isRowSafe) {
+               return new ColGroupDDCLZW(_colIndexes, _dict.binOpLeft(op, v, _colIndexes), _dataLZW, _nRows, _nUnique,
+                       getCachedCounts());
+       }
+
+       @Override
+       public AColGroup binaryRowOpRight(BinaryOperator op, double[] v, 
boolean isRowSafe) {
+               if((op.fn instanceof Plus || op.fn instanceof Minus) && _dict 
instanceof MatrixBlockDictionary &&
+                       ((MatrixBlockDictionary) 
_dict).getMatrixBlock().isInSparseFormat()) {
+                       return convertToDDC().binaryRowOpRight(op, v, 
isRowSafe);
+               }
+               final IDictionary ret;
+               if(_colIndexes.size() == 1)
+                       ret = _dict.applyScalarOp(new 
RightScalarOperator(op.fn, v[_colIndexes.get(0)]));
+               else
+                       ret = _dict.binOpRight(op, v, _colIndexes);
+               return new ColGroupDDCLZW(_colIndexes, ret, _dataLZW, _nRows, 
_nUnique, getCachedCounts());
+       }
+
+       @Override
+       public AColGroup append(AColGroup g) {
+               if(g instanceof ColGroupDDCLZW) {
+                       if(g.getColIndices().equals(_colIndexes)) {
+                               ColGroupDDCLZW gDDCLZW = (ColGroupDDCLZW) g;
+                               if(gDDCLZW._dict.equals(_dict)) {
+                                       if(_nUnique == gDDCLZW._nUnique) {
+                                               int[] mergedMap = new 
int[this._nRows + gDDCLZW._nRows];
+
+                                               LZWMappingIterator it = new 
LZWMappingIterator();
+                                               for(int i = 0; i < this._nRows; 
i++) {
+                                                       mergedMap[i] = 
it.next();
+                                               }
+
+                                               LZWMappingIterator gLZWit = 
gDDCLZW.new LZWMappingIterator();
+                                               for(int i = this._nRows; i < 
mergedMap.length; i++) {
+                                                       mergedMap[i] = 
gLZWit.next();
+                                               }
+
+                                               AMapToData mergedDataAMap = 
MapToFactory.create(mergedMap.length, _nUnique);
+                                               int mergedDataAMapPos = 0;
+
+                                               for(int j : mergedMap) {
+                                                       
mergedDataAMap.set(mergedDataAMapPos++, j);
+                                               }
+
+                                               int[] mergedDataAMapCompressed 
= compress(mergedDataAMap);
+
+                                               return new 
ColGroupDDCLZW(_colIndexes, _dict, mergedDataAMapCompressed, mergedMap.length,
+                                                       _nUnique, null);
+                                       }
+                                       else
+                                               LOG.warn("Not same unique 
values therefore not appending DDCLZW\n" + _nUnique + "\n\n" +
+                                                       gDDCLZW._nUnique);
+                               }
+                               else
+                                       LOG.warn("Not same Dictionaries 
therefore not appending DDCLZW\n" + _dict + "\n\n" + gDDCLZW._dict);
+                       }
+                       else
+                               LOG.warn(
+                                       "Not same columns therefore not 
appending DDCLZW\n" + _colIndexes + "\n\n" + g.getColIndices());
+               }
+               else
+                       LOG.warn("Not DDCLZW but " + 
g.getClass().getSimpleName() + ", therefore not appending DDCLZW");
+               return null;
+       }
+
+       /**
+        * Appends the rows of all groups in {@code g}, in array order, into one group with {@code rlen} rows.
+        * <p>
+        * Entries {@code g[1..]} are validated against this group: same column indexes, type {@code ColGroupDDCLZW},
+        * equal dictionary and same number of unique values. {@code g[0]} is not validated.
+        * NOTE(review): presumably {@code g[0]} is this group - confirm against the caller.
+        *
+        * @param g    Groups to append; every entry is cast to {@code ColGroupDDCLZW} when copying
+        * @param blen Not used by this implementation
+        * @param rlen Number of rows of the combined group
+        * @return The combined group, or {@code null} (after logging a warning) when a group is not compatible
+        * @throws DMLCompressionException If the groups together contain more than {@code rlen} rows
+        */
+       @Override
+       protected AColGroup appendNInternal(AColGroup[] g, int blen, int rlen) {
+               for(int i = 1; i < g.length; i++) {
+                       if(!_colIndexes.equals(g[i]._colIndexes)) {
+                               LOG.warn("Not same columns therefore not appending DDCLZW\n" + _colIndexes + "\n\n" + g[i]._colIndexes);
+                               return null;
+                       }
+
+                       if(!(g[i] instanceof ColGroupDDCLZW)) {
+                               LOG.warn("Not DDCLZW but " + g[i].getClass().getSimpleName() + ", therefore not appending DDCLZW");
+                               return null;
+                       }
+
+                       final ColGroupDDCLZW gDDCLZW = (ColGroupDDCLZW) g[i];
+                       if(!gDDCLZW._dict.equals(_dict)) {
+                               LOG.warn("Not same Dictionaries therefore not appending DDCLZW\n" + _dict + "\n\n" + gDDCLZW._dict);
+                               return null;
+                       }
+                       if(!(_nUnique == gDDCLZW._nUnique)) {
+                               LOG.warn(
+                                       "Not same unique values therefore not appending DDCLZW\n" + _nUnique + "\n\n" + gDDCLZW._nUnique);
+                               return null;
+                       }
+               }
+
+               // Decode every group directly into the merged map; no rlen-sized int[] scratch copy is needed.
+               final AMapToData mergedDataAMap = MapToFactory.create(rlen, _nUnique);
+               int mergedPos = 0;
+
+               for(AColGroup group : g) {
+                       final ColGroupDDCLZW gDDCLZW = (ColGroupDDCLZW) group;
+                       // Fail explicitly instead of relying on the bounds behaviour of the map implementation.
+                       if(mergedPos + gDDCLZW._nRows > rlen)
+                               throw new DMLCompressionException(
+                                       "Cannot append DDCLZW groups: more input rows than the " + rlen + " output rows");
+
+                       final LZWMappingIterator gLZWit = gDDCLZW.new LZWMappingIterator();
+                       for(int j = 0; j < gDDCLZW._nRows; j++)
+                               mergedDataAMap.set(mergedPos++, gLZWit.next());
+               }
+
+               final int[] mergedDataAMapCompressed = compress(mergedDataAMap);
+
+               return new ColGroupDDCLZW(_colIndexes, _dict, mergedDataAMapCompressed, rlen, _nUnique, null);
+       }
+
+       /** Returns this group unchanged; no recompression is attempted for DDCLZW. */
+       @Override
+       public AColGroup recompress() {
+               return this;
+       }
+
+       /**
+        * Builds the compression info of this group from its encoding, estimation factors based on
+        * {@code _nUnique}, {@code _nRows} and the dictionary sparsity, the estimated in-memory size and the
+        * compression type.
+        *
+        * @param nRow Not used by this implementation; {@code _nRows} is used instead
+        * @return The compression info of this group
+        * @throws DMLCompressionException Wrapping any exception raised while the info is assembled
+        */
+       @Override
+       public CompressedSizeInfoColGroup getCompressionInfo(int nRow) {
+               try {
+                       final IEncode encoding = getEncoding();
+                       final double sparsity = _dict.getSparsity();
+                       final EstimationFactors factors = new EstimationFactors(_nUnique, _nRows, _nRows, sparsity);
+                       return new CompressedSizeInfoColGroup(_colIndexes, factors, estimateInMemorySize(), getCompType(),
+                               encoding);
+               }
+               catch(Exception e) {
+                       throw new DMLCompressionException(this.toString(), e);
+               }
+       }
+
+       /**
+        * Returns a new group over {@code newColIndex} whose dictionary is this dictionary reordered by
+        * {@code reordering}; the LZW-compressed mapping, row count, unique count and cached counts are reused.
+        */
+       @Override
+       protected AColGroup fixColIndexes(IColIndex newColIndex, int[] 
reordering) {
+               return new ColGroupDDCLZW(newColIndex, 
_dict.reorder(reordering), _dataLZW, _nRows, _nUnique,
+                       getCachedCounts());
+       }
+
+       /**
+        * Row selection into a sparse result. For every non-empty row {@code r} of the selection matrix in
+        * {@code [rl, ru)}, the group row named by the first column index stored in that selection row is
+        * decompressed into row {@code r} of the result.
+        *
+        * @param selection Selection matrix; its sparse block is read
+        * @param points    Not used by this implementation
+        * @param ret       Result matrix; its sparse block is written
+        * @param rl        First selection row to process
+        * @param ru        Selection row to stop before
+        */
+       @Override
+       public void sparseSelection(MatrixBlock selection, P[] points, MatrixBlock ret, int rl, int ru) {
+               final SparseBlock selBlock = selection.getSparseBlock();
+               final SparseBlock outBlock = ret.getSparseBlock();
+               for(int r = rl; r < ru; r++) {
+                       if(!selBlock.isEmpty(r)) {
+                               // column index with 1
+                               final int srcRow = selBlock.indexes(r)[selBlock.pos(r)];
+                               // A row offset of (r - srcRow) makes group row srcRow land in result row r.
+                               decompressToSparseBlock(outBlock, srcRow, srcRow + 1, r - srcRow, 0);
+                       }
+               }
+       }
+
+       /**
+        * Row selection into a dense result. For every non-empty row {@code r} of the selection matrix in
+        * {@code [rl, ru)}, the group row named by the first column index stored in that selection row is
+        * decompressed into row {@code r} of the result.
+        *
+        * @param selection Selection matrix; its sparse block is read
+        * @param points    Not used by this implementation
+        * @param ret       Result matrix; its dense block is written
+        * @param rl        First selection row to process
+        * @param ru        Selection row to stop before
+        */
+       @Override // Correct ?
+       protected void denseSelection(MatrixBlock selection, P[] points, MatrixBlock ret, int rl, int ru) {
+               final SparseBlock selBlock = selection.getSparseBlock();
+               final DenseBlock outBlock = ret.getDenseBlock();
+               for(int r = rl; r < ru; r++) {
+                       if(!selBlock.isEmpty(r)) {
+                               // column index with 1
+                               final int srcRow = selBlock.indexes(r)[selBlock.pos(r)];
+                               // A row offset of (r - srcRow) makes group row srcRow land in result row r.
+                               decompressToDenseBlock(outBlock, srcRow, srcRow + 1, r - srcRow, 0);
+                       }
+               }
+       }
+
+       /** Split-reshape is forwarded, arguments unchanged, to the DDC group returned by {@code convertToDDC()}. */
+       @Override
+       public AColGroup[] splitReshape(int multiplier, int nRow, int nColOrg) {
+               return ((ColGroupDDC) convertToDDC()).splitReshape(multiplier, nRow, nColOrg);
+       }
+
+       /**
+        * Not supported for DDCLZW groups.
+        *
+        * @throws NotImplementedException Always
+        */
+       @Override
+       protected boolean allowShallowIdentityRightMult() {
+               // NOTE(review): any caller that queries this flag will fail for this group type - confirm this is intended.
+               throw new NotImplementedException();
+       }
+
+       /**
+        * Creates the result group of a right multiplication from the pre-aggregated dictionary, reusing this
+        * group's LZW-compressed mapping, row count, unique count and cached counts.
+        *
+        * @param right      Not used by this implementation
+        * @param colIndexes Column indexes of the result group
+        * @param preAgg     Dictionary of the result group, may be {@code null}
+        * @return The new group, or {@code null} when {@code preAgg} is {@code null}
+        */
+       @Override
+       protected AColGroup allocateRightMultiplication(MatrixBlock right, IColIndex colIndexes, IDictionary preAgg) {
+               if(preAgg != null)
+                       return new ColGroupDDCLZW(colIndexes, preAgg, _dataLZW, _nRows, _nUnique, getCachedCounts());
+               return null;
+       }
+
+       /** Dense pre-aggregation is forwarded, arguments unchanged, to the group returned by {@code convertToDDC()}. */
+       @Override
+       public void preAggregateDense(MatrixBlock m, double[] preAgg, int rl, int ru, int cl, int cu) {
+               ((ColGroupDDC) convertToDDC()).preAggregateDense(m, preAgg, rl, ru, cl, cu);
+       }
+
+       /** Sparse pre-aggregation is forwarded, arguments unchanged, to the group returned by {@code convertToDDC()}. */
+       @Override
+       public void preAggregateSparse(SparseBlock sb, double[] preAgg, int rl, int ru, int cl, int cu) {
+               ((ColGroupDDC) convertToDDC()).preAggregateSparse(sb, preAgg, rl, ru, cl, cu);
+       }
+
+       /** Pre-aggregation against a DDC group is forwarded to the group returned by {@code convertToDDC()}. */
+       @Override
+       protected void preAggregateThatDDCStructure(ColGroupDDC that, Dictionary ret) {
+               ((ColGroupDDC) convertToDDC()).preAggregateThatDDCStructure(that, ret);
+       }
+
+       /** Pre-aggregation against an SDCZeros group is forwarded to the group returned by {@code convertToDDC()}. */
+       @Override
+       protected void preAggregateThatSDCZerosStructure(ColGroupSDCZeros that, Dictionary ret) {
+               ((ColGroupDDC) convertToDDC()).preAggregateThatSDCZerosStructure(that, ret);
+       }
+
+       /**
+        * Pre-aggregation against an SDCSingleZeros group is forwarded to the group returned by
+        * {@code convertToDDC()}.
+        */
+       @Override
+       protected void preAggregateThatSDCSingleZerosStructure(ColGroupSDCSingleZeros that, Dictionary ret) {
+               ((ColGroupDDC) convertToDDC()).preAggregateThatSDCSingleZerosStructure(that, ret);
+       }
+
+       /** Pre-aggregation against an RLE group is forwarded to the group returned by {@code convertToDDC()}. */
+       @Override
+       protected void preAggregateThatRLEStructure(ColGroupRLE that, Dictionary ret) {
+               ((ColGroupDDC) convertToDDC()).preAggregateThatRLEStructure(that, ret);
+       }
+
+       /**
+        * Identity pre-aggregate left multiplication on dense input is forwarded, arguments unchanged, to the group
+        * returned by {@code convertToDDC()}.
+        */
+       @Override
+       public void leftMMIdentityPreAggregateDense(MatrixBlock that, MatrixBlock ret, int rl, int ru, int cl, int cu) {
+               ((ColGroupDDC) convertToDDC()).leftMMIdentityPreAggregateDense(that, ret, rl, ru, cl, cu);
+       }
+
+       /**
+        * Computes the counts by fully decompressing the LZW mapping and asking the resulting map for them.
+        *
+        * @param out Not used by this implementation; the array returned by the decompressed map is returned instead
+        * @return The counts reported by the decompressed mapping
+        */
+       @Override
+       protected int[] getCounts(int[] out) {
+               return decompressFull(_dataLZW, _nUnique, _nRows).getCounts();
+       }
+
+       /**
+        * Adds, for every row in {@code [rl, ru)}, the pre-aggregated value of the row's dictionary id to
+        * {@code c[row]}.
+        *
+        * @param c      Output, indexed by row
+        * @param rl     First row to process
+        * @param ru     Row to stop before
+        * @param preAgg Pre-aggregated value per dictionary id
+        */
+       @Override
+       protected void computeRowSums(double[] c, int rl, int ru, double[] preAgg) {
+               final LZWMappingIterator mapIt = new LZWMappingIterator();
+               // The iterator starts at row 0; discard the entries in front of rl.
+               for(int skip = rl; skip > 0; skip--)
+                       mapIt.next();
+
+               int row = rl;
+               while(row < ru) {
+                       c[row] += preAgg[mapIt.next()];
+                       row++;
+               }
+       }
+
+       /**
+        * Combines, for every row in {@code [rl, ru)}, {@code c[row]} with the pre-aggregated value of the row's
+        * dictionary id through {@code builtin} and stores the result back into {@code c[row]}.
+        *
+        * @param c       Output, indexed by row
+        * @param builtin Function applied to the current output value and the pre-aggregated value
+        * @param rl      First row to process
+        * @param ru      Row to stop before
+        * @param preAgg  Pre-aggregated value per dictionary id
+        */
+       @Override
+       protected void computeRowMxx(double[] c, Builtin builtin, int rl, int ru, double[] preAgg) {
+               final LZWMappingIterator mapIt = new LZWMappingIterator();
+               // The iterator starts at row 0; discard the entries in front of rl.
+               for(int skip = rl; skip > 0; skip--)
+                       mapIt.next();
+
+               int row = rl;
+               while(row < ru) {
+                       c[row] = builtin.execute(c[row], preAgg[mapIt.next()]);
+                       row++;
+               }
+       }
+
+       /**
+        * Multiplies, for every row in {@code [rl, ru)}, {@code c[row]} by the pre-aggregated value of the row's
+        * dictionary id.
+        *
+        * @param c      Output, indexed by row
+        * @param rl     First row to process
+        * @param ru     Row to stop before
+        * @param preAgg Pre-aggregated value per dictionary id
+        */
+       @Override
+       protected void computeRowProduct(double[] c, int rl, int ru, double[] preAgg) {
+               final LZWMappingIterator mapIt = new LZWMappingIterator();
+               // The iterator starts at row 0; discard the entries in front of rl.
+               for(int skip = rl; skip > 0; skip--)
+                       mapIt.next();
+
+               int row = rl;
+               while(row < ru) {
+                       c[row] *= preAgg[mapIt.next()];
+                       row++;
+               }
+       }
+}
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupFactory.java 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupFactory.java
index 273df9ff26..7699d7b7c1 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupFactory.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupFactory.java
@@ -292,6 +292,9 @@ public class ColGroupFactory {
                else if(ct == CompressionType.DeltaDDC) {
                        return directCompressDeltaDDC(colIndexes, cg);
                }
+               else if(ct == CompressionType.DDCLZW) {
+                       return directCompressDDCLZW(colIndexes, cg);
+               }
                else if(ct == CompressionType.CONST && cs.preferDeltaEncoding) {
                        return directCompressDeltaDDC(colIndexes, cg);
                }
@@ -662,6 +665,15 @@ public class ColGroupFactory {
                return ColGroupDDC.create(colIndexes, dict, resData, null);
        }
 
+       /** Compresses through the DDC path, then converts the result to DDCLZW when a DDC group was produced. */
+       private AColGroup directCompressDDCLZW(IColIndex colIndexes, CompressedSizeInfoColGroup cg) throws Exception {
+               final AColGroup ddc = directCompressDDC(colIndexes, cg);
+               // Anything that is not a ColGroupDDC is returned exactly as the DDC path produced it.
+               if(!(ddc instanceof ColGroupDDC))
+                       return ddc;
+               return ((ColGroupDDC) ddc).convertToDDCLZW();
+       }
+
        private AColGroup directCompressDDCMultiCol(IColIndex colIndexes, 
CompressedSizeInfoColGroup cg) throws Exception {
                final AMapToData d = MapToFactory.create(nRow, 
Math.max(Math.min(cg.getNumOffs() + 1, nRow), 126));
                final int fill = d.getUpperBoundValue();
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupIO.java 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupIO.java
index b47100d4e6..f4e9007575 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupIO.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupIO.java
@@ -109,6 +109,8 @@ public interface ColGroupIO {
                        return ColGroupDDCFOR.read(in);
                case DeltaDDC:
                        return ColGroupDeltaDDC.read(in);
+               case DDCLZW:
+                       return ColGroupDDCLZW.read(in);
                case OLE:
                        return ColGroupOLE.read(in, nRows);
                        case RLE:
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/scheme/DDCLZWScheme.java
 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/scheme/DDCLZWScheme.java
new file mode 100644
index 0000000000..cfc4e401e1
--- /dev/null
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/scheme/DDCLZWScheme.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.compress.colgroup.scheme;
+
+import org.apache.sysds.runtime.compress.colgroup.ColGroupDDCLZW;
+import org.apache.sysds.runtime.compress.colgroup.indexes.IColIndex;
+
+/**
+ * Base class of the DDC-LZW compression schemes. The static factories choose the single-column implementation
+ * ({@link DDCLZWSchemeSC}) for exactly one column and the multi-column implementation ({@link DDCLZWSchemeMC})
+ * otherwise.
+ */
+public abstract class DDCLZWScheme extends DDCScheme {
+       /**
+        * @param cols Column indexes the scheme applies to
+        */
+       protected DDCLZWScheme(IColIndex cols) {
+               super(cols);
+       }
+
+       /**
+        * Creates a scheme from an existing DDCLZW column group.
+        *
+        * @param g Column group to derive the scheme from
+        * @return A single-column scheme when the group has one column, otherwise a multi-column scheme
+        */
+       public static DDCLZWScheme create(ColGroupDDCLZW g) {
+               if(g.getNumCols() == 1)
+                       return new DDCLZWSchemeSC(g);
+               return new DDCLZWSchemeMC(g);
+       }
+
+       /**
+        * Creates an empty scheme for the given columns.
+        *
+        * @param cols Column indexes the scheme applies to
+        * @return A single-column scheme when {@code cols} has one entry, otherwise a multi-column scheme
+        */
+       public static DDCLZWScheme create(IColIndex cols) {
+               if(cols.size() == 1)
+                       return new DDCLZWSchemeSC(cols);
+               return new DDCLZWSchemeMC(cols);
+       }
+
+}
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/scheme/DDCLZWSchemeMC.java
 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/scheme/DDCLZWSchemeMC.java
new file mode 100644
index 0000000000..75464e2367
--- /dev/null
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/scheme/DDCLZWSchemeMC.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.compress.colgroup.scheme;
+
+import org.apache.sysds.runtime.compress.DMLCompressionException;
+import org.apache.sysds.runtime.compress.colgroup.AColGroup;
+import org.apache.sysds.runtime.compress.colgroup.ColGroupDDCLZW;
+import org.apache.sysds.runtime.compress.colgroup.dictionary.DictionaryFactory;
+import org.apache.sysds.runtime.compress.colgroup.indexes.ColIndexFactory;
+import org.apache.sysds.runtime.compress.colgroup.indexes.IColIndex;
+import org.apache.sysds.runtime.compress.colgroup.mapping.AMapToData;
+import org.apache.sysds.runtime.compress.colgroup.mapping.MapToFactory;
+import org.apache.sysds.runtime.compress.readers.ReaderColumnSelection;
+import org.apache.sysds.runtime.compress.utils.ACount;
+import org.apache.sysds.runtime.compress.utils.DblArray;
+import org.apache.sysds.runtime.compress.utils.DblArrayCountHashMap;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.matrix.data.Pair;
+
+/**
+ * Multi-column DDC-LZW scheme. It keeps a map of the distinct rows seen over the scheme's columns and encodes
+ * matrix blocks into {@link ColGroupDDCLZW} groups whose mapping holds the ids of that map.
+ */
+public class DDCLZWSchemeMC extends DDCLZWScheme {
+       /** All-zero row, used for the rows the column reader does not return. */
+       private final DblArray emptyRow;
+
+       /** Distinct rows observed so far. */
+       private final DblArrayCountHashMap map;
+
+       /**
+        * Creates a scheme over an existing map; used by {@link #clone()}.
+        *
+        * @param cols Column indexes the scheme applies to
+        * @param map  Map of distinct rows to use
+        */
+       private DDCLZWSchemeMC(IColIndex cols, DblArrayCountHashMap map) {
+               super(cols);
+               this.map = map;
+               this.emptyRow = new DblArray(new double[cols.size()]);
+       }
+
+       /**
+        * Creates a scheme from an existing group by inserting every row of the group's dictionary into the map.
+        *
+        * @param g Column group whose column indexes and dictionary seed the scheme
+        */
+       protected DDCLZWSchemeMC(ColGroupDDCLZW g) {
+               super(g.getColIndices());
+               this.lastDict = g.getDictionary();
+               final MatrixBlock mbDict = lastDict.getMBDict(this.cols.size()).getMatrixBlock();
+               final int dictRows = mbDict.getNumRows();
+               final int dictCols = mbDict.getNumColumns();
+
+               // Read the mapping data and materialize map.
+               map = new DblArrayCountHashMap(dictRows * 2);
+               final ReaderColumnSelection r = ReaderColumnSelection.createReader(mbDict, //
+                       ColIndexFactory.create(dictCols), false, 0, dictRows);
+
+               DblArray d = null;
+               while((d = r.nextRow()) != null)
+                       map.increment(d);
+
+               emptyRow = new DblArray(new double[dictCols]);
+       }
+
+       /**
+        * Creates an empty scheme for the given columns.
+        *
+        * @param cols Column indexes the scheme applies to
+        */
+       protected DDCLZWSchemeMC(IColIndex cols) {
+               super(cols);
+               final int nCol = cols.size();
+               this.map = new DblArrayCountHashMap(4);
+               this.emptyRow = new DblArray(new double[nCol]);
+       }
+
+       /** Encodes {@code data} read row-wise; the number of encoded rows is {@code data.getNumRows()}. */
+       @Override
+       protected AColGroup encodeV(MatrixBlock data, IColIndex columns) {
+               final int nRow = data.getNumRows();
+               final ReaderColumnSelection reader = ReaderColumnSelection.createReader(//
+                       data, columns, false, 0, nRow);
+               return encode(data, reader, nRow, columns);
+       }
+
+       /** Encodes {@code data} read transposed; the number of encoded rows is {@code data.getNumColumns()}. */
+       @Override
+       protected AColGroup encodeVT(MatrixBlock data, IColIndex columns) {
+               final int nRow = data.getNumColumns();
+               final ReaderColumnSelection reader = ReaderColumnSelection.createReader(//
+                       data, columns, true, 0, nRow);
+               return encode(data, reader, nRow, columns);
+       }
+
+       /**
+        * Encodes the rows delivered by {@code reader} with the ids currently held in the map; the map itself is not
+        * modified.
+        * <p>
+        * When the map contains the all-zero row, every row index the reader skips is mapped to that row's id. When
+        * it does not, only the rows the reader returns are set in the mapping.
+        *
+        * @param data    Matrix being encoded; only its sparsity is used here
+        * @param reader  Reader over the selected columns
+        * @param nRow    Number of rows of the produced mapping
+        * @param columns Column indexes of the produced group
+        * @return The encoded column group
+        */
+       private AColGroup encode(MatrixBlock data, ReaderColumnSelection reader, int nRow, IColIndex columns) {
+               final AMapToData d = MapToFactory.create(nRow, map.size());
+               DblArray cellVals;
+               ACount<DblArray> emptyIdx = map.getC(emptyRow);
+               if(emptyIdx == null) {
+
+                       while((cellVals = reader.nextRow()) != null) {
+                               final int row = reader.getCurrentRowIndex();
+
+                               final int id = map.getId(cellVals);
+                               d.set(row, id);
+
+                       }
+               }
+               else {
+                       int r = 0;
+                       while((cellVals = reader.nextRow()) != null) {
+                               final int row = reader.getCurrentRowIndex();
+                               if(row != r) {
+                                       // Rows r..row-1 were skipped by the reader: map them to the all-zero row.
+                                       while(r < row)
+                                               d.set(r++, emptyIdx.id);
+                               }
+                               final int id = map.getId(cellVals);
+                               d.set(row, id);
+                               r++;
+                       }
+                       // Rows after the last one returned by the reader.
+                       while(r < nRow)
+                               d.set(r++, emptyIdx.id);
+               }
+               // Rebuild the dictionary only when the cached one no longer matches the size of the map.
+               if(lastDict == null || lastDict.getNumberOfValues(columns.size()) != map.size())
+                       lastDict = DictionaryFactory.create(map, columns.size(), false, data.getSparsity());
+               return ColGroupDDCLZW.create(columns, lastDict, d, null);
+
+       }
+
+       /** Updates the map with {@code data} read row-wise. */
+       @Override
+       protected ICLAScheme updateV(MatrixBlock data, IColIndex columns) {
+               final int nRow = data.getNumRows();
+               final ReaderColumnSelection reader = ReaderColumnSelection.createReader(//
+                       data, columns, false, 0, nRow);
+               return update(data, reader, nRow, columns);
+       }
+
+       /**
+        * Adds every row delivered by {@code reader} to the map. Row indexes the reader skips are counted as
+        * occurrences of the all-zero row.
+        *
+        * @param data    Not used by this implementation
+        * @param reader  Reader over the selected columns
+        * @param nRow    Total number of rows covered by the reader
+        * @param columns Not used by this implementation
+        * @return This scheme
+        */
+       private ICLAScheme update(MatrixBlock data, ReaderColumnSelection reader, int nRow, IColIndex columns) {
+               DblArray d = null;
+               int r = 0;
+               while((d = reader.nextRow()) != null) {
+                       final int cr = reader.getCurrentRowIndex();
+                       if(cr != r) {
+                               map.increment(emptyRow, cr - r);
+                               r = cr;
+                       }
+                       map.increment(d);
+                       r++;
+               }
+               // r is one past the last row returned by the reader, so rows r..nRow-1 are the trailing empty rows:
+               // nRow - r of them, the same count tryUpdateAndEncode uses.
+               if(r < nRow)
+                       map.increment(emptyRow, nRow - r);
+
+               return this;
+       }
+
+       /** Updates the map with {@code data} read transposed. */
+       @Override
+       protected ICLAScheme updateVT(MatrixBlock data, IColIndex columns) {
+               final int nRow = data.getNumColumns();
+               final ReaderColumnSelection reader = ReaderColumnSelection.createReader(//
+                       data, columns, true, 0, nRow);
+               return update(data, reader, nRow, columns);
+       }
+
+       /** Updates the map and encodes {@code data}, read transposed, in a single pass. */
+       @Override
+       protected Pair<ICLAScheme, AColGroup> tryUpdateAndEncodeT(MatrixBlock data, IColIndex columns) {
+               final int nRow = data.getNumColumns();
+               final ReaderColumnSelection reader = ReaderColumnSelection.createReader(//
+                       data, columns, true, 0, nRow);
+               return tryUpdateAndEncode(data, reader, nRow, columns);
+       }
+
+       /**
+        * Adds the rows delivered by {@code reader} to the map and encodes them in the same pass.
+        * <p>
+        * The mapping is allocated for the map size at entry. The pass is aborted as soon as a row receives an id
+        * larger than the upper bound the allocated mapping reports.
+        *
+        * @param data    Matrix being encoded; only its sparsity is used here
+        * @param reader  Reader over the selected columns
+        * @param nRow    Number of rows of the produced mapping
+        * @param columns Column indexes of the produced group
+        * @return This scheme together with the encoded column group
+        * @throws DMLCompressionException If an id exceeds the upper bound of the allocated mapping
+        */
+       private Pair<ICLAScheme, AColGroup> tryUpdateAndEncode(MatrixBlock data, ReaderColumnSelection reader, int nRow,
+               IColIndex columns) {
+               final AMapToData d = MapToFactory.create(nRow, map.size());
+               int max = d.getUpperBoundValue();
+
+               DblArray cellVals;
+               ACount<DblArray> emptyIdx = map.getC(emptyRow);
+               if(emptyIdx == null) {
+                       while((cellVals = reader.nextRow()) != null) {
+                               final int row = reader.getCurrentRowIndex();
+                               final int id = map.increment(cellVals);
+                               if(id > max)
+                                       throw new DMLCompressionException("Failed update and encode with " + max + " possible values");
+                               d.set(row, id);
+                       }
+               }
+               else {
+                       int r = 0;
+                       while((cellVals = reader.nextRow()) != null) {
+                               final int row = reader.getCurrentRowIndex();
+                               if(row != r) {
+                                       // Rows r..row-1 were skipped by the reader: count and map them as all-zero rows.
+                                       map.increment(emptyRow, row - r);
+                                       while(r < row)
+                                               d.set(r++, emptyIdx.id);
+                               }
+                               final int id = map.increment(cellVals);
+                               if(id > max)
+                                       throw new DMLCompressionException(
+                                               "Failed update and encode with " + max + " possible values" + map + " " + map.size());
+                               d.set(row, id);
+                               r++;
+                       }
+                       // Rows after the last one returned by the reader.
+                       if(r < nRow)
+
+                               map.increment(emptyRow, nRow - r);
+                       while(r < nRow)
+                               d.set(r++, emptyIdx.id);
+               }
+               // Rebuild the dictionary only when the cached one no longer matches the size of the map.
+               if(lastDict == null || lastDict.getNumberOfValues(columns.size()) != map.size())
+                       lastDict = DictionaryFactory.create(map, columns.size(), false, data.getSparsity());
+
+               AColGroup g = ColGroupDDCLZW.create(columns, lastDict, d, null);
+               ICLAScheme s = this;
+               return new Pair<>(s, g);
+       }
+
+       /** Returns a new scheme over the same columns with a clone of the map. */
+       @Override
+       public ACLAScheme clone() {
+               return new DDCLZWSchemeMC(cols, map.clone());
+       }
+
+       /** Returns the map of distinct rows backing this scheme. */
+       @Override
+       protected final Object getMap() {
+               return map;
+       }
+}
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/scheme/DDCLZWSchemeSC.java
 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/scheme/DDCLZWSchemeSC.java
new file mode 100644
index 0000000000..89ee74faff
--- /dev/null
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/scheme/DDCLZWSchemeSC.java
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.compress.colgroup.scheme;
+
+import org.apache.sysds.runtime.compress.DMLCompressionException;
+import org.apache.sysds.runtime.compress.colgroup.AColGroup;
+import org.apache.sysds.runtime.compress.colgroup.ColGroupDDCLZW;
+import org.apache.sysds.runtime.compress.colgroup.ColGroupEmpty;
+import org.apache.sysds.runtime.compress.colgroup.dictionary.DictionaryFactory;
+import org.apache.sysds.runtime.compress.colgroup.indexes.IColIndex;
+import org.apache.sysds.runtime.compress.colgroup.mapping.AMapToData;
+import org.apache.sysds.runtime.compress.colgroup.mapping.MapToFactory;
+import org.apache.sysds.runtime.compress.utils.DoubleCountHashMap;
+import org.apache.sysds.runtime.data.DenseBlock;
+import org.apache.sysds.runtime.data.SparseBlock;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.matrix.data.Pair;
+
+public class DDCLZWSchemeSC extends DDCLZWScheme {
+       /** Counting map over the values of the column; the ids it hands out are what gets written into the mappings. */
+       final private DoubleCountHashMap map;
+
+       /**
+        * Construct a scheme around an existing map (used by {@link #clone()}).
+        *
+        * @param cols The column indexes of the scheme
+        * @param map  The value map, stored by reference
+        */
+       private DDCLZWSchemeSC(IColIndex cols, DoubleCountHashMap map) {
+               super(cols);
+               this.map = map;
+       }
+
+       /**
+        * Construct a scheme from an existing DDCLZW column group.
+        * <p>
+        * The group's dictionary is kept as lastDict and each of its values is inserted into a fresh map, in dictionary
+        * order.
+        * <p>
+        * NOTE(review): presumably this relies on the map assigning ids in insertion order, so that ids line up with
+        * dictionary positions - confirm against DoubleCountHashMap.
+        *
+        * @param g The column group to take column indexes and dictionary from
+        */
+       protected DDCLZWSchemeSC(ColGroupDDCLZW g) {
+               super(g.getColIndices());
+               this.lastDict = g.getDictionary();
+               int unique = lastDict.getNumberOfValues(1); // single column, so one value per dictionary entry
+               map = new DoubleCountHashMap(unique);
+               for(int i = 0; i < unique; i++)
+                       map.increment(lastDict.getValue(i));
+       }
+
+       /**
+        * Construct a scheme with a new, empty map.
+        *
+        * @param cols The column indexes of the scheme
+        */
+       protected DDCLZWSchemeSC(IColIndex cols) {
+               super(cols);
+               this.map = new DoubleCountHashMap(4); // presumably the initial capacity - TODO confirm
+       }
+
+       /**
+        * Encode the column of this scheme using the ids already present in the map; the map is only read.
+        *
+        * @param data    The input block
+        * @param columns The column indexes to attach to the returned group
+        * @return A ColGroupEmpty for empty input, otherwise the group returned by ColGroupDDCLZW.create
+        */
+       @Override
+       protected AColGroup encodeV(MatrixBlock data, IColIndex columns) {
+               if(data.isEmpty())
+                       return new ColGroupEmpty(columns);
+
+               final AMapToData mapping = MapToFactory.create(data.getNumRows(), map.size());
+               encode(data, mapping, cols.get(0));
+
+               // rebuild the cached dictionary only if it is missing or its size no longer matches the map
+               if(lastDict == null || lastDict.getNumberOfValues(columns.size()) != map.size())
+                       lastDict = DictionaryFactory.create(map);
+
+               return ColGroupDDCLZW.create(columns, lastDict, mapping, null);
+       }
+
+       /**
+        * Encode column {@code col} of a sparse block by looking up every cell in the map.
+        *
+        * @param data The input block in sparse format
+        * @param d    The mapping to write ids into, one per row
+        * @param col  The column to read
+        */
+       private void encodeSparse(MatrixBlock data, AMapToData d, int col) {
+               final int rows = data.getNumRows();
+               final SparseBlock sparse = data.getSparseBlock();
+               int r = 0;
+               while(r < rows) {
+                       d.set(r, map.getId(sparse.get(r, col)));
+                       r++;
+               }
+       }
+
+       /**
+        * Dispatch the encoding to the sparse, contiguous dense or generic dense implementation.
+        *
+        * @param data The input block
+        * @param d    The mapping to write ids into
+        * @param col  The column to read
+        */
+       private void encode(MatrixBlock data, AMapToData d, int col) {
+               if(data.isInSparseFormat()) {
+                       encodeSparse(data, d, col);
+                       return;
+               }
+               if(data.getDenseBlock().isContiguous()) {
+                       encodeDense(data, d, col);
+                       return;
+               }
+               encodeGeneric(data, d, col);
+       }
+
+       /**
+        * Encode column {@code col} of a contiguous dense block by striding through its value array.
+        *
+        * @param data The input block, dense and contiguous
+        * @param d    The mapping to write ids into, one per row
+        * @param col  The column to read
+        */
+       private void encodeDense(final MatrixBlock data, final AMapToData d, final int col) {
+               final int rows = data.getNumRows();
+               final double[] values = data.getDenseBlockValues();
+               final int stride = data.getNumColumns();
+               final int end = rows * stride; // the original notes this is guaranteed lower than intmax
+               int row = 0;
+               int pos = col;
+               while(pos < end) {
+                       d.set(row, map.getId(values[pos]));
+                       row++;
+                       pos += stride;
+               }
+       }
+
+       /**
+        * Encode column {@code col} of a dense block row by row, resolving the backing array and offset per row.
+        *
+        * @param data The input block in dense format
+        * @param d    The mapping to write ids into, one per row
+        * @param col  The column to read
+        */
+       private void encodeGeneric(MatrixBlock data, AMapToData d, int col) {
+               final int rows = data.getNumRows();
+               final DenseBlock dense = data.getDenseBlock();
+               for(int r = 0; r < rows; r++) {
+                       final double[] rowValues = dense.values(r);
+                       final double v = rowValues[dense.pos(r) + col];
+                       d.set(r, map.getId(v));
+               }
+       }
+
+       /**
+        * Encode the column of this scheme from a transposed input; the map is only read.
+        *
+        * @param data    The transposed input block; its number of columns is used as the length of the mapping
+        * @param columns The column indexes to attach to the returned group
+        * @return A ColGroupEmpty for empty input, otherwise the group returned by ColGroupDDCLZW.create
+        */
+       @Override
+       protected AColGroup encodeVT(MatrixBlock data, IColIndex columns) {
+               if(data.isEmpty())
+                       return new ColGroupEmpty(columns);
+
+               final AMapToData mapping = MapToFactory.create(data.getNumColumns(), map.size());
+               encodeT(data, mapping, cols.get(0));
+
+               // rebuild the cached dictionary only if it is missing or its size no longer matches the map
+               if(lastDict == null || lastDict.getNumberOfValues(columns.size()) != map.size())
+                       lastDict = DictionaryFactory.create(map);
+
+               return ColGroupDDCLZW.create(columns, lastDict, mapping, null);
+       }
+
+       /**
+        * Dispatch the transposed encoding to the sparse or dense implementation.
+        *
+        * @param data The transposed input block
+        * @param d    The mapping to write ids into
+        * @param col  The row of the transposed block to read
+        */
+       private void encodeT(MatrixBlock data, AMapToData d, int col) {
+               if(!data.isInSparseFormat()) {
+                       encodeDenseT(data, d, col);
+                       return;
+               }
+               encodeSparseT(data, d, col);
+       }
+
+       /**
+        * Encode row {@code col} of a transposed sparse block.
+        *
+        * @param data The transposed input block in sparse format
+        * @param d    The mapping to write ids into
+        * @param col  The row of the transposed block to read
+        */
+       private void encodeSparseT(MatrixBlock data, AMapToData d, int col) {
+               final SparseBlock sparse = data.getSparseBlock();
+               // start with the id of 0.0 everywhere; the stored entries overwrite it below
+               d.fill(map.getId(0.0));
+               if(sparse.isEmpty(col))
+                       return;
+
+               final int begin = sparse.pos(col);
+               final int[] indexes = sparse.indexes(col);
+               final int end = sparse.size(col) + begin;
+               final double[] values = sparse.values(col);
+               for(int k = begin; k < end; k++) {
+                       final double v = values[k];
+                       d.set(indexes[k], map.getId(v));
+               }
+       }
+
+       /**
+        * Encode row {@code col} of a transposed dense block.
+        *
+        * @param data The transposed input block in dense format
+        * @param d    The mapping to write ids into
+        * @param col  The row of the transposed block to read
+        */
+       private void encodeDenseT(MatrixBlock data, AMapToData d, int col) {
+               final DenseBlock dense = data.getDenseBlock();
+               final double[] rowValues = dense.values(col);
+               final int len = data.getNumColumns();
+               final int start = dense.pos(col);
+               for(int i = 0; i < len; i++)
+                       d.set(i, map.getId(rowValues[start + i]));
+       }
+
+       /**
+        * Add the values of the column to the map, without encoding.
+        *
+        * @param data    The input block
+        * @param columns The column indexes; only the first one is read
+        * @return This scheme
+        */
+       @Override
+       protected ICLAScheme updateV(MatrixBlock data, IColIndex columns) {
+               if(data.isEmpty()) {
+                       // an empty block contributes one zero per row
+                       map.increment(0.0, data.getNumRows());
+                       return this;
+               }
+               if(data.isInSparseFormat()) {
+                       updateSparse(data, columns.get(0));
+                       return this;
+               }
+               if(data.getDenseBlock().isContiguous())
+                       updateDense(data, columns.get(0));
+               else
+                       updateGeneric(data, columns.get(0));
+               return this;
+       }
+
+       /**
+        * Add every cell of column {@code col} of a sparse block to the map.
+        *
+        * @param data The input block in sparse format
+        * @param col  The column to read
+        * @return This scheme
+        */
+       private ICLAScheme updateSparse(MatrixBlock data, int col) {
+               final int rows = data.getNumRows();
+               final SparseBlock sparse = data.getSparseBlock();
+               int r = 0;
+               while(r < rows) {
+                       map.increment(sparse.get(r, col));
+                       r++;
+               }
+               return this;
+       }
+
+       /**
+        * Add every cell of column {@code col} of a contiguous dense block to the map.
+        *
+        * @param data The input block, dense and contiguous
+        * @param col  The column to read
+        * @return This scheme
+        */
+       private ICLAScheme updateDense(MatrixBlock data, int col) {
+               final int rows = data.getNumRows();
+               final double[] values = data.getDenseBlockValues();
+               final int stride = data.getNumColumns();
+               final int end = rows * stride; // the original notes this is guaranteed lower than intmax
+               int pos = col;
+               while(pos < end) {
+                       map.increment(values[pos]);
+                       pos += stride;
+               }
+               return this;
+       }
+
+       /**
+        * Add every cell of column {@code col} of a dense block to the map, resolving array and offset per row.
+        *
+        * @param data The input block in dense format
+        * @param col  The column to read
+        * @return This scheme
+        */
+       private ICLAScheme updateGeneric(MatrixBlock data, int col) {
+               final int rows = data.getNumRows();
+               final DenseBlock dense = data.getDenseBlock();
+               for(int r = 0; r < rows; r++) {
+                       final double[] rowValues = dense.values(r);
+                       map.increment(rowValues[dense.pos(r) + col]);
+               }
+               return this;
+       }
+
+       /**
+        * Add the values of the column to the map from a transposed input, without encoding.
+        *
+        * @param data    The transposed input block
+        * @param columns The column indexes; only the first one is read
+        * @return This scheme
+        */
+       @Override
+       protected ICLAScheme updateVT(MatrixBlock data, IColIndex columns) {
+               if(data.isEmpty()) {
+                       // an empty transposed block contributes one zero per column
+                       map.increment(0.0, data.getNumColumns());
+                       return this;
+               }
+               if(data.isInSparseFormat())
+                       updateSparseT(data, columns.get(0));
+               else // dense and generic can be handled together if transposed
+                       updateDenseT(data, columns.get(0));
+               return this;
+       }
+
+       /**
+        * Add every cell of row {@code col} of a transposed dense block to the map.
+        *
+        * @param data The transposed input block in dense format
+        * @param col  The row of the transposed block to read
+        */
+       private void updateDenseT(MatrixBlock data, int col) {
+               final DenseBlock dense = data.getDenseBlock();
+               final double[] rowValues = dense.values(col);
+               final int len = data.getNumColumns();
+               final int start = dense.pos(col);
+               for(int i = 0; i < len; i++)
+                       map.increment(rowValues[start + i]);
+       }
+
+       /**
+        * Add the values of row {@code col} of a transposed sparse block to the map.
+        * <p>
+        * The row has {@code data.getNumColumns()} cells, of which {@code sb.size(col)} are stored. The cells that are
+        * not stored are counted as zeros, which matches the empty-row branch that counts all cells as zeros.
+        *
+        * @param data The transposed input block in sparse format
+        * @param col  The row of the transposed block to read
+        */
+       private void updateSparseT(MatrixBlock data, int col) {
+               final SparseBlock sb = data.getSparseBlock();
+               final int nCol = data.getNumColumns();
+
+               if(sb.isEmpty(col)) {
+                       map.increment(0.0, nCol);
+                       return;
+               }
+
+               int apos = sb.pos(col);
+               final int alen = sb.size(col) + apos;
+               final double[] aval = sb.values(col);
+               // implicit zeros = total cells minus stored cells (alen - apos is the number of stored cells)
+               map.increment(0.0, nCol - (alen - apos));
+               while(apos < alen)
+                       map.increment(aval[apos++]);
+       }
+
+       /**
+        * Create a copy of this scheme.
+        *
+        * @return A new DDCLZWSchemeSC built from this scheme's cols and a clone of its map
+        */
+       @Override
+       public DDCLZWSchemeSC clone() {
+               return new DDCLZWSchemeSC(cols, map.clone());
+       }
+
+       /**
+        * Expose the value map backing this scheme.
+        *
+        * @return The map field of this scheme, typed as Object
+        */
+       @Override
+       protected final Object getMap() {
+               return map;
+       }
+
+       // TODO: is this override strictly required?
+       /**
+        * Update the map with the values of the column and encode the column in the same pass.
+        *
+        * @param data    The input block
+        * @param columns The column indexes to attach to the returned group
+        * @return This scheme paired with the encoded group (a ColGroupEmpty for empty input)
+        * @throws DMLCompressionException if a value receives an id above the upper bound of the allocated mapping
+        */
+       @Override
+       protected Pair<ICLAScheme, AColGroup> tryUpdateAndEncode(MatrixBlock data, IColIndex columns) {
+               if(data.isEmpty()) {
+                       map.increment(0.0, data.getNumRows());
+                       return new Pair<>(this, new ColGroupEmpty(columns));
+               }
+
+               // sized for the ids known before this call; encodeAndUpdate throws if a new id does not fit
+               final AMapToData mapping = MapToFactory.create(data.getNumRows(), map.size());
+               encodeAndUpdate(data, mapping, cols.get(0));
+
+               if(lastDict == null || lastDict.getNumberOfValues(columns.size()) != map.size())
+                       lastDict = DictionaryFactory.create(map);
+
+               final AColGroup group = ColGroupDDCLZW.create(columns, lastDict, mapping, null);
+               return new Pair<>(this, group);
+       }
+
+       /**
+        * Dispatch the combined update and encode to the sparse, contiguous dense or generic dense implementation.
+        *
+        * @param data The input block
+        * @param d    The mapping to write ids into; its upper bound value is the largest id accepted
+        * @param col  The column to read
+        */
+       private void encodeAndUpdate(MatrixBlock data, AMapToData d, int col) {
+               final int max = d.getUpperBoundValue();
+               if(data.isInSparseFormat()) {
+                       encodeAndUpdateSparse(data, d, col, max);
+                       return;
+               }
+               if(data.getDenseBlock().isContiguous()) {
+                       encodeAndUpdateDense(data, d, col, max);
+                       return;
+               }
+               encodeAndUpdateGeneric(data, d, col, max);
+       }
+
+       /**
+        * Increment the map with every cell of column {@code col} of a sparse block and write the returned ids.
+        *
+        * @param data The input block in sparse format
+        * @param d    The mapping to write ids into, one per row
+        * @param col  The column to read
+        * @param max  The largest id that may be written
+        * @throws DMLCompressionException if an id larger than {@code max} is returned by the map
+        */
+       private void encodeAndUpdateSparse(MatrixBlock data, AMapToData d, int col, int max) {
+               final int rows = data.getNumRows();
+               final SparseBlock sparse = data.getSparseBlock();
+               for(int r = 0; r < rows; r++) {
+                       final int id = map.increment(sparse.get(r, col));
+                       if(id <= max) {
+                               d.set(r, id);
+                               continue;
+                       }
+                       throw new DMLCompressionException("Failed update and encode with " + max + " possible values");
+               }
+       }
+
+       /**
+        * Increment the map with every cell of column {@code col} of a contiguous dense block and write the returned ids.
+        *
+        * @param data The input block, dense and contiguous
+        * @param d    The mapping to write ids into, one per row
+        * @param col  The column to read
+        * @param max  The largest id that may be written
+        * @throws DMLCompressionException if an id larger than {@code max} is returned by the map
+        */
+       private void encodeAndUpdateDense(final MatrixBlock data, final AMapToData d, final int col, int max) {
+               final int rows = data.getNumRows();
+               final double[] values = data.getDenseBlockValues();
+               final int stride = data.getNumColumns();
+               final int limit = rows * stride; // the original notes this is guaranteed lower than intmax
+               int row = 0;
+               int pos = col;
+               while(pos < limit) {
+                       final int id = map.increment(values[pos]);
+                       if(id > max)
+                               throw new DMLCompressionException("Failed update and encode with " + max + " possible values");
+                       d.set(row, id);
+                       row++;
+                       pos += stride;
+               }
+       }
+
+       /**
+        * Increment the map with every cell of column {@code col} of a dense block, row by row, and write the returned
+        * ids.
+        *
+        * @param data The input block in dense format
+        * @param d    The mapping to write ids into, one per row
+        * @param col  The column to read
+        * @param max  The largest id that may be written
+        * @throws DMLCompressionException if an id larger than {@code max} is returned by the map
+        */
+       private void encodeAndUpdateGeneric(MatrixBlock data, AMapToData d, int col, int max) {
+               final int rows = data.getNumRows();
+               final DenseBlock dense = data.getDenseBlock();
+               for(int r = 0; r < rows; r++) {
+                       final double[] rowValues = dense.values(r);
+                       final int id = map.increment(rowValues[dense.pos(r) + col]);
+                       if(id <= max) {
+                               d.set(r, id);
+                               continue;
+                       }
+                       throw new DMLCompressionException("Failed update and encode with " + max + " possible values");
+               }
+       }
+
+       /**
+        * Update the map with the values of the column and encode the column in the same pass, from a transposed input.
+        * <p>
+        * For empty input the zeros are still added to the map, one per column of the transposed block, so that this
+        * path leaves the map in the same state as updateVT followed by encodeVT would.
+        *
+        * @param data    The transposed input block; its number of columns is used as the length of the mapping
+        * @param columns The column indexes to attach to the returned group
+        * @return This scheme paired with the encoded group (a ColGroupEmpty for empty input)
+        */
+       @Override
+       protected Pair<ICLAScheme, AColGroup> tryUpdateAndEncodeT(MatrixBlock data, IColIndex columns) {
+               if(data.isEmpty()) {
+                       map.increment(0.0, data.getNumColumns());
+                       return new Pair<>(this, new ColGroupEmpty(columns));
+               }
+               final int nRow = data.getNumColumns();
+
+               final AMapToData d = MapToFactory.create(nRow, map.size());
+
+               encodeAndUpdateT(data, d, cols.get(0));
+               if(lastDict == null || lastDict.getNumberOfValues(columns.size()) != map.size())
+                       lastDict = DictionaryFactory.create(map);
+
+               return new Pair<>(this, ColGroupDDCLZW.create(columns, lastDict, d, null));
+       }
+
+       /**
+        * Dispatch the combined update and encode of a transposed input to the sparse or dense implementation.
+        *
+        * @param data The transposed input block
+        * @param d    The mapping to write ids into
+        * @param col  The row of the transposed block to read
+        */
+       private void encodeAndUpdateT(MatrixBlock data, AMapToData d, int col) {
+               if(!data.isInSparseFormat()) {
+                       encodeAndUpdateDenseT(data, d, col);
+                       return;
+               }
+               encodeAndUpdateSparseT(data, d, col);
+       }
+
+       /**
+        * Increment the map with the values of row {@code col} of a transposed sparse block and write the returned ids.
+        * <p>
+        * The mapping is first filled with the id of 0.0, whose count is incremented by the number of cells that are not
+        * stored in the row. The stored cells are then incremented one by one and overwrite their positions.
+        *
+        * @param data The transposed input block in sparse format
+        * @param d    The mapping to write ids into; its upper bound value is the largest id accepted
+        * @param col  The row of the transposed block to read
+        * @throws DMLCompressionException if the map returns an id above the upper bound of {@code d}, mirroring the
+        *                                 check done by the non-transposed encodeAndUpdate variants
+        */
+       private void encodeAndUpdateSparseT(MatrixBlock data, AMapToData d, int col) {
+               final SparseBlock sb = data.getSparseBlock();
+               final int nCol = data.getNumColumns();
+               final int max = d.getUpperBoundValue();
+               final boolean emptyRow = sb.isEmpty(col);
+               final int nnz = emptyRow ? 0 : sb.size(col);
+
+               // zeros are the cells not stored in the row: nCol - nnz.
+               // (nCol - alen - apos, with alen = nnz + apos, would subtract the row offset apos twice.)
+               final int zeroId = map.increment(0.0, nCol - nnz);
+               if(zeroId > max)
+                       throw new DMLCompressionException("Failed update and encode with " + max + " possible values");
+               d.fill(zeroId);
+               if(emptyRow)
+                       return;
+
+               int apos = sb.pos(col);
+               final int alen = nnz + apos;
+               final int[] aix = sb.indexes(col);
+               final double[] aval = sb.values(col);
+               while(apos < alen) {
+                       final int id = map.increment(aval[apos]);
+                       if(id > max)
+                               throw new DMLCompressionException("Failed update and encode with " + max + " possible values");
+                       d.set(aix[apos++], id);
+               }
+       }
+
+       /**
+        * Increment the map with every cell of row {@code col} of a transposed dense block and write the returned ids.
+        *
+        * @param data The transposed input block in dense format
+        * @param d    The mapping to write ids into; its upper bound value is the largest id accepted
+        * @param col  The row of the transposed block to read
+        * @throws DMLCompressionException if the map returns an id above the upper bound of {@code d}, mirroring the
+        *                                 check done by the non-transposed encodeAndUpdate variants
+        */
+       private void encodeAndUpdateDenseT(MatrixBlock data, AMapToData d, int col) {
+               final DenseBlock db = data.getDenseBlock();
+               final double[] vals = db.values(col);
+               final int nCol = data.getNumColumns();
+               final int max = d.getUpperBoundValue();
+               for(int i = 0, off = db.pos(col); i < nCol; i++, off++) {
+                       final int id = map.increment(vals[off]);
+                       // d was allocated for the ids known before this call; do not write an id it cannot hold
+                       if(id > max)
+                               throw new DMLCompressionException("Failed update and encode with " + max + " possible values");
+                       d.set(i, id);
+               }
+       }
+
+}
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeInfoColGroup.java
 
b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeInfoColGroup.java
index df353931c0..0b60f2fb09 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeInfoColGroup.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeInfoColGroup.java
@@ -112,7 +112,7 @@ public class CompressedSizeInfoColGroup {
 
        /**
         * Create empty or const.
-        * 
+        *
         * @param columns columns
         * @param nRows   number of rows
         * @param ct      The type intended either Empty or Const
@@ -170,7 +170,7 @@ public class CompressedSizeInfoColGroup {
 
        /**
         * Note cardinality is the same as number of distinct values.
-        * 
+        *
         * @return cardinality or number of distinct values.
         */
        public int getNumVals() {
@@ -179,7 +179,7 @@ public class CompressedSizeInfoColGroup {
 
        /**
         * Number of offsets, or number of non zero values.
-        * 
+        *
         * @return Number of non zeros or number of values.
         */
        public int getNumOffs() {
@@ -264,6 +264,11 @@ public class CompressedSizeInfoColGroup {
                                nv = fact.numVals + (fact.numOffs < 
fact.numRows ? 1 : 0);
                                return 
ColGroupSizes.estimateInMemorySizeDDC(numCols, contiguousColumns, nv, 
fact.numRows,
                                        fact.tupleSparsity, fact.lossy);
+                       case DDCLZW:
+                               // DDCLZW uses a DDC-like structure (dictionary 
+ mapping). We estimate it as DDC for now.
+                               nv = fact.numVals + (fact.numOffs < 
fact.numRows ? 1 : 0);
+                               return 
ColGroupSizes.estimateInMemorySizeDDC(numCols, contiguousColumns, nv, 
fact.numRows,
+                                       fact.tupleSparsity, fact.lossy);
                        case RLE:
                                return 
ColGroupSizes.estimateInMemorySizeRLE(numCols, contiguousColumns, fact.numVals, 
fact.numRuns,
                                        fact.numRows, fact.tupleSparsity, 
fact.lossy);
diff --git 
a/src/test/java/org/apache/sysds/test/component/compress/colgroup/DDCLZW/ColGroupDDCLZWBenchmark.java
 
b/src/test/java/org/apache/sysds/test/component/compress/colgroup/DDCLZW/ColGroupDDCLZWBenchmark.java
new file mode 100644
index 0000000000..d71f954eae
--- /dev/null
+++ 
b/src/test/java/org/apache/sysds/test/component/compress/colgroup/DDCLZW/ColGroupDDCLZWBenchmark.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.component.compress.colgroup.DDCLZW;
+
+import org.apache.sysds.runtime.compress.colgroup.AColGroup;
+import org.apache.sysds.runtime.compress.colgroup.ColGroupDDC;
+import org.apache.sysds.runtime.compress.colgroup.ColGroupDDCLZW;
+import org.apache.sysds.runtime.compress.colgroup.mapping.AMapToData;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+
+import java.util.stream.IntStream;
+
+public class ColGroupDDCLZWBenchmark {
+       private static final int BENCHMARK_ITERATIONS = 10;
+
+       /** Mutable holder for the raw measurements of one benchmark run and the ratios derived from them. */
+       private static class BenchmarkResult {
+               int dataSize;
+               int nUnique;
+               double entropy;
+
+               long ddcMemoryBytes;
+               long ddcCompressionTimeNs;
+               long ddcDecompressionTimeNs;
+
+               long ddclzwMemoryBytes;
+               long ddclzwCompressionTimeNs;
+               long ddclzwDecompressionTimeNs;
+
+               // Derived values, filled in by calculateMetrics()
+               double memoryReduction;
+               double compressionSpeedup;
+               double decompressionSpeedup;
+
+               /** Derive the DDCLZW/DDC memory ratio and the DDC/DDCLZW time ratios from the raw measurements. */
+               void calculateMetrics() {
+                       final double lzwBytes = ddclzwMemoryBytes;
+                       final double ddcCompressNs = ddcCompressionTimeNs;
+                       final double ddcDecompressNs = ddcDecompressionTimeNs;
+                       memoryReduction = lzwBytes / ddcMemoryBytes;
+                       compressionSpeedup = ddcCompressNs / ddclzwCompressionTimeNs;
+                       decompressionSpeedup = ddcDecompressNs / ddclzwDecompressionTimeNs;
+               }
+
+               /** Format {@code 1 - ratio} as a percentage, coloured green when positive and red otherwise. */
+               String formatPercent(double ratio) {
+                       final double percent = (100.0 * (1.0 - ratio));
+                       final String ansiColor;
+                       if(percent > 0)
+                               ansiColor = "\u001B[32m";
+                       else
+                               ansiColor = "\u001B[31m";
+                       return ansiColor + String.format("%7.2f%%", percent) + "\u001B[0m";
+               }
+
+               /** Format the entropy as a percentage of log_2(nUnique): green below 33, yellow below 66, red otherwise. */
+               String formatEntropyPercent(double entropy, int nUnique) {
+                       final double maxEntropy = Math.log(nUnique) / Math.log(2); // log_2{nUnique}
+                       final double percent = (entropy / maxEntropy) * 100;
+                       final String ansiColor = percent < 33 ? "\u001B[32m" : percent < 66 ? "\u001B[33m" : "\u001B[31m";
+                       return String.format("%s%6.2f%%%s", ansiColor, percent, "\u001B[0m");
+               }
+
+               @Override
+               public String toString() {
+                       final String layout = "Size: %7d | nUnique: %5d | Entropy: %s | DDC: %7d bytes | DDCLZW: %7d bytes | " +
+                               "Memory reduction: %s | De-/Compression speedup: %.2f/%.2f times";
+                       final String entropyText = formatEntropyPercent(entropy, nUnique);
+                       final String memoryText = formatPercent(memoryReduction);
+                       // decompression first, then compression, matching the "De-/Compression" label
+                       return String.format(layout, dataSize, nUnique, entropyText, ddcMemoryBytes, ddclzwMemoryBytes, memoryText,
+                               decompressionSpeedup, compressionSpeedup);
+               }
+       }
+
+       /// Calculates the entropy of the given array. Returns value between 0 
(predictable) and log_2{nUnique}
+       private double calculateEntropy(int[] arr, int nUnique) {
+               int[] freq = new int[nUnique];
+               for(int val : arr) {
+                       if(val >= 0 && val < nUnique) {
+                               freq[val]++;
+                       }
+               }
+               double entropy = 0.0;
+               int total = arr.length;
+               for(int f : freq) {
+                       if(f > 0) {
+                               double p = (double) f / total;
+                               entropy -= p * (Math.log(p) / Math.log(2));
+                       }
+               }
+               return entropy;
+       }
+
+       private void printBenchmarkTitle() {
+               String callerMethodName = StackWalker.getInstance().walk(stream 
-> stream.skip(1).findFirst().get())
+                       .getMethodName();
+
+               System.out.println();
+               System.out.println("=".repeat(80));
+               System.out.println("Benchmark: " + callerMethodName);
+               System.out.println("=".repeat(80));
+               System.out.println();
+       }
+
+       private static void assertSameDecompression(AColGroup a, AColGroup b, 
int nRows, int nCols) {
+               MatrixBlock mba = new MatrixBlock(nRows, nCols, false);
+               mba.allocateDenseBlock();
+               a.decompressToDenseBlock(mba.getDenseBlock(), 0, nRows);
+
+               MatrixBlock mbb = new MatrixBlock(nRows, nCols, false);
+               mbb.allocateDenseBlock();
+               b.decompressToDenseBlock(mbb.getDenseBlock(), 0, nRows);
+
+               double[] da = mba.getDenseBlockValues();
+               double[] db = mbb.getDenseBlockValues();
+
+               if(da.length != db.length)
+                       throw new AssertionError("Length mismatch: " + 
da.length + " vs " + db.length);
+
+               for(int i = 0; i < da.length; i++) {
+                       if(da[i] != db[i])
+                               throw new AssertionError("Decompression 
mismatch at flat index " + i + " : " + da[i] + " != " + db[i]);
+               }
+       }
+
+       private BenchmarkResult runBenchmark(int[] mapping, int nUnique, int 
nCols) {
+               BenchmarkResult result = new BenchmarkResult();
+               result.dataSize = mapping.length;
+               result.nUnique = nUnique;
+               result.entropy = calculateEntropy(mapping, nUnique);
+
+               ColGroupDDC ddc = ColGroupDDCLZWTestUtils.createDDC(mapping, 
nUnique, nCols);
+               ColGroupDDCLZW ddclzwTest = (ColGroupDDCLZW) 
ddc.convertToDDCLZW();
+               ColGroupDDC ddclzwTestAsDDC = (ColGroupDDC) 
ddclzwTest.convertToDDC();
+
+               assertSameDecompression(ddc, ddclzwTest, mapping.length, nCols);
+               assertSameDecompression(ddc, ddclzwTestAsDDC, mapping.length, 
nCols);
+
+               // Measure DDC memory (though the method calculates how much 
storage it would take if the data structure were written to disk)
+               result.ddcMemoryBytes = ddc.estimateInMemorySize();
+
+               // Measure DDC decompression time (it's already decompressed, 
so measure access time)
+               long startTime = System.nanoTime();
+               for(int iter = 0; iter < BENCHMARK_ITERATIONS; iter++) {
+                       AMapToData mapping_copy = ddc.getMapToData();
+                       mapping_copy.getIndex(mapping.length / 2);
+               }
+               long endTime = System.nanoTime();
+               result.ddcDecompressionTimeNs = (endTime - startTime) / 
BENCHMARK_ITERATIONS;
+
+               // Measure DDCLZW compression time
+               startTime = System.nanoTime();
+               ColGroupDDCLZW ddclzw = null;
+               for(int iter = 0; iter < BENCHMARK_ITERATIONS; iter++) {
+                       ddclzw = (ColGroupDDCLZW) ddc.convertToDDCLZW();
+               }
+               endTime = System.nanoTime();
+               result.ddclzwCompressionTimeNs = (endTime - startTime) / 
BENCHMARK_ITERATIONS;
+
+               // Measure DDCLZW memory
+               result.ddclzwMemoryBytes = ddclzw.estimateInMemorySize();
+
+               // Measure DDCLZW decompression time
+               startTime = System.nanoTime();
+               for(int iter = 0; iter < BENCHMARK_ITERATIONS; iter++) {
+                       ColGroupDDC decompressed = (ColGroupDDC) 
ddclzw.convertToDDC();
+                       AMapToData mapping_copy = decompressed.getMapToData();
+                       mapping_copy.getIndex(mapping.length / 2);
+               }
+               endTime = System.nanoTime();
+               result.ddclzwDecompressionTimeNs = (endTime - startTime) / 
BENCHMARK_ITERATIONS;
+
+               result.calculateMetrics();
+               return result;
+       }
+
+       /** Run the benchmark on the genPatternLZWOptimal mappings for several sizes and numbers of distinct values. */
+       //@Test
+       public void benchmarkLZWOptimalScaling() {
+               printBenchmarkTitle();
+
+               final int[] sizes = {100, 1000, 10_000, 40_000};
+               final int[] uniques = {2, 3, 5, 10, 20, 50, 100, 200, 500, 1000, 10_000, 20_000};
+               for(int size : sizes) {
+                       System.out.println(".".repeat(35) + " Size: " + size + " " + ".".repeat(35));
+                       for(int nUnique : uniques) {
+                               // a mapping cannot have more distinct values than entries
+                               if(nUnique <= size) {
+                                       final int[] mapping = ColGroupDDCLZWTestUtils.genPatternLZWOptimal(size, nUnique);
+                                       System.out.println(runBenchmark(mapping, nUnique, 1));
+                               }
+                       }
+               }
+
+               System.out.println("\nExpected memory growths for size n: DDC: O(n), DDCLZW: O(sqrt(n))");
+       }
+
+       /** Run the benchmark on the genPatternDistributed mappings for several sizes and numbers of distinct values. */
+       //@Test
+       public void benchmarkDistributed() {
+               printBenchmarkTitle();
+
+               final int[] sizes = {100, 1000, 10_000, 70_000, 100_000};
+               final int[] uniques = {2, 3, 5, 10, 20, 50, 100, 200, 256, 257, 500, 1000, 10_000, 65_536, 65_537, 80_000};
+               for(int size : sizes) {
+                       System.out.println(".".repeat(35) + " Size: " + size + " " + ".".repeat(35));
+                       for(int nUnique : uniques) {
+                               // a mapping cannot have more distinct values than entries
+                               if(nUnique <= size) {
+                                       final int[] mapping = ColGroupDDCLZWTestUtils.genPatternDistributed(size, nUnique);
+                                       System.out.println(runBenchmark(mapping, nUnique, 1));
+                               }
+                       }
+               }
+       }
+
+       /** Run the benchmark on mappings that repeat the sequence 0..nUnique-1, built with genPatternRepeating. */
+       //@Test
+       public void benchmarkUniquesRepeating() {
+               printBenchmarkTitle();
+
+               final int[] sizes = {100, 1000, 10_000, 40_000};
+               final int[] uniques = {2, 3, 5, 10, 20, 50, 100, 200, 500, 1000};
+               for(int size : sizes) {
+                       System.out.println(".".repeat(35) + " Size: " + size + " " + ".".repeat(35));
+                       for(int nUnique : uniques) {
+                               // a mapping cannot have more distinct values than entries
+                               if(nUnique <= size) {
+                                       final int[] pattern = IntStream.range(0, nUnique).toArray();
+                                       final int[] mapping = ColGroupDDCLZWTestUtils.genPatternRepeating(size, pattern);
+                                       System.out.println(runBenchmark(mapping, nUnique, 1));
+                               }
+                       }
+               }
+       }
+
+       /** Run the benchmark on the genPatternLZWOptimal mappings, with numbers of distinct values up to 1000. */
+       //@Test
+       public void benchmarkUniquesLZWOptimal() {
+               printBenchmarkTitle();
+
+               final int[] sizes = {100, 1000, 10_000, 40_000};
+               final int[] uniques = {2, 3, 5, 10, 20, 50, 100, 200, 500, 1000};
+               for(int size : sizes) {
+                       System.out.println(".".repeat(35) + " Size: " + size + " " + ".".repeat(35));
+                       for(int nUnique : uniques) {
+                               // a mapping cannot have more distinct values than entries
+                               if(nUnique <= size) {
+                                       final int[] mapping = ColGroupDDCLZWTestUtils.genPatternLZWOptimal(size, nUnique);
+                                       System.out.println(runBenchmark(mapping, nUnique, 1));
+                               }
+                       }
+               }
+       }
+
+       /**
+        * Compare the time of repeated getIdx calls on a DDC group and on its DDCLZW conversion.
+        * <p>
+        * Both totals are printed as fractional milliseconds, so that runs shorter than one millisecond do not show up
+        * as 0.
+        */
+       //@Test
+       public void benchmarkGetIdx() { // TODO: benchmark a different, efficient method instead
+               printBenchmarkTitle();
+
+               final int[] DATA_SIZES_GET_IDX = {10, 50, 100};
+               final int iterations = BENCHMARK_ITERATIONS * 100;
+               for(int size : DATA_SIZES_GET_IDX) {
+                       int[] mapping = ColGroupDDCLZWTestUtils.genPatternRepeating(size, 0, 1, 2, 2, 2, 1, 0, 0, 1);
+                       ColGroupDDC ddc = ColGroupDDCLZWTestUtils.createDDC(mapping, 3, 2);
+                       ColGroupDDCLZW ddclzw = (ColGroupDDCLZW) ddc.convertToDDCLZW();
+
+                       // Benchmark DDC
+                       long startTime = System.nanoTime();
+                       for(int iter = 0; iter < iterations; iter++) {
+                               ddc.getIdx(size / 2, 0);
+                       }
+                       long ddcTime = System.nanoTime() - startTime;
+
+                       // Benchmark DDCLZW
+                       startTime = System.nanoTime();
+                       for(int iter = 0; iter < iterations; iter++) {
+                               ddclzw.getIdx(size / 2, 0);
+                       }
+                       long ddclzwTime = System.nanoTime() - startTime;
+
+                       // totals over all iterations, converted from ns to ms without integer truncation
+                       System.out.printf("Size: %7d | DDC: %6.2f ms | DDCLZW: %6.2f ms | Slowdown: %.2f times%n", size,
+                               (double) ddcTime / 1_000_000, (double) ddclzwTime / 1_000_000, (double) ddclzwTime / ddcTime);
+               }
+       }
+
+       //      @Test
+       //      public void benchmarkSlice() {
+       //              printBenchmarkTitle();
+       //
+       //              for(int size : DATA_SIZES) {
+       //                      int[] mapping = 
ColGroupDDCLZWTestUtils.genPatternRepeating(size, 0, 1, 2);
+       //                      ColGroupDDC ddc = 
ColGroupDDCLZWTestUtils.createBenchmarkDDC(mapping, 3, 1);
+       //                      ColGroupDDCLZW ddclzw = (ColGroupDDCLZW) 
ddc.convertToDDCLZW();
+       //
+       //                      int sliceStart = size / 4;
+       //                      int sliceEnd = 3 * size / 4;
+       //
+       //                      // Benchmark DDC
+       //                      long startTime = System.nanoTime();
+       //                      for(int iter = 0; iter < BENCHMARK_ITERATIONS; 
iter++) {
+       //                              ddc.sliceRows(sliceStart, sliceEnd);
+       //                      }
+       //                      long ddcTime = System.nanoTime() - startTime;
+       //
+       //                      // Benchmark DDCLZW
+       //                      startTime = System.nanoTime();
+       //                      for(int iter = 0; iter < BENCHMARK_ITERATIONS; 
iter++) {
+       //                              ddclzw.sliceRows(sliceStart, sliceEnd);
+       //                      }
+       //                      long ddclzwTime = System.nanoTime() - startTime;
+       //
+       //                      System.out.printf("Size: %7d | Slice[%5d:%5d] | 
DDC: %6d ms | DDCLZW: %6d ms | Slowdown: %.2f times\n",
+       //                              size, sliceStart, sliceEnd, ddcTime / 
1_000_000, ddclzwTime / 1_000_000, (double) ddclzwTime / ddcTime);
+       //              }
+       //      }
+}
diff --git 
a/src/test/java/org/apache/sysds/test/component/compress/colgroup/DDCLZW/ColGroupDDCLZWTest.java
 
b/src/test/java/org/apache/sysds/test/component/compress/colgroup/DDCLZW/ColGroupDDCLZWTest.java
new file mode 100644
index 0000000000..311826f5e7
--- /dev/null
+++ 
b/src/test/java/org/apache/sysds/test/component/compress/colgroup/DDCLZW/ColGroupDDCLZWTest.java
@@ -0,0 +1,1238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.component.compress.colgroup.DDCLZW;
+
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.compress.CompressionSettings;
+import org.apache.sysds.runtime.compress.CompressionSettingsBuilder;
+import org.apache.sysds.runtime.compress.colgroup.AColGroup;
+import org.apache.sysds.runtime.compress.colgroup.ColGroupConst;
+import org.apache.sysds.runtime.compress.colgroup.ColGroupDDC;
+import org.apache.sysds.runtime.compress.colgroup.ColGroupDDCLZW;
+import org.apache.sysds.runtime.compress.colgroup.ColGroupEmpty;
+import org.apache.sysds.runtime.compress.colgroup.ColGroupFactory;
+import org.apache.sysds.runtime.compress.colgroup.ColGroupIO;
+import org.apache.sysds.runtime.compress.colgroup.dictionary.Dictionary;
+import org.apache.sysds.runtime.compress.colgroup.indexes.ColIndexFactory;
+import org.apache.sysds.runtime.compress.colgroup.indexes.IColIndex;
+import org.apache.sysds.runtime.compress.colgroup.mapping.AMapToData;
+import org.apache.sysds.runtime.compress.colgroup.mapping.MapToFactory;
+import org.apache.sysds.runtime.compress.estim.ComEstExact;
+import org.apache.sysds.runtime.compress.estim.CompressedSizeInfo;
+import org.apache.sysds.runtime.compress.estim.CompressedSizeInfoColGroup;
+import org.apache.sysds.runtime.data.SparseBlock;
+import org.apache.sysds.runtime.data.SparseBlockMCSR;
+import org.apache.sysds.runtime.functionobjects.Builtin;
+import org.apache.sysds.runtime.functionobjects.Divide;
+import org.apache.sysds.runtime.functionobjects.Equals;
+import org.apache.sysds.runtime.functionobjects.GreaterThan;
+import org.apache.sysds.runtime.functionobjects.Minus;
+import org.apache.sysds.runtime.functionobjects.Multiply;
+import org.apache.sysds.runtime.functionobjects.Plus;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.matrix.operators.RightScalarOperator;
+import org.apache.sysds.runtime.matrix.operators.ScalarOperator;
+import org.apache.sysds.runtime.matrix.operators.UnaryOperator;
+import org.apache.sysds.runtime.util.DataConverter;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.EnumSet;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class ColGroupDDCLZWTest {
+       /** Commons-logging logger named after this test class. */
+       protected static final Log LOG = 
LogFactory.getLog(ColGroupDDCLZWTest.class.getName());
+
+       /**
+        * Asserts that two maps are identical
+        */
+       private void assertMapsEqual(AMapToData expected, AMapToData actual) {
+               assertEquals("Size mismatch", expected.size(), actual.size());
+               assertEquals("Unique count mismatch", expected.getUnique(), 
actual.getUnique());
+
+               for(int i = 0; i < expected.size(); i++) {
+                       assertEquals("Mapping mismatch at row " + i, 
expected.getIndex(i), actual.getIndex(i));
+               }
+       }
+
+       /**
+        * Round-trips a DDC group through DDCLZW and back to DDC and asserts that the mapping and the column indices
+        * are unchanged.
+        *
+        * @param original the DDC group to convert with {@code convertToDDCLZW()} and restore with
+        *                 {@code convertToDDC()}
+        */
+       private void assertLosslessCompression(ColGroupDDC original) {
+               // Compress
+               AColGroup compressed = original.convertToDDCLZW();
+               assertNotNull("Compression returned null", compressed);
+               assertTrue("Compression did not yield a ColGroupDDCLZW", compressed instanceof ColGroupDDCLZW);
+
+               // Decompress
+               AColGroup decompressed = ((ColGroupDDCLZW) compressed).convertToDDC();
+               assertNotNull("Decompression returned null", decompressed);
+               assertTrue("Decompression did not yield a ColGroupDDC", decompressed instanceof ColGroupDDC);
+
+               // Compare: assertMapsEqual already covers size, unique count and every row index,
+               // so those checks are not repeated here.
+               ColGroupDDC result = (ColGroupDDC) decompressed;
+               assertMapsEqual(original.getMapToData(), result.getMapToData());
+               assertEquals("Column indices mismatch", original.getColIndices(), result.getColIndices());
+       }
+
+       // TODO: used for only one test. Refactor?
+       public void assertLosslessCompressionNoRepetition(ColGroupDDCLZW 
original, int nRows) {
+               AColGroup decompressed = original.convertToDDC();
+               assertNotNull(decompressed);
+               assertTrue(decompressed instanceof ColGroupDDC);
+
+               ColGroupDDC result = (ColGroupDDC) decompressed;
+
+               AMapToData map = result.getMapToData();
+               assertNotNull(map);
+
+               assertEquals(nRows, map.size());
+               assertEquals(nRows, map.getUnique());
+
+               for(int i = 0; i < nRows; i++) {
+                       assertEquals("Mapping mismatch at row " + i, i, 
map.getIndex(i));
+               }
+       }
+
+       /**
+        * Asserts if the slice operation matches DDC's slice
+        */
+       private void assertSlice(ColGroupDDCLZW ddclzw, ColGroupDDC 
originalDDC, int low, int high) {
+               AColGroup sliced = ddclzw.sliceRows(low, high);
+               assertTrue(sliced instanceof ColGroupDDCLZW);
+
+               ColGroupDDCLZW ddclzwSlice = (ColGroupDDCLZW) sliced;
+               ColGroupDDC ddcSlice = (ColGroupDDC) ddclzwSlice.convertToDDC();
+               ColGroupDDC expectedSlice = (ColGroupDDC) 
originalDDC.sliceRows(low, high);
+
+               assertMapsEqual(expectedSlice.getMapToData(), 
ddcSlice.getMapToData());
+       }
+
+       private AColGroup compressForTest(double[][] data, 
AColGroup.CompressionType compType) {
+               MatrixBlock mb = DataConverter.convertToMatrixBlock(data);
+               IColIndex colIndexes = ColIndexFactory.create(data[0].length);
+
+               CompressionSettingsBuilder csb = new 
CompressionSettingsBuilder().setSamplingRatio(1.0)
+                       
.setValidCompressions(EnumSet.of(compType)).setTransposeInput("false");
+               CompressionSettings cs = csb.create();
+
+               final CompressedSizeInfoColGroup cgi = new ComEstExact(mb, 
cs).getColGroupInfo(colIndexes);
+               CompressedSizeInfo csi = new CompressedSizeInfo(cgi);
+               return ColGroupFactory.compressColGroups(mb, csi, cs, 1).get(0);
+       }
+
+       /** Converts a small 3x2 DDC group to DDCLZW and checks every decompressed cell against the input. */
+       @Test
+       public void testConvertToDDCLZWBasic() {
+               final double[][] data = {{10, 20}, {11, 21}, {12, 22}};
+
+               final ColGroupDDC ddc = (ColGroupDDC) compressForTest(data, AColGroup.CompressionType.DDC);
+               final AColGroup converted = ddc.convertToDDCLZW();
+
+               assertNotNull(converted);
+               assertTrue(converted instanceof ColGroupDDCLZW);
+
+               final MatrixBlock out = new MatrixBlock(3, 2, false);
+               out.allocateDenseBlock();
+               ((ColGroupDDCLZW) converted).decompressToDenseBlock(out.getDenseBlock(), 0, 3);
+
+               // same cells, same row-major order and same zero tolerance as listing them one by one
+               for(int r = 0; r < 3; r++) {
+                       for(int c = 0; c < 2; c++) {
+                               assertEquals(data[r][c], out.get(r, c), 0.0);
+                       }
+               }
+       }
+
+       /** Runs the lossless round trip and one slice comparison on three randomly generated mappings. */
+       @Test
+       public void testRandomData() {
+               // Single element cols
+               final ColGroupDDC small = ColGroupDDCLZWTestUtils
+                       .createDDC(ColGroupDDCLZWTestUtils.genPatternRandom(1000, 50, 111111), 50, 1000);
+               assertLosslessCompression(small);
+               assertSlice((ColGroupDDCLZW) small.convertToDDCLZW(), small, 250, 750);
+
+               // Prime numbers as values
+               final ColGroupDDC primes = ColGroupDDCLZWTestUtils
+                       .createDDC(ColGroupDDCLZWTestUtils.genPatternRandom(1039, 199, 222222), 199, 53);
+               assertLosslessCompression(primes);
+               assertSlice((ColGroupDDCLZW) primes.convertToDDCLZW(), primes, 251, 491);
+
+               // Very large size
+               final ColGroupDDC large = ColGroupDDCLZWTestUtils
+                       .createDDC(ColGroupDDCLZWTestUtils.genPatternRandom(25_000, 1000, 333333), 1000, 200);
+               assertLosslessCompression(large);
+               assertSlice((ColGroupDDCLZW) large.convertToDDCLZW(), large, 1000, 20_000);
+       }
+
+       /** The value at row 0, column 0 must match between the DDC group and its DDCLZW conversion. */
+       @Test
+       public void testGetIdxFirstElement() {
+               final double[][] values = {{10, 11}, {20, 21}, {30, 31}, {20, 21}, {10, 11}};
+               final ColGroupDDC reference = (ColGroupDDC) compressForTest(values, AColGroup.CompressionType.DDC);
+               final ColGroupDDCLZW lzw = (ColGroupDDCLZW) reference.convertToDDCLZW();
+
+               assertEquals(reference.getIdx(0, 0), lzw.getIdx(0, 0), 0.0001);
+       }
+
+       /** The value at the last row, column 1 must match between the DDC group and its DDCLZW conversion. */
+       @Test
+       public void testGetIdxLastElement() {
+               final double[][] values = {{10, 11}, {20, 21}, {30, 31}, {20, 21}, {10, 11}};
+               final ColGroupDDC reference = (ColGroupDDC) compressForTest(values, AColGroup.CompressionType.DDC);
+               final ColGroupDDCLZW lzw = (ColGroupDDCLZW) reference.convertToDDCLZW();
+
+               final int finalRow = values.length - 1;
+               assertEquals(reference.getIdx(finalRow, 1), lzw.getIdx(finalRow, 1), 0.0001);
+       }
+
+       /**
+        * Compares {@code getIdx} of the DDCLZW group against the DDC group for every cell of the input.
+        * <p>
+        * The column bound comes from the data rather than a hard-coded 2, so the third column is covered too.
+        */
+       @Test
+       public void testGetIdxAllElements() {
+               double[][] src = new double[][] {{10, 11, 12}, {20, 21, 22}, {30, 31, 32}, {20, 21, 22}, {10, 11, 12},
+                       {30, 31, 32}, {20, 21, 22}};
+               ColGroupDDC ddc = (ColGroupDDC) compressForTest(src, AColGroup.CompressionType.DDC);
+               ColGroupDDCLZW ddclzw = (ColGroupDDCLZW) ddc.convertToDDCLZW();
+
+               final int numCols = src[0].length;
+               for(int row = 0; row < src.length; row++) {
+                       for(int col = 0; col < numCols; col++) {
+                               double expected = ddc.getIdx(row, col);
+                               double actual = ddclzw.getIdx(row, col);
+                               assertEquals("Mismatch at [" + row + "," + col + "]", expected, actual, 0.0001);
+                       }
+               }
+       }
+
+       /** A negative row index passed to {@code getIdx} is expected to raise {@link DMLRuntimeException}. */
+       @Test(expected = DMLRuntimeException.class)
+       public void testGetIdxRowOutOfBoundsNegative() {
+               final double[][] values = {{10}, {20}, {30}};
+               final ColGroupDDC reference = (ColGroupDDC) compressForTest(values, AColGroup.CompressionType.DDC);
+               final ColGroupDDCLZW lzw = (ColGroupDDCLZW) reference.convertToDDCLZW();
+
+               lzw.getIdx(-1, 0);
+       }
+
+       /** A row index beyond the three input rows is expected to raise {@link DMLRuntimeException}. */
+       @Test(expected = DMLRuntimeException.class)
+       public void testGetIdxRowOutOfBounds() {
+               final double[][] values = {{10}, {20}, {30}};
+               final ColGroupDDC reference = (ColGroupDDC) compressForTest(values, AColGroup.CompressionType.DDC);
+               final ColGroupDDCLZW lzw = (ColGroupDDCLZW) reference.convertToDDCLZW();
+
+               lzw.getIdx(10, 0);
+       }
+
+       /** A negative column index passed to {@code getIdx} is expected to raise {@link DMLRuntimeException}. */
+       @Test(expected = DMLRuntimeException.class)
+       public void testGetIdxColOutOfBoundsNegative() {
+               final double[][] values = {{10, 11, 12}, {20, 21, 22}, {30, 31, 32}};
+               final ColGroupDDC reference = (ColGroupDDC) compressForTest(values, AColGroup.CompressionType.DDC);
+               final ColGroupDDCLZW lzw = (ColGroupDDCLZW) reference.convertToDDCLZW();
+
+               lzw.getIdx(0, -1);
+       }
+
+       /** A column index beyond the three input columns is expected to raise {@link DMLRuntimeException}. */
+       @Test(expected = DMLRuntimeException.class)
+       public void testGetIdxColOutOfBounds() {
+               final double[][] values = {{10, 11, 12}, {20, 21, 22}, {30, 31, 32}};
+               final ColGroupDDC reference = (ColGroupDDC) compressForTest(values, AColGroup.CompressionType.DDC);
+               final ColGroupDDCLZW lzw = (ColGroupDDCLZW) reference.convertToDDCLZW();
+
+               lzw.getIdx(0, 10);
+       }
+
+       /** {@code ColGroupDDCLZW.create} with a null dictionary is expected to return a {@link ColGroupEmpty}. */
+       @Test
+       public void testCreateWithNullDictionary() {
+               final int[] rowToIndex = {0, 1, 2};
+               final AMapToData mapping = MapToFactory.create(3, 3);
+               int row = 0;
+               while(row < 3) {
+                       mapping.set(row, rowToIndex[row]);
+                       row++;
+               }
+               final IColIndex singleCol = ColIndexFactory.create(1);
+
+               final AColGroup created = ColGroupDDCLZW.create(singleCol, null, mapping, null);
+               assertTrue("Should create ColGroupEmpty", created instanceof ColGroupEmpty);
+       }
+
+       /**
+        * {@code ColGroupDDCLZW.create} with a one-entry dictionary and an all-zero mapping is expected to return a
+        * {@link ColGroupConst}.
+        */
+       @Test
+       public void testCreateWithSingleUnique() {
+               IColIndex colIndexes = ColIndexFactory.create(1);
+               double[] dictValues = new double[] {42.0};
+               Dictionary dict = Dictionary.create(dictValues);
+
+               // every row points at the single dictionary entry
+               AMapToData data = MapToFactory.create(4, 1);
+               for(int i = 0; i < 4; i++) {
+                       data.set(i, 0);
+               }
+
+               AColGroup result = ColGroupDDCLZW.create(colIndexes, dict, data, null);
+               assertTrue("Should create ColGroupConst", result instanceof ColGroupConst);
+       }
+
+       /** Converting a DDC group with three distinct values is expected to yield a {@link ColGroupDDCLZW}. */
+       @Test
+       public void testCreateValidDDCLZW() {
+               final double[][] values = {{10}, {20}, {10}, {20}, {30}};
+               final ColGroupDDC reference = (ColGroupDDC) compressForTest(values, AColGroup.CompressionType.DDC);
+
+               final AColGroup converted = reference.convertToDDCLZW();
+               assertTrue("Should create ColGroupDDCLZW", converted instanceof ColGroupDDCLZW);
+       }
+
+       /**
+        * Round-trips 30 rows that alternate between the tuples (10, 11, 12) and (20, 21, 22).
+        */
+       @Test
+       public void testAlternatingNumbers() {
+               final int nRows = 30;
+               final int nCols = 3;
+               final double[][] values = new double[nRows][nCols];
+               for(int r = 0; r < nRows; r++) {
+                       // 10 on even rows, 20 on odd rows; column c adds c on top
+                       final int base = ((r % 2) + 1) * 10;
+                       for(int c = 0; c < nCols; c++) {
+                               values[r][c] = base + c;
+                       }
+               }
+
+               assertLosslessCompression((ColGroupDDC) compressForTest(values, AColGroup.CompressionType.DDC));
+       }
+
+       /** Round-trips a DDC group built from {@code genPatternDistributed(1000, 100)}. */
+       @Test
+       public void testDistributedPattern() {
+               final ColGroupDDC distributed = ColGroupDDCLZWTestUtils
+                       .createDDC(ColGroupDDCLZWTestUtils.genPatternDistributed(1000, 100), 100, 3);
+               assertLosslessCompression(distributed);
+       }
+
+       /** A DDCLZW group compared with itself must report the same index structure. */
+       @Test
+       public void testSameIndexStructure() {
+               final double[][] values = {{10}, {20}, {10}, {20}};
+               final ColGroupDDC reference = (ColGroupDDC) compressForTest(values, AColGroup.CompressionType.DDC);
+               final ColGroupDDCLZW lzw = (ColGroupDDCLZW) reference.convertToDDCLZW();
+
+               final boolean same = lzw.sameIndexStructure(lzw);
+               assertTrue("Same object should have same structure", same);
+       }
+
+       @Test
+       public void testSameIndexStructureDifferent() {
+               double[][] src = new double[][] {{10}, {20}, {10}, {20}};
+
+               ColGroupDDC ddc1 = (ColGroupDDC) compressForTest(src, 
AColGroup.CompressionType.DDC);
+               ColGroupDDC ddc2 = (ColGroupDDC) compressForTest(src, 
AColGroup.CompressionType.DDC);
+
+               ColGroupDDCLZW ddclzw1 = (ColGroupDDCLZW) 
ddc1.convertToDDCLZW();
+               ColGroupDDCLZW ddclzw2 = (ColGroupDDCLZW) 
ddc2.convertToDDCLZW();
+
+               // Different objects have different _dataLZW arrays
+               assertFalse("Different objects should have different 
structure", ddclzw1.sameIndexStructure(ddclzw2));
+       }
+
+       /** A DDCLZW group compared with the DDC group it came from must not report the same index structure. */
+       @Test
+       public void testSameIndexStructureDDCLZW() {
+               final double[][] values = {{10}, {20}, {30}, {10}, {20}};
+               final ColGroupDDC reference = (ColGroupDDC) compressForTest(values, AColGroup.CompressionType.DDC);
+               final ColGroupDDCLZW lzw = (ColGroupDDCLZW) reference.convertToDDCLZW();
+
+               final boolean same = lzw.sameIndexStructure(reference);
+               assertFalse("Different types should not have same structure", same);
+       }
+
+       /**
+        * Round-trips a DDC group built from {@code genPatternDistributed(1000, 1000)}.
+        * <p>
+        * NOTE(review): the original comment describes the pattern as [0, 1, 2, ..., 1000]; with 1000 rows and 1000
+        * unique values it is presumably 0..999 - verify against genPatternDistributed.
+        */
+       @Test
+       public void testAscending() {
+               final ColGroupDDC ascending = ColGroupDDCLZWTestUtils
+                       .createDDC(ColGroupDDCLZWTestUtils.genPatternDistributed(1000, 1000), 1000, 10);
+               assertLosslessCompression(ascending);
+       }
+
+       /** Compresses a single column holding 0..19 directly as DDCLZW and checks the identity mapping. */
+       @Test
+       public void testNoRepetition() {
+               final int nRows = 20;
+               final double[][] values = new double[nRows][1];
+               int row = 0;
+               while(row < nRows) {
+                       values[row][0] = row;
+                       row++;
+               }
+
+               final AColGroup group = compressForTest(values, AColGroup.CompressionType.DDCLZW);
+               assertNotNull(group);
+               assertTrue(group instanceof ColGroupDDCLZW);
+
+               assertLosslessCompressionNoRepetition((ColGroupDDCLZW) group, 20);
+       }
+
+       /**
+        * Helper (not itself a JUnit test): compresses {@code data} with DDCLZW as the only valid compression type,
+        * decompresses all rows into a dense block and compares the values with the input using a 0.01 tolerance.
+        *
+        * @param data         dense input values
+        * @param isTransposed must be {@code false}; transposed input is rejected before any work is done
+        * @throws NotImplementedException if {@code isTransposed} is {@code true}, or if the code under test raises it
+        * @throws DMLRuntimeException     wrapping any other exception thrown during compression or decompression
+        */
+       public void testDecompressToDenseBlock(double[][] data, boolean isTransposed) {
+               if(isTransposed) {
+                       throw new NotImplementedException("ColGroup LZW coding for transposed matrices not yet implemented");
+               }
+
+               MatrixBlock mbt = DataConverter.convertToMatrixBlock(data);
+
+               final int numCols = mbt.getNumColumns();
+               final int numRows = mbt.getNumRows();
+               IColIndex colIndexes = ColIndexFactory.create(numCols);
+
+               try {
+                       CompressionSettingsBuilder csb = new CompressionSettingsBuilder().setSamplingRatio(1.0)
+                               .setValidCompressions(EnumSet.of(AColGroup.CompressionType.DDCLZW)).setTransposeInput("false");
+                       CompressionSettings cs = csb.create();
+
+                       final CompressedSizeInfoColGroup cgi = new ComEstExact(mbt, cs).getColGroupInfo(colIndexes);
+                       CompressedSizeInfo csi = new CompressedSizeInfo(cgi);
+                       AColGroup cg = ColGroupFactory.compressColGroups(mbt, csi, cs, 1).get(0);
+
+                       MatrixBlock ret = new MatrixBlock(numRows, numCols, false);
+                       ret.allocateDenseBlock();
+                       cg.decompressToDenseBlock(ret.getDenseBlock(), 0, numRows);
+
+                       MatrixBlock expected = DataConverter.convertToMatrixBlock(data);
+                       assertArrayEquals(expected.getDenseBlockValues(), ret.getDenseBlockValues(), 0.01);
+               }
+               catch(NotImplementedException e) {
+                       // pass "not implemented" through unwrapped, as the partial-range helper does
+                       throw e;
+               }
+               catch(Exception e) {
+                       // the cause is chained into the rethrown exception, so no separate printStackTrace is needed
+                       throw new DMLRuntimeException("Failed construction : " + this.getClass().getSimpleName(), e);
+               }
+       }
+
+       /**
+        * Full-range dense decompression of the literal {@code {{1, 2, 3, 4, 5}}}.
+        * <p>
+        * NOTE(review): the literal is one row with five columns although the test name says single column - confirm
+        * the intended shape.
+        */
+       @Test
+       public void testDecompressToDenseBlockSingleColumn() {
+               final double[][] values = {{1, 2, 3, 4, 5}};
+               testDecompressToDenseBlock(values, false);
+       }
+
+       /** Requesting the transposed variant must raise {@link NotImplementedException}. */
+       @Test(expected = NotImplementedException.class)
+       public void testDecompressToDenseBlockSingleColumnTransposed() {
+               final double[][] values = {{1}, {2}, {3}, {4}, {5}};
+               testDecompressToDenseBlock(values, true);
+       }
+
+       /** Full-range dense decompression of a 5x2 input. */
+       @Test
+       public void testDecompressToDenseBlockTwoColumns() {
+               final double[][] values = {{1, 2}, {2, 3}, {3, 4}, {4, 5}, {5, 6}};
+               testDecompressToDenseBlock(values, false);
+       }
+
+       /** Requesting the transposed variant with a 2x5 input must raise {@link NotImplementedException}. */
+       @Test(expected = NotImplementedException.class)
+       public void testDecompressToDenseBlockTwoColumnsTransposed() {
+               final double[][] values = {{1, 2, 3, 4, 5}, {1, 1, 1, 1, 1}};
+               testDecompressToDenseBlock(values, true);
+       }
+
+       /**
+        * Helper (not itself a JUnit test): compresses {@code data} with DDCLZW as the only valid compression type,
+        * decompresses with {@code decompressToDenseBlock(.., rl, ru)} and compares rows {@code rl} to {@code ru - 1}
+        * with the input using a 0.01 tolerance.
+        *
+        * @param data         dense input values
+        * @param isTransposed must be {@code false}; transposed input is rejected before any work is done
+        * @param rl           first row that is decompressed and compared
+        * @param ru           row bound; rows below it are compared
+        * @throws NotImplementedException if {@code isTransposed} is {@code true}, or if the code under test raises it
+        * @throws DMLRuntimeException     wrapping any other exception thrown during compression or decompression
+        */
+       public void testDecompressToDenseBlockPartialRange(double[][] data, boolean isTransposed, int rl, int ru) {
+               if(isTransposed) {
+                       // same wording as the full-range helper; the previous text read "enLZWcoding"
+                       throw new NotImplementedException("ColGroup LZW coding for transposed matrices not yet implemented");
+               }
+
+               MatrixBlock mbt = DataConverter.convertToMatrixBlock(data);
+
+               final int numCols = mbt.getNumColumns();
+               final int numRows = mbt.getNumRows();
+               IColIndex colIndexes = ColIndexFactory.create(numCols);
+
+               try {
+                       CompressionSettingsBuilder csb = new CompressionSettingsBuilder().setSamplingRatio(1.0)
+                               .setValidCompressions(EnumSet.of(AColGroup.CompressionType.DDCLZW)).setTransposeInput("false");
+                       CompressionSettings cs = csb.create();
+
+                       final CompressedSizeInfoColGroup cgi = new ComEstExact(mbt, cs).getColGroupInfo(colIndexes);
+                       CompressedSizeInfo csi = new CompressedSizeInfo(cgi);
+                       AColGroup cg = ColGroupFactory.compressColGroups(mbt, csi, cs, 1).get(0);
+
+                       assertTrue("Column group should be DDCLZW, not Const", cg instanceof ColGroupDDCLZW);
+
+                       MatrixBlock ret = new MatrixBlock(numRows, numCols, false);
+                       ret.allocateDenseBlock();
+                       cg.decompressToDenseBlock(ret.getDenseBlock(), rl, ru);
+
+                       MatrixBlock expected = DataConverter.convertToMatrixBlock(data);
+                       for(int i = rl; i < ru; i++) {
+                               for(int j = 0; j < numCols; j++) {
+                                       // scalar comparison with a message that names the failing cell
+                                       assertEquals("Mismatch at [" + i + "," + j + "]", expected.get(i, j), ret.get(i, j), 0.01);
+                               }
+                       }
+               }
+               catch(NotImplementedException e) {
+                       throw e;
+               }
+               catch(Exception e) {
+                       // the cause is chained into the rethrown exception, so no separate printStackTrace is needed
+                       throw new DMLRuntimeException("Failed partial range decompression : " + this.getClass().getSimpleName(),
+                               e);
+               }
+       }
+
+       /** Partial dense decompression of a 5x1 input with row arguments 2 and 5. */
+       @Test
+       public void testDecompressToDenseBlockPartialRangeSingleColumn() {
+               final double[][] values = {{1}, {2}, {3}, {4}, {5}};
+               testDecompressToDenseBlockPartialRange(values, false, 2, 5);
+       }
+
+       /** Partial dense decompression of a 5x2 input with row arguments 1 and 4. */
+       @Test
+       public void testDecompressToDenseBlockPartialRangeTwoColumns() {
+               final double[][] values = {{1, 2}, {2, 3}, {3, 4}, {4, 5}, {5, 6}};
+               testDecompressToDenseBlockPartialRange(values, false, 1, 4);
+       }
+
+       /** Partial dense decompression of a 6x2 input with row arguments 3 and 6. */
+       @Test
+       public void testDecompressToDenseBlockPartialRangeFromMiddle() {
+               final double[][] values = {{1, 2}, {2, 3}, {3, 4}, {4, 5}, {5, 6}, {6, 7}};
+               testDecompressToDenseBlockPartialRange(values, false, 3, 6);
+       }
+
+       @Test
+       public void testSerializationSingleColumn() throws IOException {
+               double[][] data = {{1}, {2}, {3}, {4}, {5}};
+               MatrixBlock mbt = DataConverter.convertToMatrixBlock(data);
+               final int numCols = mbt.getNumColumns();
+               final int numRows = mbt.getNumRows();
+               IColIndex colIndexes = ColIndexFactory.create(numCols);
+
+               CompressionSettingsBuilder csb = new 
CompressionSettingsBuilder().setSamplingRatio(1.0)
+                       
.setValidCompressions(EnumSet.of(AColGroup.CompressionType.DDCLZW)).setTransposeInput("false");
+               CompressionSettings cs = csb.create();
+
+               final CompressedSizeInfoColGroup cgi = new ComEstExact(mbt, 
cs).getColGroupInfo(colIndexes);
+               CompressedSizeInfo csi = new CompressedSizeInfo(cgi);
+               AColGroup cg = ColGroupFactory.compressColGroups(mbt, csi, cs, 
1).get(0);
+
+               assertTrue("Original should be ColGroupDDCLZW", cg instanceof 
ColGroupDDCLZW);
+
+               ByteArrayOutputStream bos = new ByteArrayOutputStream();
+               DataOutputStream dos = new DataOutputStream(bos);
+               ColGroupIO.writeGroups(dos, Collections.singletonList(cg));
+               assertEquals(cg.getExactSizeOnDisk() + 4, bos.size());
+
+               ByteArrayInputStream bis = new 
ByteArrayInputStream(bos.toByteArray());
+               DataInputStream dis = new DataInputStream(bis);
+               AColGroup deserialized = ColGroupIO.readGroups(dis, 
numRows).get(0);
+
+               assertTrue("Deserialized should be ColGroupDDCLZW", 
deserialized instanceof ColGroupDDCLZW);
+               assertEquals("Compression type should match", cg.getCompType(), 
deserialized.getCompType());
+               assertEquals("Exact size on disk should match", 
cg.getExactSizeOnDisk(), deserialized.getExactSizeOnDisk());
+
+               MatrixBlock originalDecompressed = new MatrixBlock(numRows, 
numCols, false);
+               originalDecompressed.allocateDenseBlock();
+               cg.decompressToDenseBlock(originalDecompressed.getDenseBlock(), 
0, numRows);
+
+               MatrixBlock deserializedDecompressed = new MatrixBlock(numRows, 
numCols, false);
+               deserializedDecompressed.allocateDenseBlock();
+               
deserialized.decompressToDenseBlock(deserializedDecompressed.getDenseBlock(), 
0, numRows);
+
+               for(int i = 0; i < numRows; i++) {
+                       for(int j = 0; j < numCols; j++) {
+                               assertArrayEquals(new double[] 
{originalDecompressed.get(i, j)},
+                                       new double[] 
{deserializedDecompressed.get(i, j)}, 0.01);
+                       }
+               }
+       }
+
+       /**
+        * Writes a two-column DDCLZW group with {@link ColGroupIO}, reads it back and checks that type, compression
+        * type, exact on-disk size and decompressed values all match the original group.
+        */
+       @Test
+       public void testSerializationTwoColumns() throws IOException {
+               final double[][] values = {{1, 2}, {2, 3}, {3, 4}, {4, 5}, {5, 6}};
+
+               final MatrixBlock input = DataConverter.convertToMatrixBlock(values);
+               final int numCols = input.getNumColumns();
+               final int numRows = input.getNumRows();
+               final IColIndex allCols = ColIndexFactory.create(values[0].length);
+
+               final CompressionSettings settings = new CompressionSettingsBuilder().setSamplingRatio(1.0)
+                       .setValidCompressions(EnumSet.of(AColGroup.CompressionType.DDCLZW)).setTransposeInput("false").create();
+
+               final CompressedSizeInfoColGroup groupInfo = new ComEstExact(input, settings).getColGroupInfo(allCols);
+               final CompressedSizeInfo sizeInfo = new CompressedSizeInfo(groupInfo);
+               final AColGroup written = ColGroupFactory.compressColGroups(input, sizeInfo, settings, 1).get(0);
+
+               assertTrue("Original should be ColgroupDDCLZW", written instanceof ColGroupDDCLZW);
+
+               // serialize; the stream holds 4 bytes more than the group's own exact size
+               final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+               ColGroupIO.writeGroups(new DataOutputStream(buffer), Collections.singletonList(written));
+               assertEquals(written.getExactSizeOnDisk() + 4, buffer.size());
+
+               // deserialize
+               final DataInputStream in = new DataInputStream(new ByteArrayInputStream(buffer.toByteArray()));
+               final AColGroup restored = ColGroupIO.readGroups(in, numRows).get(0);
+
+               assertTrue("Deserialized should be ColGroupDDCLZW", restored instanceof ColGroupDDCLZW);
+               assertEquals("Compression type should match", written.getCompType(), restored.getCompType());
+               assertEquals("Exact size on disk should match", written.getExactSizeOnDisk(),
+                       restored.getExactSizeOnDisk());
+
+               final MatrixBlock writtenDense = new MatrixBlock(numRows, numCols, false);
+               writtenDense.allocateDenseBlock();
+               written.decompressToDenseBlock(writtenDense.getDenseBlock(), 0, numRows);
+
+               final MatrixBlock restoredDense = new MatrixBlock(numRows, numCols, false);
+               restoredDense.allocateDenseBlock();
+               restored.decompressToDenseBlock(restoredDense.getDenseBlock(), 0, numRows);
+
+               for(int r = 0; r < numRows; r++) {
+                       for(int c = 0; c < numCols; c++) {
+                               final double[] want = {writtenDense.get(r, c)};
+                               final double[] got = {restoredDense.get(r, c)};
+                               assertArrayEquals(want, got, 0.01);
+                       }
+               }
+       }
+
+       /**
+        * Round-trips a random DDC group, applies an "equals 0.0" scalar operation and decompresses five rows of the
+        * result.
+        * <p>
+        * NOTE(review): the scalar operation appears to run on the DDC group returned by createDDC rather than on a
+        * DDCLZW group, and the decompressed values are never asserted - confirm the intended coverage.
+        */
+       @Test
+       public void testScalarEquals() {
+               final AColGroup group = ColGroupDDCLZWTestUtils
+                       .createDDC(ColGroupDDCLZWTestUtils.genPatternRandom(1000, 50, 605), 50, 25);
+               assertLosslessCompression((ColGroupDDC) group);
+
+               final ScalarOperator equalsZero = new RightScalarOperator(Equals.getEqualsFnObject(), 0.0);
+               final AColGroup compared = group.scalarOperation(equalsZero);
+
+               final MatrixBlock out = new MatrixBlock(5, 1, false);
+               out.allocateDenseBlock();
+               compared.decompressToDenseBlock(out.getDenseBlock(), 0, 5);
+       }
+
+       /** Applies "greater than 1.5" to a DDCLZW column holding 0, 1, 2, 3, 0 and checks the 0/1 result per row. */
+       @Test
+       public void testScalarGreaterThan() {
+               final double[][] values = {{0}, {1}, {2}, {3}, {0}};
+               final AColGroup group = compressForTest(values, AColGroup.CompressionType.DDCLZW);
+               assertTrue(group instanceof ColGroupDDCLZW);
+
+               final ScalarOperator greaterThan = new RightScalarOperator(GreaterThan.getGreaterThanFnObject(), 1.5);
+               final AColGroup compared = group.scalarOperation(greaterThan);
+
+               final MatrixBlock out = new MatrixBlock(5, 1, false);
+               out.allocateDenseBlock();
+               compared.decompressToDenseBlock(out.getDenseBlock(), 0, 5);
+
+               final double[] want = {0.0, 0.0, 1.0, 1.0, 0.0};
+               for(int r = 0; r < want.length; r++) {
+                       assertEquals(want[r], out.get(r, 0), 0.0);
+               }
+       }
+
+       /** Adds 10 to a DDCLZW column holding 1..5; the result must stay DDCLZW and hold 11..15. */
+       @Test
+       public void testScalarPlus() {
+               final double[][] values = {{1}, {2}, {3}, {4}, {5}};
+               final AColGroup group = compressForTest(values, AColGroup.CompressionType.DDCLZW);
+               assertTrue(group instanceof ColGroupDDCLZW);
+
+               final ScalarOperator plusTen = new RightScalarOperator(Plus.getPlusFnObject(), 10.0);
+               final AColGroup shifted = group.scalarOperation(plusTen);
+               assertTrue("Should remain LZW after shift", shifted instanceof ColGroupDDCLZW);
+
+               final MatrixBlock out = new MatrixBlock(5, 1, false);
+               out.allocateDenseBlock();
+               shifted.decompressToDenseBlock(out.getDenseBlock(), 0, 5);
+
+               final double[] want = {11.0, 12.0, 13.0, 14.0, 15.0};
+               for(int r = 0; r < want.length; r++) {
+                       assertEquals(want[r], out.get(r, 0), 0.0);
+               }
+       }
+
+       /** Subtracts 10 from a single-column DDCLZW group; the result must still be DDCLZW and hold the shifted values. */
+       @Test
+       public void testScalarMinus() {
+               final double[][] input = {{11}, {12}, {13}, {14}, {15}};
+               final AColGroup group = compressForTest(input, AColGroup.CompressionType.DDCLZW);
+               assertTrue(group instanceof ColGroupDDCLZW);
+
+               final AColGroup shifted = group.scalarOperation(new RightScalarOperator(Minus.getMinusFnObject(), 10.0));
+               assertTrue("Should remain LZW after shift", shifted instanceof ColGroupDDCLZW);
+
+               final MatrixBlock out = new MatrixBlock(5, 1, false);
+               out.allocateDenseBlock();
+               shifted.decompressToDenseBlock(out.getDenseBlock(), 0, 5);
+
+               final double[] expected = {1.0, 2.0, 3.0, 4.0, 5.0};
+               for(int r = 0; r < expected.length; r++) {
+                       assertEquals(expected[r], out.get(r, 0), 0.0);
+               }
+       }
+
+       /** Applies SQRT to a column of perfect squares and checks the roots with a tolerance of 0.01. */
+       @Test
+       public void testUnaryOperationSqrt() {
+               final double[][] squares = {{1}, {4}, {9}, {16}, {25}};
+               final AColGroup group = compressForTest(squares, AColGroup.CompressionType.DDCLZW);
+               assertTrue(group instanceof ColGroupDDCLZW);
+
+               final UnaryOperator sqrt = new UnaryOperator(Builtin.getBuiltinFnObject(Builtin.BuiltinCode.SQRT));
+               final AColGroup roots = group.unaryOperation(sqrt);
+
+               final MatrixBlock out = new MatrixBlock(5, 1, false);
+               out.allocateDenseBlock();
+               roots.decompressToDenseBlock(out.getDenseBlock(), 0, 5);
+
+               final double[] expected = {1.0, 2.0, 3.0, 4.0, 5.0};
+               for(int r = 0; r < expected.length; r++) {
+                       assertEquals(expected[r], out.get(r, 0), 0.01);
+               }
+       }
+
+       /**
+        * Smoke test: builds a DDC group from a seeded random 1000-entry mapping (50 distinct ids, 25 columns), runs
+        * the assertLosslessCompression helper on it, applies an "equals 0.0" scalar operation and decompresses the
+        * first five rows of the result. No value of the decompressed output is asserted.
+        */
+       @Test
+       public void testScalarEqualsMultiColumn() {
+               AColGroup cg = 
ColGroupDDCLZWTestUtils.createDDC(ColGroupDDCLZWTestUtils.genPatternRandom(1000,
 50, 702), 50,
+                       25);
+               assertLosslessCompression((ColGroupDDC) cg);
+
+               ScalarOperator op = new 
RightScalarOperator(Equals.getEqualsFnObject(), 0.0);
+               AColGroup res = cg.scalarOperation(op);
+
+               // NOTE(review): the output block is 5x2 although the group was created with 25 columns, and nothing is
+               // asserted on it afterwards - confirm the intended dimensions and add value checks.
+               MatrixBlock ret = new MatrixBlock(5, 2, false);
+               ret.allocateDenseBlock();
+               res.decompressToDenseBlock(ret.getDenseBlock(), 0, 5);
+       }
+
+       /** Multiplies a single-column DDCLZW group by 2 and checks the doubled values. */
+       @Test
+       public void testScalarMultiply() {
+               final double[][] input = {{1}, {2}, {3}, {4}, {5}};
+               final AColGroup group = compressForTest(input, AColGroup.CompressionType.DDCLZW);
+               assertTrue(group instanceof ColGroupDDCLZW);
+
+               final AColGroup doubled = group
+                       .scalarOperation(new RightScalarOperator(Multiply.getMultiplyFnObject(), 2.0));
+
+               final MatrixBlock out = new MatrixBlock(5, 1, false);
+               out.allocateDenseBlock();
+               doubled.decompressToDenseBlock(out.getDenseBlock(), 0, 5);
+
+               final double[] expected = {2.0, 4.0, 6.0, 8.0, 10.0};
+               for(int r = 0; r < expected.length; r++) {
+                       assertEquals(expected[r], out.get(r, 0), 0.0);
+               }
+       }
+
+       /** Divides a single-column DDCLZW group by 2 and checks the halved values. */
+       @Test
+       public void testScalarDivide() {
+               final double[][] input = {{2}, {4}, {6}, {8}, {10}};
+               final AColGroup group = compressForTest(input, AColGroup.CompressionType.DDCLZW);
+               assertTrue(group instanceof ColGroupDDCLZW);
+
+               final AColGroup halved = group.scalarOperation(new RightScalarOperator(Divide.getDivideFnObject(), 2.0));
+
+               final MatrixBlock out = new MatrixBlock(5, 1, false);
+               out.allocateDenseBlock();
+               halved.decompressToDenseBlock(out.getDenseBlock(), 0, 5);
+
+               final double[] expected = {1.0, 2.0, 3.0, 4.0, 5.0};
+               for(int r = 0; r < expected.length; r++) {
+                       assertEquals(expected[r], out.get(r, 0), 0.0);
+               }
+       }
+
+       /** Slices exactly one row (index 3) out of a single-column DDCLZW group and checks its type and value. */
+       @Test
+       public void testSliceRowsSingleRow() {
+               final double[][] input = {{0}, {1}, {2}, {1}, {0}, {2}, {1}};
+               final AColGroup group = compressForTest(input, AColGroup.CompressionType.DDCLZW);
+               assertTrue(group instanceof ColGroupDDCLZW);
+
+               final AColGroup oneRow = group.sliceRows(3, 4);
+               assertTrue(oneRow instanceof ColGroupDDCLZW);
+
+               final MatrixBlock out = new MatrixBlock(1, 1, false);
+               out.allocateDenseBlock();
+               oneRow.decompressToDenseBlock(out.getDenseBlock(), 0, 1);
+
+               // Row 3 of the input holds the value 1.
+               assertEquals(1.0, out.get(0, 0), 0.0);
+       }
+
+       /** Slices rows [2, 7) out of a repeating 0,1,2 column and compares each sliced row with its source row. */
+       @Test
+       public void testSliceRowsMiddleRange() {
+               final double[][] input = {{0}, {1}, {2}, {0}, {1}, {2}, {0}, {1}, {2}, {0}};
+               final AColGroup group = compressForTest(input, AColGroup.CompressionType.DDCLZW);
+               assertTrue(group instanceof ColGroupDDCLZW);
+
+               final AColGroup middle = group.sliceRows(2, 7);
+               assertTrue(middle instanceof ColGroupDDCLZW);
+
+               final MatrixBlock out = new MatrixBlock(5, 1, false);
+               out.allocateDenseBlock();
+               middle.decompressToDenseBlock(out.getDenseBlock(), 0, 5);
+
+               int srcRow = 2;
+               for(int r = 0; r < 5; r++, srcRow++) {
+                       assertEquals(input[srcRow][0], out.get(r, 0), 0.0);
+               }
+       }
+
+       /** Slices the complete row range of a single-column DDCLZW group and expects the original data back. */
+       @Test
+       public void testSliceRowsEntireRange() {
+               final double[][] input = {{0}, {1}, {0}, {1}, {2}};
+               final int nRows = input.length;
+
+               final AColGroup group = compressForTest(input, AColGroup.CompressionType.DDCLZW);
+               assertTrue(group instanceof ColGroupDDCLZW);
+
+               final AColGroup all = group.sliceRows(0, nRows);
+               assertTrue(all instanceof ColGroupDDCLZW);
+
+               final MatrixBlock out = new MatrixBlock(nRows, 1, false);
+               out.allocateDenseBlock();
+               all.decompressToDenseBlock(out.getDenseBlock(), 0, nRows);
+
+               for(int r = 0; r < nRows; r++) {
+                       assertEquals(input[r][0], out.get(r, 0), 0.0);
+               }
+       }
+
+       /** Slices the first three rows of a single-column DDCLZW group and checks type and values. */
+       @Test
+       public void testSliceRowsBeginning() {
+               final double[][] input = {{0}, {1}, {2}, {1}, {0}, {2}};
+               final AColGroup group = compressForTest(input, AColGroup.CompressionType.DDCLZW);
+               assertTrue(group instanceof ColGroupDDCLZW);
+
+               final AColGroup head = group.sliceRows(0, 3);
+               assertTrue(head instanceof ColGroupDDCLZW);
+
+               final MatrixBlock out = new MatrixBlock(3, 1, false);
+               out.allocateDenseBlock();
+               head.decompressToDenseBlock(out.getDenseBlock(), 0, 3);
+
+               final double[] expected = {0.0, 1.0, 2.0};
+               for(int r = 0; r < expected.length; r++) {
+                       assertEquals(expected[r], out.get(r, 0), 0.0);
+               }
+       }
+
+       /** Slices the last three rows of a single-column DDCLZW group and checks type and values. */
+       @Test
+       public void testSliceRowsEnd() {
+               final double[][] input = {{0}, {1}, {2}, {1}, {0}, {2}};
+               final AColGroup group = compressForTest(input, AColGroup.CompressionType.DDCLZW);
+               assertTrue(group instanceof ColGroupDDCLZW);
+
+               final AColGroup tail = group.sliceRows(3, 6);
+               assertTrue(tail instanceof ColGroupDDCLZW);
+
+               final MatrixBlock out = new MatrixBlock(3, 1, false);
+               out.allocateDenseBlock();
+               tail.decompressToDenseBlock(out.getDenseBlock(), 0, 3);
+
+               final double[] expected = {1.0, 0.0, 2.0};
+               for(int r = 0; r < expected.length; r++) {
+                       assertEquals(expected[r], out.get(r, 0), 0.0);
+               }
+       }
+
+       /**
+        * Slices rows [5, 25) out of a column made of three runs of ten equal values (0, 1 and 2), so that the slice
+        * starts and ends inside a run, and compares every decompressed row with its source row.
+        */
+       @Test
+       public void testSliceRowsWithLongRuns() {
+               double[][] data = new double[30][1];
+               Arrays.fill(data, 0, 10, new double[] {0});
+               Arrays.fill(data, 10, 20, new double[] {1});
+               Arrays.fill(data, 20, 30, new double[] {2});
+
+               AColGroup cg = compressForTest(data, AColGroup.CompressionType.DDCLZW);
+               assertTrue(cg instanceof ColGroupDDCLZW);
+
+               AColGroup sliced = cg.sliceRows(5, 25);
+               assertTrue(sliced instanceof ColGroupDDCLZW);
+
+               MatrixBlock ret = new MatrixBlock(20, 1, false);
+               ret.allocateDenseBlock();
+               sliced.decompressToDenseBlock(ret.getDenseBlock(), 0, 20);
+
+               // Decompressing without an exception is not sufficient: verify the sliced content as well.
+               for(int i = 0; i < 20; i++) {
+                       assertEquals(data[5 + i][0], ret.get(i, 0), 0.0);
+               }
+       }
+
+       /** Slices rows [1, 4) out of a two-column group with all-distinct rows and checks both columns. */
+       @Test
+       public void testSliceRows() {
+               final double[][] input = {{1, 2}, {3, 4}, {5, 6}, {7, 8}, {9, 10}};
+               final AColGroup group = compressForTest(input, AColGroup.CompressionType.DDCLZW);
+
+               final AColGroup part = group.sliceRows(1, 4);
+               assertTrue(part instanceof ColGroupDDCLZW);
+
+               final MatrixBlock out = new MatrixBlock(3, 2, false);
+               out.allocateDenseBlock();
+               part.decompressToDenseBlock(out.getDenseBlock(), 0, 3);
+
+               final double[][] expected = {{3.0, 4.0}, {5.0, 6.0}, {7.0, 8.0}};
+               for(int r = 0; r < expected.length; r++) {
+                       for(int c = 0; c < expected[r].length; c++) {
+                               assertEquals(expected[r][c], out.get(r, c), 0.0);
+                       }
+               }
+       }
+
+       /** Slices rows [2, 5) where the first sliced row repeats an earlier tuple, then checks both columns. */
+       @Test
+       public void testSliceRowsWithMatchingDictionaryEntry() {
+               final double[][] input = {{1, 2}, {3, 4}, {1, 2}, {5, 6}, {7, 8}};
+               final AColGroup group = compressForTest(input, AColGroup.CompressionType.DDCLZW);
+
+               final AColGroup part = group.sliceRows(2, 5);
+               assertTrue(part instanceof ColGroupDDCLZW);
+
+               final MatrixBlock out = new MatrixBlock(3, 2, false);
+               out.allocateDenseBlock();
+               part.decompressToDenseBlock(out.getDenseBlock(), 0, 3);
+
+               final double[][] expected = {{1.0, 2.0}, {5.0, 6.0}, {7.0, 8.0}};
+               for(int r = 0; r < expected.length; r++) {
+                       for(int c = 0; c < expected[r].length; c++) {
+                               assertEquals(expected[r][c], out.get(r, c), 0.0);
+                       }
+               }
+       }
+
+       /** Slices rows [1, 3) of a two-column group whose rows are all distinct and checks both columns. */
+       @Test
+       public void testSliceRowsWithNoMatchingDictionaryEntry() {
+               final double[][] input = {{1, 2}, {3, 4}, {5, 6}};
+               final AColGroup group = compressForTest(input, AColGroup.CompressionType.DDCLZW);
+
+               final AColGroup part = group.sliceRows(1, 3);
+               assertTrue(part instanceof ColGroupDDCLZW);
+
+               final MatrixBlock out = new MatrixBlock(2, 2, false);
+               out.allocateDenseBlock();
+               part.decompressToDenseBlock(out.getDenseBlock(), 0, 2);
+
+               final double[][] expected = {{3.0, 4.0}, {5.0, 6.0}};
+               for(int r = 0; r < expected.length; r++) {
+                       for(int c = 0; c < expected[r].length; c++) {
+                               assertEquals(expected[r][c], out.get(r, c), 0.0);
+                       }
+               }
+       }
+
+       /** Slices the last two rows [2, 4) of a four-row, two-column group and checks both columns. */
+       @Test
+       public void testSliceRowsFromMiddleRow() {
+               final double[][] input = {{1, 2}, {3, 4}, {5, 6}, {7, 8}};
+               final AColGroup group = compressForTest(input, AColGroup.CompressionType.DDCLZW);
+
+               final AColGroup part = group.sliceRows(2, 4);
+               assertTrue(part instanceof ColGroupDDCLZW);
+
+               final MatrixBlock out = new MatrixBlock(2, 2, false);
+               out.allocateDenseBlock();
+               part.decompressToDenseBlock(out.getDenseBlock(), 0, 2);
+
+               final double[][] expected = {{5.0, 6.0}, {7.0, 8.0}};
+               for(int r = 0; r < expected.length; r++) {
+                       for(int c = 0; c < expected[r].length; c++) {
+                               assertEquals(expected[r][c], out.get(r, c), 0.0);
+                       }
+               }
+       }
+
+       /** Decompresses all rows of a two-column group into a sparse block and compares with the input. */
+       @Test
+       public void testDecompressToSparseBlock() {
+               final double[][] input = {{1, 2}, {3, 4}, {5, 6}};
+               final AColGroup group = compressForTest(input, AColGroup.CompressionType.DDCLZW);
+
+               final MatrixBlock out = new MatrixBlock(3, 2, true);
+               out.allocateSparseRowsBlock();
+               group.decompressToSparseBlock(out.getSparseBlock(), 0, 3);
+
+               final double[][] expected = {{1.0, 2.0}, {3.0, 4.0}, {5.0, 6.0}};
+               for(int r = 0; r < expected.length; r++) {
+                       for(int c = 0; c < expected[r].length; c++) {
+                               assertEquals(expected[r][c], out.get(r, c), 0.0);
+                       }
+               }
+       }
+
+       /** Decompresses only rows [2, 4) into a sparse block (zero row/column offset) and checks those two rows. */
+       @Test
+       public void testDecompressToSparseBlockWithRlGreaterThanZero() {
+               final double[][] input = {{1, 2}, {3, 4}, {5, 6}, {7, 8}};
+               final AColGroup group = compressForTest(input, AColGroup.CompressionType.DDCLZW);
+
+               final MatrixBlock out = new MatrixBlock(4, 2, true);
+               out.allocateSparseRowsBlock();
+               group.decompressToSparseBlock(out.getSparseBlock(), 2, 4, 0, 0);
+
+               final double[][] expectedTail = {{5.0, 6.0}, {7.0, 8.0}};
+               for(int i = 0; i < expectedTail.length; i++) {
+                       for(int c = 0; c < expectedTail[i].length; c++) {
+                               assertEquals(expectedTail[i][c], out.get(2 + i, c), 0.0);
+                       }
+               }
+       }
+
+       /** Decompresses into a larger sparse block with row and column offset 1 and checks the shifted cells. */
+       @Test
+       public void testDecompressToSparseBlockWithOffset() {
+               final double[][] input = {{1, 2}, {3, 4}, {5, 6}};
+               final AColGroup group = compressForTest(input, AColGroup.CompressionType.DDCLZW);
+
+               final MatrixBlock out = new MatrixBlock(5, 4, true);
+               out.allocateSparseRowsBlock();
+               group.decompressToSparseBlock(out.getSparseBlock(), 0, 3, 1, 1);
+
+               // Every input cell (r, c) is expected at (r + 1, c + 1) in the output.
+               final double[][] expected = {{1.0, 2.0}, {3.0, 4.0}, {5.0, 6.0}};
+               for(int r = 0; r < expected.length; r++) {
+                       for(int c = 0; c < expected[r].length; c++) {
+                               assertEquals(expected[r][c], out.get(r + 1, c + 1), 0.0);
+                       }
+               }
+       }
+
+       /** Counts non-zero cells of a 4x2 group that contains five non-zero values. */
+       @Test
+       public void testGetNumberNonZeros() {
+               final double[][] input = {{1, 0}, {2, 3}, {0, 4}, {5, 0}};
+               final AColGroup group = compressForTest(input, AColGroup.CompressionType.DDCLZW);
+
+               assertEquals(5L, group.getNumberNonZeros(4));
+       }
+
+       /** Counts non-zero cells of an all-zero 3x2 input; the expected count is 0. */
+       @Test
+       public void testGetNumberNonZerosAllZeros() {
+               final double[][] input = {{0, 0}, {0, 0}, {0, 0}};
+               final AColGroup group = compressForTest(input, AColGroup.CompressionType.DDCLZW);
+
+               assertEquals(0L, group.getNumberNonZeros(3));
+       }
+
+       /** Counts non-zero cells of a fully populated 3x2 input; the expected count is 6. */
+       @Test
+       public void testGetNumberNonZerosAllNonZeros() {
+               final double[][] input = {{1, 2}, {3, 4}, {5, 6}};
+               final AColGroup group = compressForTest(input, AColGroup.CompressionType.DDCLZW);
+
+               assertEquals(6L, group.getNumberNonZeros(3));
+       }
+
+       /** Decompresses a two-column group into a wider dense block with column offset 2 and checks the shifted cells. */
+       @Test
+       public void testDecompressToDenseBlockNonContiguousPath() {
+               final double[][] input = {{1, 2}, {3, 4}, {5, 6}};
+               final AColGroup group = compressForTest(input, AColGroup.CompressionType.DDCLZW);
+
+               final MatrixBlock out = new MatrixBlock(3, 5, false);
+               out.allocateDenseBlock();
+               group.decompressToDenseBlock(out.getDenseBlock(), 0, 3, 0, 2);
+
+               // Every input cell (r, c) is expected at (r, c + 2) in the output.
+               final double[][] expected = {{1.0, 2.0}, {3.0, 4.0}, {5.0, 6.0}};
+               for(int r = 0; r < expected.length; r++) {
+                       for(int c = 0; c < expected[r].length; c++) {
+                               assertEquals(expected[r][c], out.get(r, c + 2), 0.0);
+                       }
+               }
+       }
+
+       /** Decompresses only the first row [0, 1) of a two-column group and checks its two values. */
+       @Test
+       public void testDecompressToDenseBlockFirstRowPath() {
+               final double[][] input = {{10, 20}, {11, 21}, {12, 22}};
+               final AColGroup group = compressForTest(input, AColGroup.CompressionType.DDCLZW);
+
+               final MatrixBlock out = new MatrixBlock(3, 2, false);
+               out.allocateDenseBlock();
+               group.decompressToDenseBlock(out.getDenseBlock(), 0, 1);
+
+               final double[] firstRow = {10.0, 20.0};
+               for(int c = 0; c < firstRow.length; c++) {
+                       assertEquals(firstRow[c], out.get(0, c), 0.0);
+               }
+       }
+
+       /**
+        * Adds 1 to a single column in which the value 1 occurs twice; the result must still be a ColGroupDDCLZW and
+        * decompress to the shifted values.
+        */
+       @Test
+       public void testScalarOperationShiftWithExistingMatch() {
+               double[][] data = {{1}, {2}, {3}, {1}};
+               AColGroup cg = compressForTest(data, AColGroup.CompressionType.DDCLZW);
+               assertTrue(cg instanceof ColGroupDDCLZW);
+
+               ScalarOperator op = new RightScalarOperator(Plus.getPlusFnObject(), 1.0);
+               AColGroup res = cg.scalarOperation(op);
+               // The message names the type that is actually checked (it used to say "DeltaDDC").
+               assertTrue("Should remain LZW after shift", res instanceof ColGroupDDCLZW);
+
+               MatrixBlock ret = new MatrixBlock(4, 1, false);
+               ret.allocateDenseBlock();
+               res.decompressToDenseBlock(ret.getDenseBlock(), 0, 4);
+
+               assertEquals(2.0, ret.get(0, 0), 0.0);
+               assertEquals(3.0, ret.get(1, 0), 0.0);
+               assertEquals(4.0, ret.get(2, 0), 0.0);
+               assertEquals(2.0, ret.get(3, 0), 0.0);
+       }
+
+       /**
+        * Adds 5 to a single column of three distinct values; the result must still be a ColGroupDDCLZW and
+        * decompress to the shifted values.
+        */
+       @Test
+       public void testScalarOperationShiftWithCountsId0EqualsOne() {
+               double[][] data = {{1}, {2}, {3}};
+               AColGroup cg = compressForTest(data, AColGroup.CompressionType.DDCLZW);
+               assertTrue(cg instanceof ColGroupDDCLZW);
+
+               ScalarOperator op = new RightScalarOperator(Plus.getPlusFnObject(), 5.0);
+               AColGroup res = cg.scalarOperation(op);
+               // The message names the type that is actually checked (it used to say "DeltaDDC").
+               assertTrue("Should remain LZW after shift", res instanceof ColGroupDDCLZW);
+
+               MatrixBlock ret = new MatrixBlock(3, 1, false);
+               ret.allocateDenseBlock();
+               res.decompressToDenseBlock(ret.getDenseBlock(), 0, 3);
+
+               assertEquals(6.0, ret.get(0, 0), 0.0);
+               assertEquals(7.0, ret.get(1, 0), 0.0);
+               assertEquals(8.0, ret.get(2, 0), 0.0);
+       }
+
+       /**
+        * Adds 10 to a single column of three distinct values, so that no shifted value equals an original one; the
+        * result must still be a ColGroupDDCLZW and decompress to the shifted values.
+        */
+       @Test
+       public void testScalarOperationShiftWithNoMatch() {
+               double[][] data = {{1}, {2}, {3}};
+               AColGroup cg = compressForTest(data, AColGroup.CompressionType.DDCLZW);
+               assertTrue(cg instanceof ColGroupDDCLZW);
+
+               ScalarOperator op = new RightScalarOperator(Plus.getPlusFnObject(), 10.0);
+               AColGroup res = cg.scalarOperation(op);
+               // The message names the type that is actually checked (it used to say "DeltaDDC").
+               assertTrue("Should remain LZW after shift", res instanceof ColGroupDDCLZW);
+
+               MatrixBlock ret = new MatrixBlock(3, 1, false);
+               ret.allocateDenseBlock();
+               res.decompressToDenseBlock(ret.getDenseBlock(), 0, 3);
+
+               assertEquals(11.0, ret.get(0, 0), 0.0);
+               assertEquals(12.0, ret.get(1, 0), 0.0);
+               assertEquals(13.0, ret.get(2, 0), 0.0);
+       }
+
+       /** Applies ABS to a positive two-column DDCLZW group and expects the values unchanged (tolerance 0.01). */
+       @Test
+       public void testUnaryOperationTriggersConvertToDDC() {
+               final double[][] input = {{1, 2}, {3, 4}, {5, 6}};
+               final AColGroup group = compressForTest(input, AColGroup.CompressionType.DDCLZW);
+               assertTrue(group instanceof ColGroupDDCLZW);
+
+               final UnaryOperator abs = new UnaryOperator(Builtin.getBuiltinFnObject(Builtin.BuiltinCode.ABS));
+               final AColGroup result = group.unaryOperation(abs);
+
+               final MatrixBlock out = new MatrixBlock(3, 2, false);
+               out.allocateDenseBlock();
+               result.decompressToDenseBlock(out.getDenseBlock(), 0, 3);
+
+               final double[][] expected = {{1.0, 2.0}, {3.0, 4.0}, {5.0, 6.0}};
+               for(int r = 0; r < expected.length; r++) {
+                       for(int c = 0; c < expected[r].length; c++) {
+                               assertEquals(expected[r][c], out.get(r, c), 0.01);
+                       }
+               }
+       }
+
+       /** Applies ABS to a constant single column (all 5) and expects 5 in every row (tolerance 0.01). */
+       @Test
+       public void testUnaryOperationWithConstantResultSingleColumn() {
+               final double[][] input = {{5}, {5}, {5}, {5}};
+               final AColGroup group = compressForTest(input, AColGroup.CompressionType.DDCLZW);
+               // No instanceof ColGroupDDCLZW check here: per the original author's note the group is of type CONST.
+
+               final UnaryOperator abs = new UnaryOperator(Builtin.getBuiltinFnObject(Builtin.BuiltinCode.ABS));
+               final AColGroup result = group.unaryOperation(abs);
+
+               final MatrixBlock out = new MatrixBlock(4, 1, false);
+               out.allocateDenseBlock();
+               result.decompressToDenseBlock(out.getDenseBlock(), 0, 4);
+
+               for(int r = 0; r < 4; r++) {
+                       assertEquals(5.0, out.get(r, 0), 0.01);
+               }
+       }
+
+       /** Applies ABS to a two-column input whose rows are all (10, 20) and expects that tuple in every row. */
+       @Test
+       public void testUnaryOperationWithConstantResultMultiColumn() {
+               final double[][] input = {{10, 20}, {10, 20}, {10, 20}};
+               final AColGroup group = compressForTest(input, AColGroup.CompressionType.DDCLZW);
+
+               final UnaryOperator abs = new UnaryOperator(Builtin.getBuiltinFnObject(Builtin.BuiltinCode.ABS));
+               final AColGroup result = group.unaryOperation(abs);
+
+               final MatrixBlock out = new MatrixBlock(3, 2, false);
+               out.allocateDenseBlock();
+               result.decompressToDenseBlock(out.getDenseBlock(), 0, 3);
+
+               final double[] expectedRow = {10.0, 20.0};
+               for(int r = 0; r < 3; r++) {
+                       for(int c = 0; c < expectedRow.length; c++) {
+                               assertEquals(expectedRow[c], out.get(r, c), 0.01);
+                       }
+               }
+       }
+
+       private static MatrixBlock decompressToMB(AColGroup g, int rows, int 
cols) {
+               MatrixBlock ret = new MatrixBlock(rows, cols, false);
+               ret.allocateDenseBlock();
+               g.decompressToDenseBlock(ret.getDenseBlock(), 0, rows);
+               return ret;
+       }
+
+       private static void assertMatrixEquals(double[][] expected, MatrixBlock 
actual) {
+               assertEquals(expected.length, actual.getNumRows());
+               assertEquals(expected[0].length, actual.getNumColumns());
+
+               for(int r = 0; r < expected.length; r++) {
+                       for(int c = 0; c < expected[0].length; c++) {
+                               assertEquals("Mismatch at (" + r + "," + c + 
")", expected[r][c], actual.get(r, c), 0.0);
+                       }
+               }
+       }
+
+       /**
+        * Compresses a two-column input with repeated tuples to DDCLZW, converts it to DDC and checks that both
+        * groups decompress to the original data.
+        */
+       @Test
+       public void testConvertToDDCRoundtripEqualsOriginalData() {
+               final double[][] input = {{1, 2}, {3, 4}, {1, 2}, {5, 6}, {1, 2}, {3, 4}, {7, 8}, {1, 2}, {5, 6}, {1, 2}};
+               final int nRows = input.length;
+               final int nCols = input[0].length;
+
+               final AColGroup lzw = compressForTest(input, AColGroup.CompressionType.DDCLZW);
+               assertTrue("Expected DDCLZW from compression framework but was " + lzw.getClass().getSimpleName(),
+                       lzw instanceof ColGroupDDCLZW);
+
+               final AColGroup ddc = ((ColGroupDDCLZW) lzw).convertToDDC();
+               assertTrue("Expected ColGroupDDC but was " + ddc.getClass().getSimpleName(), ddc instanceof ColGroupDDC);
+
+               final MatrixBlock fromLzw = decompressToMB(lzw, nRows, nCols);
+               final MatrixBlock fromDdc = decompressToMB(ddc, nRows, nCols);
+
+               assertMatrixEquals(input, fromLzw);
+               assertMatrixEquals(input, fromDdc);
+       }
+
+       /**
+        * Invokes the private method {@code decompressToSparseBlockTransposedSparseDictionary} of ColGroupDDCLZW via
+        * reflection, passing a hand-built sparse dictionary block, and checks the cells written to the output block.
+        *
+        * @throws Exception if the reflective lookup or invocation fails
+        */
+       @Test
+       public void 
testDecompressToSparseBlockTransposedSparseDictionaryUsesCorrectSparseValues() 
throws Exception {
+               final IColIndex colIndexes = ColIndexFactory.create(new int[] 
{0, 1, 2});
+               // Six dictionary values for three columns; presumably two tuples (0, 7, 9) and (0, 11, 13) - TODO confirm.
+               final Dictionary dict = Dictionary.create(new double[] {0, 7, 
9, 0, 11, 13});
+
+               // Two-row mapping: row 0 -> id 0, row 1 -> id 1.
+               final AMapToData data = MapToFactory.create(2, 2);
+               data.set(0, 0);
+               data.set(1, 1);
+
+               final AColGroup cg = ColGroupDDCLZW.create(colIndexes, dict, 
data, null);
+               assertTrue(cg instanceof ColGroupDDCLZW);
+               final ColGroupDDCLZW lzw = (ColGroupDDCLZW) cg;
+
+               // Sparse 2x3 block holding the same non-zero values as the dictionary array above.
+               final MatrixBlock dictMb = new MatrixBlock(2, 3, true);
+               dictMb.allocateSparseRowsBlock();
+               dictMb.set(0, 1, 7);
+               dictMb.set(0, 2, 9);
+               dictMb.set(1, 1, 11);
+               dictMb.set(1, 2, 13);
+               final SparseBlock sb = dictMb.getSparseBlock();
+
+               // The output is 3x2, i.e. the transpose of the 2x3 dictionary block.
+               final MatrixBlock out = new MatrixBlock(3, 2, true);
+               out.allocateSparseRowsBlock();
+               final SparseBlockMCSR sbr = (SparseBlockMCSR) 
out.getSparseBlock();
+
+               // The method under test is private, hence the reflective access.
+               final Method m = 
ColGroupDDCLZW.class.getDeclaredMethod("decompressToSparseBlockTransposedSparseDictionary",
+                       SparseBlockMCSR.class, SparseBlock.class, int.class);
+               m.setAccessible(true);
+               m.invoke(lzw, sbr, sb, 3);
+
+               // Output column 0 must hold the values of dictionary row 0 ...
+               assertEquals(0.0, out.get(0, 0), 0.0);
+               assertEquals(7.0, out.get(1, 0), 0.0);
+               assertEquals(9.0, out.get(2, 0), 0.0);
+
+               // ... and output column 1 the values of dictionary row 1.
+               assertEquals(0.0, out.get(0, 1), 0.0);
+               assertEquals(11.0, out.get(1, 1), 0.0);
+               assertEquals(13.0, out.get(2, 1), 0.0);
+       }
+}
diff --git 
a/src/test/java/org/apache/sysds/test/component/compress/colgroup/DDCLZW/ColGroupDDCLZWTestUtils.java
 
b/src/test/java/org/apache/sysds/test/component/compress/colgroup/DDCLZW/ColGroupDDCLZWTestUtils.java
new file mode 100644
index 0000000000..67f9e143b7
--- /dev/null
+++ 
b/src/test/java/org/apache/sysds/test/component/compress/colgroup/DDCLZW/ColGroupDDCLZWTestUtils.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.component.compress.colgroup.DDCLZW;
+
+import org.apache.sysds.runtime.compress.colgroup.ColGroupDDC;
+import org.apache.sysds.runtime.compress.colgroup.dictionary.Dictionary;
+import org.apache.sysds.runtime.compress.colgroup.indexes.ColIndexFactory;
+import org.apache.sysds.runtime.compress.colgroup.indexes.IColIndex;
+import org.apache.sysds.runtime.compress.colgroup.mapping.AMapToData;
+import org.apache.sysds.runtime.compress.colgroup.mapping.MapToFactory;
+
+import java.util.Arrays;
+
+/// This class contains static methods to generate mappings and DDCs for 
tests/benchmarks for ColGroupDDCLZW
+public class ColGroupDDCLZWTestUtils {
+       /**
+        * Creates a sample DDC group for unit tests from a given mapping
+        */
+       public static ColGroupDDC createDDC(int[] mapping, int nUnique, int 
nCols) {
+               IColIndex colIndexes = ColIndexFactory.create(nCols);
+
+               double[] dictValues = new double[nUnique * nCols];
+               for(int i = 0; i < nUnique; i++) {
+                       for(int c = 0; c < nCols; c++) {
+                               dictValues[i * nCols + c] = (i + 1) * 10.0 + c;
+                       }
+               }
+               Dictionary dict = Dictionary.create(dictValues);
+
+               AMapToData data = MapToFactory.create(mapping.length, nUnique);
+               for(int i = 0; i < mapping.length; i++) {
+                       data.set(i, mapping[i]);
+               }
+
+               return (ColGroupDDC) ColGroupDDC.create(colIndexes, dict, data, 
null);
+       }
+
+       // Pattern generators (array)
+       public static int[] genPatternRepeating(int size, int... pattern) {
+               int[] result = new int[size];
+               for(int i = 0; i < size; i++) {
+                       result[i] = pattern[i % pattern.length];
+               }
+               return result;
+       }
+
+       /// Args (10, 5) generates a pattern like: [0, 0, 1, 1, 2, 2, 3, 3, 4, 
4]
+       public static int[] genPatternDistributed(int size, int nUnique) {
+               int[] result = new int[size];
+               int runLength = size / nUnique;
+               int pos = 0;
+               for(int i = 0; i < nUnique && pos < size; i++) {
+                       int endPos = Math.min(pos + runLength, size);
+                       Arrays.fill(result, pos, endPos, i);
+                       pos = endPos;
+               }
+               return result;
+       }
+
+       public static int[] genPatternRandom(int size, int nUnique, long seed) {
+               int[] result = new int[size];
+               java.util.Random rand = new java.util.Random(seed);
+               for(int i = 0; i < size; i++) {
+                       result[i] = rand.nextInt(nUnique);
+               }
+               return result;
+       }
+
+       /// Args (10, 3) generates a pattern like: [0, 1, 2, 0, 1, 2, 0, 1, 2, 
0]
+       public static int[] genPatternLZWOptimal(int size, int nUnique) {
+               int[] result = new int[size];
+               for(int i = 0; i < size; i++) {
+                       result[i] = i % nUnique;
+               }
+               return result;
+       }
+}

Reply via email to