This is an automated email from the ASF dual-hosted git repository. baunsgaard pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/systemds.git
commit e8da72fa0c607fe191265a962f39b79420d06611 Author: baunsgaard <[email protected]> AuthorDate: Fri May 14 10:13:40 2021 +0200 [SYSTEMDS-2992] CLA init workload cost functions Initial version of cost based co-coding functions. This commit adds costs for compressed tsmm and compressed mm. --- .../runtime/compress/CompressedMatrixBlock.java | 2 + .../compress/CompressedMatrixBlockFactory.java | 17 +- .../runtime/compress/CompressionSettings.java | 8 +- .../compress/CompressionSettingsBuilder.java | 15 +- .../runtime/compress/CompressionStatistics.java | 18 +- .../runtime/compress/cocode/AColumnCoCoder.java | 6 +- .../runtime/compress/cocode/CoCodeBinPacking.java | 4 +- .../sysds/runtime/compress/cocode/CoCodeCost.java | 4 +- .../compress/cocode/CoCodeCostMatrixMult.java | 72 ++-- .../runtime/compress/cocode/CoCodeStatic.java | 4 +- .../runtime/compress/cocode/PlanningCoCoder.java | 16 +- .../sysds/runtime/compress/colgroup/AColGroup.java | 16 +- .../compress/colgroup/ColGroupCompressed.java | 8 +- .../runtime/compress/colgroup/ColGroupConst.java | 5 - .../runtime/compress/colgroup/ColGroupDDC.java | 5 +- .../runtime/compress/colgroup/ColGroupEmpty.java | 4 - .../runtime/compress/colgroup/ColGroupFactory.java | 102 ++--- .../runtime/compress/colgroup/ColGroupOLE.java | 7 +- .../runtime/compress/colgroup/ColGroupOffset.java | 10 +- .../runtime/compress/colgroup/ColGroupSDC.java | 2 +- .../compress/colgroup/ColGroupSDCSingle.java | 2 +- .../compress/colgroup/ColGroupSDCSingleZeros.java | 20 +- .../compress/colgroup/ColGroupSDCZeros.java | 16 +- .../runtime/compress/colgroup/ColGroupSizes.java | 57 ++- .../compress/colgroup/ColGroupUncompressed.java | 5 + .../runtime/compress/colgroup/ColGroupValue.java | 66 +-- .../compress/colgroup/dictionary/ADictionary.java | 36 +- .../compress/colgroup/dictionary/Dictionary.java | 22 + .../colgroup/dictionary/DictionaryFactory.java | 135 +++++- .../colgroup/dictionary/MatrixBlockDictionary.java | 456 +++++++++++++++++++++ .../compress/colgroup/dictionary/QDictionary.java | 10 + .../colgroup/dictionary/SparseDictionary.java | 218 ---------- .../compress/estim/CompressedSizeEstimator.java | 87 +++- .../estim/CompressedSizeEstimatorExact.java | 3 +- .../estim/CompressedSizeEstimatorFactory.java | 41 +- .../estim/CompressedSizeEstimatorSample.java | 38 +- .../compress/estim/CompressedSizeInfoColGroup.java | 27 +- .../sysds/runtime/compress/lib/BitmapEncoder.java | 56 +-- .../runtime/compress/lib/BitmapLossyEncoder.java | 30 +- .../runtime/compress/lib/CLALibLeftMultBy.java | 2 +- .../runtime/compress/lib/CLALibRightMultBy.java | 6 +- .../sysds/runtime/compress/lib/CLALibSquash.java | 11 +- .../sysds/runtime/compress/utils/Bitmap.java | 29 +- .../utils/{Bitmap.java => MultiColBitmap.java} | 43 +- .../org/apache/sysds/utils/MemoryEstimates.java | 2 + .../compress/AbstractCompressedUnaryTests.java | 3 - .../component/compress/CompressedTestBase.java | 7 +- .../compress/CompressibleInputGenerator.java | 15 + .../sysds/test/component/compress/TestBase.java | 13 +- .../test/component/compress/TestConstants.java | 1 + .../compress/colgroup/JolEstimateTest.java | 74 ++-- .../compress/estim/SampleEstimatorTest.java | 119 ++++++ 52 files changed, 1314 insertions(+), 661 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java index bb7781e..249ad3e 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java +++ b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java @@ -208,6 +208,7 @@ public class CompressedMatrixBlock extends MatrixBlock { ret.allocateDenseBlock(); decompress(ret); + ret.examSparsity(); if(DMLScript.STATISTICS || LOG.isDebugEnabled()) { double t = time.stop(); LOG.debug("decompressed block w/ k=" + 1 + " in " + t + "ms."); @@ -256,6 +257,7 @@ public class CompressedMatrixBlock extends MatrixBlock { ret.allocateDenseBlock(); decompress(ret, k); + ret.examSparsity(); if(DMLScript.STATISTICS || LOG.isDebugEnabled()) { double t = time.stop(); LOG.debug("decompressed block w/ k=" + k + " in " + time.stop() + "ms."); diff --git a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlockFactory.java b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlockFactory.java index bb0cf8a..1756018 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlockFactory.java +++ b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlockFactory.java @@ -28,6 +28,7 @@ import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.sysds.runtime.compress.cocode.PlanningCoCoder; +import org.apache.sysds.runtime.compress.cocode.PlanningCoCoder.PartitionerType; import org.apache.sysds.runtime.compress.colgroup.AColGroup; import org.apache.sysds.runtime.compress.colgroup.ColGroupConst; import org.apache.sysds.runtime.compress.colgroup.ColGroupEmpty; @@ -161,18 +162,18 @@ public class CompressedMatrixBlockFactory { _stats.estimatedSizeCols = sizeInfos.memoryEstimate(); logPhase(); - long memoryEstimate = sizeInfos.memoryEstimate(); - if(memoryEstimate < _stats.originalSize) + if(_stats.estimatedSizeCols < _stats.originalSize || compSettings.columnPartitioner == PartitionerType.COST_MATRIX_MULT) coCodePhase(sizeEstimator, sizeInfos, mb.getNumRows()); else { - LOG.info("Estimated Size of singleColGroups: " + memoryEstimate); + LOG.info("Estimated Size of singleColGroups: " + _stats.estimatedSizeCols); LOG.info("Original size : " + _stats.originalSize); } } private void coCodePhase(CompressedSizeEstimator sizeEstimator, CompressedSizeInfo sizeInfos, int numRows) { coCodeColGroups = PlanningCoCoder.findCoCodesByPartitioning(sizeEstimator, sizeInfos, numRows, k, compSettings); + _stats.estimatedSizeCoCoded = coCodeColGroups.memoryEstimate(); logPhase(); } @@ -206,6 +207,7 @@ public class CompressedMatrixBlockFactory { private void compressPhase() { res.allocateColGroupList(ColGroupFactory.compressColGroups(mb, coCodeColGroups, compSettings, k)); + _stats.compressedInitialSize = res.getInMemorySize(); logPhase(); } @@ -229,6 +231,7 @@ public class CompressedMatrixBlockFactory { o.add(combineConst(c)); res.allocateColGroupList(o); + logPhase(); } @@ -276,12 +279,11 @@ public class CompressedMatrixBlockFactory { private void cleanupPhase() { res.cleanupBlock(true, true); - mb.cleanupBlock(true, true); _stats.size = res.estimateCompressedSizeInMemory(); final double ratio = _stats.getRatio(); - if(ratio < 1) { + if(ratio < 1 && compSettings.columnPartitioner != PartitionerType.COST_MATRIX_MULT) { LOG.info("--dense size: " + _stats.denseSize); LOG.info("--original size: " + _stats.originalSize); LOG.info("--compressed size: " + _stats.size); @@ -291,6 +293,8 @@ public class CompressedMatrixBlockFactory { return; } + mb.cleanupBlock(true, true); + _stats.setColGroupsCounts(res.getColGroups()); logPhase(); @@ -311,10 +315,12 @@ public class CompressedMatrixBlockFactory { switch(phase) { case 0: LOG.debug("--compression phase " + phase + " Classify : " + getLastTimePhase()); + LOG.debug("--Individual Columns Estimated Compression: " + _stats.estimatedSizeCols); break; case 1: LOG.debug("--compression phase " + phase + " Grouping : " + getLastTimePhase()); LOG.debug("Grouping using: " + compSettings.columnPartitioner); + LOG.debug("--Cocoded Columns estimated Compression:" + _stats.estimatedSizeCoCoded); break; case 2: LOG.debug("--compression phase " + phase + " Transpose : " + getLastTimePhase()); @@ -324,6 +330,7 @@ public class CompressedMatrixBlockFactory { LOG.debug("--compression phase " + phase + " Compress : " + getLastTimePhase()); LOG.debug("--compression Hash collisions:" + DblArrayIntListHashMap.hashMissCount); DblArrayIntListHashMap.hashMissCount = 0; + LOG.debug("--compressed initial actual size:" + _stats.compressedInitialSize); break; case 4: LOG.debug("--compression phase " + phase + " Share : " + getLastTimePhase()); diff --git a/src/main/java/org/apache/sysds/runtime/compress/CompressionSettings.java b/src/main/java/org/apache/sysds/runtime/compress/CompressionSettings.java index ddbc60b..895600d 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/CompressionSettings.java +++ b/src/main/java/org/apache/sysds/runtime/compress/CompressionSettings.java @@ -97,10 +97,15 @@ public class CompressionSettings { */ public final EnumSet<CompressionType> validCompressions; + /** + * The minimum size of the sample extracted. + */ + public final int minimumSampleSize; + protected CompressionSettings(double samplingRatio, boolean allowSharedDictionary, String transposeInput, boolean skipList, int seed, boolean investigateEstimate, boolean lossy, EnumSet<CompressionType> validCompressions, boolean sortValuesByLength, PartitionerType columnPartitioner, - int maxColGroupCoCode, double coCodePercentage) { + int maxColGroupCoCode, double coCodePercentage, int minimumSampleSize) { this.samplingRatio = samplingRatio; this.allowSharedDictionary = allowSharedDictionary; this.transposeInput = transposeInput; @@ -113,6 +118,7 @@ public class CompressionSettings { this.columnPartitioner = columnPartitioner; this.maxColGroupCoCode = maxColGroupCoCode; this.coCodePercentage = coCodePercentage; + this.minimumSampleSize = minimumSampleSize; LOG.debug(this); } diff --git a/src/main/java/org/apache/sysds/runtime/compress/CompressionSettingsBuilder.java b/src/main/java/org/apache/sysds/runtime/compress/CompressionSettingsBuilder.java index 216267e..83d01e5 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/CompressionSettingsBuilder.java +++ b/src/main/java/org/apache/sysds/runtime/compress/CompressionSettingsBuilder.java @@ -43,6 +43,7 @@ public class CompressionSettingsBuilder { private PartitionerType columnPartitioner; private int maxStaticColGroupCoCode = 10000; private double coCodePercentage = 0.01; + private int minimumSampleSize = 2000; private final static double defaultSampleRate = 0.01; @@ -255,6 +256,18 @@ public class CompressionSettingsBuilder { } /** + * Set the minimum sample size to extract from a given matrix, this overrules the sample percentage if the sample + * percentage extracted is lower than this minimum bound. + * + * @param minimumSampleSize The minimum sample size to extract + * @return The CompressionSettingsBuilder + */ + public CompressionSettingsBuilder setMinimumSampleSize(int minimumSampleSize) { + this.minimumSampleSize = minimumSampleSize; + return this; + } + + /** * Create the CompressionSettings object to use in the compression. * * @return The CompressionSettings @@ -262,6 +275,6 @@ public class CompressionSettingsBuilder { public CompressionSettings create() { return new CompressionSettings(samplingRatio, allowSharedDictionary, transposeInput, skipList, seed, investigateEstimate, lossy, validCompressions, sortValuesByLength, columnPartitioner, - maxStaticColGroupCoCode, coCodePercentage); + maxStaticColGroupCoCode, coCodePercentage, minimumSampleSize); } } diff --git a/src/main/java/org/apache/sysds/runtime/compress/CompressionStatistics.java b/src/main/java/org/apache/sysds/runtime/compress/CompressionStatistics.java index 5c7d815..466953a 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/CompressionStatistics.java +++ b/src/main/java/org/apache/sysds/runtime/compress/CompressionStatistics.java @@ -31,10 +31,16 @@ import org.apache.sysds.runtime.compress.colgroup.AColGroup; */ public class CompressionStatistics { + // sizes while compressing + public long estimatedSizeCoCoded; + public long estimatedSizeCols; + public long compressedInitialSize; + + // sizes before compression public long originalSize; public long denseSize; - public long estimatedSizeColGroups; - public long estimatedSizeCols; + + // compressed size public long size; private Map<String, int[]> colGroupCounts; @@ -74,20 +80,20 @@ public class CompressionStatistics { for(String ctKey : colGroupCounts.keySet()) sb.append(ctKey + ":" + colGroupCounts.get(ctKey)[0] + " "); - + return sb.toString(); } public String getGroupsSizesString() { StringBuilder sb = new StringBuilder(); - for(String ctKey : colGroupCounts.keySet()) + for(String ctKey : colGroupCounts.keySet()) sb.append(ctKey + ":" + colGroupCounts.get(ctKey)[1] + " "); - + return sb.toString(); } - public double getRatio(){ + public double getRatio() { return (double) originalSize / size; } diff --git a/src/main/java/org/apache/sysds/runtime/compress/cocode/AColumnCoCoder.java b/src/main/java/org/apache/sysds/runtime/compress/cocode/AColumnCoCoder.java index 1ede652..124e3a4 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/cocode/AColumnCoCoder.java +++ b/src/main/java/org/apache/sysds/runtime/compress/cocode/AColumnCoCoder.java @@ -32,12 +32,10 @@ public abstract class AColumnCoCoder { final protected CompressedSizeEstimator _est; final protected CompressionSettings _cs; - // final protected int _numRows; - protected AColumnCoCoder(CompressedSizeEstimator sizeEstimator, CompressionSettings cs, int numRows) { + protected AColumnCoCoder(CompressedSizeEstimator sizeEstimator, CompressionSettings cs) { _est = sizeEstimator; _cs = cs; - // _numRows = numRows; } /** @@ -57,7 +55,7 @@ public abstract class AColumnCoCoder { protected CompressedSizeInfoColGroup joinWithAnalysis(CompressedSizeInfoColGroup lhs, CompressedSizeInfoColGroup rhs) { int[] joined = Util.join(lhs.getColumns(), rhs.getColumns()); - return _est.estimateCompressedColGroupSize(joined); + return _est.estimateCompressedColGroupSize(joined,( lhs.getNumVals() + 1) * (rhs.getNumVals() + 1)); } protected CompressedSizeInfoColGroup joinWithoutAnalysis(CompressedSizeInfoColGroup lhs, diff --git a/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeBinPacking.java b/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeBinPacking.java index 0116edc..e6dace1 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeBinPacking.java +++ b/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeBinPacking.java @@ -46,8 +46,8 @@ public class CoCodeBinPacking extends AColumnCoCoder { */ public static double BIN_CAPACITY = 0.000032; - protected CoCodeBinPacking(CompressedSizeEstimator sizeEstimator, CompressionSettings cs, int numRows) { - super(sizeEstimator, cs, numRows); + protected CoCodeBinPacking(CompressedSizeEstimator sizeEstimator, CompressionSettings cs) { + super(sizeEstimator, cs); } @Override diff --git a/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeCost.java b/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeCost.java index 66ad209..3f7185a 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeCost.java +++ b/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeCost.java @@ -49,8 +49,8 @@ public class CoCodeCost extends AColumnCoCoder { private final static int toSmallForAnalysis = 64; - protected CoCodeCost(CompressedSizeEstimator sizeEstimator, CompressionSettings cs, int numRows) { - super(sizeEstimator, cs, numRows); + protected CoCodeCost(CompressedSizeEstimator sizeEstimator, CompressionSettings cs) { + super(sizeEstimator, cs); largestDistinct = Math.min(4096, Math.max(256, (int) (sizeEstimator.getNumRows() * cs.coCodePercentage))); } diff --git a/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeCostMatrixMult.java b/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeCostMatrixMult.java index 910c94a..836e4d0 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeCostMatrixMult.java +++ b/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeCostMatrixMult.java @@ -26,28 +26,35 @@ import java.util.PriorityQueue; import java.util.Queue; import org.apache.sysds.runtime.compress.CompressionSettings; -import org.apache.sysds.runtime.compress.colgroup.AColGroup.CompressionType; import org.apache.sysds.runtime.compress.estim.CompressedSizeEstimator; +import org.apache.sysds.runtime.compress.estim.CompressedSizeEstimatorSample; import org.apache.sysds.runtime.compress.estim.CompressedSizeInfo; import org.apache.sysds.runtime.compress.estim.CompressedSizeInfoColGroup; -/** - * Column group partitioning by number of distinct items estimated. This allows us to join columns based on the worst - * case estimate of the joined sizes. Then once we decide to join, if the worst case is okay, we then analyze the actual - * cardinality of the join. - * - * This method allows us to compress many more columns than the BinPacking - * - */ public class CoCodeCostMatrixMult extends AColumnCoCoder { - protected CoCodeCostMatrixMult(CompressedSizeEstimator e, CompressionSettings cs, int numRows) { - super(e, cs, numRows); + protected CoCodeCostMatrixMult(CompressedSizeEstimator e, CompressionSettings cs) { + super(e, cs); } @Override protected CompressedSizeInfo coCodeColumns(CompressedSizeInfo colInfos, int k) { - colInfos.setInfo(join(colInfos.getInfo())); + + List<CompressedSizeInfoColGroup> joinRes = join(colInfos.getInfo()); + + if(_cs.samplingRatio < 0.1 && _est instanceof CompressedSizeEstimatorSample) { + LOG.debug("Performing second join with double sample rate"); + CompressedSizeEstimatorSample estS = (CompressedSizeEstimatorSample) _est; + estS.sampleData(estS.getSample().getNumRows() * 2); + List<int[]> colG = new ArrayList<>(joinRes.size()); + for(CompressedSizeInfoColGroup g : joinRes) + colG.add(g.getColumns()); + + joinRes = join(estS.computeCompressedSizeInfos(colG, k)); + } + + colInfos.setInfo(joinRes); + return colInfos; } @@ -66,6 +73,8 @@ public class CoCodeCostMatrixMult extends AColumnCoCoder { final CostOfJoin r = que.poll(); final double costIndividual = (l.cost + r.cost); final CostOfJoin g = new CostOfJoin(joinWithAnalysis(l.elm, r.elm)); + if(LOG.isDebugEnabled()) + LOG.debug("\nl: " + l + "\nr: " + r + "\njoined: " + g); if(g.cost < costIndividual) que.add(g); else { @@ -81,6 +90,7 @@ public class CoCodeCostMatrixMult extends AColumnCoCoder { else break; } + for(CostOfJoin g : que) ret.add(g.elm); @@ -94,30 +104,16 @@ public class CoCodeCostMatrixMult extends AColumnCoCoder { protected CostOfJoin(CompressedSizeInfoColGroup elm) { this.elm = elm; - final double constantOverheadForColGroup = 5; - final double nCols = elm.getColumns().length; + final int nCols = elm.getColumns().length; final double nRows = _est.getNumRows(); - if(elm.getBestCompressionType() == CompressionType.UNCOMPRESSED) - this.cost = nRows * nCols * 2 + constantOverheadForColGroup; - else { - final int blksz = CompressionSettings.BITMAP_BLOCK_SZ; - - // LOG.error(constantOverheadForColGroup); - final double commonFraction = elm.getMostCommonFraction(); - final double rowsCost = commonFraction > 0.2 ? nRows * (1 - commonFraction) : nRows; - // this.cost = rowsToProcess + elm.getNumVals() * nCols + constantOverheadForColGroup; - // this.cost = rowsToProcess + elm.getNumVals() * nCols * (1 - commonFraction) + - // constantOverheadForColGroup; - // final double sparsity_tuple_effect = elm.getTupleSparsity() > 0.4 ? 1 - - // Math.min(elm.getTupleSparsity(), 0.9) : 1; - final int numberTuples = elm.getNumVals(); - final double tuplesCost = (numberTuples < blksz) ? numberTuples : numberTuples * 2; - - // this.cost = elementsCost; - // this.cost = rowsCost + tuplesCost * sparsity_tuple_effect + constantOverheadForColGroup; - - this.cost = rowsCost + tuplesCost + constantOverheadForColGroup; - } + final double preAggregateCost = nRows; + + final int numberTuples = elm.getNumVals(); + final double tupleSparsity = elm.getTupleSparsity(); + final double postScalingCost = (nCols > 1 && elm.getTupleSparsity() > 0.4) ? numberTuples * + nCols : numberTuples * nCols * tupleSparsity; + + this.cost = preAggregateCost + postScalingCost; } @Override @@ -128,10 +124,14 @@ public class CoCodeCostMatrixMult extends AColumnCoCoder { @Override public String toString() { StringBuilder sb = new StringBuilder(); - sb.append("\n"); sb.append(cost); sb.append(" - "); + sb.append(elm.getBestCompressionType()); + sb.append(" nrVals: "); + sb.append(elm.getNumVals()); + sb.append(" "); sb.append(Arrays.toString(elm.getColumns())); + return sb.toString(); } } diff --git a/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeStatic.java b/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeStatic.java index 06e01b4..674a819 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeStatic.java +++ b/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeStatic.java @@ -29,8 +29,8 @@ import org.apache.sysds.runtime.compress.estim.CompressedSizeInfo; */ public class CoCodeStatic extends AColumnCoCoder { - protected CoCodeStatic(CompressedSizeEstimator sizeEstimator, CompressionSettings cs, int numRows) { - super(sizeEstimator, cs, numRows); + protected CoCodeStatic(CompressedSizeEstimator sizeEstimator, CompressionSettings cs) { + super(sizeEstimator, cs); } @Override diff --git a/src/main/java/org/apache/sysds/runtime/compress/cocode/PlanningCoCoder.java b/src/main/java/org/apache/sysds/runtime/compress/cocode/PlanningCoCoder.java index 94572c9..3bae0e5 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/cocode/PlanningCoCoder.java +++ b/src/main/java/org/apache/sysds/runtime/compress/cocode/PlanningCoCoder.java @@ -67,7 +67,7 @@ public class PlanningCoCoder { List<CompressedSizeInfoColGroup> newGroups = new ArrayList<>(); mem = new Memorizer(); for(CompressedSizeInfoColGroup g : colInfos.getInfo()) { - if(g.getBestCompressionType() == CompressionType.CONST) + if(g.getBestCompressionType(cs) == CompressionType.CONST) constantGroups.add(g); else { mem.put(g); @@ -93,13 +93,13 @@ public class PlanningCoCoder { CompressionSettings cs, int numRows) { switch(type) { case BIN_PACKING: - return new CoCodeBinPacking(est, cs, numRows); + return new CoCodeBinPacking(est, cs); case STATIC: - return new CoCodeStatic(est, cs, numRows); + return new CoCodeStatic(est, cs); case COST: - return new CoCodeCost(est, cs, numRows); + return new CoCodeCost(est, cs); case COST_MATRIX_MULT: - return new CoCodeCostMatrixMult(est, cs, numRows); + return new CoCodeCostMatrixMult(est, cs); default: throw new RuntimeException("Unsupported column group partitioner: " + type.toString()); } @@ -186,7 +186,7 @@ public class PlanningCoCoder { break; } - LOG.error(mem.stats()); + LOG.debug(mem.stats()); mem.resetStats(); List<CompressedSizeInfoColGroup> ret = new ArrayList<>(workset.size()); @@ -226,9 +226,9 @@ public class PlanningCoCoder { if(g == null) { final CompressedSizeInfoColGroup left = mem.get(new ColIndexes(c1)); final CompressedSizeInfoColGroup right = mem.get(new ColIndexes(c2)); - final boolean leftConst = left.getBestCompressionType() == CompressionType.CONST && + final boolean leftConst = left.getBestCompressionType(cs) == CompressionType.CONST && left.getNumOffs() == 0; - final boolean rightConst = right.getBestCompressionType() == CompressionType.CONST && + final boolean rightConst = right.getBestCompressionType(cs) == CompressionType.CONST && right.getNumOffs() == 0; if(leftConst) g = CompressedSizeInfoColGroup.addConstGroup(c, right, cs.validCompressions); diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroup.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroup.java index ddc6195..6b74136 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroup.java +++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroup.java @@ -32,6 +32,7 @@ import org.apache.sysds.runtime.matrix.data.MatrixBlock; import org.apache.sysds.runtime.matrix.operators.AggregateUnaryOperator; import org.apache.sysds.runtime.matrix.operators.BinaryOperator; import org.apache.sysds.runtime.matrix.operators.ScalarOperator; +import org.apache.sysds.utils.MemoryEstimates; import edu.emory.mathcs.backport.java.util.Arrays; @@ -143,6 +144,15 @@ public abstract class AColGroup implements Serializable { public abstract int getNumRows(); /** + * Obtain number of distinct tuples in contained sets of values associated with this column group. + * + * If the column group is uncompressed the number or rows is returned. + * + * @return the number of distinct sets of values associated with the bitmaps in this column group + */ + public abstract int getNumValues(); + + /** * Obtain the number of columns in this column group. * * @return number of columns in this column group @@ -183,7 +193,11 @@ public abstract class AColGroup implements Serializable { * * @return an upper bound on the number of bytes used to store this ColGroup in memory. */ - public abstract long estimateInMemorySize(); + public long estimateInMemorySize(){ + long size = 16; // object header + size += MemoryEstimates.intArrayCost(_colIndexes.length); + return size; + } /** * Decompress the contents of this column group into the specified full matrix block. diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupCompressed.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupCompressed.java index c5d29ba..3b598e6 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupCompressed.java +++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupCompressed.java @@ -58,8 +58,6 @@ public abstract class ColGroupCompressed extends AColGroup { _numRows = numRows; } - public abstract int getNumValues(); - public abstract double[] getValues(); public abstract void addMinMax(double[] ret); @@ -147,4 +145,10 @@ public abstract class ColGroupCompressed extends AColGroup { return _numRows; } + @Override + public long estimateInMemorySize() { + long size = super.estimateInMemorySize(); + size += 4; + return size; + } } diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupConst.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupConst.java index 8f6299c..e3c2965 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupConst.java +++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupConst.java @@ -113,11 +113,6 @@ public class ColGroupConst extends ColGroupValue { } @Override - public long estimateInMemorySize() { - return ColGroupSizes.estimateInMemorySizeCONST(getNumCols(), getNumValues(), isLossy()); - } - - @Override public void decompressToBlockSafe(MatrixBlock target, int rl, int ru, int offT, double[] values) { decompressToBlockUnSafe(target, rl, ru, offT, values); target.setNonZeros(_colIndexes.length * target.getNumRows() + target.getNonZeros()); diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC.java index df45f56..fa967da 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC.java +++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC.java @@ -555,8 +555,9 @@ public class ColGroupDDC extends ColGroupValue { @Override public long estimateInMemorySize() { - return ColGroupSizes.estimateInMemorySizeDDC(getNumCols(), getNumValues(), _numRows, isLossy()); - + long size = super.estimateInMemorySize(); + size += _data.getInMemorySize(); + return size; } @Override diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupEmpty.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupEmpty.java index c873b1a..be33491 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupEmpty.java +++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupEmpty.java @@ -83,10 +83,6 @@ public class ColGroupEmpty extends ColGroupCompressed { return ColGroupType.EMPTY; } - @Override - public long estimateInMemorySize() { - return ColGroupSizes.estimateInMemorySizeEMPTY(getNumCols()); - } @Override public void decompressToBlockSafe(MatrixBlock target, int rl, int ru, int offT, double[] values) { diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupFactory.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupFactory.java index 62acf71..ffcc2fb 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupFactory.java +++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupFactory.java @@ -34,9 +34,10 @@ import org.apache.commons.logging.LogFactory; import org.apache.sysds.runtime.DMLCompressionException; import org.apache.sysds.runtime.DMLRuntimeException; import org.apache.sysds.runtime.compress.CompressionSettings; +import org.apache.sysds.runtime.compress.cocode.PlanningCoCoder.PartitionerType; import org.apache.sysds.runtime.compress.colgroup.AColGroup.CompressionType; import org.apache.sysds.runtime.compress.colgroup.dictionary.ADictionary; -import org.apache.sysds.runtime.compress.colgroup.dictionary.Dictionary; +import org.apache.sysds.runtime.compress.colgroup.dictionary.DictionaryFactory; import org.apache.sysds.runtime.compress.colgroup.mapping.AMapToData; import org.apache.sysds.runtime.compress.colgroup.mapping.MapToFactory; import org.apache.sysds.runtime.compress.colgroup.tree.AInsertionSorter; @@ -47,7 +48,6 @@ import org.apache.sysds.runtime.compress.estim.CompressedSizeInfo; import org.apache.sysds.runtime.compress.estim.CompressedSizeInfoColGroup; import org.apache.sysds.runtime.compress.lib.BitmapEncoder; import org.apache.sysds.runtime.compress.utils.ABitmap; -import org.apache.sysds.runtime.compress.utils.Bitmap; import org.apache.sysds.runtime.compress.utils.IntArrayList; import org.apache.sysds.runtime.data.SparseBlock; import org.apache.sysds.runtime.matrix.data.MatrixBlock; @@ -155,8 +155,9 @@ public class ColGroupFactory { private static Collection<AColGroup> compressColGroup(MatrixBlock in, int[] colIndexes, CompressionSettings compSettings) { - if(in.isInSparseFormat() && compSettings.transposed) { - + if(in.isEmpty()) + return Collections.singletonList(new ColGroupEmpty(colIndexes, compSettings.transposed ? in.getNumColumns(): in.getNumRows())); + else if(in.isInSparseFormat() && compSettings.transposed) { final SparseBlock sb = in.getSparseBlock(); for(int col : colIndexes) if(sb.isEmpty(col)) @@ -191,7 +192,7 @@ public class ColGroupFactory { private static AColGroup compressColGroupForced(MatrixBlock in, int[] colIndexes, CompressionSettings compSettings) { - + ABitmap ubm = BitmapEncoder.extractBitmap(colIndexes, in, compSettings.transposed); CompressedSizeEstimator estimator = new CompressedSizeEstimatorExact(in, compSettings); @@ -200,7 +201,8 @@ public class ColGroupFactory { estimator.estimateCompressedColGroupSize(ubm, colIndexes), compSettings.validCompressions); int numRows = compSettings.transposed ? in.getNumColumns() : in.getNumRows(); - return compress(colIndexes, numRows, ubm, sizeInfo.getBestCompressionType(), compSettings, in); + return compress(colIndexes, numRows, ubm, sizeInfo.getBestCompressionType(compSettings), compSettings, in, + sizeInfo.getTupleSparsity()); } // private static AColGroup compressColGroupCorrecting(MatrixBlock in, @@ -275,17 +277,21 @@ public class ColGroupFactory { * @param compType The CompressionType selected * @param cs The compression Settings used for the given compression * @param rawMatrixBlock The copy of the original input (maybe transposed) MatrixBlock + * @param tupleSparsity The sparsity of the ubs entries. * @return A Compressed ColGroup */ public static AColGroup compress(int[] colIndexes, int rlen, ABitmap ubm, CompressionType compType, - CompressionSettings cs, MatrixBlock rawMatrixBlock) { + CompressionSettings cs, MatrixBlock rawMatrixBlock, double tupleSparsity) { + + if(compType == CompressionType.UNCOMPRESSED && cs.columnPartitioner == PartitionerType.COST_MATRIX_MULT) + compType = CompressionType.DDC; final IntArrayList[] of = ubm.getOffsetList(); if(of == null) return new ColGroupEmpty(colIndexes, rlen); else if(of.length == 1 && of[0].size() == rlen) - return new ColGroupConst(colIndexes, rlen, ADictionary.getDictionary(ubm)); + return new ColGroupConst(colIndexes, rlen, DictionaryFactory.create(ubm)); if(LOG.isTraceEnabled()) LOG.trace("compressing to: " + compType); @@ -295,13 +301,13 @@ public class ColGroupFactory { switch(compType) { case DDC: - return compressDDC(colIndexes, rlen, ubm, cs); + return compressDDC(colIndexes, rlen, ubm, cs, tupleSparsity); case RLE: - return compressRLE(colIndexes, rlen, ubm, cs); + return compressRLE(colIndexes, rlen, ubm, cs, tupleSparsity); case OLE: - return compressOLE(colIndexes, rlen, ubm, cs); + return compressOLE(colIndexes, rlen, ubm, cs, tupleSparsity); case SDC: - return compressSDC(colIndexes, rlen, ubm, cs); + return compressSDC(colIndexes, rlen, ubm, cs, tupleSparsity); case UNCOMPRESSED: return new ColGroupUncompressed(colIndexes, rawMatrixBlock, cs.transposed); default: @@ -316,7 +322,8 @@ public class ColGroupFactory { } } - private static AColGroup compressSDC(int[] colIndexes, int rlen, ABitmap ubm, CompressionSettings cs) { + private static AColGroup compressSDC(int[] colIndexes, int rlen, ABitmap ubm, CompressionSettings cs, + double tupleSparsity) { int numZeros = (int) ((long) rlen - (int) ubm.getNumOffsets()); int largestOffset = 0; @@ -330,17 +337,17 @@ public class ColGroupFactory { index++; } AColGroup cg; - ADictionary dict = new Dictionary(((Bitmap) ubm).getValues()); + ADictionary dict = DictionaryFactory.create(ubm, tupleSparsity); if(numZeros >= largestOffset && ubm.getOffsetList().length == 1) cg = new ColGroupSDCSingleZeros(colIndexes, rlen, dict, ubm.getOffsetList()[0].extractValues(true), null); else if(ubm.getOffsetList().length == 1) {// todo - dict = moveFrequentToLastDictionaryEntry(dict, ubm, rlen, largestIndex); + dict = DictionaryFactory.moveFrequentToLastDictionaryEntry(dict, ubm, rlen, largestIndex); cg = setupSingleValueSDCColGroup(colIndexes, rlen, ubm, dict); } else if(numZeros >= largestOffset) cg = setupMultiValueZeroColGroup(colIndexes, ubm, rlen, dict); else { - dict = moveFrequentToLastDictionaryEntry(dict, ubm, rlen, largestIndex); + dict = DictionaryFactory.moveFrequentToLastDictionaryEntry(dict, ubm, rlen, largestIndex); cg = setupMultiValueColGroup(colIndexes, numZeros, largestOffset, ubm, rlen, largestIndex, dict); } return cg; @@ -397,62 +404,17 @@ public class ColGroupFactory { return new ColGroupSDCSingle(colIndexes, numRows, dict, _indexes, null); } - private static ADictionary moveFrequentToLastDictionaryEntry(ADictionary dict, ABitmap ubm, int numRows, - int largestIndex) { - final double[] dictValues = dict.getValues(); - final int zeros = numRows - (int) ubm.getNumOffsets(); - final int nCol = ubm.getNumColumns(); - final int offsetToLargest = largestIndex * nCol; - - if(zeros == 0) { - final double[] swap = new double[nCol]; - System.arraycopy(dictValues, offsetToLargest, swap, 0, nCol); - for(int i = offsetToLargest; i < dictValues.length - nCol; i++) { - dictValues[i] = dictValues[i + nCol]; - } - System.arraycopy(swap, 0, dictValues, dictValues.length - nCol, nCol); - return dict; - } - - final int largestIndexSize = ubm.getOffsetsList(largestIndex).size(); - final double[] newDict = new double[dictValues.length + nCol]; - - if(zeros > largestIndexSize) - System.arraycopy(dictValues, 0, newDict, 0, dictValues.length); - else { - System.arraycopy(dictValues, 0, newDict, 0, offsetToLargest); - System.arraycopy(dictValues, offsetToLargest + nCol, newDict, offsetToLargest, - dictValues.length - offsetToLargest - nCol); - System.arraycopy(dictValues, offsetToLargest, newDict, newDict.length - nCol, nCol); - } - return new Dictionary(newDict); - } - - private static AColGroup compressDDC(int[] colIndexes, int rlen, ABitmap ubm, CompressionSettings cs) { + private static AColGroup compressDDC(int[] colIndexes, int rlen, ABitmap ubm, CompressionSettings cs, + double tupleSparsity) { boolean _zeros = ubm.getNumOffsets() < (long) rlen; - ADictionary dict = ADictionary.getDictionary(ubm); - double[] values = dict.getValues(); - if(_zeros) { - double[] appendedZero = new double[values.length + colIndexes.length]; - System.arraycopy(values, 0, appendedZero, 0, values.length); - dict = new Dictionary(appendedZero); - } - else - dict = new Dictionary(values); - + ADictionary dict = (_zeros) ? DictionaryFactory.createWithAppendedZeroTuple(ubm, + tupleSparsity) : DictionaryFactory.create(ubm, tupleSparsity); int numVals = ubm.getNumValues(); AMapToData _data = MapToFactory.create(rlen, numVals + (_zeros ? 1 : 0)); if(_zeros) _data.fill(numVals); - // for(int i = 0; i < numVals; i++) { - // int[] tmpList = ubm.getOffsetsList(i).extractValues(); - // int tmpListSize = ubm.getNumOffsets(i); - // for(int k = 0; k < tmpListSize; k++) - // _data[tmpList[k]] = (char) i; - // } - for(int i = 0; i < numVals; i++) { IntArrayList tmpList = ubm.getOffsetsList(i); final int sz = tmpList.size(); @@ -464,9 +426,10 @@ public class ColGroupFactory { } - private static AColGroup compressOLE(int[] colIndexes, int rlen, ABitmap ubm, CompressionSettings cs) { + private static AColGroup compressOLE(int[] colIndexes, int rlen, ABitmap ubm, CompressionSettings cs, + double tupleSparsity) { - ADictionary dict = ADictionary.getDictionary(ubm); + ADictionary dict = DictionaryFactory.create(ubm, tupleSparsity); ColGroupOLE ole = new ColGroupOLE(rlen); final int numVals = ubm.getNumValues(); @@ -485,9 +448,10 @@ public class ColGroupFactory { return ole; } - private static AColGroup compressRLE(int[] colIndexes, int rlen, ABitmap ubm, CompressionSettings cs) { + private static AColGroup compressRLE(int[] colIndexes, int rlen, ABitmap ubm, CompressionSettings cs, + double tupleSparsity) { - ADictionary dict = ADictionary.getDictionary(ubm); + ADictionary dict = DictionaryFactory.create(ubm, tupleSparsity); ColGroupRLE rle = new ColGroupRLE(rlen); // compress the bitmaps final int numVals = ubm.getNumValues(); diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupOLE.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupOLE.java index 2df2878..bb4f325 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupOLE.java +++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupOLE.java @@ -329,11 +329,6 @@ public class ColGroupOLE extends ColGroupOffset { } @Override - public long estimateInMemorySize() { - return ColGroupSizes.estimateInMemorySizeOLE(getNumCols(), getNumValues(), _data.length, _numRows, isLossy()); - } - - @Override public AColGroup scalarOperation(ScalarOperator op) { double val0 = op.executeScalar(0); @@ -1209,7 +1204,7 @@ public class ColGroupOLE extends ColGroupOffset { } @Override - public Dictionary preAggregateThatSDCSingleStructure(ColGroupSDCSingle that, Dictionary ret, boolean preModified){ + public Dictionary preAggregateThatSDCSingleStructure(ColGroupSDCSingle that, Dictionary ret, boolean preModified) { throw new NotImplementedException(); } diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupOffset.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupOffset.java index d33e816..7b7c595 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupOffset.java +++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupOffset.java @@ -27,6 +27,7 @@ import java.util.Arrays; import org.apache.sysds.runtime.compress.colgroup.dictionary.ADictionary; import org.apache.sysds.runtime.compress.utils.LinearAlgebraUtils; import org.apache.sysds.runtime.functionobjects.Builtin; +import org.apache.sysds.utils.MemoryEstimates; /** * Base class for column groups encoded with various types of bitmap encoding. @@ -75,11 +76,10 @@ public abstract class ColGroupOffset extends ColGroupValue { @Override public long estimateInMemorySize() { - // Could use a ternary operator, but it looks odd with our code formatter here. - - return ColGroupSizes.estimateInMemorySizeOffset(getNumCols(), getNumValues(), _ptr.length, _data.length, - isLossy()); - + long size = super.estimateInMemorySize(); + size += MemoryEstimates.intArrayCost(_ptr.length); + size += MemoryEstimates.charArrayCost(_data.length); + return size; } protected final void sumAllValues(double[] b, double[] c) { diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSDC.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSDC.java index e449259..d2cebf5 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSDC.java +++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSDC.java @@ -330,7 +330,7 @@ public class ColGroupSDC extends ColGroupValue { @Override public long estimateInMemorySize() { - long size = ColGroupSizes.estimateInMemorySizeGroupValue(_colIndexes.length, getNumValues(), isLossy()); + long size = super.estimateInMemorySize(); size += _indexes.getInMemorySize(); size += _data.getInMemorySize(); return size; diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSDCSingle.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSDCSingle.java index 58649c3..1424072 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSDCSingle.java +++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSDCSingle.java @@ -314,7 +314,7 @@ public class ColGroupSDCSingle extends ColGroupValue { @Override public long estimateInMemorySize() { - long size = ColGroupSizes.estimateInMemorySizeGroupValue(_colIndexes.length, getNumValues(), isLossy()); + long size = super.estimateInMemorySize(); size += _indexes.getInMemorySize(); return size; } diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSDCSingleZeros.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSDCSingleZeros.java index 94be81c..997c42a 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSDCSingleZeros.java +++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSDCSingleZeros.java @@ -187,10 +187,18 @@ public class ColGroupSDCSingleZeros extends ColGroupValue { final double vals = _dict.aggregateTuples(builtin, _colIndexes.length)[0]; final AIterator it = _indexes.getIterator(); it.skipTo(rl); - while(it.hasNext() && it.value() < ru) { - final int idx = it.value(); - it.next(); - c[idx] = builtin.execute(c[idx], vals); + int rix = rl; + for(; rix < ru && it.hasNext(); rix++) { + if(it.value() != rix) + c[rix] = builtin.execute(c[rix], 0); + else { + c[rix] = builtin.execute(c[rix], vals); + it.next(); + } + } + + for(; rix < ru; rix++) { + c[rix] = builtin.execute(c[rix], 0); } } @@ -260,7 +268,7 @@ public class ColGroupSDCSingleZeros extends ColGroupValue { @Override public long estimateInMemorySize() { - long size = ColGroupSizes.estimateInMemorySizeGroupValue(_colIndexes.length, getNumValues(), isLossy()); + long size = super.estimateInMemorySize(); size += _indexes.getInMemorySize(); return size; } @@ -450,7 +458,7 @@ public class ColGroupSDCSingleZeros extends ColGroupValue { } @Override - public Dictionary preAggregateThatSDCSingleStructure(ColGroupSDCSingle that, Dictionary ret, boolean preModified){ + public Dictionary preAggregateThatSDCSingleStructure(ColGroupSDCSingle that, Dictionary ret, boolean preModified) { throw new NotImplementedException(); } diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSDCZeros.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSDCZeros.java index 23b06a9..77cf6e2 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSDCZeros.java +++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSDCZeros.java @@ -198,9 +198,17 @@ public class ColGroupSDCZeros extends ColGroupValue { final double[] vals = _dict.aggregateTuples(builtin, _colIndexes.length); final AIterator it = _indexes.getIterator(); it.skipTo(rl); - while(it.hasNext() && it.value() < ru) { - final int idx = it.value(); - c[idx] = builtin.execute(c[idx], vals[getIndex(it.getDataIndexAndIncrement())]); + + int rix = rl; + for(; rix < ru && it.hasNext(); rix++) { + if(it.value() != rix) + c[rix] = builtin.execute(c[rix], 0); + else + c[rix] = builtin.execute(c[rix], vals[_data.getIndex(it.getDataIndexAndIncrement())]); + } + + for(; rix < ru; rix++) { + c[rix] = builtin.execute(c[rix], 0); } } @@ -277,7 +285,7 @@ public class ColGroupSDCZeros extends ColGroupValue { @Override public long estimateInMemorySize() { - long size = ColGroupSizes.estimateInMemorySizeGroupValue(_colIndexes.length, getNumValues(), isLossy()); + long size = super.estimateInMemorySize(); size += _indexes.getInMemorySize(); size += _data.getInMemorySize(); return size; diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSizes.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSizes.java index 3c8c50a..faa3bb3 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSizes.java +++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSizes.java @@ -32,67 +32,68 @@ public class ColGroupSizes { protected static final Log LOG = LogFactory.getLog(ColGroupSizes.class.getName()); public static long estimateInMemorySizeGroup(int nrColumns) { - long size = 0; - size += 16; // Object header + long size = 16; // Object header size += MemoryEstimates.intArrayCost(nrColumns); return size; } - public static long estimateInMemorySizeGroupValue(int nrColumns, int nrValues, boolean lossy) { - long size = estimateInMemorySizeGroup(nrColumns); + public static long estimateInMoemorySizeCompressedColumn(int nrColumns) { + return estimateInMemorySizeGroup(nrColumns) + 4; // 4 for num Rows; + } + + public static long estimateInMemorySizeGroupValue(int nrColumns, int nrValues, double tupleSparsity, + boolean lossy) { + long size = estimateInMoemorySizeCompressedColumn(nrColumns); // LOG.error("MemorySize Group Value: " + nrColumns + " " + nrValues + " " + lossy); size += 8; // Dictionary Reference. size += 8; // Counts reference - size += 4; // int numRows size += 1; // _zeros boolean reference size += 1; // _lossy boolean reference size += 2; // padding - size += DictionaryFactory.getInMemorySize(nrValues, nrColumns, lossy); + size += DictionaryFactory.getInMemorySize(nrValues, nrColumns, tupleSparsity, lossy); return size; } - public static long estimateInMemorySizeDDC(int nrCols, int numTuples, int dataLength, boolean lossy) { + public static long estimateInMemorySizeDDC(int nrCols, int numTuples, int dataLength, double tupleSparsity, + boolean lossy) { // LOG.error("Arguments for DDC memory Estimate " + nrCols + " " + numTuples + " " + dataLength + " " + lossy); - long size = estimateInMemorySizeGroupValue(nrCols, numTuples, lossy); - size += 8; // Map toFactory reference; + long size = estimateInMemorySizeGroupValue(nrCols, numTuples, tupleSparsity, lossy); size += MapToFactory.estimateInMemorySize(dataLength, numTuples); return size; } public static long estimateInMemorySizeOffset(int nrColumns, int nrValues, int pointers, int offsetLength, - boolean lossy) { + double tupleSparsity, boolean lossy) { // LOG.error("Offset Size: " + nrColumns + " " + nrValues + " " + pointers + " " + offsetLength); - long size = estimateInMemorySizeGroupValue(nrColumns, nrValues, lossy); + long size = estimateInMemorySizeGroupValue(nrColumns, nrValues, tupleSparsity, lossy); size += MemoryEstimates.intArrayCost(pointers); size += MemoryEstimates.charArrayCost(offsetLength); return size; } public static long estimateInMemorySizeOLE(int nrColumns, int nrValues, int offsetLength, int nrRows, - boolean lossy) { + double tupleSparsity, boolean lossy) { // LOG.error(nrColumns + " " + nrValues + " " + offsetLength + " " + nrRows + " " + lossy); nrColumns = nrColumns > 0 ? nrColumns : 1; offsetLength += (nrRows / CompressionSettings.BITMAP_BLOCK_SZ) * 2; - long size = 0; - size = estimateInMemorySizeOffset(nrColumns, nrValues, (nrValues / nrColumns) + 1, offsetLength, lossy); - if(nrRows > CompressionSettings.BITMAP_BLOCK_SZ * 2) { - size += MemoryEstimates.intArrayCost((int) nrValues / nrColumns); - } + long size = estimateInMemorySizeOffset(nrColumns, nrValues, nrValues + 1, offsetLength, + tupleSparsity, lossy); return size; } - public static long estimateInMemorySizeRLE(int nrColumns, int nrValues, int nrRuns, int nrRows, boolean lossy) { + public static long estimateInMemorySizeRLE(int nrColumns, int nrValues, int nrRuns, int nrRows, + double tupleSparsity, boolean lossy) { // LOG.error("RLE Size: " + nrColumns + " " + nrValues + " " + nrRuns + " " + nrRows); int offsetLength = (nrRuns) * 2; - long size = estimateInMemorySizeOffset(nrColumns, nrValues, (nrValues) + 1, offsetLength, lossy); + long size = estimateInMemorySizeOffset(nrColumns, nrValues, (nrValues) + 1, offsetLength, tupleSparsity, lossy); return size; } public static long estimateInMemorySizeSDC(int nrColumns, int nrValues, int nrRows, int largestOff, - boolean largestOffIsZero, boolean containNoZeroValues, boolean lossy) { + boolean largestOffIsZero, boolean containNoZeroValues, double tupleSparsity, boolean lossy) { long size = estimateInMemorySizeGroupValue(nrColumns, - nrValues + (largestOffIsZero || containNoZeroValues ? 0 : 1), lossy); + nrValues + (largestOffIsZero || containNoZeroValues ? 0 : 1), tupleSparsity, lossy); // LOG.error("SDC Estimation values: " + nrColumns + " " + nrValues + " " + nrRows + " " + largestOff); size += OffsetFactory.estimateInMemorySize(nrRows - largestOff - 1, nrRows); if(nrValues > 1) @@ -101,22 +102,20 @@ public class ColGroupSizes { } public static long estimateInMemorySizeSDCSingle(int nrColumns, int nrValues, int nrRows, int largestOff, - boolean largestOffIsZero, boolean containNoZeroValues, boolean lossy) { + boolean largestOffIsZero, boolean containNoZeroValues, double tupleSparsity, boolean lossy) { long size = estimateInMemorySizeGroupValue(nrColumns, - nrValues + (largestOffIsZero || containNoZeroValues ? 0 : 1), lossy); + nrValues + (largestOffIsZero || containNoZeroValues ? 0 : 1), tupleSparsity, lossy); size += OffsetFactory.estimateInMemorySize(nrRows - largestOff, nrRows); return size; } - public static long estimateInMemorySizeCONST(int nrColumns, int nrValues, boolean lossy) { - long size = estimateInMemorySizeGroupValue(nrColumns, nrValues, lossy); + public static long estimateInMemorySizeCONST(int nrColumns, int nrValues, double tupleSparsity, boolean lossy) { + long size = estimateInMemorySizeGroupValue(nrColumns, nrValues, tupleSparsity, lossy); return size; } public static long estimateInMemorySizeEMPTY(int nrColumns) { - long size = estimateInMemorySizeGroup(nrColumns); - size += 8; // null pointer to _dict - return size; + return estimateInMoemorySizeCompressedColumn(nrColumns); } public static long estimateInMemorySizeUncompressed(int nrRows, int nrColumns, double sparsity) { @@ -124,7 +123,7 @@ public class ColGroupSizes { // Since the Object is a col group the overhead from the Memory Size group is added size += estimateInMemorySizeGroup(nrColumns); size += 8; // reference to MatrixBlock. - size += MatrixBlock.estimateSizeInMemory(nrRows, nrColumns, (nrColumns > 1) ? sparsity : 1); + size += MatrixBlock.estimateSizeInMemory(nrRows, nrColumns, (nrColumns > 1) ? sparsity : 1); return size; } } diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupUncompressed.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupUncompressed.java index 3cec6fb..c8f9bf6 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupUncompressed.java +++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupUncompressed.java @@ -660,4 +660,9 @@ public class ColGroupUncompressed extends AColGroup { LibMatrixMult.matrixMult(_data, right, out, InfrastructureAnalyzer.getLocalParallelism()); return new ColGroupUncompressed(outputCols, out, false); } + + @Override + public int getNumValues() { + return _data.getNumRows(); + } } diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupValue.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupValue.java index fc7da24..a0c127a 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupValue.java +++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupValue.java @@ -33,6 +33,7 @@ import org.apache.sysds.runtime.DMLCompressionException; import org.apache.sysds.runtime.compress.colgroup.dictionary.ADictionary; import org.apache.sysds.runtime.compress.colgroup.dictionary.Dictionary; import org.apache.sysds.runtime.compress.colgroup.dictionary.DictionaryFactory; +import org.apache.sysds.runtime.compress.colgroup.dictionary.MatrixBlockDictionary; import org.apache.sysds.runtime.compress.colgroup.pre.ArrPreAggregate; import org.apache.sysds.runtime.compress.colgroup.pre.IPreAggregate; import org.apache.sysds.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer; @@ -93,11 +94,7 @@ public abstract class ColGroupValue extends ColGroupCompressed implements Clonea decompressToBlock(target, rl, ru, offT, getValues()); } - /** - * Obtain number of distinct sets of values associated with the bitmaps in this column group. - * - * @return the number of distinct sets of values associated with the bitmaps in this column group - */ + @Override public final int getNumValues() { return _dict.getNumberOfValues(_colIndexes.length); } @@ -250,22 +247,21 @@ public abstract class ColGroupValue extends ColGroupCompressed implements Clonea return aggregateColumns; } - private Pair<int[], double[]> preaggValuesFromDense(final int numVals, final double[] b, double[] dictVals, - final int cl, final int cu, final int cut) { + private Pair<int[], double[]> preaggValuesFromDense(final int numVals, final double[] b, final int cl, final int cu, + final int cut) { - int[] aggregateColumns = getAggregateColumnsSetDense(b, cl, cu, cut); - double[] ret = new double[numVals * aggregateColumns.length]; + final int[] aggregateColumns = getAggregateColumnsSetDense(b, cl, cu, cut); + final double[] ret = new double[numVals * aggregateColumns.length]; for(int k = 0, off = 0; k < numVals * _colIndexes.length; k += _colIndexes.length, off += aggregateColumns.length) { for(int h = 0; h < _colIndexes.length; h++) { int idb = _colIndexes[h] * cut; - double v = dictVals[k + h]; - for(int i = 0; i < aggregateColumns.length; i++) { - ret[off + i] += v * b[idb + aggregateColumns[i]]; - } - + double v = _dict.getValue(k + h); + if(v != 0) + for(int i = 0; i < aggregateColumns.length; i++) + ret[off + i] += v * b[idb + aggregateColumns[i]]; } } @@ -290,8 +286,7 @@ public abstract class ColGroupValue extends ColGroupCompressed implements Clonea return aggregateColumns; } - private Pair<int[], double[]> preaggValuesFromSparse(int numVals, SparseBlock b, double[] dictVals, int cl, int cu, - int cut) { + private Pair<int[], double[]> preaggValuesFromSparse(int numVals, SparseBlock b, int cl, int cu, int cut) { int[] aggregateColumns = getAggregateColumnsSetSparse(b); @@ -310,7 +305,7 @@ public abstract class ColGroupValue extends ColGroupCompressed implements Clonea for(int j = 0, offOrg = h; j < numVals * aggregateColumns.length; j += aggregateColumns.length, offOrg += _colIndexes.length) { - ret[j + retIdx] += dictVals[offOrg] * sValues[i]; + ret[j + retIdx] += _dict.getValue(offOrg) * sValues[i]; } } } @@ -318,18 +313,18 @@ public abstract class ColGroupValue extends ColGroupCompressed implements Clonea return new ImmutablePair<>(aggregateColumns, ret); } - public Pair<int[], double[]> preaggValues(int numVals, MatrixBlock b, double[] dictVals, int cl, int cu, int cut) { - return b.isInSparseFormat() ? preaggValuesFromSparse(numVals, b.getSparseBlock(), dictVals, cl, cu, - cut) : preaggValuesFromDense(numVals, b.getDenseBlockValues(), dictVals, cl, cu, cut); + public Pair<int[], double[]> preaggForRightMultiplyValues(int numVals, MatrixBlock b, int cl, int cu, int cut) { + return b.isInSparseFormat() ? preaggValuesFromSparse(numVals, b.getSparseBlock(), cl, cu, + cut) : preaggValuesFromDense(numVals, b.getDenseBlockValues(), cl, cu, cut); } - protected static double[] sparsePreaggValues(int numVals, double v, boolean allocNew, double[] dictVals) { - double[] ret = allocNew ? new double[numVals + 1] : allocDVector(numVals + 1, true); + // protected static double[] sparsePreaggValues(int numVals, double v, boolean allocNew, ADictionary dict) { + // double[] ret = allocNew ? new double[numVals + 1] : allocDVector(numVals + 1, true); - for(int k = 0; k < numVals; k++) - ret[k] = dictVals[k] * v; - return ret; - } + // for(int k = 0; k < numVals; k++) + // ret[k] = dictVals[k] * v; + // return ret; + // } protected double computeMxx(double c, Builtin builtin) { if(_zeros) @@ -1016,9 +1011,10 @@ public abstract class ColGroupValue extends ColGroupCompressed implements Clonea */ public void leftMultByMatrix(MatrixBlock matrix, MatrixBlock result, double[] values, int rl, int ru) { final int numVals = getNumValues(); + if(!(_dict instanceof MatrixBlockDictionary)) + _dict = _dict.getAsMatrixBlockDictionary(_colIndexes.length); - DenseBlock dictV = new DenseBlockFP64(new int[] {numVals, _colIndexes.length}, values); - MatrixBlock dictM = new MatrixBlock(numVals, _colIndexes.length, dictV); + MatrixBlock dictM = ((MatrixBlockDictionary) _dict).getMatrixBlock(); dictM.examSparsity(); MatrixBlock tmpRes = new MatrixBlock(1, _colIndexes.length, false); for(int i = rl; i < ru; i++) { @@ -1064,10 +1060,22 @@ public abstract class ColGroupValue extends ColGroupCompressed implements Clonea } public AColGroup rightMultByMatrix(MatrixBlock right) { - Pair<int[], double[]> pre = preaggValues(getNumValues(), right, getValues(), 0, right.getNumColumns(), + Pair<int[], double[]> pre = preaggForRightMultiplyValues(getNumValues(), right, 0, right.getNumColumns(), right.getNumColumns()); if(pre.getLeft().length > 0) return copyAndSet(pre.getLeft(), pre.getRight()); return null; } + + @Override + public long estimateInMemorySize() { + long size = super.estimateInMemorySize(); + size += 8; // Dictionary Reference. + size += 8; // Counts reference + size += 1; // _zeros boolean reference + size += 1; // _lossy boolean reference + size += 2; // padding + size += _dict.getInMemorySize(); + return size; + } } diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/ADictionary.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/ADictionary.java index 352e96b..e5d2a15 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/ADictionary.java +++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/ADictionary.java @@ -24,9 +24,6 @@ import java.io.IOException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.sysds.runtime.compress.utils.ABitmap; -import org.apache.sysds.runtime.compress.utils.Bitmap; -import org.apache.sysds.runtime.compress.utils.BitmapLossy; import org.apache.sysds.runtime.functionobjects.Builtin; import org.apache.sysds.runtime.functionobjects.ValueFunction; import org.apache.sysds.runtime.matrix.operators.ScalarOperator; @@ -143,13 +140,7 @@ public abstract class ADictionary { * @param fn The function to apply to individual columns * @param colIndexes The mapping to the target columns from the individual columns */ - public void aggregateCols(double[] c, Builtin fn, int[] colIndexes) { - int ncol = colIndexes.length; - int vlen = size() / ncol; - for(int k = 0; k < vlen; k++) - for(int j = 0, valOff = k * ncol; j < ncol; j++) - c[colIndexes[j]] = fn.execute(c[colIndexes[j]], getValue(valOff + j)); - } + public abstract void aggregateCols(double[] c, Builtin fn, int[] colIndexes); /** * Write the dictionary to a DataOutput. @@ -178,7 +169,7 @@ public abstract class ADictionary { public abstract boolean isLossy(); /** - * Get the number of values given that the column group has n columns + * Get the number of distinct tuples given that the column group has n columns * * @param ncol The number of Columns in the ColumnGroup. * @return the number of value tuples contained in the dictionary. @@ -254,6 +245,14 @@ public abstract class ADictionary { public abstract boolean containsValue(double pattern); + /** + * Calculate the number of non zeros in the dictionary. The number of non zeros should be scaled with the counts + * given + * + * @param counts The counts of each dictionary entry + * @param nCol The number of columns in this dictionary + * @return The nonZero count + */ public abstract long getNumberNonZeros(int[] counts, int nCol); public abstract long getNumberNonZerosContained(); @@ -268,13 +267,6 @@ public abstract class ADictionary { */ public abstract void addToEntry(Dictionary d, int fr, int to, int nCol); - public static ADictionary getDictionary(ABitmap ubm) { - if(ubm instanceof BitmapLossy) - return new QDictionary((BitmapLossy) ubm).makeDoubleDictionary(); - else - return new Dictionary(((Bitmap) ubm).getValues()); - } - /** * Get the most common tuple element contained in the dictionary * @@ -293,4 +285,12 @@ public abstract class ADictionary { * @return a new instance of dictionary with the tuple subtracted. */ public abstract ADictionary subtractTuple(double[] tuple); + + /** + * Get this dictionary as a matrixBlock dictionary. This allows us to use optimized kernels coded elsewhere in the + * system, such as matrix multiplication. + * + * @return A Dictionary containing a MatrixBlock. + */ + public abstract MatrixBlockDictionary getAsMatrixBlockDictionary(int nCol); } diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/Dictionary.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/Dictionary.java index 5a32823..595c250 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/Dictionary.java +++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/Dictionary.java @@ -25,8 +25,11 @@ import java.io.IOException; import java.util.Arrays; import org.apache.sysds.runtime.DMLCompressionException; +import org.apache.sysds.runtime.data.DenseBlock; +import org.apache.sysds.runtime.data.DenseBlockFP64; import org.apache.sysds.runtime.functionobjects.Builtin; import org.apache.sysds.runtime.functionobjects.ValueFunction; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; import org.apache.sysds.runtime.matrix.operators.ScalarOperator; import org.apache.sysds.utils.MemoryEstimates; @@ -473,4 +476,23 @@ public class Dictionary extends ADictionary { } return new Dictionary(newValues); } + + @Override + public MatrixBlockDictionary getAsMatrixBlockDictionary(int nCol) { + final int nRow = _values.length / nCol; + DenseBlock dictV = new DenseBlockFP64(new int[] {nRow, nCol}, _values); + MatrixBlock dictM = new MatrixBlock(nRow, nCol, dictV); + dictM.examSparsity(); + return new MatrixBlockDictionary(dictM); + } + + @Override + public void aggregateCols(double[] c, Builtin fn, int[] colIndexes) { + int ncol = colIndexes.length; + int vlen = size() / ncol; + for(int k = 0; k < vlen; k++) + for(int j = 0, valOff = k * ncol; j < ncol; j++) + c[colIndexes[j]] = fn.execute(c[colIndexes[j]], getValue(valOff + j)); + + } } diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/DictionaryFactory.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/DictionaryFactory.java index 328e145..bab32b0 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/DictionaryFactory.java +++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/DictionaryFactory.java @@ -22,8 +22,20 @@ package org.apache.sysds.runtime.compress.colgroup.dictionary; import java.io.DataInput; import java.io.IOException; +import org.apache.commons.lang.NotImplementedException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.sysds.runtime.compress.utils.ABitmap; +import org.apache.sysds.runtime.compress.utils.Bitmap; +import org.apache.sysds.runtime.compress.utils.BitmapLossy; +import org.apache.sysds.runtime.compress.utils.MultiColBitmap; +import org.apache.sysds.runtime.data.SparseBlock; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; + public class DictionaryFactory { + protected static final Log LOG = LogFactory.getLog(DictionaryFactory.class.getName()); + public static ADictionary read(DataInput in) throws IOException { boolean lossy = in.readBoolean(); if(lossy) { @@ -46,10 +58,131 @@ public class DictionaryFactory { } } - public static long getInMemorySize(int nrValues, int nrColumns, boolean lossy) { + public static long getInMemorySize(int nrValues, int nrColumns, double tupleSparsity, boolean lossy) { if(lossy) return QDictionary.getInMemorySize(nrValues * nrColumns); + else if(nrColumns > 1 && tupleSparsity < 0.4) + return MatrixBlockDictionary.getInMemorySize(nrValues, nrColumns, tupleSparsity); else return Dictionary.getInMemorySize(nrValues * nrColumns); } + + public static ADictionary create(ABitmap ubm) { + return create(ubm, 1.0); + } + + public static ADictionary create(ABitmap ubm, double sparsity) { + if(ubm instanceof BitmapLossy) + return new QDictionary((BitmapLossy) ubm); + else if(ubm instanceof Bitmap) + return new Dictionary(((Bitmap) ubm).getValues()); + else if(sparsity < 0.4 && ubm instanceof MultiColBitmap) { + final int nCols = ubm.getNumColumns(); + final int nRows = ubm.getNumValues(); + final MultiColBitmap mcbm = (MultiColBitmap) ubm; + + final MatrixBlock m = new MatrixBlock(nRows, nCols, true); + m.allocateSparseRowsBlock(); + final SparseBlock sb = m.getSparseBlock(); + + final int nVals = ubm.getNumValues(); + for(int i = 0; i < nVals; i++) { + final double[] tuple = mcbm.getValues(i); + for(int col = 0; col < nCols; col++) + sb.append(i, col, tuple[col]); + } + m.recomputeNonZeros(); + return new MatrixBlockDictionary(m); + } + else if(ubm instanceof MultiColBitmap) { + MultiColBitmap mcbm = (MultiColBitmap) ubm; + final int nCol = ubm.getNumColumns(); + final int nVals = ubm.getNumValues(); + double[] resValues = new double[nVals * nCol]; + for(int i = 0; i < nVals; i++) + System.arraycopy(mcbm.getValues(i), 0, resValues, i * nCol, nCol); + + return new Dictionary(resValues); + } + throw new NotImplementedException( + "Not implemented creation of bitmap type : " + ubm.getClass().getSimpleName()); + } + + public static ADictionary createWithAppendedZeroTuple(ABitmap ubm) { + return createWithAppendedZeroTuple(ubm, 1.0); + } + + public static ADictionary createWithAppendedZeroTuple(ABitmap ubm, double sparsity) { + // Log.warn("Inefficient creation of dictionary, to then allocate again."); + final int nRows = ubm.getNumValues() + 1; + final int nCols = ubm.getNumColumns(); + if(ubm instanceof Bitmap) { + Bitmap bm = (Bitmap) ubm; + double[] resValues = new double[ubm.getNumValues() + 1]; + double[] from = bm.getValues(); + System.arraycopy(from, 0, resValues, 0, from.length); + return new Dictionary(resValues); + } + else if(sparsity < 0.4 && ubm instanceof MultiColBitmap) { + final MultiColBitmap mcbm = (MultiColBitmap) ubm; + final MatrixBlock m = new MatrixBlock(nRows, nCols, true); + m.allocateSparseRowsBlock(); + final SparseBlock sb = m.getSparseBlock(); + + final int nVals = ubm.getNumValues(); + for(int i = 0; i < nVals; i++) { + final double[] tuple = mcbm.getValues(i); + for(int col = 0; col < nCols; col++) + sb.append(i, col, tuple[col]); + } + m.recomputeNonZeros(); + return new MatrixBlockDictionary(m); + } + else if(ubm instanceof MultiColBitmap) { + MultiColBitmap mcbm = (MultiColBitmap) ubm; + final int nVals = ubm.getNumValues(); + double[] resValues = new double[nRows * nCols]; + for(int i = 0; i < nVals; i++) + System.arraycopy(mcbm.getValues(i), 0, resValues, i * nCols, nCols); + + return new Dictionary(resValues); + } + else { + throw new NotImplementedException( + "Not implemented creation of bitmap type : " + ubm.getClass().getSimpleName()); + } + + } + + public static ADictionary moveFrequentToLastDictionaryEntry(ADictionary dict, ABitmap ubm, int numRows, + int largestIndex) { + LOG.warn("Inefficient creation of dictionary, to then allocate again to move one entry to end."); + final double[] dictValues = dict.getValues(); + final int zeros = numRows - (int) ubm.getNumOffsets(); + final int nCol = ubm.getNumColumns(); + final int offsetToLargest = largestIndex * nCol; + + if(zeros == 0) { + final double[] swap = new double[nCol]; + System.arraycopy(dictValues, offsetToLargest, swap, 0, nCol); + for(int i = offsetToLargest; i < dictValues.length - nCol; i++) { + dictValues[i] = dictValues[i + nCol]; + } + System.arraycopy(swap, 0, dictValues, dictValues.length - nCol, nCol); + return dict; + } + + final int largestIndexSize = ubm.getOffsetsList(largestIndex).size(); + final double[] newDict = new double[dictValues.length + nCol]; + + if(zeros > largestIndexSize) + System.arraycopy(dictValues, 0, newDict, 0, dictValues.length); + else { + System.arraycopy(dictValues, 0, newDict, 0, offsetToLargest); + System.arraycopy(dictValues, offsetToLargest + nCol, newDict, offsetToLargest, + dictValues.length - offsetToLargest - nCol); + System.arraycopy(dictValues, offsetToLargest, newDict, newDict.length - nCol, nCol); + } + return new Dictionary(newDict); + } } diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/MatrixBlockDictionary.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/MatrixBlockDictionary.java new file mode 100644 index 0000000..322e56b --- /dev/null +++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/MatrixBlockDictionary.java @@ -0,0 +1,456 @@ +package org.apache.sysds.runtime.compress.colgroup.dictionary; + +import org.apache.commons.lang.NotImplementedException; +import org.apache.sysds.runtime.data.SparseBlock; +import org.apache.sysds.runtime.functionobjects.Builtin; +import org.apache.sysds.runtime.functionobjects.Builtin.BuiltinCode; +import org.apache.sysds.runtime.functionobjects.ValueFunction; +import org.apache.sysds.runtime.matrix.data.LibMatrixReorg; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import org.apache.sysds.runtime.matrix.operators.ScalarOperator; + +public class MatrixBlockDictionary extends ADictionary { + + private MatrixBlock _data; + + public MatrixBlockDictionary(MatrixBlock data) { + _data = data; + } + + public MatrixBlock getMatrixBlock() { + return _data; + } + + @Override + public double[] getValues() { + LOG.warn("Inefficient force dense format."); + _data.sparseToDense(); + return _data.getDenseBlockValues(); + } + + @Override + public double getValue(int i) { + final int nCol = _data.getNumColumns(); + LOG.warn("inefficient get value at index"); + return _data.quickGetValue(i / nCol, i % nCol); + } + + @Override + public int hasZeroTuple(int nCol) { + if(_data.isInSparseFormat()) { + SparseBlock sb = _data.getSparseBlock(); + for(int i = 0; i < _data.getNumRows(); i++) { + if(sb.isEmpty(i)) { + return i; + } + } + } + else { + throw new NotImplementedException(); + } + return -1; + } + + @Override + public long getInMemorySize() { + return 8 + _data.estimateSizeInMemory(); + } + + public static long getInMemorySize(int numberValues, int numberColumns, double sparsity) { + return 8 + MatrixBlock.estimateSizeInMemory(numberValues, numberColumns, sparsity); + } + + @Override + public double aggregate(double init, Builtin fn) { + if(fn.getBuiltinCode() == BuiltinCode.MAX) + return fn.execute(init, _data.max()); + else if(fn.getBuiltinCode() == BuiltinCode.MIN) + return fn.execute(init, _data.min()); + else + throw new NotImplementedException(); + } + + @Override + public double[] aggregateTuples(Builtin fn, int nCol) { + double[] ret = new double[_data.getNumRows()]; + if(_data.isEmpty()) + return ret; + else if(_data.isInSparseFormat()) { + SparseBlock sb = _data.getSparseBlock(); + for(int i = 0; i < _data.getNumRows(); i++) { + if(!sb.isEmpty(i)) { + final int apos = sb.pos(i); + final int alen = sb.size(i) + apos; + final double[] avals = sb.values(i); + ret[i] = avals[apos]; + for(int j = apos + 1; j < alen; j++) + ret[i] = fn.execute(ret[i], avals[j]); + + if(sb.size(i) < _data.getNumColumns()) + ret[i] = fn.execute(ret[i], 0); + } + else + ret[i] = fn.execute(ret[i], 0); + } + } + else if(nCol == 1) + return _data.getDenseBlockValues(); + else { + double[] values = _data.getDenseBlockValues(); + int off = 0; + for(int k = 0; k < _data.getNumRows(); k++) { + ret[k] = values[off++]; + for(int j = 1; j < _data.getNumColumns(); j++) + ret[k] = fn.execute(ret[k], values[off++]); + } + } + return ret; + } + + @Override + public void aggregateCols(double[] c, Builtin fn, int[] colIndexes) { + if(_data.isEmpty()) { + for(int j = 0; j < colIndexes.length; j++) { + final int idx = colIndexes[j]; + c[idx] = fn.execute(c[idx], 0); + } + } + else if(_data.isInSparseFormat()) { + MatrixBlock t = LibMatrixReorg.transposeInPlace(_data, 1); + if(!t.isInSparseFormat()) { + throw new NotImplementedException(); + } + SparseBlock sbt = t.getSparseBlock(); + + for(int i = 0; i < _data.getNumColumns(); i++) { + final int idx = colIndexes[i]; + if(!sbt.isEmpty(i)) { + final int apos = sbt.pos(i); + final int alen = sbt.size(i) + apos; + final double[] avals = sbt.values(i); + for(int j = apos; j < alen; j++) + c[idx] = fn.execute(c[idx], avals[j]); + if(avals.length != _data.getNumRows()) + c[idx] = fn.execute(c[idx], 0); + } + else + c[idx] = fn.execute(c[idx], 0); + } + } + else { + double[] values = _data.getDenseBlockValues(); + int off = 0; + for(int k = 0; k < _data.getNumRows(); k++) { + for(int j = 0; j < _data.getNumColumns(); j++) { + final int idx = colIndexes[j]; + c[idx] = fn.execute(c[idx], values[off++]); + } + } + } + } + + @Override + public int size() { + return (int) _data.getNonZeros(); + } + + @Override + public ADictionary apply(ScalarOperator op) { + MatrixBlock res = _data.scalarOperations(op, new MatrixBlock()); + return new MatrixBlockDictionary(res); + } + + @Override + public ADictionary applyScalarOp(ScalarOperator op, double newVal, int numCols) { + MatrixBlock res = _data.scalarOperations(op, new MatrixBlock()); + MatrixBlock res2 = res.append(new MatrixBlock(1, 1, newVal), new MatrixBlock()); + return new MatrixBlockDictionary(res2); + } + + @Override + public ADictionary applyBinaryRowOpLeft(ValueFunction fn, double[] v, boolean sparseSafe, int[] colIndexes) { + throw new NotImplementedException(); + } + + @Override + public ADictionary applyBinaryRowOpRight(ValueFunction fn, double[] v, boolean sparseSafe, int[] colIndexes) { + throw new NotImplementedException(); + } + + @Override + public ADictionary clone() { + MatrixBlock ret = new MatrixBlock(); + ret.copy(_data); + return new MatrixBlockDictionary(ret); + } + + @Override + public ADictionary cloneAndExtend(int len) { + throw new NotImplementedException(); + } + + @Override + public boolean isLossy() { + return false; + } + + @Override + public int getNumberOfValues(int ncol) { + return _data.getNumRows(); + } + + @Override + public double[] sumAllRowsToDouble(boolean square, int nrColumns) { + double[] ret = new double[_data.getNumRows()]; + + if(_data.isEmpty()) + return ret; + else if(_data.isInSparseFormat()) { + SparseBlock sb = _data.getSparseBlock(); + for(int i = 0; i < _data.getNumRows(); i++) { + if(!sb.isEmpty(i)) { + final int apos = sb.pos(i); + final int alen = sb.size(i) + apos; + final double[] avals = sb.values(i); + for(int j = apos; j < alen; j++) { + ret[i] += (square) ? avals[j] * avals[j] : avals[j]; + } + } + } + } + else { + double[] values = _data.getDenseBlockValues(); + int off = 0; + for(int k = 0; k < _data.getNumRows(); k++) { + for(int j = 0; j < _data.getNumColumns(); j++) { + final double v = values[off++]; + ret[k] += (square) ? v * v : v; + } + } + } + return ret; + } + + @Override + public double sumRow(int k, boolean square, int nrColumns) { + throw new NotImplementedException(); + } + + @Override + public double[] colSum(int[] counts, int nCol) { + if(_data.isEmpty()) + return null; + double[] ret = new double[nCol]; + if(_data.isInSparseFormat()) { + SparseBlock sb = _data.getSparseBlock(); + for(int i = 0; i < _data.getNumRows(); i++) { + if(!sb.isEmpty(i)) { + // double tmpSum = 0; + final int count = counts[i]; + final int apos = sb.pos(i); + final int alen = sb.size(i) + apos; + final int[] aix = sb.indexes(i); + final double[] avals = sb.values(i); + for(int j = apos; j < alen; j++) { + ret[aix[j]] += count * avals[j]; + } + } + } + } + else { + double[] values = _data.getDenseBlockValues(); + int off = 0; + for(int k = 0; k < _data.getNumRows(); k++) { + final int countK = counts[k]; + for(int j = 0; j < _data.getNumColumns(); j++) { + final double v = values[off++]; + ret[j] += v * countK; + } + } + } + return ret; + } + + @Override + public void colSum(double[] c, int[] counts, int[] colIndexes, boolean square) { + if(_data.isEmpty()) + return; + if(_data.isInSparseFormat()) { + SparseBlock sb = _data.getSparseBlock(); + for(int i = 0; i < _data.getNumRows(); i++) { + if(!sb.isEmpty(i)) { + // double tmpSum = 0; + final int count = counts[i]; + final int apos = sb.pos(i); + final int alen = sb.size(i) + apos; + final int[] aix = sb.indexes(i); + final double[] avals = sb.values(i); + for(int j = apos; j < alen; j++) { + c[colIndexes[aix[j]]] += square ? count * avals[j] * avals[j] : count * avals[j]; + } + } + } + } + else { + double[] values = _data.getDenseBlockValues(); + int off = 0; + for(int k = 0; k < _data.getNumRows(); k++) { + final int countK = counts[k]; + for(int j = 0; j < _data.getNumColumns(); j++) { + final double v = values[off++]; + c[colIndexes[j]] += square ? v * v * countK : v * countK; + } + } + } + } + + @Override + public double sum(int[] counts, int ncol) { + double tmpSum = 0; + if(_data.isEmpty()) + return tmpSum; + if(_data.isInSparseFormat()) { + SparseBlock sb = _data.getSparseBlock(); + for(int i = 0; i < _data.getNumRows(); i++) { + if(!sb.isEmpty(i)) { + final int count = counts[i]; + final int apos = sb.pos(i); + final int alen = sb.size(i) + apos; + final double[] avals = sb.values(i); + for(int j = apos; j < alen; j++) { + tmpSum += count * avals[j]; + } + } + } + } + else { + double[] values = _data.getDenseBlockValues(); + int off = 0; + for(int k = 0; k < _data.getNumRows(); k++) { + final int countK = counts[k]; + for(int j = 0; j < _data.getNumColumns(); j++) { + final double v = values[off++]; + tmpSum += v * countK; + } + } + } + return tmpSum; + } + + @Override + public double sumsq(int[] counts, int ncol) { + double tmpSum = 0; + if(_data.isEmpty()) + return tmpSum; + if(_data.isInSparseFormat()) { + SparseBlock sb = _data.getSparseBlock(); + for(int i = 0; i < _data.getNumRows(); i++) { + if(!sb.isEmpty(i)) { + final int count = counts[i]; + final int apos = sb.pos(i); + final int alen = sb.size(i) + apos; + final double[] avals = sb.values(i); + for(int j = apos; j < alen; j++) { + tmpSum += count * avals[j] * avals[j]; + } + } + } + } + else { + double[] values = _data.getDenseBlockValues(); + int off = 0; + for(int k = 0; k < _data.getNumRows(); k++) { + final int countK = counts[k]; + for(int j = 0; j < _data.getNumColumns(); j++) { + final double v = values[off++]; + tmpSum += v * v * countK; + } + } + } + return tmpSum; + } + + @Override + public String getString(int colIndexes) { + return _data.toString(); + } + + @Override + public void addMaxAndMin(double[] ret, int[] colIndexes) { + throw new NotImplementedException(); + } + + @Override + public ADictionary sliceOutColumnRange(int idxStart, int idxEnd, int previousNumberOfColumns) { + throw new NotImplementedException(); + } + + @Override + public ADictionary reExpandColumns(int max) { + throw new NotImplementedException(); + } + + @Override + public boolean containsValue(double pattern) { + return _data.containsValue(pattern); + } + + @Override + public long getNumberNonZeros(int[] counts, int nCol) { + if(_data.isEmpty()) + return 0; + long nnz = 0; + if(_data.isInSparseFormat()) { + SparseBlock sb = _data.getSparseBlock(); + for(int i = 0; i < _data.getNumRows(); i++) + if(sb.isEmpty(i)) + nnz += sb.size(i) * counts[i]; + + } + else { + double[] values = _data.getDenseBlockValues(); + int off = 0; + for(int i = 0; i < _data.getNumRows(); i++) { + int countThisTuple = 0; + for(int j = 0; j < _data.getNumColumns(); j++) { + double v = values[off++]; + if(v != 0) + countThisTuple++; + } + nnz += countThisTuple * counts[i]; + } + } + return nnz; + } + + @Override + public long getNumberNonZerosContained() { + throw new NotImplementedException(); + } + + @Override + public void addToEntry(Dictionary d, int fr, int to, int nCol) { + throw new NotImplementedException(); + } + + @Override + public double[] getMostCommonTuple(int[] counts, int nCol) { + throw new NotImplementedException(); + } + + @Override + public ADictionary subtractTuple(double[] tuple) { + throw new NotImplementedException(); + } + + @Override + public MatrixBlockDictionary getAsMatrixBlockDictionary(int nCol) { + // Simply return this. + return this; + } + + @Override + public String toString() { + return "MatrixBlock Dictionary :" + _data.toString(); + } +} diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/QDictionary.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/QDictionary.java index 7b01fb5..70836ca 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/QDictionary.java +++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/QDictionary.java @@ -499,4 +499,14 @@ public class QDictionary extends ADictionary { public ADictionary subtractTuple(double[] tuple) { throw new NotImplementedException(); } + + @Override + public MatrixBlockDictionary getAsMatrixBlockDictionary(int nCol) { + throw new NotImplementedException(); + } + + @Override + public void aggregateCols(double[] c, Builtin fn, int[] colIndexes) { + throw new NotImplementedException(); + } } diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/SparseDictionary.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/SparseDictionary.java deleted file mode 100644 index 3a400f3..0000000 --- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/SparseDictionary.java +++ /dev/null @@ -1,218 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.sysds.runtime.compress.colgroup.dictionary; - -import org.apache.sysds.runtime.functionobjects.Builtin; -import org.apache.sysds.runtime.functionobjects.ValueFunction; -import org.apache.sysds.runtime.matrix.operators.ScalarOperator; - -/** - * A sparse dictionary implementation, use if the tuples are sparse. - */ -public class SparseDictionary extends ADictionary { - - @Override - public double[] getValues() { - LOG.warn("Inefficient materialization of sparse Dictionary."); - - return null; - } - - @Override - public double getValue(int i) { - // TODO Auto-generated method stub - return 0; - } - - @Override - public int hasZeroTuple(int nCol) { - // TODO Auto-generated method stub - return 0; - } - - @Override - public long getInMemorySize() { - // TODO Auto-generated method stub - return 0; - } - - @Override - public double aggregate(double init, Builtin fn) { - // TODO Auto-generated method stub - return 0; - } - - @Override - public double[] aggregateTuples(Builtin fn, int nCol) { - // TODO Auto-generated method stub - return null; - } - - @Override - public int size() { - // TODO Auto-generated method stub - return 0; - } - - @Override - public ADictionary apply(ScalarOperator op) { - // TODO Auto-generated method stub - return null; - } - - @Override - public ADictionary applyScalarOp(ScalarOperator op, double newVal, int numCols) { - // TODO Auto-generated method stub - return null; - } - - @Override - public ADictionary applyBinaryRowOpLeft(ValueFunction fn, double[] v, boolean sparseSafe, int[] colIndexes) { - // TODO Auto-generated method stub - return null; - } - - @Override - public ADictionary applyBinaryRowOpRight(ValueFunction fn, double[] v, boolean sparseSafe, int[] colIndexes) { - // TODO Auto-generated method stub - return null; - } - - @Override - public ADictionary clone() { - // TODO Auto-generated method stub - return null; - } - - @Override - public ADictionary cloneAndExtend(int len) { - // TODO Auto-generated method stub - return null; - } - - @Override - public boolean isLossy() { - // TODO Auto-generated method stub - return false; - } - - @Override - public int getNumberOfValues(int ncol) { - // TODO Auto-generated method stub - return 0; - } - - @Override - public double[] sumAllRowsToDouble(boolean square, int nrColumns) { - // TODO Auto-generated method stub - return null; - } - - @Override - public double sumRow(int k, boolean square, int nrColumns) { - // TODO Auto-generated method stub - return 0; - } - - @Override - public double[] colSum(int[] counts, int nCol) { - // TODO Auto-generated method stub - return null; - } - - @Override - public void colSum(double[] c, int[] counts, int[] colIndexes, boolean square) { - // TODO Auto-generated method stub - - } - - @Override - public double sum(int[] counts, int ncol) { - // TODO Auto-generated method stub - return 0; - } - - @Override - public double sumsq(int[] counts, int ncol) { - // TODO Auto-generated method stub - return 0; - } - - @Override - public String getString(int colIndexes) { - // TODO Auto-generated method stub - return null; - } - - @Override - public void addMaxAndMin(double[] ret, int[] colIndexes) { - // TODO Auto-generated method stub - - } - - @Override - public ADictionary sliceOutColumnRange(int idxStart, int idxEnd, int previousNumberOfColumns) { - // TODO Auto-generated method stub - return null; - } - - @Override - public ADictionary reExpandColumns(int max) { - // TODO Auto-generated method stub - return null; - } - - @Override - public boolean containsValue(double pattern) { - // TODO Auto-generated method stub - return false; - } - - @Override - public long getNumberNonZeros(int[] counts, int nCol) { - // TODO Auto-generated method stub - return 0; - } - - @Override - public long getNumberNonZerosContained() { - // TODO Auto-generated method stub - return 0; - } - - @Override - public void addToEntry(Dictionary d, int fr, int to, int nCol) { - // TODO Auto-generated method stub - - } - - @Override - public double[] getMostCommonTuple(int[] counts, int nCol) { - // TODO Auto-generated method stub - return null; - } - - @Override - public ADictionary subtractTuple(double[] tuple) { - // TODO Auto-generated method stub - return null; - } - -} diff --git a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimator.java b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimator.java index d4b2a63..f46ee53 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimator.java +++ b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimator.java @@ -21,6 +21,7 @@ package org.apache.sysds.runtime.compress.estim; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; @@ -91,6 +92,52 @@ public abstract class CompressedSizeEstimator { return new CompressedSizeInfo(sizeInfos); } + /** + * Multi threaded version of extracting Compression Size info from list of specified columns + * + * @return + */ + + /** + * Multi threaded version of extracting Compression Size info from list of specified columns + * + * @param columnLists The specified columns to extract. + * @param k The parallelization degree + * @return The Compression information from the specified column groups. + */ + public List<CompressedSizeInfoColGroup> computeCompressedSizeInfos(Collection<int[]> columnLists, int k) { + if(k == 1) + return computeCompressedSizeInfos(columnLists); + try { + ExecutorService pool = CommonThreadPool.get(k); + ArrayList<SizeEstimationTask> tasks = new ArrayList<>(); + for(int[] g : columnLists) + tasks.add(new SizeEstimationTask(this, g)); + List<Future<CompressedSizeInfoColGroup>> rtask = pool.invokeAll(tasks); + ArrayList<CompressedSizeInfoColGroup> ret = new ArrayList<>(); + for(Future<CompressedSizeInfoColGroup> lrtask : rtask) + ret.add(lrtask.get()); + pool.shutdown(); + return ret; + } + catch(InterruptedException | ExecutionException e) { + return computeCompressedSizeInfos(columnLists); + } + } + + /** + * Compression Size info from list of specified columns + * + * @param columnLists The specified columns to extract. + * @return The Compression information from the specified column groups. + */ + public List<CompressedSizeInfoColGroup> computeCompressedSizeInfos(Collection<int[]> columnLists) { + ArrayList<CompressedSizeInfoColGroup> ret = new ArrayList<>(); + for(int[] g : columnLists) + ret.add(estimateCompressedColGroupSize(g)); + return ret; + } + private CompressedSizeInfoColGroup[] estimateIndividualColumnGroupSizes(int k) { return (k > 1) ? CompressedSizeInfoColGroup(_numCols, k) : CompressedSizeInfoColGroup(_numCols); } @@ -106,19 +153,36 @@ public abstract class CompressedSizeEstimator { } /** - * Abstract method for extracting Compressed Size Info of specified columns, together in a single ColGroup + * Method for extracting Compressed Size Info of specified columns, together in a single ColGroup * - * @param colIndexes The Colums to group together inside a ColGroup + * @param colIndexes The columns to group together inside a ColGroup * @return The CompressedSizeInformation associated with the selected ColGroups. */ - public abstract CompressedSizeInfoColGroup estimateCompressedColGroupSize(int[] colIndexes); + public CompressedSizeInfoColGroup estimateCompressedColGroupSize(int[] colIndexes) { + return estimateCompressedColGroupSize(colIndexes, Integer.MAX_VALUE); + } + + /** + * A method to extract the Compressed Size Info for a given list of columns, This method further limits the + * estimated number of unique values, since in some cases the estimated number of uniques is estimated higher than + * the number estimated in sub groups of the given colIndexes. + * + * @param colIndexes The columns to extract compression information from + * @param nrUniqueUpperBound The upper bound of unique elements allowed in the estimate, can be calculated from the + * number of unique elements estimated in sub columns multiplied together. This is + * flexible in the sense that if the sample is small then this unique can be manually + * edited like in CoCodeCostMatrixMult. + * + * @return The CompressedSizeInfoColGroup fro the given column indexes. + */ + public abstract CompressedSizeInfoColGroup estimateCompressedColGroupSize(int[] colIndexes, int nrUniqueUpperBound); /** * Method used to extract the CompressedSizeEstimationFactors from an constructed UncompressedBitmap. Note this * method works both for the sample based estimator and the exact estimator, since the bitmap, can be extracted from * a sample or from the entire dataset. * - * @param ubm The UncompressedBitmap, either extracted from a sample or from the entier dataset + * @param ubm The UncompressedBitmap, either extracted from a sample or from the entire dataset * @param colIndexes The columns that is compressed together. * @return The size factors estimated from the Bit Map. */ @@ -154,16 +218,21 @@ public abstract class CompressedSizeEstimator { private static class SizeEstimationTask implements Callable<CompressedSizeInfoColGroup> { private final CompressedSizeEstimator _estimator; - private final int _col; + private final int[] _cols; protected SizeEstimationTask(CompressedSizeEstimator estimator, int col) { _estimator = estimator; - _col = col; + _cols = new int[] {col}; + } + + protected SizeEstimationTask(CompressedSizeEstimator estimator, int[] cols) { + _estimator = estimator; + _cols = cols; } @Override public CompressedSizeInfoColGroup call() { - return _estimator.estimateCompressedColGroupSize(new int[] {_col}); + return _estimator.estimateCompressedColGroupSize(_cols); } } @@ -174,8 +243,4 @@ public abstract class CompressedSizeEstimator { } return colIndexes; } - - public MatrixBlock getSample() { - return _data; - } } diff --git a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorExact.java b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorExact.java index 40677e9..5fdf8e2 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorExact.java +++ b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorExact.java @@ -34,7 +34,8 @@ public class CompressedSizeEstimatorExact extends CompressedSizeEstimator { } @Override - public CompressedSizeInfoColGroup estimateCompressedColGroupSize(int[] colIndexes) { + public CompressedSizeInfoColGroup estimateCompressedColGroupSize(int[] colIndexes, int nrUniqueUpperBound) { + // exact estimator can ignore upper bound. ABitmap entireBitMap = BitmapEncoder.extractBitmap(colIndexes, _data, _transposed); return new CompressedSizeInfoColGroup(estimateCompressedColGroupSize(entireBitMap, colIndexes), _cs.validCompressions); diff --git a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorFactory.java b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorFactory.java index cf5437e..ebeef16 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorFactory.java +++ b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorFactory.java @@ -27,23 +27,42 @@ import org.apache.sysds.runtime.matrix.data.MatrixBlock; public class CompressedSizeEstimatorFactory { protected static final Log LOG = LogFactory.getLog(CompressedSizeEstimatorFactory.class.getName()); - public static final int minimumSampleSize = 2000; - public static CompressedSizeEstimator getSizeEstimator(MatrixBlock data, CompressionSettings cs) { - final long nRows = cs.transposed ? data.getNumColumns() : data.getNumRows(); + final int nRows = cs.transposed ? data.getNumColumns() : data.getNumRows(); + final int nCols = cs.transposed ? data.getNumRows() : data.getNumColumns(); + final int nnzRows = (int) Math.ceil(data.getNonZeros() / nCols); - // Calculate the sample size. - // If the sample size is very small, set it to the minimum size - final int sampleSize = Math.max((int) Math.ceil(nRows * cs.samplingRatio), minimumSampleSize); + final double sampleRatio = cs.samplingRatio; + final int sampleSize = getSampleSize(sampleRatio, nRows, cs.minimumSampleSize); - CompressedSizeEstimator est; - if(cs.samplingRatio >= 1.0 || nRows < minimumSampleSize || sampleSize > nRows) - est = new CompressedSizeEstimatorExact(data, cs); - else - est = new CompressedSizeEstimatorSample(data, cs, sampleSize); + final CompressedSizeEstimator est = (shouldUseExactEstimator(cs, nRows, sampleSize, + nnzRows)) ? new CompressedSizeEstimatorExact(data, + cs) : tryToMakeSampleEstimator(data, cs, sampleRatio, sampleSize, nRows, nnzRows); LOG.debug("Estimating using: " + est); return est; } + + private static CompressedSizeEstimator tryToMakeSampleEstimator(MatrixBlock data, CompressionSettings cs, + double sampleRatio, int sampleSize, int nRows, int nnzRows) { + CompressedSizeEstimatorSample estS = new CompressedSizeEstimatorSample(data, cs, sampleSize); + while(estS.getSample() == null) { + LOG.warn("Doubling sample size"); + sampleSize = sampleSize * 2; + if(shouldUseExactEstimator(cs, nRows, sampleSize, nnzRows)) + return new CompressedSizeEstimatorExact(data, cs); + else + estS.sampleData(sampleSize); + } + return estS; + } + + private static boolean shouldUseExactEstimator(CompressionSettings cs, int nRows, int sampleSize, int nnzRows) { + return cs.samplingRatio >= 1.0 || nRows < cs.minimumSampleSize || sampleSize > nnzRows; + } + + private static int getSampleSize(double sampleRatio, int nRows, int minimumSampleSize) { + return Math.max((int) Math.ceil(nRows * sampleRatio), minimumSampleSize); + } } diff --git a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorSample.java b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorSample.java index 3098394..9fe57d7 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorSample.java +++ b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorSample.java @@ -21,7 +21,6 @@ package org.apache.sysds.runtime.compress.estim; import java.util.HashMap; -import org.apache.sysds.runtime.DMLCompressionException; import org.apache.sysds.runtime.compress.CompressionSettings; import org.apache.sysds.runtime.compress.colgroup.AColGroup.CompressionType; import org.apache.sysds.runtime.compress.estim.sample.HassAndStokes; @@ -38,8 +37,8 @@ import org.apache.sysds.runtime.util.UtilFunctions; public class CompressedSizeEstimatorSample extends CompressedSizeEstimator { - private final int[] _sampleRows; - private final MatrixBlock _sample; + private int[] _sampleRows; + private MatrixBlock _sample; private HashMap<Integer, Double> _solveCache = null; /** @@ -51,12 +50,16 @@ public class CompressedSizeEstimatorSample extends CompressedSizeEstimator { */ public CompressedSizeEstimatorSample(MatrixBlock data, CompressionSettings cs, int sampleSize) { super(data, cs); - _sampleRows = CompressedSizeEstimatorSample.getSortedUniformSample(_numRows, sampleSize, _cs.seed); - _solveCache = new HashMap<>(); - _sample = sampleData(); + _sample = sampleData(sampleSize); } - protected MatrixBlock sampleData() { + public MatrixBlock getSample() { + return _sample; + } + + public MatrixBlock sampleData(int sampleSize) { + _sampleRows = CompressedSizeEstimatorSample.getSortedUniformSample(_numRows, sampleSize, _cs.seed); + _solveCache = new HashMap<>(); MatrixBlock sampledMatrixBlock; if(_data.isInSparseFormat() && !_cs.transposed) { sampledMatrixBlock = new MatrixBlock(_sampleRows.length, _data.getNumColumns(), true); @@ -77,18 +80,17 @@ public class CompressedSizeEstimatorSample extends CompressedSizeEstimator { select.appendValue(_sampleRows[i], 0, 1); sampledMatrixBlock = _data.removeEmptyOperations(new MatrixBlock(), !_cs.transposed, true, select); - } if(sampledMatrixBlock.isEmpty()) - throw new DMLCompressionException("Empty sample block"); - - return sampledMatrixBlock; + return null; + else + return sampledMatrixBlock; } @Override - public CompressedSizeInfoColGroup estimateCompressedColGroupSize(int[] colIndexes) { + public CompressedSizeInfoColGroup estimateCompressedColGroupSize(int[] colIndexes, int nrUniqueUpperBound) { final int sampleSize = _sampleRows.length; // final int numCols = colIndexes.length; final double scalingFactor = ((double) _numRows / sampleSize); @@ -109,7 +111,7 @@ public class CompressedSizeEstimatorSample extends CompressedSizeEstimator { // Estimate number of distinct values (incl fixes for anomalies w/ large sample fraction) int totalCardinality = getNumDistinctValues(ubm, _numRows, sampleSize, _solveCache); // Number of unique is trivially bounded by the sampled number of uniques and the number of rows. - totalCardinality = Math.min(Math.max(totalCardinality, fact.numVals), _numRows); + totalCardinality = Math.min(Math.min(Math.max(totalCardinality, fact.numVals), _numRows), nrUniqueUpperBound); // estimate number of non-zeros (conservatively round up) final double C = Math.max(1 - (double) fact.numSingle / sampleSize, (double) sampleSize / _numRows); @@ -127,7 +129,6 @@ public class CompressedSizeEstimatorSample extends CompressedSizeEstimator { for(IntArrayList a : ubm.getOffsetList()) if(a.size() > largestInstanceCount) largestInstanceCount = a.size(); - final boolean zeroIsMostFrequent = largestInstanceCount == numZerosInSample; @@ -307,12 +308,12 @@ public class CompressedSizeEstimatorSample extends CompressedSizeEstimator { /** * Returns a sorted array of n integers, drawn uniformly from the range [0,range). * - * @param range the range - * @param smplSize sample size + * @param range the range + * @param sampleSize sample size * @return sorted array of integers */ - private static int[] getSortedUniformSample(int range, int smplSize, long seed) { - return UtilFunctions.getSortedSampleIndexes(range, smplSize, seed); + private static int[] getSortedUniformSample(int range, int sampleSize, long seed) { + return UtilFunctions.getSortedSampleIndexes(range, sampleSize, seed); } @Override @@ -329,4 +330,5 @@ public class CompressedSizeEstimatorSample extends CompressedSizeEstimator { sb.append(_numRows); return sb.toString(); } + } diff --git a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeInfoColGroup.java b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeInfoColGroup.java index 50aeb7e..8b984ff 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeInfoColGroup.java +++ b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeInfoColGroup.java @@ -27,6 +27,8 @@ import java.util.Set; import org.apache.commons.lang.NotImplementedException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.sysds.runtime.compress.CompressionSettings; +import org.apache.sysds.runtime.compress.cocode.PlanningCoCoder.PartitionerType; import org.apache.sysds.runtime.compress.colgroup.AColGroup.CompressionType; import org.apache.sysds.runtime.compress.colgroup.ColGroupSizes; @@ -59,7 +61,7 @@ public class CompressedSizeInfoColGroup { _cardinalityRatio = (double) numVals / numRows; _sizes = null; _bestCompressionType = null; - _minSize = ColGroupSizes.estimateInMemorySizeDDC(columns.length, numVals, numRows, false); + _minSize = ColGroupSizes.estimateInMemorySizeDDC(columns.length, numVals, numRows, 1.0, false); } /** @@ -106,6 +108,15 @@ public class CompressedSizeInfoColGroup { return _sizes.get(ct); } + public CompressionType getBestCompressionType(CompressionSettings cs) { + if(cs.columnPartitioner == PartitionerType.COST_MATRIX_MULT) { + // if(_sizes.get(CompressionType.SDC) * 0.8 < _sizes.get(_bestCompressionType)) + if(getMostCommonFraction() > 0.4) + return CompressionType.SDC; + } + return _bestCompressionType; + } + public CompressionType getBestCompressionType() { return _bestCompressionType; } @@ -171,26 +182,28 @@ public class CompressedSizeInfoColGroup { case DDC: // + 1 if the column contains zero return ColGroupSizes.estimateInMemorySizeDDC(numCols, - fact.numVals + (fact.numOffs < fact.numRows ? 1 : 0), fact.numRows, fact.lossy); + fact.numVals + (fact.numOffs < fact.numRows ? 1 : 0), fact.numRows, fact.tupleSparsity, fact.lossy); case RLE: return ColGroupSizes.estimateInMemorySizeRLE(numCols, fact.numVals, fact.numRuns, fact.numRows, - fact.lossy); + fact.tupleSparsity, fact.lossy); case OLE: return ColGroupSizes.estimateInMemorySizeOLE(numCols, fact.numVals, fact.numOffs + fact.numVals, - fact.numRows, fact.lossy); + fact.numRows, fact.tupleSparsity, fact.lossy); case UNCOMPRESSED: return ColGroupSizes.estimateInMemorySizeUncompressed(fact.numRows, numCols, fact.overAllSparsity); case SDC: if(fact.numOffs <= 1) return ColGroupSizes.estimateInMemorySizeSDCSingle(numCols, fact.numVals, fact.numRows, - fact.largestOff, fact.zeroIsMostFrequent, fact.containNoZeroValues, fact.lossy); + fact.largestOff, fact.zeroIsMostFrequent, fact.containNoZeroValues, fact.tupleSparsity, + fact.lossy); return ColGroupSizes.estimateInMemorySizeSDC(numCols, fact.numVals, fact.numRows, fact.largestOff, - fact.zeroIsMostFrequent, fact.containNoZeroValues, fact.lossy); + fact.zeroIsMostFrequent, fact.containNoZeroValues, fact.tupleSparsity, fact.lossy); case CONST: if(fact.numOffs == 0) return ColGroupSizes.estimateInMemorySizeEMPTY(numCols); else if(fact.numOffs == fact.numRows && fact.numVals == 1) - return ColGroupSizes.estimateInMemorySizeCONST(numCols, fact.numVals, fact.lossy); + return ColGroupSizes.estimateInMemorySizeCONST(numCols, fact.numVals, fact.tupleSparsity, + fact.lossy); else return -1; default: diff --git a/src/main/java/org/apache/sysds/runtime/compress/lib/BitmapEncoder.java b/src/main/java/org/apache/sysds/runtime/compress/lib/BitmapEncoder.java index 310b236..64b0bdd 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/lib/BitmapEncoder.java +++ b/src/main/java/org/apache/sysds/runtime/compress/lib/BitmapEncoder.java @@ -34,6 +34,7 @@ import org.apache.sysds.runtime.compress.utils.DblArrayIntListHashMap.DArrayILis import org.apache.sysds.runtime.compress.utils.DoubleIntListHashMap; import org.apache.sysds.runtime.compress.utils.DoubleIntListHashMap.DIListEntry; import org.apache.sysds.runtime.compress.utils.IntArrayList; +import org.apache.sysds.runtime.compress.utils.MultiColBitmap; import org.apache.sysds.runtime.data.SparseBlock; import org.apache.sysds.runtime.matrix.data.MatrixBlock; @@ -52,29 +53,28 @@ public class BitmapEncoder { * @param transposed Boolean specifying if the rawblock was transposed. * @return uncompressed bitmap representation of the columns */ - public static Bitmap extractBitmap(int[] colIndices, MatrixBlock rawBlock, boolean transposed) { - - Bitmap res = null; - if(colIndices.length == 1) { - res = extractBitmap(colIndices[0], rawBlock, transposed); - } - // multiple column selection (general case) - else { + public static ABitmap extractBitmap(int[] colIndices, MatrixBlock rawBlock, boolean transposed) { + try { final int numRows = transposed ? rawBlock.getNumColumns() : rawBlock.getNumRows(); - try { - res = extractBitmap(colIndices, ReaderColumnSelection.createReader(rawBlock, colIndices, transposed), numRows); - } - catch(Exception e) { - throw new DMLRuntimeException("Failed to extract bitmap", e); + if(rawBlock.isEmpty() && colIndices.length == 1) + return new Bitmap(null, null, numRows); + else if(colIndices.length == 1) + return extractBitmap(colIndices[0], rawBlock, transposed); + else if(rawBlock.isEmpty()) + return new MultiColBitmap(colIndices.length, null, null, numRows); + else { + ReaderColumnSelection reader = ReaderColumnSelection.createReader(rawBlock, colIndices, transposed); + return extractBitmap(colIndices, reader, numRows); } } - return res; - + catch(Exception e) { + throw new DMLRuntimeException("Failed to extract bitmap", e); + } } public static ABitmap extractBitmap(int[] colIndices, int rows, BitSet rawBlock, CompressionSettings compSettings) { ReaderColumnSelection reader = new ReaderColumnSelectionBitSet(rawBlock, rows, colIndices); - Bitmap res = extractBitmap(colIndices, reader, rows); + ABitmap res = extractBitmap(colIndices, reader, rows); return res; } @@ -88,7 +88,7 @@ public class BitmapEncoder { * @param transposed Boolean specifying if the rawBlock is transposed or not. * @return Bitmap containing the Information of the column. */ - private static Bitmap extractBitmap(int colIndex, MatrixBlock rawBlock, boolean transposed) { + private static ABitmap extractBitmap(int colIndex, MatrixBlock rawBlock, boolean transposed) { DoubleIntListHashMap hashMap = transposed ? extractHashMapTransposed(colIndex, rawBlock) : extractHashMap(colIndex, rawBlock); return makeBitmap(hashMap, transposed ? rawBlock.getNumColumns() : rawBlock.getNumRows()); @@ -187,7 +187,7 @@ public class BitmapEncoder { * @param numRows The number of contained rows * @return The Bitmap */ - protected static Bitmap extractBitmap(int[] colIndices, ReaderColumnSelection rowReader, int numRows) { + protected static ABitmap extractBitmap(int[] colIndices, ReaderColumnSelection rowReader, int numRows) { // probe map for distinct items (for value or value groups) DblArrayIntListHashMap distinctVals = new DblArrayIntListHashMap(); @@ -216,24 +216,24 @@ public class BitmapEncoder { * @param numCols Number of columns * @return The Bitmap. */ - private static Bitmap makeBitmap(DblArrayIntListHashMap distinctVals, int numRows, int numCols) { + private static ABitmap makeBitmap(DblArrayIntListHashMap distinctVals, int numRows, int numCols) { // added for one pass bitmap construction // Convert inputs to arrays ArrayList<DArrayIListEntry> mapEntries = distinctVals.extractValues(); if(!mapEntries.isEmpty()) { int numVals = distinctVals.size(); - double[] values = new double[numVals * numCols]; + double[][] values = new double[numVals][]; IntArrayList[] offsetsLists = new IntArrayList[numVals]; int bitmapIx = 0; for(DArrayIListEntry val : mapEntries) { - System.arraycopy(val.key.getData(), 0, values, bitmapIx * numCols, numCols); + values[bitmapIx] = val.key.getData(); offsetsLists[bitmapIx++] = val.value; } - return new Bitmap(numCols, offsetsLists, values, numRows); + return new MultiColBitmap(numCols, offsetsLists, values, numRows); } else - return new Bitmap(numCols, null, null, numRows); + return new MultiColBitmap(numCols, null, null, numRows); } @@ -248,7 +248,7 @@ public class BitmapEncoder { // added for one pass bitmap construction // Convert inputs to arrays int numVals = distinctVals.size(); - if(numVals > 0){ + if(numVals > 0) { double[] values = new double[numVals]; IntArrayList[] offsetsLists = new IntArrayList[numVals]; @@ -257,11 +257,11 @@ public class BitmapEncoder { values[bitmapIx] = val.key; offsetsLists[bitmapIx++] = val.value; } - - return new Bitmap(1, offsetsLists, values, numRows); + + return new Bitmap(offsetsLists, values, numRows); } - else{ - return new Bitmap(1, null, null, numRows); + else { + return new Bitmap(null, null, numRows); } } diff --git a/src/main/java/org/apache/sysds/runtime/compress/lib/BitmapLossyEncoder.java b/src/main/java/org/apache/sysds/runtime/compress/lib/BitmapLossyEncoder.java index 81789f1..4e74d69 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/lib/BitmapLossyEncoder.java +++ b/src/main/java/org/apache/sysds/runtime/compress/lib/BitmapLossyEncoder.java @@ -30,6 +30,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Queue; +import org.apache.commons.lang.NotImplementedException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.sysds.runtime.compress.CompressedMatrixBlock; @@ -68,20 +69,21 @@ public class BitmapLossyEncoder { * @param numRows The number of rows contained in the ubm. * @return A bitmap. */ - public static ABitmap makeBitmapLossy(Bitmap ubm, int numRows) { - final double[] fp = ubm.getValues(); - if(fp.length == 0) { - return ubm; - } - Stats stats = new Stats(fp); - // TODO make better decisions than just a 8 Bit encoding. - if(Double.isInfinite(stats.max) || Double.isInfinite(stats.min)) { - LOG.warn("Defaulting to incompressable colGroup"); - return ubm; - } - else { - return make8BitLossy(ubm, stats, numRows); - } + public static ABitmap makeBitmapLossy(ABitmap ubm, int numRows) { + throw new NotImplementedException(); + // final double[] fp = ubm.getValues(); + // if(fp.length == 0) { + // return ubm; + // } + // Stats stats = new Stats(fp); + // // TODO make better decisions than just a 8 Bit encoding. + // if(Double.isInfinite(stats.max) || Double.isInfinite(stats.min)) { + // LOG.warn("Defaulting to incompressable colGroup"); + // return ubm; + // } + // else { + // return make8BitLossy(ubm, stats, numRows); + // } } /** diff --git a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibLeftMultBy.java b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibLeftMultBy.java index 9c73f7c..ba22642 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibLeftMultBy.java +++ b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibLeftMultBy.java @@ -255,7 +255,7 @@ public class CLALibLeftMultBy { ExecutorService pool = CommonThreadPool.get(k); // compute remaining compressed column groups in parallel ArrayList<Callable<Object>> tasks = new ArrayList<>(); - int rowBlockSize = 8; + int rowBlockSize = 1; if(overlapping) { for(int blo = 0; blo < that.getNumRows(); blo += rowBlockSize) { tasks.add(new LeftMatrixMatrixMultTask(colGroups, that, ret, blo, diff --git a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibRightMultBy.java b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibRightMultBy.java index 5b0a105..61e275c 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibRightMultBy.java +++ b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibRightMultBy.java @@ -141,14 +141,14 @@ public class CLALibRightMultBy { ret.setOverlapping(true); if(containsNull) { - ColGroupEmpty cge = findEmptyColumnsAndMakeEmptyColGroup(retCg, ret.getNumColumns()); + ColGroupEmpty cge = findEmptyColumnsAndMakeEmptyColGroup(retCg, ret.getNumColumns(), ret.getNumRows()); if(cge != null) retCg.add(cge); } return ret; } - private static ColGroupEmpty findEmptyColumnsAndMakeEmptyColGroup(List<AColGroup> colGroups, int nCols) { + private static ColGroupEmpty findEmptyColumnsAndMakeEmptyColGroup(List<AColGroup> colGroups, int nCols, int nRows) { Set<Integer> emptyColumns = new HashSet<Integer>(nCols); for(int i = 0; i < nCols; i++) emptyColumns.add(i); @@ -159,7 +159,7 @@ public class CLALibRightMultBy { if(emptyColumns.size() != 0) { int[] emptyColumnsFinal = emptyColumns.stream().mapToInt(Integer::intValue).toArray(); - return new ColGroupEmpty(emptyColumnsFinal, colGroups.get(0).getNumRows()); + return new ColGroupEmpty(emptyColumnsFinal, nRows); } else return null; diff --git a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibSquash.java b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibSquash.java index aacea4a..fecd08e 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibSquash.java +++ b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibSquash.java @@ -39,7 +39,6 @@ import org.apache.sysds.runtime.compress.colgroup.ColGroupFactory; import org.apache.sysds.runtime.compress.colgroup.ColGroupValue; import org.apache.sysds.runtime.compress.readers.ReaderColumnSelection; import org.apache.sysds.runtime.compress.utils.ABitmap; -import org.apache.sysds.runtime.compress.utils.Bitmap; import org.apache.sysds.runtime.util.CommonThreadPool; public class CLALibSquash { @@ -123,18 +122,16 @@ public class CLALibSquash { map = extractBitmap(columnIds, m); } else - map = BitmapLossyEncoder.extractMapFromCompressedSingleColumn(m, - columnIds[0], - minMaxes[columnIds[0] * 2], + map = BitmapLossyEncoder.extractMapFromCompressedSingleColumn(m, columnIds[0], minMaxes[columnIds[0] * 2], minMaxes[columnIds[0] * 2 + 1], m.getNumRows()); - AColGroup newGroup = ColGroupFactory.compress(columnIds, m.getNumRows(), map, CompressionType.DDC, cs, m); + AColGroup newGroup = ColGroupFactory.compress(columnIds, m.getNumRows(), map, CompressionType.DDC, cs, m, 1); return newGroup; } private static ABitmap extractBitmap(int[] colIndices, CompressedMatrixBlock compressedBlock) { - Bitmap x = BitmapEncoder.extractBitmap(colIndices, - ReaderColumnSelection.createCompressedReader(compressedBlock, colIndices), compressedBlock.getNumRows()); + ABitmap x = BitmapEncoder.extractBitmap(colIndices, + ReaderColumnSelection.createCompressedReader(compressedBlock, colIndices), compressedBlock.getNumRows()); return BitmapLossyEncoder.makeBitmapLossy(x, compressedBlock.getNumRows()); } diff --git a/src/main/java/org/apache/sysds/runtime/compress/utils/Bitmap.java b/src/main/java/org/apache/sysds/runtime/compress/utils/Bitmap.java index 09354f6..db0c2b6 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/utils/Bitmap.java +++ b/src/main/java/org/apache/sysds/runtime/compress/utils/Bitmap.java @@ -34,8 +34,8 @@ public final class Bitmap extends ABitmap { */ private double[] _values; - public Bitmap(int numCols, IntArrayList[] offsetsLists, double[] values, int rows) { - super(numCols, offsetsLists, rows); + public Bitmap(IntArrayList[] offsetsLists, double[] values, int rows) { + super(1, offsetsLists, rows); _values = values; } @@ -48,33 +48,16 @@ public final class Bitmap extends ABitmap { return _values; } - /** - * Obtain tuple of column values associated with index. - * - * @param ix index of a particular distinct value - * @return the tuple of column values associated with the specified index - */ - public double[] getValues(int ix) { - return Arrays.copyOfRange(_values, ix * _numCols, (ix + 1) * _numCols); - } - public int getNumNonZerosInOffset(int idx){ - if(_numCols == 1) - return _values[0] != 0 ? 1 : 0; - int nz = 0; - for(int i = idx * _numCols; i < (idx+1) * _numCols; i++) - nz += _values[i] == 0 ? 0 : 1; - - return nz; + return _values[idx] != 0 ? 1 : 0; } public int getNumValues() { - return (_values == null) ? 0: _values.length / _numCols; + return (_values == null) ? 0: _values.length; } public void sortValuesByFrequency() { int numVals = getNumValues(); - int numCols = getNumColumns(); double[] freq = new double[numVals]; int[] pos = new int[numVals]; @@ -90,10 +73,10 @@ public final class Bitmap extends ABitmap { ArrayUtils.reverse(pos); // create new value and offset list arrays - double[] lvalues = new double[numVals * numCols]; + double[] lvalues = new double[numVals]; IntArrayList[] loffsets = new IntArrayList[numVals]; for(int i = 0; i < numVals; i++) { - System.arraycopy(_values, pos[i] * numCols, lvalues, i * numCols, numCols); + lvalues[i] = _values[pos[i]]; loffsets[i] = _offsetsLists[pos[i]]; } _values = lvalues; diff --git a/src/main/java/org/apache/sysds/runtime/compress/utils/Bitmap.java b/src/main/java/org/apache/sysds/runtime/compress/utils/MultiColBitmap.java similarity index 70% copy from src/main/java/org/apache/sysds/runtime/compress/utils/Bitmap.java copy to src/main/java/org/apache/sysds/runtime/compress/utils/MultiColBitmap.java index 09354f6..80d98df 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/utils/Bitmap.java +++ b/src/main/java/org/apache/sysds/runtime/compress/utils/MultiColBitmap.java @@ -27,14 +27,12 @@ import org.apache.sysds.runtime.util.SortUtils; /** * Uncompressed representation of one or more columns in bitmap format. */ -public final class Bitmap extends ABitmap { +public final class MultiColBitmap extends ABitmap { - /** - * Distinct values that appear in the column. Linearized as value groups <v11 v12> <v21 v22>. - */ - private double[] _values; + /** Distinct tuples that appear in the columnGroup */ + private double[][] _values; - public Bitmap(int numCols, IntArrayList[] offsetsLists, double[] values, int rows) { + public MultiColBitmap(int numCols, IntArrayList[] offsetsLists, double[][] values, int rows) { super(numCols, offsetsLists, rows); _values = values; } @@ -44,7 +42,7 @@ public final class Bitmap extends ABitmap { * * @return dictionary of value tuples */ - public double[] getValues() { + public double[][] getValues() { return _values; } @@ -55,29 +53,26 @@ public final class Bitmap extends ABitmap { * @return the tuple of column values associated with the specified index */ public double[] getValues(int ix) { - return Arrays.copyOfRange(_values, ix * _numCols, (ix + 1) * _numCols); + return _values[ix]; } - public int getNumNonZerosInOffset(int idx){ - if(_numCols == 1) - return _values[0] != 0 ? 1 : 0; + public int getNumNonZerosInOffset(int idx) { int nz = 0; - for(int i = idx * _numCols; i < (idx+1) * _numCols; i++) - nz += _values[i] == 0 ? 0 : 1; - + for(double v : getValues(idx)) + nz += v == 0 ? 0 : 1; + return nz; } public int getNumValues() { - return (_values == null) ? 0: _values.length / _numCols; + return (_values == null) ? 0 : _values.length; } public void sortValuesByFrequency() { - int numVals = getNumValues(); - int numCols = getNumColumns(); - - double[] freq = new double[numVals]; - int[] pos = new int[numVals]; + final int numVals = getNumValues(); + + final double[] freq = new double[numVals]; + final int[] pos = new int[numVals]; // populate the temporary arrays for(int i = 0; i < numVals; i++) { @@ -90,10 +85,10 @@ public final class Bitmap extends ABitmap { ArrayUtils.reverse(pos); // create new value and offset list arrays - double[] lvalues = new double[numVals * numCols]; + double[][] lvalues = new double[numVals][]; IntArrayList[] loffsets = new IntArrayList[numVals]; for(int i = 0; i < numVals; i++) { - System.arraycopy(_values, pos[i] * numCols, lvalues, i * numCols, numCols); + lvalues[i] = _values[pos[i]]; loffsets[i] = _offsetsLists[pos[i]]; } _values = lvalues; @@ -104,7 +99,9 @@ public final class Bitmap extends ABitmap { public String toString() { StringBuilder sb = new StringBuilder(); sb.append(super.toString()); - sb.append("\nValues: " + Arrays.toString(_values)); + sb.append("\nValues:"); + for(double[] vv : _values) + sb.append("\n" + Arrays.toString(vv)); return sb.toString(); } diff --git a/src/main/java/org/apache/sysds/utils/MemoryEstimates.java b/src/main/java/org/apache/sysds/utils/MemoryEstimates.java index 8401d1a..8e0bac8 100644 --- a/src/main/java/org/apache/sysds/utils/MemoryEstimates.java +++ b/src/main/java/org/apache/sysds/utils/MemoryEstimates.java @@ -29,6 +29,8 @@ package org.apache.sysds.utils; */ public class MemoryEstimates { + // private static final Log LOG = LogFactory.getLog(MemoryEstimates.class.getName()); + /** * Get the worst case memory usage of an java.util.BitSet java object. * diff --git a/src/test/java/org/apache/sysds/test/component/compress/AbstractCompressedUnaryTests.java b/src/test/java/org/apache/sysds/test/component/compress/AbstractCompressedUnaryTests.java index 94ba730..80df1e5 100644 --- a/src/test/java/org/apache/sysds/test/component/compress/AbstractCompressedUnaryTests.java +++ b/src/test/java/org/apache/sysds/test/component/compress/AbstractCompressedUnaryTests.java @@ -228,9 +228,6 @@ public abstract class AbstractCompressedUnaryTests extends CompressedTestBase { } } - catch(NotImplementedException e) { - throw e; - } catch(Exception e) { e.printStackTrace(); throw new RuntimeException(this.toString() + "\n" + e.getMessage(), e); diff --git a/src/test/java/org/apache/sysds/test/component/compress/CompressedTestBase.java b/src/test/java/org/apache/sysds/test/component/compress/CompressedTestBase.java index 1167512..7b9d871 100644 --- a/src/test/java/org/apache/sysds/test/component/compress/CompressedTestBase.java +++ b/src/test/java/org/apache/sysds/test/component/compress/CompressedTestBase.java @@ -78,7 +78,7 @@ public abstract class CompressedTestBase extends TestBase { protected static SparsityType[] usedSparsityTypes = new SparsityType[] {SparsityType.FULL, SparsityType.SPARSE,}; protected static ValueType[] usedValueTypes = new ValueType[] {ValueType.RAND_ROUND, ValueType.OLE_COMPRESSIBLE, - ValueType.RLE_COMPRESSIBLE,}; + ValueType.RLE_COMPRESSIBLE}; protected static ValueRange[] usedValueRanges = new ValueRange[] {ValueRange.SMALL, ValueRange.NEGATIVE, ValueRange.BYTE}; @@ -120,6 +120,9 @@ public abstract class CompressedTestBase extends TestBase { new CompressionSettingsBuilder().setSamplingRatio(0.1).setSeed(compressionSeed).setTransposeInput("true") .setColumnPartitioner(PartitionerType.STATIC).setInvestigateEstimate(true), + new CompressionSettingsBuilder().setSamplingRatio(0.1).setSeed(compressionSeed).setTransposeInput("true") + .setColumnPartitioner(PartitionerType.COST_MATRIX_MULT).setInvestigateEstimate(true), + // Forced Uncompressed tests new CompressionSettingsBuilder().setValidCompressions(EnumSet.of(CompressionType.UNCOMPRESSED)), @@ -302,6 +305,8 @@ public abstract class CompressedTestBase extends TestBase { for(OverLapping ov : overLapping) { tests.add(new Object[] {SparsityType.EMPTY, ValueType.RAND, ValueRange.BOOLEAN, cs, mt, ov}); tests.add(new Object[] {SparsityType.FULL, ValueType.CONST, ValueRange.LARGE, cs, mt, ov}); + tests.add( + new Object[] {SparsityType.FULL, ValueType.ONE_HOT_ENCODED, ValueRange.BOOLEAN, cs, mt, ov}); } return tests; } diff --git a/src/test/java/org/apache/sysds/test/component/compress/CompressibleInputGenerator.java b/src/test/java/org/apache/sysds/test/component/compress/CompressibleInputGenerator.java index 4666327..a8a2a3e 100644 --- a/src/test/java/org/apache/sysds/test/component/compress/CompressibleInputGenerator.java +++ b/src/test/java/org/apache/sysds/test/component/compress/CompressibleInputGenerator.java @@ -82,6 +82,21 @@ public class CompressibleInputGenerator { return output; } + public static double[][] getInputOneHotMatrix(int rows, int cols, int seed) { + double[][] variations = new double[cols][]; + for(int i = 0; i < cols; i++) { + variations[i] = new double[cols]; + variations[i][i] = 1; + } + + double[][] matrix = new double[rows][]; + Random r = new Random(seed); + for(int i = 0; i < rows; i++) + matrix[i] = variations[r.nextInt(cols)]; + + return matrix; + } + private static double[][] rle(int rows, int cols, int nrUnique, int max, int min, double sparsity, int seed, boolean transpose) { diff --git a/src/test/java/org/apache/sysds/test/component/compress/TestBase.java b/src/test/java/org/apache/sysds/test/component/compress/TestBase.java index f2ebbba..29a2a87 100644 --- a/src/test/java/org/apache/sysds/test/component/compress/TestBase.java +++ b/src/test/java/org/apache/sysds/test/component/compress/TestBase.java @@ -35,6 +35,7 @@ import org.apache.sysds.test.component.compress.TestConstants.ValueRange; import org.apache.sysds.test.component.compress.TestConstants.ValueType; public class TestBase { + // private static final Log LOG = LogFactory.getLog(TestBase.class.getName()); protected ValueType valType; protected ValueRange valRange; @@ -70,20 +71,23 @@ public class TestBase { this.min = this.max; // Do not Break, utilize the RAND afterwards. case RAND: - this.input = TestUtils.generateTestMatrix(rows, cols, min, max, sparsity, 7); + this.input = TestUtils.generateTestMatrix(rows, cols, min, max, sparsity, seed); break; case RAND_ROUND: - this.input = TestUtils.round(TestUtils.generateTestMatrix(rows, cols, min, max, sparsity, 7)); + this.input = TestUtils.round(TestUtils.generateTestMatrix(rows, cols, min, max, sparsity, seed)); break; case OLE_COMPRESSIBLE: // Note the Compressible Input generator, generates an already Transposed input // normally, therefore last argument is true, to build a non transposed matrix. this.input = CompressibleInputGenerator.getInputDoubleMatrix(rows, cols, CompressionType.OLE, - (max - min), max, min, sparsity, 7, true); + (max - min), max, min, sparsity, seed, true); break; case RLE_COMPRESSIBLE: this.input = CompressibleInputGenerator.getInputDoubleMatrix(rows, cols, CompressionType.RLE, - (max - min), max, min, sparsity, 7, true); + (max - min), max, min, sparsity, seed, true); + break; + case ONE_HOT_ENCODED: + this.input = CompressibleInputGenerator.getInputOneHotMatrix(rows, cols, seed); break; default: throw new NotImplementedException("Not Implemented Test Value type input generator"); @@ -94,6 +98,7 @@ public class TestBase { this.compressionSettings = compressionSettings.create(); mb = DataConverter.convertToMatrixBlock(this.input); + } catch(Exception e) { e.printStackTrace(); diff --git a/src/test/java/org/apache/sysds/test/component/compress/TestConstants.java b/src/test/java/org/apache/sysds/test/component/compress/TestConstants.java index c936c1b..649e96c 100644 --- a/src/test/java/org/apache/sysds/test/component/compress/TestConstants.java +++ b/src/test/java/org/apache/sysds/test/component/compress/TestConstants.java @@ -38,6 +38,7 @@ public class TestConstants { RAND_ROUND, // Values rounded to nearest whole numbers. OLE_COMPRESSIBLE, // Ideal inputs for OLE Compression. RLE_COMPRESSIBLE, // Ideal inputs for RLE Compression. + ONE_HOT_ENCODED, } public enum MatrixTypology { diff --git a/src/test/java/org/apache/sysds/test/component/compress/colgroup/JolEstimateTest.java b/src/test/java/org/apache/sysds/test/component/compress/colgroup/JolEstimateTest.java index c32ccbe..b370451 100644 --- a/src/test/java/org/apache/sysds/test/component/compress/colgroup/JolEstimateTest.java +++ b/src/test/java/org/apache/sysds/test/component/compress/colgroup/JolEstimateTest.java @@ -58,7 +58,7 @@ public abstract class JolEstimateTest { public abstract CompressionType getCT(); private final long actualSize; - // The actual compressed column group + private final int actualNumberUnique; private final AColGroup cg; public JolEstimateTest(MatrixBlock mbt) { @@ -72,8 +72,9 @@ public abstract class JolEstimateTest { .setValidCompressions(EnumSet.of(getCT())).create(); cs.transposed = true; ABitmap ubm = BitmapEncoder.extractBitmap(colIndexes, mbt, true); - cg = ColGroupFactory.compress(colIndexes, mbt.getNumColumns(), ubm, getCT(), cs, mbt); + cg = ColGroupFactory.compress(colIndexes, mbt.getNumColumns(), ubm, getCT(), cs, mbt, 1); actualSize = cg.estimateInMemorySize(); + actualNumberUnique = cg.getNumValues(); } catch(Exception e) { e.printStackTrace(); @@ -83,21 +84,7 @@ public abstract class JolEstimateTest { @Test public void compressedSizeInfoEstimatorExact() { - try { - CompressionSettings cs = new CompressionSettingsBuilder().setSamplingRatio(1.0) - .setValidCompressions(EnumSet.of(getCT())).setSeed(seed).create(); - cs.transposed = true; - - final long estimateCSI = CompressedSizeEstimatorFactory.getSizeEstimator(mbt, cs) - .estimateCompressedColGroupSize().getCompressionSize(cg.getCompType()); - - boolean res = Math.abs(estimateCSI - actualSize) <= 0; - assertTrue("CSI estimate " + estimateCSI + " should be exactly " + actualSize + "\n" + cg.toString(), res); - } - catch(Exception e) { - e.printStackTrace(); - assertTrue("Failed exact test " + getCT(), false); - } + compressedSizeInfoEstimatorSample(1.0, 1.0); } @Test @@ -107,48 +94,59 @@ public abstract class JolEstimateTest { @Test public void compressedSizeInfoEstimatorSample_50() { - compressedSizeInfoEstimatorSample(0.5, 0.90); + compressedSizeInfoEstimatorSample(0.5, 0.8); } @Test public void compressedSizeInfoEstimatorSample_20() { - compressedSizeInfoEstimatorSample(0.2, 0.8); + compressedSizeInfoEstimatorSample(0.2, 0.7); } - @Test - public void compressedSizeInfoEstimatorSample_10() { - compressedSizeInfoEstimatorSample(0.1, 0.75); - } + // @Test + // public void compressedSizeInfoEstimatorSample_10() { + // compressedSizeInfoEstimatorSample(0.1, 0.6); + // } - @Test - public void compressedSizeInfoEstimatorSample_5() { - compressedSizeInfoEstimatorSample(0.05, 0.7); - } + // @Test + // public void compressedSizeInfoEstimatorSample_5() { + // compressedSizeInfoEstimatorSample(0.05, 0.5); + // } - @Test - public void compressedSizeInfoEstimatorSample_1() { - compressedSizeInfoEstimatorSample(0.01, 0.6); - } + // @Test + // public void compressedSizeInfoEstimatorSample_1() { + // compressedSizeInfoEstimatorSample(0.01, 0.4); + // } public void compressedSizeInfoEstimatorSample(double ratio, double tolerance) { try { - if(mbt.getNumColumns() < CompressedSizeEstimatorFactory.minimumSampleSize) - return; // Skip the tests that anyway wouldn't use the sample based approach. CompressionSettings cs = new CompressionSettingsBuilder().setSamplingRatio(ratio) - .setValidCompressions(EnumSet.of(getCT())).setSeed(seed).create(); + .setValidCompressions(EnumSet.of(getCT())).setMinimumSampleSize(100).setSeed(seed).create(); cs.transposed = true; final CompressedSizeInfoColGroup cgsi = CompressedSizeEstimatorFactory.getSizeEstimator(mbt, cs) .estimateCompressedColGroupSize(); + + if(cg.getCompType() != CompressionType.UNCOMPRESSED && actualNumberUnique > 10) { + + final int estimateNUniques = cgsi.getNumVals(); + final double minToleranceNUniques = actualNumberUnique * tolerance; + final double maxToleranceNUniques = actualNumberUnique / tolerance; + final String uniqueString = minToleranceNUniques + " < " + estimateNUniques + " < " + + maxToleranceNUniques; + final boolean withinToleranceOnNUniques = minToleranceNUniques <= actualNumberUnique && + actualNumberUnique <= maxToleranceNUniques; + assertTrue("CSI Sampled estimate of number of unique values not in range " + uniqueString + "\n" + cgsi, + withinToleranceOnNUniques); + } + final long estimateCSI = cgsi.getCompressionSize(cg.getCompType()); final double minTolerance = actualSize * tolerance; final double maxTolerance = actualSize / tolerance; final String rangeString = minTolerance + " < " + estimateCSI + " < " + maxTolerance; - boolean res = minTolerance < estimateCSI && estimateCSI < maxTolerance; - assertTrue( - "CSI Sampled estimate is not in tolerance range " + rangeString + "\n" + cgsi + "\n" + cg.toString(), - res); + final boolean withinToleranceOnSize = minTolerance <= estimateCSI && estimateCSI <= maxTolerance; + assertTrue("CSI Sampled estimate is not in tolerance range " + rangeString + " Actual number uniques:" + + actualNumberUnique + "\n" + cgsi, withinToleranceOnSize); } catch(Exception e) { diff --git a/src/test/java/org/apache/sysds/test/component/compress/estim/SampleEstimatorTest.java b/src/test/java/org/apache/sysds/test/component/compress/estim/SampleEstimatorTest.java new file mode 100644 index 0000000..fba1088 --- /dev/null +++ b/src/test/java/org/apache/sysds/test/component/compress/estim/SampleEstimatorTest.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.test.component.compress.estim; + +import static org.junit.Assert.assertTrue; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.sysds.runtime.compress.CompressionSettings; +import org.apache.sysds.runtime.compress.CompressionSettingsBuilder; +import org.apache.sysds.runtime.compress.estim.CompressedSizeEstimator; +import org.apache.sysds.runtime.compress.estim.CompressedSizeEstimatorFactory; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import org.apache.sysds.runtime.util.DataConverter; +import org.apache.sysds.test.TestUtils; +import org.junit.Test; + +public class SampleEstimatorTest { + + protected static final Log LOG = LogFactory.getLog(SampleEstimatorTest.class.getName()); + + private static final int seed = 1512314; + + final MatrixBlock mbt; + + public SampleEstimatorTest() { + // matrix block 2 columns + mbt = DataConverter + .convertToMatrixBlock(TestUtils.round(TestUtils.generateTestMatrix(2, 500000, 0, 299, 1.0, seed + 1))); + } + + @Test + public void compressedSizeInfoEstimatorFull() { + testSampleEstimateIsAtMaxEstimatedElementsInEachColumnsProduct(1.0, 1.0); + } + + @Test + public void compressedSizeInfoEstimatorSample_90() { + testSampleEstimateIsAtMaxEstimatedElementsInEachColumnsProduct(0.9, 0.9); + } + + @Test + public void compressedSizeInfoEstimatorSample_50() { + testSampleEstimateIsAtMaxEstimatedElementsInEachColumnsProduct(0.5, 0.90); + } + + @Test + public void compressedSizeInfoEstimatorSample_20() { + testSampleEstimateIsAtMaxEstimatedElementsInEachColumnsProduct(0.2, 0.8); + } + + @Test + public void compressedSizeInfoEstimatorSample_10() { + testSampleEstimateIsAtMaxEstimatedElementsInEachColumnsProduct(0.1, 0.75); + } + + @Test + public void compressedSizeInfoEstimatorSample_5() { + testSampleEstimateIsAtMaxEstimatedElementsInEachColumnsProduct(0.05, 0.7); + } + + @Test + public void compressedSizeInfoEstimatorSample_1() { + testSampleEstimateIsAtMaxEstimatedElementsInEachColumnsProduct(0.01, 0.6); + } + + @Test + public void compressedSizeInfoEstimatorSample_p1() { + testSampleEstimateIsAtMaxEstimatedElementsInEachColumnsProduct(0.001, 0.5); + } + + /** + * This test verify that the estimated number or unique values in individual columns is adhered to when analyzing + * multi columns. + * + * The important part here is not if the number of unique elements is estimated correctly, but that the relation + * between observations is preserved. + * + * @param ratio Ratio to sample + * @param tolerance A percentage tolerance in number of unique element estimated + */ + private void testSampleEstimateIsAtMaxEstimatedElementsInEachColumnsProduct(double ratio, double tolerance) { + + final CompressionSettings cs_estimate = new CompressionSettingsBuilder().setMinimumSampleSize(100) + .setSamplingRatio(ratio).setSeed(seed).create(); + + cs_estimate.transposed = true; + + final CompressedSizeEstimator estimate = CompressedSizeEstimatorFactory.getSizeEstimator(mbt, cs_estimate); + final int estimate_1 = estimate.estimateCompressedColGroupSize(new int[] {0}).getNumVals() + 1; + final int estimate_2 = estimate.estimateCompressedColGroupSize(new int[] {1}).getNumVals() + 1; + + final int estimate_full = estimate.estimateCompressedColGroupSize(new int[] {0, 1}, estimate_1 * estimate_2) + .getNumVals(); + assertTrue( + "Estimate of all columns should be upper bounded by distinct of each column multiplied: " + estimate_full + + " * " + tolerance + " <= " + estimate_1 * estimate_2, + estimate_full * tolerance <= estimate_1 * estimate_2); + + } + +}
