This is an automated email from the ASF dual-hosted git repository.

baunsgaard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new 475da07f59 [SYSTEMDS-3590] Frame DDC Encoding
475da07f59 is described below

commit 475da07f592cafb570db082901b47e264e996a6b
Author: baunsgaard <[email protected]>
AuthorDate: Fri Jun 30 22:53:59 2023 +0200

    [SYSTEMDS-3590] Frame DDC Encoding
    
    This commit adds the primitive column encoding of DDC to the Frames.
    This allows us to in a followup commit add compression to frames.
    
    Closes #1854
---
 .../compress/colgroup/mapping/AMapToData.java      |   20 +
 .../sysds/runtime/frame/data/FrameBlock.java       |    8 +-
 .../frame/data/columns/ACompressedArray.java       |  105 ++
 .../sysds/runtime/frame/data/columns/Array.java    |   43 +-
 .../runtime/frame/data/columns/ArrayFactory.java   |    4 +-
 .../runtime/frame/data/columns/BitSetArray.java    |    2 +-
 .../runtime/frame/data/columns/CharArray.java      |    2 +-
 .../sysds/runtime/frame/data/columns/DDCArray.java |  297 ++++
 .../runtime/frame/data/columns/OptionalArray.java  |    4 +-
 .../runtime/frame/data/columns/StringArray.java    |   34 +-
 .../transform/encode/ColumnEncoderRecode.java      |   16 +-
 .../encode/ColumnEncoderWordEmbedding.java         |   11 +-
 .../runtime/transform/encode/CompressedEncode.java |    7 +-
 .../component/frame/array/CustomArrayTests.java    |  175 ++-
 .../component/frame/array/FrameArrayTests.java     | 1509 +++++++++++++-------
 .../component/frame/array/NegativeArrayTests.java  |   25 +-
 16 files changed, 1653 insertions(+), 609 deletions(-)

diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/mapping/AMapToData.java
 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/mapping/AMapToData.java
