This is an automated email from the ASF dual-hosted git repository.

baunsgaard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new 23bcd6d2b2 [MINOR] Lazy write buffer optimization
23bcd6d2b2 is described below

commit 23bcd6d2b2be2739bf9abb137d82917860d3fd6e
Author: Sebastian Baunsgaard <[email protected]>
AuthorDate: Fri Jan 5 16:45:34 2024 +0100

    [MINOR] Lazy write buffer optimization
    
    This commit optimizes the lazy write buffer to pass through byte
    arrays if provided instead of lazily evaluating them.
    If the provided byte arrays are large enough, this is faster than the
    previous lazy evaluation, especially because we previously copied
    over the byte array, allocating the elements twice.
    This commit also fixes a bug where providing a byte array that
    is larger than the buffer caused a crash.
    
    Closes #1972
---
 .../controlprogram/caching/LazyWriteBuffer.java    | 117 +++++++++++++++------
 1 file changed, 85 insertions(+), 32 deletions(-)

diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/LazyWriteBuffer.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/LazyWriteBuffer.java
index 8c4bfc310f..73c86f9edc 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/LazyWriteBuffer.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/LazyWriteBuffer.java
@@ -23,12 +23,19 @@ import java.io.IOException;
 import java.util.Map.Entry;
 import java.util.concurrent.ExecutorService;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.sysds.api.DMLScript;
 import org.apache.sysds.hops.OptimizerUtils;
+import org.apache.sysds.runtime.data.SparseBlock.Type;
+import org.apache.sysds.runtime.data.SparseBlockFactory;
+import org.apache.sysds.runtime.data.SparseBlockMCSR;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.util.LocalFileUtils;
 
-public class LazyWriteBuffer 
-{
+public class LazyWriteBuffer {
+       protected static final Log LOG = 
LogFactory.getLog(LazyWriteBuffer.class.getName());
+
        public enum RPolicy {
                FIFO, //first-in, first-out eviction
                LRU   //least recently used eviction
@@ -52,38 +59,28 @@ public class LazyWriteBuffer
        {
                //obtain basic meta data of cache block
                long lSize = getCacheBlockSize(cb);
+
+               if(lSize > _limit){ // if this block goes above limit
+                       cb = compact(cb); // try to compact it
+                       lSize = getCacheBlockSize(cb); // and update to new 
size of block
+                       if(lSize > _limit){// if we are still above limit
+                               reAllocate(lSize); // try to compact all blocks 
in memory.
+                       }
+               }
+
                boolean requiresWrite = (lSize > _limit        //global buffer 
limit
                        || !ByteBuffer.isValidCapacity(lSize, cb)); //local 
buffer limit
                int numEvicted = 0;
                
-               //handle caching/eviction if it fits in writebuffer
-               if( !requiresWrite ) 
-               {
+               //handle caching/eviction if it fits in the write buffer
+               if(!requiresWrite) {
                        //create byte buffer handle (no block allocation yet)
                        ByteBuffer bbuff = new ByteBuffer( lSize );
                        
-                       //modify buffer pool
-                       synchronized( _mQueue )
-                       {
-                               //evict matrices to make room (by default FIFO)
-                               while( _size+lSize > _limit && 
!_mQueue.isEmpty() )
-                               {
-                                       //remove first entry from eviction queue
-                                       Entry<String, ByteBuffer> entry = 
_mQueue.removeFirst();
-                                       String ftmp = entry.getKey();
-                                       ByteBuffer tmp = entry.getValue();
-                                       
-                                       if( tmp != null ) {
-                                               //wait for pending serialization
-                                               tmp.checkSerialized();
-                                               
-                                               //evict matrix
-                                               tmp.evictBuffer(ftmp);
-                                               tmp.freeMemory();
-                                               _size -= tmp.getSize();
-                                               numEvicted++;
-                                       }
-                               }
+                       // modify buffer pool
+                       synchronized(_mQueue) {
+                               // evict matrices to make room (by default FIFO)
+                               numEvicted += evict(lSize);
                                
                                //put placeholder into buffer pool (reserve mem)
                                _mQueue.addLast(fname, bbuff);
@@ -98,19 +95,75 @@ public class LazyWriteBuffer
                                CacheStatistics.incrementFSWrites(numEvicted);
                        }
                }
-               else
-               {
+               else {
                        //write directly to local FS (bypass buffer if too 
large)
                        LocalFileUtils.writeCacheBlockToLocal(fname, cb);
-                       if( DMLScript.STATISTICS ) {
+                       if( DMLScript.STATISTICS )
                                CacheStatistics.incrementFSWrites();
-                       }
+                       
                        numEvicted++;
                }
                
                return numEvicted;
        }
 
+       private static CacheBlock<?> compact(CacheBlock<?> cb){
+               // compact this block 
+               if(cb instanceof MatrixBlock){
+                       MatrixBlock mb = (MatrixBlock) cb;
+
+                       // convert MCSR to CSR
+                       if(mb.isInSparseFormat() && mb.getSparseBlock() 
instanceof SparseBlockMCSR)
+                               
mb.setSparseBlock(SparseBlockFactory.copySparseBlock(Type.MCSR, 
mb.getSparseBlock(), false));
+               
+                       return mb;
+               }
+               else {
+                       return cb;
+               }
+       }
+
+       private static int reAllocate(long lSize) {
+               int numReAllocated = 0;
+               synchronized(_mQueue) {
+                       if(_size + lSize > _limit) {
+                               // compact all elements in buffer.
+                               for(Entry<String, ByteBuffer> elm : 
_mQueue.entrySet()) {
+                                       ByteBuffer bf = elm.getValue();
+                                       if(bf._cdata != null){ // not 
serialized to bytes.
+                                               long before = 
getCacheBlockSize(bf._cdata);
+                                               bf._cdata = compact(bf._cdata);
+                                               long after = 
getCacheBlockSize(bf._cdata);
+                                               _size -= before - after;
+                                       }
+                               }
+                       }
+               }
+               return numReAllocated;
+       }
+
+       private static int evict(long lSize) throws IOException {
+               int numEvicted = 0;
+               while(_size + lSize > _limit && !_mQueue.isEmpty()) {
+                       // remove first entry from eviction queue
+                       Entry<String, ByteBuffer> entry = _mQueue.removeFirst();
+                       String ftmp = entry.getKey();
+                       ByteBuffer tmp = entry.getValue();
+
+                       if(tmp != null) {
+                               // wait for pending serialization
+                               tmp.checkSerialized();
+
+                               // evict matrix
+                               tmp.evictBuffer(ftmp);
+                               tmp.freeMemory();
+                               _size -= tmp.getSize();
+                               numEvicted++;
+                       }
+               }
+               return numEvicted;
+       }
+
        public static void deleteBlock(String fname)
        {
                boolean requiresDelete = true;
@@ -143,7 +196,7 @@ public class LazyWriteBuffer
                        ldata = _mQueue.get(fname);
                        
                        //modify eviction order (accordingly to access)
-                       if(    CacheableData.CACHING_BUFFER_POLICY == 
RPolicy.LRU
+                       if(CacheableData.CACHING_BUFFER_POLICY == RPolicy.LRU
                                && ldata != null )
                        {
                                //reinsert entry at end of eviction queue

Reply via email to