This is an automated email from the ASF dual-hosted git repository.

arnabp20 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new 985e4916fa [SYSTEMDS-3341] Unified Memory Manager initial 
implementation
985e4916fa is described below

commit 985e4916facdda74fccfc219fec64e8b3bfcadd3
Author: arnabp <[email protected]>
AuthorDate: Sat Apr 9 14:04:32 2022 +0200

    [SYSTEMDS-3341] Unified Memory Manager initial implementation
    
    This patch brings in the initial version of UMM.
    So far, operation memory (70%) and buffer pool memory (15%) are managed
    independently. UMM unifies these two and allows dynamic shifting of the
    boundary between these two memory partitions. While pinning an input,
    we first check if the input is available in the cache, and we make space
    by evicting cached objects if the required space is not available. We
    also reserve memory for the worst-case output memory (70% - sizeof(inputs))
    during pinning each input. A better approach is to reserve estimated
    output memory, however, that'd need us to add a reserve call in each
    instruction.
    
    Closes #1573
---
 src/main/java/org/apache/sysds/conf/DMLConfig.java |   4 +-
 .../java/org/apache/sysds/hops/OptimizerUtils.java |  70 +++-
 .../apache/sysds/parser/ParForStatementBlock.java  |   6 +-
 .../controlprogram/caching/CacheEvictionQueue.java |  64 ++++
 .../caching/CacheMaintenanceService.java           | 105 ++++++
 .../controlprogram/caching/CacheStatistics.java    |  14 +-
 .../controlprogram/caching/CacheableData.java      |  52 ++-
 .../controlprogram/caching/FrameObject.java        |   8 +-
 .../controlprogram/caching/LazyWriteBuffer.java    | 143 +-------
 .../controlprogram/caching/MatrixObject.java       |   7 +-
 .../controlprogram/caching/TensorObject.java       |   8 +-
 .../caching/UnifiedMemoryManager.java              | 392 +++++++++++++++++----
 .../sysds/runtime/meta/TensorCharacteristics.java  |  10 +
 .../test/component/frame/FrameEvictionTest.java    |  10 +-
 .../sysds/test/functions/caching/UMMTest.java      | 100 ++++++
 src/test/scripts/functions/caching/UMMTest1.dml    |  35 ++
 16 files changed, 800 insertions(+), 228 deletions(-)

diff --git a/src/main/java/org/apache/sysds/conf/DMLConfig.java 
b/src/main/java/org/apache/sysds/conf/DMLConfig.java
index a870644838..03a0afb233 100644
--- a/src/main/java/org/apache/sysds/conf/DMLConfig.java
+++ b/src/main/java/org/apache/sysds/conf/DMLConfig.java
@@ -165,8 +165,8 @@ public class DMLConfig
                _defaultVals.put(NATIVE_BLAS_DIR,        "none" );
                _defaultVals.put(LINEAGECACHESPILL,      "true" );
                _defaultVals.put(COMPILERASSISTED_RW,    "true" );
-               _defaultVals.put(BUFFERPOOL_LIMIT,       "15"); // 15% of total 
heap
-               _defaultVals.put(MEMORY_MANAGER,         "static"); // static 
partitioning of heap
+               _defaultVals.put(BUFFERPOOL_LIMIT,       "15"); // % of total 
heap
+               _defaultVals.put(MEMORY_MANAGER,         "static"); // 
static/unified partitioning of heap
                _defaultVals.put(PRINT_GPU_MEMORY_INFO,  "false" );
                _defaultVals.put(EVICTION_SHADOW_BUFFERSIZE,  "0.0" );
                _defaultVals.put(STATS_MAX_WRAP_LEN,     "30" );
diff --git a/src/main/java/org/apache/sysds/hops/OptimizerUtils.java 
b/src/main/java/org/apache/sysds/hops/OptimizerUtils.java
index f96cd0200d..a58450d5f3 100644
--- a/src/main/java/org/apache/sysds/hops/OptimizerUtils.java
+++ b/src/main/java/org/apache/sysds/hops/OptimizerUtils.java
@@ -45,6 +45,7 @@ import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.controlprogram.ForProgramBlock;
 import org.apache.sysds.runtime.controlprogram.LocalVariableMap;
 import org.apache.sysds.runtime.controlprogram.caching.LazyWriteBuffer;
+import org.apache.sysds.runtime.controlprogram.caching.UnifiedMemoryManager;
 import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
 import 
org.apache.sysds.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
 import org.apache.sysds.runtime.data.SparseBlock;
@@ -74,7 +75,21 @@ public class OptimizerUtils
         * NOTE: it is important that 
MEM_UTIL_FACTOR+CacheableData.CACHING_BUFFER_SIZE &lt; 1.0
         */
        public static double MEM_UTIL_FACTOR = 0.7d;
-       
+       /** Default buffer pool sizes for static (15%) and unified (85%) memory 
*/
+       public static double DEFAULT_MEM_UTIL_FACTOR = 0.15d;
+       public static double DEFAULT_UMM_UTIL_FACTOR = 0.85d;
+
+       /** Memory managers (static partitioned, unified) */
+       public enum MemoryManager {
+               STATIC_MEMORY_MANAGER,
+               UNIFIED_MEMORY_MANAGER
+       }
+
+       /** Indicate the current memory manager in effect */
+       public static MemoryManager MEMORY_MANAGER = null;
+       /** Buffer pool size in bytes */
+       public static long BUFFER_POOL_SIZE = 0;
+
        /** Default blocksize if unspecified or for testing purposes */
        public static final int DEFAULT_BLOCKSIZE = 1000;
        
@@ -468,6 +483,59 @@ public class OptimizerUtils
                double ret = InfrastructureAnalyzer.getLocalMaxMemory();
                return ret * OptimizerUtils.MEM_UTIL_FACTOR;
        }
