This is an automated email from the ASF dual-hosted git repository.
arnabp20 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new c842072446 [MINOR] Bug fixes
c842072446 is described below
commit c842072446386853f5688f04569a8a119e4c2ae8
Author: Arnab Phani <[email protected]>
AuthorDate: Mon Dec 25 23:32:46 2023 +0100
[MINOR] Bug fixes
---
.../apache/sysds/runtime/lineage/LineageCache.java | 70 ++++++++++------------
.../sysds/runtime/lineage/LineageCacheConfig.java | 5 ++
.../runtime/lineage/LineageCacheEviction.java | 9 +++
.../runtime/lineage/LineageCacheStatistics.java | 11 +++-
.../runtime/lineage/LineageSparkCacheEviction.java | 1 +
.../java/org/apache/sysds/utils/Statistics.java | 2 +-
6 files changed, 57 insertions(+), 41 deletions(-)
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java b/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
index e2dc6f6b0e..6678e338b1 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
@@ -161,27 +161,23 @@ public class LineageCache
return false; //the
executing thread removed this entry from cache
//Reuse the cached RDD (local
or persisted at the executors)
- switch(e.getCacheStatus()) {
- case TOPERSISTRDD:
- //Change status
to PERSISTEDRDD on the second hit
- //putValueRDD
method will save the RDD and call persist
-
e.setCacheStatus(LineageCacheStatus.PERSISTEDRDD);
- //Cannot reuse
rdd as already garbage collected
-
ec.replaceLineageItem(outName, e._key); //still reuse the lineage trace
- return false;
- case PERSISTEDRDD:
- //Reuse the
persisted intermediate at the executors
-
((SparkExecutionContext) ec).setRDDHandleForVariable(outName, rdd);
- //Safely
cleanup the child RDDs if this RDD is persisted already
- //If reused 3
times and still not persisted, move to Spark asynchronously
- if
(probeRDDDistributed(e))
-
LineageSparkCacheEviction.cleanupChildRDDs(e);
- else
-
LineageSparkCacheEviction.moveToSpark(e);
- break;
- default:
- return false;
+ if (e.getCacheStatus() ==
LineageCacheStatus.TOPERSISTRDD) { //second hit
+ //Cannot reuse rdd as
already garbage collected
+ //putValueRDD will save
the RDD and call persist
+ if
(DMLScript.STATISTICS) LineageCacheStatistics.incrementDelHitsRdd();
+
ec.replaceLineageItem(outName, e._key); //still reuse the lineage trace
+ return false;
}
+ //Reuse from third hit onwards
(status == PERSISTEDRDD)
+ ((SparkExecutionContext)
ec).setRDDHandleForVariable(outName, rdd);
+ //Set the cached data
characteristics to the output matrix object
+
ec.getMatrixObject(outName).updateDataCharacteristics(rdd.getDataCharacteristics());
+ //Safely cleanup the child RDDs
if this RDD is persisted already
+ //If reused 3 times and still
not persisted, move to Spark asynchronously
+ if (probeRDDDistributed(e))
+
LineageSparkCacheEviction.cleanupChildRDDs(e);
+ else
+
LineageSparkCacheEviction.moveToSpark(e);
}
else { //TODO handle locks on gpu
objects
Pointer gpuPtr =
e.getGPUPointer();
@@ -288,8 +284,10 @@ public class LineageCache
RDDObject rdd = e.getRDDObject();
if (rdd == null && e.getCacheStatus()
== LineageCacheStatus.NOTCACHED)
return false; //the executing
thread removed this entry from cache
+ //Set the data characteristics and hdfs
file to the output matrix object
MetaDataFormat md = new
MetaDataFormat(rdd.getDataCharacteristics(),FileFormat.BINARY);
- boundValue = new
MatrixObject(ValueType.FP64, boundVarName, md);
+ String filename = rdd.getHDFSFilename()
!= null ? rdd.getHDFSFilename() : boundVarName;
+ boundValue = new
MatrixObject(ValueType.FP64, filename, md);
((MatrixObject)
boundValue).setRDDHandle(rdd);
}
else if (e.isScalarValue()) {
@@ -343,6 +341,7 @@ public class LineageCache
//Cannot reuse rdd as
already garbage collected
//putValue method will
save the RDD and call persist
//while caching the
original instruction
+ if
(DMLScript.STATISTICS) LineageCacheStatistics.incrementDelHitsRdd(); //increase
miss count
return false;
case PERSISTEDRDD:
//Reuse the persisted
intermediate at the executors
@@ -800,10 +799,6 @@ public class LineageCache
if (!probe(instLI))
return;
LineageCacheEntry centry = _cache.get(instLI);
- // Remember the 1st hit and put the RDD in the cache
the 2nd time
- if (centry.getCacheStatus() != LineageCacheStatus.EMPTY
//first hit
- && centry.getCacheStatus() !=
LineageCacheStatus.PERSISTEDRDD) //second hit
- return;
// Avoid reuse chkpoint, which is unnecessary
if (inst.getOpcode().equalsIgnoreCase("chkpoint")) {
removePlaceholder(instLI);
@@ -816,27 +811,24 @@ public class LineageCache
return;
}
- // Filter out Spark instructions with broadcast input
- // TODO: This code avoids one crash. Remove once fixed.
- if (!opToPersist && !allInputsSpark(inst, ec)) {
- removePlaceholder(instLI);
- return;
- }
-
// Get the RDD handle of the RDD
CacheableData<?> cd =
ec.getCacheableData(((ComputationSPInstruction)inst).output.getName());
RDDObject rddObj = cd.getRDDHandle();
- // Save the metadata. Required for estimating cached
space overhead.
+ // Save the metadata and hdfs filename. Required during
reuse and space management.
rddObj.setDataCharacteristics(cd.getDataCharacteristics());
+ rddObj.setHDFSFilename(cd.getFileName());
// Set the RDD object in the cache
switch(centry.getCacheStatus()) {
case EMPTY: //first hit
- // Do not save the child RDDS (incl.
broadcast vars) on the first hit.
- // Let them be garbage collected via
rmvar. Save them on the second hit
- // by disabling garbage collection on
this and the child RDDs.
- centry.setRDDValue(rddObj,
computetime); //rddObj will be garbage collected
- break;
- case PERSISTEDRDD: //second hit
+ // Cache right away if delayed caching
is disabled
+ if
(LineageCacheConfig.isDelayedCachingRDD()) {
+ // Do not save the child RDDS
(incl. broadcast vars) on the first hit.
+ // Let them be garbage
collected via rmvar. Save them on the second hit
+ // by disabling garbage
collection on this and the child RDDs.
+ centry.setRDDValue(rddObj,
computetime); //rddObj will be garbage collected
+ break;
+ } //else, fall through and cache
+ case TOPERSISTRDD: //second hit
// Replace the old RDD (GCed) with the
new one
centry.setRDDValue(rddObj);
// Set the correct status to indicate
the RDD is marked to be persisted
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
index 04f41e5ce5..e64b519c42 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
@@ -122,6 +122,7 @@ public class LineageCacheConfig
// Note, delayed caching helps in reducing lineage caching/probing
overhead for use cases with
// no reusable instructions, but is anti-productive for use cases with
repeating patterns (eg. scoring).
private static boolean DELAYED_CACHING_GPU = true;
+ private static boolean DELAYED_CACHING_RDD = true;
//-------------DISK SPILLING RELATED CONFIGURATIONS--------------//
@@ -409,6 +410,10 @@ public class LineageCacheConfig
return DELAYED_CACHING_GPU;
}
+ public static boolean isDelayedCachingRDD() {
+ return DELAYED_CACHING_RDD;
+ }
+
public static void setCachePolicy(LineageCachePolicy policy) {
// TODO: Automatic tuning of weights.
switch(policy) {
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEviction.java b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEviction.java
index 511ad93ca5..4c65280b16 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEviction.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEviction.java
@@ -210,6 +210,15 @@ public class LineageCacheEviction
_cachesize -= space;
}
+ public static void removeAll(Map<LineageItem, LineageCacheEntry> cache)
{
+ while (!weightedQueue.isEmpty()) {
+ LineageCacheEntry e = weightedQueue.pollFirst();
+ if (e == null)
+ continue;
+ removeOrSpillEntry(cache, e, false);
+ }
+ }
+
protected static boolean isBelowThreshold(long spaceNeeded) {
return ((spaceNeeded + _cachesize) <= CACHE_LIMIT);
}
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheStatistics.java b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheStatistics.java
index ffc7e5eeff..6f214e86a9 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheStatistics.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheStatistics.java
@@ -56,6 +56,7 @@ public class LineageCacheStatistics {
private static final LongAdder _numHitsRddPersist = new LongAdder();
private static final LongAdder _numRddPersist = new LongAdder();
private static final LongAdder _numRddUnpersist = new LongAdder();
+ private static final LongAdder _numHitsDelRdd = new LongAdder();
public static void reset() {
_numHitsMem.reset();
@@ -85,6 +86,7 @@ public class LineageCacheStatistics {
_numHitsRddPersist.reset();
_numRddPersist.reset();
_numRddUnpersist.reset();
+ _numHitsDelRdd.reset();
}
public static void incrementMemHits() {
@@ -233,7 +235,7 @@ public class LineageCacheStatistics {
}
public static void incrementDelHitsGpu() {
- // Number of hits on pointers that are deleted/recycled before
+ // Number of hits on pointers that are delayed for caching or
deleted/recycled before
_numHitsDelGpu.increment();
}
@@ -268,6 +270,11 @@ public class LineageCacheStatistics {
_numRddUnpersist.increment();
}
+ public static void incrementDelHitsRdd() {
+ // Number of hits on RDDs that are delayed for caching or
evicted
+ _numHitsDelRdd.increment();
+ }
+
public static String displayHits() {
StringBuilder sb = new StringBuilder();
sb.append(_numHitsMem.longValue());
@@ -366,6 +373,8 @@ public class LineageCacheStatistics {
sb.append(_numRddPersist.longValue());
sb.append("/");
sb.append(_numRddUnpersist.longValue());
+ sb.append("/");
+ sb.append(_numHitsDelRdd.longValue());
return sb.toString();
}
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageSparkCacheEviction.java b/src/main/java/org/apache/sysds/runtime/lineage/LineageSparkCacheEviction.java
index e00ff0b84b..e114fee29e 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageSparkCacheEviction.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageSparkCacheEviction.java
@@ -125,6 +125,7 @@ public class LineageSparkCacheEviction
private static void setSparkStorageLimit() {
// Set the limit only during the first RDD caching to avoid
context creation
// Cache size = 70% of unified Spark memory = 0.7 * 0.6 = 42%.
+ // TODO: Reduce to avoid disk spilling. 80% of storage.
if (SPARK_STORAGE_LIMIT == 0) {
long unifiedSparkMem = (long)
SparkExecutionContext.getDataMemoryBudget(false, true);
SPARK_STORAGE_LIMIT = (long)(unifiedSparkMem * 0.7d);
diff --git a/src/main/java/org/apache/sysds/utils/Statistics.java b/src/main/java/org/apache/sysds/utils/Statistics.java
index 6547be2cd6..f22c5415c1 100644
--- a/src/main/java/org/apache/sysds/utils/Statistics.java
+++ b/src/main/java/org/apache/sysds/utils/Statistics.java
@@ -645,7 +645,7 @@ public class Statistics
}
if (LineageCacheStatistics.ifSparkStats()) {
sb.append("LinCache Spark
(Col/Loc/Dist): \t" + LineageCacheStatistics.displaySparkHits() + ".\n");
- sb.append("LinCache Spark (Per/Unper):
\t" + LineageCacheStatistics.displaySparkPersist() + ".\n");
+ sb.append("LinCache Spark
(Per/Unper/Del):\t" + LineageCacheStatistics.displaySparkPersist() + ".\n");
}
sb.append("LinCache writes (Mem/FS/Del): \t" +
LineageCacheStatistics.displayWtrites() + ".\n");
sb.append("LinCache FStimes (Rd/Wr): \t" +
LineageCacheStatistics.displayFSTime() + " sec.\n");