This is an automated email from the ASF dual-hosted git repository. baunsgaard pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push: new 48de384bb3 [MINOR] Update cocode algorithms for CLA 48de384bb3 is described below commit 48de384bb3dca3e63f35b654e907e9ecaf5d747c Author: Sebastian Baunsgaard <baunsga...@apache.org> AuthorDate: Tue Apr 9 20:16:50 2024 +0200 [MINOR] Update cocode algorithms for CLA This commit adds a new memorizer that relies on an array sized by the number of columns to compress, instead of a single hashmap holding all entries. The memory footprint is the same, but the performance is much improved because it allows constant-time deletion of all memorized column groups that contain a combination with the given specific columns. The technique first allocates an array whose size is the number of columns; each index gets its own hashmap containing the column groups associated with it. Then, when combining column groups, the lowest index of all combined columns determines which array index's hashmap the combined group is added to. Once a combination is chosen, the buckets of the lowest index of each combined column group are reset, and the combined column group is inserted. 
The result is constant time O(1) deletion and insertion in the memorizer --- .../runtime/compress/cocode/AColumnCoCoder.java | 7 +- .../runtime/compress/cocode/CoCodeGreedy.java | 36 +++++++--- .../runtime/compress/cocode/CoCodeHybrid.java | 33 ++++++---- .../runtime/compress/cocode/CoCodePriorityQue.java | 43 ++++++------ .../runtime/compress/cocode/CoCoderFactory.java | 23 +++++-- .../sysds/runtime/compress/cocode/ColIndexes.java | 4 +- .../sysds/runtime/compress/cocode/Memorizer.java | 13 ++-- .../cocode/{Memorizer.java => MemorizerV2.java} | 53 ++++++++------- .../sysds/runtime/compress/estim/AComEst.java | 76 +++++++++++++--------- .../compress/estim/CompressedSizeInfoColGroup.java | 21 ++++++ 10 files changed, 196 insertions(+), 113 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/compress/cocode/AColumnCoCoder.java b/src/main/java/org/apache/sysds/runtime/compress/cocode/AColumnCoCoder.java index fc13e16f65..cfe1b1b55e 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/cocode/AColumnCoCoder.java +++ b/src/main/java/org/apache/sysds/runtime/compress/cocode/AColumnCoCoder.java @@ -26,6 +26,10 @@ import org.apache.sysds.runtime.compress.cost.ACostEstimate; import org.apache.sysds.runtime.compress.estim.AComEst; import org.apache.sysds.runtime.compress.estim.CompressedSizeInfo; +/** + * Main abstract class for the co-coding of columns to combine different compression statistics and calculate the + * combinations of columns + */ public abstract class AColumnCoCoder { protected static final Log LOG = LogFactory.getLog(AColumnCoCoder.class.getName()); @@ -34,8 +38,7 @@ public abstract class AColumnCoCoder { protected final ACostEstimate _cest; protected final CompressionSettings _cs; - protected AColumnCoCoder(AComEst sizeEstimator, ACostEstimate costEstimator, - CompressionSettings cs) { + protected AColumnCoCoder(AComEst sizeEstimator, ACostEstimate costEstimator, CompressionSettings cs) { _sest = sizeEstimator; _cest = costEstimator; 
_cs = cs; diff --git a/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeGreedy.java b/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeGreedy.java index d5d6c6936e..45f5654ab2 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeGreedy.java +++ b/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeGreedy.java @@ -37,14 +37,14 @@ import org.apache.sysds.runtime.util.CommonThreadPool; public class CoCodeGreedy extends AColumnCoCoder { - private final Memorizer mem; + private final MemorizerV2 mem; protected CoCodeGreedy(AComEst sizeEstimator, ACostEstimate costEstimator, CompressionSettings cs) { super(sizeEstimator, costEstimator, cs); - mem = new Memorizer(sizeEstimator); + mem = new MemorizerV2(sizeEstimator, sizeEstimator.getNumColumns()); } - protected CoCodeGreedy(AComEst sizeEstimator, ACostEstimate costEstimator, CompressionSettings cs, Memorizer mem) { + protected CoCodeGreedy(AComEst sizeEstimator, ACostEstimate costEstimator, CompressionSettings cs, MemorizerV2 mem) { super(sizeEstimator, costEstimator, cs); this.mem = mem; } @@ -93,16 +93,22 @@ public class CoCodeGreedy extends AColumnCoCoder { for(int j = i + 1; j < workSet.size(); j++) { final ColIndexes c1 = workSet.get(i); final ColIndexes c2 = workSet.get(j); - final double costC1 = _cest.getCost(mem.get(c1)); - final double costC2 = _cest.getCost(mem.get(c2)); + final CompressedSizeInfoColGroup c1i = mem.get(c1); + final CompressedSizeInfoColGroup c2i = mem.get(c2); + + final double costC1 = _cest.getCost(c1i); + final double costC2 = _cest.getCost(c2i); mem.incst1(); + final int maxCombined = c1i.getNumVals() * c2i.getNumVals(); // Pruning filter : skip dominated candidates // Since even if the entire size of one of the column lists is removed, // it still does not improve compression. // In the case of workload we relax the requirement for the filter. 
- if(-Math.min(costC1, costC2) > changeInCost) + if(-Math.min(costC1, costC2) > changeInCost // change in cost cannot possibly be better. + || (maxCombined < 0) // int overflow + || (maxCombined > c1i.getNumRows())) // higher combined number of rows. continue; // Combine the two column groups. @@ -206,10 +212,20 @@ public class CoCodeGreedy extends AColumnCoCoder { } @Override - public Object call() { - final IColIndex c = _c1._indexes.combine(_c2._indexes); - final ColIndexes cI = new ColIndexes(c); - mem.getOrCreate(cI, _c1, _c2); + public Object call() throws Exception { + final CompressedSizeInfoColGroup c1i = mem.get(_c1); + final CompressedSizeInfoColGroup c2i = mem.get(_c2); + if(c1i != null && c2i != null) { + final int maxCombined = c1i.getNumVals() * c2i.getNumVals(); + + if(maxCombined < 0 // int overflow + || maxCombined > c1i.getNumRows()) // higher combined than number of rows. + return null; + + final IColIndex c = _c1._indexes.combine(_c2._indexes); + final ColIndexes cI = new ColIndexes(c); + mem.getOrCreate(cI, _c1, _c2); + } return null; } } diff --git a/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeHybrid.java b/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeHybrid.java index 6dc53739d2..c2d3dc9667 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeHybrid.java +++ b/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeHybrid.java @@ -21,6 +21,7 @@ package org.apache.sysds.runtime.compress.cocode; import org.apache.sysds.runtime.compress.CompressionSettings; import org.apache.sysds.runtime.compress.cost.ACostEstimate; +import org.apache.sysds.runtime.compress.cost.ComputationCostEstimator; import org.apache.sysds.runtime.compress.estim.AComEst; import org.apache.sysds.runtime.compress.estim.CompressedSizeInfo; import org.apache.sysds.runtime.controlprogram.parfor.stat.Timing; @@ -37,33 +38,43 @@ public class CoCodeHybrid extends AColumnCoCoder { @Override protected CompressedSizeInfo 
coCodeColumns(CompressedSizeInfo colInfos, int k) { final int startSize = colInfos.getInfo().size(); + final int pqColumnThreashold = Math.max(128, (_sest.getNumColumns() / startSize) * 100); + LOG.error(pqColumnThreashold); + if(startSize == 1) return colInfos; // nothing to join when there only is one column else if(startSize <= 16) {// Greedy all compare all if small number of columns - LOG.debug("Hybrid chose to do greedy cocode because of few columns"); + LOG.debug("Hybrid chose to do greedy CoCode because of few columns"); CoCodeGreedy gd = new CoCodeGreedy(_sest, _cest, _cs); return colInfos.setInfo(gd.combine(colInfos.getInfo(), k)); } - else if(startSize > 1000) - return colInfos.setInfo(CoCodePriorityQue.join(colInfos.getInfo(), _sest, _cest, 1, k)); - LOG.debug("Using Hybrid Cocode Strategy: "); + else if(startSize > 1000) { + CoCodePriorityQue pq = new CoCodePriorityQue(_sest, _cest, _cs, pqColumnThreashold); + + return colInfos.setInfo(pq.join(colInfos.getInfo(), 1, k)); + } + LOG.debug("Using Hybrid CoCode Strategy: "); final int PriorityQueGoal = startSize / 5; if(PriorityQueGoal > 30) { // hybrid if there is a large number of columns to begin with Timing time = new Timing(true); - colInfos.setInfo(CoCodePriorityQue.join(colInfos.getInfo(), _sest, _cest, PriorityQueGoal, k)); - LOG.debug("Que based time: " + time.stop()); + CoCodePriorityQue pq = new CoCodePriorityQue(_sest, _cest, _cs, pqColumnThreashold); + colInfos.setInfo(pq.join(colInfos.getInfo(), PriorityQueGoal, k)); final int pqSize = colInfos.getInfo().size(); - if(pqSize <= PriorityQueGoal * 2) { - time = new Timing(true); + + LOG.debug("Que based time: " + time.stop()); + if(pqSize < PriorityQueGoal || (pqSize < startSize && _cest instanceof ComputationCostEstimator)) { CoCodeGreedy gd = new CoCodeGreedy(_sest, _cest, _cs); colInfos.setInfo(gd.combine(colInfos.getInfo(), k)); LOG.debug("Greedy time: " + time.stop()); } return colInfos; } - else // If somewhere in between use the que based 
approach only. - return colInfos.setInfo(CoCodePriorityQue.join(colInfos.getInfo(), _sest, _cest, 1, k)); - + else { + LOG.debug("Using only Greedy based since Nr Column groups: " + startSize + " is not large enough"); + CoCodeGreedy gd = new CoCodeGreedy(_sest, _cest, _cs); + colInfos.setInfo(gd.combine(colInfos.getInfo(), k)); + return colInfos; + } } } diff --git a/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodePriorityQue.java b/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodePriorityQue.java index b600c697db..ca7135c262 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodePriorityQue.java +++ b/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodePriorityQue.java @@ -48,27 +48,30 @@ public class CoCodePriorityQue extends AColumnCoCoder { private static final int COL_COMBINE_THRESHOLD = 1024; - protected CoCodePriorityQue(AComEst sizeEstimator, ACostEstimate costEstimator, CompressionSettings cs) { + private final int lastCombineThreshold; + + protected CoCodePriorityQue(AComEst sizeEstimator, ACostEstimate costEstimator, CompressionSettings cs, + int lastCombineThreshold) { super(sizeEstimator, costEstimator, cs); + this.lastCombineThreshold = lastCombineThreshold; } @Override protected CompressedSizeInfo coCodeColumns(CompressedSizeInfo colInfos, int k) { - colInfos.setInfo(join(colInfos.getInfo(), _sest, _cest, 1, k)); + colInfos.setInfo(join(colInfos.getInfo(), 1, k)); return colInfos; } - protected static List<CompressedSizeInfoColGroup> join(List<CompressedSizeInfoColGroup> groups, AComEst sEst, - ACostEstimate cEst, int minNumGroups, int k) { + protected List<CompressedSizeInfoColGroup> join(List<CompressedSizeInfoColGroup> groups, int minNumGroups, int k) { if(groups.size() > COL_COMBINE_THRESHOLD && k > 1) - return combineMultiThreaded(groups, sEst, cEst, minNumGroups, k); + return combineMultiThreaded(groups, _sest, _cest, minNumGroups, k); else - return combineSingleThreaded(groups, sEst, cEst, 
minNumGroups); + return combineSingleThreaded(groups, _sest, _cest, minNumGroups); } - private static List<CompressedSizeInfoColGroup> combineMultiThreaded(List<CompressedSizeInfoColGroup> groups, - AComEst sEst, ACostEstimate cEst, int minNumGroups, int k) { + private List<CompressedSizeInfoColGroup> combineMultiThreaded(List<CompressedSizeInfoColGroup> groups, AComEst sEst, + ACostEstimate cEst, int minNumGroups, int k) { final ExecutorService pool = CommonThreadPool.get(k); try { final List<PQTask> tasks = new ArrayList<>(); @@ -90,18 +93,18 @@ public class CoCodePriorityQue extends AColumnCoCoder { catch(Exception e) { throw new DMLCompressionException("Failed parallel priority que cocoding", e); } - finally{ + finally { pool.shutdown(); } } - private static List<CompressedSizeInfoColGroup> combineSingleThreaded(List<CompressedSizeInfoColGroup> groups, - AComEst sEst, ACostEstimate cEst, int minNumGroups) { + private List<CompressedSizeInfoColGroup> combineSingleThreaded(List<CompressedSizeInfoColGroup> groups, AComEst sEst, + ACostEstimate cEst, int minNumGroups) { return combineBlock(groups, 0, groups.size(), sEst, cEst, minNumGroups); } - private static List<CompressedSizeInfoColGroup> combineBlock(List<CompressedSizeInfoColGroup> groups, int start, - int end, AComEst sEst, ACostEstimate cEst, int minNumGroups) { + private List<CompressedSizeInfoColGroup> combineBlock(List<CompressedSizeInfoColGroup> groups, int start, int end, + AComEst sEst, ACostEstimate cEst, int minNumGroups) { Queue<CompressedSizeInfoColGroup> que = getQue(end - start, cEst); for(int i = start; i < end; i++) { @@ -113,7 +116,7 @@ public class CoCodePriorityQue extends AColumnCoCoder { return combineBlock(que, sEst, cEst, minNumGroups); } - private static List<CompressedSizeInfoColGroup> combineBlock(Queue<CompressedSizeInfoColGroup> que, AComEst sEst, + private List<CompressedSizeInfoColGroup> combineBlock(Queue<CompressedSizeInfoColGroup> que, AComEst sEst, ACostEstimate cEst, int 
minNumGroups) { List<CompressedSizeInfoColGroup> ret = new ArrayList<>(); @@ -133,21 +136,21 @@ public class CoCodePriorityQue extends AColumnCoCoder { if(costOfJoin < costIndividual) { que.poll(); int numColumns = g.getColumns().size(); - if(numColumns > 128){ + if(numColumns > lastCombineThreshold) { lastCombine++; ret.add(g); } - else{ + else { lastCombine = 0; que.add(g); } } - else{ + else { lastCombine++; ret.add(l); } } - else{ + else { lastCombine++; ret.add(l); } @@ -155,7 +158,7 @@ public class CoCodePriorityQue extends AColumnCoCoder { l = que.poll(); groupNr = ret.size() + que.size(); } - while(que.peek() != null){ + while(que.peek() != null) { // empty que ret.add(l); l = que.poll(); @@ -180,7 +183,7 @@ public class CoCodePriorityQue extends AColumnCoCoder { return cEst.getCost(x) + x.getColumns().avgOfIndex() / 100000; } - protected static class PQTask implements Callable<List<CompressedSizeInfoColGroup>> { + protected class PQTask implements Callable<List<CompressedSizeInfoColGroup>> { private final List<CompressedSizeInfoColGroup> _groups; private final int _start; diff --git a/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCoderFactory.java b/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCoderFactory.java index abd12d3f6a..6c560fb979 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCoderFactory.java +++ b/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCoderFactory.java @@ -63,16 +63,18 @@ public interface CoCoderFactory { AColumnCoCoder co = createColumnGroupPartitioner(cs.columnPartitioner, est, costEstimator, cs); // Find out if any of the groups are empty. 
- final boolean containsEmptyOrConst = containsEmptyOrConst(colInfos); + final boolean containsEmptyConstOrIncompressable = containsEmptyConstOrIncompressable(colInfos); // if there are no empty or const columns then try cocode algorithms for all columns - if(!containsEmptyOrConst) + if(!containsEmptyConstOrIncompressable) return co.coCodeColumns(colInfos, k); else { // filtered empty groups final List<IColIndex> emptyCols = new ArrayList<>(); // filtered const groups final List<IColIndex> constCols = new ArrayList<>(); + // incompressable groups + final List<IColIndex> incompressable = new ArrayList<>(); // filtered groups -- in the end starting with all groups final List<CompressedSizeInfoColGroup> groups = new ArrayList<>(); @@ -85,13 +87,15 @@ public interface CoCoderFactory { emptyCols.add(g.getColumns()); else if(g.isConst()) constCols.add(g.getColumns()); + else if(g.isIncompressable()) + incompressable.add(g.getColumns()); else groups.add(g); } // overwrite groups. colInfos.compressionInfo = groups; - + // cocode remaining groups if(!groups.isEmpty()) { colInfos = co.coCodeColumns(colInfos, k); @@ -109,14 +113,19 @@ public interface CoCoderFactory { colInfos.compressionInfo.add(new CompressedSizeInfoColGroup(idx, nRow, CompressionType.CONST)); } + if(incompressable.size() > 0) { + final IColIndex idx = ColIndexFactory.combineIndexes(incompressable); + colInfos.compressionInfo.add(new CompressedSizeInfoColGroup(idx, nRow, CompressionType.UNCOMPRESSED)); + } + return colInfos; } } - private static boolean containsEmptyOrConst(CompressedSizeInfo colInfos) { + private static boolean containsEmptyConstOrIncompressable(CompressedSizeInfo colInfos) { for(CompressedSizeInfoColGroup g : colInfos.compressionInfo) - if(g.isEmpty() || g.isConst()) + if(g.isEmpty() || g.isConst() || g.isIncompressable()) return true; return false; } @@ -133,9 +142,9 @@ public interface CoCoderFactory { case STATIC: return new CoCodeStatic(est, costEstimator, cs); case PRIORITY_QUE: - 
return new CoCodePriorityQue(est, costEstimator, cs); + return new CoCodePriorityQue(est, costEstimator, cs, 128); default: - throw new RuntimeException("Unsupported column group partitioner: " + type.toString()); + throw new RuntimeException("Unsupported column group partition technique: " + type.toString()); } } } diff --git a/src/main/java/org/apache/sysds/runtime/compress/cocode/ColIndexes.java b/src/main/java/org/apache/sysds/runtime/compress/cocode/ColIndexes.java index dcdcbe464c..910c640d72 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/cocode/ColIndexes.java +++ b/src/main/java/org/apache/sysds/runtime/compress/cocode/ColIndexes.java @@ -42,9 +42,9 @@ public class ColIndexes { } public boolean contains(ColIndexes a, ColIndexes b) { - if(a == null || b == null) return false; - return _indexes.contains(a._indexes.get(0)) || _indexes.contains(b._indexes.get(0)); + return _indexes.contains(a._indexes.get(0)) // + || _indexes.contains(b._indexes.get(0)); } } diff --git a/src/main/java/org/apache/sysds/runtime/compress/cocode/Memorizer.java b/src/main/java/org/apache/sysds/runtime/compress/cocode/Memorizer.java index db77a32bf6..9ac0d5c948 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/cocode/Memorizer.java +++ b/src/main/java/org/apache/sysds/runtime/compress/cocode/Memorizer.java @@ -24,6 +24,7 @@ import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; +import org.apache.sysds.runtime.compress.DMLCompressionException; import org.apache.sysds.runtime.compress.estim.AComEst; import org.apache.sysds.runtime.compress.estim.CompressedSizeInfoColGroup; @@ -50,8 +51,6 @@ public class Memorizer { } public void remove(ColIndexes c1, ColIndexes c2) { - mem.remove(c1); - mem.remove(c2); Iterator<Entry<ColIndexes, CompressedSizeInfoColGroup>> i = mem.entrySet().iterator(); while(i.hasNext()) { final ColIndexes eci = i.next().getKey(); @@ -60,7 +59,7 @@ public class Memorizer { } } - public CompressedSizeInfoColGroup 
getOrCreate(ColIndexes cI, ColIndexes c1, ColIndexes c2){ + public CompressedSizeInfoColGroup getOrCreate(ColIndexes cI, ColIndexes c1, ColIndexes c2) { CompressedSizeInfoColGroup g = mem.get(cI); st2++; if(g == null) { @@ -69,7 +68,11 @@ public class Memorizer { if(left != null && right != null) { st3++; g = _sEst.combine(cI._indexes, left, right); - + if(g != null) { + if(g.getNumVals() < 0) + throw new DMLCompressionException( + "Combination returned less distinct values on: \n" + left + "\nand\n" + right + "\nEq\n" + g); + } synchronized(this) { mem.put(cI, g); } @@ -88,7 +91,7 @@ public class Memorizer { } public String stats() { - return " possible: " + st1 + " requests: " + st2 + " combined: " + st3 + " outSecond: "+ st4; + return " possible: " + st1 + " requests: " + st2 + " combined: " + st3 + " outSecond: " + st4; } public void resetStats() { diff --git a/src/main/java/org/apache/sysds/runtime/compress/cocode/Memorizer.java b/src/main/java/org/apache/sysds/runtime/compress/cocode/MemorizerV2.java similarity index 63% copy from src/main/java/org/apache/sysds/runtime/compress/cocode/Memorizer.java copy to src/main/java/org/apache/sysds/runtime/compress/cocode/MemorizerV2.java index db77a32bf6..b63a3657fc 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/cocode/Memorizer.java +++ b/src/main/java/org/apache/sysds/runtime/compress/cocode/MemorizerV2.java @@ -20,58 +20,63 @@ package org.apache.sysds.runtime.compress.cocode; import java.util.HashMap; -import java.util.Iterator; import java.util.Map; -import java.util.Map.Entry; +import org.apache.sysds.runtime.compress.DMLCompressionException; +import org.apache.sysds.runtime.compress.colgroup.indexes.IColIndex; import org.apache.sysds.runtime.compress.estim.AComEst; import org.apache.sysds.runtime.compress.estim.CompressedSizeInfoColGroup; -public class Memorizer { +public class MemorizerV2 { private final AComEst _sEst; - private final Map<ColIndexes, CompressedSizeInfoColGroup> mem; + + private 
final Map<ColIndexes, CompressedSizeInfoColGroup>[] mem; private int st1 = 0, st2 = 0, st3 = 0, st4 = 0; - public Memorizer(AComEst sEst) { + @SuppressWarnings("unchecked") + public MemorizerV2(AComEst sEst, int nCol) { _sEst = sEst; - mem = new HashMap<>(); + mem = new Map[nCol]; } public void put(CompressedSizeInfoColGroup g) { - mem.put(new ColIndexes(g.getColumns()), g); + put(new ColIndexes(g.getColumns()), g); } public void put(ColIndexes key, CompressedSizeInfoColGroup val) { - mem.put(key, val); + final IColIndex gi = key._indexes; + final int bucketID = gi.get(0); + Map<ColIndexes, CompressedSizeInfoColGroup> bucket = mem[bucketID]; + if(bucket == null) + bucket = mem[bucketID] = new HashMap<>(); + bucket.put(key, val); } public CompressedSizeInfoColGroup get(ColIndexes c) { - return mem.get(c); + return mem[c._indexes.get(0)].get(c); } public void remove(ColIndexes c1, ColIndexes c2) { - mem.remove(c1); - mem.remove(c2); - Iterator<Entry<ColIndexes, CompressedSizeInfoColGroup>> i = mem.entrySet().iterator(); - while(i.hasNext()) { - final ColIndexes eci = i.next().getKey(); - if(eci.contains(c1, c2)) - i.remove(); - } + mem[c1._indexes.get(0)] = null; + mem[c2._indexes.get(0)] = null; } - public CompressedSizeInfoColGroup getOrCreate(ColIndexes cI, ColIndexes c1, ColIndexes c2){ - CompressedSizeInfoColGroup g = mem.get(cI); + public CompressedSizeInfoColGroup getOrCreate(ColIndexes cI, ColIndexes c1, ColIndexes c2) { + CompressedSizeInfoColGroup g = get(cI); st2++; if(g == null) { - final CompressedSizeInfoColGroup left = mem.get(c1); - final CompressedSizeInfoColGroup right = mem.get(c2); + final CompressedSizeInfoColGroup left = get(c1); + final CompressedSizeInfoColGroup right = get(c2); if(left != null && right != null) { st3++; g = _sEst.combine(cI._indexes, left, right); - + if(g != null) { + if(g.getNumVals() < 0) + throw new DMLCompressionException( + "Combination returned less distinct values on: \n" + left + "\nand\n" + right + "\nEq\n" + g); + 
} synchronized(this) { - mem.put(cI, g); + put(cI, g); } } @@ -88,7 +93,7 @@ public class Memorizer { } public String stats() { - return " possible: " + st1 + " requests: " + st2 + " combined: " + st3 + " outSecond: "+ st4; + return " possible: " + st1 + " requests: " + st2 + " combined: " + st3 + " outSecond: " + st4; } public void resetStats() { diff --git a/src/main/java/org/apache/sysds/runtime/compress/estim/AComEst.java b/src/main/java/org/apache/sysds/runtime/compress/estim/AComEst.java index 03cf173a13..832725f328 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/estim/AComEst.java +++ b/src/main/java/org/apache/sysds/runtime/compress/estim/AComEst.java @@ -22,7 +22,6 @@ package org.apache.sysds.runtime.compress.estim; import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -63,11 +62,21 @@ public abstract class AComEst { _cs = cs; } - protected int getNumRows() { + /** + * Get the number of rows in the overall compressing block. + * + * @return The number of rows + */ + public int getNumRows() { return _cs.transposed ? _data.getNumColumns() : _data.getNumRows(); } - protected int getNumColumns() { + /** + * Get the number of columns in the overall compressing block. + * + * @return The number of cols + */ + public int getNumColumns() { return _cs.transposed ? _data.getNumRows() : _data.getNumColumns(); } @@ -221,6 +230,13 @@ public abstract class AComEst { protected abstract CompressedSizeInfoColGroup combine(IColIndex combinedColumns, CompressedSizeInfoColGroup g1, CompressedSizeInfoColGroup g2, int maxDistinct); + /** + * Collect the compressed size for all individual columns using the available k parallelism degree. + * + * @param clen The number of total columns + * @param k The parallelization degree + * @return A list of the individual columns compressibility. 
+ */ protected List<CompressedSizeInfoColGroup> CompressedSizeInfoColGroup(int clen, int k) { if(k <= 1) return CompressedSizeInfoColGroupSingleThread(clen); @@ -228,6 +244,12 @@ public abstract class AComEst { return CompressedSizeInfoColGroupParallel(clen, k); } + /** + * Compress the column groups using a single thread. + * + * @param clen the number of total columns + * @return A list of the individual columns compressibility. + */ private List<CompressedSizeInfoColGroup> CompressedSizeInfoColGroupSingleThread(int clen) { List<CompressedSizeInfoColGroup> ret = new ArrayList<>(clen); if(!_cs.transposed && !_data.isEmpty() && _data.isInSparseFormat()) @@ -237,6 +259,13 @@ public abstract class AComEst { return ret; } + /** + * Collect the compressed size for all individual columns using the available k parallelism degree. + * + * @param clen The number of total columns + * @param k The parallelization degree + * @return A list of the individual columns compressibility. + */ private List<CompressedSizeInfoColGroup> CompressedSizeInfoColGroupParallel(int clen, int k) { final ExecutorService pool = CommonThreadPool.get(k); try { @@ -249,15 +278,22 @@ public abstract class AComEst { CompressedSizeInfoColGroup[] res = new CompressedSizeInfoColGroup[clen]; final int blkz = Math.max(1, clen / (k * 10)); - final ArrayList<SizeEstimationTask> tasks = new ArrayList<>(clen / blkz + 1); + final ArrayList<Future<Object>> tasks = new ArrayList<>(clen / blkz + 1); if(blkz != 1) LOG.debug("Extracting column samples in blocks of " + blkz); - for(int col = 0; col < clen; col += blkz) - tasks.add(new SizeEstimationTask(res, col, Math.min(clen, col + blkz))); + for(int col = 0; col < clen; col += blkz) { + final int start = col; + final int end = Math.min(clen, col + blkz); + tasks.add(pool.submit(() -> { + for(int c = start; c < end; c++) + res[c] = getColGroupInfo(new SingleIndex(c)); + return null; + })); + } - for(Future<Object> f : pool.invokeAll(tasks)) + for(Future<Object> f 
: tasks) f.get(); return Arrays.asList(res); @@ -265,35 +301,11 @@ public abstract class AComEst { catch(Exception e) { throw new DMLCompressionException("Multithreaded first extraction failed", e); } - finally{ + finally { pool.shutdown(); } } - private class SizeEstimationTask implements Callable<Object> { - final CompressedSizeInfoColGroup[] _res; - final int _cs; - final int _ce; - - private SizeEstimationTask(CompressedSizeInfoColGroup[] res, int cs, int ce) { - _res = res; - _cs = cs; - _ce = ce; - } - - @Override - public Object call() { - try { - for(int c = _cs; c < _ce; c++) - _res[c] = getColGroupInfo(new SingleIndex(c)); - return null; - } - catch(Exception e) { - throw new DMLCompressionException("ColGroup extraction failed", e); - } - } - } - @Override public String toString() { return this.getClass().getSimpleName(); diff --git a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeInfoColGroup.java b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeInfoColGroup.java index 1168147b3d..4fbf9b0ee4 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeInfoColGroup.java +++ b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeInfoColGroup.java @@ -129,6 +129,10 @@ public class CompressedSizeInfoColGroup { _sizes.put(ct, (double) ColGroupSizes.estimateInMemorySizeCONST(columns.size(), columns.isContiguous(), 1.0, false)); break; + case UNCOMPRESSED: + _sizes.put(ct, (double) ColGroupSizes.estimateInMemorySizeUncompressed(nRows, columns.isContiguous(), + columns.size(), 1.0)); + break; default: throw new DMLCompressionException("Invalid instantiation of const Cost"); } @@ -206,6 +210,10 @@ public class CompressedSizeInfoColGroup { return _map; } + public void setMap(IEncode map) { + _map = map; + } + public boolean containsZeros() { return _facts.numOffs < _facts.numRows; } @@ -229,6 +237,10 @@ public class CompressedSizeInfoColGroup { return _bestCompressionType == 
CompressionType.CONST || _sizes.containsKey(CompressionType.CONST); } + public boolean isIncompressable() { + return _bestCompressionType == CompressionType.UNCOMPRESSED; + } + private static double getCompressionSize(IColIndex cols, CompressionType ct, EstimationFactors fact) { int nv; final int numCols = cols.size(); @@ -284,6 +296,15 @@ public class CompressedSizeInfoColGroup { sb.append(" Sizes: " + _sizes); sb.append(" facts: " + _facts); sb.append(" mapIsNull: " + (_map == null)); + if(_map != null) { + String s = _map.toString(); + if(s.length() > 1000) { + sb.append(s, 0, 1000); + } + else { + sb.append(s); + } + } return sb.toString(); }