+
+       /**
+        * Returns buffer pool size as set in the config
+        *
+        * @return buffer pool size in bytes
+        */
+       public static long getBufferPoolLimit() {
+               if (BUFFER_POOL_SIZE != 0)
+                       return BUFFER_POOL_SIZE;
+               DMLConfig conf = ConfigurationManager.getDMLConfig();
+               double bufferPoolFactor = 
(double)(conf.getIntValue(DMLConfig.BUFFERPOOL_LIMIT))/100;
+               bufferPoolFactor = Math.max(bufferPoolFactor, 
DEFAULT_MEM_UTIL_FACTOR);
+               long maxMem = InfrastructureAnalyzer.getLocalMaxMemory();
+               return (long)(bufferPoolFactor * maxMem);
+       }
+
+       /**
+        * Check if unified memory manager is in effect
+        * @return boolean
+        */
+       public static boolean isUMMEnabled() {
+               if (MEMORY_MANAGER == null) {
+                       DMLConfig conf = ConfigurationManager.getDMLConfig();
+                       boolean isUMM = 
conf.getTextValue(DMLConfig.MEMORY_MANAGER).equalsIgnoreCase("unified");
+                       MEMORY_MANAGER = isUMM ? 
MemoryManager.UNIFIED_MEMORY_MANAGER : MemoryManager.STATIC_MEMORY_MANAGER;
+               }
+               return MEMORY_MANAGER == MemoryManager.UNIFIED_MEMORY_MANAGER;
+       }
+
+       /**
+        * Disable unified memory manager and fallback to static partitioning.
+        * Initialize LazyWriteBuffer with the default size (15%).
+        */
+       public static void disableUMM() {
+               MEMORY_MANAGER = MemoryManager.STATIC_MEMORY_MANAGER;
+               LazyWriteBuffer.cleanup();
+               LazyWriteBuffer.init();
+               long maxMem = InfrastructureAnalyzer.getLocalMaxMemory();
+               BUFFER_POOL_SIZE = (long) (DEFAULT_MEM_UTIL_FACTOR * maxMem);
+               LazyWriteBuffer.setWriteBufferLimit(BUFFER_POOL_SIZE);
+       }
+
+       /**
+        * Enable unified memory manager and initialize with the default size 
(85%).
+        */
+       public static void enableUMM() {
+               MEMORY_MANAGER = MemoryManager.UNIFIED_MEMORY_MANAGER;
+               UnifiedMemoryManager.cleanup();
+               UnifiedMemoryManager.init();
+               long maxMem = InfrastructureAnalyzer.getLocalMaxMemory();
+               BUFFER_POOL_SIZE = (long) (DEFAULT_UMM_UTIL_FACTOR * maxMem);
+               UnifiedMemoryManager.setUMMLimit(BUFFER_POOL_SIZE);
+       }
        
        public static boolean isMaxLocalParallelism(int k) {
                return InfrastructureAnalyzer.getLocalParallelism() == k;
diff --git a/src/main/java/org/apache/sysds/parser/ParForStatementBlock.java 
b/src/main/java/org/apache/sysds/parser/ParForStatementBlock.java
index 607641c7ff..130495d03b 100644
--- a/src/main/java/org/apache/sysds/parser/ParForStatementBlock.java
+++ b/src/main/java/org/apache/sysds/parser/ParForStatementBlock.java
@@ -363,7 +363,11 @@ public class ParForStatementBlock extends ForStatementBlock
                        _fncache.clear();
                
                LOG.debug("INFO: PARFOR("+_PID+"): validate successful (no 
dependencies) in "+time.stop()+"ms.");
-               
+
+               //disable UMM if in effect and fallback to lazy write buffer
+               if (OptimizerUtils.isUMMEnabled())
+                       OptimizerUtils.disableUMM();
+
                return vs;
        }
        
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheEvictionQueue.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheEvictionQueue.java
new file mode 100644
index 0000000000..0b2e3ac8b7
--- /dev/null
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheEvictionQueue.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.controlprogram.caching;
+
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+public class CacheEvictionQueue extends LinkedHashMap<String, ByteBuffer>
+{
+       /**
+        * Extended LinkedHashMap with convenience methods for adding and 
removing
+        * last/first entries.
+        *
+        */
+       private static final long serialVersionUID = -5208333402581364859L;
+
+       public void addLast( String fname, ByteBuffer bbuff ) {
+               //put entry into eviction queue w/ 'addLast' semantics
+               put(fname, bbuff);
+       }
+
+       public Map.Entry<String, ByteBuffer> removeFirst()
+       {
+               //move iterator to first entry
+               Iterator<Map.Entry<String, ByteBuffer>> iter = 
entrySet().iterator();
+               Map.Entry<String, ByteBuffer> entry = iter.next();
+
+               //remove current iterator entry
+               iter.remove();
+
+               return entry;
+       }
+
+       public Map.Entry<String, ByteBuffer> removeFirstUnpinned(List<String> 
pinnedList) {
+               //move iterator to first entry
+               Iterator<Map.Entry<String, ByteBuffer>> iter = 
entrySet().iterator();
+               var entry = iter.next();
+               while (pinnedList.contains(entry.getKey()))
+                       entry = iter.next();
+
+               //remove current iterator entry
+               iter.remove();
+               return entry;
+       }
+}
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheMaintenanceService.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheMaintenanceService.java
new file mode 100644
index 0000000000..21010781d8
--- /dev/null
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheMaintenanceService.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.controlprogram.caching;
+
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.util.LocalFileUtils;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+public class CacheMaintenanceService
+{
+       protected ExecutorService _pool = null;
+
+       public CacheMaintenanceService() {
+               //create new threadpool for async cleanup
+               if( isAsync() )
+                       _pool = Executors.newCachedThreadPool();
+       }
+
+       public void deleteFile(String fname) {
+               //sync or async file delete
+               if( CacheableData.CACHING_ASYNC_FILECLEANUP )
+                       _pool.submit(new 
CacheMaintenanceService.FileCleanerTask(fname));
+               else
+                       LocalFileUtils.deleteFileIfExists(fname, true);
+       }
+
+       public void serializeData(ByteBuffer bbuff, CacheBlock cb) {
+               //sync or async file delete
+               if( CacheableData.CACHING_ASYNC_SERIALIZE )
+                       _pool.submit(new 
CacheMaintenanceService.DataSerializerTask(bbuff, cb));
+               else {
+                       try {
+                               bbuff.serializeBlock(cb);
+                       }
+                       catch(IOException ex) {
+                               throw new DMLRuntimeException(ex);
+                       }
+               }
+       }
+
+       public void close() {
+               //execute pending tasks and shutdown pool
+               if( isAsync() )
+                       _pool.shutdown();
+       }
+
+       @SuppressWarnings("unused")
+       public boolean isAsync() {
+               return CacheableData.CACHING_ASYNC_FILECLEANUP
+                       || CacheableData.CACHING_ASYNC_SERIALIZE;
+       }
+
+       private static class FileCleanerTask implements Runnable {
+               private String _fname = null;
+
+               public FileCleanerTask( String fname ) {
+                       _fname = fname;
+               }
+
+               @Override
+               public void run() {
+                       LocalFileUtils.deleteFileIfExists(_fname, true);
+               }
+       }
+
+       private static class DataSerializerTask implements Runnable {
+               private ByteBuffer _bbuff = null;
+               private CacheBlock _cb = null;
+
+               public DataSerializerTask(ByteBuffer bbuff, CacheBlock cb) {
+                       _bbuff = bbuff;
+                       _cb = cb;
+               }
+
+               @Override
+               public void run() {
+                       try {
+                               _bbuff.serializeBlock(_cb);
+                       }
+                       catch(IOException ex) {
+                               throw new DMLRuntimeException(ex);
+                       }
+               }
+       }
+}
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheStatistics.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheStatistics.java
index a001e65708..fe525be9cd 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheStatistics.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheStatistics.java
@@ -59,7 +59,7 @@ public class CacheStatistics
        private static final LongAdder _numHitsLin      = new LongAdder();
 
        //write statistics caching
