This is an automated email from the ASF dual-hosted git repository.

arnabp20 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new 078f43eba0 [SYSTEMDS-3518] Eviction of lineage-cached RDDs from Spark 
storage
078f43eba0 is described below

commit 078f43eba00f938f1c677db79ba611a7e6d30ada
Author: Arnab Phani <[email protected]>
AuthorDate: Sat May 27 23:55:00 2023 +0200

    [SYSTEMDS-3518] Eviction of lineage-cached RDDs from Spark storage
    
    This patch extends the lineage cache eviction policies to support RDDs
    persisted at the executors.
    - We checkpoint an RDD on the second cache hit (reduces cache pollution).
    - While checkpointing, we rely on the worst case size estimations and
      later update the eviction data structures with actual size once
      the RDDs are persisted.
    - We split the Spark operators into two groups, one for expensive,
      shuffle-based operations, and another for map-based operations.
      For the scoring function, we assume the first set is 2x more expensive.
    - We also track the reference counts of RDDs and use that in the scoring.
      More references (many consumers) indicate higher importance.
    - We reduce the score by one hit count if we collect a persisted RDD. This
      is to evict the intermediates which are cached at multiple locations.
    
      Closes #1834
---
 .../context/SparkExecutionContext.java             |  32 ++++-
 .../spark/ComputationSPInstruction.java            |  22 ---
 .../instructions/spark/data/LineageObject.java     |   9 ++
 .../runtime/instructions/spark/data/RDDObject.java |   4 +
 .../apache/sysds/runtime/lineage/LineageCache.java | 114 ++++++++++++---
 .../sysds/runtime/lineage/LineageCacheConfig.java  |  39 +++++-
 .../sysds/runtime/lineage/LineageCacheEntry.java   |  53 ++++++-
 .../runtime/lineage/LineageCacheEviction.java      |   2 +-
 .../runtime/lineage/LineageCacheStatistics.java    |  35 ++++-
 .../runtime/lineage/LineageSparkCacheEviction.java | 155 +++++++++++++++++++++
 .../sysds/runtime/matrix/data/MatrixBlock.java     |   4 +
 .../java/org/apache/sysds/utils/Statistics.java    |  13 +-
 .../functions/async/LineageReuseSparkTest.java     |   2 -
 13 files changed, 422 insertions(+), 62 deletions(-)

diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java
index 1d4681ec75..ce7b43972d 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java
@@ -1695,9 +1695,12 @@ public class SparkExecutionContext extends 
ExecutionContext
                return jsc.sc().getPersistentRDDs().contains(rddID);
        }
 
-       public boolean isRDDCached( int rddID ) {
+       public static boolean isRDDCached( int rddID ) {
+               if (!isSparkContextCreated())
+                       return false;
+
+               JavaSparkContext jsc = _spctx;
                //check that rdd is marked for caching
-               JavaSparkContext jsc = getSparkContext();
                if( !jsc.sc().getPersistentRDDs().contains(rddID) ) {
                        return false;
                }
@@ -1710,6 +1713,31 @@ public class SparkExecutionContext extends 
ExecutionContext
                return false;
        }
 
+       public static long getMemCachedRDDSize(int rddID) {
+               if (!isSparkContextCreated())
+                       return 0;
+
+               JavaSparkContext jsc = _spctx;
+               //check that rdd is marked for caching
+               if( !jsc.sc().getPersistentRDDs().contains(rddID) )
+                       return 0;
+
+               for (RDDInfo info : jsc.sc().getRDDStorageInfo()) {
+                       if (info.id() == rddID && info.isCached())
+                               return info.memSize(); //total size summing all 
executors
+               }
+               return 0;
+       }
+
+       public static long getStorageSpaceUsed() {
+               //return the sum of the sizes of the cached RDDs in all 
executors
+               if (!isSparkContextCreated())
+                       return 0;
+
+               JavaSparkContext jsc = _spctx;
+               return 
Arrays.stream(jsc.sc().getRDDStorageInfo()).mapToLong(RDDInfo::memSize).sum();
+       }
+
        ///////////////////////////////////////////
        // Spark configuration handling
        ///////
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/spark/ComputationSPInstruction.java
 
b/src/main/java/org/apache/sysds/runtime/instructions/spark/ComputationSPInstruction.java
index f55a0b398e..92f21d86c2 100644
--- 
a/src/main/java/org/apache/sysds/runtime/instructions/spark/ComputationSPInstruction.java
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/spark/ComputationSPInstruction.java
@@ -20,10 +20,7 @@
 package org.apache.sysds.runtime.instructions.spark;
 
 import org.apache.commons.lang3.tuple.Pair;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.storage.StorageLevel;
 import org.apache.sysds.runtime.DMLRuntimeException;
-import org.apache.sysds.runtime.controlprogram.caching.CacheableData;
 import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
 import org.apache.sysds.runtime.functionobjects.IndexFunction;
@@ -31,13 +28,9 @@ import org.apache.sysds.runtime.functionobjects.ReduceAll;
 import org.apache.sysds.runtime.functionobjects.ReduceCol;
 import org.apache.sysds.runtime.functionobjects.ReduceRow;
 import org.apache.sysds.runtime.instructions.cp.CPOperand;
-import org.apache.sysds.runtime.instructions.spark.data.RDDObject;
-import org.apache.sysds.runtime.instructions.spark.utils.SparkUtils;
 import org.apache.sysds.runtime.lineage.LineageItem;
 import org.apache.sysds.runtime.lineage.LineageItemUtils;
 import org.apache.sysds.runtime.lineage.LineageTraceable;
-import org.apache.sysds.runtime.matrix.data.MatrixBlock;
-import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysds.runtime.matrix.operators.Operator;
 import org.apache.sysds.runtime.meta.DataCharacteristics;
 
@@ -134,21 +127,6 @@ public abstract class ComputationSPInstruction extends 
SPInstruction implements
                }
        }
 
