This is an automated email from the ASF dual-hosted git repository.

baunsgaard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new f0f8f0c190 [SYSTEMDS-3592] Frame Compress
f0f8f0c190 is described below

commit f0f8f0c19083114be5383d91ebbe80d362f6abbe
Author: baunsgaard <[email protected]>
AuthorDate: Mon Jul 3 19:37:37 2023 +0200

    [SYSTEMDS-3592] Frame Compress
    
    This commit adds a compression pipeline for frames that first analyzes
    a sample of each column; the sample statistics are then used to decide
    how each individual column is compressed. The distinct-value estimation
    tools of the matrix compression framework are reused for this.
    The next step is to parallelize the compression.
    
    Closes #1856
---
 .../org/apache/sysds/api/mlcontext/MLContext.java  |   6 +-
 .../estim/sample/SampleEstimatorFactory.java       |  16 ++-
 .../sysds/runtime/frame/data/FrameBlock.java       |  51 ++++++++-
 .../frame/data/columns/ACompressedArray.java       |   7 ++
 .../sysds/runtime/frame/data/columns/Array.java    |  60 ++++++++---
 .../runtime/frame/data/columns/BitSetArray.java    |   9 +-
 .../runtime/frame/data/columns/BooleanArray.java   |   9 +-
 .../sysds/runtime/frame/data/columns/DDCArray.java |  33 +++---
 .../data/compress/ArrayCompressionStatistics.java  |  50 +++++++++
 .../data/compress/CompressedFrameBlockFactory.java | 119 +++++++++++++++++++++
 ...atistics.java => FrameCompressionSettings.java} |  15 ++-
 .../FrameCompressionSettingsBuilder.java}          |  42 +++++---
 .../runtime/frame/data/lib/FrameLibCompress.java   |  10 +-
 .../instructions/cp/CompressionCPInstruction.java  |   8 +-
 .../sysds/runtime/io/FrameWriterCompressed.java    |   2 +-
 .../component/frame/array/FrameArrayTests.java     |   1 +
 .../frame/compress/FrameCompressTest.java          |  57 +++++++++-
 .../frame/compress/FrameCompressTestLogging.java   |  99 +++++++++++++++++
 .../test/functions/codegen/APICodegenTest.java     |  12 +--
 19 files changed, 534 insertions(+), 72 deletions(-)

diff --git a/src/main/java/org/apache/sysds/api/mlcontext/MLContext.java 
b/src/main/java/org/apache/sysds/api/mlcontext/MLContext.java
index 838f8c76ef..64dafd3f5c 100644
--- a/src/main/java/org/apache/sysds/api/mlcontext/MLContext.java
+++ b/src/main/java/org/apache/sysds/api/mlcontext/MLContext.java
@@ -73,6 +73,9 @@ public class MLContext implements ConfigurableAPI
         */
        private static MLContext activeMLContext = null;
 
+       /** Welcome message */
+       public static boolean welcomePrint = false;
+
        /**
         * Contains cleanup methods used by MLContextProxy.
         */
@@ -262,8 +265,9 @@ public class MLContext implements ConfigurableAPI
                        }
                }
 
