[SYSTEMML-1919] Shared dictionary for all DDC1 single-column groups

This patch extends the CLA (compressed linear algebra) framework with an additional compression phase that creates, in a best-effort manner, a shared dictionary for all DDC1 single-column groups if the total number of distinct values is <= 255 (or <= 256 if the distinct values include zero). This constraint ensures that all DDC1 column groups remain in DDC1 format and simply need to be recoded with respect to the shared dictionary. Also, having <= 256 values ensures that the shared dictionary easily fits into L1 cache (32KB). Since DDC1 column groups are very common, this approach greatly improves compression ratios, especially for distributed matrix representations, where the header can dominate the total matrix size.
On an example scenario with Mnist240m (540GB in uncompressed format), this patch reduces the RDD storage size as follows: block size = 1,024: 219.3GB -> 136.1GB; block size = 16,384: 97.5GB -> 90.3GB. Thus, this patch significantly increases the compression potential, especially with small block sizes such as our default block size of 1K, which is important because we now apply compression by default. Project: http://git-wip-us.apache.org/repos/asf/systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/f86879bd Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/f86879bd Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/f86879bd Branch: refs/heads/master Commit: f86879bd0af5eb046c6fe00a444ff04c603a2e91 Parents: 119893f Author: Matthias Boehm <mboe...@gmail.com> Authored: Sun Sep 17 16:48:28 2017 -0700 Committer: Matthias Boehm <mboe...@gmail.com> Committed: Sun Sep 17 16:48:54 2017 -0700 ---------------------------------------------------------------------- .../sysml/runtime/compress/ColGroupDDC1.java | 13 ++ .../sysml/runtime/compress/ColGroupValue.java | 15 +- .../runtime/compress/CompressedMatrixBlock.java | 139 +++++++++++++++---- 3 files changed, 133 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/systemml/blob/f86879bd/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC1.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC1.java b/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC1.java index 89ca931..63ab3e7 100644 --- a/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC1.java +++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC1.java @@ -23,6 +23,7 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.Arrays; +import java.util.HashMap; import 
org.apache.sysml.runtime.DMLRuntimeException; import org.apache.sysml.runtime.compress.utils.ConverterUtils; @@ -102,6 +103,18 @@ public class ColGroupDDC1 extends ColGroupDDC return (_data[r]&0xFF); } + public void recodeData(HashMap<Double,Integer> map) { + //prepare translation table + final int numVals = getNumValues(); + byte[] lookup = new byte[numVals]; + for( int k=0; k<numVals; k++ ) + lookup[k] = map.get(_values[k]).byteValue(); + + //recode the data + for( int i=0; i<_numRows; i++ ) + _data[i] = lookup[_data[i]&0xFF]; + } + @Override public void write(DataOutput out) throws IOException { int numCols = getNumCols(); http://git-wip-us.apache.org/repos/asf/systemml/blob/f86879bd/src/main/java/org/apache/sysml/runtime/compress/ColGroupValue.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroupValue.java b/src/main/java/org/apache/sysml/runtime/compress/ColGroupValue.java index febac4c..63185f2 100644 --- a/src/main/java/org/apache/sysml/runtime/compress/ColGroupValue.java +++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroupValue.java @@ -107,12 +107,15 @@ public abstract class ColGroupValue extends ColGroup // adding the size of values size += 8; //array reference - if (_values != null) { - size += 32 + _values.length * 8; //values - } + size += getValuesSize(); // values return size; } + + public long getValuesSize() { + return ( _values != null ) ? + 32 + _values.length * 8 : 0; + } /** * Obtain number of distinct sets of values associated with the bitmaps in this column group. 
@@ -123,11 +126,15 @@ public abstract class ColGroupValue extends ColGroup public int getNumValues() { return _values.length / _colIndexes.length; } - + public double[] getValues() { return _values; } + public void setValues(double[] values) { + _values = values; + } + public double getValue(int k, int col) { return _values[k*getNumCols()+col]; } http://git-wip-us.apache.org/repos/asf/systemml/blob/f86879bd/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java index b95b3af..98529c8 100644 --- a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java +++ b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java @@ -108,8 +108,9 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable public static final boolean TRANSPOSE_INPUT = true; public static final boolean MATERIALIZE_ZEROS = false; public static final long MIN_PAR_AGG_THRESHOLD = 16*1024*1024; //16MB - public static boolean INVESTIGATE_ESTIMATES = false; + public static final boolean INVESTIGATE_ESTIMATES = false; public static boolean ALLOW_DDC_ENCODING = true; + public static final boolean ALLOW_SHARED_DDC1_DICTIONARY = true; private static final boolean LDEBUG = true; //local debug flag private static final Level LDEBUG_LEVEL = Level.INFO; //DEBUG/TRACE for details @@ -119,12 +120,13 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable // for internal debugging only if( LDEBUG ) { Logger.getLogger("org.apache.sysml.runtime.compress") - .setLevel((Level) LDEBUG_LEVEL); + .setLevel((Level) LDEBUG_LEVEL); } } protected ArrayList<ColGroup> _colGroups = null; protected CompressionStatistics _stats = null; + protected boolean _sharedDDC1Dict = false; public 
CompressedMatrixBlock() { super(-1, -1, true); @@ -199,7 +201,7 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable @Override public boolean isEmptyBlock(boolean safe) { if( !isCompressed() ) - return super.isEmptyBlock(safe); + return super.isEmptyBlock(safe); return (_colGroups == null || getNonZeros()==0); } @@ -265,12 +267,12 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable // where a column is compressible if ratio > 1. CompressedSizeInfo[] sizeInfos = (k > 1) ? computeCompressedSizeInfos(bitmapSizeEstimator, numCols, k) : - computeCompressedSizeInfos(bitmapSizeEstimator, numCols); + computeCompressedSizeInfos(bitmapSizeEstimator, numCols); long nnzUC = 0; - for (int col = 0; col < numCols; col++) { + for (int col = 0; col < numCols; col++) { double uncompSize = getUncompressedSize(numRows, 1, OptimizerUtils.getSparsity(numRows, 1, sizeInfos[col].getEstNnz())); - double compRatio = uncompSize / sizeInfos[col].getMinSize(); + double compRatio = uncompSize / sizeInfos[col].getMinSize(); if( compRatio > 1 ) { colsC.add(col); compRatios.put(col, compRatio); @@ -287,7 +289,7 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable for( int i=0; i<colsUC.size(); i++ ) { int col = colsUC.get(i); double uncompSize = getUncompressedSize(numRows, 1, 1.0); - double compRatio = uncompSize / sizeInfos[col].getMinSize(); + double compRatio = uncompSize / sizeInfos[col].getMinSize(); if( compRatio > 1 ) { colsC.add(col); colsUC.remove(i); i--; @@ -325,7 +327,7 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable // PHASE 3: Compress and correct sample-based decisions ColGroup[] colGroups = (k > 1) ? 
compressColGroups(rawblock, bitmapSizeEstimator, compRatios, numRows, bitmapColGrps, colsUC.isEmpty(), k) : - compressColGroups(rawblock, bitmapSizeEstimator, compRatios, numRows, bitmapColGrps, colsUC.isEmpty()); + compressColGroups(rawblock, bitmapSizeEstimator, compRatios, numRows, bitmapColGrps, colsUC.isEmpty()); allocateColGroupList(); HashSet<Integer> remainingCols = seq(0, numCols-1, 1); for( int j=0; j<colGroups.length; j++ ) { @@ -340,8 +342,20 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable _stats.timePhase3 = time.stop(); LOG.debug("--compression phase 3: "+_stats.timePhase3); } - - // Phase 4: Cleanup + + // PHASE 4: Best-effort dictionary sharing for DDC1 single-col groups + double[] dict = createSharedDDC1Dictionary(_colGroups); + if( dict != null ) { + applySharedDDC1Dictionary(_colGroups, dict); + _sharedDDC1Dict = true; + } + + if( LOG.isDebugEnabled() ) { + _stats.timePhase4 = time.stop(); + LOG.debug("--compression phase 4: "+_stats.timePhase4); + } + + // Phase 5: Cleanup // The remaining columns are stored uncompressed as one big column group if( !remainingCols.isEmpty() ) { ArrayList<Integer> list = new ArrayList<Integer>(remainingCols); @@ -357,9 +371,9 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable this.cleanupBlock(true, true); if( LOG.isDebugEnabled() ) { - _stats.timePhase4 = time.stop(); + _stats.timePhase5 = time.stop(); int[] counts = getColGroupCounts(_colGroups); - LOG.debug("--compression phase 4: "+_stats.timePhase4); + LOG.debug("--compression phase 5: "+_stats.timePhase5); LOG.debug("--num col groups: "+_colGroups.size()); LOG.debug("--col groups types (OLE,RLE,DDC1,DDC2,UC): " +counts[2]+","+counts[1]+","+counts[3]+","+counts[4]+","+counts[0]); @@ -460,7 +474,7 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable { //exact big list and observe compression ratio ubm = BitmapEncoder.extractBitmap(colIndexes, in); - sizeInfo = 
estim.estimateCompressedColGroupSize(ubm); + sizeInfo = estim.estimateCompressedColGroupSize(ubm); double sp2 = denseEst ? 1.0 : OptimizerUtils.getSparsity(rlen, 1, ubm.getNumOffsets()); double compRatio = getUncompressedSize(rlen, colIndexes.length, sp2) / sizeInfo.getMinSize(); @@ -529,6 +543,47 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable //eventually represented in dense return Math.min(8d * rlen * clen, 4d * rlen + 12d * rlen * clen * sparsity); } + + private static double[] createSharedDDC1Dictionary(ArrayList<ColGroup> colGroups) { + if( !ALLOW_DDC_ENCODING || !ALLOW_SHARED_DDC1_DICTIONARY ) + return null; + + //create joint dictionary + HashSet<Double> tmp = new HashSet<Double>(); + int numQual = 0; + for( ColGroup grp : colGroups ) + if( grp.getNumCols()==1 && grp instanceof ColGroupDDC1 ) { + ColGroupDDC1 grpDDC1 = (ColGroupDDC1) grp; + for( double val : grpDDC1.getValues() ) + tmp.add(val); + numQual ++; + } + + //abort shared dictionary creation if empty or too large + int maxSize = tmp.contains(0d) ? 256 : 255; + if( tmp.isEmpty() || tmp.size() > maxSize || numQual < 2 ) + return null; + LOG.debug("Created shared directionary for " + + numQual+" DDC1 single column groups."); + + //build consolidated dictionary + return tmp.stream().mapToDouble(Double::doubleValue).toArray(); + } + + private static void applySharedDDC1Dictionary(ArrayList<ColGroup> colGroups, double[] dict) { + //create joint mapping table + HashMap<Double, Integer> map = new HashMap<>(); + for(int i=0; i<dict.length; i++) + map.put(dict[i], i); + + //recode data of all relevant DDC1 groups + for( ColGroup grp : colGroups ) + if( grp.getNumCols()==1 && grp instanceof ColGroupDDC1 ) { + ColGroupDDC1 grpDDC1 = (ColGroupDDC1) grp; + grpDDC1.recodeData(map); + grpDDC1.setValues(dict); + } + } /** * Decompress block. 
@@ -545,7 +600,7 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable Timing time = new Timing(true); - //preallocation sparse rows to avoid repeated reallocations + //preallocation sparse rows to avoid repeated reallocations MatrixBlock ret = new MatrixBlock(getNumRows(), getNumColumns(), isInSparseFormat(), getNonZeros()); if( ret.isInSparseFormat() ) { int[] rnnz = new int[rlen]; @@ -588,7 +643,7 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable if( k <= 1 ) return decompress(); - Timing time = new Timing(true); + Timing time = LOG.isDebugEnabled() ? new Timing(true) : null; MatrixBlock ret = new MatrixBlock(rlen, clen, sparse, nonZeros); ret.allocateDenseOrSparseBlock(); @@ -602,7 +657,7 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable ArrayList<DecompressTask> tasks = new ArrayList<DecompressTask>(); for( int i=0; i<k & i*blklen<getNumRows(); i++ ) tasks.add(new DecompressTask(_colGroups, ret, i*blklen, Math.min((i+1)*blklen,rlen))); - List<Future<Object>> rtasks = pool.invokeAll(tasks); + List<Future<Object>> rtasks = pool.invokeAll(tasks); pool.shutdown(); for( Future<Object> rt : rtasks ) rt.get(); //error handling @@ -637,6 +692,16 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable total += 80 + 8 * _colGroups.size(); for (ColGroup grp : _colGroups) total += grp.estimateInMemorySize(); + //correction for shared DDC1 dictionary + if( _sharedDDC1Dict ) { + boolean seenDDC1 = false; + for (ColGroup grp : _colGroups) + if( grp.getNumCols()==1 && grp instanceof ColGroupDDC1 ) { + if( seenDDC1 ) + total -= ((ColGroupDDC1)grp).getValuesSize(); + seenDDC1 = true; + } + } return total; } @@ -660,6 +725,7 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable public double timePhase2 = -1; public double timePhase3 = -1; public double timePhase4 = -1; + public double timePhase5 = -1; public double estSize = 
-1; public double size = -1; public double ratio = -1; @@ -668,11 +734,12 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable //do nothing } - public CompressionStatistics(double t1, double t2, double t3, double t4){ + public CompressionStatistics(double t1, double t2, double t3, double t4, double t5){ timePhase1 = t1; timePhase2 = t2; timePhase3 = t3; timePhase4 = t4; + timePhase5 = t5; } } @@ -726,9 +793,11 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable rlen = in.readInt(); clen = in.readInt(); nonZeros = in.readLong(); + _sharedDDC1Dict = in.readBoolean(); int ncolGroups = in.readInt(); _colGroups = new ArrayList<ColGroup>(ncolGroups); + double[] sharedDict = null; for( int i=0; i<ncolGroups; i++ ) { CompressionType ctype = CompressionType.values()[in.readByte()]; @@ -745,11 +814,20 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable case DDC1: grp = new ColGroupDDC1(); break; case DDC2: - grp = new ColGroupDDC2(); break; + grp = new ColGroupDDC2(); break; } //deserialize and add column group grp.readFields(in); + + //use shared DDC1 dictionary if applicable + if( _sharedDDC1Dict && grp.getNumCols()==1 ) { + if( sharedDict == null ) + sharedDict = ((ColGroupDDC1)grp).getValues(); + else + ((ColGroupDDC1)grp).setValues(sharedDict); + } + _colGroups.add(grp); } } @@ -770,6 +848,7 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable out.writeInt(rlen); out.writeInt(clen); out.writeLong(nonZeros); + out.writeBoolean(_sharedDDC1Dict); out.writeInt(_colGroups.size()); for( ColGroup grp : _colGroups ) { @@ -876,7 +955,7 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable final int m = rlen; final int n = clen+that.getNumColumns(); - final long nnz = nonZeros+that.getNonZeros(); + final long nnz = nonZeros+that.getNonZeros(); //init result matrix CompressedMatrixBlock ret2 = null; @@ -905,7 +984,7 @@ public 
class CompressedMatrixBlock extends MatrixBlock implements Externalizable } //meta data maintenance - ret2.setNonZeros(nnz); + ret2.setNonZeros(nnz); return ret2; } @@ -1121,7 +1200,7 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable else for( ArrayList<ColGroup> grp : grpParts ) tasks.add(new UnaryAggregateTask(grp, ret, 0, rlen, op)); - List<Future<MatrixBlock>> rtasks = pool.invokeAll(tasks); + List<Future<MatrixBlock>> rtasks = pool.invokeAll(tasks); pool.shutdown(); //aggregate partial results @@ -1362,7 +1441,7 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable //compute uncompressed column group in parallel if( uc != null ) - uc.rightMultByVector(vector, result, k); + uc.rightMultByVector(vector, result, k); //compute remaining compressed column groups in parallel ExecutorService pool = Executors.newFixedThreadPool( k ); @@ -1452,7 +1531,7 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable ColGroupValue.setupThreadLocalMemory(getMaxNumValues(colGroups)); // delegate matrix-vector operation to each column group - for (ColGroup grp : colGroups) { + for (ColGroup grp : colGroups) { grp.leftMultByRowVector(rowVector, result); } @@ -1469,7 +1548,7 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable result.reset(); // delegate matrix-vector operation to each column group - for( ColGroup grp : colGroups ) { + for( ColGroup grp : colGroups ) { ((ColGroupValue)grp).leftMultByRowVector(vector, result); } @@ -1509,7 +1588,7 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable //compute uncompressed column group in parallel ColGroupUncompressed uc = getUncompressedColGroup(); if( uc != null ) - uc.leftMultByRowVector(vector, result, k); + uc.leftMultByRowVector(vector, result, k); //compute remaining compressed column groups in parallel ExecutorService pool = Executors.newFixedThreadPool( 
Math.min(colGroups.size()-((uc!=null)?1:0), k) ); @@ -1534,7 +1613,7 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable throws DMLRuntimeException { final int numRows = groups.get(0).getNumRows(); - final int numGroups = groups.size(); + final int numGroups = groups.size(); final boolean containsUC = containsUncompressedColGroup(groups); //preallocated dense tmp matrix blocks @@ -1551,7 +1630,7 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable for( int i=gl; i<gu; i++ ) { //get current group and relevant col groups - ColGroup group = groups.get(i); + ColGroup group = groups.get(i); int[] ixgroup = group.getColIndices(); List<ColGroup> tmpList = groups.subList(i, numGroups); @@ -1559,7 +1638,7 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable && ixgroup.length==1 && !containsUC && numRows<BitmapEncoder.BITMAP_BLOCK_SZ ) { //compute vector-matrix partial result - leftMultByVectorTranspose(tmpList, (ColGroupDDC)group, tmpret); + leftMultByVectorTranspose(tmpList, (ColGroupDDC)group, tmpret); //write partial results (disjoint non-zeros) LinearAlgebraUtils.copyNonZerosToUpperTriangle(result, tmpret, ixgroup[0]); @@ -1571,10 +1650,10 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable if( !lhs.isEmptyBlock(false) ) { //compute vector-matrix partial result - leftMultByVectorTranspose(tmpList, lhs, tmpret, false, false); + leftMultByVectorTranspose(tmpList, lhs, tmpret, false, false); //write partial results (disjoint non-zeros) - LinearAlgebraUtils.copyNonZerosToUpperTriangle(result, tmpret, ixgroup[j]); + LinearAlgebraUtils.copyNonZerosToUpperTriangle(result, tmpret, ixgroup[j]); } } }