This is an automated email from the ASF dual-hosted git repository.

arnabp20 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/master by this push:
     new 02c9751  [SYSTEMDS-2950] Graceful removal of cache entry placeholders
02c9751 is described below

commit 02c97510653693ed24b396eb5640981463b8dfb4
Author: arnabp <[email protected]>
AuthorDate: Mon Apr 26 14:58:44 2021 +0200

    [SYSTEMDS-2950] Graceful removal of cache entry placeholders
    
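    This patch fixes a bug in the task-parallel reuse logic.
    Problem: After executing an instruction, if the executing thread removes
    the placeholder entry from the cache (e.g., because the output is not
    cacheable, is federated, or exceeds the cache limit), all threads
    suspended on that placeholder wait forever.
    Fix: Mark the placeholder as NOTCACHED and wake up the waiting threads
    before removing the entry. The resumed threads check the status, realize
    that the entry was removed, and proceed to execute the instruction
    themselves in parallel.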
---
 .../apache/sysds/runtime/lineage/LineageCache.java | 53 +++++++++++++++-------
 .../sysds/runtime/lineage/LineageCacheConfig.java  |  1 +
 .../sysds/runtime/lineage/LineageCacheEntry.java   | 14 +++++-
 3 files changed, 50 insertions(+), 18 deletions(-)

diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java 
b/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
index 255b6e1..f908967 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
@@ -156,10 +156,20 @@ public class LineageCache
                                        else if (inst instanceof GPUInstruction)
                                                outName = 
gpuinst._output.getName();
                                        
-                                       if (e.isMatrixValue() && e._gpuObject 
== null)
-                                               ec.setMatrixOutput(outName, 
e.getMBValue());
-                                       else if (e.isScalarValue())
-                                               ec.setScalarOutput(outName, 
e.getSOValue());
+                                       if (e.isMatrixValue() && e._gpuObject 
== null) {
+                                               MatrixBlock mb = 
e.getMBValue(); //wait if another thread is executing the same inst.
+                                               if (mb == null && 
e.getCacheStatus() == LineageCacheStatus.NOTCACHED)
+                                                       return false;  //the 
executing thread removed this entry from cache
+                                               else
+                                                       
ec.setMatrixOutput(outName, e.getMBValue());
+                                       }
+                                       else if (e.isScalarValue()) {
+                                               ScalarObject so = 
e.getSOValue(); //wait if another thread is executing the same inst.
+                                               if (so == null && 
e.getCacheStatus() == LineageCacheStatus.NOTCACHED)
+                                                       return false;  //the 
executing thread removed this entry from cache
+                                               else
+                                                       
ec.setScalarOutput(outName, e.getSOValue());
+                                       }
                                        else { //TODO handle locks on gpu 
objects
                                                //shallow copy the cached 
GPUObj to the output MatrixObject
                                                
ec.getMatrixObject(outName).setGPUObject(ec.getGPUContext(0), 
@@ -450,18 +460,22 @@ public class LineageCache
                        for (Pair<LineageItem, Data> entry : liData) {
                                LineageItem item = entry.getKey();
                                Data data = entry.getValue();
+
+                               if (!probe(item))
+                                       continue;
+
                                LineageCacheEntry centry = _cache.get(item);
 
                                if (!(data instanceof MatrixObject) && !(data 
instanceof ScalarObject)) {
                                        // Reusable instructions can return a 
frame (rightIndex). Remove placeholders.
-                                       _cache.remove(item);
+                                       removePlaceholder(item);
                                        continue;
                                }
-                               
+
                                if (LineageCacheConfig.isOutputFederated(inst, 
data)) {
                                        // Do not cache federated outputs (in 
the coordinator)
                                        // Cannot skip putting the placeholder 
as the above is only known after execution
-                                       _cache.remove(item);
+                                       removePlaceholder(item);
                                        continue;
                                }
 
@@ -470,10 +484,8 @@ public class LineageCache
                                long size = mb != null ? mb.getInMemorySize() : 
((ScalarObject)data).getSize();
 
                                //remove the placeholder if the entry is bigger 
than the cache.
-                               //FIXME: the resumed threads will enter into 
infinite wait as the entry
-                               //is removed. Need to add support for graceful 
remove (placeholder) and resume.
                                if (size > 
LineageCacheEviction.getCacheLimit()) {
-                                       _cache.remove(item);
+                                       removePlaceholder(item);
                                        continue; 
                                }
 
@@ -544,7 +556,7 @@ public class LineageCache
                        if(AllOutputsCacheable)
                                FuncLIMap.forEach((Li, boundLI) -> mvIntern(Li, 
boundLI, computetime));
                        else
-                               FuncLIMap.forEach((Li, boundLI) -> 
_cache.remove(Li));
+                               FuncLIMap.forEach((Li, boundLI) -> 
removePlaceholder(Li));
                }
                
                return;
@@ -561,11 +573,13 @@ public class LineageCache
                        return;
                synchronized (_cache) {
                        LineageItem item = udf.getLineageItem(ec).getValue();
+                       if (!probe(item))
+                               return;
                        LineageCacheEntry entry = _cache.get(item);
                        Data data = 
ec.getVariable(String.valueOf(outIds.get(0)));
                        if (!(data instanceof MatrixObject) && !(data 
instanceof ScalarObject)) {
                                // Don't cache if the udf outputs frames
-                               _cache.remove(item);
+                               removePlaceholder(item);
                                return;
                        }
                        
@@ -574,10 +588,8 @@ public class LineageCache
                        long size = mb != null ? mb.getInMemorySize() : 
((ScalarObject)data).getSize();
 
                        //remove the placeholder if the entry is bigger than 
the cache.
-                       //FIXME: the resumed threads will enter into infinite 
wait as the entry
-                       //is removed. Need to add support for graceful remove 
(placeholder) and resume.
                        if (size > LineageCacheEviction.getCacheLimit()) {
-                               _cache.remove(item);
+                               removePlaceholder(item);
                                return;
                        }
 
@@ -689,7 +701,16 @@ public class LineageCache
                        LineageCacheEviction.addEntry(e);
                }
                else
-                       _cache.remove(item);    //remove the placeholder
+                       removePlaceholder(item);    //remove the placeholder
+       }
+       
+       private static void removePlaceholder(LineageItem item) {
+               //Caller should hold the monitor on _cache
+               if (!_cache.containsKey(item))
+                       return;
+               LineageCacheEntry centry = _cache.get(item);
+               centry.removeAndNotify();
+               _cache.remove(item);
        }
        
        private static boolean isMarkedForCaching (Instruction inst, 
ExecutionContext ec) {
diff --git 
a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java 
b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
index 1c56e16..98d7c08 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
@@ -116,6 +116,7 @@ public class LineageCacheConfig
 
        protected enum LineageCacheStatus {
                EMPTY,     //Placeholder with no data. Cannot be evicted.
+               NOTCACHED, //Placeholder removed from the cache
                CACHED,    //General cached data. Can be evicted.
                SPILLED,   //Data is in disk. Empty value. Cannot be evicted.
                RELOADED,  //Reloaded from disk. Can be evicted.
diff --git 
a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEntry.java 
b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEntry.java
index f8adfb4..1a32e4b 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEntry.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEntry.java
@@ -63,9 +63,10 @@ public class LineageCacheEntry {
                try {
                        //wait until other thread completes operation
                        //in order to avoid redundant computation
-                       while( _MBval == null ) {
+                       while(_status == LineageCacheStatus.EMPTY) {
                                wait();
                        }
+                       //comes here if data is placed or the entry is removed 
by the running thread
                        return _MBval;
                }
                catch( InterruptedException ex ) {
@@ -77,9 +78,10 @@ public class LineageCacheEntry {
                try {
                        //wait until other thread completes operation
                        //in order to avoid redundant computation
-                       while( _SOval == null ) {
+                       while(_status == LineageCacheStatus.EMPTY) {
                                wait();
                        }
+                       //comes here if data is placed or the entry is removed 
by the running thread
                        return _SOval;
                }
                catch( InterruptedException ex ) {
@@ -91,6 +93,14 @@ public class LineageCacheEntry {
                return _status;
        }
        
+       protected synchronized void removeAndNotify() {
+               //Set the status to NOTCACHED (not cached anymore) and wake up 
the sleeping threads
+               if (_status != LineageCacheStatus.EMPTY)
+                       return;
+               _status = LineageCacheStatus.NOTCACHED;
+               notifyAll();
+       }
+       
        public synchronized long getSize() {
                long size = 0;
                if (_MBval != null)

Reply via email to