This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch pr-2343 in repository https://gitbox.apache.org/repos/asf/systemds.git
commit 3cfdb3e5e5b502d237b5d32b56ea24eebcbce145 Author: Matthias Boehm <[email protected]> AuthorDate: Fri Oct 31 17:04:40 2025 +0100 [SYSTEMDS-3930] Cleanup out-of-core backend and buffer pool * Fix order-dependent restore of indexed matrix blocks (keep indexes) * Fix potential race condition in unguarded cache modifications * Fix warnings and unnecessary imports --- .../ooc/AggregateUnaryOOCInstruction.java | 2 - .../instructions/ooc/BinaryOOCInstruction.java | 3 - .../ooc/MatrixVectorBinaryOOCInstruction.java | 2 - .../instructions/ooc/OOCEvictionManager.java | 161 ++++++++++----------- .../instructions/ooc/ReblockOOCInstruction.java | 3 - .../instructions/ooc/TransposeOOCInstruction.java | 3 - .../instructions/ooc/UnaryOOCInstruction.java | 3 - .../spark/data/IndexedMatrixValue.java | 4 + .../sysds/runtime/matrix/data/MatrixIndexes.java | 5 + 9 files changed, 87 insertions(+), 99 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/AggregateUnaryOOCInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/AggregateUnaryOOCInstruction.java index 8c8a64b022..c87b3c99cf 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/AggregateUnaryOOCInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/AggregateUnaryOOCInstruction.java @@ -36,10 +36,8 @@ import org.apache.sysds.runtime.matrix.operators.AggregateOperator; import org.apache.sysds.runtime.matrix.operators.AggregateUnaryOperator; import org.apache.sysds.runtime.matrix.operators.Operator; import org.apache.sysds.runtime.meta.DataCharacteristics; -import org.apache.sysds.runtime.util.CommonThreadPool; import java.util.HashMap; -import java.util.concurrent.ExecutorService; public class AggregateUnaryOOCInstruction extends ComputationOOCInstruction { private AggregateOperator _aop = null; diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/BinaryOOCInstruction.java 
b/src/main/java/org/apache/sysds/runtime/instructions/ooc/BinaryOOCInstruction.java index 82ad12ae55..1dfc99be81 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/BinaryOOCInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/BinaryOOCInstruction.java @@ -19,8 +19,6 @@ package org.apache.sysds.runtime.instructions.ooc; -import java.util.concurrent.ExecutorService; - import org.apache.sysds.common.Types.DataType; import org.apache.sysds.runtime.DMLRuntimeException; import org.apache.sysds.runtime.controlprogram.caching.MatrixObject; @@ -33,7 +31,6 @@ import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue; import org.apache.sysds.runtime.matrix.data.MatrixBlock; import org.apache.sysds.runtime.matrix.operators.Operator; import org.apache.sysds.runtime.matrix.operators.ScalarOperator; -import org.apache.sysds.runtime.util.CommonThreadPool; public class BinaryOOCInstruction extends ComputationOOCInstruction { diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/MatrixVectorBinaryOOCInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/MatrixVectorBinaryOOCInstruction.java index c1d1ed6ace..aa215e83e9 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/MatrixVectorBinaryOOCInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/MatrixVectorBinaryOOCInstruction.java @@ -20,7 +20,6 @@ package org.apache.sysds.runtime.instructions.ooc; import java.util.HashMap; -import java.util.concurrent.ExecutorService; import org.apache.sysds.common.Opcodes; import org.apache.sysds.conf.ConfigurationManager; @@ -39,7 +38,6 @@ import org.apache.sysds.runtime.matrix.operators.AggregateBinaryOperator; import org.apache.sysds.runtime.matrix.operators.AggregateOperator; import org.apache.sysds.runtime.matrix.operators.BinaryOperator; import org.apache.sysds.runtime.matrix.operators.Operator; -import org.apache.sysds.runtime.util.CommonThreadPool; 
public class MatrixVectorBinaryOOCInstruction extends ComputationOOCInstruction { diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java index 8a94209a9e..747167d510 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java @@ -22,14 +22,12 @@ package org.apache.sysds.runtime.instructions.ooc; import org.apache.sysds.runtime.DMLRuntimeException; import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue; import org.apache.sysds.runtime.matrix.data.MatrixBlock; -import org.apache.sysds.runtime.matrix.data.MatrixIndexes; import org.apache.sysds.runtime.util.LocalFileUtils; import java.io.File; import java.io.IOException; -import java.util.ArrayDeque; -import java.util.Deque; -import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.Map; /** @@ -71,19 +69,15 @@ import java.util.Map; public class OOCEvictionManager { // Configuration: OOC buffer limit as percentage of heap - private static final double OOC_BUFFER_PERCENTAGE = 0.15; // 15% of heap + private static final double OOC_BUFFER_PERCENTAGE = 0.00015; // 15% of heap // Memory limit for ByteBuffers private static long _limit; private static long _size; // Cache structures: map key -> MatrixBlock and eviction deque (head=oldest block) - private static final Map<String, IndexedMatrixValue> _cache = new HashMap<>(); - private static final Deque<String> _evictDeque = new ArrayDeque<>(); - - // Single lock for synchronization - private static final Object lock = new Object(); - + private static LinkedHashMap<String, IndexedMatrixValue> _cache = new LinkedHashMap<>(); + // Spill directory for evicted blocks private static String _spillDir; @@ -92,10 +86,8 @@ public class OOCEvictionManager { } private static RPolicy 
_policy = RPolicy.FIFO; - private OOCEvictionManager() {} - static { - _limit = (long)(Runtime.getRuntime().maxMemory() * OOC_BUFFER_PERCENTAGE * 0.01); // e.g., 20% of heap + _limit = (long)(Runtime.getRuntime().maxMemory() * OOC_BUFFER_PERCENTAGE); // e.g., 20% of heap _size = 0; _spillDir = LocalFileUtils.getUniqueWorkingDir("ooc_stream"); LocalFileUtils.createLocalFileIfNotExist(_spillDir); @@ -109,103 +101,106 @@ public class OOCEvictionManager { long size = estimateSerializedSize(mb); String key = streamId + "_" + blockId; - synchronized (lock) { - IndexedMatrixValue old = _cache.remove(key); // remove old value - if (old != null) { - _evictDeque.remove(key); - _size -= estimateSerializedSize((MatrixBlock) old.getValue()); - } - - try { - evict(size); - } catch (IOException e) { - throw new DMLRuntimeException(e); - } - - _cache.put(key, value); // put new value - _evictDeque.addLast(key); // add to end for FIFO/LRU - _size += size; + IndexedMatrixValue old = _cache.remove(key); // remove old value + if (old != null) { + _size -= estimateSerializedSize((MatrixBlock) old.getValue()); } + + //make room if needed + evict(size); + + _cache.put(key, value); // put new value last + _size += size; } /** * Get a block from the OOC cache (deserialize on read) */ public static synchronized IndexedMatrixValue get(long streamId, int blockId) { - String key = streamId + "_" + blockId; IndexedMatrixValue imv = _cache.get(key); - synchronized (lock) { - if (imv != null && _policy == RPolicy.LRU) { - _evictDeque.remove(key); - _evictDeque.addLast(key); - } + if (imv != null && _policy == RPolicy.LRU) { + _cache.remove(key); + _cache.put(key, imv); //add last semantic } - - if (imv != null) { - return imv; - } else { - try { - return loadFromDisk(streamId, blockId); - } catch (IOException e) { - throw new DMLRuntimeException(e); - } - } - + + //restore if needed + return (imv.getValue() != null) ? 
imv : + loadFromDisk(streamId, blockId); } /** * Evict ByteBuffers to disk */ - private static void evict(long requiredSize) throws IOException { - while(_size + requiredSize > _limit && !_evictDeque.isEmpty()) { - System.out.println("_size + requiredSize: " + _size +" + "+ requiredSize + "; _limit: " + _limit); - String oldKey = _evictDeque.removeLast(); - IndexedMatrixValue toEvict = _cache.remove(oldKey); - - if (toEvict == null) { continue;} - MatrixBlock mbToEvict = (MatrixBlock) toEvict.getValue(); - - // Spill to disk - String filename = _spillDir + "/" + oldKey; - File spillDirFile = new File(_spillDir); - if (!spillDirFile.exists()) { - spillDirFile.mkdirs(); + private static void evict(long requiredSize) { + try { + int pos = 0; + while(_size + requiredSize > _limit && pos++ < _cache.size()) { + //System.out.println("BUFFER: "+_size+"/"+_limit+" size="+_cache.size()); + Map.Entry<String,IndexedMatrixValue> tmp = removeFirstFromCache(); + if( tmp == null || tmp.getValue().getValue() == null ) { + if( tmp != null ) + _cache.put(tmp.getKey(), tmp.getValue()); + continue; + } + + // Spill to disk + String filename = _spillDir + "/" + tmp.getKey(); + File spillDirFile = new File(_spillDir); + if (!spillDirFile.exists()) { + spillDirFile.mkdirs(); + } + LocalFileUtils.writeMatrixBlockToLocal(filename, (MatrixBlock)tmp.getValue().getValue()); + + // Evict from memory + long freedSize = estimateSerializedSize((MatrixBlock)tmp.getValue().getValue()); + tmp.getValue().setValue(null); + _cache.put(tmp.getKey(), tmp.getValue()); // add last semantic + _size -= freedSize; } - - LocalFileUtils.writeMatrixBlockToLocal(filename, mbToEvict); - - long freedSize = estimateSerializedSize(mbToEvict); - _size -= freedSize; - + } + catch(Exception ex) { + throw new DMLRuntimeException(ex); } } /** * Load block from spill file */ - private static IndexedMatrixValue loadFromDisk(long streamId, int blockId) throws IOException { - String filename = _spillDir + "/" + streamId + "_" 
+ blockId; + private static IndexedMatrixValue loadFromDisk(long streamId, int blockId) { + String key = streamId + "_" + blockId; + String filename = _spillDir + "/" + key; - // check if file exists - if (!LocalFileUtils.isExisting(filename)) { - throw new IOException("File " + filename + " does not exist"); + try { + // check if file exists + if (!LocalFileUtils.isExisting(filename)) { + throw new IOException("File " + filename + " does not exist"); + } + + // Read from disk and put into original indexed matrix value + MatrixBlock mb = LocalFileUtils.readMatrixBlockFromLocal(filename); + IndexedMatrixValue imv = _cache.get(key); + imv.setValue(mb); + return imv; + } + catch(Exception ex) { + throw new DMLRuntimeException(ex); } - - // Read from disk - MatrixBlock mb = LocalFileUtils.readMatrixBlockFromLocal(filename); - - MatrixIndexes ix = new MatrixIndexes(blockId + 1, 1); - - // Put back in cache (may trigger eviction) - // get() operation should not modify cache - // put(streamId, blockId, new IndexedMatrixValue(ix, mb)); - - return new IndexedMatrixValue(ix, mb); } private static long estimateSerializedSize(MatrixBlock mb) { return mb.getExactSerializedSize(); } + + private static Map.Entry<String, IndexedMatrixValue> removeFirstFromCache() { + //move iterator to first entry + Iterator<Map.Entry<String, IndexedMatrixValue>> iter = _cache.entrySet().iterator(); + Map.Entry<String, IndexedMatrixValue> entry = iter.next(); + + //remove current iterator entry + iter.remove(); + + return entry; + } } diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/ReblockOOCInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/ReblockOOCInstruction.java index 06386c5d66..3c78879b45 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/ReblockOOCInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/ReblockOOCInstruction.java @@ -19,8 +19,6 @@ package org.apache.sysds.runtime.instructions.ooc; -import 
java.util.concurrent.ExecutorService; - import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.SequenceFile; @@ -40,7 +38,6 @@ import org.apache.sysds.runtime.matrix.data.MatrixBlock; import org.apache.sysds.runtime.matrix.data.MatrixIndexes; import org.apache.sysds.runtime.matrix.operators.Operator; import org.apache.sysds.runtime.meta.DataCharacteristics; -import org.apache.sysds.runtime.util.CommonThreadPool; public class ReblockOOCInstruction extends ComputationOOCInstruction { private int blen; diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/TransposeOOCInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/TransposeOOCInstruction.java index fce5408960..05e31830a5 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/TransposeOOCInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/TransposeOOCInstruction.java @@ -30,9 +30,6 @@ import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue; import org.apache.sysds.runtime.matrix.data.MatrixBlock; import org.apache.sysds.runtime.matrix.data.MatrixIndexes; import org.apache.sysds.runtime.matrix.operators.ReorgOperator; -import org.apache.sysds.runtime.util.CommonThreadPool; - -import java.util.concurrent.ExecutorService; public class TransposeOOCInstruction extends ComputationOOCInstruction { diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java index 63f42f5bf1..173486844a 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java @@ -28,9 +28,6 @@ import org.apache.sysds.runtime.instructions.cp.CPOperand; import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue; import 
org.apache.sysds.runtime.matrix.data.MatrixBlock; import org.apache.sysds.runtime.matrix.operators.UnaryOperator; -import org.apache.sysds.runtime.util.CommonThreadPool; - -import java.util.concurrent.ExecutorService; public class UnaryOOCInstruction extends ComputationOOCInstruction { private UnaryOperator _uop = null; diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/data/IndexedMatrixValue.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/data/IndexedMatrixValue.java index 8f82a99abf..7b20fe2f9e 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/spark/data/IndexedMatrixValue.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/data/IndexedMatrixValue.java @@ -66,6 +66,10 @@ public class IndexedMatrixValue implements Serializable public MatrixValue getValue() { return _value; } + + public void setValue(MatrixValue value) { + _value = value; + } public void set(MatrixIndexes indexes2, MatrixValue block2) { _indexes.setIndexes(indexes2); diff --git a/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixIndexes.java b/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixIndexes.java index 5a3d0a6430..7f0f3b7a65 100644 --- a/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixIndexes.java +++ b/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixIndexes.java @@ -108,6 +108,11 @@ public class MatrixIndexes implements WritableComparable<MatrixIndexes>, RawComp public String toString() { return "("+_row+", "+_col+")"; } + + public MatrixIndexes fromString(String ix) { + String[] parts = ix.substring(1, ix.length()-1).split(","); + return new MatrixIndexes(Long.parseLong(parts[0]), Long.parseLong(parts[1].trim())); + } //////////////////////////////////////////////////// // implementation of Writable read/write
