This is an automated email from the ASF dual-hosted git repository.
arnabp20 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new 14e2995409 [SYSTEMDS-3518] Periodically update actual Spark storage used
14e2995409 is described below
commit 14e2995409dfb65aca49cee9e56f096cd6b0fb7f
Author: Arnab Phani <[email protected]>
AuthorDate: Sat Mar 30 12:21:02 2024 +0100
[SYSTEMDS-3518] Periodically update actual Spark storage used
The estimated total Spark storage memory used for caching often
deviates significantly from the actual usage due to compiler-placed
checkpoints. This patch enables a periodic update of the
_sparkStorageSize metadata during the cleanup of child RDDs and
broadcasts.
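For context, a minimal sketch (not part of this commit) of how the
actual storage used can be queried from Spark. It assumes that
SparkExecutionContext.getStorageSpaceUsed() works roughly along these
lines, summing the in-memory size of all currently cached RDDs via the
driver's SparkContext; the class and method names below are
illustrative only:

    import org.apache.spark.SparkContext;
    import org.apache.spark.storage.RDDInfo;

    // Illustrative helper only, not the actual SystemDS implementation.
    public final class StorageUsedSketch {
        // Sum the in-memory bytes of all RDDs currently cached on the
        // cluster, as reported by the driver; this also covers RDDs
        // cached via compiler-placed checkpoints.
        public static long getStorageSpaceUsed(SparkContext sc) {
            long used = 0;
            for (RDDInfo info : sc.getRDDStorageInfo())
                used += info.memSize();
            return used;
        }
    }

Because the size estimate can drift from this measured value, the
eviction code below refreshes _sparkStorageSize from the measured value
during cleanup.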
---
.../java/org/apache/sysds/runtime/lineage/LineageCache.java | 1 +
.../sysds/runtime/lineage/LineageSparkCacheEviction.java | 11 ++++++++++-
2 files changed, 11 insertions(+), 1 deletion(-)
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java b/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
index e5fc9be938..c084938d0f 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
@@ -1214,6 +1214,7 @@ public class LineageCache
// Mark for distributed caching and change status
persistRDDIntern(centry, estimatedSize);
centry.setCacheStatus(LineageCacheStatus.PERSISTEDRDD);
+ //centry.getRDDObject().getRDD().count(); //eager caching (experimental)
return false;
}
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageSparkCacheEviction.java b/src/main/java/org/apache/sysds/runtime/lineage/LineageSparkCacheEviction.java
index e114fee29e..cb778a67e4 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageSparkCacheEviction.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageSparkCacheEviction.java
@@ -138,11 +138,18 @@ public class LineageSparkCacheEviction
return SPARK_STORAGE_LIMIT;
}
+ // NOTE: _sparkStorageSize doesn't represent the true size, as we maintain the total size based on estimates
protected static void updateSize(long space, boolean addspace) {
_sparkStorageSize += addspace ? space : -space;
- // NOTE: this doesn't represent the true size as we maintain total size based on estimations
+
+ // Debug print: actual vs estimated size
+ //System.out.println("Storage space used = "+(SparkExecutionContext.getStorageSpaceUsed()/1024)/1024+" MB, "
+ //	+"Estimated used = "+(_sparkStorageSize/1024)/1024 +" MB, "+"Limit = "+(getSparkStorageLimit()/1024)/1024+" MB");
}
+ // FIXME: Actual memory usage is often much higher than the estimated size,
+ // mostly due to not tracking the compiler-placed checkpoints.
+ // Always use the actual size, as getStorageSpaceUsed() is not expensive (verify).
protected static boolean isBelowThreshold(long estimateSize) {
boolean available = (estimateSize + _sparkStorageSize) <= getSparkStorageLimit();
if (!available)
@@ -175,6 +182,8 @@ public class LineageSparkCacheEviction
}
// Also detach the child RDDs to be GCed
e.getRDDObject().removeAllChild();
+ // Update actual storage used (including compiler-placed checkpoints)
+ _sparkStorageSize = SparkExecutionContext.getStorageSpaceUsed();
}
// TODO: Cleanup the child RDDs of the persisted RDDs
// which are never reused after the second hit.