-       @SuppressWarnings("unchecked")
-       public void checkpointRDD(ExecutionContext ec) {
-               SparkExecutionContext sec = (SparkExecutionContext)ec;
-               CacheableData<?> cd = sec.getCacheableData(output.getName());
-               RDDObject inro =  cd.getRDDHandle();
-               JavaPairRDD<?,?> outrdd = 
SparkUtils.copyBinaryBlockMatrix((JavaPairRDD<MatrixIndexes, 
MatrixBlock>)inro.getRDD(), false);
-               //TODO: remove shallow copying as short-circuit collect is 
disabled if locally cached
-               outrdd = outrdd.persist((StorageLevel.MEMORY_AND_DISK()));
-               RDDObject outro = new RDDObject(outrdd); //create new rdd object
-               outro.setCheckpointRDD(true);            //mark as checkpointed
-               outro.addLineageChild(inro);             //keep lineage to 
prevent cycles on cleanup
-               cd.setRDDHandle(outro);
-               sec.setVariable(output.getName(), cd);
-       }
-       
        @Override
        public Pair<String, LineageItem> getLineageItem(ExecutionContext ec) {
                return Pair.of(output.getName(), new LineageItem(getOpcode(),
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/spark/data/LineageObject.java
 
b/src/main/java/org/apache/sysds/runtime/instructions/spark/data/LineageObject.java
index f4b99bb03e..2f9d1bb0bd 100644
--- 
a/src/main/java/org/apache/sysds/runtime/instructions/spark/data/LineageObject.java
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/spark/data/LineageObject.java
@@ -28,6 +28,7 @@ public abstract class LineageObject
 {
        //basic lineage information
        protected int _numRef = -1;
+       protected int _maxNumRef = -1;
        protected boolean _lineageCached = false;
        protected final List<LineageObject> _childs;
        
@@ -62,11 +63,19 @@ public abstract class LineageObject
        
        public void incrementNumReferences() {
                _numRef++;
+
+               // Maintain the maximum reference count. Higher reference
+               // count indicates higher importance to persist (in lineage 
cache)
+               _maxNumRef = Math.max(_numRef, _maxNumRef);
        }
        
        public void decrementNumReferences() {
                _numRef--;
        }
+
+       public int getMaxReferenceCount() {
+               return _maxNumRef;
+       }
        
        public List<LineageObject> getLineageChilds() {
                return _childs;
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/spark/data/RDDObject.java 
b/src/main/java/org/apache/sysds/runtime/instructions/spark/data/RDDObject.java
index 04d021b6ff..6ae7ed2061 100644
--- 
a/src/main/java/org/apache/sysds/runtime/instructions/spark/data/RDDObject.java
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/spark/data/RDDObject.java
@@ -40,6 +40,10 @@ public class RDDObject extends LineageObject
        public JavaPairRDD<?,?> getRDD() {
                return _rddHandle;
        }
+
+       public void setRDD(JavaPairRDD<?,?> rddHandle) {
+               _rddHandle = rddHandle;
+       }
        
        public void setCheckpointRDD( boolean flag ) {
                _checkpointed = flag;
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java 
b/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
index f007e719b9..a51c0ae9e3 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
@@ -22,6 +22,8 @@ package org.apache.sysds.runtime.lineage;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.lang3.tuple.MutablePair;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.storage.StorageLevel;
 import org.apache.sysds.api.DMLScript;
 import org.apache.sysds.common.Types.DataType;
 import org.apache.sysds.common.Types.FileFormat;
@@ -146,9 +148,29 @@ public class LineageCache
                                                        
ec.setScalarOutput(outName, so);
                                        }
                                        else if (e.isRDDPersist()) {
-                                               //Reuse the cached RDD (local 
or persisted at the executors)
                                                RDDObject rdd = 
e.getRDDObject();
-                                               ((SparkExecutionContext) 
ec).setRDDHandleForVariable(outName, rdd);
+                                               if (rdd == null && 
e.getCacheStatus() == LineageCacheStatus.NOTCACHED)
+                                                       return false;  //the 
executing thread removed this entry from cache
+
+                                               //Reuse the cached RDD (local 
or persisted at the executors)
+                                               switch(e.getCacheStatus()) {
+                                                       case TOPERSISTRDD:
+                                                               //Mark for 
caching on the second hit
+                                                               
persistRDD(inst, e, ec);
+                                                               //Update status 
to indicate persisted in the executors
+                                                               
e.setCacheStatus(LineageCacheStatus.PERSISTEDRDD);
+                                                               //Even not 
persisted, reuse the rdd locally for shuffle operations
+                                                               if 
(!LineageCacheConfig.isShuffleOp(inst))
+                                                                       return 
false;
+                                                               
((SparkExecutionContext) ec).setRDDHandleForVariable(outName, rdd);
+                                                               break;
+                                                       case PERSISTEDRDD:
+                                                               //Reuse the 
persisted intermediate at the executors
+                                                               
((SparkExecutionContext) ec).setRDDHandleForVariable(outName, rdd);
+                                                               break;
+                                                       default:
+                                                               return false;
+                                               }
                                        }
                                        else { //TODO handle locks on gpu 
objects
                                                //Create a GPUObject with the 
cached pointer
@@ -424,6 +446,15 @@ public class LineageCache
                        LineageCacheStatistics.incrementDelHits();
                return p;
        }
+
+       private static boolean probeRDDDistributed(LineageItem key) {
+               if (!_cache.containsKey(key))
+                       return false;
+               LineageCacheEntry e = _cache.get(key);
+               if (!e.isRDDPersist())
+                       return false;
+               return 
SparkExecutionContext.isRDDCached(e.getRDDObject().getRDD().id());
+       }
        
        //This method is for hard removal of an entry, w/o maintaining eviction 
data structures
        public static void removeEntry(LineageItem key) {
@@ -431,6 +462,12 @@ public class LineageCache
                if (!p) return;
                synchronized(_cache) {
                        LineageCacheEntry e = getEntry(key);
+                       if (e.isRDDPersist()) {
+                               e.getRDDObject().getRDD().unpersist(false);
+                               e.getRDDObject().setCheckpointRDD(false);
+                               return;
+                       }
+
                        long size = e.getSize();
                        if (e._origItem == null)
                                _cache.remove(e._key);
@@ -443,7 +480,11 @@ public class LineageCache
                                        _cache.remove(tmp._key);
                                }
                        }
-                       LineageCacheEviction.updateSize(size, false);
+
+                       if (e.isRDDPersist())
+                               
LineageSparkCacheEviction.updateSize(e.getSize(), false);
+                       else
+                               LineageCacheEviction.updateSize(size, false);
                }
        }
        
