Repository: systemml
Updated Branches:
  refs/heads/master 3fbfbaecb -> 69f2d377c


[SYSTEMML-445] Write to disk when the cache is used in write mode

- This avoids the need to depend on finalize to perform writing.

Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/69f2d377
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/69f2d377
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/69f2d377

Branch: refs/heads/master
Commit: 69f2d377c456f9baea1e248818d544b54ee00e6f
Parents: 3fbfbae
Author: Niketan Pansare <npan...@us.ibm.com>
Authored: Thu Sep 20 10:44:27 2018 -0700
Committer: Niketan Pansare <npan...@us.ibm.com>
Committed: Thu Sep 20 10:44:27 2018 -0700

----------------------------------------------------------------------
 .../apache/sysml/utils/PersistentLRUCache.java  | 100 ++++++++++++-------
 1 file changed, 64 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/69f2d377/src/main/java/org/apache/sysml/utils/PersistentLRUCache.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/utils/PersistentLRUCache.java 
b/src/main/java/org/apache/sysml/utils/PersistentLRUCache.java
index bf356bb..71a1e28 100644
--- a/src/main/java/org/apache/sysml/utils/PersistentLRUCache.java
+++ b/src/main/java/org/apache/sysml/utils/PersistentLRUCache.java
@@ -86,7 +86,7 @@ public class PersistentLRUCache extends LinkedHashMap<String, 
ValueWrapper> {
        private String _prefixFilePath;
        final AtomicLong _currentNumBytes = new AtomicLong();
        private final long _maxNumBytes;
-       Random _rand = new Random();
+       private static final Random _rand = new Random();
        boolean isInReadOnlyMode;
        HashSet<String> persistedKeys = new HashSet<>();
        
@@ -101,6 +101,9 @@ public class PersistentLRUCache extends 
LinkedHashMap<String, ValueWrapper> {
                for(long i = 0; i < numIter; ++i) {
                        LOG.debug("Putting a double array of size 50MB.");
                        cache.put("file_" + i, new double[numDoubleIn50MB]);
+                       try {
+                               Thread.sleep(100);
+                       } catch (InterruptedException e) {}
                }
                cache.clear();
        }
@@ -127,13 +130,13 @@ public class PersistentLRUCache extends 
LinkedHashMap<String, ValueWrapper> {
                _prefixFilePath = tmp.getAbsolutePath();
        }
        public ValueWrapper put(String key, double[] value) throws 
FileNotFoundException, IOException {
-               return putImplm(key, new ValueWrapper(new DataWrapper(key, 
value, this)), value.length*Double.BYTES);
+               return putImplm(key, new ValueWrapper(new DataWrapper(key, 
value, this), isInReadOnlyMode), value.length*Double.BYTES);
        }
        public ValueWrapper put(String key, float[] value) throws 
FileNotFoundException, IOException {
-               return putImplm(key, new ValueWrapper(new DataWrapper(key, 
value, this)), value.length*Float.BYTES);
+               return putImplm(key, new ValueWrapper(new DataWrapper(key, 
value, this), isInReadOnlyMode), value.length*Float.BYTES);
        }
        public ValueWrapper put(String key, MatrixBlock value) throws 
FileNotFoundException, IOException {
-               return putImplm(key, new ValueWrapper(new DataWrapper(key, 
value, this)), value.getInMemorySize());
+               return putImplm(key, new ValueWrapper(new DataWrapper(key, 
value, this), isInReadOnlyMode), value.getInMemorySize());
        }
        
        private ValueWrapper putImplm(String key, ValueWrapper value, long 
sizeInBytes) throws FileNotFoundException, IOException {
@@ -206,7 +209,7 @@ public class PersistentLRUCache extends 
LinkedHashMap<String, ValueWrapper> {
     }
        
        float [] tmp = new float[0];
-       String dummyKey = "RAND_KEY_" + Math.abs(_rand.nextLong()) + "_" + 
Math.abs(_rand.nextLong());
+       static String dummyKey = "RAND_KEY_" + Math.abs(_rand.nextLong()) + "_" 
+ Math.abs(_rand.nextLong());
        void ensureCapacity(long newNumBytes) throws FileNotFoundException, 
IOException {
                if(newNumBytes > _maxNumBytes) {
                        throw new DMLRuntimeException("Exceeds maximum 
capacity. Cannot put a value of size " + newNumBytes + 
@@ -217,7 +220,7 @@ public class PersistentLRUCache extends 
LinkedHashMap<String, ValueWrapper> {
                        synchronized(this) {
                                if(LOG.isDebugEnabled())
                                        LOG.debug("The required capacity (" + 
newCapacity + ") is greater than max capacity:" + _maxNumBytes);
-                               ValueWrapper dummyValue = new ValueWrapper(new 
DataWrapper(dummyKey, tmp, this));
+                               ValueWrapper dummyValue = new ValueWrapper(new 
DataWrapper(dummyKey, tmp, this), isInReadOnlyMode);
                                int maxIter = size();
                                while(_currentNumBytes.get() > _maxNumBytes && 
maxIter > 0) {
                                        super.put(dummyKey, dummyValue); // 
This will invoke removeEldestEntry, which will set _eldest
@@ -348,17 +351,13 @@ class DataWrapper {
                _mo = value;
                _cache = cache;
        }
-       @Override
-       protected void finalize() throws Throwable {
-               super.finalize();
-               write(true);
-       }
        
-       public synchronized void write(boolean isBeingGarbageCollected) throws 
FileNotFoundException, IOException {
-               if(_key.equals(_cache.dummyKey))
+       public synchronized void write(boolean forceAggresiveWrites) throws 
FileNotFoundException, IOException {
+               if(_key.equals(PersistentLRUCache.dummyKey))
                        return;
-               _cache.makeRecent(_key); // Make it recent.
                
+               // Prepare for writing
+               _cache.makeRecent(_key); // Make it recent.
                if(_dArr != null || _fArr != null || _mb != null || _mo != 
null) {
                        _cache._currentNumBytes.addAndGet(-getSize());
                }
@@ -366,14 +365,16 @@ class DataWrapper {
                if(!_cache.isInReadOnlyMode) {
                        String debugSuffix = null;
                        if(PersistentLRUCache.LOG.isDebugEnabled()) {
-                               if(isBeingGarbageCollected)
-                                       debugSuffix = " (is being garbage 
collected).";
+                               if(forceAggresiveWrites)
+                                       debugSuffix = " (aggressively 
written).";
                                else
                                        debugSuffix = " (capacity exceeded).";
                        }
                        
                        if(_dArr != null) {
-                               try (ObjectOutputStream os = new 
ObjectOutputStream(new FileOutputStream(_cache.getFilePath(_key)))) {
+                               File file = new File(_cache.getFilePath(_key));
+                               file.deleteOnExit();
+                               try (ObjectOutputStream os = new 
ObjectOutputStream(new FileOutputStream(file))) {
                                        os.writeInt(_dArr.length);
                                        for(int i = 0; i < _dArr.length; i++) {
                                                os.writeDouble(_dArr[i]);
@@ -384,7 +385,9 @@ class DataWrapper {
                                        PersistentLRUCache.LOG.debug("Writing 
value (double[] of size " + getSize() + " bytes) for the key " + _key + " to 
disk" + debugSuffix);
                        }
                        else if(_fArr != null) {
-                               try (ObjectOutputStream os = new 
ObjectOutputStream(new FileOutputStream(_cache.getFilePath(_key)))) {
+                               File file = new File(_cache.getFilePath(_key));
+                               file.deleteOnExit();
+                               try (ObjectOutputStream os = new 
ObjectOutputStream(new FileOutputStream(file))) {
                                        os.writeInt(_fArr.length);
                                        for(int i = 0; i < _fArr.length; i++) {
                                                os.writeFloat(_fArr[i]);
@@ -395,7 +398,9 @@ class DataWrapper {
                                        PersistentLRUCache.LOG.debug("Writing 
value (float[] of size " + getSize() + " bytes) for the key " + _key + " to 
disk" + debugSuffix);
                        }
                        else if(_mb != null) {
-                               try(FastBufferedDataOutputStream os = new 
FastBufferedDataOutputStream(new ObjectOutputStream(new 
FileOutputStream(_cache.getFilePath(_key))))) {
+                               File file = new File(_cache.getFilePath(_key));
+                               file.deleteOnExit();
+                               try(FastBufferedDataOutputStream os = new 
FastBufferedDataOutputStream(new ObjectOutputStream(new 
FileOutputStream(file)))) {
                                        os.writeLong(_mb.getInMemorySize());
                                        _mb.write(os);
                                }
@@ -508,44 +513,67 @@ class DataWrapper {
 // Internal helper class
 class ValueWrapper {
        final Object _lock;
-       private SoftReference<DataWrapper> _ref;
+       final boolean _isInReadOnlyMode;
+       private SoftReference<DataWrapper> _softRef;
        long _rlen;
        long _clen;
        long _nnz;
        
-       ValueWrapper(DataWrapper _data) {
+       // This is only used in write-mode until the writing to the disk is 
completed.
+       // It also prevents the _softRef from being garbage collected while it 
is written.
+       volatile DataWrapper _strongRef; 
+       
+       ValueWrapper(DataWrapper data, boolean isInReadOnlyMode) {
                _lock = new Object();
-               _ref = new SoftReference<>(_data);
-               if(_data._mb != null) {
-                       _rlen = _data._mb.getNumRows();
-                       _clen = _data._mb.getNumColumns();
-                       _nnz = _data._mb.getNonZeros();
+               _isInReadOnlyMode = isInReadOnlyMode;
+               boolean isDummyValue = (data._key == 
PersistentLRUCache.dummyKey);
+               if(!_isInReadOnlyMode && !isDummyValue) {
+                       // Aggressive write to disk when the cache is used in 
the write-mode.
+                       // This avoids the need to depend on finalize to 
perform writing.
+                       _strongRef = data;
+                       Thread t = new Thread() {
+                           public void run() {
+                               try {
+                                               _strongRef.write(true);
+                                               _strongRef = null; // Reset the 
strong reference after aggresive writing
+                                       } catch (IOException e) {
+                                               throw new 
DMLRuntimeException("Error occured while aggressively writing the value to 
disk.", e);
+                                       }
+                           }
+                       };
+                       t.start();
+               }
+               _softRef = new SoftReference<>(data);
+               if(data._mb != null) {
+                       _rlen = data._mb.getNumRows();
+                       _clen = data._mb.getNumColumns();
+                       _nnz = data._mb.getNonZeros();
                }
        }
-       void update(DataWrapper _data) {
-               _ref = new SoftReference<>(_data);
-               if(_data._mb != null) {
-                       _rlen = _data._mb.getNumRows();
-                       _clen = _data._mb.getNumColumns();
-                       _nnz = _data._mb.getNonZeros();
+       void update(DataWrapper data) {
+               _softRef = new SoftReference<>(data);
+               if(data._mb != null) {
+                       _rlen = data._mb.getNumRows();
+                       _clen = data._mb.getNumColumns();
+                       _nnz = data._mb.getNonZeros();
                }
        }
        boolean isAvailable() {
-               DataWrapper data = _ref.get();
+               DataWrapper data = _softRef.get();
                return data != null && data.isAvailable();
        }
        DataWrapper get() {
-               return _ref.get();
+               return _softRef.get();
        }
        long getSize() {
-               DataWrapper data = _ref.get();
+               DataWrapper data = _softRef.get();
                if(data != null) 
                        return data.getSize();
                else
                        return 0;
        }
        void remove() {
-               DataWrapper data = _ref.get();
+               DataWrapper data = _softRef.get();
                if(data != null) {
                        data.remove();
                }

Reply via email to