This is an automated email from the ASF dual-hosted git repository.
arnabp20 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/master by this push:
new e2414e0 [MINOR] Minor fixes in lineage cache eviction.
e2414e0 is described below
commit e2414e039d64df6e0d547e4902a9c9770aed5b5d
Author: arnabp <[email protected]>
AuthorDate: Fri Nov 6 15:16:52 2020 +0100
[MINOR] Minor fixes in lineage cache eviction.
This patch fixes a bug where we were making space after
putting an entry in the cache -- this could evict the current
entry and introduce a memory leak. Now we make space before
adding an entry to the cache. Furthermore, this patch increases
the default I/O speed values so that spilling starts early.
The rolling adjustment logic updates the speed values
according to the actual disk anyway.
---
.../apache/sysds/runtime/lineage/LineageCache.java | 30 ++++++++++++++--------
.../sysds/runtime/lineage/LineageCacheConfig.java | 11 +++++---
.../runtime/lineage/LineageCacheEviction.java | 13 +++++-----
3 files changed, 32 insertions(+), 22 deletions(-)
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
b/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
index e3cb62b..71d55e3 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
@@ -277,18 +277,18 @@ public class LineageCache
LineageItem item = entry.getKey();
Data data = entry.getValue();
LineageCacheEntry centry =
_cache.get(item);
- if (data instanceof MatrixObject)
-
centry.setValue(((MatrixObject)data).acquireReadAndRelease(), computetime);
- else if (data instanceof ScalarObject)
-
centry.setValue((ScalarObject)data, computetime);
- else {
+
+ if (!(data instanceof MatrixObject) &&
!(data instanceof ScalarObject)) {
// Reusable instructions can
return a frame (rightIndex). Remove placeholders.
_cache.remove(item);
continue;
}
- long size = centry.getSize();
- //remove the entry if the entry is
bigger than the cache.
+ MatrixBlock mb = (data instanceof
MatrixObject) ?
+
((MatrixObject)data).acquireReadAndRelease() : null;
+ long size = mb != null ?
mb.getInMemorySize() : ((ScalarObject)data).getSize();
+
+ //remove the placeholder if the entry
is bigger than the cache.
//FIXME: the resumed threads will enter
into infinite wait as the entry
//is removed. Need to add support for
graceful remove (placeholder) and resume.
if (size >
LineageCacheEviction.getCacheLimit()) {
@@ -296,12 +296,20 @@ public class LineageCache
continue;
}
- //maintain order for eviction
- LineageCacheEviction.addEntry(centry);
-
+ //make space for the data
if
(!LineageCacheEviction.isBelowThreshold(size))
LineageCacheEviction.makeSpace(_cache, size);
LineageCacheEviction.updateSize(size,
true);
+
+ //place the data
+ if (data instanceof MatrixObject)
+ centry.setValue(mb,
computetime);
+ else if (data instanceof ScalarObject)
+
centry.setValue((ScalarObject)data, computetime);
+
+ //maintain order for eviction
+ LineageCacheEviction.addEntry(centry);
+
}
}
}
@@ -358,7 +366,7 @@ public class LineageCache
// Create a new entry.
LineageCacheEntry newItem = new LineageCacheEntry(key, dt,
Mval, Sval, computetime);
- // Make space by removing or spilling LRU entries.
+ // Make space by removing or spilling entries.
if( Mval != null || Sval != null ) {
long size = newItem.getSize();
if( size > LineageCacheEviction.getCacheLimit())
diff --git
a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
index 66972c4..6a235b6 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
@@ -79,10 +79,13 @@ public class LineageCacheConfig
// Minimum reliable data size for spilling estimate in MB.
public static final double MIN_SPILL_DATA = 2;
// Default I/O in MB per second for binary blocks
- public static double FSREAD_DENSE = 200;
- public static double FSREAD_SPARSE = 100;
- public static double FSWRITE_DENSE = 150;
- public static double FSWRITE_SPARSE = 75;
+ // NOTE: These defaults are tuned according to high
+ // speed disks, so that spilling starts early. These
+ // will anyway be adjusted as per the current disk.
+ public static double FSREAD_DENSE = 500;
+ public static double FSREAD_SPARSE = 400;
+ public static double FSWRITE_DENSE = 450;
+ public static double FSWRITE_SPARSE = 225;
private enum CachedItemHead {
TSMM,
diff --git
a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEviction.java
b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEviction.java
index 553ca03..7025818 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEviction.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEviction.java
@@ -213,13 +213,12 @@ public class LineageCacheEviction
double exectime = ((double) e._computeTime) / 1000000;
// in milliseconds
if (LineageCache.DEBUG) {
- if (exectime >
LineageCacheConfig.MIN_SPILL_TIME_ESTIMATE) {
- System.out.print("LI " +
e._key.getOpcode());
- System.out.print(" exec time " +
((double) e._computeTime) / 1000000);
- System.out.print(" spill time " +
getDiskSpillEstimate(e) * 1000);
- System.out.print(" dim " +
e.getMBValue().getNumRows() + " " + e.getMBValue().getNumColumns());
- System.out.println(" size " +
getDiskSizeEstimate(e));
- }
+ System.out.print("LI = " + e._key.getOpcode());
+ System.out.print(" exec time = " + ((double)
e._computeTime) / 1000000);
+ System.out.println(" spill time = " +
getDiskSpillEstimate(e) * 1000);
+ System.out.print("dim = " +
e.getMBValue().getNumRows() + " " + e.getMBValue().getNumColumns());
+ System.out.print(" size = " +
getDiskSizeEstimate(e));
+ System.out.println(" DAG height = " +
e._key.getDistLeaf2Node());
}
if (spilltime <
LineageCacheConfig.MIN_SPILL_TIME_ESTIMATE) {