This is an automated email from the ASF dual-hosted git repository.

arnabp20 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/master by this push:
     new e2414e0  [MINOR] Minor fixes in lineage cache eviction.
e2414e0 is described below

commit e2414e039d64df6e0d547e4902a9c9770aed5b5d
Author: arnabp <[email protected]>
AuthorDate: Fri Nov 6 15:16:52 2020 +0100

    [MINOR] Minor fixes in lineage cache eviction.
    
    This patch fixes a bug where we were making space after
    putting an entry in the cache -- this could evict the current
    entry and introduce a memory leak. Now we make space before
    adding an entry to the cache. Furthermore, this patch increases
    the default I/O speed values so that spilling starts early.
    The rolling adjustment logic updates the speed values
    according to the actual disk anyway.
---
 .../apache/sysds/runtime/lineage/LineageCache.java | 30 ++++++++++++++--------
 .../sysds/runtime/lineage/LineageCacheConfig.java  | 11 +++++---
 .../runtime/lineage/LineageCacheEviction.java      | 13 +++++-----
 3 files changed, 32 insertions(+), 22 deletions(-)

diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java 
b/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
index e3cb62b..71d55e3 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
@@ -277,18 +277,18 @@ public class LineageCache
                                        LineageItem item = entry.getKey();
                                        Data data = entry.getValue();
                                        LineageCacheEntry centry = 
_cache.get(item);
-                                       if (data instanceof MatrixObject)
-                                               
centry.setValue(((MatrixObject)data).acquireReadAndRelease(), computetime);
-                                       else if (data instanceof ScalarObject)
-                                               
centry.setValue((ScalarObject)data, computetime);
-                                       else {
+
+                                       if (!(data instanceof MatrixObject) && 
!(data instanceof ScalarObject)) {
                                                // Reusable instructions can 
return a frame (rightIndex). Remove placeholders.
                                                _cache.remove(item);
                                                continue;
                                        }
 
-                                       long size = centry.getSize();
-                                       //remove the entry if the entry is 
bigger than the cache.
+                                       MatrixBlock mb = (data instanceof 
MatrixObject) ? 
+                                                       
((MatrixObject)data).acquireReadAndRelease() : null;
+                                       long size = mb != null ? 
mb.getInMemorySize() : ((ScalarObject)data).getSize();
+
+                                       //remove the placeholder if the entry 
is bigger than the cache.
                                        //FIXME: the resumed threads will enter 
into infinite wait as the entry
                                        //is removed. Need to add support for 
graceful remove (placeholder) and resume.
                                        if (size > 
LineageCacheEviction.getCacheLimit()) {
@@ -296,12 +296,20 @@ public class LineageCache
                                                continue; 
                                        }
 
-                                       //maintain order for eviction
-                                       LineageCacheEviction.addEntry(centry);
-
+                                       //make space for the data
                                        if 
(!LineageCacheEviction.isBelowThreshold(size))
                                                
LineageCacheEviction.makeSpace(_cache, size);
                                        LineageCacheEviction.updateSize(size, 
true);
+
+                                       //place the data
+                                       if (data instanceof MatrixObject)
+                                               centry.setValue(mb, 
computetime);
+                                       else if (data instanceof ScalarObject)
+                                               
centry.setValue((ScalarObject)data, computetime);
+
+                                       //maintain order for eviction
+                                       LineageCacheEviction.addEntry(centry);
+
                                }
                        }
                }
@@ -358,7 +366,7 @@ public class LineageCache
                // Create a new entry.
                LineageCacheEntry newItem = new LineageCacheEntry(key, dt, 
Mval, Sval, computetime);
                
-               // Make space by removing or spilling LRU entries.
+               // Make space by removing or spilling entries.
                if( Mval != null || Sval != null ) {
                        long size = newItem.getSize();
                        if( size > LineageCacheEviction.getCacheLimit())
diff --git 
a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java 
b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
index 66972c4..6a235b6 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
@@ -79,10 +79,13 @@ public class LineageCacheConfig
        // Minimum reliable data size for spilling estimate in MB.
        public static final double MIN_SPILL_DATA = 2;
        // Default I/O in MB per second for binary blocks
-       public static double FSREAD_DENSE = 200;
-       public static double FSREAD_SPARSE = 100;
-       public static double FSWRITE_DENSE = 150;
-       public static double FSWRITE_SPARSE = 75;
+       // NOTE: These defaults are tuned according to high
+       // speed disks, so that spilling starts early. These 
+       // will anyway be adjusted as per the current disk.
+       public static double FSREAD_DENSE = 500;
+       public static double FSREAD_SPARSE = 400;
+       public static double FSWRITE_DENSE = 450;
+       public static double FSWRITE_SPARSE = 225;
        
        private enum CachedItemHead {
                TSMM,
diff --git 
a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEviction.java 
b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEviction.java
index 553ca03..7025818 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEviction.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEviction.java
@@ -213,13 +213,12 @@ public class LineageCacheEviction
                        double exectime = ((double) e._computeTime) / 1000000; 
// in milliseconds
 
                        if (LineageCache.DEBUG) {
-                               if (exectime > 
LineageCacheConfig.MIN_SPILL_TIME_ESTIMATE) {
-                                       System.out.print("LI " + 
e._key.getOpcode());
-                                       System.out.print(" exec time " + 
((double) e._computeTime) / 1000000);
-                                       System.out.print(" spill time " + 
getDiskSpillEstimate(e) * 1000);
-                                       System.out.print(" dim " + 
e.getMBValue().getNumRows() + " " + e.getMBValue().getNumColumns());
-                                       System.out.println(" size " + 
getDiskSizeEstimate(e));
-                               }
+                               System.out.print("LI = " + e._key.getOpcode());
+                               System.out.print(" exec time = " + ((double) 
e._computeTime) / 1000000);
+                               System.out.println(" spill time = " + 
getDiskSpillEstimate(e) * 1000);
+                               System.out.print("dim = " + 
e.getMBValue().getNumRows() + " " + e.getMBValue().getNumColumns());
+                               System.out.print(" size = " + 
getDiskSizeEstimate(e));
+                               System.out.println(" DAG height = " + 
e._key.getDistLeaf2Node());
                        }
 
                        if (spilltime < 
LineageCacheConfig.MIN_SPILL_TIME_ESTIMATE) {

Reply via email to