This is an automated email from the ASF dual-hosted git repository.

arnabp20 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new c842072446 [MINOR] Bug fixes
c842072446 is described below

commit c842072446386853f5688f04569a8a119e4c2ae8
Author: Arnab Phani <[email protected]>
AuthorDate: Mon Dec 25 23:32:46 2023 +0100

    [MINOR] Bug fixes
---
 .../apache/sysds/runtime/lineage/LineageCache.java | 70 ++++++++++------------
 .../sysds/runtime/lineage/LineageCacheConfig.java  |  5 ++
 .../runtime/lineage/LineageCacheEviction.java      |  9 +++
 .../runtime/lineage/LineageCacheStatistics.java    | 11 +++-
 .../runtime/lineage/LineageSparkCacheEviction.java |  1 +
 .../java/org/apache/sysds/utils/Statistics.java    |  2 +-
 6 files changed, 57 insertions(+), 41 deletions(-)

diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java 
b/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
index e2dc6f6b0e..6678e338b1 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
@@ -161,27 +161,23 @@ public class LineageCache
                                                        return false;  //the 
executing thread removed this entry from cache
 
                                                //Reuse the cached RDD (local 
or persisted at the executors)
-                                               switch(e.getCacheStatus()) {
-                                                       case TOPERSISTRDD:
-                                                               //Change status 
to PERSISTEDRDD on the second hit
-                                                               //putValueRDD 
method will save the RDD and call persist
-                                                               
e.setCacheStatus(LineageCacheStatus.PERSISTEDRDD);
-                                                               //Cannot reuse 
rdd as already garbage collected
-                                                               
ec.replaceLineageItem(outName, e._key); //still reuse the lineage trace
-                                                               return false;
-                                                       case PERSISTEDRDD:
-                                                               //Reuse the 
persisted intermediate at the executors
-                                                               
((SparkExecutionContext) ec).setRDDHandleForVariable(outName, rdd);
-                                                               //Safely 
cleanup the child RDDs if this RDD is persisted already
-                                                               //If reused 3 
times and still not persisted, move to Spark asynchronously
-                                                               if 
(probeRDDDistributed(e))
-                                                                       
LineageSparkCacheEviction.cleanupChildRDDs(e);
-                                                               else
-                                                                       
LineageSparkCacheEviction.moveToSpark(e);
-                                                               break;
-                                                       default:
-                                                               return false;
+                                               if (e.getCacheStatus() == 
LineageCacheStatus.TOPERSISTRDD) {  //second hit
+                                                       //Cannot reuse rdd as 
already garbage collected
+                                                       //putValueRDD will save 
the RDD and call persist
+                                                       if 
(DMLScript.STATISTICS) LineageCacheStatistics.incrementDelHitsRdd();
+                                                       
ec.replaceLineageItem(outName, e._key); //still reuse the lineage trace
+                                                       return false;
                                                }
+                                               //Reuse from third hit onwards 
(status == PERSISTEDRDD)
+                                               ((SparkExecutionContext) 
ec).setRDDHandleForVariable(outName, rdd);
+                                               //Set the cached data 
characteristics to the output matrix object
+                                               
ec.getMatrixObject(outName).updateDataCharacteristics(rdd.getDataCharacteristics());
+                                               //Safely cleanup the child RDDs 
if this RDD is persisted already
+                                               //If reused 3 times and still 
not persisted, move to Spark asynchronously
+                                               if (probeRDDDistributed(e))
+                                                       
LineageSparkCacheEviction.cleanupChildRDDs(e);
+                                               else
+                                                       
LineageSparkCacheEviction.moveToSpark(e);
                                        }
                                        else { //TODO handle locks on gpu 
objects
                                                Pointer gpuPtr = 
e.getGPUPointer();
@@ -288,8 +284,10 @@ public class LineageCache
                                        RDDObject rdd = e.getRDDObject();
                                        if (rdd == null && e.getCacheStatus() 
== LineageCacheStatus.NOTCACHED)
                                                return false;  //the executing 
thread removed this entry from cache
+                                       //Set the data characteristics and HDFS filename of the output matrix object
                                        MetaDataFormat md = new 
MetaDataFormat(rdd.getDataCharacteristics(),FileFormat.BINARY);
-                                       boundValue = new 
MatrixObject(ValueType.FP64, boundVarName, md);
+                                       String filename = rdd.getHDFSFilename() 
!= null ? rdd.getHDFSFilename() : boundVarName;
+                                       boundValue = new 
MatrixObject(ValueType.FP64, filename, md);
                                        ((MatrixObject) 
boundValue).setRDDHandle(rdd);
                                }
                                else if (e.isScalarValue()) {
@@ -343,6 +341,7 @@ public class LineageCache
                                                        //Cannot reuse rdd as 
already garbage collected
                                                        //putValue method will 
save the RDD and call persist
                                                        //while caching the 
original instruction
+                                                       if 
(DMLScript.STATISTICS) LineageCacheStatistics.incrementDelHitsRdd(); //increase 
miss count
                                                        return false;
                                                case PERSISTEDRDD:
                                                        //Reuse the persisted 
intermediate at the executors
@@ -800,10 +799,6 @@ public class LineageCache
                        if (!probe(instLI))
                                return;
                        LineageCacheEntry centry = _cache.get(instLI);
-                       // Remember the 1st hit and put the RDD in the cache 
the 2nd time
-                       if (centry.getCacheStatus() != LineageCacheStatus.EMPTY 
           //first hit
-                               && centry.getCacheStatus() != 
LineageCacheStatus.PERSISTEDRDD) //second hit
-                               return;
                        // Avoid reuse chkpoint, which is unnecessary
                        if (inst.getOpcode().equalsIgnoreCase("chkpoint")) {
                                removePlaceholder(instLI);
@@ -816,27 +811,24 @@ public class LineageCache
                                return;
                        }
 
-                       // Filter out Spark instructions with broadcast input
-                       // TODO: This code avoids one crash. Remove once fixed.
-                       if (!opToPersist && !allInputsSpark(inst, ec)) {
-                               removePlaceholder(instLI);
-                               return;
-                       }
-
                        // Get the RDD handle of the RDD
                        CacheableData<?> cd = 
ec.getCacheableData(((ComputationSPInstruction)inst).output.getName());
                        RDDObject rddObj = cd.getRDDHandle();
-                       // Save the metadata. Required for estimating cached 
space overhead.
+                       // Save the metadata and hdfs filename. Required during 
reuse and space management.
                        
rddObj.setDataCharacteristics(cd.getDataCharacteristics());
+                       rddObj.setHDFSFilename(cd.getFileName());
                        // Set the RDD object in the cache
                        switch(centry.getCacheStatus()) {
                                case EMPTY:  //first hit
-                                       // Do not save the child RDDS (incl. 
broadcast vars) on the first hit.
-                                       // Let them be garbage collected via 
rmvar. Save them on the second hit
-                                       // by disabling garbage collection on 
this and the child RDDs.
-                                       centry.setRDDValue(rddObj, 
computetime); //rddObj will be garbage collected
-                                       break;
-                               case PERSISTEDRDD:  //second hit
+                                       // Delay caching to the second hit if delayed caching is enabled; otherwise fall through and cache right away
+                                       if 
(LineageCacheConfig.isDelayedCachingRDD()) {
+                                               // Do not save the child RDDs (incl. broadcast vars) on the first hit.
+                                               // Let them be garbage 
collected via rmvar. Save them on the second hit
+                                               // by disabling garbage 
collection on this and the child RDDs.
+                                               centry.setRDDValue(rddObj, 
computetime); //rddObj will be garbage collected
+                                               break;
+                                       } //else, fall through and cache
+                               case TOPERSISTRDD:  //second hit
                                        // Replace the old RDD (GCed) with the 
new one
                                        centry.setRDDValue(rddObj);
                                        // Set the correct status to indicate 
the RDD is marked to be persisted
diff --git 
a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java 
b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
index 04f41e5ce5..e64b519c42 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
@@ -122,6 +122,7 @@ public class LineageCacheConfig
        // Note, delayed caching helps in reducing lineage caching/probing 
overhead for use cases with
        // no reusable instructions, but is anti-productive for use cases with 
repeating patterns (eg. scoring).
        private static boolean DELAYED_CACHING_GPU = true;
+       private static boolean DELAYED_CACHING_RDD = true;
 
        //-------------DISK SPILLING RELATED CONFIGURATIONS--------------//
 
@@ -409,6 +410,10 @@ public class LineageCacheConfig
                return DELAYED_CACHING_GPU;
        }
 
