This is an automated email from the ASF dual-hosted git repository.
arnabp20 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new 078f43eba0 [SYSTEMDS-3518] Eviction of lineage-cached RDDs from Spark
storage
078f43eba0 is described below
commit 078f43eba00f938f1c677db79ba611a7e6d30ada
Author: Arnab Phani <[email protected]>
AuthorDate: Sat May 27 23:55:00 2023 +0200
[SYSTEMDS-3518] Eviction of lineage-cached RDDs from Spark storage
This patch extends the lineage cache eviction policies to support RDDs
persisted at the executors.
- We checkpoint an RDD on the second cache hit (reduce cache pollution).
- While checkpointing, we rely on the worst case size estimations and
later update the eviction data structures with actual size once
the RDDs are persisted.
- We split the Spark operators into two groups, one for expensive,
shuffle-based operations, and another for map-based operations.
For the scoring function, we assume the first set is 2x more expensive
- We also track the reference counts of RDDs and use that in the scoring.
More references (many consumers) indicate higher importance.
- We reduce the score by one hit count if we collect a persisted RDD. This
is to evict the intermediates which are cached at multiple locations.
Closes #1834
---
.../context/SparkExecutionContext.java | 32 ++++-
.../spark/ComputationSPInstruction.java | 22 ---
.../instructions/spark/data/LineageObject.java | 9 ++
.../runtime/instructions/spark/data/RDDObject.java | 4 +
.../apache/sysds/runtime/lineage/LineageCache.java | 114 ++++++++++++---
.../sysds/runtime/lineage/LineageCacheConfig.java | 39 +++++-
.../sysds/runtime/lineage/LineageCacheEntry.java | 53 ++++++-
.../runtime/lineage/LineageCacheEviction.java | 2 +-
.../runtime/lineage/LineageCacheStatistics.java | 35 ++++-
.../runtime/lineage/LineageSparkCacheEviction.java | 155 +++++++++++++++++++++
.../sysds/runtime/matrix/data/MatrixBlock.java | 4 +
.../java/org/apache/sysds/utils/Statistics.java | 13 +-
.../functions/async/LineageReuseSparkTest.java | 2 -
13 files changed, 422 insertions(+), 62 deletions(-)
diff --git
a/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java
b/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java
index 1d4681ec75..ce7b43972d 100644
---
a/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java
+++
b/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java
@@ -1695,9 +1695,12 @@ public class SparkExecutionContext extends
ExecutionContext
return jsc.sc().getPersistentRDDs().contains(rddID);
}
- public boolean isRDDCached( int rddID ) {
+ public static boolean isRDDCached( int rddID ) {
+ if (!isSparkContextCreated())
+ return false;
+
+ JavaSparkContext jsc = _spctx;
//check that rdd is marked for caching
- JavaSparkContext jsc = getSparkContext();
if( !jsc.sc().getPersistentRDDs().contains(rddID) ) {
return false;
}
@@ -1710,6 +1713,31 @@ public class SparkExecutionContext extends
ExecutionContext
return false;
}
+ public static long getMemCachedRDDSize(int rddID) {
+ if (!isSparkContextCreated())
+ return 0;
+
+ JavaSparkContext jsc = _spctx;
+ //check that rdd is marked for caching
+ if( !jsc.sc().getPersistentRDDs().contains(rddID) )
+ return 0;
+
+ for (RDDInfo info : jsc.sc().getRDDStorageInfo()) {
+ if (info.id() == rddID && info.isCached())
+ return info.memSize(); //total size summing all
executors
+ }
+ return 0;
+ }
+
+ public static long getStorageSpaceUsed() {
+ //return the sum of the sizes of the cached RDDs in all
executors
+ if (!isSparkContextCreated())
+ return 0;
+
+ JavaSparkContext jsc = _spctx;
+ return
Arrays.stream(jsc.sc().getRDDStorageInfo()).mapToLong(RDDInfo::memSize).sum();
+ }
+
///////////////////////////////////////////
// Spark configuration handling
///////
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/spark/ComputationSPInstruction.java
b/src/main/java/org/apache/sysds/runtime/instructions/spark/ComputationSPInstruction.java
index f55a0b398e..92f21d86c2 100644
---
a/src/main/java/org/apache/sysds/runtime/instructions/spark/ComputationSPInstruction.java
+++
b/src/main/java/org/apache/sysds/runtime/instructions/spark/ComputationSPInstruction.java
@@ -20,10 +20,7 @@
package org.apache.sysds.runtime.instructions.spark;
import org.apache.commons.lang3.tuple.Pair;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.storage.StorageLevel;
import org.apache.sysds.runtime.DMLRuntimeException;
-import org.apache.sysds.runtime.controlprogram.caching.CacheableData;
import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
import org.apache.sysds.runtime.functionobjects.IndexFunction;
@@ -31,13 +28,9 @@ import org.apache.sysds.runtime.functionobjects.ReduceAll;
import org.apache.sysds.runtime.functionobjects.ReduceCol;
import org.apache.sysds.runtime.functionobjects.ReduceRow;
import org.apache.sysds.runtime.instructions.cp.CPOperand;
-import org.apache.sysds.runtime.instructions.spark.data.RDDObject;
-import org.apache.sysds.runtime.instructions.spark.utils.SparkUtils;
import org.apache.sysds.runtime.lineage.LineageItem;
import org.apache.sysds.runtime.lineage.LineageItemUtils;
import org.apache.sysds.runtime.lineage.LineageTraceable;
-import org.apache.sysds.runtime.matrix.data.MatrixBlock;
-import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
import org.apache.sysds.runtime.matrix.operators.Operator;
import org.apache.sysds.runtime.meta.DataCharacteristics;
@@ -134,21 +127,6 @@ public abstract class ComputationSPInstruction extends
SPInstruction implements
}
}
- @SuppressWarnings("unchecked")
- public void checkpointRDD(ExecutionContext ec) {
- SparkExecutionContext sec = (SparkExecutionContext)ec;
- CacheableData<?> cd = sec.getCacheableData(output.getName());
- RDDObject inro = cd.getRDDHandle();
- JavaPairRDD<?,?> outrdd =
SparkUtils.copyBinaryBlockMatrix((JavaPairRDD<MatrixIndexes,
MatrixBlock>)inro.getRDD(), false);
- //TODO: remove shallow copying as short-circuit collect is
disabled if locally cached
- outrdd = outrdd.persist((StorageLevel.MEMORY_AND_DISK()));
- RDDObject outro = new RDDObject(outrdd); //create new rdd object
- outro.setCheckpointRDD(true); //mark as checkpointed
- outro.addLineageChild(inro); //keep lineage to
prevent cycles on cleanup
- cd.setRDDHandle(outro);
- sec.setVariable(output.getName(), cd);
- }
-
@Override
public Pair<String, LineageItem> getLineageItem(ExecutionContext ec) {
return Pair.of(output.getName(), new LineageItem(getOpcode(),
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/spark/data/LineageObject.java
b/src/main/java/org/apache/sysds/runtime/instructions/spark/data/LineageObject.java
index f4b99bb03e..2f9d1bb0bd 100644
---
a/src/main/java/org/apache/sysds/runtime/instructions/spark/data/LineageObject.java
+++
b/src/main/java/org/apache/sysds/runtime/instructions/spark/data/LineageObject.java
@@ -28,6 +28,7 @@ public abstract class LineageObject
{
//basic lineage information
protected int _numRef = -1;
+ protected int _maxNumRef = -1;
protected boolean _lineageCached = false;
protected final List<LineageObject> _childs;
@@ -62,11 +63,19 @@ public abstract class LineageObject
public void incrementNumReferences() {
_numRef++;
+
+ // Maintain the maximum reference count. Higher reference
+ // count indicates higher importance to persist (in lineage
cache)
+ _maxNumRef = Math.max(_numRef, _maxNumRef);
}
public void decrementNumReferences() {
_numRef--;
}
+
+ public int getMaxReferenceCount() {
+ return _maxNumRef;
+ }
public List<LineageObject> getLineageChilds() {
return _childs;
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/spark/data/RDDObject.java
b/src/main/java/org/apache/sysds/runtime/instructions/spark/data/RDDObject.java
index 04d021b6ff..6ae7ed2061 100644
---
a/src/main/java/org/apache/sysds/runtime/instructions/spark/data/RDDObject.java
+++
b/src/main/java/org/apache/sysds/runtime/instructions/spark/data/RDDObject.java
@@ -40,6 +40,10 @@ public class RDDObject extends LineageObject
public JavaPairRDD<?,?> getRDD() {
return _rddHandle;
}
+
+ public void setRDD(JavaPairRDD<?,?> rddHandle) {
+ _rddHandle = rddHandle;
+ }
public void setCheckpointRDD( boolean flag ) {
_checkpointed = flag;
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
b/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
index f007e719b9..a51c0ae9e3 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
@@ -22,6 +22,8 @@ package org.apache.sysds.runtime.lineage;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.storage.StorageLevel;
import org.apache.sysds.api.DMLScript;
import org.apache.sysds.common.Types.DataType;
import org.apache.sysds.common.Types.FileFormat;
@@ -146,9 +148,29 @@ public class LineageCache
ec.setScalarOutput(outName, so);
}
else if (e.isRDDPersist()) {
- //Reuse the cached RDD (local
or persisted at the executors)
RDDObject rdd =
e.getRDDObject();
- ((SparkExecutionContext)
ec).setRDDHandleForVariable(outName, rdd);
+ if (rdd == null &&
e.getCacheStatus() == LineageCacheStatus.NOTCACHED)
+ return false; //the
executing thread removed this entry from cache
+
+ //Reuse the cached RDD (local
or persisted at the executors)
+ switch(e.getCacheStatus()) {
+ case TOPERSISTRDD:
+ //Mark for
caching on the second hit
+
persistRDD(inst, e, ec);
+ //Update status
to indicate persisted in the executors
+
e.setCacheStatus(LineageCacheStatus.PERSISTEDRDD);
+ //Even not
persisted, reuse the rdd locally for shuffle operations
+ if
(!LineageCacheConfig.isShuffleOp(inst))
+ return
false;
+
((SparkExecutionContext) ec).setRDDHandleForVariable(outName, rdd);
+ break;
+ case PERSISTEDRDD:
+ //Reuse the
persisted intermediate at the executors
+
((SparkExecutionContext) ec).setRDDHandleForVariable(outName, rdd);
+ break;
+ default:
+ return false;
+ }
}
else { //TODO handle locks on gpu
objects
//Create a GPUObject with the
cached pointer
@@ -424,6 +446,15 @@ public class LineageCache
LineageCacheStatistics.incrementDelHits();
return p;
}
+
+ private static boolean probeRDDDistributed(LineageItem key) {
+ if (!_cache.containsKey(key))
+ return false;
+ LineageCacheEntry e = _cache.get(key);
+ if (!e.isRDDPersist())
+ return false;
+ return
SparkExecutionContext.isRDDCached(e.getRDDObject().getRDD().id());
+ }
//This method is for hard removal of an entry, w/o maintaining eviction
data structures
public static void removeEntry(LineageItem key) {
@@ -431,6 +462,12 @@ public class LineageCache
if (!p) return;
synchronized(_cache) {
LineageCacheEntry e = getEntry(key);
+ if (e.isRDDPersist()) {
+ e.getRDDObject().getRDD().unpersist(false);
+ e.getRDDObject().setCheckpointRDD(false);
+ return;
+ }
+
long size = e.getSize();
if (e._origItem == null)
_cache.remove(e._key);
@@ -443,7 +480,11 @@ public class LineageCache
_cache.remove(tmp._key);
}
}
- LineageCacheEviction.updateSize(size, false);
+
+ if (e.isRDDPersist())
+
LineageSparkCacheEviction.updateSize(e.getSize(), false);
+ else
+ LineageCacheEviction.updateSize(size, false);
}
}
@@ -632,6 +673,10 @@ public class LineageCache
synchronized( _cache ) {
if (!probe(instLI))
return;
+ LineageCacheEntry centry = _cache.get(instLI);
+ // Put in the cache only the first time
+ if (centry.getCacheStatus() != LineageCacheStatus.EMPTY)
+ return;
// Avoid reuse chkpoint, which is unnecessary
if (inst.getOpcode().equalsIgnoreCase("chkpoint")) {
removePlaceholder(instLI);
@@ -652,21 +697,12 @@ public class LineageCache
return;
}
- // Call persist on the output RDD if required
- if (opToPersist)
- ((ComputationSPInstruction)
inst).checkpointRDD(ec);
// Get the RDD handle of the RDD
CacheableData<?> cd =
ec.getCacheableData(((ComputationSPInstruction)inst).output.getName());
RDDObject rddObj = cd.getRDDHandle();
-
- LineageCacheEntry centry = _cache.get(instLI);
- // Set the RDD object in the cache
- // TODO: Make space in the executors
- // TODO: Estimate the actual compute time for this
operation
+ // Set the RDD object in the cache and set the status
to TOPERSISTRDD
rddObj.setLineageCached();
centry.setRDDValue(rddObj, computetime);
- // Maintain order for eviction
- LineageCacheEviction.addEntry(centry);
}
}
@@ -685,6 +721,13 @@ public class LineageCache
synchronized( _cache )
{
+ // If prefetching a persisted rdd, reduce the score of
the persisted rdd by one hit count
+ if (instLI.getOpcode().equals("prefetch") &&
probeRDDDistributed(instLI.getInputs()[0])) {
+ LineageCacheEntry e =
_cache.get(instLI.getInputs()[0]);
+ if (e.getRDDObject().getNumReferences() < 1)
//no other rdd consumer
+ e.updateScore(false);
+ }
+
long computetime = System.nanoTime() - starttime;
// Make space, place data and manage queue
putIntern(instLI, DataType.MATRIX, mb, null,
computetime);
@@ -851,6 +894,7 @@ public class LineageCache
_cache.clear();
LineageCacheEviction.resetEviction();
LineageGPUCacheEviction.resetEviction();
+ LineageSparkCacheEviction.resetEviction();
}
}
@@ -913,7 +957,10 @@ public class LineageCache
LineageCacheStatistics.incrementMemHits();
// Maintain order for eviction
- LineageCacheEviction.getEntry(e);
+ if (e.isRDDPersist())
+ LineageSparkCacheEviction.maintainOrder(e);
+ else
+ LineageCacheEviction.getEntry(e);
return e;
}
else
@@ -985,7 +1032,42 @@ public class LineageCache
else
return true;
}
-
+
+ private static void persistRDD(Instruction inst, LineageCacheEntry
centry, ExecutionContext ec) {
+ boolean opToPersist =
LineageCacheConfig.isReusableRDDType(inst);
+ // Return if the operation is not in the list of instructions
which benefit
+ // from persisting and the local only RDD caching is disabled
+ if (!opToPersist &&
!LineageCacheConfig.ENABLE_LOCAL_ONLY_RDD_CACHING)
+ return;
+
+ if (opToPersist && centry.getCacheStatus() ==
LineageCacheStatus.TOPERSISTRDD) {
+ CacheableData<?> cd =
ec.getCacheableData(((ComputationSPInstruction)inst).output.getName());
+ // Estimate worst case dense size
+ long estimatedSize =
MatrixBlock.estimateSizeInMemory(cd.getDataCharacteristics());
+ // Skip if the entry is bigger than the total storage.
+ if (estimatedSize >
LineageSparkCacheEviction.getSparkStorageLimit())
+ return;
+
+ // Mark the rdd for lazy checkpointing
+ RDDObject rddObj = centry.getRDDObject();
+ JavaPairRDD<?,?> rdd = rddObj.getRDD();
+ rdd = rdd.persist(StorageLevel.MEMORY_AND_DISK());
+ rddObj.setRDD(rdd);
+ rddObj.setCheckpointRDD(true);
+
+ // Make space based on the estimated size
+
if(!LineageSparkCacheEviction.isBelowThreshold(estimatedSize))
+ LineageSparkCacheEviction.makeSpace(_cache,
estimatedSize);
+ LineageSparkCacheEviction.updateSize(estimatedSize,
true);
+ // Maintain order for eviction
+ LineageSparkCacheEviction.addEntry(centry,
estimatedSize);
+
+ // Count number of RDDs marked for caching at the
executors
+ if (DMLScript.STATISTICS)
+ LineageCacheStatistics.incrementRDDPersists();
+ }
+ }
+
@Deprecated
@SuppressWarnings("unused")
private static double getRecomputeEstimate(Instruction inst,
ExecutionContext ec) {
@@ -1210,7 +1292,7 @@ public class LineageCache
LineageCacheStatistics.incrementSavedComputeTime(e._computeTime);
if (e.isGPUObject()) LineageCacheStatistics.incrementGpuHits();
if (e.isRDDPersist()) {
- if (((SparkExecutionContext)
ec).isRDDCached(e.getRDDObject().getRDD().id()))
+ if
(SparkExecutionContext.isRDDCached(e.getRDDObject().getRDD().id()))
LineageCacheStatistics.incrementRDDPersistHits(); //persisted in the executors
else
LineageCacheStatistics.incrementRDDHits();
//only locally cached
diff --git
a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
index 0ce6cf3a8e..d0e32570b9 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
@@ -39,7 +39,9 @@ import
org.apache.sysds.runtime.instructions.spark.ComputationSPInstruction;
import org.apache.sysds.runtime.instructions.spark.CpmmSPInstruction;
import org.apache.sysds.runtime.instructions.spark.MapmmSPInstruction;
+import java.util.Arrays;
import java.util.Comparator;
+import java.util.stream.Stream;
public class LineageCacheConfig
{
@@ -52,12 +54,23 @@ public class LineageCacheConfig
"^", "uamax", "uark+", "uacmean", "eigen", "ctableexpand",
"replace",
"^2", "uack+", "tak+*", "uacsqk+", "uark+", "n+", "uarimax",
"qsort",
"qpick", "transformapply", "uarmax", "n+", "-*", "castdtm",
"lowertri",
- "mapmm", "cpmm", "rmm", "prefetch", "chkpoint"
- //TODO: Reuse everything.
+ "prefetch", "mapmm"
+ //TODO: Reuse everything.
};
- private static final String[] PERSIST_OPCODES = new String[] {
- "mapmm", "cpmm", "rmm"
+ // Relatively expensive instructions. Most include shuffles.
+ private static final String[] PERSIST_OPCODES1 = new String[] {
+ "cpmm", "rmm", "pmm", "rev", "rshape", "rsort", "+", "-", "*",
+ "/", "%%", "%/%", "1-*", "^", "^2", "*2", "==", "!=", "<", ">",
+ "<=", ">=", "&&", "||", "xor", "max", "min", "rmempty",
"rappend",
+ "gappend", "galignedappend", "rbind", "cbind", "nmin", "nmax",
+ "n+", "ctable", "ucumack+", "ucumac*", "ucumacmin", "ucumacmax",
+ "qsort", "qpick"
+ };
+
+ // Relatively inexpensive instructions.
+ private static final String[] PERSIST_OPCODES2 = new String[] {
+ "mapmm"
};
private static String[] REUSE_OPCODES = new String[] {};
@@ -139,6 +152,8 @@ public class LineageCacheConfig
RELOADED, //Reloaded from disk. Can be evicted.
PINNED, //Pinned to memory. Cannot be evicted.
GPUCACHED, //Points to GPU intermediate
+ PERSISTEDRDD, //Persisted at the Spark executors
+ TOPERSISTRDD, //To be persisted if the instruction reoccurs
TOSPILL, //To be spilled lazily
TODELETE; //TO be removed lazily
public boolean canEvict() {
@@ -199,7 +214,8 @@ public class LineageCacheConfig
static {
//setup static configuration parameters
REUSE_OPCODES = OPCODES;
- CHKPOINT_OPCODES = PERSIST_OPCODES;
+ CHKPOINT_OPCODES =
Stream.concat(Arrays.stream(PERSIST_OPCODES1), Arrays.stream(PERSIST_OPCODES2))
+ .toArray(String[]::new);
//setSpill(true);
setCachePolicy(LineageCachePolicy.COSTNSIZE);
setCompAssRW(true);
@@ -223,16 +239,17 @@ public class LineageCacheConfig
|| inst instanceof GPUInstruction
|| inst instanceof ComputationSPInstruction)
&& !(inst instanceof ListIndexingCPInstruction);
- boolean rightop = (ArrayUtils.contains(REUSE_OPCODES,
inst.getOpcode())
+ boolean rightCPOp = (ArrayUtils.contains(REUSE_OPCODES,
inst.getOpcode())
|| (inst.getOpcode().equals("append") &&
isVectorAppend(inst, ec))
|| (inst.getOpcode().startsWith("spoof"))
|| (inst instanceof DataGenCPInstruction) &&
((DataGenCPInstruction) inst).isMatrixCall());
+ boolean rightSPOp = isReusableRDDType(inst);
boolean updateInplace = (inst instanceof
MatrixIndexingCPInstruction)
&&
ec.getMatrixObject(((ComputationCPInstruction)inst).input1).getUpdateType().isInPlace();
updateInplace = updateInplace || ((inst instanceof
BinaryMatrixMatrixCPInstruction)
&& ((BinaryMatrixMatrixCPInstruction)
inst).isInPlace());
boolean federatedOutput = false;
- return insttype && rightop && !updateInplace &&
!federatedOutput;
+ return insttype && (rightCPOp || rightSPOp) && !updateInplace
&& !federatedOutput;
}
private static boolean isVectorAppend(Instruction inst,
ExecutionContext ec) {
@@ -282,6 +299,14 @@ public class LineageCacheConfig
return insttype && rightOp;
}
+ protected static boolean isShuffleOp(Instruction inst) {
+ return ArrayUtils.contains(PERSIST_OPCODES1, inst.getOpcode());
+ }
+
+ protected static int getComputeGroup(String opcode) {
+ return ArrayUtils.contains(PERSIST_OPCODES1, opcode) ? 2 : 1;
+ }
+
public static boolean isOutputFederated(Instruction inst, Data data) {
if (!(inst instanceof ComputationFEDInstruction))
diff --git
a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEntry.java
b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEntry.java
index f4d3e24982..81898e1f6d 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEntry.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEntry.java
@@ -24,6 +24,7 @@ import java.util.Map;
import jcuda.Pointer;
import org.apache.sysds.common.Types.DataType;
import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
import org.apache.sysds.runtime.instructions.cp.ScalarObject;
import org.apache.sysds.runtime.instructions.spark.data.RDDObject;
import org.apache.sysds.runtime.lineage.LineageCacheConfig.LineageCacheStatus;
@@ -160,6 +161,9 @@ public class LineageCacheEntry {
size += _SOval.getSize();
if (_gpuPointer != null)
size += _gpuPointer.getPointerSize();
+ if (_rddObject != null)
+ //Return total cached size in the executors
+ size +=
SparkExecutionContext.getMemCachedRDDSize(_rddObject.getRDD().id());
return size;
}
@@ -175,6 +179,10 @@ public class LineageCacheEntry {
return _dt.isScalar() && _rddObject == null && _gpuPointer ==
null;
}
+ public boolean isLocalObject() {
+ return isMatrixValue() || isScalarValue();
+ }
+
public boolean isRDDPersist() {
return _rddObject != null;
}
@@ -226,7 +234,8 @@ public class LineageCacheEntry {
public synchronized void setRDDValue(RDDObject rdd, long computetime) {
_rddObject = rdd;
_computeTime = computetime;
- _status = isNullVal() ? LineageCacheStatus.EMPTY :
LineageCacheStatus.CACHED;
+ //_status = isNullVal() ? LineageCacheStatus.EMPTY :
LineageCacheStatus.CACHED;
+ _status = isNullVal() ? LineageCacheStatus.EMPTY :
LineageCacheStatus.TOPERSISTRDD;
//resume all threads waiting for val
notifyAll();
}
@@ -292,18 +301,41 @@ public class LineageCacheEntry {
// Update score to emulate computeTime scaling by #misses
if (removeList.containsKey(_key) &&
LineageCacheConfig.isCostNsize()) {
- //score = score * (1 + removeList.get(_key));
double w1 = LineageCacheConfig.WEIGHTS[0];
int missCount = 1 + removeList.get(_key);
- score = score + (w1*(((double)_computeTime)/getSize())
* missCount);
+ long size = getSize();
+ if (isLocalObject())
+ score = score +
(w1*(((double)_computeTime)/getSize()) * missCount);
}
}
+
+ protected synchronized void initiateScoreSpark(Map<LineageItem,
Integer> removeList, long estimatedSize) {
+ // Set timestamp
+ _timestamp = System.currentTimeMillis() -
LineageCacheEviction.getStartTimestamp();
+ if (_timestamp < 0)
+ throw new DMLRuntimeException ("Execution timestamp
shouldn't be -ve. Key: "+_key);
+
+ // Gather the weights for scoring components
+ double w1 = LineageCacheConfig.WEIGHTS[0];
+ double w2 = LineageCacheConfig.WEIGHTS[1];
+ double w3 = LineageCacheConfig.WEIGHTS[2];
+ // Generate initial score
+ int computeGroup =
LineageCacheConfig.getComputeGroup(_key.getOpcode());
+ int refCount = Math.max(_rddObject.getMaxReferenceCount(), 1);
+ score = w1*(((double)computeGroup*refCount)/estimatedSize) +
w2*getTimestamp() + w3*(((double)1)/getDagHeight());
+ }
- protected synchronized void updateScore() {
+ protected synchronized void updateScore(boolean add) {
// Update score to emulate computeTime scaling by cache hit
- //score *= 2;
double w1 = LineageCacheConfig.WEIGHTS[0];
- score = score + w1*(((double)_computeTime)/getSize());
+ long size = getSize();
+ int sign = add ? 1: -1;
+ if(isLocalObject())
+ score = score + sign * w1 * (((double) _computeTime) /
size);
+ if(isRDDPersist() && size != 0) { //size == 0 means not
persisted yet
+ int computeGroup =
LineageCacheConfig.getComputeGroup(_key.getOpcode());
+ score = score + sign * w1 * (((double) computeGroup) /
size);
+ }
}
protected synchronized long getTimestamp() {
@@ -324,7 +356,14 @@ public class LineageCacheEntry {
double w2 = LineageCacheConfig.WEIGHTS[1];
double w3 = LineageCacheConfig.WEIGHTS[2];
// Generate scores
- score = w1*(((double)_computeTime)/getSize()) +
w2*getTimestamp() + w3*(((double)1)/getDagHeight());
+ long size = getSize();
+ if (isLocalObject())
+ score = w1*(((double)_computeTime)/size) +
w2*getTimestamp() + w3*(((double)1)/getDagHeight());
+ if (isRDDPersist() && size != 0) { //size == 0 means not
persisted yet
+ int computeGroup =
LineageCacheConfig.getComputeGroup(_key.getOpcode());
+ int refCount =
Math.max(_rddObject.getMaxReferenceCount(), 1);
+ score = w1*(((double)computeGroup*refCount)/size) +
w2*getTimestamp() + w3*(((double)1)/getDagHeight());
+ }
}
static class GPUPointer {
diff --git
a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEviction.java
b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEviction.java
index 267440d44a..9d642c8afe 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEviction.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEviction.java
@@ -90,7 +90,7 @@ public class LineageCacheEviction
// FIXME: avoid when called from partial reuse methods
if (LineageCacheConfig.isCostNsize()) {
if (weightedQueue.remove(entry)) {
- entry.updateScore();
+ entry.updateScore(true);
weightedQueue.add(entry);
}
}
diff --git
a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheStatistics.java
b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheStatistics.java
index 182e04cfb6..c7bbd6a00d 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheStatistics.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheStatistics.java
@@ -52,6 +52,8 @@ public class LineageCacheStatistics {
private static final LongAdder _numHitsRdd = new LongAdder();
private static final LongAdder _numHitsSparkActions = new LongAdder();
private static final LongAdder _numHitsRddPersist = new LongAdder();
+ private static final LongAdder _numRddPersist = new LongAdder();
+ private static final LongAdder _numRddUnpersist = new LongAdder();
public static void reset() {
_numHitsMem.reset();
@@ -77,6 +79,8 @@ public class LineageCacheStatistics {
_numHitsRdd.reset();
_numHitsSparkActions.reset();
_numHitsRddPersist.reset();
+ _numRddPersist.reset();
+ _numRddUnpersist.reset();
}
public static void incrementMemHits() {
@@ -241,6 +245,16 @@ public class LineageCacheStatistics {
_numHitsRddPersist.increment();
}
+ public static void incrementRDDPersists() {
+ // Number of RDDs marked for persistence
+ _numRddPersist.increment();
+ }
+
+ public static void incrementRDDUnpersists() {
+ // Number of RDDs unpersisted due to memory pressure
+ _numRddUnpersist.increment();
+ }
+
public static String displayHits() {
StringBuilder sb = new StringBuilder();
sb.append(_numHitsMem.longValue());
@@ -316,7 +330,13 @@ public class LineageCacheStatistics {
return sb.toString();
}
- public static String displaySparkStats() {
+ public static boolean ifGpuStats() {
+ return (_numHitsGpu.longValue() + _numAsyncEvictGpu.longValue()
+ + _numSyncEvictGpu.longValue() +
_numRecycleGpu.longValue()
+ + _numDelGpu.longValue() + _evtimeGpu.longValue()) != 0;
+ }
+
+ public static String displaySparkHits() {
StringBuilder sb = new StringBuilder();
sb.append(_numHitsSparkActions.longValue());
sb.append("/");
@@ -325,4 +345,17 @@ public class LineageCacheStatistics {
sb.append(_numHitsRddPersist.longValue());
return sb.toString();
}
+
+ public static String displaySparkPersist() {
+ StringBuilder sb = new StringBuilder();
+ sb.append(_numRddPersist.longValue());
+ sb.append("/");
+ sb.append(_numRddUnpersist.longValue());
+ return sb.toString();
+ }
+
+ public static boolean ifSparkStats() {
+ return (_numHitsSparkActions.longValue() +
_numHitsRdd.longValue()
+ + _numHitsRddPersist.longValue() +
_numRddUnpersist.longValue()) != 0;
+ }
}
diff --git
a/src/main/java/org/apache/sysds/runtime/lineage/LineageSparkCacheEviction.java
b/src/main/java/org/apache/sysds/runtime/lineage/LineageSparkCacheEviction.java
new file mode 100644
index 0000000000..a639b69b63
--- /dev/null
+++
b/src/main/java/org/apache/sysds/runtime/lineage/LineageSparkCacheEviction.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.lineage;
+
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.sysds.api.DMLScript;
+import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
+import org.apache.sysds.runtime.lineage.LineageCacheConfig.LineageCacheStatus;
+
+import java.util.Map;
+import java.util.TreeSet;
+
+public class LineageSparkCacheEviction
+{
+ private static long SPARK_STORAGE_LIMIT = 0; //60% (upper limit of
Spark unified memory)
+ private static long _sparkStorageSize = 0; //current size
+ private static TreeSet<LineageCacheEntry> weightedQueue = new
TreeSet<>(LineageCacheConfig.LineageCacheComparator);
+
+ protected static void resetEviction() {
+ _sparkStorageSize = 0;
+ weightedQueue.clear();
+ }
+
+ //--------------- CACHE MAINTENANCE & LOOKUP FUNCTIONS --------------//
+
+ // This method is called at the first cache hit.
+ protected static void addEntry(LineageCacheEntry entry, long
estimatedSize) {
+ if (entry.isNullVal())
+ // Placeholders shouldn't participate in eviction
cycles.
+ return;
+
+ entry.initiateScoreSpark(LineageCacheEviction._removelist,
estimatedSize);
+ weightedQueue.add(entry);
+ }
+
+ protected static void maintainOrder(LineageCacheEntry entry) {
+ // Reset the timestamp to maintain the LRU component of the
scoring function
+ if (LineageCacheConfig.isTimeBased()) {
+ if (weightedQueue.remove(entry)) {
+ entry.updateTimestamp();
+ weightedQueue.add(entry);
+ }
+ }
+ // Scale score of the sought entry after every cache hit
+ // FIXME: avoid when called from partial reuse methods
+ if (LineageCacheConfig.isCostNsize()) {
+ // Exists in weighted queue only if already marked for
persistence
+ if (weightedQueue.remove(entry)) {
+ // Score stays same if not persisted (i.e. size
== 0)
+ entry.updateScore(true);
+ weightedQueue.add(entry);
+ }
+ }
+ }
+
+ protected static void removeSingleEntry(Map<LineageItem,
LineageCacheEntry> cache, LineageCacheEntry e) {
+ // Keep in cache. Just change the status to be persisted on the
next hit
+ e.setCacheStatus(LineageCacheStatus.TOPERSISTRDD);
+ // Mark for lazy unpersisting
+ JavaPairRDD<?,?> rdd = e.getRDDObject().getRDD();
+ rdd.unpersist(false);
+ // Maintain the current size
+ _sparkStorageSize -= e.getSize();
+ // Maintain miss count to increase the score if the item enters
the cache again
+ LineageCacheEviction._removelist.merge(e._key, 1, Integer::sum);
+
+ if (DMLScript.STATISTICS)
+ LineageCacheStatistics.incrementRDDUnpersists();
+ // NOTE: The caller of this method maintains the eviction queue.
+ }
+
+ private static void removeEntry(Map<LineageItem, LineageCacheEntry>
cache, LineageCacheEntry e) {
+ if (e._origItem == null) {
+ // Single entry. Remove.
+ removeSingleEntry(cache, e);
+ return;
+ }
+
+ // Defer the eviction till all the entries with the same
intermediate are evicted.
+ e.setCacheStatus(LineageCacheStatus.TODELETE);
+
+ boolean del = false;
+ LineageCacheEntry tmp = cache.get(e._origItem);
+ while (tmp != null) {
+ if (tmp.getCacheStatus() != LineageCacheStatus.TODELETE)
+ return; //do nothing
+ del |= (tmp.getCacheStatus() ==
LineageCacheStatus.TODELETE);
+ tmp = tmp._nextEntry;
+ }
+ if (del) {
+ tmp = cache.get(e._origItem);
+ while (tmp != null) {
+ removeSingleEntry(cache, tmp);
+ tmp = tmp._nextEntry;
+ }
+ }
+ }
+
+ //---------------- CACHE SPACE MANAGEMENT METHODS -----------------//
+
+ private static void setSparkStorageLimit() {
+ // Set the limit only during the first RDD caching to avoid
context creation
+ if (SPARK_STORAGE_LIMIT == 0)
+ SPARK_STORAGE_LIMIT = (long)
SparkExecutionContext.getDataMemoryBudget(false, true); //FIXME
+ }
+
+ protected static double getSparkStorageLimit() {
+ if (SPARK_STORAGE_LIMIT == 0)
+ setSparkStorageLimit();
+ return SPARK_STORAGE_LIMIT;
+ }
+
+ protected static void updateSize(long space, boolean addspace) {
+ _sparkStorageSize += addspace ? space : -space;
+ // NOTE: this doesn't represent the true size as we maintain
total size based on estimations
+ }
+
+ protected static boolean isBelowThreshold(long estimateSize) {
+ boolean available = (estimateSize + _sparkStorageSize) <=
getSparkStorageLimit();
+ if (!available)
+ // Get exact storage used (including checkpoints from
outside of lineage)
+ _sparkStorageSize =
SparkExecutionContext.getStorageSpaceUsed();
+
+ return (estimateSize + _sparkStorageSize) <=
getSparkStorageLimit();
+ }
+
+ protected static void makeSpace(Map<LineageItem, LineageCacheEntry>
cache, long estimatedSize) {
+ // Cost-based eviction
+ while ((estimatedSize + _sparkStorageSize) >
getSparkStorageLimit()) {
+ LineageCacheEntry e = weightedQueue.pollFirst();
+ if (e == null)
+ // Nothing to evict.
+ break;
+
+ removeEntry(cache, e);
+ }
+ }
+}
diff --git
a/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java
b/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java
index 9d79f7f971..f016235231 100644
--- a/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java
@@ -2564,6 +2564,10 @@ public class MatrixBlock extends MatrixValue implements CacheBlock<MatrixBlock>,
return estimateSizeDenseInMemory(nrows, ncols);
}
+ public static long estimateSizeInMemory(DataCharacteristics dc) {
+ return estimateSizeInMemory(dc.getRows(), dc.getCols(),
dc.getSparsity());
+ }
+
public long estimateSizeDenseInMemory() {
return estimateSizeDenseInMemory(rlen, clen);
}
diff --git a/src/main/java/org/apache/sysds/utils/Statistics.java b/src/main/java/org/apache/sysds/utils/Statistics.java
index 02d72fad08..9d931f3ce3 100644
--- a/src/main/java/org/apache/sysds/utils/Statistics.java
+++ b/src/main/java/org/apache/sysds/utils/Statistics.java
@@ -638,10 +638,15 @@ public class Statistics
if (DMLScript.LINEAGE && !ReuseCacheType.isNone()) {
sb.append("LinCache hits (Mem/FS/Del): \t" +
LineageCacheStatistics.displayHits() + ".\n");
sb.append("LinCache MultiLevel (Ins/SB/Fn):" +
LineageCacheStatistics.displayMultiLevelHits() + ".\n");
- sb.append("LinCache GPU (Hit/Async/Sync): \t" +
LineageCacheStatistics.displayGpuStats() + ".\n");
- sb.append("LinCache GPU (Recyc/Del): \t" +
LineageCacheStatistics.displayGpuPointerStats() + ".\n");
- sb.append("LinCache GPU evict time: \t" +
LineageCacheStatistics.displayGpuEvictTime() + " sec.\n");
- sb.append("LinCache Spark (Col/Loc/Dist): \t" +
LineageCacheStatistics.displaySparkStats() + ".\n");
+ if (LineageCacheStatistics.ifGpuStats()) {
+ sb.append("LinCache GPU
(Hit/Async/Sync): \t" + LineageCacheStatistics.displayGpuStats() + ".\n");
+ sb.append("LinCache GPU (Recyc/Del):
\t" + LineageCacheStatistics.displayGpuPointerStats() + ".\n");
+ sb.append("LinCache GPU evict time: \t"
+ LineageCacheStatistics.displayGpuEvictTime() + " sec.\n");
+ }
+ if (LineageCacheStatistics.ifSparkStats()) {
+ sb.append("LinCache Spark
(Col/Loc/Dist): \t" + LineageCacheStatistics.displaySparkHits() + ".\n");
+ sb.append("LinCache Spark (Per/Unper):
\t" + LineageCacheStatistics.displaySparkPersist() + ".\n");
+ }
sb.append("LinCache writes (Mem/FS/Del): \t" +
LineageCacheStatistics.displayWtrites() + ".\n");
sb.append("LinCache FStimes (Rd/Wr): \t" +
LineageCacheStatistics.displayFSTime() + " sec.\n");
sb.append("LinCache Computetime (S/M): \t" +
LineageCacheStatistics.displayComputeTime() + " sec.\n");
diff --git a/src/test/java/org/apache/sysds/test/functions/async/LineageReuseSparkTest.java b/src/test/java/org/apache/sysds/test/functions/async/LineageReuseSparkTest.java
index ad131069c4..68db0caaba 100644
--- a/src/test/java/org/apache/sysds/test/functions/async/LineageReuseSparkTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/async/LineageReuseSparkTest.java
@@ -74,8 +74,6 @@ public class LineageReuseSparkTest extends AutomatedTestBase {
}
public void runTest(String testname, ExecMode execMode, int testId) {
- setOutputBuffering(true);
-
boolean old_simplification =
OptimizerUtils.ALLOW_ALGEBRAIC_SIMPLIFICATION;
boolean old_sum_product =
OptimizerUtils.ALLOW_SUM_PRODUCT_REWRITES;
boolean old_trans_exec_type =
OptimizerUtils.ALLOW_TRANSITIVE_SPARK_EXEC_TYPE;