Repository: systemml Updated Branches: refs/heads/master e2dc85688 -> 4d8df33cc
[MINOR] Add two helper utilities. - First, PersistentLRUCache to cache double[], float[] and MatrixBlock without requiring the user to worry about OOM. - Second, reblockAndWrite method in MLContextUtil class to reblock the output of a DML script as rectangular blocked RDDs. Project: http://git-wip-us.apache.org/repos/asf/systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/4d8df33c Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/4d8df33c Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/4d8df33c Branch: refs/heads/master Commit: 4d8df33cc53583e71c4a488577270461e6f712e2 Parents: e2dc856 Author: Niketan Pansare <npan...@us.ibm.com> Authored: Fri Sep 14 12:17:11 2018 -0700 Committer: Niketan Pansare <npan...@us.ibm.com> Committed: Fri Sep 14 12:17:11 2018 -0700 ---------------------------------------------------------------------- .../sysml/api/mlcontext/MLContextUtil.java | 31 ++ .../apache/sysml/utils/PersistentLRUCache.java | 518 +++++++++++++++++++ 2 files changed, 549 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/systemml/blob/4d8df33c/src/main/java/org/apache/sysml/api/mlcontext/MLContextUtil.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/api/mlcontext/MLContextUtil.java b/src/main/java/org/apache/sysml/api/mlcontext/MLContextUtil.java index dcf4595..c0ce6c9 100644 --- a/src/main/java/org/apache/sysml/api/mlcontext/MLContextUtil.java +++ b/src/main/java/org/apache/sysml/api/mlcontext/MLContextUtil.java @@ -21,6 +21,7 @@ package org.apache.sysml.api.mlcontext; import java.io.File; import java.io.FileNotFoundException; +import java.io.IOException; import java.net.URL; import java.util.ArrayList; import java.util.Date; @@ -74,6 +75,7 @@ import org.apache.sysml.runtime.instructions.cp.DoubleObject; import org.apache.sysml.runtime.instructions.cp.IntObject; import 
org.apache.sysml.runtime.instructions.cp.StringObject;
 import org.apache.sysml.runtime.instructions.cp.VariableCPInstruction;
