This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch pr-2343
in repository https://gitbox.apache.org/repos/asf/systemds.git

commit 3cfdb3e5e5b502d237b5d32b56ea24eebcbce145
Author: Matthias Boehm <[email protected]>
AuthorDate: Fri Oct 31 17:04:40 2025 +0100

    [SYSTEMDS-3930] Cleanup out-of-core backend and buffer pool
    
    * Fix order-dependent restore of indexed matrix blocks (keep indexes)
    * Fix potential race condition in unguarded cache modifications
    * Fix warnings and remove unnecessary imports
---
 .../ooc/AggregateUnaryOOCInstruction.java          |   2 -
 .../instructions/ooc/BinaryOOCInstruction.java     |   3 -
 .../ooc/MatrixVectorBinaryOOCInstruction.java      |   2 -
 .../instructions/ooc/OOCEvictionManager.java       | 161 ++++++++++-----------
 .../instructions/ooc/ReblockOOCInstruction.java    |   3 -
 .../instructions/ooc/TransposeOOCInstruction.java  |   3 -
 .../instructions/ooc/UnaryOOCInstruction.java      |   3 -
 .../spark/data/IndexedMatrixValue.java             |   4 +
 .../sysds/runtime/matrix/data/MatrixIndexes.java   |   5 +
 9 files changed, 87 insertions(+), 99 deletions(-)

diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/AggregateUnaryOOCInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/AggregateUnaryOOCInstruction.java
index 8c8a64b022..c87b3c99cf 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/AggregateUnaryOOCInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/AggregateUnaryOOCInstruction.java
@@ -36,10 +36,8 @@ import org.apache.sysds.runtime.matrix.operators.AggregateOperator;
 import org.apache.sysds.runtime.matrix.operators.AggregateUnaryOperator;
 import org.apache.sysds.runtime.matrix.operators.Operator;
 import org.apache.sysds.runtime.meta.DataCharacteristics;
-import org.apache.sysds.runtime.util.CommonThreadPool;
 
 import java.util.HashMap;