@@ -632,6 +673,10 @@ public class LineageCache
                synchronized( _cache ) {
                        if (!probe(instLI))
                                return;
+                       LineageCacheEntry centry = _cache.get(instLI);
+                       // Put in the cache only the first time
+                       if (centry.getCacheStatus() != LineageCacheStatus.EMPTY)
+                               return;
                        // Avoid reuse chkpoint, which is unnecessary
                        if (inst.getOpcode().equalsIgnoreCase("chkpoint")) {
                                removePlaceholder(instLI);
@@ -652,21 +697,12 @@ public class LineageCache
                                return;
                        }
 
-                       // Call persist on the output RDD if required
-                       if (opToPersist)
-                               ((ComputationSPInstruction) 
inst).checkpointRDD(ec);
                        // Get the RDD handle of the RDD
                        CacheableData<?> cd = 
ec.getCacheableData(((ComputationSPInstruction)inst).output.getName());
                        RDDObject rddObj = cd.getRDDHandle();
-
-                       LineageCacheEntry centry = _cache.get(instLI);
-                       // Set the RDD object in the cache
-                       // TODO: Make space in the executors
-                       // TODO: Estimate the actual compute time for this 
operation
+                       // Set the RDD object in the cache and set the status 
to TOPERSISTRDD
                        rddObj.setLineageCached();
                        centry.setRDDValue(rddObj, computetime);
-                       // Maintain order for eviction
-                       LineageCacheEviction.addEntry(centry);
                }
        }
 
@@ -685,6 +721,13 @@ public class LineageCache
 
                synchronized( _cache )
                {
+                       // If prefetching a persisted rdd, reduce the score of 
the persisted rdd by one hit count
+                       if (instLI.getOpcode().equals("prefetch") && 
probeRDDDistributed(instLI.getInputs()[0])) {
+                               LineageCacheEntry e = 
_cache.get(instLI.getInputs()[0]);
+                               if (e.getRDDObject().getNumReferences() < 1) 
//no other rdd consumer
+                                       e.updateScore(false);
+                       }
+
                        long computetime = System.nanoTime() - starttime;
                        // Make space, place data and manage queue
                        putIntern(instLI, DataType.MATRIX, mb, null, 
computetime);
@@ -851,6 +894,7 @@ public class LineageCache
                        _cache.clear();
                        LineageCacheEviction.resetEviction();
                        LineageGPUCacheEviction.resetEviction();
+                       LineageSparkCacheEviction.resetEviction();
                }
        }
        
@@ -913,7 +957,10 @@ public class LineageCache
                                LineageCacheStatistics.incrementMemHits();
 
                        // Maintain order for eviction
-                       LineageCacheEviction.getEntry(e);
+                       if (e.isRDDPersist())
+                               LineageSparkCacheEviction.maintainOrder(e);
+                       else
+                               LineageCacheEviction.getEntry(e);
                        return e;
                }
                else
@@ -985,7 +1032,42 @@ public class LineageCache
                else
                        return true;
        }
