This is an automated email from the ASF dual-hosted git repository.
arnabp20 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new 945fd9e7c6 [SYSTEMDS-3567] Cost-based eviction from GPU lineage cache
945fd9e7c6 is described below
commit 945fd9e7c643de6d1f01cedb413ab79446507d54
Author: Arnab Phani <[email protected]>
AuthorDate: Sat Oct 7 21:23:42 2023 +0200
[SYSTEMDS-3567] Cost-based eviction from GPU lineage cache
This patch extends the unified GPU memory manager with a
cost-based eviction policy, which defines the order in which
intermediates are recycled from each free list. The score
has two components: timestamp and DAG height. Size is
irrelevant for the GPU because all entries in a free list
have the same size. Cache hits increase the score by updating
the last access timestamp, and misses on previously removed
entries scale the initial score by their recorded miss count.
Closes #1920
---
.../java/org/apache/sysds/hops/OptimizerUtils.java | 2 +-
.../sysds/lops/rewrite/RewriteAddPrefetchLop.java | 17 +-
.../instructions/cp/ComputationCPInstruction.java | 10 ++
.../instructions/gpu/context/GPUContextPool.java | 3 -
.../instructions/gpu/context/GPUMemoryManager.java | 6 +-
.../apache/sysds/runtime/lineage/LineageCache.java | 26 +--
.../sysds/runtime/lineage/LineageCacheConfig.java | 19 +-
.../sysds/runtime/lineage/LineageCacheEntry.java | 32 +++-
.../runtime/lineage/LineageCacheStatistics.java | 9 +
.../runtime/lineage/LineageGPUCacheEviction.java | 196 +++++++++------------
.../sysds/runtime/lineage/LineageItemUtils.java | 10 +-
.../java/org/apache/sysds/utils/Statistics.java | 2 +-
.../lineage/GPULineageCacheEvictionTest.java | 3 +
.../functions/lineage/GPUCacheEviction1.dml | 14 +-
.../functions/lineage/GPUCacheEviction3.dml | 71 ++++----
15 files changed, 230 insertions(+), 190 deletions(-)
diff --git a/src/main/java/org/apache/sysds/hops/OptimizerUtils.java
b/src/main/java/org/apache/sysds/hops/OptimizerUtils.java
index 8da0ff110d..a4abc513c6 100644
--- a/src/main/java/org/apache/sysds/hops/OptimizerUtils.java
+++ b/src/main/java/org/apache/sysds/hops/OptimizerUtils.java
@@ -308,7 +308,7 @@ public class OptimizerUtils
/**
* Rule-based operator placement policy for GPU.
*/
- public static boolean RULE_BASED_GPU_EXEC = true;
+ public static boolean RULE_BASED_GPU_EXEC = false;
//////////////////////
// Optimizer levels //
diff --git
a/src/main/java/org/apache/sysds/lops/rewrite/RewriteAddPrefetchLop.java
b/src/main/java/org/apache/sysds/lops/rewrite/RewriteAddPrefetchLop.java
index 4567e88d52..09458cd51a 100644
--- a/src/main/java/org/apache/sysds/lops/rewrite/RewriteAddPrefetchLop.java
+++ b/src/main/java/org/apache/sysds/lops/rewrite/RewriteAddPrefetchLop.java
@@ -59,12 +59,12 @@ public class RewriteAddPrefetchLop extends LopRewriteRule
return List.of(sb);
ArrayList<Lop> nodesWithPrefetch = new ArrayList<>();
- //Find the Spark nodes with all CP outputs
+ //Find the Spark/GPU nodes with all CP outputs
for (Lop l : lops) {
nodesWithPrefetch.add(l);
if (isPrefetchNeeded(l)) {
List<Lop> oldOuts = new
ArrayList<>(l.getOutputs());
- //Construct a Prefetch lop that takes this
Spark node as a input
+ //Construct a Prefetch lop that takes this
Spark/GPU node as an input
UnaryCP prefetch = new UnaryCP(l,
Types.OpOp1.PREFETCH, l.getDataType(), l.getValueType(), Types.ExecType.CP);
prefetch.setAsynchronous(true);
//Reset asynchronous flag for the input if
already set (e.g. mapmm -> prefetch)
@@ -126,7 +126,18 @@ public class RewriteAddPrefetchLop extends LopRewriteRule
private boolean isPrefetchFromGPUNeeded(Lop lop) {
// Prefetch a GPU intermediate if all the outputs are CP.
- return lop.getDataType() == Types.DataType.MATRIX
+ boolean gpuOP = lop.getDataType() == Types.DataType.MATRIX
&& lop.isExecGPU() && lop.isAllOutputsCP();
+
+ // Exclude List consumers. List is just a metadata handle.
+ boolean anyOutputList = lop.getOutputs().stream()
+ .anyMatch(out -> out.getDataType() ==
Types.DataType.LIST);
+
+ //FIXME: Rewire _inputParams when needed (e.g. Replace)
+ boolean hasParameterizedOut = lop.getOutputs().stream()
+ .anyMatch(out -> ((out instanceof ParameterizedBuiltin)
+ || (out instanceof GroupedAggregate)
+ || (out instanceof GroupedAggregateM)));
+ return gpuOP && !hasParameterizedOut && !anyOutputList;
}
}
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/cp/ComputationCPInstruction.java
b/src/main/java/org/apache/sysds/runtime/instructions/cp/ComputationCPInstruction.java
index 76c1617229..de45036991 100644
---
a/src/main/java/org/apache/sysds/runtime/instructions/cp/ComputationCPInstruction.java
+++
b/src/main/java/org/apache/sysds/runtime/instructions/cp/ComputationCPInstruction.java
@@ -67,6 +67,16 @@ public abstract class ComputationCPInstruction extends
CPInstruction implements
return new CPOperand[]{input1, input2, input3};
}
+ public boolean hasFrameInput() {
+ if (input1 != null && input1.isFrame())
+ return true;
+ if (input2 != null && input2.isFrame())
+ return true;
+ if (input3 != null && input3.isFrame())
+ return true;
+ return false;
+ }
+
protected boolean checkGuardedRepresentationChange( MatrixBlock in1,
MatrixBlock out ) {
return checkGuardedRepresentationChange(in1, null, out);
}
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/gpu/context/GPUContextPool.java
b/src/main/java/org/apache/sysds/runtime/instructions/gpu/context/GPUContextPool.java
index 9437e71566..aa9bb8563a 100644
---
a/src/main/java/org/apache/sysds/runtime/instructions/gpu/context/GPUContextPool.java
+++
b/src/main/java/org/apache/sysds/runtime/instructions/gpu/context/GPUContextPool.java
@@ -153,9 +153,6 @@ public class GPUContextPool {
//LOG.debug("Active CUDA device number : " + device[0]);
//LOG.debug("Max Blocks/Threads/SharedMem on active device: " +
maxBlocks + "/" + maxThreadsPerBlock + "/" + sharedMemPerBlock);
GPUStatistics.cudaInitTime = System.nanoTime() - start;
-
- // Initialize the maximum size of the lineage cache in the GPU
(30% of initial GPU memory)
- LineageGPUCacheEviction.setGPULineageCacheLimit();
}
/**
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/gpu/context/GPUMemoryManager.java
b/src/main/java/org/apache/sysds/runtime/instructions/gpu/context/GPUMemoryManager.java
index 4999d8c4d6..26efd68162 100644
---
a/src/main/java/org/apache/sysds/runtime/instructions/gpu/context/GPUMemoryManager.java
+++
b/src/main/java/org/apache/sysds/runtime/instructions/gpu/context/GPUMemoryManager.java
@@ -258,7 +258,7 @@ public class GPUMemoryManager {
// Step 1: First try reusing exact match in rmvarGPUPointers to
avoid holes in the GPU memory
Pointer A = lazyCudaFreeMemoryManager.getRmvarPointer(opcode,
size);
-
+
Pointer tmpA = (A == null) ? new Pointer() : null;
// Step 2: Allocate a new pointer in the GPU memory (since
memory is available)
// Step 3 has potential to create holes as well as limit future
reuse, hence perform this step before step 3.
@@ -297,7 +297,7 @@ public class GPUMemoryManager {
if (le != null) {
if(!LineageCacheConfig.GPU2HOSTEVICTION) {
A = le.getGPUPointer(); //recycle
-
LineageGPUCacheEviction.removeFromDeviceCache(le, le.getGPUPointer(), true);
+
//LineageGPUCacheEviction.removeFromDeviceCache(le, le.getGPUPointer(), true);
if (DMLScript.STATISTICS)
LineageCacheStatistics.incrementGpuRecycle();
}
@@ -327,7 +327,7 @@ public class GPUMemoryManager {
if(le != null) {
freedSize +=
getSizeAllocatedGPUPointer(le.getGPUPointer());
if(!LineageCacheConfig.GPU2HOSTEVICTION) {
-
LineageGPUCacheEviction.removeFromDeviceCache(le, le.getGPUPointer(), true);
+
//LineageGPUCacheEviction.removeFromDeviceCache(le, le.getGPUPointer(), true);
guardedCudaFree(le.getGPUPointer()); //free
if (DMLScript.STATISTICS)
LineageCacheStatistics.incrementGpuDel();
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
b/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
index 7aacfb5151..e9529213f2 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
@@ -78,7 +78,6 @@ public class LineageCache
static {
LineageCacheEviction.setCacheLimit(LineageCacheConfig.CPU_CACHE_FRAC); //5%
LineageCacheEviction.setStartTimestamp();
- LineageGPUCacheEviction.setStartTimestamp();
// Note: GPU cache initialization is done in
GPUContextPool:initializeGPU()
}
@@ -204,6 +203,8 @@ public class LineageCache
ec.getMatrixObject(outName).updateDataCharacteristics(e.getDataCharacteristics());
//Increment the live count for
this pointer
LineageGPUCacheEviction.incrementLiveCount(gpuPtr);
+ //Maintain the eviction list in
the free list
+
LineageGPUCacheEviction.maintainOrder(e);
}
//Replace the live lineage trace with
the cached one (if not parfor, dedup)
ec.replaceLineageItem(outName, e._key);
@@ -325,6 +326,8 @@ public class LineageCache
case GPUCACHED:
//Increment the live
count for this pointer
LineageGPUCacheEviction.incrementLiveCount(e.getGPUPointer());
+ //Maintain the eviction
list in the free list
+
LineageGPUCacheEviction.maintainOrder(e);
if
(DMLScript.STATISTICS) LineageCacheStatistics.incrementGpuHits();
break;
default:
@@ -762,17 +765,19 @@ public class LineageCache
removePlaceholder(instLI);
return;
}
+ if(DMLScript.STATISTICS &&
LineageCacheEviction._removelist.containsKey(centry._key))
+ LineageCacheStatistics.incrementDelHitsGpu();
switch(centry.getCacheStatus()) {
case EMPTY: //first hit
// Set the GPUOject in the cache. Will
be garbage collected
-
centry.setGPUValue(gpuObj.getDensePointer(), gpuObj.getAllocatedSize(),
-
gpuObj.getMatrixObject().getMetaData(), computetime);
-
centry.setCacheStatus(LineageCacheStatus.TOCACHEGPU);
- break;
+ if
(!LineageCacheEviction._removelist.containsKey(centry._key)) {
+ // Cache right away if removed
before
+
centry.setGPUValue(gpuObj.getDensePointer(), gpuObj.getAllocatedSize(),
+
gpuObj.getMatrixObject().getMetaData(), computetime);
+
centry.setCacheStatus(LineageCacheStatus.TOCACHEGPU);
+ break;
+ }
case TOCACHEGPU: //second hit
- // Update the total size of lineage
cached gpu objects
- // The eviction is handled by the
unified gpu memory manager
-
LineageGPUCacheEviction.updateSize(gpuObj.getAllocatedSize(), true);
// Set the GPUOject in the cache and
update the status
centry.setGPUValue(gpuObj.getDensePointer(), gpuObj.getAllocatedSize(),
gpuObj.getMatrixObject().getMetaData(), computetime);
@@ -1099,7 +1104,7 @@ public class LineageCache
// Maintain order for eviction
if (e.isRDDPersist())
LineageSparkCacheEviction.maintainOrder(e);
- else
+ else if (!e.isGPUObject())
LineageCacheEviction.getEntry(e);
return e;
}
@@ -1115,7 +1120,8 @@ public class LineageCache
// Entries with RDDs are cached twice. First hit is GCed,
// Second hit saves the child RDDs
if (LineageCache.probe(probeItem)) {
- LineageCacheEntry oe = getIntern(probeItem);
+ //LineageCacheEntry oe = getIntern(probeItem);
+ LineageCacheEntry oe = _cache.get(probeItem);
LineageCacheEntry e = _cache.get(item);
boolean exists = !e.isNullVal();
e.copyValueFrom(oe, computetime);
diff --git
a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
index d5265f33e8..67eeed9481 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
@@ -99,7 +99,6 @@ public class LineageCacheConfig
}
protected static final double CPU_CACHE_FRAC = 0.05; // 5% of JVM heap
size
- protected static final double GPU_CACHE_MAX = 0.30; // 30% of gpu memory
private static ReuseCacheType _cacheType = null;
private static CachedItemHead _itemH = null;
private static CachedItemTail _itemT = null;
@@ -144,8 +143,6 @@ public class LineageCacheConfig
// Weights for scoring components (computeTime/size, LRU timestamp, DAG
height)
protected static double[] WEIGHTS = {1, 0, 0};
public static boolean GPU2HOSTEVICTION = false;
- public static boolean CONCURRENTGPUEVICTION = false;
- public static volatile boolean STOPBACKGROUNDEVICTION = false;
protected enum LineageCacheStatus {
EMPTY, //Placeholder with no data. Cannot be evicted.
@@ -209,6 +206,14 @@ public class LineageCacheConfig
return ret;
};
+ protected static Comparator<LineageCacheEntry>
LineageGPUCacheComparator = (e1, e2) -> {
+ if (e1._key.getId() == e2._key.getId())
+ return 0;
+ if (e1.score == e2.score)
+ return Long.compare(e1._key.getId(), e2._key.getId());
+ else
+ return e1.score < e2.score ? -1 : 1;
+ };
//-------------SPARK OPERATION RELATED CONFIGURATIONS--------------//
@@ -362,14 +367,6 @@ public class LineageCacheConfig
&& _cacheType.isMultilevelReuse();
}
- public static CachedItemHead getCachedItemHead() {
- return _itemH;
- }
-
- public static CachedItemTail getCachedItemTail() {
- return _itemT;
- }
-
public static boolean getCompAssRW() {
return _compilerAssistedRW;
}
diff --git
a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEntry.java
b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEntry.java
index a603111031..e019f9810d 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEntry.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEntry.java
@@ -342,6 +342,30 @@ public class LineageCacheEntry {
int computeGroup =
LineageCacheConfig.getComputeGroup(_key.getOpcode());
int refCount = Math.max(_rddObject.getMaxReferenceCount(), 1);
score = w1*(((double)computeGroup*refCount)/estimatedSize) +
w2*getTimestamp() + w3*(((double)1)/getDagHeight());
+ // Update score to emulate computeTime scaling by #misses
+ if (removeList.containsKey(_key) &&
LineageCacheConfig.isCostNsize()) {
+ int missCount = 1 + removeList.get(_key);
+ score = score + (w1*(((double) computeGroup * refCount)
/ estimatedSize) * missCount);
+ }
+ }
+
+ protected synchronized void initiateScoreGPU(Map<LineageItem, Integer>
removeList) {
+ // Set timestamp
+ _timestamp = System.currentTimeMillis() -
LineageCacheEviction.getStartTimestamp();
+ if (_timestamp < 0)
+ throw new DMLRuntimeException ("Execution timestamp
shouldn't be -ve. Key: "+_key);
+ // Weights for scoring components in GPU
+ double w1 = 0;
+ double w2 = 1;
+ double w3 = 1;
+ // Generate initial score
+ score = w2*getTimestamp() + w3*(((double)1)/getDagHeight());
+ // TODO: timestamp >> DAg_height. Normalize timestamp and DAG
height.
+ // Update score to emulate computeTime scaling by #misses
+ if (removeList.containsKey(_key)) {
+ int missCount = 1 + removeList.get(_key);
+ score = score + ((w2*getTimestamp() +
w3*(((double)1)/getDagHeight())) * missCount);
+ }
}
protected synchronized void updateScore(boolean add) {
@@ -349,12 +373,14 @@ public class LineageCacheEntry {
double w1 = LineageCacheConfig.WEIGHTS[0];
long size = getSize();
int sign = add ? 1: -1;
- if(isLocalObject())
+ if (isLocalObject())
score = score + sign * w1 * (((double) _computeTime) /
size);
- if(isRDDPersist() && size != 0) { //size == 0 means not
persisted yet
+ if (isRDDPersist() && size != 0) { //size == 0 means not
persisted yet
int computeGroup =
LineageCacheConfig.getComputeGroup(_key.getOpcode());
score = score + sign * w1 * (((double) computeGroup) /
size);
}
+ if (isGPUObject())
+ score = score + sign * (getTimestamp() +
((double)1)/getDagHeight());
}
protected synchronized long getTimestamp() {
@@ -383,6 +409,8 @@ public class LineageCacheEntry {
int refCount =
Math.max(_rddObject.getMaxReferenceCount(), 1);
score = w1*(((double)computeGroup*refCount)/size) +
w2*getTimestamp() + w3*(((double)1)/getDagHeight());
}
+ if (isGPUObject())
+ score = getTimestamp() + (((double)1)/getDagHeight());
}
static class GPUPointer {
diff --git
a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheStatistics.java
b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheStatistics.java
index be7460f6fd..ffc7e5eeff 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheStatistics.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheStatistics.java
@@ -48,6 +48,7 @@ public class LineageCacheStatistics {
private static final LongAdder _numSyncEvictGpu = new LongAdder();
private static final LongAdder _numRecycleGpu = new LongAdder();
private static final LongAdder _numDelGpu = new LongAdder();
+ private static final LongAdder _numHitsDelGpu = new LongAdder();
private static final LongAdder _evtimeGpu = new LongAdder();
// Below entries are specific to Spark instructions
private static final LongAdder _numHitsRdd = new LongAdder();
@@ -78,6 +79,7 @@ public class LineageCacheStatistics {
_numSyncEvictGpu.reset();
_numRecycleGpu.reset();
_numDelGpu.reset();
+ _numHitsDelGpu.reset();
_numHitsRdd.reset();
_numHitsSparkActions.reset();
_numHitsRddPersist.reset();
@@ -230,6 +232,11 @@ public class LineageCacheStatistics {
_numDelGpu.increment();
}
+ public static void incrementDelHitsGpu() {
+ // Number of hits on pointers that are deleted/recycled before
+ _numHitsDelGpu.increment();
+ }
+
public static void incrementEvictTimeGpu(long delta) {
// Total time spent on evicting from GPU to main memory or
deleting from GPU lineage cache
_evtimeGpu.add(delta);
@@ -327,6 +334,8 @@ public class LineageCacheStatistics {
sb.append(_numRecycleGpu.longValue());
sb.append("/");
sb.append(_numDelGpu.longValue());
+ sb.append("/");
+ sb.append(_numHitsDelGpu.longValue());
return sb.toString();
}
diff --git
a/src/main/java/org/apache/sysds/runtime/lineage/LineageGPUCacheEviction.java
b/src/main/java/org/apache/sysds/runtime/lineage/LineageGPUCacheEviction.java
index 60d5e00a9e..8d2072adc5 100644
---
a/src/main/java/org/apache/sysds/runtime/lineage/LineageGPUCacheEviction.java
+++
b/src/main/java/org/apache/sysds/runtime/lineage/LineageGPUCacheEviction.java
@@ -32,7 +32,6 @@ import java.util.stream.Collectors;
import jcuda.Pointer;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.instructions.gpu.context.GPUContext;
-import org.apache.sysds.runtime.instructions.gpu.context.GPUContextPool;
import org.apache.sysds.runtime.matrix.data.LibMatrixCUDA;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.meta.DataCharacteristics;
@@ -41,110 +40,97 @@ import static
org.apache.sysds.runtime.instructions.gpu.context.GPUObject.toIntE
public class LineageGPUCacheEviction
{
- private static long _currentCacheSize = 0;
- private static long GPU_CACHE_LIMIT; //limit in bytes
+ // GPU context for this GPU
private static GPUContext _gpuContext = null;
- private static long _startTimestamp = 0;
+ // Thread for asynchronous copy
public static ExecutorService gpuEvictionThread = null;
-
// Weighted queue of freed pointers.
private static HashMap<Long, TreeSet<LineageCacheEntry>> freeQueues =
new HashMap<>();
-
// Pointers and live counts associated
private static HashMap<Pointer, Integer> livePointers = new HashMap<>();
-
// All cached pointers mapped to the corresponding lineage cache entries
private static HashMap<Pointer, LineageCacheEntry> GPUCacheEntries =
new HashMap<>();
protected static void resetEviction() {
- _currentCacheSize = 0;
gpuEvictionThread = null;
- //LineageCacheConfig.CONCURRENTGPUEVICTION = false;
freeQueues.clear();
livePointers.clear();
GPUCacheEntries.clear();
}
- public static void setGPUContext(GPUContext gpuCtx) {
- _gpuContext = gpuCtx;
- }
-
- protected static GPUContext getGPUContext() {
- return _gpuContext;
- }
-
- protected static long getPointerSize(Pointer ptr) {
- return
_gpuContext.getMemoryManager().getSizeAllocatedGPUPointer(ptr);
- }
+ //--------------- CACHE & POINTER QUEUE MAINTENANCE --------------//
+ // During reuse, move the reused pointer from free lists to the live
list
protected static void incrementLiveCount(Pointer ptr) {
// Move from free list (if exists) to live list
if(livePointers.merge(ptr, 1, Integer::sum) == 1)
freeQueues.get(getPointerSize(ptr)).remove(GPUCacheEntries.get(ptr));
}
+ // rmVar moves the live pointer to a free list
public static void decrementLiveCount(Pointer ptr) {
// Decrement and move to the free list if the live count
becomes 0
if(livePointers.compute(ptr, (k, v) -> v==1 ? null : v-1) ==
null) {
long size = getPointerSize(ptr);
if (!freeQueues.containsKey(size))
- freeQueues.put(size, new
TreeSet<>(LineageCacheConfig.LineageCacheComparator));
- //FIXME: Multiple entries can point to same
pointer due to multi-level reuse
+ freeQueues.put(size, new
TreeSet<>(LineageCacheConfig.LineageGPUCacheComparator));
freeQueues.get(size).add(GPUCacheEntries.get(ptr));
}
}
+ // Check if this pointer is live
public static boolean probeLiveCachedPointers(Pointer ptr) {
return livePointers.containsKey(ptr);
}
- //---------------- COSTING RELATED METHODS -----------------
-
- /**
- * Set the max constraint for the lineage cache in GPU
- */
- public static void setGPULineageCacheLimit() {
- long available = GPUContextPool.initialGPUMemBudget();
- GPU_CACHE_LIMIT = (long) (available *
LineageCacheConfig.GPU_CACHE_MAX);
- }
- protected static void setStartTimestamp() {
- _startTimestamp = System.currentTimeMillis();
- }
-
- protected static long getStartTimestamp() {
- return _startTimestamp;
- }
-
- private static void adjustD2HTransferSpeed(double sizeByte, double
copyTime) {
- double sizeMB = sizeByte / (1024*1024);
- double newTSpeed = sizeMB / copyTime; //bandwidth (MB/sec) +
java overhead
-
- if (newTSpeed > LineageCacheConfig.D2HMAXBANDWIDTH)
- return; //filter out errorneous measurements (~
>8GB/sec)
- // Perform exponential smoothing.
- double smFactor = 0.5; //smoothing factor
- LineageCacheConfig.D2HCOPYBANDWIDTH = (smFactor * newTSpeed) +
((1-smFactor) * LineageCacheConfig.D2HCOPYBANDWIDTH);
- //System.out.println("size_t: "+sizeMB+ " speed_t: "+newTSpeed
+ " estimate_t+1: "+LineageCacheConfig.D2HCOPYBANDWIDTH);
- }
-
- //--------------- CACHE MAINTENANCE & LOOKUP FUNCTIONS --------------//
-
protected static void addEntry(LineageCacheEntry entry) {
if (entry.isNullVal())
- // Placeholders shouldn't participate in eviction
cycles.
- return;
+ return; // Placeholders shouldn't participate in
eviction cycles.
if (entry.isScalarValue())
throw new DMLRuntimeException ("Scalars are never
stored in GPU. Lineage: "+ entry._key);
- // TODO: Separate removelist, starttimestamp, score and weights
from CPU cache
- entry.computeScore(LineageCacheEviction._removelist);
+ entry.initiateScoreGPU(LineageCacheEviction._removelist);
// The pointer must be live at this moment
livePointers.put(entry.getGPUPointer(), 1);
GPUCacheEntries.put(entry.getGPUPointer(), entry);
}
-
- public static boolean isGPUCacheEmpty() {
- return (freeQueues.isEmpty() && livePointers.isEmpty());
+
+ // MaintainOrder is called on a reuse, which means the pointer is
+ // then moved to the livePointers list and removed from the
+ // free queue. Later, the pointer will be moved to the free
+ // queue in a new position due to the updated score.
+ protected static void maintainOrder (LineageCacheEntry entry) {
+ if (entry.getCacheStatus() !=
LineageCacheConfig.LineageCacheStatus.GPUCACHED)
+ return;
+ // Reset the timestamp to maintain the LRU component of the
scoring function
+ entry.updateTimestamp();
+ // Scale score of the sought entry after every cache hit
+ entry.updateScore(true);
+ }
+
+ protected static void removeSingleEntry(Map<LineageItem,
LineageCacheEntry> cache, LineageCacheEntry e) {
+ cache.remove(e._key);
+ // Maintain miss count to increase the score if the item enters
the cache again
+ LineageCacheEviction._removelist.merge(e._key, 1, Integer::sum);
+ }
+
+ private static void removeEntry(LineageCacheEntry e) {
+ Map<LineageItem, LineageCacheEntry> cache =
LineageCache.getLineageCache();
+ if (e._origItem == null) {
+ // Single entry. Remove.
+ removeSingleEntry(cache, e);
+ return;
+ }
+ // Remove all entries pointing to this pointer
+ LineageCacheEntry tmp = cache.get(e._origItem);
+ while (tmp != null) {
+ removeSingleEntry(cache, tmp);
+ tmp = tmp._nextEntry;
+ }
+ }
+
+ public static void setGPUContext(GPUContext gpuCtx) {
+ _gpuContext = gpuCtx;
}
public static boolean isGPUCacheFreeQEmpty() {
@@ -158,8 +144,15 @@ public class LineageGPUCacheEviction
freeQueues.remove(size); //remove if empty
// Poll the first pointer from the queue
- if (freeList != null && !freeList.isEmpty())
- return freeList.pollFirst();
+ LineageCacheEntry e = null;
+ if (freeList != null && !freeList.isEmpty()) {
+ e = freeList.pollFirst();
+ if (probeLiveCachedPointers(e.getGPUPointer()))
+ throw new DMLRuntimeException("Recycling live
pointer: "+e._key);
+ removeEntry(e);
+ GPUCacheEntries.remove(e.getGPUPointer());
+ return e;
+ }
return null;
}
@@ -180,43 +173,21 @@ public class LineageGPUCacheEviction
return null;
}
- public static LineageCacheEntry peekFirstFreeEntry(long size) {
- return freeQueues.get(size).first();
- }
-
- public static void removeFreeEntry(LineageCacheEntry e) {
- long size = getPointerSize(e.getGPUPointer());
- freeQueues.get(size).remove(e);
- }
-
- //---------------- CACHE SPACE MANAGEMENT METHODS -----------------//
-
- protected static void updateSize(long space, boolean addspace) {
- if (addspace)
- _currentCacheSize += space;
- else
- _currentCacheSize -= space;
- }
-
- protected static boolean isBelowMaxThreshold(long spaceNeeded) {
- return ((spaceNeeded + _currentCacheSize) <= GPU_CACHE_LIMIT);
- }
-
- protected static long getGPUCacheLimit() {
- return GPU_CACHE_LIMIT;
- }
+ //---------------- SPACE MANAGEMENT, DEBUG PRINT & D2H COPY
-----------------//
public static int numPointersCached() {
- return livePointers.size() +
freeQueues.values().stream().mapToInt(TreeSet::size).sum();
+ return
freeQueues.values().stream().mapToInt(TreeSet::size).sum();
}
public static long totalMemoryCached() {
- long totLive = livePointers.keySet().stream()
- .mapToLong(ptr ->
_gpuContext.getMemoryManager().getSizeAllocatedGPUPointer(ptr)).sum();
long totFree = 0;
for (Map.Entry<Long, TreeSet<LineageCacheEntry>> entry :
freeQueues.entrySet())
totFree += entry.getKey() * entry.getValue().size();
- return totLive + totFree;
+ return totFree;
+ }
+
+ protected static long getPointerSize(Pointer ptr) {
+ return
_gpuContext.getMemoryManager().getSizeAllocatedGPUPointer(ptr);
}
public static Set<Pointer> getAllCachedPointers() {
@@ -224,7 +195,6 @@ public class LineageGPUCacheEviction
for (Map.Entry<Long, TreeSet<LineageCacheEntry>> entry :
freeQueues.entrySet())
cachedPointers.addAll(entry.getValue().stream()
.map(LineageCacheEntry::getGPUPointer).collect(Collectors.toSet()));
- cachedPointers.addAll(livePointers.keySet());
return cachedPointers;
}
@@ -255,38 +225,38 @@ public class LineageGPUCacheEviction
return ptr;
}
+ private static void adjustD2HTransferSpeed(double sizeByte, double
copyTime) {
+ double sizeMB = sizeByte / (1024*1024);
+ double newTSpeed = sizeMB / copyTime; //bandwidth (MB/sec) +
java overhead
+
+ if (newTSpeed > LineageCacheConfig.D2HMAXBANDWIDTH)
+ return; //filter out errorneous measurements (~
>8GB/sec)
+ // Perform exponential smoothing.
+ double smFactor = 0.5; //smoothing factor
+ LineageCacheConfig.D2HCOPYBANDWIDTH = (smFactor * newTSpeed) +
((1-smFactor) * LineageCacheConfig.D2HCOPYBANDWIDTH);
+ //System.out.println("size_t: "+sizeMB+ " speed_t: "+newTSpeed
+ " estimate_t+1: "+LineageCacheConfig.D2HCOPYBANDWIDTH);
+ }
+
private static MatrixBlock pointerToMatrixBlock(LineageCacheEntry le) {
MatrixBlock ret = null;
DataCharacteristics dc = le.getDataCharacteristics();
- if (le.isDensePointer()) {
- ret = new MatrixBlock(toIntExact(dc.getRows()),
toIntExact(dc.getCols()), false);
- ret.allocateDenseBlock();
- // copy to the host
-
LibMatrixCUDA.cudaSupportFunctions.deviceToHost(getGPUContext(),
- le.getGPUPointer(), ret.getDenseBlockValues(),
null, true);
- ret.recomputeNonZeros();
- } /*else {
- int rows = toIntExact(dc.getRows());
- int cols = toIntExact(dc.getCols());
- int nnz = toIntExact(le.getGPUPointer().nnz);
- double[] values = new double[nnz];
-
LibMatrixCUDA.cudaSupportFunctions.deviceToHost(getGPUContext(),
le.getGPUPointer().val, values, null, true);
- int[] rowPtr = new int[rows + 1];
- int[] colInd = new int[nnz];
- CSRPointer.copyPtrToHost(le.getGPUPointer(), rows, nnz,
rowPtr, colInd);
- SparseBlockCSR sparseBlock = new SparseBlockCSR(rowPtr,
colInd, values, nnz);
- ret = new MatrixBlock(rows, cols, nnz, sparseBlock);
- }*/
+ if (!le.isDensePointer())
+ throw new DMLRuntimeException ("Sparse pointers should
not be cached in GPU. Lineage: "+ le._key);
+ ret = new MatrixBlock(toIntExact(dc.getRows()),
toIntExact(dc.getCols()), false);
+ ret.allocateDenseBlock();
+ // copy to the host
+ LibMatrixCUDA.cudaSupportFunctions.deviceToHost(_gpuContext,
+ le.getGPUPointer(), ret.getDenseBlockValues(), null,
true);
+ ret.recomputeNonZeros();
//mat.acquireModify(tmp);
//mat.release();
return ret;
}
+ // Hard removal from GPU cache
public static void removeFromDeviceCache(LineageCacheEntry entry,
Pointer ptr, boolean removeFromCache) {
- long size =
_gpuContext.getMemoryManager().getSizeAllocatedGPUPointer(ptr);
if (removeFromCache)
LineageCache.removeEntry(entry._key);
- updateSize(size, false);
GPUCacheEntries.remove(ptr);
}
diff --git
a/src/main/java/org/apache/sysds/runtime/lineage/LineageItemUtils.java
b/src/main/java/org/apache/sysds/runtime/lineage/LineageItemUtils.java
index 12a87ff2b4..b3c296ac7e 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageItemUtils.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageItemUtils.java
@@ -24,6 +24,7 @@ import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.sysds.runtime.instructions.cp.ComputationCPInstruction;
import org.apache.sysds.runtime.instructions.spark.ComputationSPInstruction;
import org.apache.sysds.runtime.instructions.spark.RandSPInstruction;
import org.apache.sysds.runtime.io.IOUtilFunctions;
@@ -519,11 +520,12 @@ public class LineageItemUtils {
}
// A statement block benefits from reuse if is large enough (>10
instructions) or has
- // Spark instructions. Caching small SBs lead to long chains of
LineageCacheEntries,
- // which in turn leads to reduced evictable entries.
+ // Spark instructions or has input frames. Caching small SBs lead to
long chains of
+ // LineageCacheEntries,which in turn leads to reduced evictable entries.
public static boolean hasValidInsts(ArrayList<Instruction> insts) {
int count = 0;
boolean hasSPInst = false;
+ boolean hasFrameInput = false;
for (Instruction ins : insts) {
if (ins instanceof VariableCPInstruction)
continue;
@@ -531,8 +533,10 @@ public class LineageItemUtils {
if ((ins instanceof ComputationSPInstruction &&
!ins.getOpcode().equals("chkpoint"))
|| ins.getOpcode().equals("prefetch"))
hasSPInst = true;
+ if (ins instanceof ComputationCPInstruction &&
((ComputationCPInstruction) ins).hasFrameInput())
+ hasFrameInput = true;
}
- return count >= 10 || hasSPInst;
+ return count >= 10 || hasSPInst || hasFrameInput;
}
public static void addAllDataLineage(ExecutionContext ec) {
diff --git a/src/main/java/org/apache/sysds/utils/Statistics.java
b/src/main/java/org/apache/sysds/utils/Statistics.java
index 01c9682d90..2fbb4edb8a 100644
--- a/src/main/java/org/apache/sysds/utils/Statistics.java
+++ b/src/main/java/org/apache/sysds/utils/Statistics.java
@@ -640,7 +640,7 @@ public class Statistics
sb.append("LinCache MultiLevel (Ins/SB/Fn):" +
LineageCacheStatistics.displayMultiLevelHits() + ".\n");
if (LineageCacheStatistics.ifGpuStats()) {
sb.append("LinCache GPU (Hit/PF): \t" +
LineageCacheStatistics.displayGpuStats() + ".\n");
- sb.append("LinCache GPU (Recyc/Del):
\t" + LineageCacheStatistics.displayGpuPointerStats() + ".\n");
+ sb.append("LinCache GPU
(Recyc/Del/Miss): \t" + LineageCacheStatistics.displayGpuPointerStats() +
".\n");
sb.append("LinCache GPU evict time: \t"
+ LineageCacheStatistics.displayGpuEvictTime() + " sec.\n");
}
if (LineageCacheStatistics.ifSparkStats()) {
diff --git
a/src/test/java/org/apache/sysds/test/functions/lineage/GPULineageCacheEvictionTest.java
b/src/test/java/org/apache/sysds/test/functions/lineage/GPULineageCacheEvictionTest.java
index a40eee0316..e0a6a726e3 100644
---
a/src/test/java/org/apache/sysds/test/functions/lineage/GPULineageCacheEvictionTest.java
+++
b/src/test/java/org/apache/sysds/test/functions/lineage/GPULineageCacheEvictionTest.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
+import org.apache.sysds.hops.OptimizerUtils;
import org.apache.sysds.runtime.lineage.Lineage;
import org.apache.sysds.runtime.lineage.LineageCacheConfig;
import org.apache.sysds.runtime.matrix.data.MatrixValue;
@@ -87,6 +88,7 @@ public class GPULineageCacheEvictionTest extends AutomatedTestBase{
Lineage.resetInternalState();
boolean gpu2Mem = LineageCacheConfig.GPU2HOSTEVICTION;
//LineageCacheConfig.GPU2HOSTEVICTION = true;
+ OptimizerUtils.ASYNC_PREFETCH = true;
//run the test
runTest(true, EXCEPTION_NOT_EXPECTED, null, -1);
HashMap<MatrixValue.CellIndex, Double> R_orig =
readDMLMatrixFromOutputDir("R");
@@ -104,6 +106,7 @@ public class GPULineageCacheEvictionTest extends AutomatedTestBase{
runTest(true, EXCEPTION_NOT_EXPECTED, null, -1);
AutomatedTestBase.TEST_GPU = false;
LineageCacheConfig.GPU2HOSTEVICTION = gpu2Mem;
+ OptimizerUtils.ASYNC_PREFETCH = false;
HashMap<MatrixValue.CellIndex, Double> R_reused =
readDMLMatrixFromOutputDir("R");
//compare results
diff --git a/src/test/scripts/functions/lineage/GPUCacheEviction1.dml b/src/test/scripts/functions/lineage/GPUCacheEviction1.dml
index 330ef6742c..18a9ead4bc 100644
--- a/src/test/scripts/functions/lineage/GPUCacheEviction1.dml
+++ b/src/test/scripts/functions/lineage/GPUCacheEviction1.dml
@@ -23,16 +23,18 @@ X = rand(rows=10000, cols=1000, seed=42);
y = rand(rows=10000, cols=1000, seed=43);
S = matrix(0, rows=1, cols=3);
+for (j in 1:2) {
X1 = X;
y1 = y;
S1 = 0;
# fill half of the cache
-for (i in 1:20) {
- R = X1 * y1;
- X1 = cbind(X1, rand(rows=10000, cols=1, seed=42));
- y1 = cbind(y1, rand(rows=10000, cols=1, seed=42));
- while(FALSE){}
- S1 = S1 + sum(R);
+ for (i in 1:20) {
+ R = X1 * y1;
+ X1 = cbind(X1, rand(rows=10000, cols=1, seed=42));
+ y1 = cbind(y1, rand(rows=10000, cols=1, seed=42));
+ while(FALSE){}
+ S1 = S1 + sum(R);
+ }
}
S[,1] = S1;
diff --git a/src/test/scripts/functions/lineage/GPUCacheEviction3.dml b/src/test/scripts/functions/lineage/GPUCacheEviction3.dml
index 818b8b2c0a..dd617928ea 100644
--- a/src/test/scripts/functions/lineage/GPUCacheEviction3.dml
+++ b/src/test/scripts/functions/lineage/GPUCacheEviction3.dml
@@ -18,43 +18,46 @@
# under the License.
#
#-------------------------------------------------------------
-D = rand(rows=25600, cols=784, min=0, max=20, seed=42)
-bs = 128;
-ep = 10;
-iter_ep = ceil(nrow(D)/bs);
-maxiter = ep * iter_ep;
-beg = 1;
-iter = 0;
-i = 1;
-while (iter < maxiter) {
- end = beg + bs - 1;
- if (end>nrow(D))
- end = nrow(D);
- X = D[beg:end,]
+miniBatch = function(Matrix[Double] D) return (Matrix[Double] R) {
+ bs = 2048;
+ ep = 10;
+ iter_ep = ceil(nrow(D)/bs);
+ maxiter = ep * iter_ep;
+ beg = 1;
+ iter = 0;
+ i = 1;
+
+ while (iter < maxiter) {
+ end = beg + bs - 1;
+ if (end>nrow(D))
+ end = nrow(D);
+ X = D[beg:end,]
- # reusable OP across epochs
- X = scale(X, TRUE, TRUE);
- # pollute cache with not reusable OPs
- X = ((X + X) * i - X) / (i+1)
- X = ((X + X) * i - X) / (i+1)
- X = ((X + X) * i - X) / (i+1)
- X = ((X + X) * i - X) / (i+1)
- X = ((X + X) * i - X) / (i+1)
- X = ((X + X) * i - X) / (i+1)
- X = ((X + X) * i - X) / (i+1)
- X = ((X + X) * i - X) / (i+1)
- X = ((X + X) * i - X) / (i+1)
- X = ((X + X) * i - X) / (i+1)
+ # reusable OP across epochs
+ X = scale(X, TRUE, TRUE);
+ # pollute cache with not reusable OPs
+ X = ((X + X) * i - X) / (i+1)
+ X = ((X + X) * i - X) / (i+1)
+ X = ((X + X) * i - X) / (i+1)
+ X = ((X + X) * i - X) / (i+1)
+ X = ((X + X) * i - X) / (i+1)
- iter = iter + 1;
- if (end == nrow(D))
- beg = 1;
- else
- beg = end + 1;
- i = i + 1;
+ iter = iter + 1;
+ if (end == nrow(D))
+ beg = 1;
+ else
+ beg = end + 1;
+ i = i + 1;
+ }
+ R = X;
+}
+
+D = rand(rows=25600, cols=784, min=0, max=20, seed=42)
+for (i in 1:3) {
+ R = miniBatch(D);
+ print(sum(R));
}
-print(sum(X));
-write(X, $1, format="text");
+write(R, $1, format="text");