Repository: systemml Updated Branches: refs/heads/master 3fbfbaecb -> 69f2d377c
[SYSTEMML-445] Write to disk when the cache is used in the write-mode - This avoids the need to depend on finalize to perform writing. Project: http://git-wip-us.apache.org/repos/asf/systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/69f2d377 Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/69f2d377 Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/69f2d377 Branch: refs/heads/master Commit: 69f2d377c456f9baea1e248818d544b54ee00e6f Parents: 3fbfbae Author: Niketan Pansare <npan...@us.ibm.com> Authored: Thu Sep 20 10:44:27 2018 -0700 Committer: Niketan Pansare <npan...@us.ibm.com> Committed: Thu Sep 20 10:44:27 2018 -0700 ---------------------------------------------------------------------- .../apache/sysml/utils/PersistentLRUCache.java | 100 ++++++++++++------- 1 file changed, 64 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/systemml/blob/69f2d377/src/main/java/org/apache/sysml/utils/PersistentLRUCache.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/utils/PersistentLRUCache.java b/src/main/java/org/apache/sysml/utils/PersistentLRUCache.java index bf356bb..71a1e28 100644 --- a/src/main/java/org/apache/sysml/utils/PersistentLRUCache.java +++ b/src/main/java/org/apache/sysml/utils/PersistentLRUCache.java @@ -86,7 +86,7 @@ public class PersistentLRUCache extends LinkedHashMap<String, ValueWrapper> { private String _prefixFilePath; final AtomicLong _currentNumBytes = new AtomicLong(); private final long _maxNumBytes; - Random _rand = new Random(); + private static final Random _rand = new Random(); boolean isInReadOnlyMode; HashSet<String> persistedKeys = new HashSet<>(); @@ -101,6 +101,9 @@ public class PersistentLRUCache extends LinkedHashMap<String, ValueWrapper> { for(long i = 0; i < numIter; ++i) { LOG.debug("Putting a double array of size 50MB."); cache.put("file_" + i, new double[numDoubleIn50MB]); + try { + Thread.sleep(100); + } catch (InterruptedException e) {} } cache.clear(); } @@ -127,13 +130,13 @@ public class PersistentLRUCache extends LinkedHashMap<String, ValueWrapper> { _prefixFilePath = tmp.getAbsolutePath(); } public ValueWrapper put(String key, double[] value) throws FileNotFoundException, IOException { - return putImplm(key, new ValueWrapper(new DataWrapper(key, value, this)), value.length*Double.BYTES); + return putImplm(key, new ValueWrapper(new DataWrapper(key, value, this), isInReadOnlyMode), value.length*Double.BYTES); } public ValueWrapper put(String key, float[] value) throws FileNotFoundException, IOException { - return putImplm(key, new ValueWrapper(new DataWrapper(key, value, this)), value.length*Float.BYTES); + return putImplm(key, new ValueWrapper(new DataWrapper(key, value, this), isInReadOnlyMode), value.length*Float.BYTES); } public ValueWrapper put(String key, MatrixBlock value) throws FileNotFoundException, IOException { - return putImplm(key, new ValueWrapper(new DataWrapper(key, value, this)), value.getInMemorySize()); + return putImplm(key, new ValueWrapper(new DataWrapper(key, value, this), isInReadOnlyMode), value.getInMemorySize()); } private ValueWrapper putImplm(String key, ValueWrapper value, long sizeInBytes) throws FileNotFoundException, IOException { @@ -206,7 +209,7 @@ public class PersistentLRUCache extends LinkedHashMap<String, ValueWrapper> { } float [] tmp = new float[0]; - String dummyKey = "RAND_KEY_" + Math.abs(_rand.nextLong()) + "_" + Math.abs(_rand.nextLong()); + static String dummyKey = "RAND_KEY_" + Math.abs(_rand.nextLong()) + "_" + Math.abs(_rand.nextLong()); void ensureCapacity(long newNumBytes) throws FileNotFoundException, IOException { if(newNumBytes > _maxNumBytes) { throw new DMLRuntimeException("Exceeds maximum capacity. Cannot put a value of size " + newNumBytes + @@ -217,7 +220,7 @@ public class PersistentLRUCache extends LinkedHashMap<String, ValueWrapper> { synchronized(this) { if(LOG.isDebugEnabled()) LOG.debug("The required capacity (" + newCapacity + ") is greater than max capacity:" + _maxNumBytes); - ValueWrapper dummyValue = new ValueWrapper(new DataWrapper(dummyKey, tmp, this)); + ValueWrapper dummyValue = new ValueWrapper(new DataWrapper(dummyKey, tmp, this), isInReadOnlyMode); int maxIter = size(); while(_currentNumBytes.get() > _maxNumBytes && maxIter > 0) { super.put(dummyKey, dummyValue); // This will invoke removeEldestEntry, which will set _eldest @@ -348,17 +351,13 @@ class DataWrapper { _mo = value; _cache = cache; } - @Override - protected void finalize() throws Throwable { - super.finalize(); - write(true); - } - public synchronized void write(boolean isBeingGarbageCollected) throws FileNotFoundException, IOException { - if(_key.equals(_cache.dummyKey)) + public synchronized void write(boolean forceAggresiveWrites) throws FileNotFoundException, IOException { + if(_key.equals(PersistentLRUCache.dummyKey)) return; - _cache.makeRecent(_key); // Make it recent. + // Prepare for writing + _cache.makeRecent(_key); // Make it recent. if(_dArr != null || _fArr != null || _mb != null || _mo != null) { _cache._currentNumBytes.addAndGet(-getSize()); } @@ -366,14 +365,16 @@ class DataWrapper { if(!_cache.isInReadOnlyMode) { String debugSuffix = null; if(PersistentLRUCache.LOG.isDebugEnabled()) { - if(isBeingGarbageCollected) - debugSuffix = " (is being garbage collected)."; + if(forceAggresiveWrites) + debugSuffix = " (aggressively written)."; else debugSuffix = " (capacity exceeded)."; } if(_dArr != null) { - try (ObjectOutputStream os = new ObjectOutputStream(new FileOutputStream(_cache.getFilePath(_key)))) { + File file = new File(_cache.getFilePath(_key)); + file.deleteOnExit(); + try (ObjectOutputStream os = new ObjectOutputStream(new FileOutputStream(file))) { os.writeInt(_dArr.length); for(int i = 0; i < _dArr.length; i++) { os.writeDouble(_dArr[i]); @@ -384,7 +385,9 @@ class DataWrapper { PersistentLRUCache.LOG.debug("Writing value (double[] of size " + getSize() + " bytes) for the key " + _key + " to disk" + debugSuffix); } else if(_fArr != null) { - try (ObjectOutputStream os = new ObjectOutputStream(new FileOutputStream(_cache.getFilePath(_key)))) { + File file = new File(_cache.getFilePath(_key)); + file.deleteOnExit(); + try (ObjectOutputStream os = new ObjectOutputStream(new FileOutputStream(file))) { os.writeInt(_fArr.length); for(int i = 0; i < _fArr.length; i++) { os.writeFloat(_fArr[i]); @@ -395,7 +398,9 @@ class DataWrapper { PersistentLRUCache.LOG.debug("Writing value (float[] of size " + getSize() + " bytes) for the key " + _key + " to disk" + debugSuffix); } else if(_mb != null) { - try(FastBufferedDataOutputStream os = new FastBufferedDataOutputStream(new ObjectOutputStream(new FileOutputStream(_cache.getFilePath(_key))))) { + File file = new File(_cache.getFilePath(_key)); + file.deleteOnExit(); + try(FastBufferedDataOutputStream os = new FastBufferedDataOutputStream(new ObjectOutputStream(new FileOutputStream(file)))) { os.writeLong(_mb.getInMemorySize()); _mb.write(os); } @@ -508,44 +513,67 @@ class DataWrapper { // Internal helper class class ValueWrapper { final Object _lock; - private SoftReference<DataWrapper> _ref; + final boolean _isInReadOnlyMode; + private SoftReference<DataWrapper> _softRef; long _rlen; long _clen; long _nnz; - ValueWrapper(DataWrapper _data) { + // This is only used in write-mode until the writing to the disk is completed. + // It also prevents the _softRef from being garbage collected while it is written. + volatile DataWrapper _strongRef; + + ValueWrapper(DataWrapper data, boolean isInReadOnlyMode) { _lock = new Object(); - _ref = new SoftReference<>(_data); - if(_data._mb != null) { - _rlen = _data._mb.getNumRows(); - _clen = _data._mb.getNumColumns(); - _nnz = _data._mb.getNonZeros(); + _isInReadOnlyMode = isInReadOnlyMode; + boolean isDummyValue = (data._key == PersistentLRUCache.dummyKey); + if(!_isInReadOnlyMode && !isDummyValue) { + // Aggressive write to disk when the cache is used in the write-mode. + // This avoids the need to depend on finalize to perform writing. + _strongRef = data; + Thread t = new Thread() { + public void run() { + try { + _strongRef.write(true); + _strongRef = null; // Reset the strong reference after aggresive writing + } catch (IOException e) { + throw new DMLRuntimeException("Error occured while aggressively writing the value to disk.", e); + } + } + }; + t.start(); + } + _softRef = new SoftReference<>(data); + if(data._mb != null) { + _rlen = data._mb.getNumRows(); + _clen = data._mb.getNumColumns(); + _nnz = data._mb.getNonZeros(); } } - void update(DataWrapper _data) { - _ref = new SoftReference<>(_data); - if(_data._mb != null) { - _rlen = _data._mb.getNumRows(); - _clen = _data._mb.getNumColumns(); - _nnz = _data._mb.getNonZeros(); + void update(DataWrapper data) { + _softRef = new SoftReference<>(data); + if(data._mb != null) { + _rlen = data._mb.getNumRows(); + _clen = data._mb.getNumColumns(); + _nnz = data._mb.getNonZeros(); } } boolean isAvailable() { - DataWrapper data = _ref.get(); + DataWrapper data = _softRef.get(); return data != null && data.isAvailable(); } DataWrapper get() { - return _ref.get(); + return _softRef.get(); } long getSize() { - DataWrapper data = _ref.get(); + DataWrapper data = _softRef.get(); if(data != null) return data.getSize(); else return 0; } void remove() { - DataWrapper data = _ref.get(); + DataWrapper data = _softRef.get(); if(data != null) { data.remove(); }