+       public static boolean isDelayedCachingRDD() {
+               return DELAYED_CACHING_RDD;
+       }
+
        public static void setCachePolicy(LineageCachePolicy policy) {
                // TODO: Automatic tuning of weights.
                switch(policy) {
diff --git 
a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEviction.java 
b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEviction.java
index 511ad93ca5..4c65280b16 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEviction.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEviction.java
@@ -210,6 +210,15 @@ public class LineageCacheEviction
                        _cachesize -= space;
        }
 
+       public static void removeAll(Map<LineageItem, LineageCacheEntry> cache) 
{
+               while (!weightedQueue.isEmpty()) {
+                       LineageCacheEntry e = weightedQueue.pollFirst();
+                       if (e == null)
+                               continue;
+                       removeOrSpillEntry(cache, e, false);
+               }
+       }
+
        protected static boolean isBelowThreshold(long spaceNeeded) {
                return ((spaceNeeded + _cachesize) <= CACHE_LIMIT);
        }
diff --git 
a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheStatistics.java 
b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheStatistics.java
index ffc7e5eeff..6f214e86a9 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheStatistics.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheStatistics.java
@@ -56,6 +56,7 @@ public class LineageCacheStatistics {
        private static final LongAdder _numHitsRddPersist   = new LongAdder();
        private static final LongAdder _numRddPersist   = new LongAdder();
        private static final LongAdder _numRddUnpersist   = new LongAdder();
+       private static final LongAdder _numHitsDelRdd   = new LongAdder();
 
        public static void reset() {
                _numHitsMem.reset();
@@ -85,6 +86,7 @@ public class LineageCacheStatistics {
                _numHitsRddPersist.reset();
                _numRddPersist.reset();
                _numRddUnpersist.reset();
+               _numHitsDelRdd.reset();
        }
        
        public static void incrementMemHits() {
@@ -233,7 +235,7 @@ public class LineageCacheStatistics {
        }
 
        public static void incrementDelHitsGpu() {
-               // Number of hits on pointers that are deleted/recycled before
+               // Number of hits on pointers that are delayed for caching or 
deleted/recycled before
                _numHitsDelGpu.increment();
        }
 
@@ -268,6 +270,11 @@ public class LineageCacheStatistics {
                _numRddUnpersist.increment();
        }
 
+       public static void incrementDelHitsRdd() {
+               // Number of hits on RDDs that are delayed for caching or 
evicted
+               _numHitsDelRdd.increment();
+       }
+
        public static String displayHits() {
                StringBuilder sb = new StringBuilder();
                sb.append(_numHitsMem.longValue());
@@ -366,6 +373,8 @@ public class LineageCacheStatistics {
                sb.append(_numRddPersist.longValue());
                sb.append("/");
                sb.append(_numRddUnpersist.longValue());
+               sb.append("/");
+               sb.append(_numHitsDelRdd.longValue());
                return sb.toString();
        }
 
diff --git 
a/src/main/java/org/apache/sysds/runtime/lineage/LineageSparkCacheEviction.java 
b/src/main/java/org/apache/sysds/runtime/lineage/LineageSparkCacheEviction.java
index e00ff0b84b..e114fee29e 100644
--- 
a/src/main/java/org/apache/sysds/runtime/lineage/LineageSparkCacheEviction.java
+++ 
b/src/main/java/org/apache/sysds/runtime/lineage/LineageSparkCacheEviction.java
@@ -125,6 +125,7 @@ public class LineageSparkCacheEviction
        private static void setSparkStorageLimit() {
                // Set the limit only during the first RDD caching to avoid 
context creation
                // Cache size = 70% of unified Spark memory = 0.7 * 0.6 = 42%.
+               // TODO: Lower this limit (e.g., to 80% of storage memory) to avoid disk spilling.
                if (SPARK_STORAGE_LIMIT == 0) {
                        long unifiedSparkMem = (long) 
SparkExecutionContext.getDataMemoryBudget(false, true);
                        SPARK_STORAGE_LIMIT = (long)(unifiedSparkMem * 0.7d);
diff --git a/src/main/java/org/apache/sysds/utils/Statistics.java 
b/src/main/java/org/apache/sysds/utils/Statistics.java
index 6547be2cd6..f22c5415c1 100644
--- a/src/main/java/org/apache/sysds/utils/Statistics.java
+++ b/src/main/java/org/apache/sysds/utils/Statistics.java
@@ -645,7 +645,7 @@ public class Statistics
                                }
                                if (LineageCacheStatistics.ifSparkStats()) {
                                        sb.append("LinCache Spark 
(Col/Loc/Dist): \t" + LineageCacheStatistics.displaySparkHits() + ".\n");
-                                       sb.append("LinCache Spark (Per/Unper): 
\t" + LineageCacheStatistics.displaySparkPersist() + ".\n");
+                                       sb.append("LinCache Spark 
(Per/Unper/Del):\t" + LineageCacheStatistics.displaySparkPersist() + ".\n");
                                }
                                sb.append("LinCache writes (Mem/FS/Del): \t" + 
LineageCacheStatistics.displayWtrites() + ".\n");
                                sb.append("LinCache FStimes (Rd/Wr): \t" + 
LineageCacheStatistics.displayFSTime() + " sec.\n");

Reply via email to