This is an automated email from the ASF dual-hosted git repository.

baunsgaard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git

commit 31f2653b785c4e00c987347264f5f96e8315dd4c
Author: Muhammad Osama <[email protected]>
AuthorDate: Thu Jan 20 22:09:28 2022 +0100

    [SYSTEMDS-2831] CLA DeltaDDC Column Group
    
    This commit adds a new column group type named DeltaDDC, with support for
    a few operations.
    
    While merging I added an interface for future readers for delta encoding;
    a future task is to enable analysis of delta encoding.
    
    DIA project WS2021/22.
    
    Closes #1518
---
 .../sysds/runtime/compress/colgroup/AColGroup.java |   4 +-
 .../compress/colgroup/ColGroupDeltaDDC.java        |  81 ++++++++++++++++
 .../runtime/compress/colgroup/ColGroupFactory.java |  74 +++++++++++++--
 .../runtime/compress/colgroup/ColGroupIO.java      |   2 +
 .../colgroup/dictionary/DeltaDictionary.java       |  43 +++++++++
 .../compress/colgroup/dictionary/Dictionary.java   |   2 +-
 .../colgroup/dictionary/DictionaryFactory.java     |   4 +-
 .../compress/estim/CompressedSizeEstimator.java    |  32 ++++++-
 .../estim/CompressedSizeEstimatorExact.java        |  11 +++
 .../estim/CompressedSizeEstimatorSample.java       |  30 ++++--
 .../estim/CompressedSizeEstimatorUltraSparse.java  |   6 ++
 .../compress/estim/CompressedSizeInfoColGroup.java |   1 +
 .../runtime/compress/estim/encoding/IEncode.java   |   8 ++
 .../compress/colgroup/ColGroupDeltaDDCTest.java    | 100 ++++++++++++++++++++
 .../compress/colgroup/JolEstimateDeltaDDCTest.java |  72 ++++++++++++++
 .../compress/colgroup/JolEstimateTest.java         |   1 +
 .../compress/dictionary/DeltaDictionaryTest.java   | 105 +++++++++++++++++++++
 17 files changed, 557 insertions(+), 19 deletions(-)

diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroup.java 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroup.java
index af4d757..23674b1 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroup.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroup.java
@@ -48,7 +48,7 @@ public abstract class AColGroup implements Serializable {
 
        /** Public super types of compression ColGroups supported */
        public enum CompressionType {
-               UNCOMPRESSED, RLE, OLE, DDC, CONST, EMPTY, SDC, PFOR,
+               UNCOMPRESSED, RLE, OLE, DDC, CONST, EMPTY, SDC, PFOR, DeltaDDC
        }
 
        /**
@@ -57,7 +57,7 @@ public abstract class AColGroup implements Serializable {
         * Protected such that outside the ColGroup package it should be 
unknown which specific subtype is used.
         */
        protected enum ColGroupType {
-               UNCOMPRESSED, RLE, OLE, DDC, CONST, EMPTY, SDC, SDCSingle, 
SDCSingleZeros, SDCZeros, PFOR;
+               UNCOMPRESSED, RLE, OLE, DDC, CONST, EMPTY, SDC, SDCSingle, 
SDCSingleZeros, SDCZeros, PFOR, DeltaDDC;
        }
 
        /** The ColGroup Indexes contained in the ColGroup */
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDeltaDDC.java
 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDeltaDDC.java
new file mode 100644
index 0000000..0949dae
--- /dev/null
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDeltaDDC.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.compress.colgroup;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.sysds.runtime.compress.colgroup.dictionary.ADictionary;
+import org.apache.sysds.runtime.compress.colgroup.mapping.AMapToData;
+import org.apache.sysds.runtime.data.DenseBlock;
+import org.apache.sysds.runtime.data.SparseBlock;
+import org.apache.sysds.runtime.matrix.operators.ScalarOperator;
+
+/**
+ * Class to encapsulate information about a column group that is first delta 
encoded then encoded with dense dictionary
+ * encoding (DeltaDDC).
+ */
+public class ColGroupDeltaDDC extends ColGroupDDC {
+
+       /**
+        * Constructor for serialization
+        *
+        * @param numRows number of rows
+        */
+       protected ColGroupDeltaDDC(int numRows) {
+               super(numRows);
+       }
+
+       protected ColGroupDeltaDDC(int[] colIndices, int numRows, ADictionary 
dict, AMapToData data, int[] cachedCounts) {
+               super(colIndices, numRows, dict, data, cachedCounts);
+               _zeros = false;
+               _data = data;
+       }
+
+       public CompressionType getCompType() {
+               return CompressionType.DeltaDDC;
+       }
+
+       @Override
+       protected void decompressToDenseBlockDenseDictionary(DenseBlock db, int 
rl, int ru, int offR, int offC,
+               double[] values) {
+               final int nCol = _colIndexes.length;
+               for(int i = rl, offT = rl + offR; i < ru; i++, offT++) {
+                       final double[] c = db.values(offT);
+                       final int off = db.pos(offT) + offC;
+                       final int rowIndex = _data.getIndex(i) * nCol;
+                       final int prevOff = (off == 0) ? off : off - nCol;
+                       for(int j = 0; j < nCol; j++) {
+                               // Here we use the values in the previous row 
to compute current values along with the delta
+                               double newValue = c[prevOff + j] + 
values[rowIndex + j];
+                               c[off + _colIndexes[j]] += newValue;
+                       }
+               }
+       }
+
+       @Override
+       protected void decompressToSparseBlockDenseDictionary(SparseBlock ret, 
int rl, int ru, int offR, int offC,
+               double[] values) {
+               throw new NotImplementedException();
+       }
+
+       @Override
+       public AColGroup scalarOperation(ScalarOperator op) {
+               return new ColGroupDeltaDDC(_colIndexes, _numRows, 
_dict.applyScalarOp(op), _data, getCachedCounts());
+       }
+}
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupFactory.java 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupFactory.java
index 29a4a21..dbc08a8 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupFactory.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupFactory.java
@@ -58,8 +58,10 @@ import org.apache.sysds.runtime.compress.utils.IntArrayList;
 import org.apache.sysds.runtime.compress.utils.Util;
 import org.apache.sysds.runtime.controlprogram.parfor.stat.Timing;
 import org.apache.sysds.runtime.data.SparseBlock;
+import org.apache.sysds.runtime.matrix.data.LibMatrixReorg;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.util.CommonThreadPool;
+import org.apache.sysds.runtime.util.DataConverter;
 
 /**
  * Factory class for constructing ColGroups.
@@ -335,6 +337,12 @@ public class ColGroupFactory {
                                tmp.getDblCountMap(nrUniqueEstimate), cs);
                else if(colIndexes.length > 1 && estimatedBestCompressionType 
== CompressionType.DDC)
                        return directCompressDDC(colIndexes, in, cs, cg, k);
+               else if(estimatedBestCompressionType == 
CompressionType.DeltaDDC) {
+                       if(colIndexes.length > 1)
+                               return directCompressDeltaDDC(colIndexes, in, 
cs, cg, k);
+                       else
+                               return compressDeltaDDC(colIndexes, in, cs, cg);
+               }
                else {
                        final int numRows = cs.transposed ? in.getNumColumns() 
: in.getNumRows();
                        final ABitmap ubm = 
BitmapEncoder.extractBitmap(colIndexes, in, cs.transposed, nrUniqueEstimate,
@@ -376,11 +384,26 @@ public class ColGroupFactory {
                final int rlen = cs.transposed ? raw.getNumColumns() : 
raw.getNumRows();
                // use a Map that is at least char size.
                final int nVal = cg.getNumVals() < 16 ? 16 : 
Math.max(cg.getNumVals(), 257);
-               return directCompressDDC(colIndexes, raw, cs, cg, 
MapToFactory.create(rlen, nVal), rlen, k);
+               return directCompressDDCColGroup(colIndexes, raw, cs, cg, 
MapToFactory.create(rlen, nVal), rlen, k, false);
        }
 
-       private static AColGroup directCompressDDC(int[] colIndexes, 
MatrixBlock raw, CompressionSettings cs,
-               CompressedSizeInfoColGroup cg, AMapToData data, int rlen, int 
k) {
+       private static AColGroup directCompressDeltaDDC(int[] colIndexes, 
MatrixBlock raw, CompressionSettings cs,
+               CompressedSizeInfoColGroup cg, int k) {
+               final int rlen = cs.transposed ? raw.getNumColumns() : 
raw.getNumRows();
+               // use a Map that is at least char size.
+               final int nVal = cg.getNumVals() < 16 ? 16 : 
Math.max(cg.getNumVals(), 257);
+               if(cs.transposed) {
+                       LOG.warn("In-effecient transpose back of the input 
matrix to do delta encoding");
+                       raw = LibMatrixReorg.transposeInPlace(raw, k);
+                       cs.transposed = false;
+               }
+               // Delta encode the raw data
+               raw = deltaEncodeMatrixBlock(raw);
+               return directCompressDDCColGroup(colIndexes, raw, cs, cg, 
MapToFactory.create(rlen, nVal), rlen, k, true);
+       }
+
+       private static AColGroup directCompressDDCColGroup(int[] colIndexes, 
MatrixBlock raw, CompressionSettings cs,
+               CompressedSizeInfoColGroup cg, AMapToData data, int rlen, int 
k, boolean deltaEncoded) {
                final int fill = data.getUpperBoundValue();
                data.fill(fill);
 
@@ -396,7 +419,7 @@ public class ColGroupFactory {
                        // This is highly unlikely but could happen if forced 
compression of
                        // not transposed column and the estimator says use DDC.
                        return new ColGroupEmpty(colIndexes);
-               ADictionary dict = DictionaryFactory.create(map, 
colIndexes.length, extra);
+               ADictionary dict = DictionaryFactory.create(map, 
colIndexes.length, extra, deltaEncoded);
                if(extra) {
                        data.replace(fill, map.size());
                        data.setUnique(map.size() + 1);
@@ -405,8 +428,10 @@ public class ColGroupFactory {
                        data.setUnique(map.size());
 
                AMapToData resData = MapToFactory.resize(data, map.size() + 
(extra ? 1 : 0));
-               ColGroupDDC res = new ColGroupDDC(colIndexes, rlen, dict, 
resData, null);
-               return res;
+               if(deltaEncoded)
+                       return new ColGroupDeltaDDC(colIndexes, rlen, dict, 
resData, null);
+               else
+                       return new ColGroupDDC(colIndexes, rlen, dict, resData, 
null);
        }
 
        private static boolean readToMapDDC(final int[] colIndexes, final 
MatrixBlock raw, final DblArrayCountHashMap map,
@@ -460,6 +485,22 @@ public class ColGroupFactory {
                }
        }
 
+       private static MatrixBlock deltaEncodeMatrixBlock(MatrixBlock mb) {
+               LOG.warn("Delta encoding entire matrix input!!");
+               int rows = mb.getNumRows();
+               int cols = mb.getNumColumns();
+               double[][] ret = new double[rows][cols];
+               double[] a = mb.getDenseBlockValues();
+               for(int i = 0, ix = 0; i < rows; i++) {
+                       int prevRowOff = i > 0 ? ix - cols : 0;
+                       for(int j = 0; j < cols; j++, ix++) {
+                               double currentValue = a[ix];
+                               ret[i][j] = i > 0 ? currentValue - a[prevRowOff 
+ j] : currentValue;
+                       }
+               }
+               return DataConverter.convertToMatrixBlock(ret);
+       }
+
        static class readToMapDDCTask implements Callable<Boolean> {
                private final int[] _colIndexes;
                private final MatrixBlock _raw;
@@ -590,6 +631,27 @@ public class ColGroupFactory {
                return new ColGroupDDC(colIndexes, rlen, dict, data, null);
        }
 
+       private static AColGroup compressDeltaDDC(int[] colIndexes, MatrixBlock 
in, CompressionSettings cs,
+               CompressedSizeInfoColGroup cg) {
+
+               LOG.warn("Multi column Delta encoding only supported if delta 
encoding is only compression");
+               if(cs.transposed) {
+                       LibMatrixReorg.transposeInPlace(in, 1);
+                       cs.transposed = false;
+               }
+               // Delta encode the raw data
+               in = deltaEncodeMatrixBlock(in);
+
+               final int rlen = in.getNumRows();
+               // TODO Add extractBitMap that is delta to not require delta 
encoding entire input matrix.
+               final ABitmap ubm = BitmapEncoder.extractBitmap(colIndexes, in, 
cs.transposed, cg.getNumVals(),
+                       cs.sortTuplesByFrequency);
+               boolean zeros = ubm.getNumOffsets() < rlen;
+               ADictionary dict = DictionaryFactory.create(ubm, 
cg.getTupleSparsity(), zeros);
+               AMapToData data = MapToFactory.create(rlen, zeros, 
ubm.getOffsetList());
+               return new ColGroupDeltaDDC(colIndexes, rlen, dict, data, null);
+       }
+
        private static AColGroup compressOLE(int[] colIndexes, int rlen, 
ABitmap ubm, CompressionSettings cs,
                double tupleSparsity) {
 
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupIO.java 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupIO.java
index 184ca1a..bcb6a02 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupIO.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupIO.java
@@ -106,6 +106,8 @@ public class ColGroupIO {
                                return new ColGroupRLE(nRows);
                        case DDC:
                                return new ColGroupDDC(nRows);
+                       case DeltaDDC:
+                               return new ColGroupDeltaDDC(nRows);
                        case CONST:
                                return new ColGroupConst();
                        case EMPTY:
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/DeltaDictionary.java
 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/DeltaDictionary.java
new file mode 100644
index 0000000..9942d7e
--- /dev/null
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/DeltaDictionary.java
@@ -0,0 +1,43 @@
+package org.apache.sysds.runtime.compress.colgroup.dictionary;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.sysds.runtime.functionobjects.Divide;
+import org.apache.sysds.runtime.functionobjects.Multiply;
+import org.apache.sysds.runtime.functionobjects.Plus;
+import org.apache.sysds.runtime.functionobjects.Minus;
+import org.apache.sysds.runtime.matrix.operators.ScalarOperator;
+
+/**
+ * This dictionary class is a specialization for the DeltaDDCColgroup. Here 
the adjustments for operations for the delta
+ * encoded values are implemented.
+ */
+public class DeltaDictionary extends Dictionary {
+
+    private final int _numCols;
+
+    public DeltaDictionary(double[] values, int numCols) {
+        super(values);
+        _numCols = numCols;
+    }
+
+    @Override
+    public DeltaDictionary applyScalarOp(ScalarOperator op) {
+        final double[] retV = new double[_values.length];
+        if (op.fn instanceof Multiply || op.fn instanceof Divide) {
+            for(int i = 0; i < _values.length; i++)
+                retV[i] = op.executeScalar(_values[i]);
+        }
+        else if (op.fn instanceof Plus || op.fn instanceof Minus) {
+            // With Plus and Minus only the first row needs to be updated when 
delta encoded
+            for(int i = 0; i < _values.length; i++) {
+                if (i < _numCols)
+                    retV[i] = op.executeScalar(_values[i]);
+                else
+                    retV[i] = _values[i];
+            }
+        } else
+            throw new NotImplementedException();
+
+        return new DeltaDictionary(retV, _numCols);
+    }
+}
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/Dictionary.java
 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/Dictionary.java
index 0d4eaec..363e22d 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/Dictionary.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/Dictionary.java
@@ -41,7 +41,7 @@ public class Dictionary extends ADictionary {
 
        private static final long serialVersionUID = -6517136537249507753L;
 
-       private final double[] _values;
+       protected final double[] _values;
 
        public Dictionary(double[] values) {
                if(values == null || values.length == 0)
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/DictionaryFactory.java
 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/DictionaryFactory.java
index 05784e4..610458e 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/DictionaryFactory.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/DictionaryFactory.java
@@ -66,7 +66,7 @@ public class DictionaryFactory {
                        return Dictionary.getInMemorySize(nrValues * nrColumns);
        }
 
-       public static ADictionary create(DblArrayCountHashMap map, int nCols, 
boolean addZeroTuple) {
+       public static ADictionary create(DblArrayCountHashMap map, int nCols, 
boolean addZeroTuple, boolean deltaEncoded) {
                final ArrayList<DArrCounts> vals = map.extractValues();
                final int nVals = vals.size();
                final double[] resValues = new double[(nVals + (addZeroTuple ? 
1 : 0)) * nCols];
@@ -74,7 +74,7 @@ public class DictionaryFactory {
                        final DArrCounts dac = vals.get(i);
                        System.arraycopy(dac.key.getData(), 0, resValues, 
dac.id * nCols, nCols);
                }
-               return new Dictionary(resValues);
+               return deltaEncoded ? new DeltaDictionary(resValues, nCols) : 
new Dictionary(resValues);
        }
 
        public static ADictionary create(ABitmap ubm) {
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimator.java
 
b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimator.java
index fd69fb6..109d8b3 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimator.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimator.java
@@ -158,6 +158,17 @@ public abstract class CompressedSizeEstimator {
        }
 
        /**
+        * Method for extracting Compressed Size Info of specified columns as 
delta encodings (delta from previous rows
+        * values), together in a single ColGroup
+        * 
+        * @param colIndexes The columns to group together inside a ColGroup
+        * @return The CompressedSizeInformation associated with the selected 
ColGroups as delta encoding.
+        */
+       public CompressedSizeInfoColGroup 
estimateCompressedColGroupSizeDeltaEncoded(int[] colIndexes) {
+               return estimateCompressedColGroupSize(colIndexes, 8, 
worstCaseUpperBound(colIndexes));
+       }
+
+       /**
         * A method to extract the Compressed Size Info for a given list of 
columns, This method further limits the estimated
         * number of unique values, since in some cases the estimated number of 
uniques is estimated higher than the number
         * estimated in sub groups of the given colIndexes.
@@ -169,12 +180,31 @@ public abstract class CompressedSizeEstimator {
         *                           in the sense that if the sample is small 
then this unique can be manually edited like in
         *                           CoCodeCostMatrixMult.
         * 
-        * @return The CompressedSizeInfoColGroup fro the given column indexes.
+        * @return The CompressedSizeInfoColGroup for the given column indexes.
         */
        public abstract CompressedSizeInfoColGroup 
estimateCompressedColGroupSize(int[] colIndexes, int estimate,
                int nrUniqueUpperBound);
 
        /**
+        * A method to extract the Compressed Size Info for a given list of 
columns, This method further limits the estimated
+        * number of unique values, since in some cases the estimated number of 
uniques is estimated higher than the number
+        * estimated in sub groups of the given colIndexes.
+        * 
+        * The difference for this method is that it extracts the values as 
delta values from the matrix block input.
+        * 
+        * @param colIndexes         The columns to extract compression 
information from
+        * @param estimate           An estimate of number of unique delta 
elements in these columns
+        * @param nrUniqueUpperBound The upper bound of unique elements allowed 
in the estimate, can be calculated from the
+        *                           number of unique elements estimated in sub 
columns multiplied together. This is flexible
+        *                           in the sense that if the sample is small 
then this unique can be manually edited like in
+        *                           CoCodeCostMatrixMult.
+        * 
+        * @return The CompressedSizeInfoColGroup for the given column indexes.
+        */
+       public abstract CompressedSizeInfoColGroup 
estimateCompressedColGroupSizeDeltaEncoded(int[] colIndexes, int estimate,
+               int nrUniqueUpperBound);
+
+       /**
         * Join two analyzed column groups together. without materializing the 
dictionaries of either side.
         * 
         * if the number of distinct elements in both sides multiplied is 
larger than Integer, return null.
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorExact.java
 
b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorExact.java
index c3c6595..e55a44d 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorExact.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorExact.java
@@ -43,6 +43,16 @@ public class CompressedSizeEstimatorExact extends 
CompressedSizeEstimator {
        }
 
        @Override
+       public CompressedSizeInfoColGroup 
estimateCompressedColGroupSizeDeltaEncoded(int[] colIndexes, int estimate,
+               int nrUniqueUpperBound) {
+               final int _numRows = getNumRows();
+               final IEncode map = IEncode.createFromMatrixBlockDelta(_data, 
_cs.transposed, colIndexes);
+               final EstimationFactors em = 
map.computeSizeEstimation(colIndexes, _numRows, _data.getSparsity(),
+                       _data.getSparsity());
+               return new CompressedSizeInfoColGroup(colIndexes, em, 
_cs.validCompressions, map);
+       }
+
+       @Override
        protected CompressedSizeInfoColGroup estimateJoinCompressedSize(int[] 
joined, CompressedSizeInfoColGroup g1,
                CompressedSizeInfoColGroup g2, int joinedMaxDistinct) {
                final int _numRows = getNumRows();
@@ -65,4 +75,5 @@ public class CompressedSizeEstimatorExact extends 
CompressedSizeEstimator {
        public final int getSampleSize() {
                return getNumRows();
        }
+
 }
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorSample.java
 
b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorSample.java
index 9c1363c..7d34aff9 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorSample.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorSample.java
@@ -82,14 +82,29 @@ public class CompressedSizeEstimatorSample extends 
CompressedSizeEstimator {
        @Override
        public CompressedSizeInfoColGroup estimateCompressedColGroupSize(int[] 
colIndexes, int estimate,
                int nrUniqueUpperBound) {
-
+               // Extract primitive information from sample
                final IEncode map = IEncode.createFromMatrixBlock(_sample, 
_transposed, colIndexes);
-               final EstimationFactors sampleFacts = 
map.computeSizeEstimation(colIndexes, _sampleSize, _data.getSparsity(), 
_data.getSparsity());
-               // EstimationFactors.computeSizeEstimation(colIndexes, map, 
false, _sampleSize,
-                       // false);
+               // Get the facts for the sample
+               final EstimationFactors sampleFacts = 
map.computeSizeEstimation(colIndexes, _sampleSize, _data.getSparsity(),
+                       _data.getSparsity());
+               // Scale the facts up to full size
                final EstimationFactors em = 
estimateCompressionFactors(sampleFacts, map, colIndexes, nrUniqueUpperBound);
                return new CompressedSizeInfoColGroup(colIndexes, em, 
_cs.validCompressions, map);
+       }
 
+       @Override
+       public CompressedSizeInfoColGroup 
estimateCompressedColGroupSizeDeltaEncoded(int[] colIndexes, int estimate,
+               int nrUniqueUpperBound) {
+               // Don't use sample when doing estimation of delta encoding, 
instead we read from the start of the matrix until
+               // sample size. This guarantees that the delta values are 
actually represented in the full compression
+               final IEncode map = IEncode.createFromMatrixBlockDelta(_data, 
_transposed, colIndexes, _sampleSize);
+               // Get the Facts for the sample
+               final EstimationFactors sampleFacts = 
map.computeSizeEstimation(colIndexes, _sampleSize, _data.getSparsity(),
+                       _data.getSparsity());
+               // TODO find out if we need to scale differently if we use 
delta (I suspect not)
+               // Scale sample
+               final EstimationFactors em = 
estimateCompressionFactors(sampleFacts, map, colIndexes, nrUniqueUpperBound);
+               return new CompressedSizeInfoColGroup(colIndexes, em, 
_cs.validCompressions, map);
        }
 
        @Override
@@ -106,9 +121,10 @@ public class CompressedSizeEstimatorSample extends 
CompressedSizeEstimator {
                        return null;
 
                final IEncode map = g1.getMap().join(g2.getMap());
-               final EstimationFactors sampleFacts = 
map.computeSizeEstimation(joined, _sampleSize,_data.getSparsity(), 
_data.getSparsity());
-               //  EstimationFactors.computeSizeEstimation(joined, map,
-                       // _cs.validCompressions.contains(CompressionType.RLE), 
map.size(), false);
+               final EstimationFactors sampleFacts = 
map.computeSizeEstimation(joined, _sampleSize, _data.getSparsity(),
+                       _data.getSparsity());
+               // EstimationFactors.computeSizeEstimation(joined, map,
+               // _cs.validCompressions.contains(CompressionType.RLE), 
map.size(), false);
 
                // result facts
                final EstimationFactors em = 
estimateCompressionFactors(sampleFacts, map, joined, joinedMaxDistinct);
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorUltraSparse.java
 
b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorUltraSparse.java
index 5915fcd..7a31f13 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorUltraSparse.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorUltraSparse.java
@@ -95,6 +95,12 @@ public class CompressedSizeEstimatorUltraSparse extends 
CompressedSizeEstimator
        }
 
        @Override
+       public CompressedSizeInfoColGroup 
estimateCompressedColGroupSizeDeltaEncoded(int[] colIndexes, int estimate,
+               int nrUniqueUpperBound) {
+               throw new NotImplementedException("Delta sampling is not clear 
how to do in ultra sparse");
+       }
+
+       @Override
        protected CompressedSizeInfoColGroup estimateJoinCompressedSize(int[] 
joined, CompressedSizeInfoColGroup g1,
                CompressedSizeInfoColGroup g2, int joinedMaxDistinct) {
                throw new NotImplementedException();
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeInfoColGroup.java
 
b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeInfoColGroup.java
index 04a2248..9bcec29 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeInfoColGroup.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeInfoColGroup.java
@@ -190,6 +190,7 @@ public class CompressedSizeInfoColGroup {
        private static long getCompressionSize(int numCols, CompressionType ct, 
EstimationFactors fact) {
                int nv;
                switch(ct) {
+                       case DeltaDDC: // TODO add proper extraction
                        case DDC:
                                nv = fact.numVals + (fact.zeroIsMostFrequent ? 
1 : 0);
                                // + 1 if the column contains zero
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/estim/encoding/IEncode.java 
b/src/main/java/org/apache/sysds/runtime/compress/estim/encoding/IEncode.java
index fc991ff..dfcda0c 100644
--- 
a/src/main/java/org/apache/sysds/runtime/compress/estim/encoding/IEncode.java
+++ 
b/src/main/java/org/apache/sysds/runtime/compress/estim/encoding/IEncode.java
@@ -54,6 +54,14 @@ public interface IEncode {
                        return createWithReader(m, rowCols, transposed);
        }
 
+       public static IEncode createFromMatrixBlockDelta(MatrixBlock m, boolean 
transposed, int[] rowCols){
+               return createFromMatrixBlockDelta(m, transposed, rowCols, 
transposed ? m.getNumColumns() : m.getNumRows());
+       }
+
+       public static IEncode createFromMatrixBlockDelta(MatrixBlock m, boolean 
transposed, int[] rowCols, int nVals){
+               throw new NotImplementedException();
+       }
+
        public static IEncode createFromMatrixBlock(MatrixBlock m, boolean 
transposed, int rowCol) {
                if(m.isEmpty())
                        return new EmptyEncoding();
diff --git 
a/src/test/java/org/apache/sysds/test/component/compress/colgroup/ColGroupDeltaDDCTest.java
 
b/src/test/java/org/apache/sysds/test/component/compress/colgroup/ColGroupDeltaDDCTest.java
new file mode 100644
index 0000000..3520cc4
--- /dev/null
+++ 
b/src/test/java/org/apache/sysds/test/component/compress/colgroup/ColGroupDeltaDDCTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.component.compress.colgroup;
+
+import java.util.EnumSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.compress.CompressionSettings;
+import org.apache.sysds.runtime.compress.CompressionSettingsBuilder;
+import org.apache.sysds.runtime.compress.colgroup.AColGroup;
+import org.apache.sysds.runtime.compress.colgroup.ColGroupFactory;
+import org.apache.sysds.runtime.compress.estim.CompressedSizeEstimatorExact;
+import org.apache.sysds.runtime.compress.estim.CompressedSizeInfo;
+import org.apache.sysds.runtime.compress.estim.CompressedSizeInfoColGroup;
+import org.apache.sysds.runtime.matrix.data.LibMatrixReorg;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.util.DataConverter;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Component tests that compress a small matrix with DeltaDDC as the only valid compression type and compare the
+ * decompressed result with the input.
+ */
+public class ColGroupDeltaDDCTest {
+
+       protected static final Log LOG = LogFactory.getLog(ColGroupDeltaDDCTest.class.getName());
+
+       @Test
+       public void testDecompressToDenseBlockSingleColumn() {
+               testDecompressToDenseBlock(new double[][] {{1}, {2}, {3}, {4}, {5}}, false);
+       }
+
+       @Test
+       public void testDecompressToDenseBlockSingleColumnTransposed() {
+               testDecompressToDenseBlock(new double[][] {{1, 2, 3, 4, 5}}, true);
+       }
+
+       @Test
+       public void testDecompressToDenseBlockTwoColumns() {
+               testDecompressToDenseBlock(new double[][] {{1, 1}, {2, 1}, {3, 1}, {4, 1}, {5, 1}}, false);
+       }
+
+       @Test
+       public void testDecompressToDenseBlockTwoColumnsTransposed() {
+               testDecompressToDenseBlock(new double[][] {{1, 2, 3, 4, 5}, {1, 1, 1, 1, 1}}, true);
+       }
+
+       /**
+        * Compress the given data as one column group with only DeltaDDC allowed, decompress it into a dense block,
+        * and assert that the result equals the input laid out as rows x columns.
+        *
+        * @param data         The input values
+        * @param isTransposed If true, the input is given transposed, i.e. each array row holds one column
+        */
+       public void testDecompressToDenseBlock(double[][] data, boolean isTransposed) {
+               MatrixBlock mbt = DataConverter.convertToMatrixBlock(data);
+
+               final int numCols = isTransposed ? mbt.getNumRows() : mbt.getNumColumns();
+               final int numRows = isTransposed ? mbt.getNumColumns() : mbt.getNumRows();
+               int[] colIndexes = new int[numCols];
+               for(int x = 0; x < numCols; x++)
+                       colIndexes[x] = x;
+
+               try {
+                       CompressionSettings cs = new CompressionSettingsBuilder().setSamplingRatio(1.0)
+                               .setValidCompressions(EnumSet.of(AColGroup.CompressionType.DeltaDDC)).create();
+                       cs.transposed = isTransposed;
+
+                       final CompressedSizeInfoColGroup cgi = new CompressedSizeEstimatorExact(mbt, cs)
+                               .estimateCompressedColGroupSize(colIndexes);
+                       CompressedSizeInfo csi = new CompressedSizeInfo(cgi);
+                       AColGroup cg = ColGroupFactory.compressColGroups(mbt, csi, cs, 1).get(0);
+
+                       // Decompress to dense block
+                       MatrixBlock ret = new MatrixBlock(numRows, numCols, false);
+                       ret.allocateDenseBlock();
+                       cg.decompressToDenseBlock(ret.getDenseBlock(), 0, numRows);
+
+                       MatrixBlock expected = DataConverter.convertToMatrixBlock(data);
+                       if(isTransposed)
+                               LibMatrixReorg.transposeInPlace(expected, 1);
+                       Assert.assertArrayEquals(expected.getDenseBlockValues(), ret.getDenseBlockValues(), 0.01);
+
+               }
+               catch(Exception e) {
+                       // Chain the original exception as the cause so the root failure is reported with the test.
+                       throw new DMLRuntimeException("Failed construction : " + this.getClass().getSimpleName(), e);
+               }
+       }
+
+}
diff --git 
a/src/test/java/org/apache/sysds/test/component/compress/colgroup/JolEstimateDeltaDDCTest.java
 
b/src/test/java/org/apache/sysds/test/component/compress/colgroup/JolEstimateDeltaDDCTest.java
new file mode 100644
index 0000000..c8b90ef
--- /dev/null
+++ 
b/src/test/java/org/apache/sysds/test/component/compress/colgroup/JolEstimateDeltaDDCTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.component.compress.colgroup;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+import org.apache.sysds.runtime.compress.colgroup.AColGroup;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.util.DataConverter;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Parameterized instantiation of {@link JolEstimateTest} for the DeltaDDC compression type; the runner creates
+ * one instance per matrix returned by {@link #data()}.
+ */
+@RunWith(value = Parameterized.class)
+public class JolEstimateDeltaDDCTest extends JolEstimateTest {
+
+       /**
+        * Build the list of input matrices, one {@code Object[]} of constructor arguments per test instance.
+        *
+        * @return The parameter sets
+        */
+       @Parameterized.Parameters
+       public static Collection<Object[]> data() {
+               final ArrayList<Object[]> tests = new ArrayList<>();
+
+               // Active cases: a single cell holding 0 and a single cell holding 1.
+               for(double cell : new double[] {0, 1})
+                       tests.add(new Object[] {DataConverter.convertToMatrixBlock(new double[][] {{cell}})});
+
+               // TODO add a reader that reads the input as if it was delta encoded,
+               // then enable the remaining cases below.
+
+               // tests.add(new Object[] {DataConverter.convertToMatrixBlock(new double[][] {{1, 2, 3, 4, 5}})});
+               // tests.add(new Object[] {DataConverter.convertToMatrixBlock(new double[][] {{1, 2, 3}, {1, 1, 1}})});
+               // tests.add(new Object[] {
+               //      DataConverter.convertToMatrixBlock(new double[][] {{1, 1}, {2, 1}, {3, 1}, {4, 1}, {5, 1}})});
+               // tests.add(new Object[] {TestUtils.generateTestMatrixBlock(2, 5, 0, 20, 1.0, 7)});
+
+               return tests;
+       }
+
+       /**
+        * @param mb The matrix handed to the {@link JolEstimateTest} constructor
+        */
+       public JolEstimateDeltaDDCTest(MatrixBlock mb) {
+               super(mb);
+       }
+
+       @Override
+       public AColGroup.CompressionType getCT() {
+               return AColGroup.CompressionType.DeltaDDC;
+       }
+}
diff --git 
a/src/test/java/org/apache/sysds/test/component/compress/colgroup/JolEstimateTest.java
 
b/src/test/java/org/apache/sysds/test/component/compress/colgroup/JolEstimateTest.java
index dcecacb..c5219bf 100644
--- 
a/src/test/java/org/apache/sysds/test/component/compress/colgroup/JolEstimateTest.java
+++ 
b/src/test/java/org/apache/sysds/test/component/compress/colgroup/JolEstimateTest.java
@@ -49,6 +49,7 @@ public abstract class JolEstimateTest {
        protected static final Log LOG = 
LogFactory.getLog(JolEstimateTest.class.getName());
 
        protected static final CompressionType ddc = CompressionType.DDC;
+       protected static final CompressionType delta = CompressionType.DeltaDDC;
        protected static final CompressionType ole = CompressionType.OLE;
        protected static final CompressionType rle = CompressionType.RLE;
        protected static final CompressionType sdc = CompressionType.SDC;
diff --git 
a/src/test/java/org/apache/sysds/test/component/compress/dictionary/DeltaDictionaryTest.java
 
b/src/test/java/org/apache/sysds/test/component/compress/dictionary/DeltaDictionaryTest.java
new file mode 100644
index 0000000..69fc14b
--- /dev/null
+++ 
b/src/test/java/org/apache/sysds/test/component/compress/dictionary/DeltaDictionaryTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.component.compress.dictionary;
+
+import org.apache.sysds.runtime.compress.colgroup.dictionary.DeltaDictionary;
+import org.apache.sysds.runtime.functionobjects.Divide;
+import org.apache.sysds.runtime.functionobjects.Minus;
+import org.apache.sysds.runtime.functionobjects.Multiply;
+import org.apache.sysds.runtime.functionobjects.Plus;
+import org.apache.sysds.runtime.matrix.operators.LeftScalarOperator;
+import org.apache.sysds.runtime.matrix.operators.RightScalarOperator;
+import org.apache.sysds.runtime.matrix.operators.ScalarOperator;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests of scalar operations applied to a {@link DeltaDictionary}.
+ *
+ * The plus and minus cases expect only the first tuple (the first nCol values) to change, while the multiply
+ * and divide cases expect every value to be scaled.
+ */
+public class DeltaDictionaryTest {
+
+       @Test
+       public void testScalarOpRightMultiplySingleColumn() {
+               ScalarOperator sop = new RightScalarOperator(Multiply.getMultiplyFnObject(), 2.0, 1);
+               assertScalarOp(new double[] {1, 2}, 1, sop, new double[] {2, 4});
+       }
+
+       @Test
+       public void testScalarOpRightMultiplyTwoColumns() {
+               ScalarOperator sop = new RightScalarOperator(Multiply.getMultiplyFnObject(), 2.0, 1);
+               assertScalarOp(new double[] {1, 2, 3, 4}, 2, sop, new double[] {2, 4, 6, 8});
+       }
+
+       @Test
+       public void testNegScalarOpRightMultiplyTwoColumns() {
+               ScalarOperator sop = new RightScalarOperator(Multiply.getMultiplyFnObject(), -2.0, 1);
+               assertScalarOp(new double[] {1, 2, 3, 4}, 2, sop, new double[] {-2, -4, -6, -8});
+       }
+
+       @Test
+       public void testScalarOpLeftMultiplyTwoColumns() {
+               ScalarOperator sop = new LeftScalarOperator(Multiply.getMultiplyFnObject(), 2.0, 1);
+               assertScalarOp(new double[] {1, 2, 3, 4}, 2, sop, new double[] {2, 4, 6, 8});
+       }
+
+       @Test
+       public void testScalarOpRightDivideTwoColumns() {
+               ScalarOperator sop = new RightScalarOperator(Divide.getDivideFnObject(), 0.5, 1);
+               assertScalarOp(new double[] {1, 2, 3, 4}, 2, sop, new double[] {2, 4, 6, 8});
+       }
+
+       @Test
+       public void testScalarOpRightPlusSingleColumn() {
+               ScalarOperator sop = new RightScalarOperator(Plus.getPlusFnObject(), 2.0, 1);
+               assertScalarOp(new double[] {1, 2}, 1, sop, new double[] {3, 2});
+       }
+
+       @Test
+       public void testScalarOpRightPlusTwoColumns() {
+               ScalarOperator sop = new RightScalarOperator(Plus.getPlusFnObject(), 2.0, 1);
+               assertScalarOp(new double[] {1, 2, 3, 4}, 2, sop, new double[] {3, 4, 3, 4});
+       }
+
+       @Test
+       public void testScalarOpRightMinusTwoColumns() {
+               ScalarOperator sop = new RightScalarOperator(Minus.getMinusFnObject(), 2.0, 1);
+               assertScalarOp(new double[] {1, 2, 3, 4}, 2, sop, new double[] {-1, 0, 3, 4});
+       }
+
+       @Test
+       public void testScalarOpLeftPlusTwoColumns() {
+               ScalarOperator sop = new LeftScalarOperator(Plus.getPlusFnObject(), 2.0, 1);
+               assertScalarOp(new double[] {1, 2, 3, 4}, 2, sop, new double[] {3, 4, 3, 4});
+       }
+
+       /**
+        * Apply a scalar operator to a fresh dictionary and compare the resulting values with the expectation.
+        *
+        * @param values   The dictionary values before the operation
+        * @param nCol     The number of columns passed to the DeltaDictionary constructor
+        * @param sop      The scalar operator to apply
+        * @param expected The dictionary values expected after the operation
+        */
+       private static void assertScalarOp(double[] values, int nCol, ScalarOperator sop, double[] expected) {
+               DeltaDictionary result = new DeltaDictionary(values, nCol).applyScalarOp(sop);
+               Assert.assertArrayEquals(expected, result.getValues(), 0.01);
+       }
+}

Reply via email to