-               if (activeMLContext == null) {
+               if (!welcomePrint) {
                        System.out.println(MLContextUtil.welcomeMessage());
+                       welcomePrint = true;
                }
                
                this.spark = spark;
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/estim/sample/SampleEstimatorFactory.java
 
b/src/main/java/org/apache/sysds/runtime/compress/estim/sample/SampleEstimatorFactory.java
index 39cb706e34..01d3c8449e 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/estim/sample/SampleEstimatorFactory.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/estim/sample/SampleEstimatorFactory.java
@@ -30,8 +30,20 @@ public interface SampleEstimatorFactory {
 
        public enum EstimationType {
                HassAndStokes, ShlosserEstimator, //
-               ShlosserJackknifeEstimator, SmoothedJackknifeEstimator,
-               HassAndStokesNoSolveCache,
+               ShlosserJackknifeEstimator, SmoothedJackknifeEstimator, 
HassAndStokesNoSolveCache,
+       }
+
+       /**
+        * Estimate a distinct number of values based on frequencies.
+        * 
+        * @param frequencies A list of frequencies of unique values, Note all 
values contained should be larger than zero
+        * @param nRows       The total number of rows to consider, Note should 
always be larger or equal to sum(frequencies)
+        * @param sampleSize  The size of the sample, Note this should ideally 
be scaled to match the sum(frequencies) and
+        *                    should always be lower or equal to nRows
+        * @return A estimated number of unique values
+        */
+       public static int distinctCount(int[] frequencies, int nRows, int 
sampleSize) {
+               return distinctCount(frequencies, nRows, sampleSize, 
EstimationType.HassAndStokes, null);
        }
 
        /**
diff --git a/src/main/java/org/apache/sysds/runtime/frame/data/FrameBlock.java 
b/src/main/java/org/apache/sysds/runtime/frame/data/FrameBlock.java
index 5faacaf8d1..486bac29fe 100644
--- a/src/main/java/org/apache/sysds/runtime/frame/data/FrameBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/frame/data/FrameBlock.java
@@ -86,7 +86,7 @@ public class FrameBlock implements CacheBlock<FrameBlock>, 
Externalizable {
        /** Buffer size variable: 1M elements, size of default matrix block */
        public static final int BUFFER_SIZE = 1 * 1000 * 1000;
 
-       /** If debugging is enabled for the FrameBlocks in stable state*/
+       /** If debugging is enabled for the FrameBlocks in stable state */
        public static boolean debug = false;
 
        /** The schema of the data frame as an ordered list of value types */
@@ -197,6 +197,55 @@ public class FrameBlock implements CacheBlock<FrameBlock>, 
Externalizable {
                _nRow = data[0].size();
        }
 
+       /**
+        * Create a FrameBlock containing columns of the specified arrays
+        * 
+        * @param data The column data contained
+        */
+       public FrameBlock(Array<?>[] data) {
+               _schema = new ValueType[data.length];
+               for(int i = 0; i < data.length; i++)
+                       _schema[i] = data[i].getValueType();
+
+               _colnames = null;
+               ensureAllocateMeta();
+               _coldata = data;
+               _nRow = data[0].size();
+
+               if(debug) {
+                       for(int i = 0; i < data.length; i++) {
+                               if(data[i].size() != getNumRows())
+                                       throw new DMLRuntimeException(
+                                               "Invalid Frame allocation with 
different size arrays " + data[i].size() + " vs " + getNumRows());
+                       }
+               }
+       }
+
+       /**
+        * Create a FrameBlock containing columns of the specified arrays and 
names
+        * 
+        * @param data     The column data contained
+        * @param colnames The column names of the contained columns
+        */
+       public FrameBlock(Array<?>[] data, String[] colnames) {
+               _schema = new ValueType[data.length];
+               for(int i = 0; i < data.length; i++)
+                       _schema[i] = data[i].getValueType();
+
+               _colnames = colnames;
+               ensureAllocateMeta();
+               _coldata = data;
+               _nRow = data[0].size();
+
+               if(debug) {
+                       for(int i = 0; i < data.length; i++) {
+                               if(data[i].size() != getNumRows())
+                                       throw new DMLRuntimeException(
+                                               "Invalid Frame allocation with 
different size arrays " + data[i].size() + " vs " + getNumRows());
+                       }
+               }
+       }
+
        /**
         * Get the number of rows of the frame block.
         *
diff --git 
a/src/main/java/org/apache/sysds/runtime/frame/data/columns/ACompressedArray.java
 
b/src/main/java/org/apache/sysds/runtime/frame/data/columns/ACompressedArray.java
index a36a0c3cc5..90ceb5f6a2 100644
--- 
a/src/main/java/org/apache/sysds/runtime/frame/data/columns/ACompressedArray.java
+++ 
b/src/main/java/org/apache/sysds/runtime/frame/data/columns/ACompressedArray.java
@@ -20,6 +20,7 @@
 package org.apache.sysds.runtime.frame.data.columns;
 
 import org.apache.sysds.runtime.compress.DMLCompressionException;
+import org.apache.sysds.runtime.frame.data.compress.ArrayCompressionStatistics;
 
 /**
  * A Compressed Array, in general does not allow us to set or modify the array.
@@ -102,4 +103,10 @@ public abstract class ACompressedArray<T> extends Array<T> 
{
                throw new DMLCompressionException("Invalid to reset compressed 
array");
        }
 
+       @Override
+       public ArrayCompressionStatistics statistics(int nSamples) {
+               // already compressed
+               return null;
+       }
+
 }
diff --git 
a/src/main/java/org/apache/sysds/runtime/frame/data/columns/Array.java 
b/src/main/java/org/apache/sysds/runtime/frame/data/columns/Array.java
index 3fbf3ed2d0..b544104df0 100644
--- a/src/main/java/org/apache/sysds/runtime/frame/data/columns/Array.java
+++ b/src/main/java/org/apache/sysds/runtime/frame/data/columns/Array.java
@@ -23,6 +23,7 @@ import java.lang.ref.SoftReference;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Map.Entry;
 
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.commons.logging.Log;
@@ -30,7 +31,9 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.Writable;
 import org.apache.sysds.common.Types.ValueType;
 import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.compress.estim.sample.SampleEstimatorFactory;
 import org.apache.sysds.runtime.frame.data.columns.ArrayFactory.FrameArrayType;
+import org.apache.sysds.runtime.frame.data.compress.ArrayCompressionStatistics;
 import org.apache.sysds.runtime.matrix.data.Pair;
 
 /**
@@ -97,14 +100,15 @@ public abstract class Array<T> implements Writable {
 
        /**
         * Recreate the recode map from what is already there.
+        * 
         * @return
         */
-       protected Map<T, Long> createRecodeMap(){
+       protected Map<T, Long> createRecodeMap() {
                Map<T, Long> map = new HashMap<>();
                long id = 0;
                for(int i = 0; i < size(); i++) {
                        T val = get(i);
-                       if(val != null){
+                       if(val != null) {
                                Long v = map.putIfAbsent(val, id);
                                if(v == null)
                                        id++;
@@ -113,19 +117,18 @@ public abstract class Array<T> implements Writable {
                return map;
        }
 
-
        /**
         * Get the dictionary of the contained values, including null.
         * 
         * @return a dictionary containing all unique values.
         */
-       protected Map<T, Integer> getDictionary(){
+       protected Map<T, Integer> getDictionary() {
                Map<T, Integer> dict = new HashMap<>();
                int id = 0;
-               for(int i = 0 ; i < size(); i ++){
+               for(int i = 0; i < size(); i++) {
                        T val = get(i);
                        Integer v = dict.putIfAbsent(val, id);
-                       if(v== null)
+                       if(v == null)
                                id++;
                }
 
@@ -371,7 +374,7 @@ public abstract class Array<T> implements Writable {
         * 
         * @return If the array contains null.
         */
-       public boolean containsNull(){
+       public boolean containsNull() {
                return false;
        }
 
@@ -424,7 +427,7 @@ public abstract class Array<T> implements Writable {
                                return changeTypeFloat();
                        case FP64:
                                return changeTypeDouble();
-                               case UINT4:
+                       case UINT4:
                        case UINT8:
                                throw new NotImplementedException();
                        case INT32:
@@ -556,7 +559,7 @@ public abstract class Array<T> implements Writable {
         * 
         * @param select Modify this to true in indexes that are not empty.
         */
-       public final void findEmpty(boolean[] select){
+       public final void findEmpty(boolean[] select) {
                for(int i = 0; i < select.length; i++)
                        if(isNotEmpty(i))
                                select[i] = true;
@@ -592,28 +595,57 @@ public abstract class Array<T> implements Writable {
        }
 
        /**
-        * Hash the given index of the array.
-        * It is allowed to return NaN on null elements.
+        * Hash the given index of the array. It is allowed to return NaN on 
null elements.
         * 
         * @param idx The index to hash
         * @return The hash value of that index.
         */
        public abstract double hashDouble(int idx);
 
-       public ArrayIterator getIterator(){
+       public ArrayIterator getIterator() {
                return new ArrayIterator();
        }
 
+       public ArrayCompressionStatistics statistics(int nSamples) {
+
+               Map<T, Integer> d = new HashMap<>();
+               for(int i = 0; i < nSamples; i++) {
+                       // super inefficient, but startup
+                       T key = get(i);
+                       if(d.containsKey(key))
+                               d.put(key, d.get(key) + 1);
+                       else
+                               d.put(key, 1);
+               }
+
+               final int[] freq = new int[d.size()];
+               int id = 0;
+               for(Entry<T, Integer> e : d.entrySet())
+                       freq[id++] = e.getValue();
+
+               int estDistinct = SampleEstimatorFactory.distinctCount(freq, 
size(), nSamples);
+               long memSize = getInMemorySize(); // uncompressed size
+               int memSizePerElement = (int) ((memSize * 8L) / size());
+
+               long ddcSize = DDCArray.estimateInMemorySize(memSizePerElement, 
estDistinct, size());
+
+               if(ddcSize < memSize)
+                       return new 
ArrayCompressionStatistics(memSizePerElement, //
+                               estDistinct, true, FrameArrayType.DDC, memSize, 
ddcSize);
+
+               return null;
+       }
+
        public class ArrayIterator implements Iterator<T> {
                int index = -1;
 
-               public int getIndex(){
+               public int getIndex() {
                        return index;
                }
 
                @Override
                public boolean hasNext() {
-                       return index < size()-1;
+                       return index < size() - 1;
                }
 
                @Override
diff --git 
a/src/main/java/org/apache/sysds/runtime/frame/data/columns/BitSetArray.java 
b/src/main/java/org/apache/sysds/runtime/frame/data/columns/BitSetArray.java
index d6c2489ec2..27a58e6e6c 100644
--- a/src/main/java/org/apache/sysds/runtime/frame/data/columns/BitSetArray.java
+++ b/src/main/java/org/apache/sysds/runtime/frame/data/columns/BitSetArray.java
@@ -30,6 +30,7 @@ import java.util.BitSet;
 import org.apache.sysds.common.Types.ValueType;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.frame.data.columns.ArrayFactory.FrameArrayType;
+import org.apache.sysds.runtime.frame.data.compress.ArrayCompressionStatistics;
 import org.apache.sysds.runtime.matrix.data.Pair;
 import org.apache.sysds.runtime.util.UtilFunctions;
 import org.apache.sysds.utils.MemoryEstimates;
@@ -539,10 +540,16 @@ public class BitSetArray extends ABooleanArray {
        }
 
        @Override
-       public double hashDouble(int idx){
+       public double hashDouble(int idx) {
                return get(idx) ? 1.0 : 0.0;
        }
 
+       @Override
+       public ArrayCompressionStatistics statistics(int nSamples) {
+               // Unlikely to compress so lets just say... no
+               return null;
+       }
+
        @Override
        public String toString() {
                StringBuilder sb = new StringBuilder(_size + 10);
diff --git 
a/src/main/java/org/apache/sysds/runtime/frame/data/columns/BooleanArray.java 
b/src/main/java/org/apache/sysds/runtime/frame/data/columns/BooleanArray.java
index e74f8bcd65..0d40ebe938 100644
--- 
a/src/main/java/org/apache/sysds/runtime/frame/data/columns/BooleanArray.java
+++ 
b/src/main/java/org/apache/sysds/runtime/frame/data/columns/BooleanArray.java
@@ -28,6 +28,7 @@ import java.util.Arrays;
 
 import org.apache.sysds.common.Types.ValueType;
 import org.apache.sysds.runtime.frame.data.columns.ArrayFactory.FrameArrayType;
+import org.apache.sysds.runtime.frame.data.compress.ArrayCompressionStatistics;
 import org.apache.sysds.runtime.matrix.data.Pair;
 import org.apache.sysds.runtime.util.UtilFunctions;
 import org.apache.sysds.utils.MemoryEstimates;
@@ -339,10 +340,16 @@ public class BooleanArray extends ABooleanArray {
        }
 
        @Override
-       public double hashDouble(int idx){
+       public double hashDouble(int idx) {
                return get(idx) ? 1.0 : 0.0;
        }
 
+       @Override
+       public ArrayCompressionStatistics statistics(int nSamples) {
+               // Unlikely to compress so lets just say... no
+               return null;
+       }
+
        @Override
        public String toString() {
                StringBuilder sb = new StringBuilder(_data.length * 2 + 10);
diff --git 
a/src/main/java/org/apache/sysds/runtime/frame/data/columns/DDCArray.java 
b/src/main/java/org/apache/sysds/runtime/frame/data/columns/DDCArray.java
index f7a810d0fd..7c995769f1 100644
--- a/src/main/java/org/apache/sysds/runtime/frame/data/columns/DDCArray.java
+++ b/src/main/java/org/apache/sysds/runtime/frame/data/columns/DDCArray.java
@@ -22,7 +22,6 @@ package org.apache.sysds.runtime.frame.data.columns;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
 
@@ -56,13 +55,6 @@ public class DDCArray<T> extends ACompressedArray<T> {
                }
        }
 
-       private static <V, K> Map<V, K> invert(Map<K, V> map) {
-               Map<V, K> invMap = new HashMap<V, K>();
-               for(Entry<K, V> e : map.entrySet())
-                       invMap.put(e.getValue(), e.getKey());
-               return invMap;
-       }
-
        /**
         * Try to compress array into DDC format.
         * 
@@ -72,30 +64,33 @@ public class DDCArray<T> extends ACompressedArray<T> {
         */
        @SuppressWarnings("unchecked")
        public static <T> Array<T> compressToDDC(Array<T> arr) {
-               // two pass algorithm
-               if(arr.size() <= 10)
+               // Early aborts
+               // if the size is small do not consider
+               // or if the instance if RaggedArray where all values typically 
are unique.
+               if(arr.size() <= 10 || arr instanceof RaggedArray)
                        return arr;
 
-               // 1. Get unique
+               // Two pass algorithm
+               // 1.full iteration: Get unique
                Map<T, Integer> rcd = arr.getDictionary();
 
+               // Abort if there are to many unique values.
                if(rcd.size() > arr.size() / 2)
                        return arr;
 
+               // Allocate the correct dictionary output
                Array<T> ar;
-
                if(rcd.keySet().contains(null))
                        ar = (Array<T>) 
ArrayFactory.allocateOptional(arr.getValueType(), rcd.size());
                else
                        ar = (Array<T>) 
ArrayFactory.allocate(arr.getValueType(), rcd.size());
 
-               Map<Integer, T> rcdInv = invert(rcd);
-               for(int i = 0; i < rcd.size(); i++)
-                       ar.set(i, rcdInv.get(Integer.valueOf(i)));
+               // Set elements in the Dictionary array --- much smaller.
+               for(Entry<T, Integer> e : rcd.entrySet())
+                       ar.set(e.getValue(), e.getKey());
 
-               // 2. Make map
+               // 2. full iteration: Make map
                AMapToData m = MapToFactory.create(arr.size(), rcd.size());
-
                for(int i = 0; i < arr.size(); i++)
                        m.set(i, rcd.get(arr.get(i)));
 
@@ -285,6 +280,10 @@ public class DDCArray<T> extends ACompressedArray<T> {
                return dict.getDictionary();
        }
 
+       public static long estimateInMemorySize(int memSizeBitPerElement, int 
estDistinct, int nRow) {
+               return (estDistinct * memSizeBitPerElement) / 8 + 
MapToFactory.estimateInMemorySize(nRow, estDistinct);
+       }
+
        @Override
        public String toString() {
                StringBuilder sb = new StringBuilder();
diff --git 
a/src/main/java/org/apache/sysds/runtime/frame/data/compress/ArrayCompressionStatistics.java
 
b/src/main/java/org/apache/sysds/runtime/frame/data/compress/ArrayCompressionStatistics.java
new file mode 100644
index 0000000000..ae41655482
--- /dev/null
+++ 
b/src/main/java/org/apache/sysds/runtime/frame/data/compress/ArrayCompressionStatistics.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.frame.data.compress;
+
+import org.apache.sysds.runtime.frame.data.columns.ArrayFactory.FrameArrayType;
+
+public class ArrayCompressionStatistics {
+
+       public final long originalSize;
+       public final long compressedSizeEstimate;
+       public final boolean shouldCompress;
+       public final FrameArrayType bestType;
+       public final int bitPerValue;
+       public final int nUnique;
+
+       public ArrayCompressionStatistics(int bitPerValue, int nUnique, boolean 
shouldCompress, FrameArrayType bestType,
+               long originalSize, long compressedSizeEstimate) {
+               this.bitPerValue = bitPerValue;
+               this.nUnique = nUnique;
+               this.shouldCompress = shouldCompress;
+               this.bestType = bestType;
+               this.originalSize = originalSize;
+               this.compressedSizeEstimate = compressedSizeEstimate;
+       }
+
+       @Override
+       public String toString() {
+               StringBuilder sb = new StringBuilder();
+               sb.append(String.format("Compressed Stats: size:%6d->%6d, 
Use:%10s, Unique:%5d", originalSize,
+                       compressedSizeEstimate, bestType.toString(), nUnique));
+               return sb.toString();
+       }
+}
diff --git 
a/src/main/java/org/apache/sysds/runtime/frame/data/compress/CompressedFrameBlockFactory.java
 
b/src/main/java/org/apache/sysds/runtime/frame/data/compress/CompressedFrameBlockFactory.java
new file mode 100644
index 0000000000..b3246863c9
--- /dev/null
+++ 
b/src/main/java/org/apache/sysds/runtime/frame/data/compress/CompressedFrameBlockFactory.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.frame.data.compress;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sysds.runtime.compress.workload.WTreeRoot;
+import org.apache.sysds.runtime.frame.data.FrameBlock;
+import org.apache.sysds.runtime.frame.data.columns.Array;
+import org.apache.sysds.runtime.frame.data.columns.DDCArray;
+
+public class CompressedFrameBlockFactory {
+
+       private static final Log LOG = 
LogFactory.getLog(CompressedFrameBlockFactory.class.getName());
+
+       private final FrameBlock in;
+       private final FrameCompressionSettings cs;
+       private final ArrayCompressionStatistics[] stats;
+       private final Array<?>[] compressedColumns;
+
+       private CompressedFrameBlockFactory(FrameBlock fb, 
FrameCompressionSettings cs) {
+               this.in = fb;
+               this.cs = cs;
+               this.stats = new ArrayCompressionStatistics[in.getNumColumns()];
+               this.compressedColumns = new Array<?>[in.getNumColumns()];
+       }
+
+       public static FrameBlock compress(FrameBlock fb) {
+               FrameCompressionSettings cs = new 
FrameCompressionSettingsBuilder().create();
+               return compress(fb, cs);
+       }
+
+       public static FrameBlock compress(FrameBlock fb, int k, WTreeRoot root) 
{
+               FrameCompressionSettings cs = new 
FrameCompressionSettingsBuilder()//
+                       .threads(k).wTreeRoot(root).create();
+               return compress(fb, cs);
+       }
+
+       public static FrameBlock compress(FrameBlock fb, 
FrameCompressionSettingsBuilder csb) {
+               return compress(fb, csb.create());
+       }
+
+       public static FrameBlock compress(FrameBlock fb, 
FrameCompressionSettings cs) {
+               return new CompressedFrameBlockFactory(fb, cs).compressFrame();
+       }
+
+       private FrameBlock compressFrame() {
+               extractStatistics();
+               logStatistics();
+               encodeColumns();
+               final FrameBlock ret = new FrameBlock(compressedColumns, 
in.getColumnNames(false));
+               logRet(ret);
+               return ret;
+       }
+
+       private void extractStatistics() {
+               final int nSamples = Math.min(in.getNumRows(), (int) 
Math.ceil(in.getNumRows() * cs.sampleRatio));
+               for(int i = 0; i < stats.length; i++) {
+                       stats[i] = in.getColumn(i).statistics(nSamples);
+               }
+       }
+
+       private void encodeColumns() {
+               for(int i = 0; i < compressedColumns.length; i++) {
+                       if(stats[i] != null) {
+                               // commented out because no other encodings are 
supported yet
+                               // switch(stats[i].bestType) {
+                               // case DDC:
+                               compressedColumns[i] = 
DDCArray.compressToDDC(in.getColumn(i));
+                               // break;
+                               // default:
+                               // compressedColumns[i] = in.getColumn(i);
+                               // break;
+                               // }
+                       }
+                       else
+                               compressedColumns[i] = in.getColumn(i);
+               }
+       }
+
+       private void logStatistics() {
+               if(LOG.isDebugEnabled()) {
+                       for(int i = 0; i < compressedColumns.length; i++) {
+                               if(stats[i] != null)
+                                       LOG.debug(stats[i]);
+                               else
+                                       LOG.debug("no Comp col: " + i);
+                       }
+               }
+       }
+
+       private void logRet(FrameBlock ret) {
+               if(LOG.isDebugEnabled()) {
+                       final long before = in.getInMemorySize();
+                       final long after = ret.getInMemorySize();
+                       LOG.debug(String.format("Uncompressed Size: %15d", 
before));
+                       LOG.debug(String.format("compressed Size:   %15d", 
after));
+                       LOG.debug(String.format("ratio:             %15.3f", 
(double) before / (double) after));
+               }
+       }
+
+}
diff --git 
a/src/main/java/org/apache/sysds/runtime/frame/data/compress/FrameCompressionStatistics.java
 
b/src/main/java/org/apache/sysds/runtime/frame/data/compress/FrameCompressionSettings.java
similarity index 72%
rename from 
src/main/java/org/apache/sysds/runtime/frame/data/compress/FrameCompressionStatistics.java
rename to 
src/main/java/org/apache/sysds/runtime/frame/data/compress/FrameCompressionSettings.java
index c235995e1e..84a23bf648 100644
--- 
a/src/main/java/org/apache/sysds/runtime/frame/data/compress/FrameCompressionStatistics.java
+++ 
b/src/main/java/org/apache/sysds/runtime/frame/data/compress/FrameCompressionSettings.java
@@ -16,8 +16,21 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.sysds.runtime.frame.data.compress;
 
-public class FrameCompressionStatistics {
+import org.apache.sysds.runtime.compress.workload.WTreeRoot;
+
+public class FrameCompressionSettings {
+
+       public final float sampleRatio;
+       public final int k;
+       public final WTreeRoot wt;
+
+       protected FrameCompressionSettings(float sampleRatio, int k, WTreeRoot 
wt) {
+               this.sampleRatio = sampleRatio;
+               this.k = k;
+               this.wt = wt;
+       }
 
 }
diff --git 
a/src/main/java/org/apache/sysds/runtime/frame/data/lib/FrameLibCompress.java 
b/src/main/java/org/apache/sysds/runtime/frame/data/compress/FrameCompressionSettingsBuilder.java
similarity index 55%
copy from 
src/main/java/org/apache/sysds/runtime/frame/data/lib/FrameLibCompress.java
copy to 
src/main/java/org/apache/sysds/runtime/frame/data/compress/FrameCompressionSettingsBuilder.java
index 584462200d..936cd42898 100644
--- 
a/src/main/java/org/apache/sysds/runtime/frame/data/lib/FrameLibCompress.java
+++ 
b/src/main/java/org/apache/sysds/runtime/frame/data/compress/FrameCompressionSettingsBuilder.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -16,21 +16,39 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sysds.runtime.frame.data.lib;
 
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
+package org.apache.sysds.runtime.frame.data.compress;
+
 import org.apache.sysds.runtime.compress.workload.WTreeRoot;
-import org.apache.sysds.runtime.frame.data.FrameBlock;
-import org.apache.sysds.runtime.frame.data.compress.FrameCompressionStatistics;
 
-public class FrameLibCompress {
+public class FrameCompressionSettingsBuilder {
+
+       private float sampleRatio;
+       private int k;
+       private WTreeRoot wt;
+
+       public FrameCompressionSettingsBuilder() {
+               this.sampleRatio = 0.1f;
+               this.k = 1;
+               this.wt = null;
+       }
+
+       public FrameCompressionSettingsBuilder wTreeRoot(WTreeRoot wt) {
+               this.wt = wt;
+               return this;
+       }
+
+       public FrameCompressionSettingsBuilder threads(int k) {
+               this.k = k;
+               return this;
+       }
 
-       public static Pair<FrameBlock, FrameCompressionStatistics> 
compress(FrameBlock in, int k) {
-               return compress(in, k, null);
+       public FrameCompressionSettingsBuilder sampleRatio(float sampleRatio) {
+               this.sampleRatio = sampleRatio;
+               return this;
        }
 
-       public static Pair<FrameBlock, FrameCompressionStatistics> 
compress(FrameBlock in, int k, WTreeRoot root) {
-               return new ImmutablePair<>(in, new 
FrameCompressionStatistics());
+       public FrameCompressionSettings create() {
+               return new FrameCompressionSettings(sampleRatio, k, wt);
        }
 }
diff --git 
a/src/main/java/org/apache/sysds/runtime/frame/data/lib/FrameLibCompress.java 
b/src/main/java/org/apache/sysds/runtime/frame/data/lib/FrameLibCompress.java
index 584462200d..207ece8d26 100644
--- 
a/src/main/java/org/apache/sysds/runtime/frame/data/lib/FrameLibCompress.java
+++ 
b/src/main/java/org/apache/sysds/runtime/frame/data/lib/FrameLibCompress.java
@@ -18,19 +18,17 @@
  */
 package org.apache.sysds.runtime.frame.data.lib;
 
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.sysds.runtime.compress.workload.WTreeRoot;
 import org.apache.sysds.runtime.frame.data.FrameBlock;
-import org.apache.sysds.runtime.frame.data.compress.FrameCompressionStatistics;
+import 
org.apache.sysds.runtime.frame.data.compress.CompressedFrameBlockFactory;
 
 public class FrameLibCompress {
 
-       public static Pair<FrameBlock, FrameCompressionStatistics> 
compress(FrameBlock in, int k) {
+       public static FrameBlock compress(FrameBlock in, int k) {
                return compress(in, k, null);
        }
 
-       public static Pair<FrameBlock, FrameCompressionStatistics> 
compress(FrameBlock in, int k, WTreeRoot root) {
-               return new ImmutablePair<>(in, new 
FrameCompressionStatistics());
+       public static FrameBlock compress(FrameBlock in, int k, WTreeRoot root) 
{
+               return CompressedFrameBlockFactory.compress(in, k, root);
        }
 }
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/cp/CompressionCPInstruction.java
 
b/src/main/java/org/apache/sysds/runtime/instructions/cp/CompressionCPInstruction.java
index 38b53e090c..22766079e6 100644
--- 
a/src/main/java/org/apache/sysds/runtime/instructions/cp/CompressionCPInstruction.java
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/cp/CompressionCPInstruction.java
@@ -29,7 +29,6 @@ import 
org.apache.sysds.runtime.compress.SingletonLookupHashMap;
 import org.apache.sysds.runtime.compress.workload.WTreeRoot;
 import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysds.runtime.frame.data.FrameBlock;
-import org.apache.sysds.runtime.frame.data.compress.FrameCompressionStatistics;
 import org.apache.sysds.runtime.frame.data.lib.FrameLibCompress;
 import org.apache.sysds.runtime.instructions.InstructionUtils;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
@@ -89,12 +88,9 @@ public class CompressionCPInstruction extends 
ComputationCPInstruction {
        }
 
        private void processFrameBlockCompression(ExecutionContext ec, 
FrameBlock in, int k, WTreeRoot root) {
-               Pair<FrameBlock, FrameCompressionStatistics> compResult = 
FrameLibCompress.compress(in, k, root);
-               if(LOG.isTraceEnabled())
-                       LOG.trace(compResult.getRight());
-               FrameBlock out = compResult.getLeft();
+               FrameBlock compResult = FrameLibCompress.compress(in, k, root);
                // Set output and release input
                ec.releaseFrameInput(input1.getName());
-               ec.setFrameOutput(output.getName(), out);
+               ec.setFrameOutput(output.getName(), compResult);
        }
 }
diff --git 
a/src/main/java/org/apache/sysds/runtime/io/FrameWriterCompressed.java 
b/src/main/java/org/apache/sysds/runtime/io/FrameWriterCompressed.java
index 70b6e89a9a..82c5a08e2c 100644
--- a/src/main/java/org/apache/sysds/runtime/io/FrameWriterCompressed.java
+++ b/src/main/java/org/apache/sysds/runtime/io/FrameWriterCompressed.java
@@ -40,7 +40,7 @@ public class FrameWriterCompressed extends 
FrameWriterBinaryBlockParallel {
        protected void writeBinaryBlockFrameToHDFS(Path path, JobConf job, 
FrameBlock src, long rlen, long clen)
                throws IOException, DMLRuntimeException {
                int k = parallel ? 
OptimizerUtils.getParallelBinaryWriteParallelism() : 1;
-               FrameBlock compressed = FrameLibCompress.compress(src, 
k).getLeft();
+               FrameBlock compressed = FrameLibCompress.compress(src, k);
                super.writeBinaryBlockFrameToHDFS(path, job, compressed, rlen, 
clen);
        }
 
diff --git 
a/src/test/java/org/apache/sysds/test/component/frame/array/FrameArrayTests.java
 
b/src/test/java/org/apache/sysds/test/component/frame/array/FrameArrayTests.java
index 81908f8d39..a5acb13381 100644
--- 
a/src/test/java/org/apache/sysds/test/component/frame/array/FrameArrayTests.java
+++ 
b/src/test/java/org/apache/sysds/test/component/frame/array/FrameArrayTests.java
@@ -1995,6 +1995,7 @@ public class FrameArrayTests {
        }
 
        public static String[] generateRandomStringNUniqueLengthOpt(int size, 
int seed, int nUnique, int stringLength) {
+               nUnique = Math.max(1, nUnique);
                String[] rands = generateRandomStringLength(nUnique, seed, 
stringLength);
                rands[rands.length - 1] = null;
                Random r = new Random(seed + 1);
diff --git 
a/src/test/java/org/apache/sysds/test/component/frame/compress/FrameCompressTest.java
 
b/src/test/java/org/apache/sysds/test/component/frame/compress/FrameCompressTest.java
index bdf6038550..fc4e69d752 100644
--- 
a/src/test/java/org/apache/sysds/test/component/frame/compress/FrameCompressTest.java
+++ 
b/src/test/java/org/apache/sysds/test/component/frame/compress/FrameCompressTest.java
@@ -19,12 +19,63 @@
 
 package org.apache.sysds.test.component.frame.compress;
 
-import org.apache.sysds.runtime.frame.data.compress.FrameCompressionStatistics;
+import static org.junit.Assert.fail;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sysds.runtime.frame.data.FrameBlock;
+import org.apache.sysds.runtime.frame.data.columns.Array;
+import org.apache.sysds.runtime.frame.data.columns.ArrayFactory;
+import 
org.apache.sysds.runtime.frame.data.compress.CompressedFrameBlockFactory;
+import org.apache.sysds.runtime.frame.data.compress.FrameCompressionSettings;
+import org.apache.sysds.runtime.frame.data.lib.FrameLibCompress;
+import org.apache.sysds.test.TestUtils;
+import org.apache.sysds.test.component.frame.array.FrameArrayTests;
 import org.junit.Test;
 
 public class FrameCompressTest {
+       protected static final Log LOG = 
LogFactory.getLog(FrameCompressTest.class.getName());
+
+       @Test
+       public void testSingleThread() {
+               FrameBlock a = generateCompressableBlock(200, 5, 1232);
+               runTest(a, 1);
+       }
+
        @Test
-       public void testCompressionStatisticsConstruction() {
-               new FrameCompressionStatistics();
+       public void testParallel() {
+               FrameBlock a = generateCompressableBlock(200, 5, 1232);
+               runTest(a, 4);
+       }
+
+       public void runTest(FrameBlock a, int k) {
+               try {
+                       FrameBlock b = FrameLibCompress.compress(a, k);
+                       TestUtils.compareFrames(a, b, true);
+               }
+               catch(Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       public void runTestConfig(FrameBlock a, FrameCompressionSettings cs) {
+               try {
+                       FrameBlock b = CompressedFrameBlockFactory.compress(a, 
cs);
+                       TestUtils.compareFrames(a, b, true);
+               }
+               catch(Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       private FrameBlock generateCompressableBlock(int rows, int cols, int 
seed) {
+               Array<?>[] data = new Array<?>[cols];
+               for(int i = 0; i < cols; i++) {
+                       data[i] = ArrayFactory.create(//
+                               
FrameArrayTests.generateRandomStringNUniqueLengthOpt(rows, seed + i, i + 1, 55 
+ i));
+               }
+               return new FrameBlock(data);
        }
 }
diff --git 
a/src/test/java/org/apache/sysds/test/component/frame/compress/FrameCompressTestLogging.java
 
b/src/test/java/org/apache/sysds/test/component/frame/compress/FrameCompressTestLogging.java
new file mode 100644
index 0000000000..45de293d24
--- /dev/null
+++ 
b/src/test/java/org/apache/sysds/test/component/frame/compress/FrameCompressTestLogging.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.component.frame.compress;
+
+import static org.junit.Assert.fail;
+
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.spi.LoggingEvent;
+import org.apache.sysds.runtime.frame.data.FrameBlock;
+import org.apache.sysds.runtime.frame.data.columns.Array;
+import org.apache.sysds.runtime.frame.data.columns.ArrayFactory;
+import 
org.apache.sysds.runtime.frame.data.compress.CompressedFrameBlockFactory;
+import org.apache.sysds.runtime.frame.data.lib.FrameLibCompress;
+import org.apache.sysds.test.LoggingUtils;
+import org.apache.sysds.test.LoggingUtils.TestAppender;
+import org.apache.sysds.test.TestUtils;
+import org.apache.sysds.test.component.frame.array.FrameArrayTests;
+import org.junit.Test;
+
+public class FrameCompressTestLogging {
+       protected static final Log LOG = 
LogFactory.getLog(FrameCompressTestLogging.class.getName());
+
+       @Test
+       public void testCompressable() {
+               testLogging(generateCompressableBlock(200, 3, 3214));
+       }
+
+               @Test
+       public void testUnCompressable() {
+               testLogging(generateIncompressableBlock(200, 3, 2321));
+       }
+
+       public void testLogging(FrameBlock a) {
+               final TestAppender appender = LoggingUtils.overwrite();
+               try {
+                       
Logger.getLogger(CompressedFrameBlockFactory.class).setLevel(Level.TRACE);
+
+                       FrameBlock b = FrameLibCompress.compress(a, 1);
+
+                       TestUtils.compareFrames(a, b, true);
+
+                       final List<LoggingEvent> log = 
LoggingUtils.reinsert(appender);
+                       for(LoggingEvent l : log) {
+                               if(l.getMessage().toString().contains("ratio:   
              "))
+                                       return;
+                       }
+                       fail("Log did not contain Dictionary sizes");
+               }
+               catch(Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+               finally {
+                       
Logger.getLogger(CompressedFrameBlockFactory.class).setLevel(Level.WARN);
+                       LoggingUtils.reinsert(appender);
+               }
+
+       }
+
+       private FrameBlock generateCompressableBlock(int rows, int cols, int 
seed) {
+               Array<?>[] data = new Array<?>[cols];
+               for(int i = 0; i < cols; i++) {
+                       data[i] = ArrayFactory.create(//
+                               
FrameArrayTests.generateRandomStringNUniqueLengthOpt(rows, seed + i, i + 1, 55 
+ i));
+               }
+               return new FrameBlock(data);
+       }
+
+               private FrameBlock generateIncompressableBlock(int rows, int 
cols, int seed) {
+               Array<?>[] data = new Array<?>[cols];
+               for(int i = 0; i < cols; i++) {
+                       data[i] = ArrayFactory.create(//
+                               
FrameArrayTests.generateRandomStringNUniqueLengthOpt(rows, seed + i, rows, 55 + 
i));
+               }
+               return new FrameBlock(data);
+       }
+}
diff --git 
a/src/test/java/org/apache/sysds/test/functions/codegen/APICodegenTest.java 
b/src/test/java/org/apache/sysds/test/functions/codegen/APICodegenTest.java
index f20ba75c26..b316e2c587 100644
--- a/src/test/java/org/apache/sysds/test/functions/codegen/APICodegenTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/codegen/APICodegenTest.java
@@ -23,21 +23,20 @@ import static 
org.apache.sysds.api.mlcontext.ScriptFactory.dml;
 
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Test;
 import org.apache.sysds.api.DMLScript;
 import org.apache.sysds.api.jmlc.Connection;
 import org.apache.sysds.api.jmlc.PreparedScript;
 import org.apache.sysds.api.mlcontext.MLContext;
 import org.apache.sysds.api.mlcontext.Script;
-import org.apache.sysds.conf.DMLConfig;
 import org.apache.sysds.conf.CompilerConfig.ConfigType;
+import org.apache.sysds.conf.DMLConfig;
 import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.util.DataConverter;
 import org.apache.sysds.test.AutomatedTestBase;
-import org.apache.sysds.utils.Statistics;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
 
 
 public class APICodegenTest extends AutomatedTestBase
@@ -85,12 +84,13 @@ public class APICodegenTest extends AutomatedTestBase
                                pscript.setMatrix("X", mX, false);
                                pscript.executeScript();
                                conn.close();
-                               System.out.println(Statistics.display());
+                               // System.out.println(Statistics.display());
                        }
                        else {
                                SparkConf conf = 
SparkExecutionContext.createSystemDSSparkConf()
                                        
.setAppName("MLContextTest").setMaster("local");
                                JavaSparkContext sc = new 
JavaSparkContext(conf);
+                               MLContext.welcomePrint = true;
                                MLContext ml = new MLContext(sc);
                                ml.setConfigProperty(DMLConfig.CODEGEN, "true");
                                ml.setStatistics(true);


Reply via email to