-       private static final LongAdder _numWritesFSBuff = new LongAdder();
+       private static final LongAdder _numWritesBPool = new LongAdder();
        private static final LongAdder _numWritesFS     = new LongAdder();
        private static final LongAdder _numWritesHDFS   = new LongAdder();
        private static final LongAdder _numWritesLin    = new LongAdder();
@@ -77,7 +77,7 @@ public class CacheStatistics
                _numHitsFS.reset();
                _numHitsHDFS.reset();
                
-               _numWritesFSBuff.reset();
+               _numWritesBPool.reset();
                _numWritesFS.reset();
                _numWritesHDFS.reset();
                _numWritesLin.reset();
@@ -148,16 +148,16 @@ public class CacheStatistics
                return _numHitsLin.longValue();
        }
 
-       public static void incrementFSBuffWrites() {
-               _numWritesFSBuff.increment();
+       public static void incrementBPoolWrites() {
+               _numWritesBPool.increment();
        }
        
        public static void incrementFSBuffWrites(int delta) {
-               _numWritesFSBuff.add(delta);
+               _numWritesBPool.add(delta);
        }
        
        public static long getFSBuffWrites() {
-               return _numWritesFSBuff.longValue();
+               return _numWritesBPool.longValue();
        }
        
        public static void incrementFSWrites() {
@@ -247,7 +247,7 @@ public class CacheStatistics
                StringBuilder sb = new StringBuilder();
                sb.append(_numWritesLin.longValue());
                sb.append("/");
-               sb.append(_numWritesFSBuff.longValue());
+               sb.append(_numWritesBPool.longValue());
                sb.append("/");
                sb.append(_numWritesFS.longValue());
                sb.append("/");
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
index 18ca1a4f55..3cee338f7d 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
@@ -80,13 +80,10 @@ public abstract class CacheableData<T extends CacheBlock> 
extends Data
 
        /** Global logging instance for all subclasses of CacheableData */
        protected static final Log LOG = 
LogFactory.getLog(CacheableData.class.getName());
-       static DMLConfig conf = ConfigurationManager.getDMLConfig();
 
        // global constant configuration parameters
-       public static final boolean UMM = 
conf.getTextValue(DMLConfig.MEMORY_MANAGER).equalsIgnoreCase("unified");
-       public static final long    CACHING_THRESHOLD = (long)Math.max(4*1024, 
//obj not s.t. caching
+       public static final long CACHING_THRESHOLD = (long)Math.max(4*1024, 
//obj not s.t. caching
                1e-5 * InfrastructureAnalyzer.getLocalMaxMemory());       //if 
below threshold [in bytes]
-       public static final double CACHING_BUFFER_SIZE = 
(double)(conf.getIntValue(DMLConfig.BUFFERPOOL_LIMIT))/100; //15%
        public static final RPolicy CACHING_BUFFER_POLICY = RPolicy.FIFO;
        public static final boolean CACHING_BUFFER_PAGECACHE = false;
        public static final boolean CACHING_WRITE_CACHE_ON_READ = false;
@@ -528,6 +525,10 @@ public abstract class CacheableData<T extends CacheBlock> 
extends Data
                //get object from cache
                if( _data == null )
                        getCache();
+
+               if (OptimizerUtils.isUMMEnabled())
+                       //track and make space in the UMM
+                       UnifiedMemoryManager.pin(this);
                
                //call acquireHostRead if gpuHandle is set as well as is 
allocated
                if( DMLScript.USE_ACCELERATOR && _gpuObjects != null ) {
@@ -596,9 +597,10 @@ public abstract class CacheableData<T extends CacheBlock> 
extends Data
                else if( _data!=null && DMLScript.STATISTICS ) {
                        CacheStatistics.incrementMemHits();
                }
-               
+
                //cache status maintenance
                acquire( false, _data==null );
+
                return _data;
        }
        
@@ -698,7 +700,11 @@ public abstract class CacheableData<T extends CacheBlock> 
extends Data
                        //compact empty in-memory block 
                        _data.compactEmptyBlock();
                }
-               
+
+               if (OptimizerUtils.isUMMEnabled())
+                       //give the memory back to UMM
+                       UnifiedMemoryManager.unpin(this);
+
                //cache status maintenance (pass cacheNoWrite flag)
                release(_isAcquireFromEmpty && !_requiresLocalWrite);
                
@@ -709,7 +715,11 @@ public abstract class CacheableData<T extends CacheBlock> 
extends Data
                        if( ( write && !hasValidLineage() ) || 
_requiresLocalWrite ) {
                                String filePath = getCacheFilePathAndName();
                                try {
-                                       LazyWriteBuffer.writeBlock(filePath, 
_data);
+                                       //write into the buffer pool
+                                       if (OptimizerUtils.isUMMEnabled())
+                                               
UnifiedMemoryManager.writeBlock(filePath, _data);
+                                       else
+                                               
LazyWriteBuffer.writeBlock(filePath, _data);
                                }
                                catch (Exception e) {
                                        throw new DMLRuntimeException("Eviction 
to local path " + filePath + " ("+hashCode()+") failed.", e);
@@ -1019,8 +1029,12 @@ public abstract class CacheableData<T extends 
CacheBlock> extends Data
                        LOG.trace("CACHE: Freeing evicted matrix...  " + 
hashCode() + "  HDFS path: " + 
                                (_hdfsFileName == null ? "null" : 
_hdfsFileName) + " Eviction path: " + cacheFilePathAndName);
                
-               if(isCachingActive())
-                       LazyWriteBuffer.deleteBlock(cacheFilePathAndName);
+               if(isCachingActive()) {
+                       if (OptimizerUtils.isUMMEnabled())
+                               
UnifiedMemoryManager.deleteBlock(cacheFilePathAndName);
+                       else
+                               
LazyWriteBuffer.deleteBlock(cacheFilePathAndName);
+               }
                
                if( LOG.isTraceEnabled() )
                        LOG.trace("Freeing evicted matrix - COMPLETED ... " + 
(System.currentTimeMillis()-begin) + " msec.");
@@ -1031,7 +1045,12 @@ public abstract class CacheableData<T extends 
CacheBlock> extends Data
        }
        
        public static boolean isBelowCachingThreshold(CacheBlock data) {
-               return LazyWriteBuffer.getCacheBlockSize(data) <= 
CACHING_THRESHOLD;
+               boolean ret;
+               if (OptimizerUtils.isUMMEnabled())
+                       ret = UnifiedMemoryManager.getCacheBlockSize(data) <= 
CACHING_THRESHOLD;
+               else
+                       ret = LazyWriteBuffer.getCacheBlockSize(data) <= 
CACHING_THRESHOLD;
+               return ret;
        }
        
        public long getDataSize() {
@@ -1355,6 +1374,7 @@ public abstract class CacheableData<T extends CacheBlock> 
extends Data
        public synchronized static void cleanupCacheDir() {
                //cleanup remaining cached writes
                LazyWriteBuffer.cleanup();
+               UnifiedMemoryManager.cleanup();
                
                //delete cache dir and files
                cleanupCacheDir(true);
@@ -1420,13 +1440,17 @@ public abstract class CacheableData<T extends 
CacheBlock> extends Data
                        throw new IOException(e);
                }
        
-               //init write-ahead buffer
-               LazyWriteBuffer.init();
+               if (OptimizerUtils.isUMMEnabled())
+                       //init unified memory manager
+                       UnifiedMemoryManager.init();
+               else
+                       //init write-ahead buffer
+                       LazyWriteBuffer.init();
+
                _refBCs.set(0);
-               
                _activeFlag = true; //turn on caching
        }
-       
+
        public static boolean isCachingActive() {
                return _activeFlag;
        }
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java
index 448538833f..84ff65192d 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java
@@ -26,6 +26,7 @@ import org.apache.commons.lang3.tuple.Pair;
 import org.apache.sysds.common.Types.DataType;
 import org.apache.sysds.common.Types.FileFormat;
 import org.apache.sysds.common.Types.ValueType;
+import org.apache.sysds.hops.OptimizerUtils;
 import org.apache.sysds.parser.DataExpression;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
@@ -171,7 +172,12 @@ public class FrameObject extends CacheableData<FrameBlock>
        
        @Override
        protected FrameBlock readBlobFromCache(String fname) throws IOException 
{
-               return (FrameBlock)LazyWriteBuffer.readBlock(fname, false);
+               FrameBlock fb = null;
+               if (OptimizerUtils.isUMMEnabled())
+                       fb = (FrameBlock) UnifiedMemoryManager.readBlock(fname, 
false);
+               else
+                       fb = (FrameBlock)LazyWriteBuffer.readBlock(fname, 
false);
+               return fb;
        }
 
        @Override
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/LazyWriteBuffer.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/LazyWriteBuffer.java
index 9ca079f47e..53d281631d 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/LazyWriteBuffer.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/LazyWriteBuffer.java
@@ -20,15 +20,11 @@
 package org.apache.sysds.runtime.controlprogram.caching;
 
 import java.io.IOException;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
 import java.util.Map.Entry;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 
 import org.apache.sysds.api.DMLScript;
-import org.apache.sysds.runtime.DMLRuntimeException;
-import 
org.apache.sysds.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
+import org.apache.sysds.hops.OptimizerUtils;
 import org.apache.sysds.runtime.util.LocalFileUtils;
 
 public class LazyWriteBuffer 
@@ -39,23 +35,17 @@ public class LazyWriteBuffer
        }
        
        //global size limit in bytes
-       private static final long _limit;
+       private static long _limit;
        
        //current size in bytes
        private static long _size;
        
        //eviction queue of <filename,buffer> pairs (implemented via linked 
hash map
        //for (1) queue semantics and (2) constant time get/insert/delete 
operations)
-       private static EvictionQueue _mQueue;
+       private static CacheEvictionQueue _mQueue;
        
        //maintenance service for synchronous or asynchronous delete of evicted 
files
-       private static MaintenanceService _fClean;
-       
-       static {
-               //obtain the logical buffer size in bytes
-               long maxMem = InfrastructureAnalyzer.getLocalMaxMemory();
-               _limit = (long)(CacheableData.CACHING_BUFFER_SIZE * maxMem);
-       }
+       private static CacheMaintenanceService _fClean;
        
        public static int writeBlock(String fname, CacheBlock cb)
                throws IOException
@@ -104,7 +94,7 @@ public class LazyWriteBuffer
                        _fClean.serializeData(bbuff, cb);
                        
                        if( DMLScript.STATISTICS ) {
-                               CacheStatistics.incrementFSBuffWrites();
+                               CacheStatistics.incrementBPoolWrites();
                                CacheStatistics.incrementFSWrites(numEvicted);
                        }
                }
@@ -120,7 +110,7 @@ public class LazyWriteBuffer
                
                return numEvicted;
        }