-       
+
+       private static void persistRDD(Instruction inst, LineageCacheEntry 
centry, ExecutionContext ec) {
+               boolean opToPersist = 
LineageCacheConfig.isReusableRDDType(inst);
+               // Return if the operation is not in the list of instructions 
which benefit
+               // from persisting and the local only RDD caching is disabled
+               if (!opToPersist && 
!LineageCacheConfig.ENABLE_LOCAL_ONLY_RDD_CACHING)
+                       return;
+
+               if (opToPersist && centry.getCacheStatus() == 
LineageCacheStatus.TOPERSISTRDD) {
+                       CacheableData<?> cd = 
ec.getCacheableData(((ComputationSPInstruction)inst).output.getName());
+                       // Estimate worst case dense size
+                       long estimatedSize = 
MatrixBlock.estimateSizeInMemory(cd.getDataCharacteristics());
+                       // Skip if the entry is bigger than the total storage.
+                       if (estimatedSize > 
LineageSparkCacheEviction.getSparkStorageLimit())
+                               return;
+
+                       // Mark the rdd for lazy checkpointing
+                       RDDObject rddObj = centry.getRDDObject();
+                       JavaPairRDD<?,?> rdd = rddObj.getRDD();
+                       rdd = rdd.persist(StorageLevel.MEMORY_AND_DISK());
+                       rddObj.setRDD(rdd);
+                       rddObj.setCheckpointRDD(true);
+
+                       // Make space based on the estimated size
+                       
if(!LineageSparkCacheEviction.isBelowThreshold(estimatedSize))
+                               LineageSparkCacheEviction.makeSpace(_cache, 
estimatedSize);
+                       LineageSparkCacheEviction.updateSize(estimatedSize, 
true);
+                       // Maintain order for eviction
+                       LineageSparkCacheEviction.addEntry(centry, 
estimatedSize);
+
+                       // Count number of RDDs marked for caching at the 
executors
+                       if (DMLScript.STATISTICS)
+                               LineageCacheStatistics.incrementRDDPersists();
+               }
+       }
+
        @Deprecated
        @SuppressWarnings("unused")
        private static double getRecomputeEstimate(Instruction inst, 
ExecutionContext ec) {
@@ -1210,7 +1292,7 @@ public class LineageCache
                
LineageCacheStatistics.incrementSavedComputeTime(e._computeTime);
                if (e.isGPUObject()) LineageCacheStatistics.incrementGpuHits();
                if (e.isRDDPersist()) {
-                       if (((SparkExecutionContext) 
ec).isRDDCached(e.getRDDObject().getRDD().id()))
+                       if 
(SparkExecutionContext.isRDDCached(e.getRDDObject().getRDD().id()))
                                
LineageCacheStatistics.incrementRDDPersistHits(); //persisted in the executors
                        else
                                LineageCacheStatistics.incrementRDDHits();  
//only locally cached
diff --git 
a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java 
b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
index 0ce6cf3a8e..d0e32570b9 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
@@ -39,7 +39,9 @@ import 
org.apache.sysds.runtime.instructions.spark.ComputationSPInstruction;
 import org.apache.sysds.runtime.instructions.spark.CpmmSPInstruction;
 import org.apache.sysds.runtime.instructions.spark.MapmmSPInstruction;
 
+import java.util.Arrays;
 import java.util.Comparator;
+import java.util.stream.Stream;
 
 public class LineageCacheConfig 
 {
@@ -52,12 +54,23 @@ public class LineageCacheConfig
                "^", "uamax", "uark+", "uacmean", "eigen", "ctableexpand", 
"replace",
                "^2", "uack+", "tak+*", "uacsqk+", "uark+", "n+", "uarimax", 
"qsort", 
                "qpick", "transformapply", "uarmax", "n+", "-*", "castdtm", 
"lowertri",
-               "mapmm", "cpmm", "rmm", "prefetch", "chkpoint"
-               //TODO: Reuse everything. 
+               "prefetch", "mapmm"
+               //TODO: Reuse everything.
        };
 
-       private static final String[] PERSIST_OPCODES = new String[] {
-               "mapmm", "cpmm", "rmm"
+       // Relatively expensive instructions. Most include shuffles.
+       private static final String[] PERSIST_OPCODES1 = new String[] {
+               "cpmm", "rmm", "pmm", "rev", "rshape", "rsort", "+", "-", "*",
+               "/", "%%", "%/%", "1-*", "^", "^2", "*2", "==", "!=", "<", ">",
+               "<=", ">=", "&&", "||", "xor", "max", "min", "rmempty", 
"rappend",
+               "gappend", "galignedappend", "rbind", "cbind", "nmin", "nmax",
+               "n+", "ctable", "ucumack+", "ucumac*", "ucumacmin", "ucumacmax",
+               "qsort", "qpick"
+       };
+
+       // Relatively inexpensive instructions.
+       private static final String[] PERSIST_OPCODES2 = new String[] {
+               "mapmm"
        };
 
        private static String[] REUSE_OPCODES  = new String[] {};
@@ -139,6 +152,8 @@ public class LineageCacheConfig
                RELOADED,  //Reloaded from disk. Can be evicted.
                PINNED,    //Pinned to memory. Cannot be evicted.
                GPUCACHED, //Points to GPU intermediate
+               PERSISTEDRDD, //Persisted at the Spark executors
+               TOPERSISTRDD, //To be persisted if the instruction reoccur
                TOSPILL,   //To be spilled lazily 
                TODELETE;  //TO be removed lazily
                public boolean canEvict() {
@@ -199,7 +214,8 @@ public class LineageCacheConfig
        static {
                //setup static configuration parameters
                REUSE_OPCODES = OPCODES;
-               CHKPOINT_OPCODES = PERSIST_OPCODES;
+               CHKPOINT_OPCODES = 
Stream.concat(Arrays.stream(PERSIST_OPCODES1), Arrays.stream(PERSIST_OPCODES2))
+                       .toArray(String[]::new);
                //setSpill(true);
                setCachePolicy(LineageCachePolicy.COSTNSIZE);
                setCompAssRW(true);
@@ -223,16 +239,17 @@ public class LineageCacheConfig
                        || inst instanceof GPUInstruction
                        || inst instanceof ComputationSPInstruction)
                        && !(inst instanceof ListIndexingCPInstruction);
-               boolean rightop = (ArrayUtils.contains(REUSE_OPCODES, 
inst.getOpcode())
+               boolean rightCPOp = (ArrayUtils.contains(REUSE_OPCODES, 
inst.getOpcode())
                        || (inst.getOpcode().equals("append") && 
isVectorAppend(inst, ec))
                        || (inst.getOpcode().startsWith("spoof"))
                        || (inst instanceof DataGenCPInstruction) && 
((DataGenCPInstruction) inst).isMatrixCall());
+               boolean rightSPOp = isReusableRDDType(inst);
                boolean updateInplace = (inst instanceof 
MatrixIndexingCPInstruction)
                        && 
ec.getMatrixObject(((ComputationCPInstruction)inst).input1).getUpdateType().isInPlace();
                updateInplace = updateInplace || ((inst instanceof 
BinaryMatrixMatrixCPInstruction)
                        && ((BinaryMatrixMatrixCPInstruction) 
inst).isInPlace());
                boolean federatedOutput = false;
-               return insttype && rightop && !updateInplace && 
!federatedOutput;
+               return insttype && (rightCPOp || rightSPOp) && !updateInplace 
&& !federatedOutput;
        }
        
        private static boolean isVectorAppend(Instruction inst, 
ExecutionContext ec) {
@@ -282,6 +299,14 @@ public class LineageCacheConfig
                return insttype && rightOp;
        }
 
+       protected static boolean isShuffleOp(Instruction inst) {
+               return ArrayUtils.contains(PERSIST_OPCODES1, inst.getOpcode());
+       }
+
+       protected static int getComputeGroup(String opcode) {
+               return ArrayUtils.contains(PERSIST_OPCODES1, opcode) ? 2 : 1;
+       }
+
 
        public static boolean isOutputFederated(Instruction inst, Data data) {
                if (!(inst instanceof ComputationFEDInstruction))
diff --git 
a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEntry.java 
b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEntry.java
index f4d3e24982..81898e1f6d 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEntry.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEntry.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import jcuda.Pointer;
 import org.apache.sysds.common.Types.DataType;
 import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
 import org.apache.sysds.runtime.instructions.cp.ScalarObject;
 import org.apache.sysds.runtime.instructions.spark.data.RDDObject;
 import org.apache.sysds.runtime.lineage.LineageCacheConfig.LineageCacheStatus;
@@ -160,6 +161,9 @@ public class LineageCacheEntry {
                        size += _SOval.getSize();
                if (_gpuPointer != null)
                        size += _gpuPointer.getPointerSize();
+               if (_rddObject != null)
+                       //Return total cached size in the executors
+                       size += 
SparkExecutionContext.getMemCachedRDDSize(_rddObject.getRDD().id());
                return size;
        }
        
@@ -175,6 +179,10 @@ public class LineageCacheEntry {
                return _dt.isScalar() && _rddObject == null && _gpuPointer == 
null;
        }
 
+       public boolean isLocalObject() {
+               return isMatrixValue() || isScalarValue();
+       }
+
        public boolean isRDDPersist() {
                return _rddObject != null;
        }
@@ -226,7 +234,8 @@ public class LineageCacheEntry {
        public synchronized void setRDDValue(RDDObject rdd, long computetime) {
                _rddObject = rdd;
                _computeTime = computetime;
-               _status = isNullVal() ? LineageCacheStatus.EMPTY : 
LineageCacheStatus.CACHED;
+               //_status = isNullVal() ? LineageCacheStatus.EMPTY : 
LineageCacheStatus.CACHED;
+               _status = isNullVal() ? LineageCacheStatus.EMPTY : 
LineageCacheStatus.TOPERSISTRDD;
                //resume all threads waiting for val
                notifyAll();
        }
@@ -292,18 +301,41 @@ public class LineageCacheEntry {
 
                // Update score to emulate computeTime scaling by #misses
                if (removeList.containsKey(_key) && 
LineageCacheConfig.isCostNsize()) {
-                       //score = score * (1 + removeList.get(_key));
                        double w1 = LineageCacheConfig.WEIGHTS[0];
                        int missCount = 1 + removeList.get(_key);
-                       score = score + (w1*(((double)_computeTime)/getSize()) 
* missCount);
+                       long size = getSize();
+                       if (isLocalObject())
+                               score = score + 
(w1*(((double)_computeTime)/getSize()) * missCount);
                }
        }
+
+       protected synchronized void initiateScoreSpark(Map<LineageItem, 
Integer> removeList, long estimatedSize) {
+               // Set timestamp
+               _timestamp =  System.currentTimeMillis() - 
LineageCacheEviction.getStartTimestamp();
+               if (_timestamp < 0)
+                       throw new DMLRuntimeException ("Execution timestamp 
shouldn't be -ve. Key: "+_key);
+
+               // Gather the weights for scoring components
+               double w1 = LineageCacheConfig.WEIGHTS[0];
+               double w2 = LineageCacheConfig.WEIGHTS[1];
+               double w3 = LineageCacheConfig.WEIGHTS[2];
+               // Generate initial score
+               int computeGroup = 
LineageCacheConfig.getComputeGroup(_key.getOpcode());
+               int refCount = Math.max(_rddObject.getMaxReferenceCount(), 1);
+               score = w1*(((double)computeGroup*refCount)/estimatedSize) + 
w2*getTimestamp() + w3*(((double)1)/getDagHeight());
+       }
        
-       protected synchronized void updateScore() {
+       protected synchronized void updateScore(boolean add) {
                // Update score to emulate computeTime scaling by cache hit
-               //score *= 2;
                double w1 = LineageCacheConfig.WEIGHTS[0];
-               score = score + w1*(((double)_computeTime)/getSize());
+               long size = getSize();
+               int sign = add ? 1: -1;
+                if(isLocalObject())
+                        score = score + sign * w1 * (((double) _computeTime) / 
size);
+                if(isRDDPersist() && size != 0) {  //size == 0 means not 
persisted yet
+                        int computeGroup = 
LineageCacheConfig.getComputeGroup(_key.getOpcode());
+                        score = score + sign * w1 * (((double) computeGroup) / 
size);
+                }
        }
        
        protected synchronized long getTimestamp() {
@@ -324,7 +356,14 @@ public class LineageCacheEntry {
                double w2 = LineageCacheConfig.WEIGHTS[1];
                double w3 = LineageCacheConfig.WEIGHTS[2];
                // Generate scores
-               score = w1*(((double)_computeTime)/getSize()) + 
w2*getTimestamp() + w3*(((double)1)/getDagHeight());
+               long size = getSize();
+               if (isLocalObject())
+                       score = w1*(((double)_computeTime)/size) + 
w2*getTimestamp() + w3*(((double)1)/getDagHeight());
+               if (isRDDPersist() && size != 0) {  //size == 0 means not 
persisted yet
+                       int computeGroup = 
LineageCacheConfig.getComputeGroup(_key.getOpcode());
+                       int refCount = 
Math.max(_rddObject.getMaxReferenceCount(), 1);
+                       score = w1*(((double)computeGroup*refCount)/size) + 
w2*getTimestamp() + w3*(((double)1)/getDagHeight());
+               }
        }
 
        static class GPUPointer {
diff --git 
a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEviction.java 
b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEviction.java
index 267440d44a..9d642c8afe 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEviction.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEviction.java
@@ -90,7 +90,7 @@ public class LineageCacheEviction
                // FIXME: avoid when called from partial reuse methods
                if (LineageCacheConfig.isCostNsize()) {
                        if (weightedQueue.remove(entry)) {
-                               entry.updateScore();
+                               entry.updateScore(true);
                                weightedQueue.add(entry);
                        }
                }
diff --git 
a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheStatistics.java 
b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheStatistics.java
index 182e04cfb6..c7bbd6a00d 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheStatistics.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheStatistics.java
@@ -52,6 +52,8 @@ public class LineageCacheStatistics {
        private static final LongAdder _numHitsRdd      = new LongAdder();
        private static final LongAdder _numHitsSparkActions = new LongAdder();
        private static final LongAdder _numHitsRddPersist   = new LongAdder();
+       private static final LongAdder _numRddPersist   = new LongAdder();
+       private static final LongAdder _numRddUnpersist   = new LongAdder();
 
        public static void reset() {
                _numHitsMem.reset();
@@ -77,6 +79,8 @@ public class LineageCacheStatistics {
                _numHitsRdd.reset();
                _numHitsSparkActions.reset();
                _numHitsRddPersist.reset();
+               _numRddPersist.reset();
+               _numRddUnpersist.reset();
        }
        
        public static void incrementMemHits() {
@@ -241,6 +245,16 @@ public class LineageCacheStatistics {
                _numHitsRddPersist.increment();
        }
 
+       public static void incrementRDDPersists() {
+               // Number of RDDs marked for persistence
+               _numRddPersist.increment();
+       }
+
+       public static void incrementRDDUnpersists() {
+               // Number of RDDs unpersisted due to memory pressure
+               _numRddUnpersist.increment();
+       }
+
        public static String displayHits() {
                StringBuilder sb = new StringBuilder();
                sb.append(_numHitsMem.longValue());
@@ -316,7 +330,13 @@ public class LineageCacheStatistics {
                return sb.toString();
        }
 
-       public static String displaySparkStats() {
+       public static boolean ifGpuStats() {
+               return (_numHitsGpu.longValue() + _numAsyncEvictGpu.longValue()
+                       + _numSyncEvictGpu.longValue() + 
_numRecycleGpu.longValue()
+                       + _numDelGpu.longValue() + _evtimeGpu.longValue()) != 0;
+       }
+
+       public static String displaySparkHits() {
                StringBuilder sb = new StringBuilder();
                sb.append(_numHitsSparkActions.longValue());
                sb.append("/");
@@ -325,4 +345,17 @@ public class LineageCacheStatistics {
                sb.append(_numHitsRddPersist.longValue());
                return sb.toString();
        }
+
+       public static String displaySparkPersist() {
+               StringBuilder sb = new StringBuilder();
+               sb.append(_numRddPersist.longValue());
+               sb.append("/");
+               sb.append(_numRddUnpersist.longValue());
+               return sb.toString();
+       }
+
+       public static boolean ifSparkStats() {
+               return (_numHitsSparkActions.longValue() + 
_numHitsRdd.longValue()
+               + _numHitsRddPersist.longValue() + 
_numRddUnpersist.longValue()) != 0;
+       }
 }
diff --git 
a/src/main/java/org/apache/sysds/runtime/lineage/LineageSparkCacheEviction.java 
b/src/main/java/org/apache/sysds/runtime/lineage/LineageSparkCacheEviction.java
new file mode 100644
index 0000000000..a639b69b63
--- /dev/null
+++ 
b/src/main/java/org/apache/sysds/runtime/lineage/LineageSparkCacheEviction.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.lineage;
+
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.sysds.api.DMLScript;
+import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
+import org.apache.sysds.runtime.lineage.LineageCacheConfig.LineageCacheStatus;
+
+import java.util.Map;
+import java.util.TreeSet;
+
+public class LineageSparkCacheEviction
+{
+       private static long SPARK_STORAGE_LIMIT = 0; //60% (upper limit of 
Spark unified memory)
+       private static long _sparkStorageSize = 0; //current size
+       private static TreeSet<LineageCacheEntry> weightedQueue = new 
TreeSet<>(LineageCacheConfig.LineageCacheComparator);
+
+       protected static void resetEviction() {
+               _sparkStorageSize = 0;
+               weightedQueue.clear();
+       }
+
+       //--------------- CACHE MAINTENANCE & LOOKUP FUNCTIONS --------------//
+
+       // This method is called at the first cache hit.
+       protected static void addEntry(LineageCacheEntry entry, long 
estimatedSize) {
+               if (entry.isNullVal())
+                       // Placeholders shouldn't participate in eviction 
cycles.
+                       return;
+
+               entry.initiateScoreSpark(LineageCacheEviction._removelist, 
estimatedSize);
+               weightedQueue.add(entry);
+       }
+
+       protected static void maintainOrder(LineageCacheEntry entry) {
+               // Reset the timestamp to maintain the LRU component of the 
scoring function
+               if (LineageCacheConfig.isTimeBased()) {
+                       if (weightedQueue.remove(entry)) {
+                               entry.updateTimestamp();
+                               weightedQueue.add(entry);
+                       }
+               }
+               // Scale score of the sought entry after every cache hit
+               // FIXME: avoid when called from partial reuse methods
+               if (LineageCacheConfig.isCostNsize()) {
+                       // Exists in weighted queue only if already marked for 
persistence
+                       if (weightedQueue.remove(entry)) {
+                               // Score stays same if not persisted (i.e. size 
== 0)
+                               entry.updateScore(true);
+                               weightedQueue.add(entry);
+                       }
+               }
+       }
+
+       protected static void removeSingleEntry(Map<LineageItem, 
LineageCacheEntry> cache, LineageCacheEntry e) {
+               // Keep in cache. Just change the status to be persisted on the 
next hit
+               e.setCacheStatus(LineageCacheStatus.TOPERSISTRDD);
+               // Mark for lazy unpersisting
+               JavaPairRDD<?,?> rdd = e.getRDDObject().getRDD();
+               rdd.unpersist(false);
+               // Maintain the current size
+               _sparkStorageSize -= e.getSize();
+               // Maintain miss count to increase the score if the item enters 
the cache again
+               LineageCacheEviction._removelist.merge(e._key, 1, Integer::sum);
+
+               if (DMLScript.STATISTICS)
+                       LineageCacheStatistics.incrementRDDUnpersists();
+               // NOTE: The caller of this method maintains the eviction queue.
+       }
+
+       private static void removeEntry(Map<LineageItem, LineageCacheEntry> 
cache, LineageCacheEntry e) {
+               if (e._origItem == null) {
+                       // Single entry. Remove.
+                       removeSingleEntry(cache, e);
+                       return;
+               }
+
+               // Defer the eviction till all the entries with the same 
intermediate are evicted.
+               e.setCacheStatus(LineageCacheStatus.TODELETE);
+
+               boolean del = false;
+               LineageCacheEntry tmp = cache.get(e._origItem);
+               while (tmp != null) {
+                       if (tmp.getCacheStatus() != LineageCacheStatus.TODELETE)
+                               return; //do nothing
+                       del |= (tmp.getCacheStatus() == 
LineageCacheStatus.TODELETE);
+                       tmp = tmp._nextEntry;
+               }
+               if (del) {
+                       tmp = cache.get(e._origItem);
+                       while (tmp != null) {
+                               removeSingleEntry(cache, tmp);
+                               tmp = tmp._nextEntry;
+                       }
+               }
+       }
+
+       //---------------- CACHE SPACE MANAGEMENT METHODS -----------------//
+
+       private static void setSparkStorageLimit() {
+               // Set the limit only during the first RDD caching to avoid 
context creation
+               if (SPARK_STORAGE_LIMIT == 0)
+                       SPARK_STORAGE_LIMIT = (long) 
SparkExecutionContext.getDataMemoryBudget(false, true); //FIXME
+       }
+
+       protected static double getSparkStorageLimit() {
+               if (SPARK_STORAGE_LIMIT == 0)
+                       setSparkStorageLimit();
+               return SPARK_STORAGE_LIMIT;
+       }
+
+       protected static void updateSize(long space, boolean addspace) {
+               _sparkStorageSize += addspace ? space : -space;
+               // NOTE: this doesn't represent the true size as we maintain 
total size based on estimations
+       }
+
+       protected static boolean isBelowThreshold(long estimateSize) {
+               boolean available = (estimateSize + _sparkStorageSize) <= 
getSparkStorageLimit();
+               if (!available)
+                       // Get exact storage used (including checkpoints from 
outside of lineage)
+                       _sparkStorageSize = 
SparkExecutionContext.getStorageSpaceUsed();
+
+               return  (estimateSize + _sparkStorageSize) <= 
getSparkStorageLimit();
+       }
+
+       protected static void makeSpace(Map<LineageItem, LineageCacheEntry> 
cache, long estimatedSize) {
+               // Cost-based eviction
+               while ((estimatedSize + _sparkStorageSize) > 
getSparkStorageLimit()) {
+                       LineageCacheEntry e = weightedQueue.pollFirst();
+                       if (e == null)
+                               // Nothing to evict.
+                               break;
+
+                       removeEntry(cache, e);
+               }
+       }
+}
diff --git 
a/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java 
b/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java
index 9d79f7f971..f016235231 100644
--- a/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java
@@ -2564,6 +2564,10 @@ public class MatrixBlock extends MatrixValue implements 
CacheBlock<MatrixBlock>,
                        return estimateSizeDenseInMemory(nrows, ncols);
        }
 
+       public static long estimateSizeInMemory(DataCharacteristics dc) {
+               return estimateSizeInMemory(dc.getRows(), dc.getCols(), 
dc.getSparsity());
+       }
+
        public long estimateSizeDenseInMemory() {
                return estimateSizeDenseInMemory(rlen, clen);
        }
diff --git a/src/main/java/org/apache/sysds/utils/Statistics.java 
b/src/main/java/org/apache/sysds/utils/Statistics.java
index 02d72fad08..9d931f3ce3 100644
--- a/src/main/java/org/apache/sysds/utils/Statistics.java
+++ b/src/main/java/org/apache/sysds/utils/Statistics.java
@@ -638,10 +638,15 @@ public class Statistics
                        if (DMLScript.LINEAGE && !ReuseCacheType.isNone()) {
                                sb.append("LinCache hits (Mem/FS/Del): \t" + 
LineageCacheStatistics.displayHits() + ".\n");
                                sb.append("LinCache MultiLevel (Ins/SB/Fn):" + 
LineageCacheStatistics.displayMultiLevelHits() + ".\n");
-                               sb.append("LinCache GPU (Hit/Async/Sync): \t" + 
LineageCacheStatistics.displayGpuStats() + ".\n");
-                               sb.append("LinCache GPU (Recyc/Del): \t" + 
LineageCacheStatistics.displayGpuPointerStats() + ".\n");
-                               sb.append("LinCache GPU evict time: \t" + 
LineageCacheStatistics.displayGpuEvictTime() + " sec.\n");
-                               sb.append("LinCache Spark (Col/Loc/Dist): \t" + 
LineageCacheStatistics.displaySparkStats() + ".\n");
+                               if (LineageCacheStatistics.ifGpuStats()) {
+                                       sb.append("LinCache GPU 
(Hit/Async/Sync): \t" + LineageCacheStatistics.displayGpuStats() + ".\n");
+                                       sb.append("LinCache GPU (Recyc/Del): 
\t" + LineageCacheStatistics.displayGpuPointerStats() + ".\n");
+                                       sb.append("LinCache GPU evict time: \t" 
+ LineageCacheStatistics.displayGpuEvictTime() + " sec.\n");
+                               }
+                               if (LineageCacheStatistics.ifSparkStats()) {
+                                       sb.append("LinCache Spark 
(Col/Loc/Dist): \t" + LineageCacheStatistics.displaySparkHits() + ".\n");
+                                       sb.append("LinCache Spark (Per/Unper): 
\t" + LineageCacheStatistics.displaySparkPersist() + ".\n");
+                               }
                                sb.append("LinCache writes (Mem/FS/Del): \t" + 
LineageCacheStatistics.displayWtrites() + ".\n");
                                sb.append("LinCache FStimes (Rd/Wr): \t" + 
LineageCacheStatistics.displayFSTime() + " sec.\n");
                                sb.append("LinCache Computetime (S/M): \t" + 
LineageCacheStatistics.displayComputeTime() + " sec.\n");
diff --git 
a/src/test/java/org/apache/sysds/test/functions/async/LineageReuseSparkTest.java
 
b/src/test/java/org/apache/sysds/test/functions/async/LineageReuseSparkTest.java
index ad131069c4..68db0caaba 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/async/LineageReuseSparkTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/async/LineageReuseSparkTest.java
@@ -74,8 +74,6 @@ public class LineageReuseSparkTest extends AutomatedTestBase {
        }
 
        public void runTest(String testname, ExecMode execMode, int testId) {
-               setOutputBuffering(true);
-               
                boolean old_simplification = 
OptimizerUtils.ALLOW_ALGEBRAIC_SIMPLIFICATION;
                boolean old_sum_product = 
OptimizerUtils.ALLOW_SUM_PRODUCT_REWRITES;
                boolean old_trans_exec_type = 
OptimizerUtils.ALLOW_TRANSITIVE_SPARK_EXEC_TYPE;


Reply via email to