-import java.util.concurrent.ExecutorService;
 
 public class AggregateUnaryOOCInstruction extends ComputationOOCInstruction {
        private AggregateOperator _aop = null;
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/BinaryOOCInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/BinaryOOCInstruction.java
index 82ad12ae55..1dfc99be81 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/BinaryOOCInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/BinaryOOCInstruction.java
@@ -19,8 +19,6 @@
 
 package org.apache.sysds.runtime.instructions.ooc;
 
-import java.util.concurrent.ExecutorService;
-
 import org.apache.sysds.common.Types.DataType;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
@@ -33,7 +31,6 @@ import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.matrix.operators.Operator;
 import org.apache.sysds.runtime.matrix.operators.ScalarOperator;
-import org.apache.sysds.runtime.util.CommonThreadPool;
 
 public class BinaryOOCInstruction extends ComputationOOCInstruction {
        
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/MatrixVectorBinaryOOCInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/MatrixVectorBinaryOOCInstruction.java
index c1d1ed6ace..aa215e83e9 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/MatrixVectorBinaryOOCInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/MatrixVectorBinaryOOCInstruction.java
@@ -20,7 +20,6 @@
 package org.apache.sysds.runtime.instructions.ooc;
 
 import java.util.HashMap;
-import java.util.concurrent.ExecutorService;
 
 import org.apache.sysds.common.Opcodes;
 import org.apache.sysds.conf.ConfigurationManager;
@@ -39,7 +38,6 @@ import org.apache.sysds.runtime.matrix.operators.AggregateBinaryOperator;
 import org.apache.sysds.runtime.matrix.operators.AggregateOperator;
 import org.apache.sysds.runtime.matrix.operators.BinaryOperator;
 import org.apache.sysds.runtime.matrix.operators.Operator;
-import org.apache.sysds.runtime.util.CommonThreadPool;
 
 public class MatrixVectorBinaryOOCInstruction extends 
ComputationOOCInstruction {
 
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java
index 8a94209a9e..747167d510 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/OOCEvictionManager.java
@@ -22,14 +22,12 @@ package org.apache.sysds.runtime.instructions.ooc;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
-import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysds.runtime.util.LocalFileUtils;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.ArrayDeque;
-import java.util.Deque;
-import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.Map;
 
 /**
@@ -71,19 +69,15 @@ import java.util.Map;
 public class OOCEvictionManager {
 
        // Configuration: OOC buffer limit as percentage of heap
-       private static final double OOC_BUFFER_PERCENTAGE = 0.15; // 15% of heap
+       private static final double OOC_BUFFER_PERCENTAGE = 0.00015; // fraction of max heap (=0.015%), TODO confirm: 15% would be 0.15

        // Memory limit for ByteBuffers
        private static long _limit;
        private static long _size;

        // Cache structures: map key -> MatrixBlock and eviction deque (head=oldest block)
-       private static final Map<String, IndexedMatrixValue> _cache = new HashMap<>();
-       private static final Deque<String> _evictDeque = new ArrayDeque<>();
-
-       // Single lock for synchronization
-       private static final Object lock = new Object();
-
+       private static final LinkedHashMap<String, IndexedMatrixValue> _cache = new LinkedHashMap<>();
+       
        // Spill directory for evicted blocks
        private static String _spillDir;
 
@@ -92,10 +86,8 @@ public class OOCEvictionManager {
        }
        private static RPolicy _policy = RPolicy.FIFO;
 
-       private OOCEvictionManager() {}
-
        static {
-               _limit = (long)(Runtime.getRuntime().maxMemory() * 
OOC_BUFFER_PERCENTAGE * 0.01); // e.g., 20% of heap
+               _limit = (long)(Runtime.getRuntime().maxMemory() * 
OOC_BUFFER_PERCENTAGE); // e.g., 20% of heap
                _size = 0;
                _spillDir = LocalFileUtils.getUniqueWorkingDir("ooc_stream");
                LocalFileUtils.createLocalFileIfNotExist(_spillDir);
@@ -109,103 +101,106 @@ public class OOCEvictionManager {
                long size = estimateSerializedSize(mb);
                String key = streamId + "_" + blockId;
 
-               synchronized (lock) {
-                       IndexedMatrixValue old = _cache.remove(key); // remove 
old value
-                       if (old != null) {
-                               _evictDeque.remove(key);
-                               _size -= estimateSerializedSize((MatrixBlock) 
old.getValue());
-                       }
-
-                       try {
-                               evict(size);
-                       } catch (IOException e) {
-                               throw new DMLRuntimeException(e);
-                       }
-
-                       _cache.put(key, value); // put new value
-                       _evictDeque.addLast(key); // add to end for FIFO/LRU
-                       _size += size;
+               IndexedMatrixValue old = _cache.remove(key); // remove old value
+               if (old != null) {
+                       _size -= estimateSerializedSize((MatrixBlock) 
old.getValue());
                }
+
+               //make room if needed
+               evict(size);
+               
+               _cache.put(key, value); // put new value last
+               _size += size;
        }
 
        /**
         * Get a block from the OOC cache (deserialize on read)
         */
        public static synchronized IndexedMatrixValue get(long streamId, int blockId) {
-
                String key = streamId + "_" + blockId;
                IndexedMatrixValue imv = _cache.get(key);

-               synchronized (lock) {
-                       if (imv != null && _policy == RPolicy.LRU) {
-                               _evictDeque.remove(key);
-                               _evictDeque.addLast(key);
-                       }
+               if (imv != null && _policy == RPolicy.LRU) {
+                       _cache.remove(key);
+                       _cache.put(key, imv); //add last semantic
                }
-
-               if (imv != null) {
-                       return imv;
-               } else {
-                       try {
-                               return loadFromDisk(streamId, blockId);
-                       } catch (IOException e) {
-                               throw new DMLRuntimeException(e);
-                       }
-               }
-
+               if( imv == null ) //never put (evicted blocks stay cached w/o value and are restored below)
+                       throw new DMLRuntimeException("Block "+key+" not found in OOC cache.");
+               return (imv.getValue() != null) ? imv : 
                        loadFromDisk(streamId, blockId);
        }
 
        /**
         * Evict ByteBuffers to disk
         */
-       private static void evict(long requiredSize) throws IOException {
-               while(_size + requiredSize > _limit && !_evictDeque.isEmpty()) {
-                       System.out.println("_size + requiredSize: " + _size +" + "+ requiredSize + "; _limit: " + _limit);
-                       String oldKey = _evictDeque.removeLast();
-                       IndexedMatrixValue toEvict = _cache.remove(oldKey);
-
-                       if (toEvict == null) { continue;}
-                       MatrixBlock mbToEvict = (MatrixBlock) toEvict.getValue();
-
-                       // Spill to disk
-                       String filename = _spillDir + "/" + oldKey;
-                       File spillDirFile = new File(_spillDir);
-                       if (!spillDirFile.exists()) {
-                               spillDirFile.mkdirs();
+       private static void evict(long requiredSize) {
+               try {
+                       int pos = 0;
+                       while(_size + requiredSize > _limit && pos++ < _cache.size()) {
+                               //scan from the head of the cache order, at most one pass (pos), until the new block fits
+                               Map.Entry<String,IndexedMatrixValue> tmp = removeFirstFromCache();
+                               if( tmp == null || tmp.getValue().getValue() == null ) { //already spilled: re-append, keep scanning
+                                       if( tmp != null )
+                                               _cache.put(tmp.getKey(), tmp.getValue());
+                                       continue;
+                               }
+       
+                               // Spill to disk
+                               String filename = _spillDir + "/" + tmp.getKey();
+                               File spillDirFile = new File(_spillDir);
+                               if (!spillDirFile.exists()) {
+                                       spillDirFile.mkdirs();
+                               }
+                               LocalFileUtils.writeMatrixBlockToLocal(filename, (MatrixBlock)tmp.getValue().getValue());
+       
+                               // Drop the in-memory value but keep the entry (and its indexes) for a later restore
+                               long freedSize = estimateSerializedSize((MatrixBlock)tmp.getValue().getValue());
+                               tmp.getValue().setValue(null);
+                               _cache.put(tmp.getKey(), tmp.getValue()); // re-append at the tail (add last semantic)
+                               _size -= freedSize;
                        }
-
-                       LocalFileUtils.writeMatrixBlockToLocal(filename, mbToEvict);
-
-                       long freedSize = estimateSerializedSize(mbToEvict);
-                       _size -= freedSize;
-
+               }
+               catch(Exception ex) {
+                       throw new DMLRuntimeException(ex);
                }
        }
 
        /**
         * Load block from spill file
         */
-       private static IndexedMatrixValue loadFromDisk(long streamId, int blockId) throws IOException {
-               String filename = _spillDir + "/" + streamId + "_" + blockId;
+       private static IndexedMatrixValue loadFromDisk(long streamId, int blockId) {
+               String key = streamId + "_" + blockId;
+               String filename = _spillDir + "/" + key;

-               // check if file exists
-               if (!LocalFileUtils.isExisting(filename)) {
-                       throw new IOException("File " + filename + " does not exist");
+               try {
+                       // check if file exists
+                       if (!LocalFileUtils.isExisting(filename)) {
+                               throw new IOException("File " + filename + " does not exist");
+                       }
+                       // Read from disk and put into original indexed matrix value (keeps its indexes)
+                       MatrixBlock mb = LocalFileUtils.readMatrixBlockFromLocal(filename);
+                       IndexedMatrixValue imv = _cache.get(key);
+                       imv.setValue(mb);
+                       _size += estimateSerializedSize(mb); //re-account restored block (subtracted again on next eviction)
+                       return imv;
+               }
+               catch(Exception ex) {
+                       throw new DMLRuntimeException(ex);
                }
-
-               // Read from disk
-               MatrixBlock mb = LocalFileUtils.readMatrixBlockFromLocal(filename);
-
-               MatrixIndexes ix = new MatrixIndexes(blockId + 1, 1);
-
-               // Put back in cache (may trigger eviction)
-               // get() operation should not modify cache
-               // put(streamId, blockId, new IndexedMatrixValue(ix, mb));
-
-               return new IndexedMatrixValue(ix, mb);
        }
 
        private static long estimateSerializedSize(MatrixBlock mb) {
                return mb.getExactSerializedSize();
        }
+       
+       private static Map.Entry<String, IndexedMatrixValue> removeFirstFromCache() {
+               //take the head entry of the cache order (cache must be non-empty, else NoSuchElementException)
+               Iterator<Map.Entry<String, IndexedMatrixValue>> iter = _cache.entrySet().iterator();
+               Map.Entry<String, IndexedMatrixValue> entry = iter.next();

+               //unlink it from the cache; callers may re-insert it at the tail
+               iter.remove();

+               return entry;
+       }
 }
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/ReblockOOCInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/ReblockOOCInstruction.java
index 06386c5d66..3c78879b45 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/ReblockOOCInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/ReblockOOCInstruction.java
@@ -19,8 +19,6 @@
 
 package org.apache.sysds.runtime.instructions.ooc;
 
-import java.util.concurrent.ExecutorService;
-
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile;
@@ -40,7 +38,6 @@ import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysds.runtime.matrix.operators.Operator;
 import org.apache.sysds.runtime.meta.DataCharacteristics;
-import org.apache.sysds.runtime.util.CommonThreadPool;
 
 public class ReblockOOCInstruction extends ComputationOOCInstruction {
        private int blen;
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/TransposeOOCInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/TransposeOOCInstruction.java
index fce5408960..05e31830a5 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/TransposeOOCInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/TransposeOOCInstruction.java
@@ -30,9 +30,6 @@ import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysds.runtime.matrix.operators.ReorgOperator;
-import org.apache.sysds.runtime.util.CommonThreadPool;
-
-import java.util.concurrent.ExecutorService;
 
 public class TransposeOOCInstruction extends ComputationOOCInstruction {
 
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java
index 63f42f5bf1..173486844a 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/ooc/UnaryOOCInstruction.java
@@ -28,9 +28,6 @@ import org.apache.sysds.runtime.instructions.cp.CPOperand;
 import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.matrix.operators.UnaryOperator;
-import org.apache.sysds.runtime.util.CommonThreadPool;
-
-import java.util.concurrent.ExecutorService;
 
 public class UnaryOOCInstruction extends ComputationOOCInstruction {
        private UnaryOperator _uop = null;
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/data/IndexedMatrixValue.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/data/IndexedMatrixValue.java
index 8f82a99abf..7b20fe2f9e 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/spark/data/IndexedMatrixValue.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/data/IndexedMatrixValue.java
@@ -66,6 +66,15 @@ public class IndexedMatrixValue implements Serializable
        public MatrixValue getValue() {
                return _value;
        }
+
+       /**
+        * Replaces the wrapped matrix value while keeping the matrix indexes.
+        * 
+        * @param value the new matrix value (may be null)
+        */
+       public void setValue(MatrixValue value) {
+               this._value = value;
+       }
        
        public void set(MatrixIndexes indexes2, MatrixValue block2) {
                _indexes.setIndexes(indexes2);
diff --git a/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixIndexes.java b/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixIndexes.java
index 5a3d0a6430..7f0f3b7a65 100644
--- a/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixIndexes.java
+++ b/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixIndexes.java
@@ -108,6 +108,24 @@ public class MatrixIndexes implements WritableComparable<MatrixIndexes>, RawComp
        public String toString() {
                return "("+_row+", "+_col+")";
        }
+       
+       /**
+        * Parses matrix indexes from their {@link #toString()} representation,
+        * e.g., "(3, 7)" yields row index 3 and column index 7.
+        * 
+        * @param ix string of the form "(row, col)", surrounding whitespace is ignored
+        * @return new matrix indexes
+        * @throws NumberFormatException if ix is not of the form "(row, col)"
+        */
+       public static MatrixIndexes fromString(String ix) {
+               String s = ix.trim();
+               if( s.length() < 2 || s.charAt(0) != '(' || s.charAt(s.length()-1) != ')' )
+                       throw new NumberFormatException("Invalid matrix indexes: '"+ix+"'");
+               String[] parts = s.substring(1, s.length()-1).split(",");
+               if( parts.length != 2 )
+                       throw new NumberFormatException("Invalid matrix indexes: '"+ix+"'");
+               return new MatrixIndexes(Long.parseLong(parts[0].trim()), Long.parseLong(parts[1].trim()));
+       }
 
        ////////////////////////////////////////////////////
        // implementation of Writable read/write

Reply via email to