-       
+
        public static void deleteBlock(String fname)
        {
                boolean requiresDelete = true;
@@ -180,8 +170,9 @@ public class LazyWriteBuffer
        }
 
        public static void init() {
-               _mQueue = new EvictionQueue();
-               _fClean = new MaintenanceService();
+               _mQueue = new CacheEvictionQueue();
+               _fClean = new CacheMaintenanceService();
+               _limit = OptimizerUtils.getBufferPoolLimit();
                _size = 0;
                if( CacheableData.CACHING_BUFFER_PAGECACHE )
                        PageCache.init();
@@ -201,6 +192,10 @@ public class LazyWriteBuffer
                //dynamically adjusted in a parfor context, which wouldn't 
reflect the actual size
                return _limit;
        }
+
+       public static void setWriteBufferLimit(long limit) {
+               _limit = limit;
+       }
        
        public static long getWriteBufferSize() {
                synchronized( _mQueue ) {
@@ -281,116 +276,4 @@ public class LazyWriteBuffer
        public static ExecutorService getUtilThreadPool() {
                return _fClean != null ? _fClean._pool : null;
        }
-       
-       /**
-        * Extended LinkedHashMap with convenience methods for adding and 
removing
-        * last/first entries.
-        * 
-        */
-       private static class EvictionQueue extends LinkedHashMap<String, 
ByteBuffer>
-       {
-               private static final long serialVersionUID = 
-5208333402581364859L;
-               
-               public void addLast( String fname, ByteBuffer bbuff ) {
-                       //put entry into eviction queue w/ 'addLast' semantics
-                       put(fname, bbuff);
-               }
-               
-               public Entry<String, ByteBuffer> removeFirst()
-               {
-                       //move iterator to first entry
-                       Iterator<Entry<String, ByteBuffer>> iter = 
entrySet().iterator();
-                       Entry<String, ByteBuffer> entry = iter.next();
-                       
-                       //remove current iterator entry
-                       iter.remove();
-                       
-                       return entry;
-               }
-       }
-       
-       /**
-        * Maintenance service for abstraction of synchronous and asynchronous
-        * file cleanup on rmvar/cpvar as well as serialization of matrices and
-        * frames. The thread pool for asynchronous cleanup may increase the 
-        * number of threads temporarily to the number of concurrent delete 
tasks
-        * (which is bounded to the parfor degree of parallelism).
-        */
-       private static class MaintenanceService
-       {
-               private ExecutorService _pool = null;
-               
-               public MaintenanceService() {
-                       //create new threadpool for async cleanup
-                       if( isAsync() )
-                               _pool = Executors.newCachedThreadPool();
-               }
-               
-               public void deleteFile(String fname) {
-                       //sync or async file delete
-                       if( CacheableData.CACHING_ASYNC_FILECLEANUP )
-                               _pool.submit(new FileCleanerTask(fname));
-                       else
-                               LocalFileUtils.deleteFileIfExists(fname, true);
-               }
-               
-               public void serializeData(ByteBuffer bbuff, CacheBlock cb) {
-                       //sync or async file delete
-                       if( CacheableData.CACHING_ASYNC_SERIALIZE )
-                               _pool.submit(new DataSerializerTask(bbuff, cb));
-                       else {
-                               try {
-                                       bbuff.serializeBlock(cb);
-                               }
-                               catch(IOException ex) {
-                                       throw new DMLRuntimeException(ex);
-                               }
-                       }
-               }
-               
-               public void close() {
-                       //execute pending tasks and shutdown pool
-                       if( isAsync() )
-                               _pool.shutdown();
-               }
-               
-               @SuppressWarnings("unused")
-               public boolean isAsync() {
-                       return CacheableData.CACHING_ASYNC_FILECLEANUP 
-                               || CacheableData.CACHING_ASYNC_SERIALIZE;
-               }
-               
-               private static class FileCleanerTask implements Runnable {
-                       private String _fname = null;
-                       
-                       public FileCleanerTask( String fname ) {
-                               _fname = fname;
-                       }
-                       
-                       @Override
-                       public void run() {
-                               LocalFileUtils.deleteFileIfExists(_fname, true);
-                       }
-               }
-               
-               private static class DataSerializerTask implements Runnable {
-                       private ByteBuffer _bbuff = null;
-                       private CacheBlock _cb = null;
-                       
-                       public DataSerializerTask(ByteBuffer bbuff, CacheBlock 
cb) {
-                               _bbuff = bbuff;
-                               _cb = cb;
-                       }
-                       
-                       @Override
-                       public void run() {
-                               try {
-                                       _bbuff.serializeBlock(_cb);
-                               }
-                               catch(IOException ex) {
-                                       throw new DMLRuntimeException(ex);
-                               }
-                       }
-               }
-       }
 }
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java
index 6afefcfe23..ae5ca10eca 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java
@@ -412,7 +412,12 @@ public class MatrixObject extends 
CacheableData<MatrixBlock> {
 
        @Override
        protected MatrixBlock readBlobFromCache(String fname) throws 
IOException {
-               return (MatrixBlock) LazyWriteBuffer.readBlock(fname, true);
+               MatrixBlock mb = null;
+               if (OptimizerUtils.isUMMEnabled())
+                       mb = (MatrixBlock) 
UnifiedMemoryManager.readBlock(fname, true);
+               else
+                       mb = (MatrixBlock) LazyWriteBuffer.readBlock(fname, 
true);
+               return mb;
        }
 
        @Override
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/TensorObject.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/TensorObject.java
index 6b29d41e60..248f3fc226 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/TensorObject.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/TensorObject.java
@@ -26,6 +26,7 @@ import org.apache.sysds.api.DMLScript;
 import org.apache.sysds.common.Types.DataType;
 import org.apache.sysds.common.Types.FileFormat;
 import org.apache.sysds.common.Types.ValueType;
+import org.apache.sysds.hops.OptimizerUtils;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
 import org.apache.sysds.runtime.controlprogram.federated.FederationMap;
@@ -104,7 +105,12 @@ public class TensorObject extends 
CacheableData<TensorBlock> {
 
        @Override
        protected TensorBlock readBlobFromCache(String fname) throws 
IOException {
-               return (TensorBlock) LazyWriteBuffer.readBlock(fname, false);
+               TensorBlock tb = null;
+               if (OptimizerUtils.isUMMEnabled())
+                       tb = (TensorBlock) 
UnifiedMemoryManager.readBlock(fname, false);
+               else
+                       tb = (TensorBlock) LazyWriteBuffer.readBlock(fname, 
false);
+               return tb;
        }
 
        @Override
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/UnifiedMemoryManager.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/UnifiedMemoryManager.java
index 48420b872b..e4b7761307 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/UnifiedMemoryManager.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/UnifiedMemoryManager.java
@@ -20,6 +20,15 @@
 package org.apache.sysds.runtime.controlprogram.caching;
 
 import org.apache.commons.lang.NotImplementedException;
+import org.apache.sysds.api.DMLScript;
+import org.apache.sysds.hops.OptimizerUtils;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.util.LocalFileUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 
 /**
  * Unified Memory Manager - Initial Design
@@ -44,7 +53,7 @@ import org.apache.commons.lang.NotImplementedException;
  * they can occupy, meaning that the boundary for the areas can shift 
dynamically depending
  * on the current load. Most importantly, though, dirty objects must not be 
counted twice
  * when pinning such an object for an operation. The min/max constraints are 
not exposed but
- * configured internally. An good starting point are the following constraints 
(relative to
+ * configured internally. A good starting point are the following constraints 
(relative to
  * JVM max heap size):
  * ___________________________
  * | operations  | 0%  | 70% | (pin requests always accepted)
@@ -63,94 +72,341 @@ import org.apache.commons.lang.NotImplementedException;
  *      eviction. All pin requests have to be accepted, and once a non-dirty 
object is released
  *      (unpinned) it can be dropped without persisting it to local FS.
  *
+ * Example Scenarios for an Operation:
+ *  (1) Inputs are available in the UMM, enough space left for the output.
+ *  (2) Some inputs are pre-evicted. Read and pin those in the operational 
memory.
+ *  (3) Inputs are available in the UMM, not enough space left for the output.
+ *     Evict cached objects to reserve worst-case output memory.
+ *  (4) Some inputs are pre-evicted and not enough space left for the inputs
+ *     and output. Evict cached objects to make space for the inputs.
+ *     Evict cached objects to reserve worst-case output memory.
+ *
  * Thread-safeness:
  * Initially, the UMM will be used in an instance-based manner. For global 
visibility and
  * use in parallel for loops, the UMM would need to provide a static, 
synchronized API, but
  * this constitutes a source of severe contention. In the future, we will 
consider a design
  * with thread-local UMMs for the individual parfor workers.
- *
- * Testing:
- * The UMM will be developed bottom up, and thus initially tested via 
component tests for
- * evaluating the eviction behavior for sequences of API requests. 
  */
+
 public class UnifiedMemoryManager
 {
-       public UnifiedMemoryManager(long capacity) {
-               //TODO implement
-               throw new NotImplementedException();
+       // Maximum size of UMM in bytes (default 85%)
+       private static long _limit;
+       // Current total size of the cached objects
+       private static long _totCachedSize;
+       // Operational memory limit in bytes (70%)
+       private static long _opMemLimit;
+       // List of pinned entries
+       private static final List<String> _pinnedEntries = new 
ArrayList<String>();
+
+       // Eviction queue of <filename,buffer> pairs (implemented via linked 
hash map
+       // for (1) queue semantics and (2) constant time get/insert/delete 
operations)
+       private static CacheEvictionQueue _mQueue;
+
+       // Maintenance service for synchronous or asynchronous delete of 
evicted files
+       private static CacheMaintenanceService _fClean;
+
+       // Pinned size of physical memory. Starts from 0 for each operation. 
Max is 70% of heap
+       // This increases only if the input is not present in the cache and 
read from FS/rdd/fed/gpu
+       private static long _pinnedPhysicalMemSize = 0;
+       // Size of pinned virtual memory. This tracks the total input size
+       // This increases if the input is available in the cache.
+       private static long _pinnedVirtualMemSize = 0;
+
+       //---------------- OPERATION MEMORY MAINTENANCE -------------------//
+
+       // Make space for and track a cache block to be pinned in operation 
memory
+       public static void pin(CacheableData<?> cd) {
+               if (!CacheableData.isCachingActive()) {
+                       return;
+               }
+
+               // Space accounting based on an estimated size and before 
reading the blob
+               long estimatedSize = 
OptimizerUtils.estimateSize(cd.getDataCharacteristics());
+               if (probe(cd))
+                       // Availability in the cache means no memory overhead.
+                       // We still need to track to derive the worst-case 
output memory
+                       _pinnedVirtualMemSize += estimatedSize;
+               else {
+                       // The blob will be restored from local FS, or will be 
read
+                       // from other backends. Make space if not available.
+                       makeSpace(estimatedSize);
+                       _pinnedPhysicalMemSize += estimatedSize;
+               }
+               // Track the pinned entries to protect from evictions
+               _pinnedEntries.add(cd.getCacheFilePathAndName());
+
+               // Reserve space for output after pinning every input.
+               // This overly conservative approach removes the need to call 
reserveOutputMem() from
+               // each instruction. Ideally, every instruction first pins all 
the inputs, followed
+               // by reserving space for the output.
+               reserveOutputMem();
        }
-       
-       /**
-        * Pins a cache block into operation memory.
-        * 
-        * @param key    unique identifier and local FS filename for eviction
-        * @param block  cache block if not under UMM control, null otherwise
-        * @param dirty  indicator if block is dirty (subject to buffer pool 
management)
-        * @return       pinned cache block, potentially restored from local FS
-        */
-       public CacheBlock pin(String key, CacheBlock block, boolean dirty) {
-               //TODO implement
-               throw new NotImplementedException();
+
+       // Reserve space for output in the operation memory
+       public static void reserveOutputMem() {
+               if (!OptimizerUtils.isUMMEnabled() || 
!CacheableData.isCachingActive())
+                       return;
+
+               // Worst case upper bound for output = 70% - size(inputs)
+               // FIXME: Parfor splits this 70% into smaller limits
+               long maxOutputSize = _opMemLimit - (_pinnedVirtualMemSize + 
_pinnedPhysicalMemSize);
+               // Evict cached entries to make space in operation memory if 
needed
+               makeSpace(maxOutputSize);
        }
        
-       /**
-        * Pins a virtual cache block into operation memory, by making a size 
reservation.
-        * The provided size is an upper bound of the actual object size, and 
can be
-        * updated on unpin (once the actual cache block is provided).
-        * 
-        * @param key    unique identifier and local FS filename for eviction
-        * @param size   memory reservation in operation area
-        * @param dirty  indicator if block is dirty (subject to buffer pool 
management)
-        */
-       public void pin(String key, long size, boolean dirty) {
-               //TODO implement
-               throw new NotImplementedException();
+       // Unpins (releases) a cache block from operation memory
+       public static void unpin(CacheableData<?> cd) {
+               if (!CacheableData.isCachingActive())
+                       return;
+
+               // TODO: Track preserved output memory to protect from 
concurrent threads
+               if (!_pinnedEntries.contains(cd.getCacheFilePathAndName()))
+                       return; //unpinned. output of an instruction
+
+               // We still use the estimated size even though we have the 
blobs available.
+               // This makes sure we are subtracting exactly what we added 
during pinning.
+               long estimatedSize = 
OptimizerUtils.estimateSize(cd.getDataCharacteristics());
+               if (probe(cd))
+                       _pinnedVirtualMemSize -= estimatedSize;
+               else
+                       _pinnedPhysicalMemSize -= estimatedSize;
+
+               _pinnedEntries.remove(cd.getCacheFilePathAndName());
        }
-       
-       /**
-        * Unpins (releases) a cache block from operation memory. Dirty objects
-        * are logically moved back to the buffer pool area.
-        * 
-        * @param key    unique identifier and local FS filename for eviction
-        */
-       public void unpin(String key) {
-               //TODO implement
-               throw new NotImplementedException();
+
+       //---------------- UMM MAINTENANCE & LOOKUP -------------------//
+
+       // Initialize the unified memory manager
+       public static void init() {
+               _mQueue = new CacheEvictionQueue();
+               _fClean = new CacheMaintenanceService();
+               _limit = OptimizerUtils.getBufferPoolLimit();
+               _opMemLimit = (long)(OptimizerUtils.getLocalMemBudget()); //70% 
of heap
+               _totCachedSize = 0;
+               _pinnedPhysicalMemSize = 0;
+               _pinnedVirtualMemSize = 0;
+               if( CacheableData.CACHING_BUFFER_PAGECACHE )
+                       PageCache.init();
        }
-       
-       /**
-        * Unpins (releases) a cache block from operation memory. If the size of
-        * the provided cache block differs from the UMM meta data, the UMM meta
-        * data is updated. Use cases include update-in-place operations and
-        * size reservations via worst-case upper bound estimates.
-        * 
-        * @param key    unique identifier and local FS filename for eviction
-        * @param block  cache block which may be under UMM control, if null 
ignored
-        */
-       public void unpin(String key, CacheBlock block) {
-               //TODO implement
-               throw new NotImplementedException();
+
+       // Cleanup the unified memory manager
+       public static void cleanup() {
+               if( _mQueue != null )
+                       _mQueue.clear();
+               if( _fClean != null )
+                       _fClean.close();
+               if( CacheableData.CACHING_BUFFER_PAGECACHE )
+                       PageCache.clear();
+               _totCachedSize = 0;
+               _pinnedPhysicalMemSize = 0;
+               _pinnedVirtualMemSize = 0;
        }
-       
-       /**
-        * Removes a cache block associated with the given key from all memory
-        * areas, and deletes evicted representations (files in local FS). The
-        * local file system deletes can happen asynchronously.
-        * 
-        * @param key    unique identifier and local FS filename for eviction
-        */
-       public void delete(String key) {
-               //TODO implement
-               throw new NotImplementedException();
+
+       public static void setUMMLimit(long val) {
+               _limit = val;
        }
-       
+
+       public static long getUMMSize() {
+               synchronized(_mQueue) {
+                       return _limit;
+               }
+       }
+
+       // Get the available memory in UMM
+       public static long getUMMFree() {
+               synchronized(_mQueue) {
+                       return _limit - (_totCachedSize + 
_pinnedPhysicalMemSize);
+               }
+       }
+
+       // Reads a cached object. This is called from cacheabledata 
implementations
+       public static CacheBlock readBlock(String fname, boolean matrix)
+               throws IOException
+       {
+               CacheBlock cb = null;
+               ByteBuffer ldata = null;
+
+               //probe write buffer
+               synchronized (_mQueue)
+               {
+                       ldata = _mQueue.get(fname);
+
+                       //modify eviction order (accordingly to access)
+                       if (CacheableData.CACHING_BUFFER_POLICY == 
LazyWriteBuffer.RPolicy.LRU
+                               && ldata != null)
+                       {
+                               //reinsert entry at end of eviction queue
+                               _mQueue.remove (fname);
+                               _mQueue.addLast (fname, ldata);
+                       }
+               }
+
+               //deserialize or read from FS if required
+               if( ldata != null )
+               {
+                       cb = ldata.deserializeBlock();
+                       if (DMLScript.STATISTICS)
+                               CacheStatistics.incrementFSBuffHits();
+               }
+               else
+               {
+                       cb = LocalFileUtils.readCacheBlockFromLocal(fname, 
matrix);
+                       if (DMLScript.STATISTICS)
+                               CacheStatistics.incrementFSHits();
+               }
+
+               return cb;
+       }
+
+       public static boolean probe(CacheableData<?> cd) {
+               String filePath = cd.getCacheFilePathAndName();
+               return _mQueue.containsKey(filePath);
+       }
+
+       // Make required space. Evict if needed.
+       public static int makeSpace(long reqSpace) {
+               int numEvicted = 0;
+               // Check if sufficient space is already available
+               if (getUMMFree() > reqSpace)
+                       return numEvicted;
+
+               // Evict cached objects to make space
+               try {
+                       synchronized(_mQueue) {
+                               // Evict blobs to make room (by default FIFO)
+                               while (getUMMFree() < reqSpace && 
!_mQueue.isEmpty()) {
+                                       //remove first unpinned entry from 
eviction queue
+                                       var entry = 
_mQueue.removeFirstUnpinned(_pinnedEntries);
+                                       String ftmp = entry.getKey();
+                                       ByteBuffer bb = entry.getValue();
+
+                                       if(bb != null) {
+                                               // Wait for pending 
serialization
+                                               bb.checkSerialized();
+                                               // Evict object
+                                               bb.evictBuffer(ftmp);
+                                               bb.freeMemory();
+                                               _totCachedSize -= bb.getSize();
+                                               numEvicted++;
+                                       }
+                               }
+                       }
+               }
+               catch (Exception e) {
+                       throw new DMLRuntimeException("Eviction request of size 
"+(reqSpace-getUMMFree())+ " in the UMM failed.", e);
+               }
+
+               if( DMLScript.STATISTICS )
+                       CacheStatistics.incrementFSWrites(numEvicted);
+
+               return numEvicted;
+       }
+
+       // Write an object to the cache
+       public static int writeBlock(String fname, CacheBlock cb)
+               throws IOException
+       {
+               //obtain basic metadata of the cache block
+               long lSize = getCacheBlockSize(cb);
+               boolean requiresWrite = (lSize > _limit        //global buffer 
limit
+                       || !ByteBuffer.isValidCapacity(lSize, cb)); //local 
buffer limit
+               int numEvicted = 0;
+
+               // Handle caching/eviction if it fits in UMM
+               if( !requiresWrite )
+               {
+                       // Create byte buffer handle (no block allocation yet)
+                       ByteBuffer bbuff = new ByteBuffer( lSize );
+
+                       // Modify buffer pool
+                       synchronized( _mQueue )
+                       {
+                               // Evict blocks to make room if required
+                               numEvicted += makeSpace(lSize);
+                               // Put placeholder into buffer pool (reserve 
mem)
+                               _mQueue.addLast(fname, bbuff);
+                               _totCachedSize += lSize;
+                       }
+
+                       // Serialize matrix (outside synchronized critical path)
+                       _fClean.serializeData(bbuff, cb);
+
+                       if( DMLScript.STATISTICS )
+                               CacheStatistics.incrementBPoolWrites();
+               }
+               else
+               {
+                       // Write directly to local FS (bypass buffer if too 
large)
+                       LocalFileUtils.writeCacheBlockToLocal(fname, cb);
+                       if( DMLScript.STATISTICS ) {
+                               CacheStatistics.incrementFSWrites();
+                       }
+                       numEvicted++;
+               }
+
+               return numEvicted;
+       }
+
+       public static long getCacheBlockSize(CacheBlock cb) {
+               return cb.isShallowSerialize() ?
+                       cb.getInMemorySize() : cb.getExactSerializedSize();
+       }
+
+       public static void deleteBlock(String fname)
+       {
+               boolean requiresDelete = true;
+
+               synchronized( _mQueue )
+               {
+                       //remove queue entry
+                       ByteBuffer ldata = _mQueue.remove(fname);
+                       if( ldata != null ) {
+                               _totCachedSize -= ldata.getSize();
+                               requiresDelete = false;
+                               ldata.freeMemory(); //cleanup
+                       }
+               }
+
+               //delete from FS if required
+               if( requiresDelete )
+                       _fClean.deleteFile(fname);
+       }
+
        /**
         * Removes all cache blocks from all memory areas and deletes all 
evicted
         * representations (files in local FS). All internally thread pools 
must be
-        * shut down in a gracefully manner (e.g., wait for pending deletes).
+        * shut down in a graceful manner (e.g., wait for pending deletes).
         */
        public void deleteAll() {
                //TODO implement
                throw new NotImplementedException();
        }
+
+       /**
+        * Evicts all buffer pool entries.
+        * NOTE: use only for debugging or testing.
+        *
+        * @throws IOException if IOException occurs
+        */
+       public static void forceEviction()
+               throws IOException
+       {
+               //evict all matrices and frames
+               while( !_mQueue.isEmpty() )
+               {
+                       //remove first entry from eviction queue
+                       Map.Entry<String, ByteBuffer> entry = 
_mQueue.removeFirst();
+                       ByteBuffer tmp = entry.getValue();
+
+                       if( tmp != null ) {
+                               //wait for pending serialization
+                               tmp.checkSerialized();
+
+                               //evict matrix
+                               tmp.evictBuffer(entry.getKey());
+                               tmp.freeMemory();
+                       }
+               }
+       }
 }
diff --git 
a/src/main/java/org/apache/sysds/runtime/meta/TensorCharacteristics.java 
b/src/main/java/org/apache/sysds/runtime/meta/TensorCharacteristics.java
index 2b554a261f..20e11d955f 100644
--- a/src/main/java/org/apache/sysds/runtime/meta/TensorCharacteristics.java
+++ b/src/main/java/org/apache/sysds/runtime/meta/TensorCharacteristics.java
@@ -151,6 +151,16 @@ public class TensorCharacteristics extends 
DataCharacteristics
                return _nnz;
        }
 
+       @Override
+       public long getRows() {
+               return getDim(0);
+       }
+
+       @Override
+       public long getCols() {
+               return getDim(1);
+       }
+
        @Override
        public String toString() {
                return "["+Arrays.toString(_dims)+", nnz="+_nnz + ", blocksize= 
"+_blocksize+"]";
diff --git 
a/src/test/java/org/apache/sysds/test/component/frame/FrameEvictionTest.java 
b/src/test/java/org/apache/sysds/test/component/frame/FrameEvictionTest.java
index d8c3d452e8..6c966e0f98 100644
--- a/src/test/java/org/apache/sysds/test/component/frame/FrameEvictionTest.java
+++ b/src/test/java/org/apache/sysds/test/component/frame/FrameEvictionTest.java
@@ -21,6 +21,8 @@ package org.apache.sysds.test.component.frame;
 
 import java.lang.reflect.Method;
 
+import org.apache.sysds.hops.OptimizerUtils;
+import org.apache.sysds.runtime.controlprogram.caching.UnifiedMemoryManager;
 import org.junit.Test;
 import org.apache.sysds.common.Types.FileFormat;
 import org.apache.sysds.common.Types.ValueType;
@@ -208,8 +210,12 @@ public class FrameEvictionTest extends AutomatedTestBase
                        fo.release();
                        
                        //evict frame and clear in-memory reference
-                       if( force )
-                               LazyWriteBuffer.forceEviction();
+                       if( force ) {
+                               if (OptimizerUtils.isUMMEnabled())
+                                       UnifiedMemoryManager.forceEviction();
+                               else
+                                       LazyWriteBuffer.forceEviction();
+                       }
                        Method clearfo = CacheableData.class
                                        .getDeclaredMethod("clearCache", new 
Class[]{});
                        clearfo.setAccessible(true); //make method public
diff --git a/src/test/java/org/apache/sysds/test/functions/caching/UMMTest.java 
b/src/test/java/org/apache/sysds/test/functions/caching/UMMTest.java
new file mode 100644
index 0000000000..0bb61a83dd
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/functions/caching/UMMTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.sysds.test.functions.caching;
+
+       import java.util.ArrayList;
+       import java.util.HashMap;
+       import java.util.List;
+
+       import org.apache.sysds.hops.OptimizerUtils;
+       import org.apache.sysds.hops.recompile.Recompiler;
+       import org.apache.sysds.runtime.controlprogram.caching.CacheStatistics;
+       import 
org.apache.sysds.runtime.controlprogram.caching.UnifiedMemoryManager;
+       import 
org.apache.sysds.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
+       import org.apache.sysds.runtime.matrix.data.MatrixValue;
+       import org.apache.sysds.test.AutomatedTestBase;
+       import org.apache.sysds.test.TestConfiguration;
+       import org.apache.sysds.test.TestUtils;
+       import org.junit.Assert;
+       import org.junit.Test;
+
+public class UMMTest extends AutomatedTestBase {
+
+       protected static final String TEST_DIR = "functions/caching/";
+       protected static final String TEST_NAME1 = "UMMTest1";
+
+       protected String TEST_CLASS_DIR = TEST_DIR + 
org.apache.sysds.test.functions.caching.UMMTest.class.getSimpleName() + "/";
+
+       @Override
+       public void setUp() {
+               TestUtils.clearAssertionInformation();
+               addTestConfiguration(TEST_NAME1, new 
TestConfiguration(TEST_CLASS_DIR, TEST_NAME1));
+       }
+
+       @Test
+       public void testEvictionOrder() {
+               runTest(TEST_NAME1);
+       }
+
+       public void runTest(String testname)
+       {
+               try {
+                       getAndLoadTestConfiguration(testname);
+                       fullDMLScriptName = getScript();
+                       long oldBufferPool = (long)(0.15 * 
InfrastructureAnalyzer.getLocalMaxMemory());
+
+                       // Static memory management
+                       List<String> proArgs = new ArrayList<>();
+                       proArgs.add("-stats");
+                       proArgs.add("-args");
+                       proArgs.add(String.valueOf(oldBufferPool));
+                       proArgs.add(output("R"));
+                       programArgs = proArgs.toArray(new 
String[proArgs.size()]);
+                       runTest(true, EXCEPTION_NOT_EXPECTED, null, -1);
+                       //HashMap<MatrixValue.CellIndex, Double> R_static = 
readDMLMatrixFromOutputDir("R");
+                       HashMap<MatrixValue.CellIndex, Double> R_static = 
readDMLScalarFromOutputDir("R");
+                       long FSwrites_static = CacheStatistics.getFSWrites();
+
+                       // Unified memory management (cache size = 85% of heap)
+                       OptimizerUtils.enableUMM();
+                       proArgs.clear();
+                       proArgs.add("-stats");
+                       proArgs.add("-args");
+                       proArgs.add(String.valueOf(oldBufferPool));
+                       proArgs.add(output("R"));
+                       programArgs = proArgs.toArray(new 
String[proArgs.size()]);
+                       runTest(true, EXCEPTION_NOT_EXPECTED, null, -1);
+                       UnifiedMemoryManager.cleanup();
+                       HashMap<MatrixValue.CellIndex, Double> R_unified= 
readDMLScalarFromOutputDir("R");
+                       long FSwrites_unified = CacheStatistics.getFSWrites();
+
+                       // Compare results
+                       TestUtils.compareMatrices(R_static, R_unified, 1e-6, 
"static", "unified");
+                       // Compare FS write counts (#unified FS writes always 
smaller than #static FS writes)
+                       Assert.assertTrue("Violated buffer pool eviction 
counts: "+FSwrites_unified+" <= "+FSwrites_static,
+                               FSwrites_unified <= FSwrites_static);
+               }
+               finally {
+                       Recompiler.reinitRecompiler();
+               }
+       }
+}
+
diff --git a/src/test/scripts/functions/caching/UMMTest1.dml 
b/src/test/scripts/functions/caching/UMMTest1.dml
new file mode 100644
index 0000000000..914f598930
--- /dev/null
+++ b/src/test/scripts/functions/caching/UMMTest1.dml
@@ -0,0 +1,35 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+ummsize = $1;
+nrows = ceil(sqrt(ummsize*0.40/8));
+nrows = 4858;
+X = rand(rows=nrows, cols=nrows, seed=42);
+Y = rand(rows=nrows, cols=nrows, seed=43);
+
+R1 = X + Y;
+R2 = X * Y;
+
+R3 = R2 - R1;
+while(FALSE) {}
+R = sum(R3);
+write(R, $2, format="text");
+

Reply via email to