This is an automated email from the ASF dual-hosted git repository.
arnabp20 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new 0d788591e1 [SYSTEMDS-3621] Adaptive delayed lineage caching
0d788591e1 is described below
commit 0d788591e1dd13e007a751262713287f60db44a8
Author: Arnab Phani <[email protected]>
AuthorDate: Thu Sep 21 15:41:42 2023 +0200
[SYSTEMDS-3621] Adaptive delayed lineage caching
This patch introduces a new feature to control the entry to the
local lineage cache. If enabled, we delay caching large matrix blocks
that have never been cached or evicted before.
To avoid unnecessarily restricting entries, we only delay
matrices that are larger than 5% of the currently free lineage-cache space.
This way, caching is rarely delayed when the cache has ample free space.
Closes #1917
---
.../apache/sysds/runtime/lineage/LineageCache.java | 48 ++++++++++++++--------
.../sysds/runtime/lineage/LineageCacheConfig.java | 18 +++++---
.../sysds/runtime/lineage/LineageCacheEntry.java | 13 ++++++
.../runtime/lineage/LineageCacheEviction.java | 4 ++
.../runtime/lineage/LineageCacheStatistics.java | 2 +-
.../functions/async/LineageReuseSparkTest.java | 4 +-
.../scripts/functions/async/LineageReuseSpark4.dml | 18 --------
.../scripts/functions/async/LineageReuseSpark6.dml | 12 +++---
.../functions/lineage/FunctionFullReuse1.dml | 5 ++-
9 files changed, 73 insertions(+), 51 deletions(-)
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
b/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
index e6e959b130..d05ea234a1 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
@@ -140,15 +140,21 @@ public class LineageCache
MatrixBlock mb =
e.getMBValue(); //wait if another thread is executing the same inst.
if (mb == null &&
e.getCacheStatus() == LineageCacheStatus.NOTCACHED)
return false; //the
executing thread removed this entry from cache
- else
-
ec.setMatrixOutput(outName, mb);
+ if (e.getCacheStatus() ==
LineageCacheStatus.TOCACHE) { //not cached yet
+
ec.replaceLineageItem(outName, e._key); //reuse the lineage trace
+ return false;
+ }
+ ec.setMatrixOutput(outName, mb);
}
else if (e.isScalarValue()) {
ScalarObject so =
e.getSOValue(); //wait if another thread is executing the same inst.
if (so == null &&
e.getCacheStatus() == LineageCacheStatus.NOTCACHED)
return false; //the
executing thread removed this entry from cache
- else
-
ec.setScalarOutput(outName, so);
+ if (e.getCacheStatus() ==
LineageCacheStatus.TOCACHE) { //not cached yet
+
ec.replaceLineageItem(outName, e._key); //reuse the lineage trace
+ return false;
+ }
+ ec.setScalarOutput(outName, so);
}
else if (e.isRDDPersist()) {
RDDObject rdd =
e.getRDDObject();
@@ -254,6 +260,8 @@ public class LineageCache
MatrixBlock mb = e.getMBValue();
if (mb == null && e.getCacheStatus() ==
LineageCacheStatus.NOTCACHED)
return false; //the executing
thread removed this entry from cache
+ if (e.getCacheStatus() ==
LineageCacheStatus.TOCACHE) //not cached yet
+ return false;
MetaDataFormat md = new MetaDataFormat(
e.getMBValue().getDataCharacteristics(),FileFormat.BINARY);
md.getDataCharacteristics().setBlocksize(ConfigurationManager.getBlocksize());
@@ -286,6 +294,8 @@ public class LineageCache
boundValue = e.getSOValue();
if (boundValue == null &&
e.getCacheStatus() == LineageCacheStatus.NOTCACHED)
return false; //the executing
thread removed this entry from cache
+ if (e.getCacheStatus() ==
LineageCacheStatus.TOCACHE) //not cached yet
+ return false;
}
//TODO: support reusing RDD output of functions
@@ -514,12 +524,7 @@ public class LineageCache
public static boolean probe(LineageItem key) {
//TODO problematic as after probe the matrix might be kicked
out of cache
- boolean p = _cache.containsKey(key); // in cache or in disk
- if (!p && DMLScript.STATISTICS &&
LineageCacheEviction._removelist.containsKey(key))
- // The sought entry was in cache but removed later
- LineageCacheStatistics.incrementDelHits();
-
- return p;
+ return _cache.containsKey(key);
}
private static boolean probeRDDDistributed(LineageItem key) {
@@ -715,6 +720,16 @@ public class LineageCache
continue;
}
+ //delay caching of large matrix blocks if the
feature is enabled
+ if (centry.getCacheStatus() ==
LineageCacheStatus.EMPTY && LineageCacheConfig.isDelayedCaching()) {
+ if (data instanceof MatrixObject //no
delayed caching for scalars
+ &&
!LineageCacheEviction._removelist.containsKey(centry._key) //evicted before
+ && size > 0.05 *
LineageCacheEviction.getAvailableSpace()) { //size adaptive
+
centry.setCacheStatus(LineageCacheStatus.TOCACHE);
+ continue;
+ }
+ }
+
//make space for the data
if
(!LineageCacheEviction.isBelowThreshold(size))
LineageCacheEviction.makeSpace(_cache,
size);
@@ -725,6 +740,7 @@ public class LineageCache
centry.setValue(mb, computetime);
else if (data instanceof ScalarObject)
centry.setValue((ScalarObject)data,
computetime);
+
centry.setCacheStatus(LineageCacheStatus.CACHED);
if (DMLScript.STATISTICS &&
LineageCacheEviction._removelist.containsKey(centry._key)) {
// Add to missed compute time
@@ -785,8 +801,7 @@ public class LineageCache
}
boolean opToPersist =
LineageCacheConfig.isReusableRDDType(inst);
// Return if the intermediate is not to be persisted in
the executors
- // and the local only RDD caching is disabled
- if (!opToPersist &&
!LineageCacheConfig.ENABLE_LOCAL_ONLY_RDD_CACHING) {
+ if (!opToPersist) {
removePlaceholder(instLI);
return;
}
@@ -1064,8 +1079,6 @@ public class LineageCache
LineageCacheEviction.addEntry(newItem);
_cache.put(key, newItem);
- if (DMLScript.STATISTICS)
- LineageCacheStatistics.incrementMemWrites();
}
private static LineageCacheEntry getIntern(LineageItem key) {
@@ -1105,8 +1118,8 @@ public class LineageCache
LineageCacheEntry e = _cache.get(item);
boolean exists = !e.isNullVal();
e.copyValueFrom(oe, computetime);
- if (e.isNullVal())
- throw new DMLRuntimeException("Lineage Cache:
Original item is empty: "+oe._key);
+ //if (e.isNullVal())
+ // throw new DMLRuntimeException("Lineage Cache:
Original item is empty: "+oe._key);
e._origItem = probeItem;
// Add itself as original item to navigate the list.
@@ -1205,7 +1218,8 @@ public class LineageCache
rdd = rdd.persist(StorageLevel.MEMORY_AND_DISK());
//cut-off RDD lineage & broadcasts to prevent errors on
// task closure serialization with destroyed broadcasts
- rdd.checkpoint();
+ //rdd.checkpoint();
+ rdd.rdd().localCheckpoint();
rddObj.setRDD(rdd);
rddObj.setCheckpointRDD(true);
diff --git
a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
index 571d864fc5..d5265f33e8 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
@@ -28,6 +28,7 @@ import
org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
import org.apache.sysds.runtime.instructions.Instruction;
import
org.apache.sysds.runtime.instructions.cp.BinaryMatrixMatrixCPInstruction;
+import
org.apache.sysds.runtime.instructions.cp.BinaryScalarScalarCPInstruction;
import org.apache.sysds.runtime.instructions.cp.ComputationCPInstruction;
import org.apache.sysds.runtime.instructions.cp.Data;
import org.apache.sysds.runtime.instructions.cp.DataGenCPInstruction;
@@ -53,24 +54,24 @@ public class LineageCacheConfig
"uamean", "max", "min", "ifelse", "-", "sqrt", "<", ">",
"uak+", "<=",
"^", "uamax", "uark+", "uacmean", "eigen","ctable",
"ctableexpand", "replace",
"^2", "*2", "uack+", "tak+*", "uacsqk+", "uark+", "n+",
"uarimax", "qsort",
- "qpick", "transformapply", "uarmax", "n+", "-*", "castdtm",
"lowertri",
+ "qpick", "transformapply", "uarmax", "n+", "-*", "castdtm",
"lowertri", "1-*",
"prefetch", "mapmm", "contains", "mmchain", "mapmmchain", "+*",
"==", "rmempty"
//TODO: Reuse everything.
};
// Relatively expensive instructions. Most include shuffles.
private static final String[] PERSIST_OPCODES1 = new String[] {
- "cpmm", "rmm", "pmm", "rev", "rshape", "rsort", "-", "*", "+",
+ "cpmm", "rmm", "pmm", "zipmm", "rev", "rshape", "rsort", "-",
"*", "+",
"/", "%%", "%/%", "1-*", "^", "^2", "*2", "==", "!=", "<", ">",
"<=", ">=", "&&", "||", "xor", "max", "min", "rmempty",
"rappend",
"gappend", "galignedappend", "rbind", "cbind", "nmin", "nmax",
"n+", "ctable", "ucumack+", "ucumac*", "ucumacmin", "ucumacmax",
- "qsort", "qpick", "replace"
+ "qsort", "qpick"
};
// Relatively inexpensive instructions.
private static final String[] PERSIST_OPCODES2 = new String[] {
- "mapmm", "isna", "leftIndex", "rightIndex"
+ "mapmm", "isna", "leftIndex"
};
private static String[] REUSE_OPCODES = new String[] {};
@@ -105,6 +106,7 @@ public class LineageCacheConfig
private static boolean _compilerAssistedRW = false;
private static boolean _onlyEstimate = false;
private static boolean _reuseLineageTraces = true;
+ private static boolean DELAYED_CACHING = false;
//-------------DISK SPILLING RELATED CONFIGURATIONS--------------//
@@ -148,6 +150,7 @@ public class LineageCacheConfig
protected enum LineageCacheStatus {
EMPTY, //Placeholder with no data. Cannot be evicted.
NOTCACHED, //Placeholder removed from the cache
+ TOCACHE, //To be cached in memory if reoccur
CACHED, //General cached data. Can be evicted.
SPILLED, //Data is in disk. Empty value. Cannot be evicted.
RELOADED, //Reloaded from disk. Can be evicted.
@@ -240,7 +243,8 @@ public class LineageCacheConfig
|| inst instanceof ComputationFEDInstruction
|| inst instanceof GPUInstruction
|| inst instanceof ComputationSPInstruction)
- && !(inst instanceof ListIndexingCPInstruction);
+ && !(inst instanceof ListIndexingCPInstruction)
+ && !(inst instanceof BinaryScalarScalarCPInstruction);
boolean rightCPOp = (ArrayUtils.contains(REUSE_OPCODES,
inst.getOpcode())
|| (inst.getOpcode().equals("append") &&
isVectorAppend(inst, ec))
|| (inst.getOpcode().startsWith("spoof"))
@@ -378,6 +382,10 @@ public class LineageCacheConfig
return _reuseLineageTraces;
}
+ public static boolean isDelayedCaching() {
+ return DELAYED_CACHING;
+ }
+
public static void setCachePolicy(LineageCachePolicy policy) {
// TODO: Automatic tuning of weights.
switch(policy) {
diff --git
a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEntry.java
b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEntry.java
index cc66b321a0..a603111031 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEntry.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEntry.java
@@ -22,6 +22,7 @@ package org.apache.sysds.runtime.lineage;
import java.util.Map;
import jcuda.Pointer;
+import org.apache.sysds.api.DMLScript;
import org.apache.sysds.common.Types.DataType;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
@@ -208,6 +209,8 @@ public class LineageCacheEntry {
_status = isNullVal() ? LineageCacheStatus.EMPTY :
LineageCacheStatus.CACHED;
//resume all threads waiting for val
notifyAll();
+ if (DMLScript.STATISTICS && val != null)
+ LineageCacheStatistics.incrementMemWrites();
}
public synchronized void setValue(MatrixBlock val) {
@@ -221,6 +224,8 @@ public class LineageCacheEntry {
_status = isNullVal() ? LineageCacheStatus.EMPTY :
LineageCacheStatus.CACHED;
//resume all threads waiting for val
notifyAll();
+ if (DMLScript.STATISTICS && val != null)
+ LineageCacheStatistics.incrementMemWrites();
}
public synchronized void setGPUValue(Pointer ptr, long size, MetaData
md, long computetime) {
@@ -229,6 +234,8 @@ public class LineageCacheEntry {
_status = isNullVal() ? LineageCacheStatus.EMPTY :
LineageCacheStatus.GPUCACHED;
//resume all threads waiting for val
notifyAll();
+ if (DMLScript.STATISTICS && ptr != null)
+ LineageCacheStatistics.incrementMemWrites();
}
public synchronized void setRDDValue(RDDObject rdd, long computetime) {
@@ -238,6 +245,8 @@ public class LineageCacheEntry {
_status = isNullVal() ? LineageCacheStatus.EMPTY :
LineageCacheStatus.TOPERSISTRDD;
//resume all threads waiting for val
notifyAll();
+ if (DMLScript.STATISTICS && rdd != null)
+ LineageCacheStatistics.incrementMemWrites();
}
public synchronized void setRDDValue(RDDObject rdd) {
@@ -245,6 +254,8 @@ public class LineageCacheEntry {
_status = isNullVal() ? LineageCacheStatus.EMPTY :
LineageCacheStatus.TOPERSISTRDD;
//resume all threads waiting for val
notifyAll();
+ if (DMLScript.STATISTICS && rdd != null)
+ LineageCacheStatistics.incrementMemWrites();
}
public synchronized void setValue(byte[] serialBytes, long computetime)
{
@@ -253,6 +264,8 @@ public class LineageCacheEntry {
_status = isNullVal() ? LineageCacheStatus.EMPTY :
LineageCacheStatus.CACHED;
// resume all threads waiting for val
notifyAll();
+ if (DMLScript.STATISTICS && serialBytes != null)
+ LineageCacheStatistics.incrementMemWrites();
}
public synchronized void copyValueFrom(LineageCacheEntry src, long
computetime) {
diff --git
a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEviction.java
b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEviction.java
index 04b393f9d8..511ad93ca5 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEviction.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEviction.java
@@ -198,6 +198,10 @@ public class LineageCacheEviction
public static long getCacheLimit() {
return CACHE_LIMIT;
}
+
+ public static long getAvailableSpace() {
+ return CACHE_LIMIT - _cachesize;
+ }
protected static void updateSize(long space, boolean addspace) {
if (addspace)
diff --git
a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheStatistics.java
b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheStatistics.java
index fee5b8a0dd..00fd36c378 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheStatistics.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheStatistics.java
@@ -364,6 +364,6 @@ public class LineageCacheStatistics {
public static boolean ifSparkStats() {
return (_numHitsSparkActions.longValue() +
_numHitsRdd.longValue()
- + _numHitsRddPersist.longValue() +
_numRddUnpersist.longValue()) != 0;
+ + _numHitsRddPersist.longValue() + _numRddPersist.longValue())
!= 0;
}
}
diff --git
a/src/test/java/org/apache/sysds/test/functions/async/LineageReuseSparkTest.java
b/src/test/java/org/apache/sysds/test/functions/async/LineageReuseSparkTest.java
index 02f6d12465..07aefb83cf 100644
---
a/src/test/java/org/apache/sysds/test/functions/async/LineageReuseSparkTest.java
+++
b/src/test/java/org/apache/sysds/test/functions/async/LineageReuseSparkTest.java
@@ -72,7 +72,7 @@ public class LineageReuseSparkTest extends AutomatedTestBase {
@Test
public void testL2svm() {
- runTest(TEST_NAME+"3", ExecMode.HYBRID,
ReuseCacheType.REUSE_FULL, 3);
+ runTest(TEST_NAME+"3", ExecMode.SPARK,
ReuseCacheType.REUSE_FULL, 3);
}
@Test
@@ -91,12 +91,10 @@ public class LineageReuseSparkTest extends
AutomatedTestBase {
//public void testHyperband() {
// runTest(TEST_NAME+"6", ExecMode.HYBRID,
ReuseCacheType.REUSE_FULL, 6);
//}
-
@Test
public void testBroadcastBug() {
runTest(TEST_NAME+"7", ExecMode.HYBRID,
ReuseCacheType.REUSE_FULL, 7);
}
-
@Test
public void testTopKClean() {
// Multiple cleaning pipelines with real dataset (Nashville
accident)
diff --git a/src/test/scripts/functions/async/LineageReuseSpark4.dml
b/src/test/scripts/functions/async/LineageReuseSpark4.dml
index 18cff0cf34..920dddb87a 100644
--- a/src/test/scripts/functions/async/LineageReuseSpark4.dml
+++ b/src/test/scripts/functions/async/LineageReuseSpark4.dml
@@ -50,24 +50,6 @@ while (lamda < lim)
i = i + 1;
}
-/*[A, b] = SimlinRegDS(X, y);
-A_diag = A + diag(matrix(lamda, rows=N, cols=1));
-beta = solve(A_diag, b);
-R[,1] = beta;
-lamda = lamda + stp;
-
-# Reuse function call
-[A, b] = SimlinRegDS(X, y);
-A_diag = A + diag(matrix(lamda, rows=N, cols=1));
-beta = solve(A_diag, b);
-R[,2] = beta;
-lamda = lamda + stp;
-
-[A, b] = SimlinRegDS(X, y);
-A_diag = A + diag(matrix(lamda, rows=N, cols=1));
-beta = solve(A_diag, b);
-R[,3] = beta;*/
-
R = sum(R);
write(R, $1, format="text");
diff --git a/src/test/scripts/functions/async/LineageReuseSpark6.dml
b/src/test/scripts/functions/async/LineageReuseSpark6.dml
index e1b253a8cf..1c348f8275 100644
--- a/src/test/scripts/functions/async/LineageReuseSpark6.dml
+++ b/src/test/scripts/functions/async/LineageReuseSpark6.dml
@@ -34,13 +34,13 @@ return (Double accuracy) {
M = 10000;
N = 200;
sp = 1.0; #1.0
-no_bracket = 2; #5
+no_bracket = 1; #5
X = rand(rows=M, cols=N, sparsity=sp, seed=42);
y = rand(rows=M, cols=1, min=0, max=2, seed=42);
y = ceil(y);
-no_lamda = 25; #starting combintaions = 25 * 4 = 100 HPs
+no_lamda = 3; #starting combintaions = 25 * 4 = 100 HPs
stp = (0.1 - 0.0001)/no_lamda;
HPlamdas = seq(0.0001, 0.1, stp);
maxIter = 10; #starting interation count = 100 * 10 = 1k
@@ -55,10 +55,10 @@ for (r in 1:no_bracket) {
{
#print("lamda = "+as.scalar(HPlamdas[i,1])+", maxIterations = "+maxIter);
#Run L2svm with intercept true
- beta = l2svm(X=X, Y=y, intercept=TRUE, epsilon=1e-12,
+ /*beta = l2svm(X=X, Y=y, intercept=TRUE, epsilon=1e-12,
reg = as.scalar(HPlamdas[i,1]), maxIterations=maxIter, verbose=FALSE);
svmModels[i,1] = l2norm(X, y, beta); #1st column
- svmModels[i,2:nrow(beta)+1] = t(beta);
+ svmModels[i,2:nrow(beta)+1] = t(beta);*/
#Run L2svm with intercept false
beta = l2svm(X=X, Y=y, intercept=FALSE, epsilon=1e-12,
@@ -67,7 +67,7 @@ for (r in 1:no_bracket) {
svmModels[i,2:nrow(beta)+1] = t(beta);
#Run multilogreg with intercept true
- beta = multiLogReg(X=X, Y=y, icpt=2, tol=1e-6,
reg=as.scalar(HPlamdas[i,1]),
+ /*beta = multiLogReg(X=X, Y=y, icpt=2, tol=1e-6,
reg=as.scalar(HPlamdas[i,1]),
maxi=maxIter, maxii=20, verbose=FALSE);
[prob_mlr, Y_mlr, acc] = multiLogRegPredict(X=X, B=beta, Y=y,
verbose=FALSE);
mlrModels[i,1] = acc; #1st column
@@ -78,7 +78,7 @@ for (r in 1:no_bracket) {
maxi=maxIter, maxii=20, verbose=FALSE);
[prob_mlr, Y_mlr, acc] = multiLogRegPredict(X=X, B=beta, Y=y,
verbose=FALSE);
mlrModels[i,1] = acc; #1st column
- mlrModels[i,2:nrow(beta)+1] = t(beta);
+ mlrModels[i,2:nrow(beta)+1] = t(beta);*/
i = i + 1;
}
diff --git a/src/test/scripts/functions/lineage/FunctionFullReuse1.dml
b/src/test/scripts/functions/lineage/FunctionFullReuse1.dml
index e75f8e033c..599ea6e222 100644
--- a/src/test/scripts/functions/lineage/FunctionFullReuse1.dml
+++ b/src/test/scripts/functions/lineage/FunctionFullReuse1.dml
@@ -36,7 +36,7 @@ c = 10
X = rand(rows=r, cols=c, seed=42);
y = rand(rows=r, cols=1, seed=43);
-R = matrix(0, 1, 2);
+R = matrix(0, 1, 3);
beta1 = SimLM(X, y, 0.0001);
R[,1] = beta1;
@@ -44,4 +44,7 @@ R[,1] = beta1;
beta2 = SimLM(X, y, 0.0001);
R[,2] = beta2;
+beta2 = SimLM(X, y, 0.0001);
+R[,3] = beta2;
+
write(R, $1, format="text");