This is an automated email from the ASF dual-hosted git repository.
baunsgaard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new 23bcd6d2b2 [MINOR] Lazy write buffer optimization
23bcd6d2b2 is described below
commit 23bcd6d2b2be2739bf9abb137d82917860d3fd6e
Author: Sebastian Baunsgaard <[email protected]>
AuthorDate: Fri Jan 5 16:45:34 2024 +0100
[MINOR] Lazy write buffer optimization
This commit optimizes the lazy write buffer to pass through byte
arrays if provided, instead of lazily evaluating them.
If the provided byte arrays are large enough, this is faster than the
previous lazy evaluation, especially because we previously copied
over the byte array, allocating the elements twice.
This commit also fixes a bug so that providing a byte array that
is larger than the buffer no longer crashes.
Closes #1972
---
.../controlprogram/caching/LazyWriteBuffer.java | 117 +++++++++++++++------
1 file changed, 85 insertions(+), 32 deletions(-)
diff --git
a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/LazyWriteBuffer.java
b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/LazyWriteBuffer.java
index 8c4bfc310f..73c86f9edc 100644
---
a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/LazyWriteBuffer.java
+++
b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/LazyWriteBuffer.java
@@ -23,12 +23,19 @@ import java.io.IOException;
import java.util.Map.Entry;
import java.util.concurrent.ExecutorService;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.sysds.api.DMLScript;
import org.apache.sysds.hops.OptimizerUtils;
+import org.apache.sysds.runtime.data.SparseBlock.Type;
+import org.apache.sysds.runtime.data.SparseBlockFactory;
+import org.apache.sysds.runtime.data.SparseBlockMCSR;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.util.LocalFileUtils;
-public class LazyWriteBuffer
-{
+public class LazyWriteBuffer {
+ protected static final Log LOG =
LogFactory.getLog(LazyWriteBuffer.class.getName());
+
public enum RPolicy {
FIFO, //first-in, first-out eviction
LRU //least recently used eviction
@@ -52,38 +59,28 @@ public class LazyWriteBuffer
{
//obtain basic meta data of cache block
long lSize = getCacheBlockSize(cb);
+
+ if(lSize > _limit){ // if this block goes above limit
+ cb = compact(cb); // try to compact it
+ lSize = getCacheBlockSize(cb); // and update to new
size of block
+ if(lSize > _limit){// if we are still above limit
+ reAllocate(lSize); // try to compact all blocks
in memory.
+ }
+ }
+
boolean requiresWrite = (lSize > _limit //global buffer
limit
|| !ByteBuffer.isValidCapacity(lSize, cb)); //local
buffer limit
int numEvicted = 0;
- //handle caching/eviction if it fits in writebuffer
- if( !requiresWrite )
- {
+ //handle caching/eviction if it fits in the write buffer
+ if(!requiresWrite) {
//create byte buffer handle (no block allocation yet)
ByteBuffer bbuff = new ByteBuffer( lSize );
- //modify buffer pool
- synchronized( _mQueue )
- {
- //evict matrices to make room (by default FIFO)
- while( _size+lSize > _limit &&
!_mQueue.isEmpty() )
- {
- //remove first entry from eviction queue
- Entry<String, ByteBuffer> entry =
_mQueue.removeFirst();
- String ftmp = entry.getKey();
- ByteBuffer tmp = entry.getValue();
-
- if( tmp != null ) {
- //wait for pending serialization
- tmp.checkSerialized();
-
- //evict matrix
- tmp.evictBuffer(ftmp);
- tmp.freeMemory();
- _size -= tmp.getSize();
- numEvicted++;
- }
- }
+ // modify buffer pool
+ synchronized(_mQueue) {
+ // evict matrices to make room (by default FIFO)
+ numEvicted += evict(lSize);
//put placeholder into buffer pool (reserve mem)
_mQueue.addLast(fname, bbuff);
@@ -98,19 +95,75 @@ public class LazyWriteBuffer
CacheStatistics.incrementFSWrites(numEvicted);
}
}
- else
- {
+ else {
//write directly to local FS (bypass buffer if too
large)
LocalFileUtils.writeCacheBlockToLocal(fname, cb);
- if( DMLScript.STATISTICS ) {
+ if( DMLScript.STATISTICS )
CacheStatistics.incrementFSWrites();
- }
+
numEvicted++;
}
return numEvicted;
}
+	/**
+	 * Try to reduce the in-memory size of a cache block before it is placed in the write buffer.
+	 *
+	 * Only a MatrixBlock that is in sparse format and backed by a SparseBlockMCSR is changed: its sparse block is
+	 * replaced by a CSR copy of the same data. Every other block is returned untouched.
+	 *
+	 * @param cb The cache block to compact
+	 * @return The given block; if it was an MCSR sparse matrix block it has been modified in place
+	 */
+	private static CacheBlock<?> compact(CacheBlock<?> cb){
+		// only matrix blocks have a representation that can be compacted here
+		if(cb instanceof MatrixBlock){
+			MatrixBlock mb = (MatrixBlock) cb;
+
+			// convert MCSR to CSR; the target type must be CSR, because requesting MCSR
+			// for an MCSR block keeps the MCSR layout and therefore saves nothing
+			if(mb.isInSparseFormat() && mb.getSparseBlock() instanceof SparseBlockMCSR)
+				mb.setSparseBlock(SparseBlockFactory.copySparseBlock(Type.CSR, mb.getSparseBlock(), false));
+
+			return mb;
+		}
+		else {
+			return cb;
+		}
+	}
+
+	/**
+	 * Compact the blocks currently held in the buffer pool when a new block of the given size would not fit.
+	 *
+	 * While holding the lock on the eviction queue, every buffered entry that still holds its cache block (i.e., has
+	 * not been serialized to bytes) is passed through compact, and the tracked buffer size is reduced by whatever
+	 * each entry saved.
+	 *
+	 * @param lSize Size of the block that is about to be added to the buffer
+	 * @return Number of buffered blocks whose size shrank through compaction
+	 */
+	private static int reAllocate(long lSize) {
+		int numReAllocated = 0;
+		synchronized(_mQueue) {
+			if(_size + lSize > _limit) {
+				// compact all elements in buffer.
+				for(Entry<String, ByteBuffer> elm : _mQueue.entrySet()) {
+					ByteBuffer bf = elm.getValue();
+					// skip empty slots (evict guards against null values as well)
+					// and entries that were already serialized to bytes
+					if(bf != null && bf._cdata != null){
+						long before = getCacheBlockSize(bf._cdata);
+						bf._cdata = compact(bf._cdata);
+						long after = getCacheBlockSize(bf._cdata);
+						_size -= before - after;
+						// count only the entries that actually became smaller
+						if(after < before)
+							numReAllocated++;
+					}
+				}
+			}
+		}
+		return numReAllocated;
+	}
+
+	/**
+	 * Evict entries from the front of the eviction queue until a block of the given size fits under the buffer
+	 * limit or the queue is empty.
+	 *
+	 * This method takes no lock itself; the visible caller invokes it inside a synchronized block on the queue.
+	 *
+	 * @param lSize Size of the block that room is needed for
+	 * @return Number of buffers that were evicted
+	 * @throws IOException Declared for the eviction of buffers
+	 */
+	private static int evict(long lSize) throws IOException {
+		int evictedCount = 0;
+		while(_size + lSize > _limit && !_mQueue.isEmpty()) {
+			// take the oldest entry out of the queue
+			Entry<String, ByteBuffer> head = _mQueue.removeFirst();
+			ByteBuffer victim = head.getValue();
+
+			// nothing to release for an empty slot
+			if(victim == null)
+				continue;
+
+			// pending serialization has to complete before the buffer can be evicted
+			victim.checkSerialized();
+
+			// evict under the entry's file name, then release memory and the reserved size
+			victim.evictBuffer(head.getKey());
+			victim.freeMemory();
+			_size -= victim.getSize();
+			evictedCount++;
+		}
+		return evictedCount;
+	}
+
public static void deleteBlock(String fname)
{
boolean requiresDelete = true;
@@ -143,7 +196,7 @@ public class LazyWriteBuffer
ldata = _mQueue.get(fname);
//modify eviction order (accordingly to access)
- if( CacheableData.CACHING_BUFFER_POLICY ==
RPolicy.LRU
+ if(CacheableData.CACHING_BUFFER_POLICY == RPolicy.LRU
&& ldata != null )
{
//reinsert entry at end of eviction queue