This is an automated email from the ASF dual-hosted git repository.
arnabp20 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/master by this push:
new 02c9751 [SYSTEMDS-2950] Graceful removal of cache entry placeholders
02c9751 is described below
commit 02c97510653693ed24b396eb5640981463b8dfb4
Author: arnabp <[email protected]>
AuthorDate: Mon Apr 26 14:58:44 2021 +0200
[SYSTEMDS-2950] Graceful removal of cache entry placeholders
This patch fixes a bug in the task-parallel reuse logic.
Problem: If a thread removes an entry from the cache after executing
an instruction, all threads suspended on that entry wait forever.
Fix: Wake up the waiting threads before removing the entry. The resumed
threads check the entry status, detect that the entry was removed,
and go on to execute the instruction themselves in parallel.
---
.../apache/sysds/runtime/lineage/LineageCache.java | 53 +++++++++++++++-------
.../sysds/runtime/lineage/LineageCacheConfig.java | 1 +
.../sysds/runtime/lineage/LineageCacheEntry.java | 14 +++++-
3 files changed, 50 insertions(+), 18 deletions(-)
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
b/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
index 255b6e1..f908967 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
@@ -156,10 +156,20 @@ public class LineageCache
else if (inst instanceof GPUInstruction)
outName =
gpuinst._output.getName();
- if (e.isMatrixValue() && e._gpuObject
== null)
- ec.setMatrixOutput(outName,
e.getMBValue());
- else if (e.isScalarValue())
- ec.setScalarOutput(outName,
e.getSOValue());
+ if (e.isMatrixValue() && e._gpuObject
== null) {
+ MatrixBlock mb =
e.getMBValue(); //wait if another thread is executing the same inst.
+ if (mb == null &&
e.getCacheStatus() == LineageCacheStatus.NOTCACHED)
+ return false; //the
executing thread removed this entry from cache
+ else
+
ec.setMatrixOutput(outName, e.getMBValue());
+ }
+ else if (e.isScalarValue()) {
+ ScalarObject so =
e.getSOValue(); //wait if another thread is executing the same inst.
+ if (so == null &&
e.getCacheStatus() == LineageCacheStatus.NOTCACHED)
+ return false; //the
executing thread removed this entry from cache
+ else
+
ec.setScalarOutput(outName, e.getSOValue());
+ }
else { //TODO handle locks on gpu
objects
//shallow copy the cached
GPUObj to the output MatrixObject
ec.getMatrixObject(outName).setGPUObject(ec.getGPUContext(0),
@@ -450,18 +460,22 @@ public class LineageCache
for (Pair<LineageItem, Data> entry : liData) {
LineageItem item = entry.getKey();
Data data = entry.getValue();
+
+ if (!probe(item))
+ continue;
+
LineageCacheEntry centry = _cache.get(item);
if (!(data instanceof MatrixObject) && !(data
instanceof ScalarObject)) {
// Reusable instructions can return a
frame (rightIndex). Remove placeholders.
- _cache.remove(item);
+ removePlaceholder(item);
continue;
}
-
+
if (LineageCacheConfig.isOutputFederated(inst,
data)) {
// Do not cache federated outputs (in
the coordinator)
// Cannot skip putting the placeholder
as the above is only known after execution
- _cache.remove(item);
+ removePlaceholder(item);
continue;
}
@@ -470,10 +484,8 @@ public class LineageCache
long size = mb != null ? mb.getInMemorySize() :
((ScalarObject)data).getSize();
//remove the placeholder if the entry is bigger
than the cache.
- //FIXME: the resumed threads will enter into
infinite wait as the entry
- //is removed. Need to add support for graceful
remove (placeholder) and resume.
if (size >
LineageCacheEviction.getCacheLimit()) {
- _cache.remove(item);
+ removePlaceholder(item);
continue;
}
@@ -544,7 +556,7 @@ public class LineageCache
if(AllOutputsCacheable)
FuncLIMap.forEach((Li, boundLI) -> mvIntern(Li,
boundLI, computetime));
else
- FuncLIMap.forEach((Li, boundLI) ->
_cache.remove(Li));
+ FuncLIMap.forEach((Li, boundLI) ->
removePlaceholder(Li));
}
return;
@@ -561,11 +573,13 @@ public class LineageCache
return;
synchronized (_cache) {
LineageItem item = udf.getLineageItem(ec).getValue();
+ if (!probe(item))
+ return;
LineageCacheEntry entry = _cache.get(item);
Data data =
ec.getVariable(String.valueOf(outIds.get(0)));
if (!(data instanceof MatrixObject) && !(data
instanceof ScalarObject)) {
// Don't cache if the udf outputs frames
- _cache.remove(item);
+ removePlaceholder(item);
return;
}
@@ -574,10 +588,8 @@ public class LineageCache
long size = mb != null ? mb.getInMemorySize() :
((ScalarObject)data).getSize();
//remove the placeholder if the entry is bigger than
the cache.
- //FIXME: the resumed threads will enter into infinite
wait as the entry
- //is removed. Need to add support for graceful remove
(placeholder) and resume.
if (size > LineageCacheEviction.getCacheLimit()) {
- _cache.remove(item);
+ removePlaceholder(item);
return;
}
@@ -689,7 +701,16 @@ public class LineageCache
LineageCacheEviction.addEntry(e);
}
else
- _cache.remove(item); //remove the placeholder
+ removePlaceholder(item); //remove the placeholder
+ }
+
+ private static void removePlaceholder(LineageItem item) {
+ //Remove item's entry from _cache, waking threads blocked on it first. Caller should hold the monitor on _cache (this method does not lock it)
+ if (!_cache.containsKey(item))
+ return; //no entry for this item: nothing to remove or signal
+ LineageCacheEntry centry = _cache.get(item);
+ centry.removeAndNotify(); //if still an EMPTY placeholder: mark it NOTCACHED and notifyAll, so waiters in getMBValue/getSOValue stop waiting
+ _cache.remove(item); //drop the entry only after the waiters have been signalled
}
private static boolean isMarkedForCaching (Instruction inst,
ExecutionContext ec) {
diff --git
a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
index 1c56e16..98d7c08 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
@@ -116,6 +116,7 @@ public class LineageCacheConfig
protected enum LineageCacheStatus {
EMPTY, //Placeholder with no data. Cannot be evicted.
+ NOTCACHED, //Placeholder removed from the cache
CACHED, //General cached data. Can be evicted.
SPILLED, //Data is in disk. Empty value. Cannot be evicted.
RELOADED, //Reloaded from disk. Can be evicted.
diff --git
a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEntry.java
b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEntry.java
index f8adfb4..1a32e4b 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEntry.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEntry.java
@@ -63,9 +63,10 @@ public class LineageCacheEntry {
try {
//wait until other thread completes operation
//in order to avoid redundant computation
- while( _MBval == null ) {
+ while(_status == LineageCacheStatus.EMPTY) {
wait();
}
+ //comes here if data is placed or the entry is removed
by the running thread
return _MBval;
}
catch( InterruptedException ex ) {
@@ -77,9 +78,10 @@ public class LineageCacheEntry {
try {
//wait until other thread completes operation
//in order to avoid redundant computation
- while( _SOval == null ) {
+ while(_status == LineageCacheStatus.EMPTY) {
wait();
}
+ //comes here if data is placed or the entry is removed
by the running thread
return _SOval;
}
catch( InterruptedException ex ) {
@@ -91,6 +93,14 @@ public class LineageCacheEntry {
return _status;
}
+ protected synchronized void removeAndNotify() {
+ //Mark a still-EMPTY placeholder as NOTCACHED (no longer in the cache) and wake up
the sleeping threads
+ if (_status != LineageCacheStatus.EMPTY)
+ return; //not a bare placeholder; waiters only block while _status == EMPTY, so nothing to signal
+ _status = LineageCacheStatus.NOTCACHED;
+ notifyAll(); //waiters in getMBValue/getSOValue re-check _status, leave their loop and return the (presumably null) value
+ }
+
public synchronized long getSize() {
long size = 0;
if (_MBval != null)