index 21be58dae5..65fc1b22b2 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/mapping/AMapToData.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/mapping/AMapToData.java
@@ -92,6 +92,26 @@ public abstract class AMapToData implements Serializable {
         */
        public abstract int getIndex(int n);
 
+       /**
+        * Shortcut method to support Long objects, not really efficient but 
for the purpose of reusing code.
+        * 
+        * @param n The index to set.
+        * @param v The value to set.
+        */
+       public void set(int n, Long v) {
+               set(n, (int) (long) v);
+       }
+
+       /**
+        * Shortcut method to support Integer objects, not really efficient but 
for the purpose of reusing code.
+        * 
+        * @param n The index to set.
+        * @param v The value to set.
+        */
+       public void set(int n, Integer v) {
+               set(n, (int) v);
+       }
+
        /**
         * Set the index to the value.
         * 
diff --git a/src/main/java/org/apache/sysds/runtime/frame/data/FrameBlock.java 
b/src/main/java/org/apache/sysds/runtime/frame/data/FrameBlock.java
index 69c3a9985f..5faacaf8d1 100644
--- a/src/main/java/org/apache/sysds/runtime/frame/data/FrameBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/frame/data/FrameBlock.java
@@ -83,10 +83,12 @@ public class FrameBlock implements CacheBlock<FrameBlock>, 
Externalizable {
        private static final Log LOG = 
LogFactory.getLog(FrameBlock.class.getName());
        private static final long serialVersionUID = -3993450030207130665L;
        private static final IDSequence CLASS_ID = new IDSequence();
-
        /** Buffer size variable: 1M elements, size of default matrix block */
        public static final int BUFFER_SIZE = 1 * 1000 * 1000;
 
+       /** If debugging is enabled for the FrameBlocks in stable state*/
+       public static boolean debug = false;
+
        /** The schema of the data frame as an ordered list of value types */
        private ValueType[] _schema = null;
 
@@ -828,7 +830,7 @@ public class FrameBlock implements CacheBlock<FrameBlock>, 
Externalizable {
                        else {
                                for(Array<?> aa : _coldata)
                                        size += aa.getInMemorySize();
-                               
+
                        }
                }
                return size;
@@ -1185,7 +1187,7 @@ public class FrameBlock implements 
CacheBlock<FrameBlock>, Externalizable {
         * @param col is the column # from frame data which contains Recode map 
generated earlier.
         * @return map of token and code for every element in the input column 
of a frame containing Recode map
         */
-       public HashMap<Object, Long> getRecodeMap(int col) {
+       public Map<Object, Long> getRecodeMap(int col) {
                return _coldata[col].getRecodeMap();
        }
 
diff --git 
a/src/main/java/org/apache/sysds/runtime/frame/data/columns/ACompressedArray.java
 
b/src/main/java/org/apache/sysds/runtime/frame/data/columns/ACompressedArray.java
new file mode 100644
index 0000000000..a36a0c3cc5
--- /dev/null
+++ 
b/src/main/java/org/apache/sysds/runtime/frame/data/columns/ACompressedArray.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.frame.data.columns;
+
+import org.apache.sysds.runtime.compress.DMLCompressionException;
+
+/**
+ * A Compressed Array, in general does not allow us to set or modify the array.
+ * 
+ * In all cases of modification it throws an DMLCompressionException.
+ */
+public abstract class ACompressedArray<T> extends Array<T> {
+
+       public ACompressedArray(int size) {
+               super(size);
+       }
+
+       @Override
+       public Object get() {
+               throw new DMLCompressionException("Invalid to call 'get' to 
access primitive array on CompressedArray");
+       }
+
+       @Override
+       public void set(int index, T value) {
+               throw new DMLCompressionException("Invalid to set value in 
CompressedArray");
+       }
+
+       @Override
+       public void set(int index, double value) {
+               throw new DMLCompressionException("Invalid to set value in 
CompressedArray");
+       }
+
+       @Override
+       public void set(int index, String value) {
+               throw new DMLCompressionException("Invalid to set value in 
CompressedArray");
+       }
+
+       @Override
+       public void setFromOtherType(int rl, int ru, Array<?> value) {
+               throw new DMLCompressionException("Invalid to set value in 
CompressedArray");
+       }
+
+       @Override
+       public void set(int rl, int ru, Array<T> value) {
+               throw new DMLCompressionException("Invalid to set value in 
CompressedArray");
+       }
+
+       @Override
+       public void set(int rl, int ru, Array<T> value, int rlSrc) {
+               throw new DMLCompressionException("Invalid to set value in 
CompressedArray");
+       }
+
+       @Override
+       public void setNz(int rl, int ru, Array<T> value) {
+               throw new DMLCompressionException("Invalid to set value in 
CompressedArray");
+       }
+
+       @Override
+       public void setFromOtherTypeNz(int rl, int ru, Array<?> value) {
+               throw new DMLCompressionException("Invalid to set value in 
CompressedArray");
+       }
+
+       @Override
+       public void append(String value) {
+               throw new DMLCompressionException("Invalid to 'append' single 
values in CompressedArray");
+       }
+
+       @Override
+       public void append(T value) {
+               throw new DMLCompressionException("Invalid to 'append' single 
values in CompressedArray");
+       }
+
+       @Override
+       public void fill(String val) {
+               throw new DMLCompressionException("Unimplemented method 
'fill'");
+       }
+
+       @Override
+       public void fill(T val) {
+               throw new DMLCompressionException("Unimplemented method 
'fill'");
+       }
+
+       @Override
+       public void reset(int size) {
+               throw new DMLCompressionException("Invalid to reset compressed 
array");
+       }
+
+}
diff --git 
a/src/main/java/org/apache/sysds/runtime/frame/data/columns/Array.java 
b/src/main/java/org/apache/sysds/runtime/frame/data/columns/Array.java
index 9111550725..3fbf3ed2d0 100644
--- a/src/main/java/org/apache/sysds/runtime/frame/data/columns/Array.java
+++ b/src/main/java/org/apache/sysds/runtime/frame/data/columns/Array.java
@@ -22,6 +22,7 @@ package org.apache.sysds.runtime.frame.data.columns;
 import java.lang.ref.SoftReference;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.Map;
 
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.commons.logging.Log;
@@ -42,7 +43,7 @@ public abstract class Array<T> implements Writable {
        private static final boolean REUSE_RECODE_MAPS = true;
 
        /** A soft reference to a memorization of this arrays mapping, used in 
transformEncode */
-       protected SoftReference<HashMap<T, Long>> _rcdMapCache = null;
+       protected SoftReference<Map<T, Long>> _rcdMapCache = null;
 
        /** The current allocated number of elements in this Array */
        protected int _size;
@@ -62,7 +63,7 @@ public abstract class Array<T> implements Writable {
         * 
         * @return The cached object
         */
-       public final SoftReference<HashMap<T, Long>> getCache() {
+       public final SoftReference<Map<T, Long>> getCache() {
                return _rcdMapCache;
        }
 
@@ -71,21 +72,21 @@ public abstract class Array<T> implements Writable {
         * 
         * @param m The element to cache.
         */
-       public final void setCache(SoftReference<HashMap<T, Long>> m) {
+       public final void setCache(SoftReference<Map<T, Long>> m) {
                _rcdMapCache = m;
        }
 
-       public HashMap<T, Long> getRecodeMap() {
+       public Map<T, Long> getRecodeMap() {
                // probe cache for existing map
                if(REUSE_RECODE_MAPS) {
-                       SoftReference<HashMap<T, Long>> tmp = getCache();
-                       HashMap<T, Long> map = (tmp != null) ? tmp.get() : null;
+                       SoftReference<Map<T, Long>> tmp = getCache();
+                       Map<T, Long> map = (tmp != null) ? tmp.get() : null;
                        if(map != null)
                                return map;
                }
 
                // construct recode map
-               HashMap<T, Long> map = createRecodeMap();
+               Map<T, Long> map = createRecodeMap();
 
                // put created map into cache
                if(REUSE_RECODE_MAPS)
@@ -94,9 +95,12 @@ public abstract class Array<T> implements Writable {
                return map;
        }
 
-       
-       protected HashMap<T, Long> createRecodeMap(){
-               HashMap<T, Long> map = new HashMap<>();
+       /**
+        * Recreate the recode map from what is already there.
+        * @return
+        */
+       protected Map<T, Long> createRecodeMap(){
+               Map<T, Long> map = new HashMap<>();
                long id = 0;
                for(int i = 0; i < size(); i++) {
                        T val = get(i);
@@ -110,6 +114,23 @@ public abstract class Array<T> implements Writable {
        }
 
 
+       /**
+        * Get the dictionary of the contained values, including null.
+        * 
+        * @return a dictionary containing all unique values.
+        */
+       protected Map<T, Integer> getDictionary(){
+               Map<T, Integer> dict = new HashMap<>();
+               int id = 0;
+               for(int i = 0 ; i < size(); i ++){
+                       T val = get(i);
+                       Integer v = dict.putIfAbsent(val, id);
+                       if(v== null)
+                               id++;
+               }
+
+               return dict;
+       }
 
        /**
         * Get the number of elements in the array, this does not necessarily 
reflect the current allocated size.
@@ -255,7 +276,7 @@ public abstract class Array<T> implements Writable {
        public abstract void append(T value);
 
        /**
-        * append other array, if the other array is fitting in current 
allocated size use that allocated size, otherwise
+        * Append other array, if the other array is fitting in current 
allocated size use that allocated size, otherwise
         * allocate new array to combine the other with this.
         * 
         * This method should use the set range function, and should be 
preferred over the append single values.
diff --git 
a/src/main/java/org/apache/sysds/runtime/frame/data/columns/ArrayFactory.java 
b/src/main/java/org/apache/sysds/runtime/frame/data/columns/ArrayFactory.java
index 8af5623708..11defd31cd 100644
--- 
a/src/main/java/org/apache/sysds/runtime/frame/data/columns/ArrayFactory.java
+++ 
b/src/main/java/org/apache/sysds/runtime/frame/data/columns/ArrayFactory.java
@@ -32,7 +32,7 @@ public interface ArrayFactory {
        public final static int bitSetSwitchPoint = 64;
 
        public enum FrameArrayType {
-               STRING, BOOLEAN, BITSET, INT32, INT64, FP32, FP64, CHARACTER, 
OPTIONAL;
+               STRING, BOOLEAN, BITSET, INT32, INT64, FP32, FP64, CHARACTER, 
OPTIONAL, DDC;
        }
 
        public static StringArray create(String[] col) {
@@ -188,6 +188,8 @@ public interface ArrayFactory {
                                break;
                        case OPTIONAL:
                                return OptionalArray.readOpt(in, nRow);
+                       case DDC:
+                               return DDCArray.read(in);
                        default: // String
                                arr = new StringArray(new String[nRow]);
                                break;
diff --git 
a/src/main/java/org/apache/sysds/runtime/frame/data/columns/BitSetArray.java 
b/src/main/java/org/apache/sysds/runtime/frame/data/columns/BitSetArray.java
index 5eed5ce3e0..d6c2489ec2 100644
--- a/src/main/java/org/apache/sysds/runtime/frame/data/columns/BitSetArray.java
+++ b/src/main/java/org/apache/sysds/runtime/frame/data/columns/BitSetArray.java
@@ -39,7 +39,7 @@ public class BitSetArray extends ABooleanArray {
        private static final boolean useVectorizedKernel = true;
 
        /** Vectorized "words" containing all the bits set */
-       long[] _data;
+       protected long[] _data;
 
        protected BitSetArray(int size) {
                this(new long[longSize(size)], size);
diff --git 
a/src/main/java/org/apache/sysds/runtime/frame/data/columns/CharArray.java 
b/src/main/java/org/apache/sysds/runtime/frame/data/columns/CharArray.java
index b08232cde2..1096f076e2 100644
--- a/src/main/java/org/apache/sysds/runtime/frame/data/columns/CharArray.java
+++ b/src/main/java/org/apache/sysds/runtime/frame/data/columns/CharArray.java
@@ -35,7 +35,7 @@ import org.apache.sysds.runtime.util.UtilFunctions;
 
 public class CharArray extends Array<Character> {
 
-       protected char[] _data;
+       private char[] _data;
 
        public CharArray(char[] data) {
                super(data.length);
diff --git 
a/src/main/java/org/apache/sysds/runtime/frame/data/columns/DDCArray.java 
b/src/main/java/org/apache/sysds/runtime/frame/data/columns/DDCArray.java
new file mode 100644
index 0000000000..f7a810d0fd
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/frame/data/columns/DDCArray.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.frame.data.columns;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.sysds.common.Types.ValueType;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.compress.DMLCompressionException;
+import org.apache.sysds.runtime.compress.colgroup.mapping.AMapToData;
+import org.apache.sysds.runtime.compress.colgroup.mapping.MapToFactory;
+import org.apache.sysds.runtime.frame.data.FrameBlock;
+import org.apache.sysds.runtime.frame.data.columns.ArrayFactory.FrameArrayType;
+import org.apache.sysds.runtime.matrix.data.Pair;
+
+/**
+ * A dense dictionary version of an column array
+ */
+public class DDCArray<T> extends ACompressedArray<T> {
+
+       /** The unique values contained */
+       private final Array<T> dict;
+       /** A Map containing the mapping from the dict to rows */
+       private final AMapToData map;
+
+       public DDCArray(Array<T> dict, AMapToData map) {
+               super(map.size());
+               this.dict = dict;
+               this.map = map;
+
+               if(FrameBlock.debug) {
+                       if(dict.size() != map.getUnique())
+                               throw new DMLRuntimeException("Invalid 
DDCArray, dictionary size is not equal to map unique");
+               }
+       }
+
+       private static <V, K> Map<V, K> invert(Map<K, V> map) {
+               Map<V, K> invMap = new HashMap<V, K>();
+               for(Entry<K, V> e : map.entrySet())
+                       invMap.put(e.getValue(), e.getKey());
+               return invMap;
+       }
+
+       /**
+        * Try to compress array into DDC format.
+        * 
+        * @param <T> The type of the Array
+        * @param arr The array to try to compress
+        * @return Either a compressed version or the original.
+        */
+       @SuppressWarnings("unchecked")
+       public static <T> Array<T> compressToDDC(Array<T> arr) {
+               // two pass algorithm
+               if(arr.size() <= 10)
+                       return arr;
+
+               // 1. Get unique
+               Map<T, Integer> rcd = arr.getDictionary();
+
+               if(rcd.size() > arr.size() / 2)
+                       return arr;
+
+               Array<T> ar;
+
+               if(rcd.keySet().contains(null))
+                       ar = (Array<T>) 
ArrayFactory.allocateOptional(arr.getValueType(), rcd.size());
+               else
+                       ar = (Array<T>) 
ArrayFactory.allocate(arr.getValueType(), rcd.size());
+
+               Map<Integer, T> rcdInv = invert(rcd);
+               for(int i = 0; i < rcd.size(); i++)
+                       ar.set(i, rcdInv.get(Integer.valueOf(i)));
+
+               // 2. Make map
+               AMapToData m = MapToFactory.create(arr.size(), rcd.size());
+
+               for(int i = 0; i < arr.size(); i++)
+                       m.set(i, rcd.get(arr.get(i)));
+
+               return new DDCArray<T>(ar, m);
+       }
+
+       @Override
+       public void write(DataOutput out) throws IOException {
+               out.writeByte(FrameArrayType.DDC.ordinal());
+               map.write(out);
+               dict.write(out);
+       }
+
+       @Override
+       public void readFields(DataInput in) throws IOException {
+               throw new DMLRuntimeException("Should not be called");
+       }
+
+       @SuppressWarnings("unchecked")
+       public static DDCArray<?> read(DataInput in) throws IOException {
+               AMapToData map = MapToFactory.readIn(in);
+               Array<?> dict = ArrayFactory.read(in, map.getUnique());
+               switch(dict.getValueType()) {
+                       case BOOLEAN:
+                               // Interesting case, that does not make much 
sense.
+                               return new DDCArray<Boolean>((Array<Boolean>) 
dict, map);
+                       case FP32:
+                               return new DDCArray<Float>((Array<Float>) dict, 
map);
+                       case FP64:
+                               return new DDCArray<Double>((Array<Double>) 
dict, map);
+                       case UINT8:
+                       case INT32:
+                               return new DDCArray<Integer>((Array<Integer>) 
dict, map);
+                       case INT64:
+                               return new DDCArray<Long>((Array<Long>) dict, 
map);
+                       case CHARACTER:
+                               return new 
DDCArray<Character>((Array<Character>) dict, map);
+                       case STRING:
+                       default:
+                               return new DDCArray<String>((Array<String>) 
dict, map);
+               }
+       }
+
+       @Override
+       public T get(int index) {
+               return dict.get(map.getIndex(index));
+       }
+
+       @Override
+       public double getAsDouble(int i) {
+               return dict.getAsDouble(map.getIndex(i));
+       }
+
+       @Override
+       public double getAsNaNDouble(int i) {
+               return dict.getAsNaNDouble(map.getIndex(i));
+       }
+
+       @Override
+       public Array<T> append(Array<T> other) {
+               // TODO add append compressed to each other.
+               throw new DMLCompressionException("Currently not supported to 
append compressed but could be cool");
+       }
+
+       @Override
+       public Array<T> slice(int rl, int ru) {
+               return new DDCArray<>(dict, map.slice(rl, ru));
+       }
+
+       @Override
+       public byte[] getAsByteArray() {
+               throw new DMLCompressionException("Unimplemented method 
'getAsByteArray'");
+       }
+
+       @Override
+       public ValueType getValueType() {
+               return dict.getValueType();
+       }
+
+       @Override
+       public Pair<ValueType, Boolean> analyzeValueType() {
+               return dict.analyzeValueType();
+       }
+
+       @Override
+       public FrameArrayType getFrameArrayType() {
+               return FrameArrayType.DDC;
+       }
+
+       @Override
+       public long getExactSerializedSize() {
+               return 1L + map.getExactSizeOnDisk() + 
dict.getExactSerializedSize();
+       }
+
+       @Override
+       protected Array<Boolean> changeTypeBitSet() {
+               return new DDCArray<Boolean>(dict.changeTypeBitSet(), map);
+       }
+
+       @Override
+       protected Array<Boolean> changeTypeBoolean() {
+               return new DDCArray<Boolean>(dict.changeTypeBoolean(), map);
+       }
+
+       @Override
+       protected Array<Double> changeTypeDouble() {
+               return new DDCArray<Double>(dict.changeTypeDouble(), map);
+       }
+
+       @Override
+       protected Array<Float> changeTypeFloat() {
+               return new DDCArray<Float>(dict.changeTypeFloat(), map);
+       }
+
+       @Override
+       protected Array<Integer> changeTypeInteger() {
+               return new DDCArray<Integer>(dict.changeTypeInteger(), map);
+       }
+
+       @Override
+       protected Array<Long> changeTypeLong() {
+               return new DDCArray<Long>(dict.changeTypeLong(), map);
+       }
+
+       @Override
+       protected Array<String> changeTypeString() {
+               return new DDCArray<String>(dict.changeTypeString(), map);
+       }
+
+       @Override
+       protected Array<Character> changeTypeCharacter() {
+               return new DDCArray<Character>(dict.changeTypeCharacter(), map);
+       }
+
+       @Override
+       public boolean isShallowSerialize() {
+               return true; // Always the case if we use this compression 
scheme.
+       }
+
+       @Override
+       public boolean isEmpty() {
+               return false;
+       }
+
+       @Override
+       public Array<T> select(int[] indices) {
+               final int[] newSelect = new int[indices.length];
+               for(int i = 0; i < newSelect.length; i++)
+                       newSelect[i] = map.getIndex(indices[i]);
+               return dict.select(newSelect);
+       }
+
+       @Override
+       public Array<T> select(boolean[] select, int nTrue) {
+               final AMapToData map2 = MapToFactory.create(nTrue, 
map.getUnique());
+               int j = 0;
+               for(int i = 0; i < select.length; i++)
+                       if(select[i])
+                               map2.set(j++, map.getIndex(i));
+               return new DDCArray<T>(dict, map2);
+       }
+
+       @Override
+       public boolean isNotEmpty(int i) {
+               return dict.isNotEmpty(map.getIndex(i));
+       }
+
+       @Override
+       public Array<T> clone() {
+               // Since the compressed formats are immutable, it is allowed to 
return the same.
+               return new DDCArray<T>(dict, map);
+       }
+
+       @Override
+       public double hashDouble(int idx) {
+               return dict.hashDouble(map.getIndex(idx));
+       }
+
+       @Override
+       public long getInMemorySize() {
+               return super.getInMemorySize() + map.getInMemorySize() + 
dict.getInMemorySize();
+       }
+
+       @Override
+       protected Map<T, Integer> getDictionary() {
+               // Nice shortcut!
+               return dict.getDictionary();
+       }
+
+       @Override
+       public String toString() {
+               StringBuilder sb = new StringBuilder();
+               sb.append(String.format("\n%15s", "Values: "));
+               sb.append(dict);
+               sb.append(String.format("\n%15s", "Data: "));
+               sb.append(map);
+               return sb.toString();
+       }
+}
diff --git 
a/src/main/java/org/apache/sysds/runtime/frame/data/columns/OptionalArray.java 
b/src/main/java/org/apache/sysds/runtime/frame/data/columns/OptionalArray.java
index bf69c4f56d..6898e9472c 100644
--- 
a/src/main/java/org/apache/sysds/runtime/frame/data/columns/OptionalArray.java
+++ 
b/src/main/java/org/apache/sysds/runtime/frame/data/columns/OptionalArray.java
@@ -33,9 +33,9 @@ import org.apache.sysds.runtime.util.UtilFunctions;
 public class OptionalArray<T> extends Array<T> {
 
        /** Underlying values not able to contain null values */
-       protected Array<T> _a;
+       protected final Array<T> _a;
        /** A Bitset specifying where there are null, in it false means null */
-       protected ABooleanArray _n;
+       protected final ABooleanArray _n;
 
        @SuppressWarnings("unchecked")
        public OptionalArray(T[] a) {
diff --git 
a/src/main/java/org/apache/sysds/runtime/frame/data/columns/StringArray.java 
b/src/main/java/org/apache/sysds/runtime/frame/data/columns/StringArray.java
index 00be24e0c7..e99903d4b3 100644
--- a/src/main/java/org/apache/sysds/runtime/frame/data/columns/StringArray.java
+++ b/src/main/java/org/apache/sysds/runtime/frame/data/columns/StringArray.java
@@ -25,6 +25,7 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.BitSet;
 import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.sysds.common.Types.ValueType;
@@ -297,7 +298,7 @@ public class StringArray extends Array<String> {
        protected Array<Boolean> changeTypeBoolean() {
                String firstNN = _data[0];
                int i = 1;
-               while(firstNN == null && i < size()){
+               while(firstNN == null && i < size()) {
                        firstNN = _data[i++];
                }
 
@@ -423,7 +424,6 @@ public class StringArray extends Array<String> {
                        return changeTypeBooleanFloatArray();
        }
 
-
        protected Array<Boolean> changeTypeBooleanFloatBitSet() {
                BitSet ret = new BitSet(size());
                for(int i = 0; i < size(); i++) {
@@ -561,34 +561,34 @@ public class StringArray extends Array<String> {
 
        @Override
        public double getAsDouble(int i) {
-               if(_data[i] != null && !_data[i].isEmpty()){
+               if(_data[i] != null && !_data[i].isEmpty()) {
                        return getAsDouble(_data[i]);
                }
-               else{
+               else {
                        return 0.0;
                }
        }
 
        @Override
        public double getAsNaNDouble(int i) {
-               if(_data[i] != null && !_data[i].isEmpty()){
+               if(_data[i] != null && !_data[i].isEmpty()) {
                        return getAsDouble(_data[i]);
                }
-               else{
+               else {
                        return Double.NaN;
                }
        }
 
-       private static double getAsDouble(String s){
-               try{
+       private static double getAsDouble(String s) {
+               try {
 
                        return DoubleArray.parseDouble(s);
                }
-               catch(Exception e){
+               catch(Exception e) {
                        String ls = s.toLowerCase();
                        if(ls.equals("true") || ls.equals("t"))
                                return 1;
-                       else if (ls.equals("false") || ls.equals("f"))
+                       else if(ls.equals("false") || ls.equals("f"))
                                return 0;
                        else
                                throw new DMLRuntimeException("Unable to change 
to double: " + s, e);
@@ -608,9 +608,9 @@ public class StringArray extends Array<String> {
                                return false;
                return true;
        }
-       
+
        @Override
-       public boolean containsNull(){
+       public boolean containsNull() {
                for(int i = 0; i < _data.length; i++)
                        if(_data[i] == null)
                                return true;
@@ -641,10 +641,10 @@ public class StringArray extends Array<String> {
        }
 
        @Override
-       protected HashMap<String, Long> createRecodeMap(){
-               try{
+       protected Map<String, Long> createRecodeMap() {
+               try {
 
-                       HashMap<String, Long> map = new HashMap<>();
+                       Map<String, Long> map = new HashMap<>();
                        for(int i = 0; i < size(); i++) {
                                Object val = get(i);
                                if(val != null) {
@@ -656,13 +656,13 @@ public class StringArray extends Array<String> {
                        }
                        return map;
                }
-               catch(Exception e){
+               catch(Exception e) {
                        return super.createRecodeMap();
                }
        }
 
        @Override
-       public double hashDouble(int idx){
+       public double hashDouble(int idx) {
                if(_data[idx] != null)
                        return _data[idx].hashCode();
                else
diff --git 
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderRecode.java
 
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderRecode.java
index 334149515b..cbb4f79664 100644
--- 
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderRecode.java
+++ 
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderRecode.java
@@ -47,7 +47,7 @@ public class ColumnEncoderRecode extends ColumnEncoder {
        public static boolean SORT_RECODE_MAP = false;
 
        // recode maps and custom map for partial recode maps
-       private HashMap<Object, Long> _rcdMap;
+       private Map<Object, Long> _rcdMap;
        private HashSet<Object> _rcdMapPart = null;
 
        public ColumnEncoderRecode(int colID) {
@@ -103,7 +103,7 @@ public class ColumnEncoderRecode extends ColumnEncoder {
                return new String[] {value.substring(0, pos), 
value.substring(pos + 1)};
        }
 
-       public HashMap<Object, Long> getCPRecodeMaps() {
+       public Map<Object, Long> getCPRecodeMaps() {
                return _rcdMap;
        }
 
@@ -115,7 +115,7 @@ public class ColumnEncoderRecode extends ColumnEncoder {
                sortCPRecodeMaps(_rcdMap);
        }
 
-       private static void sortCPRecodeMaps(HashMap<Object, Long> map) {
+       private static void sortCPRecodeMaps(Map<Object, Long> map) {
                Object[] keys = map.keySet().toArray(new Object[0]);
                Arrays.sort(keys);
                map.clear();
@@ -123,7 +123,7 @@ public class ColumnEncoderRecode extends ColumnEncoder {
                        putCode(map, key);
        }
 
-       private static void makeRcdMap(CacheBlock<?> in, HashMap<Object, Long> 
map, int colID, int startRow, int blk) {
+       private static void makeRcdMap(CacheBlock<?> in, Map<Object, Long> map, 
int colID, int startRow, int blk) {
                for(int row = startRow; row < getEndIndex(in.getNumRows(), 
startRow, blk); row++){
                        String key = in.getString(row, colID - 1);
                        if(key != null && !key.isEmpty() && 
!map.containsKey(key))
@@ -211,7 +211,7 @@ public class ColumnEncoderRecode extends ColumnEncoder {
         * @param map column map
         * @param key key for the new entry
         */
-       protected static void putCode(HashMap<Object, Long> map, Object key) {
+       protected static void putCode(Map<Object, Long> map, Object key) {
                map.put(key, (long) (map.size() + 1));
        }
 
@@ -279,7 +279,7 @@ public class ColumnEncoderRecode extends ColumnEncoder {
                assert other._colID == _colID;
                // merge together overlapping columns
                ColumnEncoderRecode otherRec = (ColumnEncoderRecode) other;
-               HashMap<Object, Long> otherMap = otherRec._rcdMap;
+               Map<Object, Long> otherMap = otherRec._rcdMap;
                if(otherMap != null) {
                        // for each column, add all non present recode values
                        for(Map.Entry<Object, Long> entry : 
otherMap.entrySet()) {
@@ -372,7 +372,7 @@ public class ColumnEncoderRecode extends ColumnEncoder {
                return Objects.hash(_rcdMap);
        }
 
-       public HashMap<Object, Long> getRcdMap() {
+       public Map<Object, Long> getRcdMap() {
                return _rcdMap;
        }
 
@@ -469,7 +469,7 @@ public class ColumnEncoderRecode extends ColumnEncoder {
                @Override
                public Object call() throws Exception {
                        long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
-                       HashMap<Object, Long> rcdMap = _encoder.getRcdMap();
+                       Map<Object, Long> rcdMap = _encoder.getRcdMap();
                        _partialMaps.forEach((start_row, map) -> {
                                ((HashMap<?, ?>) map).forEach((k, v) -> {
                                        if(!rcdMap.containsKey(k))
diff --git 
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderWordEmbedding.java
 
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderWordEmbedding.java
index 9d08c3bc20..7b7b326431 100644
--- 
a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderWordEmbedding.java
+++ 
b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderWordEmbedding.java
@@ -19,18 +19,19 @@
 
 package org.apache.sysds.runtime.transform.encode;
 
+import static org.apache.sysds.runtime.util.UtilFunctions.getEndIndex;
+
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.sysds.runtime.controlprogram.caching.CacheBlock;
 import org.apache.sysds.runtime.frame.data.FrameBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 
-import java.util.HashMap;
-
-import static org.apache.sysds.runtime.util.UtilFunctions.getEndIndex;
-
 public class ColumnEncoderWordEmbedding extends ColumnEncoder {
     private MatrixBlock _wordEmbeddings;
-    private HashMap<Object, Long> _rcdMap;
+    private Map<Object, Long> _rcdMap;
     private HashMap<String, double[]> _embMap;
 
     private long lookupRCDMap(Object key) {
diff --git 
a/src/main/java/org/apache/sysds/runtime/transform/encode/CompressedEncode.java 
b/src/main/java/org/apache/sysds/runtime/transform/encode/CompressedEncode.java
index 150133c469..9f376cd31c 100644
--- 
a/src/main/java/org/apache/sysds/runtime/transform/encode/CompressedEncode.java
+++ 
b/src/main/java/org/apache/sysds/runtime/transform/encode/CompressedEncode.java
@@ -22,6 +22,7 @@ package org.apache.sysds.runtime.transform.encode;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -154,7 +155,7 @@ public class CompressedEncode {
                int colId = c._colID;
                Array<?> a = in.getColumn(colId - 1);
                boolean containsNull = a.containsNull();
-               HashMap<?, Long> map = a.getRecodeMap();
+               Map<?, Long> map = a.getRecodeMap();
                int domain = map.size();
                if(containsNull && domain == 0)
                        return new ColGroupEmpty(ColIndexFactory.create(1));
@@ -232,7 +233,7 @@ public class CompressedEncode {
        private AColGroup recode(ColumnEncoderComposite c) {
                int colId = c._colID;
                Array<?> a = in.getColumn(colId - 1);
-               HashMap<?, Long> map = a.getRecodeMap();
+               Map<?, Long> map = a.getRecodeMap();
                boolean containsNull = a.containsNull();
                int domain = map.size();
 
@@ -284,7 +285,7 @@ public class CompressedEncode {
 
        }
 
-       private AMapToData createMappingAMapToData(Array<?> a, HashMap<?, Long> 
map, boolean containsNull) {
+       private AMapToData createMappingAMapToData(Array<?> a, Map<?, Long> 
map, boolean containsNull) {
                final int si = map.size();
                AMapToData m = MapToFactory.create(in.getNumRows(), si + 
(containsNull ? 1 : 0));
                Array<?>.ArrayIterator it = a.getIterator();
diff --git 
a/src/test/java/org/apache/sysds/test/component/frame/array/CustomArrayTests.java
 
b/src/test/java/org/apache/sysds/test/component/frame/array/CustomArrayTests.java
index cc6c3510af..2bbd7f8515 100644
--- 
a/src/test/java/org/apache/sysds/test/component/frame/array/CustomArrayTests.java
+++ 
b/src/test/java/org/apache/sysds/test/component/frame/array/CustomArrayTests.java
@@ -28,17 +28,21 @@ import java.lang.ref.SoftReference;
 import java.util.Arrays;
 import java.util.BitSet;
 import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.sysds.common.Types.ValueType;
 import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.compress.colgroup.mapping.MapToFactory;
+import org.apache.sysds.runtime.frame.data.FrameBlock;
 import org.apache.sysds.runtime.frame.data.columns.Array;
 import org.apache.sysds.runtime.frame.data.columns.ArrayFactory;
 import org.apache.sysds.runtime.frame.data.columns.BitSetArray;
 import org.apache.sysds.runtime.frame.data.columns.BooleanArray;
 import org.apache.sysds.runtime.frame.data.columns.CharArray;
+import org.apache.sysds.runtime.frame.data.columns.DDCArray;
 import org.apache.sysds.runtime.frame.data.columns.DoubleArray;
 import org.apache.sysds.runtime.frame.data.columns.FloatArray;
 import org.apache.sysds.runtime.frame.data.columns.IntegerArray;
@@ -1152,13 +1156,178 @@ public class CustomArrayTests {
        public void mappingCache() {
                Array<String> a = new StringArray(new String[] {"1", null});
                assertEquals(null, a.getCache());
-               a.setCache(new SoftReference<HashMap<String, Long>>(null));
+               a.setCache(new SoftReference<Map<String, Long>>(null));
                assertTrue(null != a.getCache());
-               a.setCache(new SoftReference<HashMap<String, Long>>(new 
HashMap<>()));
+               a.setCache(new SoftReference<Map<String, Long>>(new 
HashMap<>()));
                assertTrue(null != a.getCache());
-               HashMap<String, Long> hm = a.getCache().get();
+               Map<String, Long> hm = a.getCache().get();
                hm.put("1", 0L);
                hm.put(null, 2L);
                assertEquals(Long.valueOf(0L), a.getCache().get().get("1"));
        }
+
+       @Test
+       public void DDCCompress() {
+               try {
+                       Array<String> a = 
ArrayFactory.create(FrameArrayTests.generateRandomStringNUniqueLength(100, 32, 
2, 2000));
+                       Array<String> ddc = DDCArray.compressToDDC(a);
+                       FrameArrayTests.compare(a, ddc);
+                       assertTrue(a.getInMemorySize() > ddc.getInMemorySize());
+               }
+               catch(Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       @Test
+       public void DDCCompressMemSize() {
+               try {
+                       Array<String> a = 
ArrayFactory.create(FrameArrayTests.generateRandomStringNUniqueLength(100, 32, 
5, 2000));
+                       Array<String> ddc = DDCArray.compressToDDC(a);
+                       assertTrue(a.getInMemorySize() > ddc.getInMemorySize());
+               }
+               catch(Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       @Test
+       public void DDCCompressAbort() {
+               try {
+                       Array<String> a = 
ArrayFactory.create(FrameArrayTests.generateRandomStringNUniqueLength(100, 32, 
100, 2000));
+                       Array<String> ddc = DDCArray.compressToDDC(a);
+                       assertFalse(ddc instanceof DDCArray);
+                       // when abort keep original
+                       assertEquals(a, ddc);
+               }
+               catch(Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       @Test(expected = DMLRuntimeException.class)
+       public void DDCCompressInvalid() {
+               FrameBlock.debug = true; // should be fine in general to set 
while testing
+               Array<Boolean> b = ArrayFactory.create(new boolean[4]);
+               new DDCArray<Boolean>(b, MapToFactory.create(10, 10));
+       }
+
+       @Test
+       public void DDCCompressSerialize() {
+               try {
+                       Array<String> a = 
ArrayFactory.create(FrameArrayTests.generateRandomStringNUniqueLength(100, 32, 
5, 2000));
+                       Array<String> ddc = DDCArray.compressToDDC(a);
+
+                       Array<?> ddcs = FrameArrayTests.serializeAndBack(ddc);
+                       FrameArrayTests.compare(a, ddcs);
+
+                       assertTrue(a.getInMemorySize() > ddc.getInMemorySize());
+               }
+               catch(Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       @Test(expected = DMLRuntimeException.class)
+       public void DDCInvalidReadFields() {
+               try {
+                       Array<String> a = 
ArrayFactory.create(FrameArrayTests.generateRandomStringNUniqueLength(100, 32, 
5, 2000));
+                       Array<String> ddc = DDCArray.compressToDDC(a);
+                       ddc.readFields(null);
+               }
+               catch(Exception e) {
+                       throw new DMLRuntimeException(e);
+               }
+       }
+
+       @Test(expected = DMLRuntimeException.class)
+       public void DDCget() {
+               Array<String> a = 
ArrayFactory.create(FrameArrayTests.generateRandomStringNUniqueLength(100, 32, 
5, 2000));
+               Array<String> ddc = DDCArray.compressToDDC(a);
+               ddc.get();
+       }
+
+       @Test
+       public void DDCHash() {
+               Array<Float> a = 
ArrayFactory.create(FrameArrayTests.generateRandomFloatNUniqueLengthOpt(100, 
32, 5));
+               Array<Float> ddc = DDCArray.compressToDDC(a);
+               for(int i = 0; i < a.size(); i++) {
+                       Double aa = a.hashDouble(i);
+                       Double bb = ddc.hashDouble(i);
+                       if(aa.isNaN() && bb.isNaN())
+                               // all good
+                               continue;
+                       else {
+                               assertEquals(a.hashDouble(i), 
ddc.hashDouble(i), 0.0);
+                       }
+               }
+       }
+
+       @Test
+       public void hashDoubleOnString() {
+               Array<String> a = 
ArrayFactory.create(FrameArrayTests.generateRandom01String(100, 32));
+               for(int i = 0; i < a.size(); i++) {
+                       assertEquals(a.hashDouble(i), (double) 
a.get(i).hashCode(), 0.0);
+               }
+       }
+
+       @Test
+       public void hashDoubleOnChar() {
+               Array<Character> a = 
ArrayFactory.create(FrameArrayTests.generateRandom01chars(5, 32));
+               for(int i = 0; i < a.size(); i++) {
+                       assertEquals(a.hashDouble(i), (double) 
a.get(i).hashCode(), 0.0);
+               }
+       }
+
+       @Test
+       public void hashDoubleOnInt() {
+               Array<Integer> a = 
ArrayFactory.create(FrameArrayTests.generateRandomInt8(5, 32));
+               for(int i = 0; i < a.size(); i++) {
+                       assertEquals(a.hashDouble(i), (double) 
a.get(i).hashCode(), 0.0);
+               }
+       }
+
+       @Test
+       public void hashDoubleOnLong() {
+               Array<Long> a = 
ArrayFactory.create(FrameArrayTests.generateRandomLong(5, 32));
+               for(int i = 0; i < a.size(); i++) {
+                       assertEquals(a.hashDouble(i), (double) 
a.get(i).hashCode(), 0.0);
+               }
+       }
+
+       @Test
+       public void hashDoubleOnFloat() {
+               Array<Float> a = 
ArrayFactory.create(FrameArrayTests.generateRandomFloat(5, 32));
+               for(int i = 0; i < a.size(); i++) {
+                       assertEquals(a.hashDouble(i), (double) 
a.get(i).hashCode(), 0.0);
+               }
+       }
+
+       @Test
+       public void hashDoubleOnBoolean() {
+               Array<Boolean> a = 
ArrayFactory.create(FrameArrayTests.generateRandomBoolean(5, 32));
+               for(int i = 0; i < a.size(); i++) {
+                       assertEquals(a.hashDouble(i), a.get(i) ? 1 : 0, 0.0);
+               }
+       }
+
+       @Test
+       public void hashDoubleOnBitSet() {
+               Array<Boolean> a = 
ArrayFactory.create(FrameArrayTests.generateRandomBitSet(324, 32), 324);
+               for(int i = 0; i < a.size(); i++) {
+                       assertEquals(a.hashDouble(i), a.get(i) ? 1 : 0, 0.0);
+               }
+       }
+
+       @Test
+       public void hashDoubleOnStringNull() {
+               Array<String> a = ArrayFactory.create(new String[4]);
+               for(int i = 0; i < a.size(); i++) {
+                       assertEquals(a.hashDouble(i), Double.NaN, 0.0);
+               }
+       }
 }
diff --git 
a/src/test/java/org/apache/sysds/test/component/frame/array/FrameArrayTests.java
 
b/src/test/java/org/apache/sysds/test/component/frame/array/FrameArrayTests.java
index 601555576b..81908f8d39 100644
--- 
a/src/test/java/org/apache/sysds/test/component/frame/array/FrameArrayTests.java
+++ 
b/src/test/java/org/apache/sysds/test/component/frame/array/FrameArrayTests.java
@@ -20,6 +20,7 @@
 package org.apache.sysds.test.component.frame.array;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -38,12 +39,14 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.sysds.common.Types.ValueType;
 import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.compress.DMLCompressionException;
 import org.apache.sysds.runtime.frame.data.columns.Array;
 import org.apache.sysds.runtime.frame.data.columns.ArrayFactory;
 import org.apache.sysds.runtime.frame.data.columns.ArrayFactory.FrameArrayType;
 import org.apache.sysds.runtime.frame.data.columns.BitSetArray;
 import org.apache.sysds.runtime.frame.data.columns.BooleanArray;
 import org.apache.sysds.runtime.frame.data.columns.CharArray;
+import org.apache.sysds.runtime.frame.data.columns.DDCArray;
 import org.apache.sysds.runtime.frame.data.columns.DoubleArray;
 import org.apache.sysds.runtime.frame.data.columns.FloatArray;
 import org.apache.sysds.runtime.frame.data.columns.IntegerArray;
@@ -79,6 +82,9 @@ public class FrameArrayTests {
                                        tests.add(new Object[] {create(t, 124, 
s), t});
                                        tests.add(new Object[] {create(t, 130, 
s), t});
                                        tests.add(new Object[] {create(t, 200, 
s), t});
+                                       tests.add(new Object[] {createDDC(t, 
100, s), t});
+                                       tests.add(new Object[] {createDDC(t, 
200, s), t});
+                                       tests.add(new Object[] {createDDC(t, 
205, s), t});
                                        if(t != FrameArrayType.STRING) {
                                                tests.add(new Object[] 
{createOptional(t, 13, s), FrameArrayType.OPTIONAL});
                                                tests.add(new Object[] 
{createOptional(t, 321, s), FrameArrayType.OPTIONAL});
@@ -128,6 +134,10 @@ public class FrameArrayTests {
                        tests.add(new Object[] {ArrayFactory.create(new char[] 
{'0', '2', '3', '4', '9'}), FrameArrayType.CHARACTER});
                        tests.add(new Object[] 
{ArrayFactory.create(generateRandom01chars(150, 221)), 
FrameArrayType.CHARACTER});
                        tests.add(new Object[] 
{ArrayFactory.create(generateRandom01chars(67, 221)), 
FrameArrayType.CHARACTER});
+                       tests.add(new Object[] 
{DDCArray.compressToDDC(ArrayFactory.create(generateRandom01chars(67, 221))),
+                               FrameArrayType.CHARACTER});
+                       tests.add(new Object[] 
{DDCArray.compressToDDC(ArrayFactory.create(generateRandom01chars(30, 221))),
+                               FrameArrayType.CHARACTER});
                        // Long to int
                        tests.add(new Object[] {ArrayFactory.create(new long[] 
{3214, 424, 13, 22, 111, 134}), FrameArrayType.INT64});
 
@@ -176,12 +186,37 @@ public class FrameArrayTests {
 
        @Test(expected = ArrayIndexOutOfBoundsException.class)
        public void testGetOutOfBoundsUpper() {
-               a.get(a.size() + 1);
+               try {
+                       a.get(a.size() + 1);
+               }
+               catch(IndexOutOfBoundsException e) {
+                       throw new ArrayIndexOutOfBoundsException();
+               }
+               if(a.getFrameArrayType() == FrameArrayType.DDC) {
+                       // all good. but we do not handle edge cases in map.
+                       throw new ArrayIndexOutOfBoundsException();
+               }
+               else {
+                       fail("Not out of bounds error");
+               }
        }
 
        @Test(expected = ArrayIndexOutOfBoundsException.class)
        public void testGetOutOfBoundsLower() {
-               a.get(-1);
+               try {
+
+                       a.get(-1);
+               }
+               catch(IndexOutOfBoundsException e) {
+                       throw new ArrayIndexOutOfBoundsException();
+               }
+               if(a.getFrameArrayType() == FrameArrayType.DDC) {
+                       // all good. but we do not handle edge cases in map.
+                       throw new ArrayIndexOutOfBoundsException();
+               }
+               else {
+                       fail("Not out of bounds error");
+               }
        }
 
        @Test
@@ -196,6 +231,8 @@ public class FrameArrayTests {
                                        estSize = 
BooleanArray.estimateInMemorySize(a.size());
                        default: // nothing
                }
+               if(a.getFrameArrayType() == FrameArrayType.DDC)
+                       return;
                if(memSize > estSize)
                        fail("Estimated size is not smaller than actual:" + 
memSize + "  " + estSize + "\n" + a.getValueType() + " "
                                + a.getClass().getSimpleName());
@@ -265,6 +302,11 @@ public class FrameArrayTests {
        public void getFrameArrayType() {
                if(t == FrameArrayType.BITSET)
                        return;
+               if(t == FrameArrayType.DDC)// can be many things.
+                       return;
+               if(a.getFrameArrayType() == FrameArrayType.DDC)
+                       return; // can happen where DDC is wrapping Optional.
+
                assertEquals(t, a.getFrameArrayType());
        }
 
@@ -299,40 +341,48 @@ public class FrameArrayTests {
        @SuppressWarnings("unused")
        public void get() {
                Object x = null;
-               switch(a.getFrameArrayType()) {
-                       case FP64:
-                               x = (double[]) a.get();
-                               return;
-                       case FP32:
-                               x = (float[]) a.get();
-                               return;
-                       case INT32:
-                               x = (int[]) a.get();
-                               return;
-                       case BOOLEAN:
-                               x = (boolean[]) a.get();
-                               return;
-                       case INT64:
-                               x = (long[]) a.get();
-                               return;
-                       case BITSET:
-                               x = (BitSet) a.get();
-                               return;
-                       case STRING:
-                               x = (String[]) a.get();
-                               return;
-                       case CHARACTER:
-                               x = (char[]) a.get();
-                       case OPTIONAL:
-                               try {
+               try {
+
+                       switch(a.getFrameArrayType()) {
+                               case FP64:
+                                       x = (double[]) a.get();
+                                       return;
+                               case FP32:
+                                       x = (float[]) a.get();
+                                       return;
+                               case INT32:
+                                       x = (int[]) a.get();
+                                       return;
+                               case BOOLEAN:
+                                       x = (boolean[]) a.get();
+                                       return;
+                               case INT64:
+                                       x = (long[]) a.get();
+                                       return;
+                               case BITSET:
+                                       x = (BitSet) a.get();
+                                       return;
+                               case STRING:
+                                       x = (String[]) a.get();
+                                       return;
+                               case CHARACTER:
+                                       x = (char[]) a.get();
+                               case OPTIONAL:
+                                       try {
+                                               a.get();
+                                       }
+                                       catch(NotImplementedException e) {
+                                               // all good;
+                                       }
+                                       return;
+                               case DDC:
                                        a.get();
-                               }
-                               catch(NotImplementedException e) {
-                                       // all good;
-                               }
-                               return;
-                       default:
-                               throw new NotImplementedException();
+                               default:
+                                       throw new NotImplementedException();
+                       }
+               }
+               catch(DMLCompressionException e) {
+                       return;// valid
                }
        }
 
@@ -391,6 +441,9 @@ public class FrameArrayTests {
                        }
                        compareSetSubRange(aa, a, start, end, off, 
aa.getValueType());
                }
+               catch(DMLCompressionException e) {
+                       return;// valid
+               }
                catch(Exception e) {
                        e.printStackTrace();
                        fail(e.getMessage());
@@ -443,6 +496,9 @@ public class FrameArrayTests {
                        compareSetSubRange(aa, other, start, end, 0, 
aa.getValueType());
 
                }
+               catch(DMLCompressionException e) {
+                       return;// valid
+               }
                catch(Exception e) {
                        e.printStackTrace();
                        fail(e.getMessage());
@@ -453,44 +509,50 @@ public class FrameArrayTests {
        @SuppressWarnings("unchecked")
        public void set() {
                Array<?> a = this.a.clone();
-               switch(a.getValueType()) {
-                       case FP64:
-                               Double vd = 1324.42d;
-                               ((Array<Double>) a).set(0, vd);
-                               assertEquals(((Array<Double>) a).get(0), vd, 
0.0000001);
-                               return;
-                       case FP32:
-                               Float vf = 1324.42f;
-                               ((Array<Float>) a).set(0, vf);
-                               assertEquals(((Array<Float>) a).get(0), vf, 
0.0000001);
-                               return;
-                       case INT32:
-                               Integer vi = 1324;
-                               ((Array<Integer>) a).set(0, vi);
-                               assertEquals(((Array<Integer>) a).get(0), vi);
-                               return;
-                       case INT64:
-                               Long vl = 1324L;
-                               ((Array<Long>) a).set(0, vl);
-                               assertEquals(((Array<Long>) a).get(0), vl);
-                               return;
-                       case BOOLEAN:
-                               Boolean vb = true;
-                               ((Array<Boolean>) a).set(0, vb);
-                               assertEquals(((Array<Boolean>) a).get(0), vb);
-                               return;
-                       case STRING:
-                               String vs = "1324L";
-                               a.set(0, vs);
-                               assertEquals(((Array<String>) a).get(0), vs);
-                               return;
-                       case CHARACTER:
-                               Character c = '~';
-                               ((Array<Character>) a).set(0, c);
-                               assertEquals(((Array<Character>) a).get(0), c);
-                               return;
-                       default:
-                               throw new NotImplementedException();
+               try {
+
+                       switch(a.getValueType()) {
+                               case FP64:
+                                       Double vd = 1324.42d;
+                                       ((Array<Double>) a).set(0, vd);
+                                       assertEquals(((Array<Double>) 
a).get(0), vd, 0.0000001);
+                                       return;
+                               case FP32:
+                                       Float vf = 1324.42f;
+                                       ((Array<Float>) a).set(0, vf);
+                                       assertEquals(((Array<Float>) a).get(0), 
vf, 0.0000001);
+                                       return;
+                               case INT32:
+                                       Integer vi = 1324;
+                                       ((Array<Integer>) a).set(0, vi);
+                                       assertEquals(((Array<Integer>) 
a).get(0), vi);
+                                       return;
+                               case INT64:
+                                       Long vl = 1324L;
+                                       ((Array<Long>) a).set(0, vl);
+                                       assertEquals(((Array<Long>) a).get(0), 
vl);
+                                       return;
+                               case BOOLEAN:
+                                       Boolean vb = true;
+                                       ((Array<Boolean>) a).set(0, vb);
+                                       assertEquals(((Array<Boolean>) 
a).get(0), vb);
+                                       return;
+                               case STRING:
+                                       String vs = "1324L";
+                                       a.set(0, vs);
+                                       assertEquals(((Array<String>) 
a).get(0), vs);
+                                       return;
+                               case CHARACTER:
+                                       Character c = '~';
+                                       ((Array<Character>) a).set(0, c);
+                                       assertEquals(((Array<Character>) 
a).get(0), c);
+                                       return;
+                               default:
+                                       throw new NotImplementedException();
+                       }
+               }
+               catch(DMLCompressionException e) {
+                       return;// valid
                }
        }
 
@@ -499,31 +561,37 @@ public class FrameArrayTests {
        public void setDouble() {
                Double vd = 1.0d;
                Array<?> a = this.a.clone();
-               a.set(0, vd);
-               switch(a.getValueType()) {
-                       case FP64:
-                               assertEquals(((Array<Double>) a).get(0), vd, 
0.0000001);
-                               return;
-                       case FP32:
-                               assertEquals(((Array<Float>) a).get(0), vd, 
0.0000001);
-                               return;
-                       case INT32:
-                               assertEquals(((Array<Integer>) a).get(0), 
Integer.valueOf((int) (double) vd));
-                               return;
-                       case INT64:
-                               assertEquals(((Array<Long>) a).get(0), 
Long.valueOf((long) (double) vd));
-                               return;
-                       case BOOLEAN:
-                               assertEquals(((Array<Boolean>) a).get(0), vd == 
1.0d);
-                               return;
-                       case STRING:
-                               assertEquals(((Array<String>) a).get(0), 
Double.toString(vd));
-                               return;
-                       case CHARACTER:
-                               assertEquals((int) ((Array<Character>) 
a).get(0), 1);
-                               return;
-                       default:
-                               throw new NotImplementedException();
+               try {
+
+                       a.set(0, vd);
+                       switch(a.getValueType()) {
+                               case FP64:
+                                       assertEquals(((Array<Double>) 
a).get(0), vd, 0.0000001);
+                                       return;
+                               case FP32:
+                                       assertEquals(((Array<Float>) a).get(0), 
vd, 0.0000001);
+                                       return;
+                               case INT32:
+                                       assertEquals(((Array<Integer>) 
a).get(0), Integer.valueOf((int) (double) vd));
+                                       return;
+                               case INT64:
+                                       assertEquals(((Array<Long>) a).get(0), 
Long.valueOf((long) (double) vd));
+                                       return;
+                               case BOOLEAN:
+                                       assertEquals(((Array<Boolean>) 
a).get(0), vd == 1.0d);
+                                       return;
+                               case STRING:
+                                       assertEquals(((Array<String>) 
a).get(0), Double.toString(vd));
+                                       return;
+                               case CHARACTER:
+                                       assertEquals((int) ((Array<Character>) 
a).get(0), 1);
+                                       return;
+                               default:
+                                       throw new NotImplementedException();
+                       }
+               }
+               catch(DMLCompressionException e) {
+                       return;// valid
                }
        }
 
@@ -532,31 +600,37 @@ public class FrameArrayTests {
        public void setDouble_2() {
                Double vd = 0.0d;
                Array<?> a = this.a.clone();
-               a.set(0, vd);
-               switch(a.getValueType()) {
-                       case FP64:
-                               assertEquals(((Array<Double>) a).get(0), vd, 
0.0000001);
-                               return;
-                       case FP32:
-                               assertEquals(((Array<Float>) a).get(0), vd, 
0.0000001);
-                               return;
-                       case INT32:
-                               assertEquals(((Array<Integer>) a).get(0), 
Integer.valueOf((int) (double) vd));
-                               return;
-                       case INT64:
-                               assertEquals(((Array<Long>) a).get(0), 
Long.valueOf((long) (double) vd));
-                               return;
-                       case BOOLEAN:
-                               assertEquals(((Array<Boolean>) a).get(0), 
false);
-                               return;
-                       case STRING:
-                               assertEquals(((Array<String>) a).get(0), 
Double.toString(vd));
-                               return;
-                       case CHARACTER:
-                               assertEquals(((Array<Character>) a).get(0), 
Character.valueOf((char) 0));
-                               return;
-                       default:
-                               throw new NotImplementedException();
+               try {
+
+                       a.set(0, vd);
+                       switch(a.getValueType()) {
+                               case FP64:
+                                       assertEquals(((Array<Double>) 
a).get(0), vd, 0.0000001);
+                                       return;
+                               case FP32:
+                                       assertEquals(((Array<Float>) a).get(0), 
vd, 0.0000001);
+                                       return;
+                               case INT32:
+                                       assertEquals(((Array<Integer>) 
a).get(0), Integer.valueOf((int) (double) vd));
+                                       return;
+                               case INT64:
+                                       assertEquals(((Array<Long>) a).get(0), 
Long.valueOf((long) (double) vd));
+                                       return;
+                               case BOOLEAN:
+                                       assertEquals(((Array<Boolean>) 
a).get(0), false);
+                                       return;
+                               case STRING:
+                                       assertEquals(((Array<String>) 
a).get(0), Double.toString(vd));
+                                       return;
+                               case CHARACTER:
+                                       assertEquals(((Array<Character>) 
a).get(0), Character.valueOf((char) 0));
+                                       return;
+                               default:
+                                       throw new NotImplementedException();
+                       }
+               }
+               catch(DMLCompressionException e) {
+                       return;// valid
                }
        }
 
@@ -634,187 +708,216 @@ public class FrameArrayTests {
        public void setNull() {
                Array<?> a = this.a.clone();
                // should not crash
-               a.set(0, (String) null);
+               try {
+
+                       a.set(0, (String) null);
+               }
+               catch(DMLCompressionException e) {
+                       return; // valid error
+               }
        }
 
        @Test
        public void toByteArray() {
                if(a.getValueType() == ValueType.STRING)
                        return;
-               // just test that it serialize as byte array with no crashes
-               a.getAsByteArray();
-       }
-
-       @Test
-       public void appendString() {
-               Array<?> aa = a.clone();
-
-               switch(a.getValueType()) {
-                       case BOOLEAN:
-                               aa.append("0");
-                               assertEquals((Boolean) aa.get(aa.size() - 1), 
false);
-                               aa.append("1");
-                               assertEquals((Boolean) aa.get(aa.size() - 1), 
true);
-                               break;
-                       case FP32:
-                               float vf = 3215216.222f;
-                               String vfs = vf + "";
-                               aa.append(vfs);
-                               assertEquals((float) aa.get(aa.size() - 1), vf, 
0.00001);
-
-                               vf = 32152336.222f;
-                               vfs = vf + "";
-                               aa.append(vfs);
-                               assertEquals((float) aa.get(aa.size() - 1), vf, 
0.00001);
-                               break;
-                       case FP64:
-                               double vd = 3215216.222;
-                               String vds = vd + "";
-                               aa.append(vds);
-                               assertEquals((double) aa.get(aa.size() - 1), 
vd, 0.00001);
-
-                               vd = 222.222;
-                               vds = vd + "";
-                               aa.append(vds);
-                               assertEquals((double) aa.get(aa.size() - 1), 
vd, 0.00001);
-                               break;
-                       case INT32:
-                               int vi = 321521;
-                               String vis = vi + "";
-                               aa.append(vis);
-                               assertEquals((int) aa.get(aa.size() - 1), vi);
-
-                               vi = -2321;
-                               vis = vi + "";
-                               aa.append(vis);
-                               assertEquals((int) aa.get(aa.size() - 1), vi);
-                               break;
-                       case INT64:
-                               long vl = 321521;
-                               String vls = vl + "";
-                               aa.append(vls);
-                               assertEquals((long) aa.get(aa.size() - 1), vl);
-
-                               vl = -22223;
-                               vls = vl + "";
-                               aa.append(vls);
-                               assertEquals((long) aa.get(aa.size() - 1), vl);
-                               break;
-                       case STRING:
-                               String vs = "ThisIsAMonkeyTestSting";
-                               aa.append(vs);
-                               assertEquals((String) aa.get(aa.size() - 1), 
vs);
+               try {
 
-                               vs = "£$&*%!))";
-                               aa.append(vs);
-                               assertEquals((String) aa.get(aa.size() - 1), 
vs);
-                               break;
-                       case UINT8:
-                               int vi8 = 234;
-                               String vi8s = vi8 + "";
-                               aa.append(vi8s);
-                               assertEquals((int) aa.get(aa.size() - 1), vi8);
-
-                               vi8 = 42;
-                               vi8s = vi8 + "";
-                               aa.append(vi8s);
-                               assertEquals((int) aa.get(aa.size() - 1), vi8);
-                               break;
-                       case CHARACTER:
-                               char vc = '@';
-                               String vci = vc + "";
-                               aa.append(vci);
-                               assertEquals((char) aa.get(aa.size() - 1), vc);
-                               vc = (char) 42;
-                               vci = vc + "";
-                               aa.append(vci);
-                               assertEquals((char) aa.get(aa.size() - 1), vc);
-                               break;
-                       case UNKNOWN:
-                       default:
-                               throw new DMLRuntimeException("Invalid type");
+                       // just test that it serialize as byte array with no 
crashes
+                       a.getAsByteArray();
+               }
+               catch(DMLCompressionException e) {
+                       return; // valid
                }
        }
 
        @Test
-       public void appendNull() {
+       public void appendString() {
                Array<?> aa = a.clone();
+               try {
 
-               aa.append((String) null);
-               if(a.getFrameArrayType() == FrameArrayType.OPTIONAL)
-                       assertEquals((String) aa.get(aa.size() - 1), null);
-               else {
                        switch(a.getValueType()) {
                                case BOOLEAN:
+                                       aa.append("0");
                                        assertEquals((Boolean) aa.get(aa.size() 
- 1), false);
+                                       aa.append("1");
+                                       assertEquals((Boolean) aa.get(aa.size() 
- 1), true);
                                        break;
                                case FP32:
-                                       assertEquals((float) aa.get(aa.size() - 
1), 0.0, 0.00001);
+                                       float vf = 3215216.222f;
+                                       String vfs = vf + "";
+                                       aa.append(vfs);
+                                       assertEquals((float) aa.get(aa.size() - 
1), vf, 0.00001);
+
+                                       vf = 32152336.222f;
+                                       vfs = vf + "";
+                                       aa.append(vfs);
+                                       assertEquals((float) aa.get(aa.size() - 
1), vf, 0.00001);
                                        break;
                                case FP64:
-                                       assertEquals((double) aa.get(aa.size() 
- 1), 0.0, 0.00001);
+                                       double vd = 3215216.222;
+                                       String vds = vd + "";
+                                       aa.append(vds);
+                                       assertEquals((double) aa.get(aa.size() 
- 1), vd, 0.00001);
+
+                                       vd = 222.222;
+                                       vds = vd + "";
+                                       aa.append(vds);
+                                       assertEquals((double) aa.get(aa.size() 
- 1), vd, 0.00001);
                                        break;
                                case INT32:
-                                       assertEquals((int) aa.get(aa.size() - 
1), 0);
+                                       int vi = 321521;
+                                       String vis = vi + "";
+                                       aa.append(vis);
+                                       assertEquals((int) aa.get(aa.size() - 
1), vi);
+
+                                       vi = -2321;
+                                       vis = vi + "";
+                                       aa.append(vis);
+                                       assertEquals((int) aa.get(aa.size() - 
1), vi);
                                        break;
                                case INT64:
-                                       assertEquals((long) aa.get(aa.size() - 
1), 0);
+                                       long vl = 321521;
+                                       String vls = vl + "";
+                                       aa.append(vls);
+                                       assertEquals((long) aa.get(aa.size() - 
1), vl);
+
+                                       vl = -22223;
+                                       vls = vl + "";
+                                       aa.append(vls);
+                                       assertEquals((long) aa.get(aa.size() - 
1), vl);
                                        break;
                                case STRING:
-                                       assertEquals((String) aa.get(aa.size() 
- 1), null);
+                                       String vs = "ThisIsAMonkeyTestSting";
+                                       aa.append(vs);
+                                       assertEquals((String) aa.get(aa.size() 
- 1), vs);
+
+                                       vs = "£$&*%!))";
+                                       aa.append(vs);
+                                       assertEquals((String) aa.get(aa.size() 
- 1), vs);
                                        break;
                                case UINT8:
-                                       assertEquals((int) aa.get(aa.size() - 
1), 0);
+                                       int vi8 = 234;
+                                       String vi8s = vi8 + "";
+                                       aa.append(vi8s);
+                                       assertEquals((int) aa.get(aa.size() - 
1), vi8);
+
+                                       vi8 = 42;
+                                       vi8s = vi8 + "";
+                                       aa.append(vi8s);
+                                       assertEquals((int) aa.get(aa.size() - 
1), vi8);
                                        break;
                                case CHARACTER:
-                                       assertEquals((char) aa.get(aa.size() - 
1), 0);
+                                       char vc = '@';
+                                       String vci = vc + "";
+                                       aa.append(vci);
+                                       assertEquals((char) aa.get(aa.size() - 
1), vc);
+                                       vc = (char) 42;
+                                       vci = vc + "";
+                                       aa.append(vci);
+                                       assertEquals((char) aa.get(aa.size() - 
1), vc);
                                        break;
                                case UNKNOWN:
                                default:
                                        throw new DMLRuntimeException("Invalid 
type");
                        }
                }
+               catch(DMLCompressionException e) {
+                       return;// valid
+               }
        }
 
        @Test
-       public void append60Null() {
+       public void appendNull() {
                Array<?> aa = a.clone();
 
-               for(int i = 0; i < 60; i++)
+               try {
+
                        aa.append((String) null);
-               if(a.getFrameArrayType() == FrameArrayType.OPTIONAL)
-                       assertEquals((String) aa.get(aa.size() - 1), null);
-               else {
-                       switch(a.getValueType()) {
-                               case BOOLEAN:
-                                       assertEquals((Boolean) aa.get(aa.size() 
- 1), false);
-                                       break;
-                               case FP32:
-                                       assertEquals((float) aa.get(aa.size() - 
1), 0.0, 0.00001);
-                                       break;
-                               case FP64:
-                                       assertEquals((double) aa.get(aa.size() 
- 1), 0.0, 0.00001);
-                                       break;
-                               case INT32:
-                                       assertEquals((int) aa.get(aa.size() - 
1), 0);
-                                       break;
-                               case INT64:
-                                       assertEquals((long) aa.get(aa.size() - 
1), 0);
-                                       break;
-                               case STRING:
-                                       assertEquals((String) aa.get(aa.size() 
- 1), null);
-                                       break;
-                               case UINT8:
-                                       assertEquals((int) aa.get(aa.size() - 
1), 0);
-                                       break;
-                               case CHARACTER:
-                                       assertEquals((char) aa.get(aa.size() - 
1), 0);
-                                       break;
-                               case UNKNOWN:
-                               default:
-                                       throw new DMLRuntimeException("Invalid 
type");
+                       if(a.getFrameArrayType() == FrameArrayType.OPTIONAL)
+                               assertEquals((String) aa.get(aa.size() - 1), 
null);
+                       else {
+                               switch(a.getValueType()) {
+                                       case BOOLEAN:
+                                               assertEquals((Boolean) 
aa.get(aa.size() - 1), false);
+                                               break;
+                                       case FP32:
+                                               assertEquals((float) 
aa.get(aa.size() - 1), 0.0, 0.00001);
+                                               break;
+                                       case FP64:
+                                               assertEquals((double) 
aa.get(aa.size() - 1), 0.0, 0.00001);
+                                               break;
+                                       case INT32:
+                                               assertEquals((int) 
aa.get(aa.size() - 1), 0);
+                                               break;
+                                       case INT64:
+                                               assertEquals((long) 
aa.get(aa.size() - 1), 0);
+                                               break;
+                                       case STRING:
+                                               assertEquals((String) 
aa.get(aa.size() - 1), null);
+                                               break;
+                                       case UINT8:
+                                               assertEquals((int) 
aa.get(aa.size() - 1), 0);
+                                               break;
+                                       case CHARACTER:
+                                               assertEquals((char) 
aa.get(aa.size() - 1), 0);
+                                               break;
+                                       case UNKNOWN:
+                                       default:
+                                               throw new 
DMLRuntimeException("Invalid type");
+                               }
+                       }
+               }
+               catch(DMLCompressionException e) {
+                       return;// valid
+               }
+       }
+
+       @Test
+       public void append60Null() {
+               Array<?> aa = a.clone();
+
+               try {
+
+                       for(int i = 0; i < 60; i++)
+                               aa.append((String) null);
+                       if(a.getFrameArrayType() == FrameArrayType.OPTIONAL)
+                               assertEquals((String) aa.get(aa.size() - 1), 
null);
+                       else {
+                               switch(a.getValueType()) {
+                                       case BOOLEAN:
+                                               assertEquals((Boolean) 
aa.get(aa.size() - 1), false);
+                                               break;
+                                       case FP32:
+                                               assertEquals((float) 
aa.get(aa.size() - 1), 0.0, 0.00001);
+                                               break;
+                                       case FP64:
+                                               assertEquals((double) 
aa.get(aa.size() - 1), 0.0, 0.00001);
+                                               break;
+                                       case INT32:
+                                               assertEquals((int) 
aa.get(aa.size() - 1), 0);
+                                               break;
+                                       case INT64:
+                                               assertEquals((long) 
aa.get(aa.size() - 1), 0);
+                                               break;
+                                       case STRING:
+                                               assertEquals((String) 
aa.get(aa.size() - 1), null);
+                                               break;
+                                       case UINT8:
+                                               assertEquals((int) 
aa.get(aa.size() - 1), 0);
+                                               break;
+                                       case CHARACTER:
+                                               assertEquals((char) 
aa.get(aa.size() - 1), 0);
+                                               break;
+                                       case UNKNOWN:
+                                       default:
+                                               throw new 
DMLRuntimeException("Invalid type");
+                               }
                        }
                }
+               catch(DMLCompressionException e) {
+                       return; // valid
+               }
        }
 
        @Test
@@ -851,6 +954,9 @@ public class FrameArrayTests {
                                        throw new DMLRuntimeException("Invalid 
type");
                        }
                }
+               catch(DMLCompressionException e) {
+                       return;// valid
+               }
                catch(Exception e) {
                        e.printStackTrace();
                        fail(e.getMessage());
@@ -868,6 +974,9 @@ public class FrameArrayTests {
 
                        aa.setFromOtherTypeNz(af);
                }
+               catch(DMLCompressionException e) {
+                       return;// valid
+               }
                catch(Exception e) {
                        e.printStackTrace();
                        fail(e.getMessage());
@@ -885,6 +994,9 @@ public class FrameArrayTests {
 
                        aa.setFromOtherTypeNz(af);
                }
+               catch(DMLCompressionException e) {
+                       return;// valid
+               }
                catch(Exception e) {
                        e.printStackTrace();
                        fail(e.getMessage());
@@ -902,6 +1014,9 @@ public class FrameArrayTests {
 
                        aa.setFromOtherType(0, af.size() - 1, af);
                }
+               catch(DMLCompressionException e) {
+                       return;// valid
+               }
                catch(Exception e) {
                        e.printStackTrace();
                        fail(e.getMessage());
@@ -923,6 +1038,9 @@ public class FrameArrayTests {
 
                        aa.setFromOtherType(0, af.size() - 1, af);
                }
+               catch(DMLCompressionException e) {
+                       return;// valid
+               }
                catch(Exception e) {
                        e.printStackTrace();
                        fail(e.getMessage());
@@ -950,6 +1068,9 @@ public class FrameArrayTests {
                                }
                        }
                }
+               catch(DMLCompressionException e) {
+                       return;// valid
+               }
                catch(Exception e) {
                        e.printStackTrace();
                        fail(e.getMessage());
@@ -1044,42 +1165,48 @@ public class FrameArrayTests {
        @SuppressWarnings("unchecked")
        public void setNullType() {
                Array<?> aa = a.clone();
-               switch(aa.getValueType()) {
-                       case BOOLEAN:
-                               ((Array<Boolean>) aa).set(0, (Boolean) null);
-                               assertTrue(aa.get(0) == null || 
aa.get(0).equals(Boolean.valueOf(false)));
-                               break;
-                       case CHARACTER:
-                               ((Array<Character>) aa).set(0, (Character) 
null);
-                               assertTrue(aa.get(0) == null || 
aa.get(0).equals(Character.valueOf((char) 0)));
-                               break;
+               try {
 
-                       case FP32:
-                               ((Array<Float>) aa).set(0, (Float) null);
-                               assertTrue(aa.get(0) == null || 
aa.get(0).equals(Float.valueOf(0.0f)));
-                               break;
-                       case FP64:
-                               ((Array<Double>) aa).set(0, (Double) null);
-                               assertTrue(aa.get(0) == null || 
aa.get(0).equals(Double.valueOf(0.0d)));
-                               break;
-                       case INT32:
-                               ((Array<Integer>) aa).set(0, (Integer) null);
-                               assertTrue(aa.get(0) == null || 
aa.get(0).equals(Integer.valueOf(0)));
-                               break;
-                       case INT64:
-                               ((Array<Long>) aa).set(0, (Long) null);
-                               assertTrue(aa.get(0) == null || 
aa.get(0).equals(Long.valueOf(0)));
-                               break;
-                       case UINT8:
-                               ((Array<Integer>) aa).set(0, (Integer) null);
-                               assertTrue(aa.get(0) == null || 
aa.get(0).equals(Integer.valueOf(0)));
-                               break;
-                       default:
-                       case STRING:
-                       case UNKNOWN:
-                               aa.set(0, (String) null);
-                               assertTrue(aa.get(0) == null);
-                               break;
+                       switch(aa.getValueType()) {
+                               case BOOLEAN:
+                                       ((Array<Boolean>) aa).set(0, (Boolean) 
null);
+                                       assertTrue(aa.get(0) == null || 
aa.get(0).equals(Boolean.valueOf(false)));
+                                       break;
+                               case CHARACTER:
+                                       ((Array<Character>) aa).set(0, 
(Character) null);
+                                       assertTrue(aa.get(0) == null || 
aa.get(0).equals(Character.valueOf((char) 0)));
+                                       break;
+
+                               case FP32:
+                                       ((Array<Float>) aa).set(0, (Float) 
null);
+                                       assertTrue(aa.get(0) == null || 
aa.get(0).equals(Float.valueOf(0.0f)));
+                                       break;
+                               case FP64:
+                                       ((Array<Double>) aa).set(0, (Double) 
null);
+                                       assertTrue(aa.get(0) == null || 
aa.get(0).equals(Double.valueOf(0.0d)));
+                                       break;
+                               case INT32:
+                                       ((Array<Integer>) aa).set(0, (Integer) 
null);
+                                       assertTrue(aa.get(0) == null || 
aa.get(0).equals(Integer.valueOf(0)));
+                                       break;
+                               case INT64:
+                                       ((Array<Long>) aa).set(0, (Long) null);
+                                       assertTrue(aa.get(0) == null || 
aa.get(0).equals(Long.valueOf(0)));
+                                       break;
+                               case UINT8:
+                                       ((Array<Integer>) aa).set(0, (Integer) 
null);
+                                       assertTrue(aa.get(0) == null || 
aa.get(0).equals(Integer.valueOf(0)));
+                                       break;
+                               default:
+                               case STRING:
+                               case UNKNOWN:
+                                       aa.set(0, (String) null);
+                                       assertTrue(aa.get(0) == null);
+                                       break;
+                       }
+               }
+               catch(DMLCompressionException e) {
+                       return; // valid exception
                }
        }
 
@@ -1088,57 +1215,63 @@ public class FrameArrayTests {
        public void testAppendArray() {
                Array<?> aa = a.clone();
 
-               switch(a.getValueType()) {
-                       case BOOLEAN:
-                               aa = ((Array<Boolean>) aa).append(new 
BooleanArray(new boolean[10]));
-                               assertEquals(aa.size(), a.size() + 10);
-                               for(int i = 0; i < 10; i++)
-                                       assertEquals(aa.get(i + a.size()), 
false);
-                               break;
-                       case CHARACTER:
-                               aa = ((Array<Character>) aa).append(new 
CharArray(new char[10]));
-                               assertEquals(aa.size(), a.size() + 10);
-                               for(int i = 0; i < 10; i++)
-                                       assertEquals(aa.get(i + a.size()), 
(char) 0);
-                               break;
-                       case FP32:
-                               aa = ((Array<Float>) aa).append(new 
FloatArray(new float[10]));
-                               assertEquals(aa.size(), a.size() + 10);
-                               for(int i = 0; i < 10; i++)
-                                       assertEquals(aa.get(i + a.size()), 
0.0f);
-                               break;
-                       case FP64:
-                               aa = ((Array<Double>) aa).append(new 
DoubleArray(new double[10]));
-                               assertEquals(aa.size(), a.size() + 10);
-                               for(int i = 0; i < 10; i++)
-                                       assertEquals(aa.get(i + a.size()), 
0.0d);
-                               break;
-                       case UINT8:
-                       case INT32:
-                               aa = ((Array<Integer>) aa).append(new 
IntegerArray(new int[10]));
-                               assertEquals(aa.size(), a.size() + 10);
-                               for(int i = 0; i < 10; i++)
-                                       assertEquals(aa.get(i + a.size()), 0);
-                               break;
-                       case INT64:
-                               aa = ((Array<Long>) aa).append(new 
LongArray(new long[10]));
-                               assertEquals(aa.size(), a.size() + 10);
-                               for(int i = 0; i < 10; i++)
-                                       assertEquals(aa.get(i + a.size()), 0L);
-                               break;
-                       case STRING:
-                               aa = ((Array<String>) aa).append(new 
StringArray(new String[10]));
-                               assertEquals(aa.size(), a.size() + 10);
-                               for(int i = 0; i < 10; i++)
-                                       assertEquals(aa.get(i + a.size()), 
null);
-                               break;
-                       case UNKNOWN:
-                       default:
-                               throw new NotImplementedException("Not 
supported");
-               }
+               try {
+
+                       switch(a.getValueType()) {
+                               case BOOLEAN:
+                                       aa = ((Array<Boolean>) aa).append(new 
BooleanArray(new boolean[10]));
+                                       assertEquals(aa.size(), a.size() + 10);
+                                       for(int i = 0; i < 10; i++)
+                                               assertEquals(aa.get(i + 
a.size()), false);
+                                       break;
+                               case CHARACTER:
+                                       aa = ((Array<Character>) aa).append(new 
CharArray(new char[10]));
+                                       assertEquals(aa.size(), a.size() + 10);
+                                       for(int i = 0; i < 10; i++)
+                                               assertEquals(aa.get(i + 
a.size()), (char) 0);
+                                       break;
+                               case FP32:
+                                       aa = ((Array<Float>) aa).append(new 
FloatArray(new float[10]));
+                                       assertEquals(aa.size(), a.size() + 10);
+                                       for(int i = 0; i < 10; i++)
+                                               assertEquals(aa.get(i + 
a.size()), 0.0f);
+                                       break;
+                               case FP64:
+                                       aa = ((Array<Double>) aa).append(new 
DoubleArray(new double[10]));
+                                       assertEquals(aa.size(), a.size() + 10);
+                                       for(int i = 0; i < 10; i++)
+                                               assertEquals(aa.get(i + 
a.size()), 0.0d);
+                                       break;
+                               case UINT8:
+                               case INT32:
+                                       aa = ((Array<Integer>) aa).append(new 
IntegerArray(new int[10]));
+                                       assertEquals(aa.size(), a.size() + 10);
+                                       for(int i = 0; i < 10; i++)
+                                               assertEquals(aa.get(i + 
a.size()), 0);
+                                       break;
+                               case INT64:
+                                       aa = ((Array<Long>) aa).append(new 
LongArray(new long[10]));
+                                       assertEquals(aa.size(), a.size() + 10);
+                                       for(int i = 0; i < 10; i++)
+                                               assertEquals(aa.get(i + 
a.size()), 0L);
+                                       break;
+                               case STRING:
+                                       aa = ((Array<String>) aa).append(new 
StringArray(new String[10]));
+                                       assertEquals(aa.size(), a.size() + 10);
+                                       for(int i = 0; i < 10; i++)
+                                               assertEquals(aa.get(i + 
a.size()), null);
+                                       break;
+                               case UNKNOWN:
+                               default:
+                                       throw new NotImplementedException("Not 
supported");
+                       }
 
-               for(int i = 0; i < a.size(); i++)
-                       assertEquals(a.get(i), aa.get(i));
+                       for(int i = 0; i < a.size(); i++)
+                               assertEquals(a.get(i), aa.get(i));
+               }
+               catch(DMLCompressionException e) {
+                       return; // valid
+               }
 
        }
 
@@ -1147,59 +1280,65 @@ public class FrameArrayTests {
        public void testAppendValue() {
                Array<?> aa = a.clone();
                boolean isOptional = aa instanceof OptionalArray;
-               switch(a.getValueType()) {
-                       case BOOLEAN:
-                               ((Array<Boolean>) aa).append((Boolean) null);
-                               assertEquals(aa.size(), a.size() + 1);
-                               if(!isOptional)
-                                       assertEquals(aa.get(a.size()), false);
-                               break;
-                       case CHARACTER:
-                               ((Array<Character>) aa).append((Character) 
null);
-                               assertEquals(aa.size(), a.size() + 1);
-                               if(!isOptional)
-                                       assertEquals(aa.get(a.size()), (char) 
0);
-                               break;
-                       case FP32:
-                               ((Array<Float>) aa).append((Float) null);
-                               assertEquals(aa.size(), a.size() + 1);
-                               if(!isOptional)
-                                       assertEquals(aa.get(a.size()), 0.0f);
-                               break;
-                       case FP64:
-                               ((Array<Double>) aa).append((Double) null);
-                               assertEquals(aa.size(), a.size() + 1);
-                               if(!isOptional)
-                                       assertEquals(aa.get(a.size()), 0.0d);
-                               break;
-                       case UINT8:
-                       case INT32:
-                               ((Array<Integer>) aa).append((Integer) null);
-                               assertEquals(aa.size(), a.size() + 1);
-                               if(!isOptional)
-                                       assertEquals(aa.get(a.size()), 0);
-                               break;
-                       case INT64:
-                               ((Array<Long>) aa).append((Long) null);
-                               assertEquals(aa.size(), a.size() + 1);
-                               if(!isOptional)
-                                       assertEquals(aa.get(a.size()), 0L);
-                               break;
-                       case STRING:
-                               aa.append((String) null);
-                               assertEquals(aa.size(), a.size() + 1);
-                               if(!isOptional)
-                                       assertEquals(aa.get(a.size()), null);
-                               break;
-                       case UNKNOWN:
-                       default:
-                               throw new NotImplementedException("Not 
supported");
-               }
+               try {
 
-               for(int i = 0; i < a.size(); i++)
-                       assertEquals(a.get(i), aa.get(i));
-               if(isOptional)
-                       assertEquals(aa.get(a.size()), null);
+                       switch(a.getValueType()) {
+                               case BOOLEAN:
+                                       ((Array<Boolean>) aa).append((Boolean) 
null);
+                                       assertEquals(aa.size(), a.size() + 1);
+                                       if(!isOptional)
+                                               assertEquals(aa.get(a.size()), 
false);
+                                       break;
+                               case CHARACTER:
+                                       ((Array<Character>) 
aa).append((Character) null);
+                                       assertEquals(aa.size(), a.size() + 1);
+                                       if(!isOptional)
+                                               assertEquals(aa.get(a.size()), 
(char) 0);
+                                       break;
+                               case FP32:
+                                       ((Array<Float>) aa).append((Float) 
null);
+                                       assertEquals(aa.size(), a.size() + 1);
+                                       if(!isOptional)
+                                               assertEquals(aa.get(a.size()), 
0.0f);
+                                       break;
+                               case FP64:
+                                       ((Array<Double>) aa).append((Double) 
null);
+                                       assertEquals(aa.size(), a.size() + 1);
+                                       if(!isOptional)
+                                               assertEquals(aa.get(a.size()), 
0.0d);
+                                       break;
+                               case UINT8:
+                               case INT32:
+                                       ((Array<Integer>) aa).append((Integer) 
null);
+                                       assertEquals(aa.size(), a.size() + 1);
+                                       if(!isOptional)
+                                               assertEquals(aa.get(a.size()), 
0);
+                                       break;
+                               case INT64:
+                                       ((Array<Long>) aa).append((Long) null);
+                                       assertEquals(aa.size(), a.size() + 1);
+                                       if(!isOptional)
+                                               assertEquals(aa.get(a.size()), 
0L);
+                                       break;
+                               case STRING:
+                                       aa.append((String) null);
+                                       assertEquals(aa.size(), a.size() + 1);
+                                       if(!isOptional)
+                                               assertEquals(aa.get(a.size()), 
null);
+                                       break;
+                               case UNKNOWN:
+                               default:
+                                       throw new NotImplementedException("Not 
supported");
+                       }
+
+                       for(int i = 0; i < a.size(); i++)
+                               assertEquals(a.get(i), aa.get(i));
+                       if(isOptional)
+                               assertEquals(aa.get(a.size()), null);
+               }
+               catch(DMLCompressionException e) {
+                       return;// valid
+               }
        }
 
        @Test
@@ -1207,139 +1346,160 @@ public class FrameArrayTests {
        public void testAppendArrayOptional() {
                Array<?> aa = a.clone();
 
-               switch(a.getValueType()) {
-                       case BOOLEAN:
-                               try {
-                                       aa = ((Array<Boolean>) aa).append(new 
OptionalArray<>(new Boolean[10]));
-                               }
-                               catch(Exception e) {
-                                       e.printStackTrace();
-                                       fail(e.getMessage());
-                               }
-                               break;
-                       case CHARACTER:
-                               aa = ((Array<Character>) aa).append(new 
OptionalArray<>(new Character[10]));
-                               break;
-                       case FP32:
-                               aa = ((Array<Float>) aa).append(new 
OptionalArray<>(new Float[10]));
-                               break;
-                       case FP64:
-                               aa = ((Array<Double>) aa).append(new 
OptionalArray<>(new Double[10]));
-                               break;
-                       case UINT8:
-                       case INT32:
-                               aa = ((Array<Integer>) aa).append(new 
OptionalArray<>(new Integer[10]));
-                               break;
-                       case INT64:
-                               aa = ((Array<Long>) aa).append(new 
OptionalArray<>(new Long[10]));
-                               break;
-                       case STRING:
-                               return; // not relevant
-                       case UNKNOWN:
-                       default:
-                               throw new NotImplementedException("Not 
supported");
-               }
+               try {
 
-               assertEquals(aa.size(), a.size() + 10);
+                       switch(a.getValueType()) {
+                               case BOOLEAN:
+                                       try {
+                                               aa = ((Array<Boolean>) 
aa).append(new OptionalArray<>(new Boolean[10]));
+                                       }
+                                       catch(DMLCompressionException e) {
+                                               throw e;
+                                       }
+                                       catch(Exception e) {
+                                               e.printStackTrace();
+                                               fail(e.getMessage());
+                                       }
+                                       break;
+                               case CHARACTER:
+                                       aa = ((Array<Character>) aa).append(new 
OptionalArray<>(new Character[10]));
+                                       break;
+                               case FP32:
+                                       aa = ((Array<Float>) aa).append(new 
OptionalArray<>(new Float[10]));
+                                       break;
+                               case FP64:
+                                       aa = ((Array<Double>) aa).append(new 
OptionalArray<>(new Double[10]));
+                                       break;
+                               case UINT8:
+                               case INT32:
+                                       aa = ((Array<Integer>) aa).append(new 
OptionalArray<>(new Integer[10]));
+                                       break;
+                               case INT64:
+                                       aa = ((Array<Long>) aa).append(new 
OptionalArray<>(new Long[10]));
+                                       break;
+                               case STRING:
+                                       return; // not relevant
+                               case UNKNOWN:
+                               default:
+                                       throw new NotImplementedException("Not 
supported");
+                       }
 
-               for(int i = 0; i < a.size(); i++)
-                       assertEquals(a.get(i), aa.get(i));
+                       assertEquals(aa.size(), a.size() + 10);
 
-               for(int i = 0; i < 10; i++)
-                       assertEquals(null, aa.get(i + a.size()));
+                       for(int i = 0; i < a.size(); i++)
+                               assertEquals(a.get(i), aa.get(i));
+
+                       for(int i = 0; i < 10; i++)
+                               assertEquals(null, aa.get(i + a.size()));
+               }
+               catch(DMLCompressionException e) {
+                       return; // valid
+               }
        }
 
        @Test
        public void fillNull() {
                Array<?> aa = a.clone();
                boolean isOptional = aa instanceof OptionalArray;
-               aa.fill((String) null);
-               switch(a.getValueType()) {
-                       case BOOLEAN:
-                               if(!isOptional)
+               try {
+
+                       aa.fill((String) null);
+                       switch(a.getValueType()) {
+                               case BOOLEAN:
+                                       if(!isOptional)
+
+                                               for(int i = 0; i < aa.size(); 
i++)
+                                                       assertEquals(aa.get(i), 
false);
+                                       break;
+                               case CHARACTER:
+                                       if(!isOptional)
+                                               for(int i = 0; i < aa.size(); 
i++)
+                                                       assertEquals(aa.get(i), 
(char) 0);
+                                       break;
+                               case FP32:
+                                       if(!isOptional)
+                                               for(int i = 0; i < aa.size(); 
i++)
+                                                       assertEquals(aa.get(i), 
0.0f);
+                                       break;
+                               case FP64:
+                                       if(!isOptional)
+                                               for(int i = 0; i < aa.size(); 
i++)
+                                                       assertEquals(aa.get(i), 
0.0d);
+                                       break;
+                               case UINT8:
+                               case INT32:
+                                       if(!isOptional)
+                                               for(int i = 0; i < aa.size(); 
i++)
+                                                       assertEquals(aa.get(i), 
0);
+                                       break;
+                               case INT64:
+                                       if(!isOptional)
+                                               for(int i = 0; i < aa.size(); 
i++)
+                                                       assertEquals(aa.get(i), 
0L);
+                                       break;
+                               case STRING:
+                                       if(!isOptional)
+                                               for(int i = 0; i < aa.size(); 
i++)
+                                                       assertEquals(aa.get(i), 
null);
+                                       break;
+                               case UNKNOWN:
+                               default:
+                                       throw new NotImplementedException("Not 
supported");
+                       }
 
+                       if(isOptional)
+                               for(int i = 0; i < aa.size(); i++)
+                                       assertEquals(aa.get(i), null);
+               }
+               catch(DMLCompressionException e) {
+                       return;// valid
+               }
+       }
+
+       @Test
+       public void fill1String() {
+               Array<?> aa = a.clone();
+               try {
+
+                       // boolean isOptional = aa instanceof OptionalArray;
+                       aa.fill("1");
+                       switch(a.getValueType()) {
+                               case BOOLEAN:
                                        for(int i = 0; i < aa.size(); i++)
-                                               assertEquals(aa.get(i), false);
-                               break;
-                       case CHARACTER:
-                               if(!isOptional)
+                                               assertEquals(aa.get(i), true);
+                                       break;
+                               case CHARACTER:
                                        for(int i = 0; i < aa.size(); i++)
-                                               assertEquals(aa.get(i), (char) 
0);
-                               break;
-                       case FP32:
-                               if(!isOptional)
+                                               assertEquals(aa.get(i), '1');
+                                       break;
+                               case FP32:
                                        for(int i = 0; i < aa.size(); i++)
-                                               assertEquals(aa.get(i), 0.0f);
-                               break;
-                       case FP64:
-                               if(!isOptional)
+                                               assertEquals(aa.get(i), 1.0f);
+                                       break;
+                               case FP64:
                                        for(int i = 0; i < aa.size(); i++)
-                                               assertEquals(aa.get(i), 0.0d);
-                               break;
-                       case UINT8:
-                       case INT32:
-                               if(!isOptional)
+                                               assertEquals(aa.get(i), 1.0d);
+                                       break;
+                               case UINT8:
+                               case INT32:
                                        for(int i = 0; i < aa.size(); i++)
-                                               assertEquals(aa.get(i), 0);
-                               break;
-                       case INT64:
-                               if(!isOptional)
+                                               assertEquals(aa.get(i), 1);
+                                       break;
+                               case INT64:
                                        for(int i = 0; i < aa.size(); i++)
-                                               assertEquals(aa.get(i), 0L);
-                               break;
-                       case STRING:
-                               if(!isOptional)
+                                               assertEquals(aa.get(i), 1L);
+                                       break;
+                               case STRING:
                                        for(int i = 0; i < aa.size(); i++)
-                                               assertEquals(aa.get(i), null);
-                               break;
-                       case UNKNOWN:
-                       default:
-                               throw new NotImplementedException("Not 
supported");
+                                               assertEquals(aa.get(i), "1");
+                                       break;
+                               case UNKNOWN:
+                               default:
+                                       throw new NotImplementedException("Not 
supported");
+                       }
                }
-
-               if(isOptional)
-                       for(int i = 0; i < aa.size(); i++)
-                               assertEquals(aa.get(i), null);
-       }
-
-       @Test
-       public void fill1String() {
-               Array<?> aa = a.clone();
-               // boolean isOptional = aa instanceof OptionalArray;
-               aa.fill("1");
-               switch(a.getValueType()) {
-                       case BOOLEAN:
-                               for(int i = 0; i < aa.size(); i++)
-                                       assertEquals(aa.get(i), true);
-                               break;
-                       case CHARACTER:
-                               for(int i = 0; i < aa.size(); i++)
-                                       assertEquals(aa.get(i), '1');
-                               break;
-                       case FP32:
-                               for(int i = 0; i < aa.size(); i++)
-                                       assertEquals(aa.get(i), 1.0f);
-                               break;
-                       case FP64:
-                               for(int i = 0; i < aa.size(); i++)
-                                       assertEquals(aa.get(i), 1.0d);
-                               break;
-                       case UINT8:
-                       case INT32:
-                               for(int i = 0; i < aa.size(); i++)
-                                       assertEquals(aa.get(i), 1);
-                               break;
-                       case INT64:
-                               for(int i = 0; i < aa.size(); i++)
-                                       assertEquals(aa.get(i), 1L);
-                               break;
-                       case STRING:
-                               for(int i = 0; i < aa.size(); i++)
-                                       assertEquals(aa.get(i), "1");
-                               break;
-                       case UNKNOWN:
-                       default:
-                               throw new NotImplementedException("Not 
supported");
+               catch(DMLCompressionException e) {
+                       return;// valid
                }
        }
 
@@ -1347,46 +1507,52 @@ public class FrameArrayTests {
        @SuppressWarnings("unchecked")
        public void fill1Value() {
                Array<?> aa = a.clone();
-               switch(a.getValueType()) {
-                       case BOOLEAN:
-                               ((Array<Boolean>) aa).fill(true);
-                               for(int i = 0; i < aa.size(); i++)
-                                       assertEquals(aa.get(i), true);
-                               break;
-                       case CHARACTER:
-                               ((Array<Character>) aa).fill('1');
-                               for(int i = 0; i < aa.size(); i++)
-                                       assertEquals(aa.get(i), '1');
-                               break;
-                       case FP32:
-                               ((Array<Float>) aa).fill(1.0f);
-                               for(int i = 0; i < aa.size(); i++)
-                                       assertEquals(aa.get(i), 1.0f);
-                               break;
-                       case FP64:
-                               ((Array<Double>) aa).fill(1.0d);
-                               for(int i = 0; i < aa.size(); i++)
-                                       assertEquals(aa.get(i), 1.0d);
-                               break;
-                       case UINT8:
-                       case INT32:
-                               ((Array<Integer>) aa).fill(1);
-                               for(int i = 0; i < aa.size(); i++)
-                                       assertEquals(aa.get(i), 1);
-                               break;
-                       case INT64:
-                               ((Array<Long>) aa).fill(1L);
-                               for(int i = 0; i < aa.size(); i++)
-                                       assertEquals(aa.get(i), 1L);
-                               break;
-                       case STRING:
-                               aa.fill("1");
-                               for(int i = 0; i < aa.size(); i++)
-                                       assertEquals(aa.get(i), "1");
-                               break;
-                       case UNKNOWN:
-                       default:
-                               throw new NotImplementedException("Not 
supported");
+               try {
+
+                       switch(a.getValueType()) {
+                               case BOOLEAN:
+                                       ((Array<Boolean>) aa).fill(true);
+                                       for(int i = 0; i < aa.size(); i++)
+                                               assertEquals(aa.get(i), true);
+                                       break;
+                               case CHARACTER:
+                                       ((Array<Character>) aa).fill('1');
+                                       for(int i = 0; i < aa.size(); i++)
+                                               assertEquals(aa.get(i), '1');
+                                       break;
+                               case FP32:
+                                       ((Array<Float>) aa).fill(1.0f);
+                                       for(int i = 0; i < aa.size(); i++)
+                                               assertEquals(aa.get(i), 1.0f);
+                                       break;
+                               case FP64:
+                                       ((Array<Double>) aa).fill(1.0d);
+                                       for(int i = 0; i < aa.size(); i++)
+                                               assertEquals(aa.get(i), 1.0d);
+                                       break;
+                               case UINT8:
+                               case INT32:
+                                       ((Array<Integer>) aa).fill(1);
+                                       for(int i = 0; i < aa.size(); i++)
+                                               assertEquals(aa.get(i), 1);
+                                       break;
+                               case INT64:
+                                       ((Array<Long>) aa).fill(1L);
+                                       for(int i = 0; i < aa.size(); i++)
+                                               assertEquals(aa.get(i), 1L);
+                                       break;
+                               case STRING:
+                                       aa.fill("1");
+                                       for(int i = 0; i < aa.size(); i++)
+                                               assertEquals(aa.get(i), "1");
+                                       break;
+                               case UNKNOWN:
+                               default:
+                                       throw new NotImplementedException("Not 
supported");
+                       }
+               }
+               catch(DMLCompressionException e) {
+                       return;// valid
                }
        }
 
@@ -1450,6 +1616,9 @@ public class FrameArrayTests {
                                for(int i = 0; i < aa.size(); i++)
                                        assertEquals(aa.get(i), null);
                }
+               catch(DMLCompressionException e) {
+                       return;// valid
+               }
                catch(Exception e) {
                        e.printStackTrace();
                        fail(e.getMessage());
@@ -1511,6 +1680,33 @@ public class FrameArrayTests {
                }
        }
 
+       @Test
+       public void testIsEmpty() {
+               a.isEmpty();
+       }
+
+       @Test
+       public void containsNull() {
+               if(a.containsNull()) {
+                       for(int i = 0; i < a.size(); i++) {
+                               if(a.get(i) == null)
+                                       return;
+                       }
+                       fail("No Null detected.");
+               }
+       }
+
+       @Test
+       public void testIterator() {
+               Array<?>.ArrayIterator ar = a.getIterator();
+               assertTrue(ar.hasNext());
+               for(int i = 0; i < a.size(); i++) {
+                       assertEquals(ar.next(), a.get(i));
+                       assertEquals(ar.getIndex(), i);
+               }
+               assertFalse(ar.hasNext());
+       }
+
        protected static void compare(Array<?> a, Array<?> b) {
                int size = a.size();
                String err = a.getClass().getSimpleName() + " " + 
a.getValueType() + " " + b.getClass().getSimpleName() + " "
@@ -1561,6 +1757,79 @@ public class FrameArrayTests {
                }
        }
 
+       protected static Array<?> createDDC(FrameArrayType t, int size, int 
seed) {
+               int nUnique = Math.max(size / 100, 2);
+               switch(t) {
+                       case STRING:
+                               return 
DDCArray.compressToDDC(ArrayFactory.create(generateRandomStringOpt(size, 
seed)));
+                       case BITSET:// not a thing
+                       case BOOLEAN:
+                               return 
DDCArray.compressToDDC(ArrayFactory.create(generateRandomBooleanOpt(size, 
seed)));
+                       case INT32:
+                               return DDCArray
+                                       
.compressToDDC(ArrayFactory.create(generateRandomIntegerNUniqueLengthOpt(size, 
seed, nUnique)));
+                       case INT64:
+                               return 
DDCArray.compressToDDC(ArrayFactory.create(generateRandomLongNUniqueLengthOpt(size,
 seed, nUnique)));
+                       case FP32:
+                               return DDCArray
+                                       
.compressToDDC(ArrayFactory.create(generateRandomFloatNUniqueLengthOpt(size, 
seed, nUnique)));
+                       case FP64:
+                               return DDCArray
+                                       
.compressToDDC(ArrayFactory.create(generateRandomDoubleNUniqueLengthOpt(size, 
seed, nUnique)));
+                       case CHARACTER:
+                               return DDCArray
+                                       
.compressToDDC(ArrayFactory.create(generateRandomCharacterNUniqueLengthOpt(size,
 seed, nUnique)));
+                       case OPTIONAL:
+                               Random r = new Random(seed);
+                               switch(r.nextInt(7)) {
+                                       case 0:
+                                               return DDCArray
+                                                       
.compressToDDC(ArrayFactory.create(generateRandomIntegerNUniqueLengthOpt(size, 
seed, nUnique)));
+                                       case 1:
+                                               return DDCArray
+                                                       
.compressToDDC(ArrayFactory.create(generateRandomLongNUniqueLengthOpt(size, 
seed, nUnique)));
+                                       case 2:
+                                               return DDCArray
+                                                       
.compressToDDC(ArrayFactory.create(generateRandomDoubleNUniqueLengthOpt(size, 
seed, nUnique)));
+                                       case 3:
+                                               return DDCArray
+                                                       
.compressToDDC(ArrayFactory.create(generateRandomFloatNUniqueLengthOpt(size, 
seed, nUnique)));
+                                       case 4:
+                                               return DDCArray
+                                                       
.compressToDDC(ArrayFactory.create(generateRandomCharacterNUniqueLengthOpt(size,
 seed, nUnique)));
+                                       default:
+                                               return 
DDCArray.compressToDDC(ArrayFactory.create(generateRandomBooleanOpt(size, 
seed)));
+                               }
+                       case DDC:
+                               Random r2 = new Random(seed);
+                               switch(r2.nextInt(7)) {
+                                       case 0:
+                                               return DDCArray
+                                                       
.compressToDDC(ArrayFactory.create(generateRandomIntegerNUniqueLengthOpt(size, 
seed, nUnique)));
+                                       case 1:
+                                               return DDCArray
+                                                       
.compressToDDC(ArrayFactory.create(generateRandomLongNUniqueLengthOpt(size, 
seed, nUnique)));
+                                       case 2:
+                                               return DDCArray
+                                                       
.compressToDDC(ArrayFactory.create(generateRandomDoubleNUniqueLengthOpt(size, 
seed, nUnique)));
+                                       case 3:
+                                               return DDCArray
+                                                       
.compressToDDC(ArrayFactory.create(generateRandomFloatNUniqueLengthOpt(size, 
seed, nUnique)));
+                                       case 4:
+                                               return DDCArray
+                                                       
.compressToDDC(ArrayFactory.create(generateRandomCharacterNUniqueLengthOpt(size,
 seed, nUnique)));
+                                       case 5:
+                                               return DDCArray
+                                                       
.compressToDDC(ArrayFactory.create(generateRandomStringNUniqueLengthOpt(size, 
seed, nUnique, 32)));
+                                       default:
+                                               return 
DDCArray.compressToDDC(ArrayFactory.create(generateRandomBooleanOpt(size, 
seed)));
+                               }
+                       default:
+                               throw new DMLRuntimeException("Unsupported 
value type: " + t);
+
+               }
+       }
+
        protected static Array<?> createOptional(FrameArrayType t, int size, 
int seed) {
                switch(t) {
                        case STRING:
@@ -1594,6 +1863,31 @@ public class FrameArrayTests {
                                        default:
                                                return 
ArrayFactory.create(generateRandomBooleanOpt(size, seed));
                                }
+                       case DDC:
+                               Random r2 = new Random(seed);
+                               int nUnique = Math.max(size / 100, 2);
+                               switch(r2.nextInt(7)) {
+                                       case 0:
+                                               return DDCArray
+                                                       
.compressToDDC(ArrayFactory.create(generateRandomIntegerNUniqueLengthOpt(size, 
seed, nUnique)));
+                                       case 1:
+                                               return DDCArray
+                                                       
.compressToDDC(ArrayFactory.create(generateRandomLongNUniqueLengthOpt(size, 
seed, nUnique)));
+                                       case 2:
+                                               return DDCArray
+                                                       
.compressToDDC(ArrayFactory.create(generateRandomDoubleNUniqueLengthOpt(size, 
seed, nUnique)));
+                                       case 3:
+                                               return DDCArray
+                                                       
.compressToDDC(ArrayFactory.create(generateRandomFloatNUniqueLengthOpt(size, 
seed, nUnique)));
+                                       case 4:
+                                               return DDCArray
+                                                       
.compressToDDC(ArrayFactory.create(generateRandomCharacterNUniqueLengthOpt(size,
 seed, nUnique)));
+                                       case 5:
+                                               return DDCArray
+                                                       
.compressToDDC(ArrayFactory.create(generateRandomStringNUniqueLengthOpt(size, 
seed, nUnique, 32)));
+                                       default:
+                                               return 
DDCArray.compressToDDC(ArrayFactory.create(generateRandomBooleanOpt(size, 
seed)));
+                               }
                        default:
                                throw new DMLRuntimeException("Unsupported 
value type: " + t);
 
@@ -1634,6 +1928,31 @@ public class FrameArrayTests {
                                        default:
                                                return 
ArrayFactory.create(generateRandomBooleanOpt(size, seed));
                                }
+                       case DDC:
+                               Random r2 = new Random(seed);
+                               int nUnique = Math.max(size / 100, 2);
+                               switch(r2.nextInt(7)) {
+                                       case 0:
+                                               return DDCArray
+                                                       
.compressToDDC(ArrayFactory.create(generateRandomIntegerNUniqueLengthOpt(size, 
seed, nUnique)));
+                                       case 1:
+                                               return DDCArray
+                                                       
.compressToDDC(ArrayFactory.create(generateRandomLongNUniqueLengthOpt(size, 
seed, nUnique)));
+                                       case 2:
+                                               return DDCArray
+                                                       
.compressToDDC(ArrayFactory.create(generateRandomDoubleNUniqueLengthOpt(size, 
seed, nUnique)));
+                                       case 3:
+                                               return DDCArray
+                                                       
.compressToDDC(ArrayFactory.create(generateRandomFloatNUniqueLengthOpt(size, 
seed, nUnique)));
+                                       case 4:
+                                               return DDCArray
+                                                       
.compressToDDC(ArrayFactory.create(generateRandomCharacterNUniqueLengthOpt(size,
 seed, nUnique)));
+                                       case 5:
+                                               return DDCArray
+                                                       
.compressToDDC(ArrayFactory.create(generateRandomStringNUniqueLengthOpt(size, 
seed, nUnique, 32)));
+                                       default:
+                                               return 
DDCArray.compressToDDC(ArrayFactory.create(generateRandomBooleanOpt(size, 
seed)));
+                               }
                        default:
                                throw new DMLRuntimeException("Unsupported 
value type: " + t);
                }
@@ -1650,10 +1969,94 @@ public class FrameArrayTests {
        }
 
        public static String[] generateRandomString(int size, int seed) {
+               return generateRandomStringLength(size, seed, 99);
+       }
+
+       public static String[] generateRandomStringLength(int size, int seed, 
int stringLength) {
                Random r = new Random(seed);
                String[] ret = new String[size];
                for(int i = 0; i < size; i++)
-                       ret[i] = r.nextInt(99) + "ad " + r.nextInt(99);
+                       ret[i] = r.nextInt(stringLength) + "ad" + 
r.nextInt(stringLength);
+               return ret;
+       }
+
+       public static String[] generateRandomStringNUnique(int size, int seed, 
int nUnique) {
+               return generateRandomStringNUniqueLength(size, seed, nUnique, 
99);
+       }
+
+       public static String[] generateRandomStringNUniqueLength(int size, int 
seed, int nUnique, int stringLength) {
+               String[] rands = generateRandomStringLength(nUnique, seed, 
stringLength);
+               Random r = new Random(seed + 1);
+
+               String[] ret = new String[size];
+               for(int i = 0; i < size; i++)
+                       ret[i] = rands[r.nextInt(nUnique)];
+               return ret;
+       }
+
+       public static String[] generateRandomStringNUniqueLengthOpt(int size, 
int seed, int nUnique, int stringLength) {
+               String[] rands = generateRandomStringLength(nUnique, seed, 
stringLength);
+               rands[rands.length - 1] = null;
+               Random r = new Random(seed + 1);
+
+               String[] ret = new String[size];
+               for(int i = 0; i < size; i++)
+                       ret[i] = rands[r.nextInt(nUnique)];
+               return ret;
+       }
+
+       public static Character[] generateRandomCharacterNUniqueLengthOpt(int 
size, int seed, int nUnique) {
+               Character[] rands = generateRandomCharacterOpt(nUnique, seed);
+               rands[rands.length - 1] = null;
+               Random r = new Random(seed + 1);
+
+               Character[] ret = new Character[size];
+               for(int i = 0; i < size; i++)
+                       ret[i] = rands[r.nextInt(nUnique)];
+               return ret;
+       }
+
+       public static Float[] generateRandomFloatNUniqueLengthOpt(int size, int 
seed, int nUnique) {
+               Float[] rands = generateRandomFloatOpt(nUnique, seed);
+               rands[rands.length - 1] = null;
+               Random r = new Random(seed + 1);
+
+               Float[] ret = new Float[size];
+               for(int i = 0; i < size; i++)
+                       ret[i] = rands[r.nextInt(nUnique)];
+               return ret;
+       }
+
+       public static Double[] generateRandomDoubleNUniqueLengthOpt(int size, 
int seed, int nUnique) {
+               Double[] rands = generateRandomDoubleOpt(nUnique, seed);
+               rands[rands.length - 1] = null;
+               Random r = new Random(seed + 1);
+
+               Double[] ret = new Double[size];
+               for(int i = 0; i < size; i++)
+                       ret[i] = rands[r.nextInt(nUnique)];
+               return ret;
+       }
+
+       public static Long[] generateRandomLongNUniqueLengthOpt(int size, int 
seed, int nUnique) {
+               Long[] rands = generateRandomLongOpt(nUnique, seed);
+               rands[rands.length - 1] = null;
+               Random r = new Random(seed + 1);
+
+               Long[] ret = new Long[size];
+               for(int i = 0; i < size; i++)
+                       ret[i] = rands[r.nextInt(nUnique)];
+               return ret;
+       }
+
+       public static Integer[] generateRandomIntegerNUniqueLengthOpt(int size, 
int seed, int nUnique) {
+               Integer[] rands = generateRandomIntegerOpt(nUnique, seed);
+               rands[rands.length - 1] = null;
+               Random r = new Random(seed + 1);
+
+               Integer[] ret = new Integer[size];
+               for(int i = 0; i < size; i++)
+                       ret[i] = rands[r.nextInt(nUnique)];
                return ret;
        }
 
diff --git 
a/src/test/java/org/apache/sysds/test/component/frame/array/NegativeArrayTests.java
 
b/src/test/java/org/apache/sysds/test/component/frame/array/NegativeArrayTests.java
index 84b43f6e72..61442cdd0f 100644
--- 
a/src/test/java/org/apache/sysds/test/component/frame/array/NegativeArrayTests.java
+++ 
b/src/test/java/org/apache/sysds/test/component/frame/array/NegativeArrayTests.java
@@ -21,12 +21,15 @@ package org.apache.sysds.test.component.frame.array;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
 
 import java.io.IOException;
 
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.sysds.common.Types.ValueType;
 import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.compress.DMLCompressionException;
+import org.apache.sysds.runtime.frame.data.columns.ACompressedArray;
 import org.apache.sysds.runtime.frame.data.columns.Array;
 import org.apache.sysds.runtime.frame.data.columns.ArrayFactory;
 import org.apache.sysds.runtime.frame.data.columns.BitSetArray;
@@ -39,6 +42,7 @@ import org.apache.sysds.runtime.frame.data.columns.LongArray;
 import org.apache.sysds.runtime.frame.data.columns.OptionalArray;
 import org.apache.sysds.runtime.frame.data.columns.StringArray;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 public class NegativeArrayTests {
 
@@ -248,7 +252,26 @@ public class NegativeArrayTests {
        }
 
        @Test(expected = NotImplementedException.class)
-       public void byteArrayString(){
+       public void byteArrayString() {
                new StringArray(new String[10]).getAsByteArray();
        }
+
+       @Test(expected = DMLCompressionException.class)
+       public void set1() {
+               ACompressedArray<?> a = mock(ACompressedArray.class, 
Mockito.CALLS_REAL_METHODS);
+               a.set(0, 0.0);
+       }
+
+       @Test(expected = DMLCompressionException.class)
+       public void set2() {
+               ACompressedArray<?> a = mock(ACompressedArray.class, 
Mockito.CALLS_REAL_METHODS);
+               a.set(0, "0mm0");
+       }
+
+       @Test(expected = DMLCompressionException.class)
+       @SuppressWarnings("unchecked")
+       public void set3() {
+               ACompressedArray<Integer> a = mock(ACompressedArray.class, 
Mockito.CALLS_REAL_METHODS);
+               a.set(0, Integer.valueOf(13));
+       }
 }


Reply via email to