[SYSTEMML-1919] Shared dictionary for all DDC1 single-column groups

This patch extends the CLA (compressed linear algebra) framework with an
additional compression phase that creates, in a best-effort manner, a
shared dictionary for all DDC1 single-column groups if the total number
of distinct values is <= 255 (or <= 256 if the distinct values include
zero). This constraint ensures that all DDC1 column groups remain in
DDC1 format and simply need to be recoded with respect to the shared
dictionary. Also, with <= 256 values (at most 2KB of doubles), the shared
dictionary easily fits into the L1 cache (32KB). Since DDC1 column groups
are very common, this approach greatly improves compression ratios,
especially for distributed matrix representations, where the per-block
header can dominate the total matrix size.

In an example scenario with Mnist240m (540GB in uncompressed format),
this patch reduces the RDD storage size as follows:

block size = 1,024:  219.3GB -> 136.1GB
block size = 16,384: 97.5GB -> 90.3GB

Thus, this patch significantly increases the compression potential,
especially for small block sizes such as our default block size of 1K
(1,024), which is important because we now apply compression by default.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/f86879bd
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/f86879bd
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/f86879bd

Branch: refs/heads/master
Commit: f86879bd0af5eb046c6fe00a444ff04c603a2e91
Parents: 119893f
Author: Matthias Boehm <mboe...@gmail.com>
Authored: Sun Sep 17 16:48:28 2017 -0700
Committer: Matthias Boehm <mboe...@gmail.com>
Committed: Sun Sep 17 16:48:54 2017 -0700

----------------------------------------------------------------------
 .../sysml/runtime/compress/ColGroupDDC1.java    |  13 ++
 .../sysml/runtime/compress/ColGroupValue.java   |  15 +-
 .../runtime/compress/CompressedMatrixBlock.java | 139 +++++++++++++++----
 3 files changed, 133 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/f86879bd/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC1.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC1.java 
b/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC1.java
index 89ca931..63ab3e7 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC1.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC1.java
@@ -23,6 +23,7 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.HashMap;
 
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.compress.utils.ConverterUtils;
@@ -102,6 +103,18 @@ public class ColGroupDDC1 extends ColGroupDDC
                return (_data[r]&0xFF);
        }
        
+       /**
+        * Recodes the 1-byte codes of this group against a new dictionary.
+        * The given map translates each value of the current dictionary
+        * (_values) to its position in the new dictionary. It must contain
+        * every current value (a missing value causes a NullPointerException),
+        * and positions are truncated to a byte. This method does not replace
+        * _values itself; callers are expected to do that afterwards.
+        * NOTE(review): _values[k] is indexed directly, which is only correct
+        * for single-column groups (getNumValues() = _values.length / numCols).
+        * 
+        * @param map mapping from dictionary value to new code
+        */
+       public void recodeData(HashMap<Double,Integer> map) {
+               //prepare translation table (old code -> new code)
+               final int numVals = getNumValues();
+               byte[] lookup = new byte[numVals];
+               for( int k=0; k<numVals; k++ )
+                       lookup[k] = map.get(_values[k]).byteValue();
+               
+               //recode the data (mask to read the byte code as unsigned 0..255)
+               for( int i=0; i<_numRows; i++ )
+                       _data[i] = lookup[_data[i]&0xFF];
+       }
+       
        @Override
        public void write(DataOutput out) throws IOException {
                int numCols = getNumCols();

http://git-wip-us.apache.org/repos/asf/systemml/blob/f86879bd/src/main/java/org/apache/sysml/runtime/compress/ColGroupValue.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroupValue.java 
b/src/main/java/org/apache/sysml/runtime/compress/ColGroupValue.java
index febac4c..63185f2 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/ColGroupValue.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroupValue.java
@@ -107,12 +107,15 @@ public abstract class ColGroupValue extends ColGroup
                
                // adding the size of values
                size += 8; //array reference
-               if (_values != null) {
-                       size += 32 + _values.length * 8; //values
-               }
+               size += getValuesSize(); // values
        
                return size;
        }