+import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.data.FrameBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
@@ -118,6 +120,47 @@ public final class MLContextUtil {
     @SuppressWarnings("rawtypes")
     public static final Class[] ALL_SUPPORTED_DATA_TYPES = (Class[]) ArrayUtils.addAll(BASIC_DATA_TYPES,
             COMPLEX_DATA_TYPES);
+
+    /**
+     * Executes a DML script and writes one of its matrix outputs as a binary-block
+     * sequence file whose blocks use the given (possibly rectangular) block dimensions.
+     * A metadata file is written next to it at {@code outFilePath + ".mtd"}.
+     *
+     * @param spark spark session
+     * @param dmlScript script that generates the outVariable
+     * @param outVariable variable name
+     * @param outFilePath output file path
+     * @param rowsPerBlock number of rows per block
+     * @param colsPerBlock number of columns per block
+     * @throws IOException if error occurs
+     */
+    public static void reblockAndWrite(SparkSession spark, String dmlScript, String outVariable, String outFilePath,
+            int rowsPerBlock, int colsPerBlock) throws IOException {
+        MLContext ml = new MLContext(spark);
+        Script script = org.apache.sysml.api.mlcontext.ScriptFactory.dml(dmlScript).out(outVariable);
+        MLResults results = ml.execute(script);
+
+        // Blocks and characteristics of the output exactly as the script produced them.
+        JavaPairRDD<MatrixIndexes, MatrixBlock> sourceBlocks = results.getMatrix(outVariable).toBinaryBlocks();
+        MatrixCharacteristics mcIn = results.getMatrix(outVariable).getMatrixMetadata().asMatrixCharacteristics();
+
+        // Copy of the characteristics that differs only in the requested block sizes.
+        MatrixCharacteristics mcOut = new MatrixCharacteristics(mcIn);
+        mcOut.setRowsPerBlock(rowsPerBlock);
+        mcOut.setColsPerBlock(colsPerBlock);
+
+        JavaPairRDD<MatrixIndexes, MatrixBlock> pieces = sourceBlocks.flatMapToPair(
+                new org.apache.sysml.runtime.instructions.spark.functions.ExtractBlockForBinaryReblock(mcIn, mcOut));
+        JavaPairRDD<MatrixIndexes, MatrixBlock> reblocked =
+                org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils.mergeByKey(pieces, false);
+
+        reblocked.saveAsHadoopFile(outFilePath, MatrixIndexes.class, MatrixBlock.class,
+                org.apache.hadoop.mapred.SequenceFileOutputFormat.class);
+        org.apache.sysml.runtime.util.MapReduceTool.writeMetaDataFile(outFilePath + ".mtd",
+                org.apache.sysml.parser.Expression.ValueType.DOUBLE, mcOut,
+                org.apache.sysml.runtime.matrix.data.OutputInfo.BinaryBlockOutputInfo,
+                new org.apache.sysml.runtime.io.FileFormatProperties());
+    }
 
     /**
      * Compare two version strings (ie, "1.4.0" and "1.4.1").
http://git-wip-us.apache.org/repos/asf/systemml/blob/4d8df33c/src/main/java/org/apache/sysml/utils/PersistentLRUCache.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/utils/PersistentLRUCache.java b/src/main/java/org/apache/sysml/utils/PersistentLRUCache.java
new file mode 100644
index 0000000..83a0dcf
--- /dev/null
+++ b/src/main/java/org/apache/sysml/utils/PersistentLRUCache.java
@@ -0,0 +1,518 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sysml.utils;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.lang.ref.SoftReference;
+import java.nio.file.Files;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.log4j.Level;
+import org.apache.sysml.conf.ConfigurationManager;
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.util.DataConverter;
+import org.apache.sysml.runtime.util.FastBufferedDataInputStream;
+import org.apache.sysml.runtime.util.FastBufferedDataOutputStream;
+
+/**
+ * Simple utility to store double[], float[] and MatrixBlock in-memory.
+ * It is designed to guard against OOM by using soft reference as well as max capacity.
+ * When memory is full or if capacity is exceeded, PersistentLRUCache stores the least recently used values into the local filesystem.
+ * Assumption: GC occurs before an OutOfMemoryException, and GC requires prior finalize call.
+ *
+ * The user should use custom put and get methods:
+ * - put(String key, double[] value);
+ * - put(String key, float[] value);
+ * - put(String key, MatrixBlock value);
+ * - double [] getAsDoubleArray(String key);
+ * - float [] getAsFloatArray(String key);
+ * - MatrixBlock getAsMatrixBlock(String key);
+ *
+ * Additionally, the user can also use standard Map methods:
+ * - void clear();
+ * - boolean containsKey(String key)
+ * - remove(String key);
+ *
+ * Instead of using generic types i.e. LinkedHashMap<String, ?>, we are allowing the cache to store values of different types.
+ * ValueWrapper is a container in this case to store the actual values (i.e. double[], float[] or MatrixBlock).
+ *
+ * The cache can be used in two modes:
+ * - Read-only mode (only applicable for MatrixBlock keys):
+ *   = We delete the value when capacity is exceeded or when GC occurs.
+ *   = When get is invoked on the deleted key, the key is treated as the full path and MatrixBlock is read from that path.
+ *   = Note: in the current version, the metadata file is ignored and the file-format is assumed to be binary-block. We can extend this later.
+ * - General case:
+ *   = We persist the values to the file system (into temporary directory) when capacity is exceeded or when GC occurs.
+ *   = When get is invoked on the deleted key, the key is treated as the file name (not the absolute path) and MatrixBlock is read from that path.
+ *
+ * This class does not assume minimum capacity and hence only soft references.
+ *
+ * To test this class, please use the below command:
+ * java -cp systemml-*-standalone.jar:commons-lang3-3.8.jar org.apache.sysml.utils.PersistentLRUCache.
+ */
+public class PersistentLRUCache extends LinkedHashMap<String, ValueWrapper> {
+    static final Log LOG = LogFactory.getLog(PersistentLRUCache.class.getName());
+    private static final long serialVersionUID = -6838798881747433047L;
+    /** Temporary directory into which evicted values are written. */
+    private String _prefixFilePath;
+    /** Number of bytes currently accounted as held in memory. */
+    final AtomicLong _currentNumBytes = new AtomicLong();
+    private final long _maxNumBytes;
+    Random _rand = new Random();
+    boolean isInReadOnlyMode;
+
+    /**
+     * Small smoke test: puts 30 arrays of 1MB each into a cache limited to 25MB and clears it.
+     *
+     * @param args ignored
+     * @throws IOException if the temporary directory cannot be created or a value cannot be written
+     */
+    public static void main(String [] args) throws IOException {
+        org.apache.log4j.Logger.getRootLogger().setLevel(Level.DEBUG);
+        // 1e+6 bytes per MB (the previous 1e+7 produced 10MB arrays while logging "1MB").
+        double numBytesInMB = 1e+6;
+        int numDoubleInMB = (int) (numBytesInMB / 8);
+        PersistentLRUCache cache = new PersistentLRUCache((long)(numBytesInMB*25));
+        for(int i = 0; i < 30; ++i) {
+            LOG.debug("Putting a double array of size 1MB.");
+            cache.put("file_" + i, new double[numDoubleInMB]);
+        }
+        cache.clear();
+    }
+
+    /**
+     * When enabled, the cache will discard the values instead of writing it to the local file system.
+     *
+     * @param enable true to discard evicted values, false to write them to the temporary directory
+     * @return this
+     */
+    public PersistentLRUCache enableReadOnlyMode(boolean enable) {
+        isInReadOnlyMode = enable;
+        return this;
+    }
+
+    /**
+     * Creates a persisting cache
+     * @param maxNumBytes maximum capacity in bytes
+     * @throws IOException if unable to create a temporary directory on the local file system
+     */
+    public PersistentLRUCache(long maxNumBytes) throws IOException {
+        _maxNumBytes = maxNumBytes;
+        _prefixFilePath = createTempDirectory();
+    }
+
+    /**
+     * Creates a fresh temporary directory for evicted values.
+     *
+     * @return absolute path of the new directory
+     * @throws IOException if the directory cannot be created
+     */
+    private String createTempDirectory() throws IOException {
+        File tmpDir = Files.createTempDirectory("systemml_" + Math.abs(_rand.nextLong())).toFile();
+        tmpDir.deleteOnExit();
+        return tmpDir.getAbsolutePath();
+    }
+
+    /**
+     * Stores a double array under the given key, replacing any previous value.
+     *
+     * @param key cache key (also used as the file name when the value is evicted)
+     * @param value array to cache
+     * @return the wrapper previously stored under the key, or null
+     * @throws FileNotFoundException if an evicted value cannot be written
+     * @throws IOException if an evicted value cannot be written
+     */
+    public ValueWrapper put(String key, double[] value) throws FileNotFoundException, IOException {
+        // Widen before multiplying: length*BYTES overflows int for arrays larger than 2GB.
+        return putImplm(key, new ValueWrapper(new DataWrapper(key, value, this)), (long) value.length*Double.BYTES);
+    }
+
+    /**
+     * Stores a float array under the given key, replacing any previous value.
+     *
+     * @param key cache key (also used as the file name when the value is evicted)
+     * @param value array to cache
+     * @return the wrapper previously stored under the key, or null
+     * @throws FileNotFoundException if an evicted value cannot be written
+     * @throws IOException if an evicted value cannot be written
+     */
+    public ValueWrapper put(String key, float[] value) throws FileNotFoundException, IOException {
+        return putImplm(key, new ValueWrapper(new DataWrapper(key, value, this)), (long) value.length*Float.BYTES);
+    }
+
+    /**
+     * Stores a matrix block under the given key, replacing any previous value.
+     *
+     * @param key cache key; in read-only mode it is also the path the block is re-read from
+     * @param value matrix block to cache
+     * @return the wrapper previously stored under the key, or null
+     * @throws FileNotFoundException if an evicted value cannot be written
+     * @throws IOException if an evicted value cannot be written
+     */
+    public ValueWrapper put(String key, MatrixBlock value) throws FileNotFoundException, IOException {
+        return putImplm(key, new ValueWrapper(new DataWrapper(key, value, this)), value.getInMemorySize());
+    }
+
+    private ValueWrapper putImplm(String key, ValueWrapper value, long sizeInBytes) throws FileNotFoundException, IOException {
+        ValueWrapper prev = null;
+        if(containsKey(key))
+            prev = remove(key);
+        ensureCapacity(sizeInBytes);
+        super.put(key, value);
+        return prev;
+    }
+
+    /**
+     * Removes the entry, releases its share of the byte counter and deletes its file (if any).
+     *
+     * @param key key of the entry to remove
+     * @return the removed wrapper, or null if the key was not present
+     */
+    @Override
+    public ValueWrapper remove(Object key) {
+        ValueWrapper prev = super.remove(key);
+        if(prev != null) {
+            long size = prev.getSize();
+            if(size > 0)
+                _currentNumBytes.addAndGet(-size);
+            prev.remove();
+            // If the payload was already garbage collected, prev.remove() cannot reach the
+            // file written by its finalizer, so delete the file by key as well.
+            if(key instanceof String)
+                deletePersistedFile((String) key);
+        }
+        return prev;
+    }
+
+    /** Deletes the file holding the evicted value of the given key, if it exists. */
+    private void deletePersistedFile(String key) {
+        File file = new File(getFilePath(key));
+        if(file.exists()) {
+            file.delete();
+        }
+    }
+
+    @Override
+    public ValueWrapper put(String key, ValueWrapper value) {
+        throw new DMLRuntimeException("Incorrect usage: Value should be of type double[], float[], or MatrixBlock");
+    }
+
+    @Override
+    public void putAll(Map<? extends String, ? extends ValueWrapper> m) {
+        throw new DMLRuntimeException("Incorrect usage: Value should be of type double[], float[], or MatrixBlock");
+    }
+
+    @Override
+    public ValueWrapper get(Object key) {
+        throw new DMLRuntimeException("Incorrect usage: Use getAsDoubleArray, getAsFloatArray or getAsMatrixBlock instead.");
+    }
+
+    /**
+     * Moves the entry of the given key to the most-recent end of the insertion order.
+     * Does nothing if the key is not (or no longer) present.
+     *
+     * @param key key of the entry to move
+     */
+    void makeRecent(String key) {
+        // The map is insertion-ordered, so re-insert the entry to move it to the end.
+        ValueWrapper value = super.get(key);
+        if(value == null)
+            return; // already removed: re-inserting would create a mapping to null
+        super.remove(key);
+        super.put(key, value);
+    }
+
+    /**
+     * Removes all entries, deletes their files and switches to a new temporary directory.
+     */
+    @Override
+    public void clear() {
+        // Detach every entry first so that a later finalizer neither writes the value
+        // again nor touches the byte counter, and delete what was already written.
+        for(Map.Entry<String, ValueWrapper> entry : entrySet()) {
+            ValueWrapper value = entry.getValue();
+            if(value != null)
+                value.remove();
+            deletePersistedFile(entry.getKey());
+        }
+        new File(_prefixFilePath).delete(); // best effort, succeeds only if the directory is empty
+        super.clear();
+        _currentNumBytes.set(0);
+        try {
+            _prefixFilePath = createTempDirectory();
+        } catch (IOException e) {
+            throw new RuntimeException("Error occured while creating the temp directory.", e);
+        }
+    }
+
+    /** Eldest entry as last reported by LinkedHashMap through removeEldestEntry. */
+    Map.Entry<String, ValueWrapper> _eldest;
+    @Override
+    protected boolean removeEldestEntry(Map.Entry<String, ValueWrapper> eldest) {
+        _eldest = eldest;
+        return false; // Never ask LinkedHashMap to remove eldest entry, instead do that in ensureCapacity.
+    }
+
+    /** Empty payload of the dummy entry that is inserted only to learn the eldest entry. */
+    float [] tmp = new float[0];
+    String dummyKey = "RAND_KEY_" + Math.abs(_rand.nextLong()) + "_" + Math.abs(_rand.nextLong());
+
+    /**
+     * Accounts for newNumBytes additional bytes and, if the capacity is exceeded, evicts
+     * entries starting from the eldest one until the counter is within the capacity again
+     * or every entry has been visited once.
+     *
+     * @param newNumBytes size in bytes of the value about to be held in memory
+     * @throws FileNotFoundException if an evicted value cannot be written
+     * @throws IOException if an evicted value cannot be written
+     */
+    void ensureCapacity(long newNumBytes) throws FileNotFoundException, IOException {
+        if(newNumBytes > _maxNumBytes) {
+            throw new DMLRuntimeException("Exceeds maximum capacity. Cannot put a value of size " + newNumBytes +
+                    " bytes as max capacity is " + _maxNumBytes + " bytes.");
+        }
+        long newCapacity = _currentNumBytes.addAndGet(newNumBytes);
+        if(newCapacity > _maxNumBytes) {
+            synchronized(this) {
+                if(LOG.isDebugEnabled())
+                    LOG.debug("The required capacity (" + newCapacity + ") is greater than max capacity:" + _maxNumBytes);
+                ValueWrapper dummyValue = new ValueWrapper(new DataWrapper(dummyKey, tmp, this));
+                int maxIter = size();
+                while(_currentNumBytes.get() > _maxNumBytes && maxIter > 0) {
+                    super.put(dummyKey, dummyValue); // This will invoke removeEldestEntry, which will set _eldest
+                    remove(dummyKey);
+                    // Take a snapshot: write() below re-inserts the entry, which overwrites _eldest.
+                    Map.Entry<String, ValueWrapper> eldest = _eldest;
+                    if(eldest != null && !dummyKey.equals(eldest.getKey()) && eldest.getValue() != null) {
+                        String eldestKey = eldest.getKey();
+                        DataWrapper data = eldest.getValue().get();
+                        if(data != null) {
+                            data.write(false); // Write the eldest entry to disk if not garbage collected.
+                        }
+                        makeRecent(eldestKey); // Make recent so that the next iteration sees the next-eldest entry.
+                    }
+                    maxIter--;
+                }
+            }
+        }
+    }
+
+    String getFilePath(String key) {
+        return _prefixFilePath + File.separator + key;
+    }
+
+    /**
+     * Returns the wrapper stored under the key.
+     *
+     * @param key cache key
+     * @return the wrapper, never null
+     * @throws DMLRuntimeException if the key is not present
+     */
+    private ValueWrapper getExistingValue(String key) {
+        ValueWrapper value = super.get(key);
+        if(value == null)
+            throw new DMLRuntimeException("The key " + key + " is not present in the cache.");
+        return value;
+    }
+
+    /**
+     * Returns the double array stored under the key, reloading it from disk if it was evicted.
+     *
+     * @param key cache key
+     * @return the cached array
+     * @throws FileNotFoundException if the evicted value cannot be found on disk
+     * @throws IOException if the evicted value cannot be read
+     */
+    public double [] getAsDoubleArray(String key) throws FileNotFoundException, IOException {
+        ValueWrapper value = getExistingValue(key);
+        if(!value.isAvailable()) {
+            // Fine-grained synchronization: only one read per key, but will allow parallel loading
+            // of distinct keys.
+            synchronized(value._lock) {
+                if(!value.isAvailable()) {
+                    value.update(DataWrapper.loadDoubleArr(key, this));
+                }
+            }
+        }
+        DataWrapper ret = value.get();
+        if(ret == null)
+            throw new DMLRuntimeException("Potential race-condition with Java's garbage collector while loading the value in PersistentLRUCache.");
+        return ret._dArr;
+    }
+
+    /**
+     * Returns the float array stored under the key, reloading it from disk if it was evicted.
+     *
+     * @param key cache key
+     * @return the cached array
+     * @throws FileNotFoundException if the evicted value cannot be found on disk
+     * @throws IOException if the evicted value cannot be read
+     */
+    public float [] getAsFloatArray(String key) throws FileNotFoundException, IOException {
+        ValueWrapper value = getExistingValue(key);
+        if(!value.isAvailable()) {
+            // Fine-grained synchronization: only one read per key, but will allow parallel loading
+            // of distinct keys.
+            synchronized(value._lock) {
+                if(!value.isAvailable()) {
+                    value.update(DataWrapper.loadFloatArr(key, this));
+                }
+            }
+        }
+        DataWrapper ret = value.get();
+        if(ret == null)
+            throw new DMLRuntimeException("Potential race-condition with Java's garbage collector while loading the value in PersistentLRUCache.");
+        return ret._fArr;
+    }
+
+    /**
+     * Returns the matrix block stored under the key, reloading it if it was evicted.
+     *
+     * @param key cache key
+     * @return the cached matrix block
+     * @throws FileNotFoundException if the evicted value cannot be found
+     * @throws IOException if the evicted value cannot be read
+     */
+    public MatrixBlock getAsMatrixBlock(String key) throws FileNotFoundException, IOException {
+        ValueWrapper value = getExistingValue(key);
+        if(!value.isAvailable()) {
+            // Fine-grained synchronization: only one read per key, but will allow parallel loading
+            // of distinct keys.
+            synchronized(value._lock) {
+                if(!value.isAvailable()) {
+                    value.update(DataWrapper.loadMatrixBlock(key, this, value._rlen, value._clen, value._nnz));
+                }
+            }
+        }
+        DataWrapper ret = value.get();
+        if(ret == null)
+            throw new DMLRuntimeException("Potential race-condition with Java's garbage collector while loading the value in PersistentLRUCache.");
+        return ret._mb;
+    }
+}
+
+// ----------------------------------------------------------------------------------------
+// Internal helper class: holds the actual payload (exactly one of the fields is set while
+// the value is in memory, none after it was written to disk or discarded).
+class DataWrapper {
+    double [] _dArr;
+    float [] _fArr;
+    MatrixBlock _mb;
+    MatrixObject _mo;
+    final PersistentLRUCache _cache;
+    final String _key;
+    /** Set once the entry was removed from the cache; a detached wrapper is never written. */
+    private boolean _detached = false;
+
+    DataWrapper(String key, double [] value, PersistentLRUCache cache) {
+        _key = key;
+        _dArr = value;
+        _fArr = null;
+        _mb = null;
+        _mo = null;
+        _cache = cache;
+    }
+    DataWrapper(String key, float [] value, PersistentLRUCache cache) {
+        _key = key;
+        _dArr = null;
+        _fArr = value;
+        _mb = null;
+        _mo = null;
+        _cache = cache;
+    }
+    DataWrapper(String key, MatrixBlock value, PersistentLRUCache cache) {
+        _key = key;
+        _dArr = null;
+        _fArr = null;
+        _mb = value;
+        _mo = null;
+        _cache = cache;
+    }
+    DataWrapper(String key, MatrixObject value, PersistentLRUCache cache) {
+        _key = key;
+        _dArr = null;
+        _fArr = null;
+        _mb = null;
+        _mo = value;
+        _cache = cache;
+    }
+
+    /** Persists the payload (unless detached or in read-only mode) before the wrapper is reclaimed. */
+    @Override
+    protected void finalize() throws Throwable {
+        super.finalize();
+        write(true);
+    }
+
+    /** @return true if the payload is still held in memory */
+    boolean isAvailable() {
+        return _dArr != null || _fArr != null || _mb != null || _mo != null;
+    }
+
+    /**
+     * Releases the in-memory payload. Unless the cache is in read-only mode, the payload is
+     * first written to the cache's temporary directory under the key's name.
+     *
+     * @param isBeingGarbageCollected true if invoked from finalize, false if invoked due to exceeded capacity
+     * @throws FileNotFoundException if the file cannot be created
+     * @throws IOException if the payload cannot be written
+     */
+    public synchronized void write(boolean isBeingGarbageCollected) throws FileNotFoundException, IOException {
+        if(_key.equals(_cache.dummyKey))
+            return;
+        if(_detached) {
+            // The entry was removed from the cache: its bytes were already released by
+            // remove(), and writing it would resurrect a deleted file.
+            _dArr = null; _fArr = null; _mb = null; _mo = null;
+            return;
+        }
+        _cache.makeRecent(_key); // Make it recent.
+
+        if(isAvailable()) {
+            _cache._currentNumBytes.addAndGet(-getSize());
+        }
+
+        if(!_cache.isInReadOnlyMode) {
+            String debugSuffix = null;
+            if(PersistentLRUCache.LOG.isDebugEnabled()) {
+                if(isBeingGarbageCollected)
+                    debugSuffix = " (is being garbage collected).";
+                else
+                    debugSuffix = " (capacity exceeded).";
+            }
+
+            if(_dArr != null) {
+                try (ObjectOutputStream os = new ObjectOutputStream(new FileOutputStream(_cache.getFilePath(_key)))) {
+                    os.writeInt(_dArr.length);
+                    for(int i = 0; i < _dArr.length; i++) {
+                        os.writeDouble(_dArr[i]);
+                    }
+                }
+                if(PersistentLRUCache.LOG.isDebugEnabled())
+                    PersistentLRUCache.LOG.debug("Writing value (double[] of size " + getSize() + " bytes) for the key " + _key + " to disk" + debugSuffix);
+            }
+            else if(_fArr != null) {
+                try (ObjectOutputStream os = new ObjectOutputStream(new FileOutputStream(_cache.getFilePath(_key)))) {
+                    os.writeInt(_fArr.length);
+                    for(int i = 0; i < _fArr.length; i++) {
+                        os.writeFloat(_fArr[i]);
+                    }
+                }
+                if(PersistentLRUCache.LOG.isDebugEnabled())
+                    PersistentLRUCache.LOG.debug("Writing value (float[] of size " + getSize() + " bytes) for the key " + _key + " to disk" + debugSuffix);
+            }
+            else if(_mb != null) {
+                try(FastBufferedDataOutputStream os = new FastBufferedDataOutputStream(new ObjectOutputStream(new FileOutputStream(_cache.getFilePath(_key))))) {
+                    os.writeLong(_mb.getInMemorySize());
+                    _mb.write(os);
+                }
+                if(PersistentLRUCache.LOG.isDebugEnabled())
+                    PersistentLRUCache.LOG.debug("Writing value (MatrixBlock of size " + getSize() + " bytes) for the key " + _key + " to disk" + debugSuffix);
+            }
+            else if(_mo != null) {
+                throw new DMLRuntimeException("Not implemented");
+            }
+            else {
+                if(PersistentLRUCache.LOG.isDebugEnabled())
+                    PersistentLRUCache.LOG.debug("Skipping writing of the key " + _key + " to disk as the value is already written" + debugSuffix);
+            }
+        }
+        _dArr = null; _fArr = null; _mb = null; _mo = null;
+    }
+
+    static DataWrapper loadDoubleArr(String key, PersistentLRUCache cache) throws FileNotFoundException, IOException {
+        if(cache.isInReadOnlyMode)
+            throw new IOException("Read-only mode is only supported for MatrixBlock.");
+        if(PersistentLRUCache.LOG.isDebugEnabled())
+            PersistentLRUCache.LOG.debug("Loading double array the key " + key + " from the disk.");
+        double [] ret;
+        try (ObjectInputStream is = new ObjectInputStream(new FileInputStream(cache.getFilePath(key)))) {
+            int size = is.readInt();
+            cache.ensureCapacity((long) size*Double.BYTES); // widen first to avoid int overflow
+            ret = new double[size];
+            for(int i = 0; i < size; i++) {
+                ret[i] = is.readDouble();
+            }
+        }
+        return new DataWrapper(key, ret, cache);
+    }
+
+    static DataWrapper loadFloatArr(String key, PersistentLRUCache cache) throws FileNotFoundException, IOException {
+        if(cache.isInReadOnlyMode)
+            throw new IOException("Read-only mode is only supported for MatrixBlock.");
+        if(PersistentLRUCache.LOG.isDebugEnabled())
+            PersistentLRUCache.LOG.debug("Loading float array the key " + key + " from the disk.");
+        float [] ret;
+        try (ObjectInputStream is = new ObjectInputStream(new FileInputStream(cache.getFilePath(key)))) {
+            int size = is.readInt();
+            cache.ensureCapacity((long) size*Float.BYTES); // widen first to avoid int overflow
+            ret = new float[size];
+            for(int i = 0; i < size; i++) {
+                ret[i] = is.readFloat();
+            }
+        }
+        return new DataWrapper(key, ret, cache);
+    }
+
+    static DataWrapper loadMatrixBlock(String key,
+            PersistentLRUCache cache, long rlen, long clen, long nnz) throws FileNotFoundException, IOException {
+        if(PersistentLRUCache.LOG.isDebugEnabled())
+            PersistentLRUCache.LOG.debug("Loading matrix block array the key " + key + " from the disk.");
+        MatrixBlock ret = null;
+        if(cache.isInReadOnlyMode) {
+            // Read from the filesystem in the read-only mode assuming binary-blocked format.
+            // TODO: Read the meta-data file and remove the format requirement.
+            ret = DataConverter.readMatrixFromHDFS(key,
+                    org.apache.sysml.runtime.matrix.data.InputInfo.BinaryBlockInputInfo, rlen, clen,
+                    ConfigurationManager.getBlocksize(), ConfigurationManager.getBlocksize(), nnz,
+                    new org.apache.sysml.runtime.io.FileFormatProperties());
+            // Eviction subtracted this block's size from the counter, so add it back on reload.
+            cache.ensureCapacity(ret.getInMemorySize());
+        }
+        else {
+            try (FastBufferedDataInputStream is = new FastBufferedDataInputStream(new ObjectInputStream(new FileInputStream(cache.getFilePath(key))))) {
+                long size = is.readLong();
+                cache.ensureCapacity(size);
+                ret = new MatrixBlock();
+                ret.readFields(is);
+            }
+        }
+        return new DataWrapper(key, ret, cache);
+    }
+
+    /** Marks this wrapper as removed from the cache and deletes its file, if one was written. */
+    synchronized void remove() {
+        _detached = true;
+        File file = new File(_cache.getFilePath(_key));
+        if(file.exists()) {
+            file.delete();
+        }
+    }
+
+    /**
+     * @return size in bytes of the payload held in memory, or 0 if nothing is held
+     * @throws DMLRuntimeException if the payload is a MatrixObject (not implemented)
+     */
+    long getSize() {
+        // Read each field once: write() may null the fields concurrently.
+        double [] dArr = _dArr;
+        float [] fArr = _fArr;
+        MatrixBlock mb = _mb;
+        if(dArr != null)
+            return (long) dArr.length*Double.BYTES;
+        else if(fArr != null)
+            return (long) fArr.length*Float.BYTES;
+        else if(mb != null)
+            return mb.getInMemorySize();
+        else if(_mo != null)
+            throw new DMLRuntimeException("Not implemented");
+        else
+            return 0; // already written to disk or discarded
+    }
+
+}
+
+// Internal helper class: map value that softly references the payload and remembers the
+// matrix dimensions needed to re-read a MatrixBlock in read-only mode.
+class ValueWrapper {
+    final Object _lock;
+    // volatile: read without holding _lock by the check-then-lock pattern in the getAs* methods
+    private volatile SoftReference<DataWrapper> _ref;
+    long _rlen;
+    long _clen;
+    long _nnz;
+
+    ValueWrapper(DataWrapper _data) {
+        _lock = new Object();
+        recordDimensions(_data);
+        _ref = new SoftReference<>(_data);
+    }
+    void update(DataWrapper _data) {
+        recordDimensions(_data);
+        _ref = new SoftReference<>(_data);
+    }
+    private void recordDimensions(DataWrapper data) {
+        MatrixBlock mb = data._mb;
+        if(mb != null) {
+            _rlen = mb.getNumRows();
+            _clen = mb.getNumColumns();
+            _nnz = mb.getNonZeros();
+        }
+    }
+    /** @return true only if the wrapper is still referenced and still holds its payload */
+    boolean isAvailable() {
+        DataWrapper data = _ref.get();
+        return data != null && data.isAvailable();
+    }
+    DataWrapper get() {
+        return _ref.get();
+    }
+    long getSize() {
+        DataWrapper data = _ref.get();
+        if(data != null)
+            return data.getSize();
+        else
+            return 0;
+    }
+    void remove() {
+        DataWrapper data = _ref.get();
+        if(data != null) {
+            data.remove();
+        }
+    }
+}
+