This is an automated email from the ASF dual-hosted git repository.

arnabp20 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new 945fd9e7c6 [SYSTEMDS-3567] Cost-based eviction from GPU lineage cache
945fd9e7c6 is described below

commit 945fd9e7c643de6d1f01cedb413ab79446507d54
Author: Arnab Phani <[email protected]>
AuthorDate: Sat Oct 7 21:23:42 2023 +0200

    [SYSTEMDS-3567] Cost-based eviction from GPU lineage cache
    
    This patch extends the unified GPU memory manager with a
    cost-based eviction policy, which defines the order of
    intermediates to be recycled in each free list. The score
    has two components: timestamp and DAG height. Size is
    irrelevant for GPU as all the entries in a free list
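    have the same size. Cache hits raise an entry's score by
    refreshing its last-access timestamp, and entries that are
    cached again after eviction get their initial score scaled
    by their recorded miss count.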
    
    Closes #1920
---
 .../java/org/apache/sysds/hops/OptimizerUtils.java |   2 +-
 .../sysds/lops/rewrite/RewriteAddPrefetchLop.java  |  17 +-
 .../instructions/cp/ComputationCPInstruction.java  |  10 ++
 .../instructions/gpu/context/GPUContextPool.java   |   3 -
 .../instructions/gpu/context/GPUMemoryManager.java |   6 +-
 .../apache/sysds/runtime/lineage/LineageCache.java |  26 +--
 .../sysds/runtime/lineage/LineageCacheConfig.java  |  19 +-
 .../sysds/runtime/lineage/LineageCacheEntry.java   |  32 +++-
 .../runtime/lineage/LineageCacheStatistics.java    |   9 +
 .../runtime/lineage/LineageGPUCacheEviction.java   | 196 +++++++++------------
 .../sysds/runtime/lineage/LineageItemUtils.java    |  10 +-
 .../java/org/apache/sysds/utils/Statistics.java    |   2 +-
 .../lineage/GPULineageCacheEvictionTest.java       |   3 +
 .../functions/lineage/GPUCacheEviction1.dml        |  14 +-
 .../functions/lineage/GPUCacheEviction3.dml        |  71 ++++----
 15 files changed, 230 insertions(+), 190 deletions(-)

diff --git a/src/main/java/org/apache/sysds/hops/OptimizerUtils.java 
b/src/main/java/org/apache/sysds/hops/OptimizerUtils.java
index 8da0ff110d..a4abc513c6 100644
--- a/src/main/java/org/apache/sysds/hops/OptimizerUtils.java
+++ b/src/main/java/org/apache/sysds/hops/OptimizerUtils.java
@@ -308,7 +308,7 @@ public class OptimizerUtils
        /**
         * Rule-based operator placement policy for GPU.
         */
-       public static boolean RULE_BASED_GPU_EXEC = true;
+       public static boolean RULE_BASED_GPU_EXEC = false;
 
        //////////////////////
        // Optimizer levels //
diff --git 
a/src/main/java/org/apache/sysds/lops/rewrite/RewriteAddPrefetchLop.java 
b/src/main/java/org/apache/sysds/lops/rewrite/RewriteAddPrefetchLop.java
index 4567e88d52..09458cd51a 100644
--- a/src/main/java/org/apache/sysds/lops/rewrite/RewriteAddPrefetchLop.java
+++ b/src/main/java/org/apache/sysds/lops/rewrite/RewriteAddPrefetchLop.java
@@ -59,12 +59,12 @@ public class RewriteAddPrefetchLop extends LopRewriteRule
                        return List.of(sb);
 
                ArrayList<Lop> nodesWithPrefetch = new ArrayList<>();
-               //Find the Spark nodes with all CP outputs
+               //Find the Spark/GPU nodes with all CP outputs
                for (Lop l : lops) {
                        nodesWithPrefetch.add(l);
                        if (isPrefetchNeeded(l)) {
                                List<Lop> oldOuts = new 
ArrayList<>(l.getOutputs());
-                               //Construct a Prefetch lop that takes this 
Spark node as a input
+                               //Construct a Prefetch lop that takes this 
Spark/GPU node as an input
                                UnaryCP prefetch = new UnaryCP(l, 
Types.OpOp1.PREFETCH, l.getDataType(), l.getValueType(), Types.ExecType.CP);
                                prefetch.setAsynchronous(true);
                                //Reset asynchronous flag for the input if 
already set (e.g. mapmm -> prefetch)
@@ -126,7 +126,18 @@ public class RewriteAddPrefetchLop extends LopRewriteRule
 
        private boolean isPrefetchFromGPUNeeded(Lop lop) {
                // Prefetch a GPU intermediate if all the outputs are CP.
-               return lop.getDataType() == Types.DataType.MATRIX
+               boolean gpuOP =  lop.getDataType() == Types.DataType.MATRIX
                        && lop.isExecGPU() && lop.isAllOutputsCP();
+
+               // Exclude List consumers. List is just a metadata handle.
+               boolean anyOutputList = lop.getOutputs().stream()
+                       .anyMatch(out -> out.getDataType() == 
Types.DataType.LIST);
+
+               //FIXME: Rewire _inputParams when needed (e.g. Replace)
+               boolean hasParameterizedOut = lop.getOutputs().stream()
+                       .anyMatch(out -> ((out instanceof ParameterizedBuiltin)
+                               || (out instanceof GroupedAggregate)
+                               || (out instanceof GroupedAggregateM)));
+               return gpuOP && !hasParameterizedOut && !anyOutputList;
        }
 }
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/cp/ComputationCPInstruction.java
 
b/src/main/java/org/apache/sysds/runtime/instructions/cp/ComputationCPInstruction.java
index 76c1617229..de45036991 100644
--- 
a/src/main/java/org/apache/sysds/runtime/instructions/cp/ComputationCPInstruction.java
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/cp/ComputationCPInstruction.java
@@ -67,6 +67,16 @@ public abstract class ComputationCPInstruction extends 
CPInstruction implements
                return new CPOperand[]{input1, input2, input3};
        }
 
+       public boolean hasFrameInput() {
+               if (input1 != null && input1.isFrame())
+                       return true;
+               if (input2 != null && input2.isFrame())
+                       return true;
+               if (input3 != null && input3.isFrame())
+                       return true;
+               return false;
+       }
+
        protected boolean checkGuardedRepresentationChange( MatrixBlock in1, 
MatrixBlock out ) {
                return checkGuardedRepresentationChange(in1, null, out);
        }
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/gpu/context/GPUContextPool.java
 
b/src/main/java/org/apache/sysds/runtime/instructions/gpu/context/GPUContextPool.java
index 9437e71566..aa9bb8563a 100644
--- 
a/src/main/java/org/apache/sysds/runtime/instructions/gpu/context/GPUContextPool.java
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/gpu/context/GPUContextPool.java
@@ -153,9 +153,6 @@ public class GPUContextPool {
                //LOG.debug("Active CUDA device number : " + device[0]);
                //LOG.debug("Max Blocks/Threads/SharedMem on active device: " + 
maxBlocks + "/" + maxThreadsPerBlock + "/" + sharedMemPerBlock);
                GPUStatistics.cudaInitTime = System.nanoTime() - start;
-
-               // Initialize the maximum size of the lineage cache in the GPU 
(30% of initial GPU memory)
-               LineageGPUCacheEviction.setGPULineageCacheLimit();
        }
 
        /**
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/gpu/context/GPUMemoryManager.java
 
b/src/main/java/org/apache/sysds/runtime/instructions/gpu/context/GPUMemoryManager.java
index 4999d8c4d6..26efd68162 100644
--- 
a/src/main/java/org/apache/sysds/runtime/instructions/gpu/context/GPUMemoryManager.java
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/gpu/context/GPUMemoryManager.java
@@ -258,7 +258,7 @@ public class GPUMemoryManager {
 
                // Step 1: First try reusing exact match in rmvarGPUPointers to 
avoid holes in the GPU memory
                Pointer A = lazyCudaFreeMemoryManager.getRmvarPointer(opcode, 
size);
-               
+
                Pointer tmpA = (A == null) ? new Pointer() : null;
                // Step 2: Allocate a new pointer in the GPU memory (since 
memory is available)
                // Step 3 has potential to create holes as well as limit future 
reuse, hence perform this step before step 3.
@@ -297,7 +297,7 @@ public class GPUMemoryManager {
                        if (le != null) {
                                if(!LineageCacheConfig.GPU2HOSTEVICTION) {
                                        A = le.getGPUPointer(); //recycle
-                                       
LineageGPUCacheEviction.removeFromDeviceCache(le, le.getGPUPointer(), true);
+                                       
//LineageGPUCacheEviction.removeFromDeviceCache(le, le.getGPUPointer(), true);
                                        if (DMLScript.STATISTICS)
                                                
LineageCacheStatistics.incrementGpuRecycle();
                                }
@@ -327,7 +327,7 @@ public class GPUMemoryManager {
                                if(le != null) {
                                        freedSize += 
getSizeAllocatedGPUPointer(le.getGPUPointer());
                                        
if(!LineageCacheConfig.GPU2HOSTEVICTION) {
-                                               
LineageGPUCacheEviction.removeFromDeviceCache(le, le.getGPUPointer(), true);
+                                               
//LineageGPUCacheEviction.removeFromDeviceCache(le, le.getGPUPointer(), true);
                                                
guardedCudaFree(le.getGPUPointer()); //free
                                                if (DMLScript.STATISTICS)
                                                        
LineageCacheStatistics.incrementGpuDel();
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java 
b/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
index 7aacfb5151..e9529213f2 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
@@ -78,7 +78,6 @@ public class LineageCache
        static {
                
LineageCacheEviction.setCacheLimit(LineageCacheConfig.CPU_CACHE_FRAC); //5%
                LineageCacheEviction.setStartTimestamp();
-               LineageGPUCacheEviction.setStartTimestamp();
                // Note: GPU cache initialization is done in 
GPUContextPool:initializeGPU()
        }
        
@@ -204,6 +203,8 @@ public class LineageCache
                                                
ec.getMatrixObject(outName).updateDataCharacteristics(e.getDataCharacteristics());
                                                //Increment the live count for 
this pointer
                                                
LineageGPUCacheEviction.incrementLiveCount(gpuPtr);
+                                               //Maintain the eviction list in 
the free list
+                                               
LineageGPUCacheEviction.maintainOrder(e);
                                        }
                                        //Replace the live lineage trace with 
the cached one (if not parfor, dedup)
                                        ec.replaceLineageItem(outName, e._key);
@@ -325,6 +326,8 @@ public class LineageCache
                                                case GPUCACHED:
                                                        //Increment the live 
count for this pointer
                                                        
LineageGPUCacheEviction.incrementLiveCount(e.getGPUPointer());
+                                                       //Maintain the eviction 
list in the free list
+                                                       
LineageGPUCacheEviction.maintainOrder(e);
                                                        if 
(DMLScript.STATISTICS) LineageCacheStatistics.incrementGpuHits();
                                                        break;
                                                default:
@@ -762,17 +765,19 @@ public class LineageCache
                                removePlaceholder(instLI);
                                return;
                        }
+                       if(DMLScript.STATISTICS && 
LineageCacheEviction._removelist.containsKey(centry._key))
+                               LineageCacheStatistics.incrementDelHitsGpu();
                        switch(centry.getCacheStatus()) {
                                case EMPTY:  //first hit
                                        // Set the GPUOject in the cache. Will 
be garbage collected
-                                       
centry.setGPUValue(gpuObj.getDensePointer(), gpuObj.getAllocatedSize(),
-                                               
gpuObj.getMatrixObject().getMetaData(), computetime);
-                                       
centry.setCacheStatus(LineageCacheStatus.TOCACHEGPU);
-                                       break;
+                                       if 
(!LineageCacheEviction._removelist.containsKey(centry._key)) {
+                                               // Cache right away if removed 
before
+                                               
centry.setGPUValue(gpuObj.getDensePointer(), gpuObj.getAllocatedSize(),
+                                                       
gpuObj.getMatrixObject().getMetaData(), computetime);
+                                               
centry.setCacheStatus(LineageCacheStatus.TOCACHEGPU);
+                                               break;
+                                       }
                                case TOCACHEGPU:  //second hit
-                                       // Update the total size of lineage 
cached gpu objects
-                                       // The eviction is handled by the 
unified gpu memory manager
-                                       
LineageGPUCacheEviction.updateSize(gpuObj.getAllocatedSize(), true);
                                        // Set the GPUOject in the cache and 
update the status
                                        
centry.setGPUValue(gpuObj.getDensePointer(), gpuObj.getAllocatedSize(),
                                                
gpuObj.getMatrixObject().getMetaData(), computetime);
@@ -1099,7 +1104,7 @@ public class LineageCache
                        // Maintain order for eviction
                        if (e.isRDDPersist())
                                LineageSparkCacheEviction.maintainOrder(e);
-                       else
+                       else if (!e.isGPUObject())
                                LineageCacheEviction.getEntry(e);
                        return e;
                }
@@ -1115,7 +1120,8 @@ public class LineageCache
                // Entries with RDDs are cached twice. First hit is GCed,
                // Second hit saves the child RDDs
                if (LineageCache.probe(probeItem)) {
-                       LineageCacheEntry oe = getIntern(probeItem);
+                       //LineageCacheEntry oe = getIntern(probeItem);
+                       LineageCacheEntry oe = _cache.get(probeItem);
                        LineageCacheEntry e = _cache.get(item);
                        boolean exists = !e.isNullVal();
                        e.copyValueFrom(oe, computetime);
diff --git 
a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java 
b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
index d5265f33e8..67eeed9481 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
@@ -99,7 +99,6 @@ public class LineageCacheConfig
        }
 
        protected static final double CPU_CACHE_FRAC = 0.05; // 5% of JVM heap 
size
-       protected static final double GPU_CACHE_MAX = 0.30; // 30% of gpu memory
        private static ReuseCacheType _cacheType = null;
        private static CachedItemHead _itemH = null;
        private static CachedItemTail _itemT = null;
@@ -144,8 +143,6 @@ public class LineageCacheConfig
        // Weights for scoring components (computeTime/size, LRU timestamp, DAG 
height)
        protected static double[] WEIGHTS = {1, 0, 0};
        public static boolean GPU2HOSTEVICTION = false;
-       public static boolean CONCURRENTGPUEVICTION = false;
-       public static volatile boolean STOPBACKGROUNDEVICTION = false;
 
        protected enum LineageCacheStatus {
                EMPTY,     //Placeholder with no data. Cannot be evicted.
@@ -209,6 +206,14 @@ public class LineageCacheConfig
                return ret;
        };
 
+       protected static Comparator<LineageCacheEntry> 
LineageGPUCacheComparator = (e1, e2) -> {
+               if (e1._key.getId() == e2._key.getId())
+                       return 0;
+               if (e1.score == e2.score)
+                       return Long.compare(e1._key.getId(), e2._key.getId());
+               else
+                       return e1.score < e2.score ? -1 : 1;
+       };
 
        //-------------SPARK OPERATION RELATED CONFIGURATIONS--------------//
 
@@ -362,14 +367,6 @@ public class LineageCacheConfig
                        && _cacheType.isMultilevelReuse();
        }
 
-       public static CachedItemHead getCachedItemHead() {
-               return _itemH;
-       }
-
-       public static CachedItemTail getCachedItemTail() {
-               return _itemT;
-       }
-       
        public static boolean getCompAssRW() {
                return _compilerAssistedRW;
        }
diff --git 
a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEntry.java 
b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEntry.java
index a603111031..e019f9810d 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEntry.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEntry.java
@@ -342,6 +342,30 @@ public class LineageCacheEntry {
                int computeGroup = 
LineageCacheConfig.getComputeGroup(_key.getOpcode());
                int refCount = Math.max(_rddObject.getMaxReferenceCount(), 1);
                score = w1*(((double)computeGroup*refCount)/estimatedSize) + 
w2*getTimestamp() + w3*(((double)1)/getDagHeight());
+               // Update score to emulate computeTime scaling by #misses
+               if (removeList.containsKey(_key) && 
LineageCacheConfig.isCostNsize()) {
+                       int missCount = 1 + removeList.get(_key);
+                       score = score + (w1*(((double) computeGroup * refCount) 
/ estimatedSize) * missCount);
+               }
+       }
+
+       protected synchronized void initiateScoreGPU(Map<LineageItem, Integer> 
removeList) {
+               // Set timestamp
+               _timestamp =  System.currentTimeMillis() - 
LineageCacheEviction.getStartTimestamp();
+               if (_timestamp < 0)
+                       throw new DMLRuntimeException ("Execution timestamp 
shouldn't be -ve. Key: "+_key);
+               // Weights for scoring components in GPU
+               double w1 = 0;
+               double w2 = 1;
+               double w3 = 1;
+               // Generate initial score
+               score = w2*getTimestamp() + w3*(((double)1)/getDagHeight());
+               // TODO: timestamp >> DAg_height. Normalize timestamp and DAG 
height.
+               // Update score to emulate computeTime scaling by #misses
+               if (removeList.containsKey(_key)) {
+                       int missCount = 1 + removeList.get(_key);
+                       score = score + ((w2*getTimestamp() + 
w3*(((double)1)/getDagHeight())) * missCount);
+               }
        }
        
        protected synchronized void updateScore(boolean add) {
@@ -349,12 +373,14 @@ public class LineageCacheEntry {
                double w1 = LineageCacheConfig.WEIGHTS[0];
                long size = getSize();
                int sign = add ? 1: -1;
-                if(isLocalObject())
+                if (isLocalObject())
                         score = score + sign * w1 * (((double) _computeTime) / 
size);
-                if(isRDDPersist() && size != 0) {  //size == 0 means not 
persisted yet
+                if (isRDDPersist() && size != 0) {  //size == 0 means not 
persisted yet
                         int computeGroup = 
LineageCacheConfig.getComputeGroup(_key.getOpcode());
                         score = score + sign * w1 * (((double) computeGroup) / 
size);
                 }
+                if (isGPUObject())
+                        score = score + sign * (getTimestamp() + 
((double)1)/getDagHeight());
        }
        
        protected synchronized long getTimestamp() {
@@ -383,6 +409,8 @@ public class LineageCacheEntry {
                        int refCount = 
Math.max(_rddObject.getMaxReferenceCount(), 1);
                        score = w1*(((double)computeGroup*refCount)/size) + 
w2*getTimestamp() + w3*(((double)1)/getDagHeight());
                }
+               if (isGPUObject())
+                       score = getTimestamp() + (((double)1)/getDagHeight());
        }
 
        static class GPUPointer {
diff --git 
a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheStatistics.java 
b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheStatistics.java
index be7460f6fd..ffc7e5eeff 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheStatistics.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheStatistics.java
@@ -48,6 +48,7 @@ public class LineageCacheStatistics {
        private static final LongAdder _numSyncEvictGpu = new LongAdder();
        private static final LongAdder _numRecycleGpu   = new LongAdder();
        private static final LongAdder _numDelGpu       = new LongAdder();
+       private static final LongAdder _numHitsDelGpu   = new LongAdder();
        private static final LongAdder _evtimeGpu       = new LongAdder();
        // Below entries are specific to Spark instructions
        private static final LongAdder _numHitsRdd      = new LongAdder();
@@ -78,6 +79,7 @@ public class LineageCacheStatistics {
                _numSyncEvictGpu.reset();
                _numRecycleGpu.reset();
                _numDelGpu.reset();
+               _numHitsDelGpu.reset();
                _numHitsRdd.reset();
                _numHitsSparkActions.reset();
                _numHitsRddPersist.reset();
@@ -230,6 +232,11 @@ public class LineageCacheStatistics {
                _numDelGpu.increment();
        }
 
+       public static void incrementDelHitsGpu() {
+               // Number of hits on pointers that are deleted/recycled before
+               _numHitsDelGpu.increment();
+       }
+
        public static void incrementEvictTimeGpu(long delta) {
                // Total time spent on evicting from GPU to main memory or 
deleting from GPU lineage cache
                _evtimeGpu.add(delta);
@@ -327,6 +334,8 @@ public class LineageCacheStatistics {
                sb.append(_numRecycleGpu.longValue());
                sb.append("/");
                sb.append(_numDelGpu.longValue());
+               sb.append("/");
+               sb.append(_numHitsDelGpu.longValue());
                return sb.toString();
        }
 
diff --git 
a/src/main/java/org/apache/sysds/runtime/lineage/LineageGPUCacheEviction.java 
b/src/main/java/org/apache/sysds/runtime/lineage/LineageGPUCacheEviction.java
index 60d5e00a9e..8d2072adc5 100644
--- 
a/src/main/java/org/apache/sysds/runtime/lineage/LineageGPUCacheEviction.java
+++ 
b/src/main/java/org/apache/sysds/runtime/lineage/LineageGPUCacheEviction.java
@@ -32,7 +32,6 @@ import java.util.stream.Collectors;
 import jcuda.Pointer;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.instructions.gpu.context.GPUContext;
-import org.apache.sysds.runtime.instructions.gpu.context.GPUContextPool;
 import org.apache.sysds.runtime.matrix.data.LibMatrixCUDA;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.meta.DataCharacteristics;
@@ -41,110 +40,97 @@ import static 
org.apache.sysds.runtime.instructions.gpu.context.GPUObject.toIntE
 
 public class LineageGPUCacheEviction 
 {
-       private static long _currentCacheSize = 0;
-       private static long GPU_CACHE_LIMIT; //limit in bytes
+       // GPU context for this GPU
        private static GPUContext _gpuContext = null;
-       private static long _startTimestamp = 0;
+       // Thread for asynchronous copy
        public static ExecutorService gpuEvictionThread = null;
-
        // Weighted queue of freed pointers.
        private static HashMap<Long, TreeSet<LineageCacheEntry>> freeQueues = 
new HashMap<>();
-
        // Pointers and live counts associated
        private static HashMap<Pointer, Integer> livePointers = new HashMap<>();
-
        // All cached pointers mapped to the corresponding lineage cache entries
        private static HashMap<Pointer, LineageCacheEntry> GPUCacheEntries = 
new HashMap<>();
 
        protected static void resetEviction() {
-               _currentCacheSize = 0;
                gpuEvictionThread = null;
-               //LineageCacheConfig.CONCURRENTGPUEVICTION = false;
                freeQueues.clear();
                livePointers.clear();
                GPUCacheEntries.clear();
        }
 
-       public static void setGPUContext(GPUContext gpuCtx) {
-               _gpuContext = gpuCtx;
-       }
-
-       protected static GPUContext getGPUContext() {
-               return _gpuContext;
-       }
-
-       protected static long getPointerSize(Pointer ptr) {
-               return 
_gpuContext.getMemoryManager().getSizeAllocatedGPUPointer(ptr);
-       }
+       //--------------- CACHE & POINTER QUEUE MAINTENANCE --------------//
 
+       // During reuse, move the reused pointer from free lists to the live 
list
        protected static void incrementLiveCount(Pointer ptr) {
                // Move from free list (if exists) to live list
                if(livePointers.merge(ptr, 1, Integer::sum) == 1)
                        
freeQueues.get(getPointerSize(ptr)).remove(GPUCacheEntries.get(ptr));
        }
 
+       // rmVar moves the live pointer to a free list
        public static void decrementLiveCount(Pointer ptr) {
                // Decrement and move to the free list if the live count 
becomes 0
                if(livePointers.compute(ptr, (k, v) -> v==1 ? null : v-1) == 
null) {
                        long size = getPointerSize(ptr);
                        if (!freeQueues.containsKey(size))
-                               freeQueues.put(size, new 
TreeSet<>(LineageCacheConfig.LineageCacheComparator));
-                               //FIXME: Multiple entries can point to same 
pointer due to multi-level reuse
+                               freeQueues.put(size, new 
TreeSet<>(LineageCacheConfig.LineageGPUCacheComparator));
                        freeQueues.get(size).add(GPUCacheEntries.get(ptr));
                }
        }
 
+       // Check if this pointer is live
        public static boolean probeLiveCachedPointers(Pointer ptr) {
                return livePointers.containsKey(ptr);
        }
 
-       //---------------- COSTING RELATED METHODS -----------------
-
-       /**
-        * Set the max constraint for the lineage cache in GPU
-        */
-       public static void setGPULineageCacheLimit() {
-               long available = GPUContextPool.initialGPUMemBudget();
-               GPU_CACHE_LIMIT = (long) (available * 
LineageCacheConfig.GPU_CACHE_MAX);
-       }
-       protected static void setStartTimestamp() {
-               _startTimestamp = System.currentTimeMillis();
-       }
-
-       protected static long getStartTimestamp() {
-               return _startTimestamp;
-       }
-       
-       private static void adjustD2HTransferSpeed(double sizeByte, double 
copyTime) {
-               double sizeMB = sizeByte / (1024*1024);
-               double newTSpeed = sizeMB / copyTime;  //bandwidth (MB/sec) + 
java overhead
-
-               if (newTSpeed > LineageCacheConfig.D2HMAXBANDWIDTH)
-                       return;  //filter out errorneous measurements (~ 
>8GB/sec)
-               // Perform exponential smoothing.
-               double smFactor = 0.5;  //smoothing factor
-               LineageCacheConfig.D2HCOPYBANDWIDTH = (smFactor * newTSpeed) + 
((1-smFactor) * LineageCacheConfig.D2HCOPYBANDWIDTH);
-               //System.out.println("size_t: "+sizeMB+ " speed_t: "+newTSpeed 
+ " estimate_t+1: "+LineageCacheConfig.D2HCOPYBANDWIDTH);
-       }
-
-       //--------------- CACHE MAINTENANCE & LOOKUP FUNCTIONS --------------//
-
        protected static void addEntry(LineageCacheEntry entry) {
                if (entry.isNullVal())
-                       // Placeholders shouldn't participate in eviction 
cycles.
-                       return;
+                       return;  // Placeholders shouldn't participate in 
eviction cycles.
                if (entry.isScalarValue())
                        throw new DMLRuntimeException ("Scalars are never 
stored in GPU. Lineage: "+ entry._key);
 
-               // TODO: Separate removelist, starttimestamp, score and weights 
from CPU cache
-               entry.computeScore(LineageCacheEviction._removelist);
+               entry.initiateScoreGPU(LineageCacheEviction._removelist);
                // The pointer must be live at this moment
                livePointers.put(entry.getGPUPointer(), 1);
                GPUCacheEntries.put(entry.getGPUPointer(), entry);
        }
-       
-       public static boolean isGPUCacheEmpty() {
-               return (freeQueues.isEmpty() && livePointers.isEmpty());
+
+       // MaintainOrder is called on a reuse, which means the pointer is
+       // then moved to the livePointers list and removed from the
+       // free queue. Later, the pointer will be moved to the free
+       // queue in a new position due to the updated score.
+       protected static void maintainOrder (LineageCacheEntry entry) {
+               if (entry.getCacheStatus() != 
LineageCacheConfig.LineageCacheStatus.GPUCACHED)
+                       return;
+               // Reset the timestamp to maintain the LRU component of the 
scoring function
+               entry.updateTimestamp();
+               // Scale score of the sought entry after every cache hit
+               entry.updateScore(true);
+       }
+
+       protected static void removeSingleEntry(Map<LineageItem, 
LineageCacheEntry> cache, LineageCacheEntry e) {
+               cache.remove(e._key);
+               // Maintain miss count to increase the score if the item enters 
the cache again
+               LineageCacheEviction._removelist.merge(e._key, 1, Integer::sum);
+       }
+
+       private static void removeEntry(LineageCacheEntry e) {
+               Map<LineageItem, LineageCacheEntry> cache = 
LineageCache.getLineageCache();
+               if (e._origItem == null) {
+                       // Single entry. Remove.
+                       removeSingleEntry(cache, e);
+                       return;
+               }
+               // Remove all entries pointing to this pointer
+               LineageCacheEntry tmp = cache.get(e._origItem);
+               while (tmp != null) {
+                       removeSingleEntry(cache, tmp);
+                       tmp = tmp._nextEntry;
+               }
+       }
+
+       public static void setGPUContext(GPUContext gpuCtx) {
+               _gpuContext = gpuCtx;
        }
 
        public static boolean isGPUCacheFreeQEmpty() {
@@ -158,8 +144,15 @@ public class LineageGPUCacheEviction
                        freeQueues.remove(size); //remove if empty
 
                // Poll the first pointer from the queue
-               if (freeList != null && !freeList.isEmpty())
-                       return freeList.pollFirst();
+               LineageCacheEntry e = null;
+               if (freeList != null && !freeList.isEmpty()) {
+                       e = freeList.pollFirst();
+                       if (probeLiveCachedPointers(e.getGPUPointer()))
+                               throw new DMLRuntimeException("Recycling live 
pointer: "+e._key);
+                       removeEntry(e);
+                       GPUCacheEntries.remove(e.getGPUPointer());
+                       return e;
+               }
                return null;
        }
 
@@ -180,43 +173,21 @@ public class LineageGPUCacheEviction
                return null;
        }
 
-       public static LineageCacheEntry peekFirstFreeEntry(long size) {
-               return freeQueues.get(size).first();
-       }
-       
-       public static void removeFreeEntry(LineageCacheEntry e) {
-               long size = getPointerSize(e.getGPUPointer());
-               freeQueues.get(size).remove(e);
-       }
-
-       //---------------- CACHE SPACE MANAGEMENT METHODS -----------------//
-
-       protected static void updateSize(long space, boolean addspace) {
-               if (addspace)
-                       _currentCacheSize += space;
-               else
-                       _currentCacheSize -= space;
-       }
-
-       protected static boolean isBelowMaxThreshold(long spaceNeeded) {
-               return ((spaceNeeded + _currentCacheSize) <= GPU_CACHE_LIMIT);
-       }
-       
-       protected static long getGPUCacheLimit() {
-               return GPU_CACHE_LIMIT;
-       }
+       //---------------- SPACE MANAGEMENT, DEBUG PRINT & D2H COPY 
-----------------//
 
        public static int numPointersCached() {
-               return livePointers.size() + 
freeQueues.values().stream().mapToInt(TreeSet::size).sum();
+               return 
freeQueues.values().stream().mapToInt(TreeSet::size).sum();
        }
 
        public static long totalMemoryCached() {
-               long totLive = livePointers.keySet().stream()
-                       .mapToLong(ptr -> 
_gpuContext.getMemoryManager().getSizeAllocatedGPUPointer(ptr)).sum();
                long totFree = 0;
                for (Map.Entry<Long, TreeSet<LineageCacheEntry>> entry : 
freeQueues.entrySet())
                        totFree += entry.getKey() * entry.getValue().size();
-               return totLive + totFree;
+               return totFree;
+       }
+
+       protected static long getPointerSize(Pointer ptr) {
+               return 
_gpuContext.getMemoryManager().getSizeAllocatedGPUPointer(ptr);
        }
 
        public static Set<Pointer> getAllCachedPointers() {
@@ -224,7 +195,6 @@ public class LineageGPUCacheEviction
                for (Map.Entry<Long, TreeSet<LineageCacheEntry>> entry : 
freeQueues.entrySet())
                        cachedPointers.addAll(entry.getValue().stream()
                                
.map(LineageCacheEntry::getGPUPointer).collect(Collectors.toSet()));
-               cachedPointers.addAll(livePointers.keySet());
                return cachedPointers;
        }
 
@@ -255,38 +225,38 @@ public class LineageGPUCacheEviction
                return ptr;
        }
 
+       private static void adjustD2HTransferSpeed(double sizeByte, double 
copyTime) {
+               double sizeMB = sizeByte / (1024*1024);
+               double newTSpeed = sizeMB / copyTime;  //bandwidth (MB/sec) + 
java overhead
+
+               if (newTSpeed > LineageCacheConfig.D2HMAXBANDWIDTH)
+                       return;  //filter out erroneous measurements (~ 
>8GB/sec)
+               // Perform exponential smoothing.
+               double smFactor = 0.5;  //smoothing factor
+               LineageCacheConfig.D2HCOPYBANDWIDTH = (smFactor * newTSpeed) + 
((1-smFactor) * LineageCacheConfig.D2HCOPYBANDWIDTH);
+               //System.out.println("size_t: "+sizeMB+ " speed_t: "+newTSpeed 
+ " estimate_t+1: "+LineageCacheConfig.D2HCOPYBANDWIDTH);
+       }
+
        private static MatrixBlock pointerToMatrixBlock(LineageCacheEntry le) {
                MatrixBlock ret = null;
                DataCharacteristics dc = le.getDataCharacteristics();
-               if (le.isDensePointer()) {
-                       ret = new MatrixBlock(toIntExact(dc.getRows()), 
toIntExact(dc.getCols()), false);
-                       ret.allocateDenseBlock();
-                       // copy to the host
-                       
LibMatrixCUDA.cudaSupportFunctions.deviceToHost(getGPUContext(),
-                               le.getGPUPointer(), ret.getDenseBlockValues(), 
null, true);
-                       ret.recomputeNonZeros();
-               } /*else {
-                       int rows = toIntExact(dc.getRows());
-                       int cols = toIntExact(dc.getCols());
-                       int nnz = toIntExact(le.getGPUPointer().nnz);
-                       double[] values = new double[nnz];
-                       
LibMatrixCUDA.cudaSupportFunctions.deviceToHost(getGPUContext(), 
le.getGPUPointer().val, values, null, true);
-                       int[] rowPtr = new int[rows + 1];
-                       int[] colInd = new int[nnz];
-                       CSRPointer.copyPtrToHost(le.getGPUPointer(), rows, nnz, 
rowPtr, colInd);
-                       SparseBlockCSR sparseBlock = new SparseBlockCSR(rowPtr, 
colInd, values, nnz);
-                       ret = new MatrixBlock(rows, cols, nnz, sparseBlock);
-               }*/
+               if (!le.isDensePointer())
+                       throw new DMLRuntimeException ("Sparse pointers should 
not be cached in GPU. Lineage: "+ le._key);
+               ret = new MatrixBlock(toIntExact(dc.getRows()), 
toIntExact(dc.getCols()), false);
+               ret.allocateDenseBlock();
+               // copy to the host
+               LibMatrixCUDA.cudaSupportFunctions.deviceToHost(_gpuContext,
+                       le.getGPUPointer(), ret.getDenseBlockValues(), null, 
true);
+               ret.recomputeNonZeros();
                //mat.acquireModify(tmp);
                //mat.release();
                return ret;
        }
 
+       // Hard removal from GPU cache
        public static void removeFromDeviceCache(LineageCacheEntry entry, 
Pointer ptr, boolean removeFromCache) {
-               long size = 
_gpuContext.getMemoryManager().getSizeAllocatedGPUPointer(ptr);
                if (removeFromCache)
                        LineageCache.removeEntry(entry._key);
-               updateSize(size, false);
                GPUCacheEntries.remove(ptr);
        }
 
diff --git 
a/src/main/java/org/apache/sysds/runtime/lineage/LineageItemUtils.java 
b/src/main/java/org/apache/sysds/runtime/lineage/LineageItemUtils.java
index 12a87ff2b4..b3c296ac7e 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageItemUtils.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageItemUtils.java
@@ -24,6 +24,7 @@ import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.sysds.runtime.instructions.cp.ComputationCPInstruction;
 import org.apache.sysds.runtime.instructions.spark.ComputationSPInstruction;
 import org.apache.sysds.runtime.instructions.spark.RandSPInstruction;
 import org.apache.sysds.runtime.io.IOUtilFunctions;
@@ -519,11 +520,12 @@ public class LineageItemUtils {
        }
 
        // A statement block benefits from reuse if is large enough (>10 
instructions) or has
-       // Spark instructions. Caching small SBs lead to long chains of 
LineageCacheEntries,
-       // which in turn leads to reduced evictable entries.
+       // Spark instructions or has input frames. Caching small SBs leads to 
long chains of
+       // LineageCacheEntries, which in turn leads to reduced evictable entries.
        public static boolean hasValidInsts(ArrayList<Instruction> insts) {
                int count = 0;
                boolean hasSPInst = false;
+               boolean hasFrameInput = false;
                for (Instruction ins : insts) {
                        if (ins instanceof VariableCPInstruction)
                                continue;
@@ -531,8 +533,10 @@ public class LineageItemUtils {
                        if ((ins instanceof ComputationSPInstruction && 
!ins.getOpcode().equals("chkpoint"))
                                || ins.getOpcode().equals("prefetch"))
                                hasSPInst = true;
+                       if (ins instanceof ComputationCPInstruction && 
((ComputationCPInstruction) ins).hasFrameInput())
+                               hasFrameInput = true;
                }
-               return count >= 10 || hasSPInst;
+               return count >= 10 || hasSPInst || hasFrameInput;
        }
        
        public static void addAllDataLineage(ExecutionContext ec) {
diff --git a/src/main/java/org/apache/sysds/utils/Statistics.java 
b/src/main/java/org/apache/sysds/utils/Statistics.java
index 01c9682d90..2fbb4edb8a 100644
--- a/src/main/java/org/apache/sysds/utils/Statistics.java
+++ b/src/main/java/org/apache/sysds/utils/Statistics.java
@@ -640,7 +640,7 @@ public class Statistics
                                sb.append("LinCache MultiLevel (Ins/SB/Fn):" + 
LineageCacheStatistics.displayMultiLevelHits() + ".\n");
                                if (LineageCacheStatistics.ifGpuStats()) {
                                        sb.append("LinCache GPU (Hit/PF): \t" + 
LineageCacheStatistics.displayGpuStats() + ".\n");
-                                       sb.append("LinCache GPU (Recyc/Del): 
\t" + LineageCacheStatistics.displayGpuPointerStats() + ".\n");
+                                       sb.append("LinCache GPU 
(Recyc/Del/Miss): \t" + LineageCacheStatistics.displayGpuPointerStats() + 
".\n");
                                        sb.append("LinCache GPU evict time: \t" 
+ LineageCacheStatistics.displayGpuEvictTime() + " sec.\n");
                                }
                                if (LineageCacheStatistics.ifSparkStats()) {
diff --git 
a/src/test/java/org/apache/sysds/test/functions/lineage/GPULineageCacheEvictionTest.java
 
b/src/test/java/org/apache/sysds/test/functions/lineage/GPULineageCacheEvictionTest.java
index a40eee0316..e0a6a726e3 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/lineage/GPULineageCacheEvictionTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/lineage/GPULineageCacheEvictionTest.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 
+import org.apache.sysds.hops.OptimizerUtils;
 import org.apache.sysds.runtime.lineage.Lineage;
 import org.apache.sysds.runtime.lineage.LineageCacheConfig;
 import org.apache.sysds.runtime.matrix.data.MatrixValue;
@@ -87,6 +88,7 @@ public class GPULineageCacheEvictionTest extends 
AutomatedTestBase{
                Lineage.resetInternalState();
                boolean gpu2Mem = LineageCacheConfig.GPU2HOSTEVICTION;
                //LineageCacheConfig.GPU2HOSTEVICTION = true;
+               OptimizerUtils.ASYNC_PREFETCH = true;
                //run the test
                runTest(true, EXCEPTION_NOT_EXPECTED, null, -1);
                HashMap<MatrixValue.CellIndex, Double> R_orig = 
readDMLMatrixFromOutputDir("R");
@@ -104,6 +106,7 @@ public class GPULineageCacheEvictionTest extends 
AutomatedTestBase{
                runTest(true, EXCEPTION_NOT_EXPECTED, null, -1);
                AutomatedTestBase.TEST_GPU = false;
                LineageCacheConfig.GPU2HOSTEVICTION = gpu2Mem;
+               OptimizerUtils.ASYNC_PREFETCH = false;
                HashMap<MatrixValue.CellIndex, Double> R_reused = 
readDMLMatrixFromOutputDir("R");
 
                //compare results 
diff --git a/src/test/scripts/functions/lineage/GPUCacheEviction1.dml 
b/src/test/scripts/functions/lineage/GPUCacheEviction1.dml
index 330ef6742c..18a9ead4bc 100644
--- a/src/test/scripts/functions/lineage/GPUCacheEviction1.dml
+++ b/src/test/scripts/functions/lineage/GPUCacheEviction1.dml
@@ -23,16 +23,18 @@ X = rand(rows=10000, cols=1000, seed=42);
 y = rand(rows=10000, cols=1000, seed=43);
 S = matrix(0, rows=1, cols=3);
 
+for (j in 1:2) {
 X1 = X;
 y1 = y;
 S1 = 0;
 # fill half of the cache
-for (i in 1:20) {
-  R = X1 * y1;
-  X1 = cbind(X1, rand(rows=10000, cols=1, seed=42));
-  y1 = cbind(y1, rand(rows=10000, cols=1, seed=42));
-  while(FALSE){}
-  S1 = S1 + sum(R);
+  for (i in 1:20) {
+    R = X1 * y1;
+    X1 = cbind(X1, rand(rows=10000, cols=1, seed=42));
+    y1 = cbind(y1, rand(rows=10000, cols=1, seed=42));
+    while(FALSE){}
+    S1 = S1 + sum(R);
+  }
 }
 S[,1] = S1;
 
diff --git a/src/test/scripts/functions/lineage/GPUCacheEviction3.dml 
b/src/test/scripts/functions/lineage/GPUCacheEviction3.dml
index 818b8b2c0a..dd617928ea 100644
--- a/src/test/scripts/functions/lineage/GPUCacheEviction3.dml
+++ b/src/test/scripts/functions/lineage/GPUCacheEviction3.dml
@@ -18,43 +18,46 @@
 # under the License.
 #
 #-------------------------------------------------------------
-D = rand(rows=25600, cols=784, min=0, max=20, seed=42)
-bs = 128;
-ep = 10;
-iter_ep = ceil(nrow(D)/bs);
-maxiter = ep * iter_ep;
-beg = 1;
-iter = 0;
-i = 1;
 
-while (iter < maxiter) {
-  end = beg + bs - 1;
-  if (end>nrow(D))
-    end = nrow(D);
-  X = D[beg:end,]
+miniBatch = function(Matrix[Double] D) return (Matrix[Double] R) { 
+  bs = 2048;
+  ep = 10;
+  iter_ep = ceil(nrow(D)/bs);
+  maxiter = ep * iter_ep;
+  beg = 1;
+  iter = 0;
+  i = 1;
+
+  while (iter < maxiter) {
+    end = beg + bs - 1;
+    if (end>nrow(D))
+      end = nrow(D);
+    X = D[beg:end,]
 
-  # reusable OP across epochs
-  X = scale(X, TRUE, TRUE);
-  # pollute cache with not reusable OPs
-  X = ((X + X) * i - X) / (i+1)
-  X = ((X + X) * i - X) / (i+1)
-  X = ((X + X) * i - X) / (i+1)
-  X = ((X + X) * i - X) / (i+1)
-  X = ((X + X) * i - X) / (i+1)
-  X = ((X + X) * i - X) / (i+1)
-  X = ((X + X) * i - X) / (i+1)
-  X = ((X + X) * i - X) / (i+1)
-  X = ((X + X) * i - X) / (i+1)
-  X = ((X + X) * i - X) / (i+1)
+    # reusable OP across epochs
+    X = scale(X, TRUE, TRUE);
+    # pollute cache with non-reusable OPs
+    X = ((X + X) * i - X) / (i+1)
+    X = ((X + X) * i - X) / (i+1)
+    X = ((X + X) * i - X) / (i+1)
+    X = ((X + X) * i - X) / (i+1)
+    X = ((X + X) * i - X) / (i+1)
 
-  iter = iter + 1;
-  if (end == nrow(D))
-    beg = 1;
-  else
-    beg = end + 1;
-  i = i + 1;
+    iter = iter + 1;
+    if (end == nrow(D))
+      beg = 1;
+    else
+      beg = end + 1;
+    i = i + 1;
 
+  }
+  R = X;
+}
+
+D = rand(rows=25600, cols=784, min=0, max=20, seed=42)
+for (i in 1:3) {
+  R = miniBatch(D);
+  print(sum(R));
 }
-print(sum(X));
-write(X, $1, format="text");
+write(R, $1, format="text");
 


Reply via email to