+       
+       /**
+        * Estimates the in-memory size of the values array in bytes: a fixed
+        * overhead of 32 bytes plus 8 bytes per double, or 0 if no values are set.
+        * 
+        * @return estimated size of the values array in bytes
+        */
+       public long getValuesSize() {
+               //use long arithmetic to avoid int overflow for very large arrays
+               return ( _values != null ) ? 
+                       32 + 8L * _values.length : 0;
+       }
 
        /**
         * Obtain number of distinct sets of values associated with the bitmaps 
in this column group.
@@ -123,11 +126,15 @@ public abstract class ColGroupValue extends ColGroup
        public int getNumValues() {
                return _values.length / _colIndexes.length;
        }
-
+       
        public double[] getValues() {
                return _values;
        }
        
+       /**
+        * Replaces the dictionary values of this column group. The array is
+        * stored by reference (no copy), so multiple groups can share one
+        * dictionary array.
+        * 
+        * @param values new dictionary values
+        */
+       public void setValues(double[] values) {
+               _values = values;
+       }
+       
        public double getValue(int k, int col) {
                return _values[k*getNumCols()+col];
        }

http://git-wip-us.apache.org/repos/asf/systemml/blob/f86879bd/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java 
b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
index b95b3af..98529c8 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
@@ -108,8 +108,9 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
        public static final boolean TRANSPOSE_INPUT = true;
        public static final boolean MATERIALIZE_ZEROS = false;
        public static final long MIN_PAR_AGG_THRESHOLD = 16*1024*1024; //16MB
-       public static boolean INVESTIGATE_ESTIMATES = false;
+       public static final boolean INVESTIGATE_ESTIMATES = false;
        public static boolean ALLOW_DDC_ENCODING = true;
+       public static final boolean ALLOW_SHARED_DDC1_DICTIONARY = true;
        private static final boolean LDEBUG = true; //local debug flag
        private static final Level LDEBUG_LEVEL = Level.INFO; //DEBUG/TRACE for 
details
        
@@ -119,12 +120,13 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
                // for internal debugging only
                if( LDEBUG ) {
                        Logger.getLogger("org.apache.sysml.runtime.compress")
-                                 .setLevel((Level) LDEBUG_LEVEL);
+                               .setLevel((Level) LDEBUG_LEVEL);
                }       
        }
        
        protected ArrayList<ColGroup> _colGroups = null;
        protected CompressionStatistics _stats = null;
