This is an automated email from the ASF dual-hosted git repository. baunsgaard pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/systemds.git
commit 31f2653b785c4e00c987347264f5f96e8315dd4c Author: Muhammad Osama <[email protected]> AuthorDate: Thu Jan 20 22:09:28 2022 +0100 [SYSTEMDS-2831] CLA DeltaDDC Column Group This commit adds a new column group type named DeltaDDC, with support for a few operations. While merging I added an interface for future readers for delta encoding; a future task is to enable analysis of delta encoding. DIA project WS2021/22. Closes #1518 --- .../sysds/runtime/compress/colgroup/AColGroup.java | 4 +- .../compress/colgroup/ColGroupDeltaDDC.java | 81 ++++++++++++++++ .../runtime/compress/colgroup/ColGroupFactory.java | 74 +++++++++++++-- .../runtime/compress/colgroup/ColGroupIO.java | 2 + .../colgroup/dictionary/DeltaDictionary.java | 43 +++++++++ .../compress/colgroup/dictionary/Dictionary.java | 2 +- .../colgroup/dictionary/DictionaryFactory.java | 4 +- .../compress/estim/CompressedSizeEstimator.java | 32 ++++++- .../estim/CompressedSizeEstimatorExact.java | 11 +++ .../estim/CompressedSizeEstimatorSample.java | 30 ++++-- .../estim/CompressedSizeEstimatorUltraSparse.java | 6 ++ .../compress/estim/CompressedSizeInfoColGroup.java | 1 + .../runtime/compress/estim/encoding/IEncode.java | 8 ++ .../compress/colgroup/ColGroupDeltaDDCTest.java | 100 ++++++++++++++++++++ .../compress/colgroup/JolEstimateDeltaDDCTest.java | 72 ++++++++++++++ .../compress/colgroup/JolEstimateTest.java | 1 + .../compress/dictionary/DeltaDictionaryTest.java | 105 +++++++++++++++++++++ 17 files changed, 557 insertions(+), 19 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroup.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroup.java index af4d757..23674b1 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroup.java +++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroup.java @@ -48,7 +48,7 @@ public abstract class AColGroup implements Serializable { /** Public super types of compression ColGroups supported 
*/ public enum CompressionType { - UNCOMPRESSED, RLE, OLE, DDC, CONST, EMPTY, SDC, PFOR, + UNCOMPRESSED, RLE, OLE, DDC, CONST, EMPTY, SDC, PFOR, DeltaDDC } /** @@ -57,7 +57,7 @@ public abstract class AColGroup implements Serializable { * Protected such that outside the ColGroup package it should be unknown which specific subtype is used. */ protected enum ColGroupType { - UNCOMPRESSED, RLE, OLE, DDC, CONST, EMPTY, SDC, SDCSingle, SDCSingleZeros, SDCZeros, PFOR; + UNCOMPRESSED, RLE, OLE, DDC, CONST, EMPTY, SDC, SDCSingle, SDCSingleZeros, SDCZeros, PFOR, DeltaDDC; } /** The ColGroup Indexes contained in the ColGroup */ diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDeltaDDC.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDeltaDDC.java new file mode 100644 index 0000000..0949dae --- /dev/null +++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDeltaDDC.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.sysds.runtime.compress.colgroup; + +import org.apache.commons.lang.NotImplementedException; +import org.apache.sysds.runtime.compress.colgroup.dictionary.ADictionary; +import org.apache.sysds.runtime.compress.colgroup.mapping.AMapToData; +import org.apache.sysds.runtime.data.DenseBlock; +import org.apache.sysds.runtime.data.SparseBlock; +import org.apache.sysds.runtime.matrix.operators.ScalarOperator; + +/** + * Class to encapsulate information about a column group that is first delta encoded then encoded with dense dictionary + * encoding (DeltaDDC). + */ +public class ColGroupDeltaDDC extends ColGroupDDC { + + /** + * Constructor for serialization + * + * @param numRows number of rows + */ + protected ColGroupDeltaDDC(int numRows) { + super(numRows); + } + + protected ColGroupDeltaDDC(int[] colIndices, int numRows, ADictionary dict, AMapToData data, int[] cachedCounts) { + super(colIndices, numRows, dict, data, cachedCounts); + _zeros = false; + _data = data; + } + + public CompressionType getCompType() { + return CompressionType.DeltaDDC; + } + + @Override + protected void decompressToDenseBlockDenseDictionary(DenseBlock db, int rl, int ru, int offR, int offC, + double[] values) { + final int nCol = _colIndexes.length; + for(int i = rl, offT = rl + offR; i < ru; i++, offT++) { + final double[] c = db.values(offT); + final int off = db.pos(offT) + offC; + final int rowIndex = _data.getIndex(i) * nCol; + final int prevOff = (off == 0) ? 
off : off - nCol; + for(int j = 0; j < nCol; j++) { + // Here we use the values in the previous row to compute current values along with the delta + double newValue = c[prevOff + j] + values[rowIndex + j]; + c[off + _colIndexes[j]] += newValue; + } + } + } + + @Override + protected void decompressToSparseBlockDenseDictionary(SparseBlock ret, int rl, int ru, int offR, int offC, + double[] values) { + throw new NotImplementedException(); + } + + @Override + public AColGroup scalarOperation(ScalarOperator op) { + return new ColGroupDeltaDDC(_colIndexes, _numRows, _dict.applyScalarOp(op), _data, getCachedCounts()); + } +} diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupFactory.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupFactory.java index 29a4a21..dbc08a8 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupFactory.java +++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupFactory.java @@ -58,8 +58,10 @@ import org.apache.sysds.runtime.compress.utils.IntArrayList; import org.apache.sysds.runtime.compress.utils.Util; import org.apache.sysds.runtime.controlprogram.parfor.stat.Timing; import org.apache.sysds.runtime.data.SparseBlock; +import org.apache.sysds.runtime.matrix.data.LibMatrixReorg; import org.apache.sysds.runtime.matrix.data.MatrixBlock; import org.apache.sysds.runtime.util.CommonThreadPool; +import org.apache.sysds.runtime.util.DataConverter; /** * Factory class for constructing ColGroups. 
@@ -335,6 +337,12 @@ public class ColGroupFactory { tmp.getDblCountMap(nrUniqueEstimate), cs); else if(colIndexes.length > 1 && estimatedBestCompressionType == CompressionType.DDC) return directCompressDDC(colIndexes, in, cs, cg, k); + else if(estimatedBestCompressionType == CompressionType.DeltaDDC) { + if(colIndexes.length > 1) + return directCompressDeltaDDC(colIndexes, in, cs, cg, k); + else + return compressDeltaDDC(colIndexes, in, cs, cg); + } else { final int numRows = cs.transposed ? in.getNumColumns() : in.getNumRows(); final ABitmap ubm = BitmapEncoder.extractBitmap(colIndexes, in, cs.transposed, nrUniqueEstimate, @@ -376,11 +384,26 @@ public class ColGroupFactory { final int rlen = cs.transposed ? raw.getNumColumns() : raw.getNumRows(); // use a Map that is at least char size. final int nVal = cg.getNumVals() < 16 ? 16 : Math.max(cg.getNumVals(), 257); - return directCompressDDC(colIndexes, raw, cs, cg, MapToFactory.create(rlen, nVal), rlen, k); + return directCompressDDCColGroup(colIndexes, raw, cs, cg, MapToFactory.create(rlen, nVal), rlen, k, false); } - private static AColGroup directCompressDDC(int[] colIndexes, MatrixBlock raw, CompressionSettings cs, - CompressedSizeInfoColGroup cg, AMapToData data, int rlen, int k) { + private static AColGroup directCompressDeltaDDC(int[] colIndexes, MatrixBlock raw, CompressionSettings cs, + CompressedSizeInfoColGroup cg, int k) { + final int rlen = cs.transposed ? raw.getNumColumns() : raw.getNumRows(); + // use a Map that is at least char size. + final int nVal = cg.getNumVals() < 16 ? 
16 : Math.max(cg.getNumVals(), 257); + if(cs.transposed) { + LOG.warn("In-effecient transpose back of the input matrix to do delta encoding"); + raw = LibMatrixReorg.transposeInPlace(raw, k); + cs.transposed = false; + } + // Delta encode the raw data + raw = deltaEncodeMatrixBlock(raw); + return directCompressDDCColGroup(colIndexes, raw, cs, cg, MapToFactory.create(rlen, nVal), rlen, k, true); + } + + private static AColGroup directCompressDDCColGroup(int[] colIndexes, MatrixBlock raw, CompressionSettings cs, + CompressedSizeInfoColGroup cg, AMapToData data, int rlen, int k, boolean deltaEncoded) { final int fill = data.getUpperBoundValue(); data.fill(fill); @@ -396,7 +419,7 @@ public class ColGroupFactory { // This is highly unlikely but could happen if forced compression of // not transposed column and the estimator says use DDC. return new ColGroupEmpty(colIndexes); - ADictionary dict = DictionaryFactory.create(map, colIndexes.length, extra); + ADictionary dict = DictionaryFactory.create(map, colIndexes.length, extra, deltaEncoded); if(extra) { data.replace(fill, map.size()); data.setUnique(map.size() + 1); @@ -405,8 +428,10 @@ public class ColGroupFactory { data.setUnique(map.size()); AMapToData resData = MapToFactory.resize(data, map.size() + (extra ? 
1 : 0)); - ColGroupDDC res = new ColGroupDDC(colIndexes, rlen, dict, resData, null); - return res; + if(deltaEncoded) + return new ColGroupDeltaDDC(colIndexes, rlen, dict, resData, null); + else + return new ColGroupDDC(colIndexes, rlen, dict, resData, null); } private static boolean readToMapDDC(final int[] colIndexes, final MatrixBlock raw, final DblArrayCountHashMap map, @@ -460,6 +485,22 @@ public class ColGroupFactory { } } + private static MatrixBlock deltaEncodeMatrixBlock(MatrixBlock mb) { + LOG.warn("Delta encoding entire matrix input!!"); + int rows = mb.getNumRows(); + int cols = mb.getNumColumns(); + double[][] ret = new double[rows][cols]; + double[] a = mb.getDenseBlockValues(); + for(int i = 0, ix = 0; i < rows; i++) { + int prevRowOff = i > 0 ? ix - cols : 0; + for(int j = 0; j < cols; j++, ix++) { + double currentValue = a[ix]; + ret[i][j] = i > 0 ? currentValue - a[prevRowOff + j] : currentValue; + } + } + return DataConverter.convertToMatrixBlock(ret); + } + static class readToMapDDCTask implements Callable<Boolean> { private final int[] _colIndexes; private final MatrixBlock _raw; @@ -590,6 +631,27 @@ public class ColGroupFactory { return new ColGroupDDC(colIndexes, rlen, dict, data, null); } + private static AColGroup compressDeltaDDC(int[] colIndexes, MatrixBlock in, CompressionSettings cs, + CompressedSizeInfoColGroup cg) { + + LOG.warn("Multi column Delta encoding only supported if delta encoding is only compression"); + if(cs.transposed) { + LibMatrixReorg.transposeInPlace(in, 1); + cs.transposed = false; + } + // Delta encode the raw data + in = deltaEncodeMatrixBlock(in); + + final int rlen = in.getNumRows(); + // TODO Add extractBitMap that is delta to not require delta encoding entire input matrix. 
+ final ABitmap ubm = BitmapEncoder.extractBitmap(colIndexes, in, cs.transposed, cg.getNumVals(), + cs.sortTuplesByFrequency); + boolean zeros = ubm.getNumOffsets() < rlen; + ADictionary dict = DictionaryFactory.create(ubm, cg.getTupleSparsity(), zeros); + AMapToData data = MapToFactory.create(rlen, zeros, ubm.getOffsetList()); + return new ColGroupDeltaDDC(colIndexes, rlen, dict, data, null); + } + private static AColGroup compressOLE(int[] colIndexes, int rlen, ABitmap ubm, CompressionSettings cs, double tupleSparsity) { diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupIO.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupIO.java index 184ca1a..bcb6a02 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupIO.java +++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupIO.java @@ -106,6 +106,8 @@ public class ColGroupIO { return new ColGroupRLE(nRows); case DDC: return new ColGroupDDC(nRows); + case DeltaDDC: + return new ColGroupDeltaDDC(nRows); case CONST: return new ColGroupConst(); case EMPTY: diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/DeltaDictionary.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/DeltaDictionary.java new file mode 100644 index 0000000..9942d7e --- /dev/null +++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/DeltaDictionary.java @@ -0,0 +1,43 @@ +package org.apache.sysds.runtime.compress.colgroup.dictionary; + +import org.apache.commons.lang.NotImplementedException; +import org.apache.sysds.runtime.functionobjects.Divide; +import org.apache.sysds.runtime.functionobjects.Multiply; +import org.apache.sysds.runtime.functionobjects.Plus; +import org.apache.sysds.runtime.functionobjects.Minus; +import org.apache.sysds.runtime.matrix.operators.ScalarOperator; + +/** + * This dictionary class is a specialization for the DeltaDDCColgroup. 
Here the adjustments for operations for the delta + * encoded values are implemented. + */ +public class DeltaDictionary extends Dictionary { + + private final int _numCols; + + public DeltaDictionary(double[] values, int numCols) { + super(values); + _numCols = numCols; + } + + @Override + public DeltaDictionary applyScalarOp(ScalarOperator op) { + final double[] retV = new double[_values.length]; + if (op.fn instanceof Multiply || op.fn instanceof Divide) { + for(int i = 0; i < _values.length; i++) + retV[i] = op.executeScalar(_values[i]); + } + else if (op.fn instanceof Plus || op.fn instanceof Minus) { + // With Plus and Minus only the first row needs to be updated when delta encoded + for(int i = 0; i < _values.length; i++) { + if (i < _numCols) + retV[i] = op.executeScalar(_values[i]); + else + retV[i] = _values[i]; + } + } else + throw new NotImplementedException(); + + return new DeltaDictionary(retV, _numCols); + } +} diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/Dictionary.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/Dictionary.java index 0d4eaec..363e22d 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/Dictionary.java +++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/Dictionary.java @@ -41,7 +41,7 @@ public class Dictionary extends ADictionary { private static final long serialVersionUID = -6517136537249507753L; - private final double[] _values; + protected final double[] _values; public Dictionary(double[] values) { if(values == null || values.length == 0) diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/DictionaryFactory.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/DictionaryFactory.java index 05784e4..610458e 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/DictionaryFactory.java +++ 
b/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/DictionaryFactory.java @@ -66,7 +66,7 @@ public class DictionaryFactory { return Dictionary.getInMemorySize(nrValues * nrColumns); } - public static ADictionary create(DblArrayCountHashMap map, int nCols, boolean addZeroTuple) { + public static ADictionary create(DblArrayCountHashMap map, int nCols, boolean addZeroTuple, boolean deltaEncoded) { final ArrayList<DArrCounts> vals = map.extractValues(); final int nVals = vals.size(); final double[] resValues = new double[(nVals + (addZeroTuple ? 1 : 0)) * nCols]; @@ -74,7 +74,7 @@ public class DictionaryFactory { final DArrCounts dac = vals.get(i); System.arraycopy(dac.key.getData(), 0, resValues, dac.id * nCols, nCols); } - return new Dictionary(resValues); + return deltaEncoded ? new DeltaDictionary(resValues, nCols) : new Dictionary(resValues); } public static ADictionary create(ABitmap ubm) { diff --git a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimator.java b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimator.java index fd69fb6..109d8b3 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimator.java +++ b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimator.java @@ -158,6 +158,17 @@ public abstract class CompressedSizeEstimator { } /** + * Method for extracting Compressed Size Info of specified columns as delta encodings (delta from previous rows + * values), together in a single ColGroup + * + * @param colIndexes The columns to group together inside a ColGroup + * @return The CompressedSizeInformation associated with the selected ColGroups as delta encoding. 
+ */ + public CompressedSizeInfoColGroup estimateCompressedColGroupSizeDeltaEncoded(int[] colIndexes) { + return estimateCompressedColGroupSize(colIndexes, 8, worstCaseUpperBound(colIndexes)); + } + + /** * A method to extract the Compressed Size Info for a given list of columns, This method further limits the estimated * number of unique values, since in some cases the estimated number of uniques is estimated higher than the number * estimated in sub groups of the given colIndexes. @@ -169,12 +180,31 @@ public abstract class CompressedSizeEstimator { * in the sense that if the sample is small then this unique can be manually edited like in * CoCodeCostMatrixMult. * - * @return The CompressedSizeInfoColGroup fro the given column indexes. + * @return The CompressedSizeInfoColGroup for the given column indexes. */ public abstract CompressedSizeInfoColGroup estimateCompressedColGroupSize(int[] colIndexes, int estimate, int nrUniqueUpperBound); /** + * A method to extract the Compressed Size Info for a given list of columns, This method further limits the estimated + * number of unique values, since in some cases the estimated number of uniques is estimated higher than the number + * estimated in sub groups of the given colIndexes. + * + * The Difference for this method is that it extract the values as delta values from the matrix block input. + * + * @param colIndexes The columns to extract compression information from + * @param estimate An estimate of number of unique delta elements in these columns + * @param nrUniqueUpperBound The upper bound of unique elements allowed in the estimate, can be calculated from the + * number of unique elements estimated in sub columns multiplied together. This is flexible + * in the sense that if the sample is small then this unique can be manually edited like in + * CoCodeCostMatrixMult. + * + * @return The CompressedSizeInfoColGroup for the given column indexes. 
+ */ + public abstract CompressedSizeInfoColGroup estimateCompressedColGroupSizeDeltaEncoded(int[] colIndexes, int estimate, + int nrUniqueUpperBound); + + /** * Join two analyzed column groups together. without materializing the dictionaries of either side. * * if the number of distinct elements in both sides multiplied is larger than Integer, return null. diff --git a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorExact.java b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorExact.java index c3c6595..e55a44d 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorExact.java +++ b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorExact.java @@ -43,6 +43,16 @@ public class CompressedSizeEstimatorExact extends CompressedSizeEstimator { } @Override + public CompressedSizeInfoColGroup estimateCompressedColGroupSizeDeltaEncoded(int[] colIndexes, int estimate, + int nrUniqueUpperBound) { + final int _numRows = getNumRows(); + final IEncode map = IEncode.createFromMatrixBlockDelta(_data, _cs.transposed, colIndexes); + final EstimationFactors em = map.computeSizeEstimation(colIndexes, _numRows, _data.getSparsity(), + _data.getSparsity()); + return new CompressedSizeInfoColGroup(colIndexes, em, _cs.validCompressions, map); + } + + @Override protected CompressedSizeInfoColGroup estimateJoinCompressedSize(int[] joined, CompressedSizeInfoColGroup g1, CompressedSizeInfoColGroup g2, int joinedMaxDistinct) { final int _numRows = getNumRows(); @@ -65,4 +75,5 @@ public class CompressedSizeEstimatorExact extends CompressedSizeEstimator { public final int getSampleSize() { return getNumRows(); } + } diff --git a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorSample.java b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorSample.java index 9c1363c..7d34aff9 100644 --- 
a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorSample.java +++ b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorSample.java @@ -82,14 +82,29 @@ public class CompressedSizeEstimatorSample extends CompressedSizeEstimator { @Override public CompressedSizeInfoColGroup estimateCompressedColGroupSize(int[] colIndexes, int estimate, int nrUniqueUpperBound) { - + // Extract primitive information from sample final IEncode map = IEncode.createFromMatrixBlock(_sample, _transposed, colIndexes); - final EstimationFactors sampleFacts = map.computeSizeEstimation(colIndexes, _sampleSize, _data.getSparsity(), _data.getSparsity()); - // EstimationFactors.computeSizeEstimation(colIndexes, map, false, _sampleSize, - // false); + // Get the facts for the sample + final EstimationFactors sampleFacts = map.computeSizeEstimation(colIndexes, _sampleSize, _data.getSparsity(), + _data.getSparsity()); + // Scale the facts up to full size final EstimationFactors em = estimateCompressionFactors(sampleFacts, map, colIndexes, nrUniqueUpperBound); return new CompressedSizeInfoColGroup(colIndexes, em, _cs.validCompressions, map); + } + @Override + public CompressedSizeInfoColGroup estimateCompressedColGroupSizeDeltaEncoded(int[] colIndexes, int estimate, + int nrUniqueUpperBound) { + // Don't use sample when doing estimation of delta encoding, instead we read from the start of the matrix until + // sample size. 
This guarantees that the delta values are actually represented in the full compression + final IEncode map = IEncode.createFromMatrixBlockDelta(_data, _transposed, colIndexes, _sampleSize); + // Get the Facts for the sample + final EstimationFactors sampleFacts = map.computeSizeEstimation(colIndexes, _sampleSize, _data.getSparsity(), + _data.getSparsity()); + // TODO find out if we need to scale differently if we use delta (I suspect not) + // Scale sample + final EstimationFactors em = estimateCompressionFactors(sampleFacts, map, colIndexes, nrUniqueUpperBound); + return new CompressedSizeInfoColGroup(colIndexes, em, _cs.validCompressions, map); } @Override @@ -106,9 +121,10 @@ public class CompressedSizeEstimatorSample extends CompressedSizeEstimator { return null; final IEncode map = g1.getMap().join(g2.getMap()); - final EstimationFactors sampleFacts = map.computeSizeEstimation(joined, _sampleSize,_data.getSparsity(), _data.getSparsity()); - // EstimationFactors.computeSizeEstimation(joined, map, - // _cs.validCompressions.contains(CompressionType.RLE), map.size(), false); + final EstimationFactors sampleFacts = map.computeSizeEstimation(joined, _sampleSize, _data.getSparsity(), + _data.getSparsity()); + // EstimationFactors.computeSizeEstimation(joined, map, + // _cs.validCompressions.contains(CompressionType.RLE), map.size(), false); // result facts final EstimationFactors em = estimateCompressionFactors(sampleFacts, map, joined, joinedMaxDistinct); diff --git a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorUltraSparse.java b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorUltraSparse.java index 5915fcd..7a31f13 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorUltraSparse.java +++ b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorUltraSparse.java @@ -95,6 +95,12 @@ public class CompressedSizeEstimatorUltraSparse extends 
CompressedSizeEstimator } @Override + public CompressedSizeInfoColGroup estimateCompressedColGroupSizeDeltaEncoded(int[] colIndexes, int estimate, + int nrUniqueUpperBound) { + throw new NotImplementedException("Delta sampling is not clear how to do in ultra sparse"); + } + + @Override protected CompressedSizeInfoColGroup estimateJoinCompressedSize(int[] joined, CompressedSizeInfoColGroup g1, CompressedSizeInfoColGroup g2, int joinedMaxDistinct) { throw new NotImplementedException(); diff --git a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeInfoColGroup.java b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeInfoColGroup.java index 04a2248..9bcec29 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeInfoColGroup.java +++ b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeInfoColGroup.java @@ -190,6 +190,7 @@ public class CompressedSizeInfoColGroup { private static long getCompressionSize(int numCols, CompressionType ct, EstimationFactors fact) { int nv; switch(ct) { + case DeltaDDC: // TODO add proper extraction case DDC: nv = fact.numVals + (fact.zeroIsMostFrequent ? 1 : 0); // + 1 if the column contains zero diff --git a/src/main/java/org/apache/sysds/runtime/compress/estim/encoding/IEncode.java b/src/main/java/org/apache/sysds/runtime/compress/estim/encoding/IEncode.java index fc991ff..dfcda0c 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/estim/encoding/IEncode.java +++ b/src/main/java/org/apache/sysds/runtime/compress/estim/encoding/IEncode.java @@ -54,6 +54,14 @@ public interface IEncode { return createWithReader(m, rowCols, transposed); } + public static IEncode createFromMatrixBlockDelta(MatrixBlock m, boolean transposed, int[] rowCols){ + return createFromMatrixBlockDelta(m, transposed, rowCols, transposed ? 
m.getNumColumns() : m.getNumRows()); + } + + public static IEncode createFromMatrixBlockDelta(MatrixBlock m, boolean transposed, int[] rowCols, int nVals){ + throw new NotImplementedException(); + } + public static IEncode createFromMatrixBlock(MatrixBlock m, boolean transposed, int rowCol) { if(m.isEmpty()) return new EmptyEncoding(); diff --git a/src/test/java/org/apache/sysds/test/component/compress/colgroup/ColGroupDeltaDDCTest.java b/src/test/java/org/apache/sysds/test/component/compress/colgroup/ColGroupDeltaDDCTest.java new file mode 100644 index 0000000..3520cc4 --- /dev/null +++ b/src/test/java/org/apache/sysds/test/component/compress/colgroup/ColGroupDeltaDDCTest.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.sysds.test.component.compress.colgroup; + +import java.util.EnumSet; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.sysds.runtime.DMLRuntimeException; +import org.apache.sysds.runtime.compress.CompressionSettings; +import org.apache.sysds.runtime.compress.CompressionSettingsBuilder; +import org.apache.sysds.runtime.compress.colgroup.AColGroup; +import org.apache.sysds.runtime.compress.colgroup.ColGroupFactory; +import org.apache.sysds.runtime.compress.estim.CompressedSizeEstimatorExact; +import org.apache.sysds.runtime.compress.estim.CompressedSizeInfo; +import org.apache.sysds.runtime.compress.estim.CompressedSizeInfoColGroup; +import org.apache.sysds.runtime.matrix.data.LibMatrixReorg; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import org.apache.sysds.runtime.util.DataConverter; +import org.junit.Assert; +import org.junit.Test; + +public class ColGroupDeltaDDCTest { + + protected static final Log LOG = LogFactory.getLog(JolEstimateTest.class.getName()); + + @Test + public void testDecompressToDenseBlockSingleColumn() { + testDecompressToDenseBlock(new double[][] {{1, 2, 3, 4, 5}}, true); + } + + @Test + public void testDecompressToDenseBlockSingleColumnTransposed() { + testDecompressToDenseBlock(new double[][] {{1}, {2}, {3}, {4}, {5}}, false); + } + + @Test + public void testDecompressToDenseBlockTwoColumns() { + testDecompressToDenseBlock(new double[][] {{1, 1}, {2, 1}, {3, 1}, {4, 1}, {5, 1}}, false); + } + + @Test + public void testDecompressToDenseBlockTwoColumnsTransposed() { + testDecompressToDenseBlock(new double[][] {{1, 2, 3, 4, 5}, {1, 1, 1, 1, 1}}, true); + } + + public void testDecompressToDenseBlock(double[][] data, boolean isTransposed) { + MatrixBlock mbt = DataConverter.convertToMatrixBlock(data); + + final int numCols = isTransposed ? mbt.getNumRows() : mbt.getNumColumns(); + final int numRows = isTransposed ? 
mbt.getNumColumns() : mbt.getNumRows(); + int[] colIndexes = new int[numCols]; + for(int x = 0; x < numCols; x++) + colIndexes[x] = x; + + try { + CompressionSettings cs = new CompressionSettingsBuilder().setSamplingRatio(1.0) + .setValidCompressions(EnumSet.of(AColGroup.CompressionType.DeltaDDC)).create(); + cs.transposed = isTransposed; + + final CompressedSizeInfoColGroup cgi = new CompressedSizeEstimatorExact(mbt, cs) + .estimateCompressedColGroupSize(colIndexes); + CompressedSizeInfo csi = new CompressedSizeInfo(cgi); + AColGroup cg = ColGroupFactory.compressColGroups(mbt, csi, cs, 1).get(0); + + // Decompress to dense block + MatrixBlock ret = new MatrixBlock(numRows, numCols, false); + ret.allocateDenseBlock(); + cg.decompressToDenseBlock(ret.getDenseBlock(), 0, numRows); + + MatrixBlock expected = DataConverter.convertToMatrixBlock(data); + if(isTransposed) + LibMatrixReorg.transposeInPlace(expected, 1); + Assert.assertArrayEquals(expected.getDenseBlockValues(), ret.getDenseBlockValues(), 0.01); + + } + catch(Exception e) { + e.printStackTrace(); + throw new DMLRuntimeException("Failed construction : " + this.getClass().getSimpleName()); + } + } + +} diff --git a/src/test/java/org/apache/sysds/test/component/compress/colgroup/JolEstimateDeltaDDCTest.java b/src/test/java/org/apache/sysds/test/component/compress/colgroup/JolEstimateDeltaDDCTest.java new file mode 100644 index 0000000..c8b90ef --- /dev/null +++ b/src/test/java/org/apache/sysds/test/component/compress/colgroup/JolEstimateDeltaDDCTest.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.test.component.compress.colgroup; + +import java.util.ArrayList; +import java.util.Collection; + +import org.apache.sysds.runtime.compress.colgroup.AColGroup; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import org.apache.sysds.runtime.util.DataConverter; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +/** Parameterized subclass of JolEstimateTest that selects the DeltaDDC compression type via getCT(). Only the single-cell inputs {{0}} and {{1}} are enabled; the larger cases in data() are commented out pending the TODO there. */ @RunWith(value = Parameterized.class) +public class JolEstimateDeltaDDCTest extends JolEstimateTest { + + /** Builds the parameter list: each entry is an Object[] holding one MatrixBlock that is handed to the constructor. */ @Parameterized.Parameters + public static Collection<Object[]> data() { + ArrayList<Object[]> tests = new ArrayList<>(); + + MatrixBlock mb; + + mb = DataConverter.convertToMatrixBlock(new double[][] {{0}}); + tests.add(new Object[] {mb}); + + mb = DataConverter.convertToMatrixBlock(new double[][] {{1}}); + tests.add(new Object[] {mb}); + + // TODO add reader that reads as if Delta encoded. + // then afterwards use this test. 
+ + // mb = DataConverter.convertToMatrixBlock(new double[][] {{1, 2, 3, 4, 5}}); + // tests.add(new Object[] {mb}); + + // mb = DataConverter.convertToMatrixBlock(new double[][] {{1,2,3},{1,1,1}}); + // tests.add(new Object[] {mb}); + + // mb = DataConverter.convertToMatrixBlock(new double[][] {{1, 1}, {2, 1}, {3, 1}, {4, 1}, {5, 1}}); + // tests.add(new Object[] {mb}); + + // mb = TestUtils.generateTestMatrixBlock(2, 5, 0, 20, 1.0, 7); + // tests.add(new Object[] {mb}); + + return tests; + } + + /** Forwards the parameterized matrix block to the JolEstimateTest constructor. */ public JolEstimateDeltaDDCTest(MatrixBlock mb) { + super(mb); + } + + /** Returns the inherited constant named delta as the compression type under test. */ @Override + public AColGroup.CompressionType getCT() { + return delta; + } +} diff --git a/src/test/java/org/apache/sysds/test/component/compress/colgroup/JolEstimateTest.java b/src/test/java/org/apache/sysds/test/component/compress/colgroup/JolEstimateTest.java index dcecacb..c5219bf 100644 --- a/src/test/java/org/apache/sysds/test/component/compress/colgroup/JolEstimateTest.java +++ b/src/test/java/org/apache/sysds/test/component/compress/colgroup/JolEstimateTest.java @@ -49,6 +49,7 @@ public abstract class JolEstimateTest { protected static final Log LOG = LogFactory.getLog(JolEstimateTest.class.getName()); protected static final CompressionType ddc = CompressionType.DDC; + protected static final CompressionType delta = CompressionType.DeltaDDC; protected static final CompressionType ole = CompressionType.OLE; protected static final CompressionType rle = CompressionType.RLE; protected static final CompressionType sdc = CompressionType.SDC; diff --git a/src/test/java/org/apache/sysds/test/component/compress/dictionary/DeltaDictionaryTest.java b/src/test/java/org/apache/sysds/test/component/compress/dictionary/DeltaDictionaryTest.java new file mode 100644 index 0000000..69fc14b --- /dev/null +++ b/src/test/java/org/apache/sysds/test/component/compress/dictionary/DeltaDictionaryTest.java @@ -0,0 +1,105 @@ +package org.apache.sysds.test.component.compress.dictionary; + +import 
org.apache.sysds.runtime.compress.colgroup.dictionary.DeltaDictionary; +import org.apache.sysds.runtime.functionobjects.Divide; +import org.apache.sysds.runtime.functionobjects.Minus; +import org.apache.sysds.runtime.functionobjects.Multiply; +import org.apache.sysds.runtime.functionobjects.Plus; +import org.apache.sysds.runtime.matrix.operators.LeftScalarOperator; +import org.apache.sysds.runtime.matrix.operators.RightScalarOperator; +import org.apache.sysds.runtime.matrix.operators.ScalarOperator; +import org.junit.Assert; +import org.junit.Test; + +/** Tests DeltaDictionary.applyScalarOp by comparing getValues() of the result against hand-written expected arrays with a tolerance of 0.01. The second constructor argument (1 or 2) is presumably the column count, going by the test names; confirm against DeltaDictionary. */ public class DeltaDictionaryTest { + + /** Right scalar multiply by 2 on values {1, 2} (second constructor argument 1): expects {2, 4}. */ @Test + public void testScalarOpRightMultiplySingleColumn() { + double scalar = 2; + DeltaDictionary d = new DeltaDictionary(new double[] {1, 2}, 1); + ScalarOperator sop = new RightScalarOperator(Multiply.getMultiplyFnObject(), scalar, 1); + d = d.applyScalarOp(sop); + double[] expected = new double[] {2, 4}; + Assert.assertArrayEquals(expected, d.getValues(), 0.01); + } + + /** Right scalar multiply by 2 on values {1, 2, 3, 4} (second constructor argument 2): expects every value doubled, {2, 4, 6, 8}. */ @Test + public void testScalarOpRightMultiplyTwoColumns() { + double scalar = 2; + DeltaDictionary d = new DeltaDictionary(new double[] {1, 2, 3, 4}, 2); + ScalarOperator sop = new RightScalarOperator(Multiply.getMultiplyFnObject(), scalar, 1); + d = d.applyScalarOp(sop); + double[] expected = new double[] {2, 4, 6, 8}; + Assert.assertArrayEquals(expected, d.getValues(), 0.01); + } + + /** Right scalar multiply by -2 on values {1, 2, 3, 4}: expects {-2, -4, -6, -8}. */ @Test + public void testNegScalarOpRightMultiplyTwoColumns() { + double scalar = -2; + DeltaDictionary d = new DeltaDictionary(new double[] {1, 2, 3, 4}, 2); + ScalarOperator sop = new RightScalarOperator(Multiply.getMultiplyFnObject(), scalar, 1); + d = d.applyScalarOp(sop); + double[] expected = new double[] {-2, -4, -6, -8}; + Assert.assertArrayEquals(expected, d.getValues(), 0.01); + } + + /** Multiply by 2 through a LeftScalarOperator on values {1, 2, 3, 4}: expects {2, 4, 6, 8}. */ @Test + public void testScalarOpLeftMultiplyTwoColumns() { + double scalar = 2; + DeltaDictionary d = new DeltaDictionary(new double[] {1, 2, 3, 4}, 2); + ScalarOperator sop = new LeftScalarOperator(Multiply.getMultiplyFnObject(), 
scalar, 1); + d = d.applyScalarOp(sop); + double[] expected = new double[] {2, 4, 6, 8}; + Assert.assertArrayEquals(expected, d.getValues(), 0.01); + } + + /** Right scalar divide by 0.5 on values {1, 2, 3, 4}: expects {2, 4, 6, 8}. */ @Test + public void testScalarOpRightDivideTwoColumns() { + double scalar = 0.5; + DeltaDictionary d = new DeltaDictionary(new double[] {1, 2, 3, 4}, 2); + ScalarOperator sop = new RightScalarOperator(Divide.getDivideFnObject(), scalar, 1); + d = d.applyScalarOp(sop); + double[] expected = new double[] {2, 4, 6, 8}; + Assert.assertArrayEquals(expected, d.getValues(), 0.01); + } + + /** Right scalar plus 2 on values {1, 2} (second constructor argument 1): expects {3, 2}, i.e. only the first value is shifted; presumably the later entries are deltas and stay unchanged, confirm in DeltaDictionary. */ @Test + public void testScalarOpRightPlusSingleColumn() { + double scalar = 2; + DeltaDictionary d = new DeltaDictionary(new double[] {1, 2}, 1); + ScalarOperator sop = new RightScalarOperator(Plus.getPlusFnObject(), scalar, 1); + d = d.applyScalarOp(sop); + double[] expected = new double[] {3, 2}; + Assert.assertArrayEquals(expected, d.getValues(), 0.01); + } + + /** Right scalar plus 2 on values {1, 2, 3, 4} (second constructor argument 2): expects {3, 4, 3, 4}, i.e. only the first two values are shifted. */ @Test + public void testScalarOpRightPlusTwoColumns() { + double scalar = 2; + DeltaDictionary d = new DeltaDictionary(new double[] {1, 2, 3, 4}, 2); + ScalarOperator sop = new RightScalarOperator(Plus.getPlusFnObject(), scalar, 1); + d = d.applyScalarOp(sop); + double[] expected = new double[] {3, 4, 3, 4}; + Assert.assertArrayEquals(expected, d.getValues(), 0.01); + } + + /** Right scalar minus 2 on values {1, 2, 3, 4} (second constructor argument 2): expects {-1, 0, 3, 4}, i.e. only the first two values are shifted. */ @Test + public void testScalarOpRightMinusTwoColumns() { + double scalar = 2; + DeltaDictionary d = new DeltaDictionary(new double[] {1, 2, 3, 4}, 2); + ScalarOperator sop = new RightScalarOperator(Minus.getMinusFnObject(), scalar, 1); + d = d.applyScalarOp(sop); + double[] expected = new double[] {-1, 0, 3, 4}; + Assert.assertArrayEquals(expected, d.getValues(), 0.01); + } + + /** Plus 2 through a LeftScalarOperator on values {1, 2, 3, 4} (second constructor argument 2): expects {3, 4, 3, 4}, i.e. only the first two values are shifted. */ @Test + public void testScalarOpLeftPlusTwoColumns() { + double scalar = 2; + DeltaDictionary d = new DeltaDictionary(new double[] {1, 2, 3, 4}, 2); + ScalarOperator sop = new LeftScalarOperator(Plus.getPlusFnObject(), scalar, 1); + d = d.applyScalarOp(sop); + double[] expected = new double[] {3, 4, 3, 4}; + 
Assert.assertArrayEquals(expected, d.getValues(), 0.01); + } +}