+       protected boolean _sharedDDC1Dict = false;
        
        public CompressedMatrixBlock() {
                super(-1, -1, true);
@@ -199,7 +201,7 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
        @Override
        public boolean isEmptyBlock(boolean safe)  {
                if( !isCompressed() )
-                       return super.isEmptyBlock(safe);                
+                       return super.isEmptyBlock(safe);
                return (_colGroups == null || getNonZeros()==0);
        }
        
@@ -265,12 +267,12 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
                // where a column is compressible if ratio > 1.
                CompressedSizeInfo[] sizeInfos = (k > 1) ?
                                computeCompressedSizeInfos(bitmapSizeEstimator, 
numCols, k) : 
-                               computeCompressedSizeInfos(bitmapSizeEstimator, 
numCols);       
+                               computeCompressedSizeInfos(bitmapSizeEstimator, 
numCols);
                long nnzUC = 0;         
-               for (int col = 0; col < numCols; col++)  {      
+               for (int col = 0; col < numCols; col++)  {
                        double uncompSize = getUncompressedSize(numRows, 1, 
                                OptimizerUtils.getSparsity(numRows, 1, 
sizeInfos[col].getEstNnz()));
-                       double compRatio = uncompSize / 
sizeInfos[col].getMinSize();                    
+                       double compRatio = uncompSize / 
sizeInfos[col].getMinSize();
                        if( compRatio > 1 ) {
                                colsC.add(col);
                                compRatios.put(col, compRatio);
@@ -287,7 +289,7 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
                        for( int i=0; i<colsUC.size(); i++ ) {
                                int col = colsUC.get(i);
                                double uncompSize = 
getUncompressedSize(numRows, 1, 1.0);
-                               double compRatio = uncompSize / 
sizeInfos[col].getMinSize();                    
+                               double compRatio = uncompSize / 
sizeInfos[col].getMinSize();
                                if( compRatio > 1 ) {
                                        colsC.add(col);
                                        colsUC.remove(i); i--;
@@ -325,7 +327,7 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
                // PHASE 3: Compress and correct sample-based decisions
                ColGroup[] colGroups = (k > 1) ?
                                compressColGroups(rawblock, 
bitmapSizeEstimator, compRatios, numRows, bitmapColGrps, colsUC.isEmpty(), k) : 
-                               compressColGroups(rawblock, 
bitmapSizeEstimator, compRatios, numRows, bitmapColGrps, colsUC.isEmpty());     
    
+                               compressColGroups(rawblock, 
bitmapSizeEstimator, compRatios, numRows, bitmapColGrps, colsUC.isEmpty()); 
                allocateColGroupList();
                HashSet<Integer> remainingCols = seq(0, numCols-1, 1);
                for( int j=0; j<colGroups.length; j++ ) {
@@ -340,8 +342,20 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
                        _stats.timePhase3 = time.stop();
                        LOG.debug("--compression phase 3: "+_stats.timePhase3);
                }
-                       
-               // Phase 4: Cleanup
+               
+               // PHASE 4: Best-effort dictionary sharing for DDC1 single-col 
groups
+               double[] dict = createSharedDDC1Dictionary(_colGroups);
+               if( dict != null ) {
+                       applySharedDDC1Dictionary(_colGroups, dict);
+                       _sharedDDC1Dict = true;
+               }
+               
+               if( LOG.isDebugEnabled() ) {
+                       _stats.timePhase4 = time.stop();
+                       LOG.debug("--compression phase 4: "+_stats.timePhase4);
+               }
+               
+               // Phase 5: Cleanup
                // The remaining columns are stored uncompressed as one big 
column group
                if( !remainingCols.isEmpty() ) {
                        ArrayList<Integer> list = new 
ArrayList<Integer>(remainingCols);
@@ -357,9 +371,9 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
                this.cleanupBlock(true, true);
                
                if( LOG.isDebugEnabled() ) {
-                       _stats.timePhase4 = time.stop();
+                       _stats.timePhase5 = time.stop();
                        int[] counts = getColGroupCounts(_colGroups);
-                       LOG.debug("--compression phase 4: "+_stats.timePhase4);
+                       LOG.debug("--compression phase 5: "+_stats.timePhase5);
                        LOG.debug("--num col groups: "+_colGroups.size());
                        LOG.debug("--col groups types (OLE,RLE,DDC1,DDC2,UC): "
                                        
+counts[2]+","+counts[1]+","+counts[3]+","+counts[4]+","+counts[0]);
@@ -460,7 +474,7 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
                {
                        //exact big list and observe compression ratio
                        ubm = BitmapEncoder.extractBitmap(colIndexes, in); 
-                       sizeInfo = estim.estimateCompressedColGroupSize(ubm);   
+                       sizeInfo = estim.estimateCompressedColGroupSize(ubm);
                        double sp2 = denseEst ? 1.0 : 
OptimizerUtils.getSparsity(rlen, 1, ubm.getNumOffsets());
                        double compRatio = getUncompressedSize(rlen, 
colIndexes.length, sp2) / sizeInfo.getMinSize();
                
@@ -529,6 +543,47 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
                //eventually represented in dense
                return Math.min(8d * rlen * clen, 4d * rlen + 12d * rlen * clen 
* sparsity);
        }
+       
+       /**
+        * Builds, in a best-effort manner, a shared dictionary over the values
+        * of all single-column DDC1 groups.
+        * 
+        * @param colGroups list of column groups
+        * @return consolidated dictionary of distinct values, or null if DDC or
+        *   dictionary sharing is disabled, fewer than two groups qualify, or the
+        *   joint value set is empty or exceeds 255 values (256 if it contains zero)
+        */
+       private static double[] createSharedDDC1Dictionary(ArrayList<ColGroup> colGroups) {
+               if( !ALLOW_DDC_ENCODING || !ALLOW_SHARED_DDC1_DICTIONARY )
+                       return null;
+               
+               //create joint dictionary
+               HashSet<Double> tmp = new HashSet<Double>();
+               int numQual = 0;
+               for( ColGroup grp : colGroups )
+                       if( grp.getNumCols()==1 && grp instanceof ColGroupDDC1 ) {
+                               ColGroupDDC1 grpDDC1 = (ColGroupDDC1) grp;
+                               for( double val : grpDDC1.getValues() )
+                                       tmp.add(val);
+                               numQual ++;
+                               //early abort: more than 256 distinct values exceed
+                               //the limit below regardless of whether zero is included
+                               if( tmp.size() > 256 )
+                                       return null;
+                       }
+               
+               //abort shared dictionary creation if empty or too large
+               int maxSize = tmp.contains(0d) ? 256 : 255;
+               if( tmp.isEmpty() || tmp.size() > maxSize || numQual < 2 )
+                       return null;
+               if( LOG.isDebugEnabled() )
+                       LOG.debug("Created shared dictionary for "
+                               + numQual+" DDC1 single column groups.");
+               
+               //build consolidated dictionary
+               return tmp.stream().mapToDouble(Double::doubleValue).toArray();
+       }
+       
+       /**
+        * Recodes all single-column DDC1 groups against the given shared
+        * dictionary and makes each of them reference the same dictionary array.
+        * Assumes dict contains every value of every such group and has at most
+        * 256 entries, as checked by createSharedDDC1Dictionary.
+        * 
+        * @param colGroups list of column groups
+        * @param dict shared dictionary of distinct values
+        */
+       private static void applySharedDDC1Dictionary(ArrayList<ColGroup> 
colGroups, double[] dict) {
+               //create joint mapping table (value -> position in dict)
+               HashMap<Double, Integer> map = new HashMap<>();
+               for(int i=0; i<dict.length; i++)
+                       map.put(dict[i], i);
+               
+               //recode data of all relevant DDC1 groups
+               for( ColGroup grp : colGroups )
+                       if( grp.getNumCols()==1 && grp instanceof ColGroupDDC1 
) {
+                               ColGroupDDC1 grpDDC1 = (ColGroupDDC1) grp;
+                               //recode first: recodeData reads the group's current values
+                               grpDDC1.recodeData(map);
+                               //all groups alias the same array (no copies)
+                               grpDDC1.setValues(dict);
+                       }
+       }
 
        /**
         * Decompress block.
@@ -545,7 +600,7 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
                
                Timing time = new Timing(true);
                
-               //preallocation sparse rows to avoid repeated reallocations     
        
+               //preallocation sparse rows to avoid repeated reallocations
                MatrixBlock ret = new MatrixBlock(getNumRows(), 
getNumColumns(), isInSparseFormat(), getNonZeros());
                if( ret.isInSparseFormat() ) {
                        int[] rnnz = new int[rlen];
@@ -588,7 +643,7 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
                if( k <= 1 )
                        return decompress();
                
-               Timing time = new Timing(true);
+               Timing time = LOG.isDebugEnabled() ? new Timing(true) : null;
                
                MatrixBlock ret = new MatrixBlock(rlen, clen, sparse, nonZeros);
                ret.allocateDenseOrSparseBlock();
@@ -602,7 +657,7 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
                        ArrayList<DecompressTask> tasks = new 
ArrayList<DecompressTask>();
                        for( int i=0; i<k & i*blklen<getNumRows(); i++ )
                                tasks.add(new DecompressTask(_colGroups, ret, 
i*blklen, Math.min((i+1)*blklen,rlen)));
-                       List<Future<Object>> rtasks = pool.invokeAll(tasks);    
+                       List<Future<Object>> rtasks = pool.invokeAll(tasks);
                        pool.shutdown();
                        for( Future<Object> rt : rtasks )
                                rt.get(); //error handling
@@ -637,6 +692,16 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
                total += 80 + 8 * _colGroups.size();
                for (ColGroup grp : _colGroups)
                        total += grp.estimateInMemorySize();
+               //correction for shared DDC1 dictionary
+               if( _sharedDDC1Dict ) {
+                       boolean seenDDC1 = false;
+                       for (ColGroup grp : _colGroups)
+                               if( grp.getNumCols()==1 && grp instanceof 
ColGroupDDC1 ) {
+                                       if( seenDDC1 ) 
+                                               total -= 
((ColGroupDDC1)grp).getValuesSize();
+                                       seenDDC1 = true;
+                               }
+               }
                return total;
        }
 
@@ -660,6 +725,7 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
                public double timePhase2 = -1;
                public double timePhase3 = -1;
                public double timePhase4 = -1;
+               public double timePhase5 = -1;
                public double estSize = -1;
                public double size = -1;
                public double ratio = -1;
@@ -668,11 +734,12 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
                        //do nothing
                }
                
-               public CompressionStatistics(double t1, double t2, double t3, 
double t4){
+               public CompressionStatistics(double t1, double t2, double t3, 
double t4, double t5){
                        timePhase1 = t1;
                        timePhase2 = t2;
                        timePhase3 = t3;
                        timePhase4 = t4;
+                       timePhase5 = t5;
                }
        } 
 
@@ -726,9 +793,11 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
                rlen = in.readInt();
                clen = in.readInt();
                nonZeros = in.readLong();
+               _sharedDDC1Dict = in.readBoolean();
                int ncolGroups = in.readInt();
                
                _colGroups = new ArrayList<ColGroup>(ncolGroups);
+               double[] sharedDict = null;
                for( int i=0; i<ncolGroups; i++ ) 
                {
                        CompressionType ctype = 
CompressionType.values()[in.readByte()];
@@ -745,11 +814,20 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
                                case DDC1:
                                        grp = new ColGroupDDC1(); break;
                                case DDC2:
-                                       grp = new ColGroupDDC2(); break;        
+                                       grp = new ColGroupDDC2(); break;
                        }
                        
                        //deserialize and add column group
                        grp.readFields(in);
+                       
+                       //use shared DDC1 dictionary if applicable; only DDC1 groups
+                       //participate in sharing, so other single-column groups (e.g.,
+                       //OLE, RLE, DDC2, uncompressed) must be skipped to avoid a
+                       //ClassCastException
+                       if( _sharedDDC1Dict && grp.getNumCols()==1
+                               && grp instanceof ColGroupDDC1 ) {
+                               ColGroupDDC1 grpDDC1 = (ColGroupDDC1) grp;
+                               if( sharedDict == null )
+                                       sharedDict = grpDDC1.getValues();
+                               else
+                                       grpDDC1.setValues(sharedDict);
+                       }
+                       
                        _colGroups.add(grp);
                }
        }
@@ -770,6 +848,7 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
                out.writeInt(rlen);
                out.writeInt(clen);
                out.writeLong(nonZeros);
+               out.writeBoolean(_sharedDDC1Dict);
                out.writeInt(_colGroups.size());
                
                for( ColGroup grp : _colGroups ) {
@@ -876,7 +955,7 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
                
                final int m = rlen;
                final int n = clen+that.getNumColumns();
-               final long nnz = nonZeros+that.getNonZeros();           
+               final long nnz = nonZeros+that.getNonZeros();
                
                //init result matrix 
                CompressedMatrixBlock ret2 = null;
@@ -905,7 +984,7 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
                }
                
                //meta data maintenance
-               ret2.setNonZeros(nnz);          
+               ret2.setNonZeros(nnz);
                return ret2;
        }
        
@@ -1121,7 +1200,7 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
                                else
                                        for( ArrayList<ColGroup> grp : grpParts 
)
                                                tasks.add(new 
UnaryAggregateTask(grp, ret, 0, rlen, op));
-                               List<Future<MatrixBlock>> rtasks = 
pool.invokeAll(tasks);       
+                               List<Future<MatrixBlock>> rtasks = 
pool.invokeAll(tasks);
                                pool.shutdown();
                                
                                //aggregate partial results
@@ -1362,7 +1441,7 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
                        
                        //compute uncompressed column group in parallel 
                        if( uc != null )
-                               uc.rightMultByVector(vector, result, k);        
                                
+                               uc.rightMultByVector(vector, result, k);
                        
                        //compute remaining compressed column groups in parallel
                        ExecutorService pool = Executors.newFixedThreadPool( k 
);
@@ -1452,7 +1531,7 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
                        
ColGroupValue.setupThreadLocalMemory(getMaxNumValues(colGroups));
                
                // delegate matrix-vector operation to each column group
-               for (ColGroup grp : colGroups) {                        
+               for (ColGroup grp : colGroups) {
                        grp.leftMultByRowVector(rowVector, result);
                }
                
@@ -1469,7 +1548,7 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
                result.reset();
                
                // delegate matrix-vector operation to each column group
-               for( ColGroup grp : colGroups ) {                       
+               for( ColGroup grp : colGroups ) {
                        ((ColGroupValue)grp).leftMultByRowVector(vector, 
result);
                }
                
@@ -1509,7 +1588,7 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
                        //compute uncompressed column group in parallel 
                        ColGroupUncompressed uc = getUncompressedColGroup();
                        if( uc != null )
-                               uc.leftMultByRowVector(vector, result, k);      
                                
+                               uc.leftMultByRowVector(vector, result, k);
                        
                        //compute remaining compressed column groups in parallel
                        ExecutorService pool = Executors.newFixedThreadPool( 
Math.min(colGroups.size()-((uc!=null)?1:0), k) );
@@ -1534,7 +1613,7 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
                throws DMLRuntimeException 
        {
                final int numRows = groups.get(0).getNumRows();
-               final int numGroups = groups.size();            
+               final int numGroups = groups.size();
                final boolean containsUC = containsUncompressedColGroup(groups);
                
                //preallocated dense tmp matrix blocks
@@ -1551,7 +1630,7 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
                for( int i=gl; i<gu; i++ ) 
                {
                        //get current group and relevant col groups
-                       ColGroup group = groups.get(i); 
+                       ColGroup group = groups.get(i);
                        int[] ixgroup = group.getColIndices();
                        List<ColGroup> tmpList = groups.subList(i, numGroups);
                        
@@ -1559,7 +1638,7 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
                                && ixgroup.length==1 && !containsUC && 
numRows<BitmapEncoder.BITMAP_BLOCK_SZ ) 
                        {
                                //compute vector-matrix partial result
-                               leftMultByVectorTranspose(tmpList, 
(ColGroupDDC)group, tmpret);                                                    
     
+                               leftMultByVectorTranspose(tmpList, 
(ColGroupDDC)group, tmpret);
                                
                                //write partial results (disjoint non-zeros)
                                
LinearAlgebraUtils.copyNonZerosToUpperTriangle(result, tmpret, ixgroup[0]);     
@@ -1571,10 +1650,10 @@ public class CompressedMatrixBlock extends MatrixBlock 
implements Externalizable
                                        
                                        if( !lhs.isEmptyBlock(false) ) {
                                                //compute vector-matrix partial 
result
-                                               
leftMultByVectorTranspose(tmpList, lhs, tmpret, false, false);                  
                                        
+                                               
leftMultByVectorTranspose(tmpList, lhs, tmpret, false, false);
                                                
                                                //write partial results 
(disjoint non-zeros)
-                                               
LinearAlgebraUtils.copyNonZerosToUpperTriangle(result, tmpret, ixgroup[j]);     
+                                               
LinearAlgebraUtils.copyNonZerosToUpperTriangle(result, tmpret, ixgroup[j]);
                                        }
                                }       
                        }

Reply via email to