This is an automated email from the ASF dual-hosted git repository.

arnabp20 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git

commit eb1a69718c9009eae2a6c292407bb8362ecbeaec
Author: Arnab Phani <[email protected]>
AuthorDate: Tue Aug 8 09:26:32 2023 +0200

    [SYSTEMDS-2913] Cache GPU objects on second hit
    
    This patch updates the reuse logic of GPU objects to skip
    the first reference and cache on the second hit. This filters
    out many never-repeating intermediates, which in turns reduces
    GPU memory pressure, allocation and deallocation counts.
    
    Closes #1876
---
 src/main/java/org/apache/sysds/lops/UnaryCP.java   |  8 ++--
 .../apache/sysds/runtime/lineage/LineageCache.java | 55 +++++++++++++++++-----
 .../sysds/runtime/lineage/LineageCacheConfig.java  |  3 +-
 .../functions/async/LineageReuseSparkTest.java     | 25 ++++++----
 .../LineageReuseSpark7.dml}                        | 31 ++++--------
 .../scripts/functions/lineage/LineageReuseGPU3.dml |  9 ++--
 6 files changed, 78 insertions(+), 53 deletions(-)

diff --git a/src/main/java/org/apache/sysds/lops/UnaryCP.java 
b/src/main/java/org/apache/sysds/lops/UnaryCP.java
index 7dd6a30e58..e44c8968e7 100644
--- a/src/main/java/org/apache/sysds/lops/UnaryCP.java
+++ b/src/main/java/org/apache/sysds/lops/UnaryCP.java
@@ -72,10 +72,12 @@ public class UnaryCP extends Lop {
 
        @Override
        public String getInstructions(String input, String output) {
-               return InstructionUtils.concatOperands(
+               String ret =  InstructionUtils.concatOperands(
                        getExecType().name(), getOpCode(),
                        
getInputs().get(0).prepScalarInputOperand(getExecType()),
-                       prepOutputOperand(output),
-                       Integer.toString(_numThreads));
+                       prepOutputOperand(output));
+               if (getExecType() == ExecType.CP || getExecType() == 
ExecType.FED)
+                       ret = InstructionUtils.concatOperands(ret, 
Integer.toString(_numThreads));
+               return ret;
        }
 }
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java 
b/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
index 5ef81d3133..57f909efd7 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
@@ -161,6 +161,7 @@ public class LineageCache
                                                                //putValueRDD 
method will save the RDD and call persist
                                                                
e.setCacheStatus(LineageCacheStatus.PERSISTEDRDD);
                                                                //Cannot reuse 
rdd as already garbage collected
+                                                               
ec.replaceLineageItem(outName, e._key); //still reuse the lineage trace
                                                                return false;
                                                        case PERSISTEDRDD:
                                                                //Reuse the 
persisted intermediate at the executors
@@ -180,6 +181,12 @@ public class LineageCache
                                                Pointer gpuPtr = 
e.getGPUPointer();
                                                if (gpuPtr == null && 
e.getCacheStatus() == LineageCacheStatus.NOTCACHED)
                                                        return false;  //the 
executing thread removed this entry from cache
+                                               if (e.getCacheStatus() == 
LineageCacheStatus.TOCACHEGPU) {  //second hit
+                                                       //Cannot reuse as 
already garbage collected
+                                                       
ec.replaceLineageItem(outName, e._key); //still reuse the lineage trace
+                                                       return false;
+                                               }
+                                               //Reuse from third hit onwards 
(status == GPUCACHED)
                                                //Create a GPUObject with the 
cached pointer
                                                GPUObject gpuObj = new 
GPUObject(ec.getGPUContext(0),
                                                        
ec.getMatrixObject(outName), gpuPtr);
@@ -295,9 +302,20 @@ public class LineageCache
                if (reuse) {
                        //Additional maintenance for GPU pointers and RDDs
                        for (LineageCacheEntry e : funcOutLIs) {
-                               if (e.isGPUObject())
-                                       //Increment the live count for this 
pointer
-                                       
LineageGPUCacheEviction.incrementLiveCount(e.getGPUPointer());
+                               if (e.isGPUObject()) {
+                                       switch(e.getCacheStatus()) {
+                                               case TOCACHEGPU:
+                                                       //Cannot reuse as 
already garbage collected putValue method
+                                                       // will save the 
pointer while caching the original instruction
+                                                       return false;
+                                               case GPUCACHED:
+                                                       //Increment the live 
count for this pointer
+                                                       
LineageGPUCacheEviction.incrementLiveCount(e.getGPUPointer());
+                                                       break;
+                                               default:
+                                                       return false;
+                                       }
+                               }
                                else if (e.isRDDPersist()) {
                                        //Reuse the cached RDD (local or 
persisted at the executors)
                                        switch(e.getCacheStatus()) {
@@ -598,7 +616,7 @@ public class LineageCache
                                }
                        }
                        else if (inst instanceof GPUInstruction) {
-                               // TODO: gpu multiretrun instructions
+                               // TODO: gpu multi-return instructions
                                Data gpudata = ec.getVariable(((GPUInstruction) 
inst)._output);
                                liGPUObj = gpudata instanceof MatrixObject ?
                                                
ec.getMatrixObject(((GPUInstruction)inst)._output).
@@ -708,14 +726,27 @@ public class LineageCache
                                removePlaceholder(instLI);
                                return;
                        }
-                       // Update the total size of lineage cached gpu objects
-                       // The eviction is handled by the unified gpu memory 
manager
-                       
LineageGPUCacheEviction.updateSize(gpuObj.getAllocatedSize(), true);
-                       // Set the GPUOject in the cache
-                       centry.setGPUValue(gpuObj.getDensePointer(), 
gpuObj.getAllocatedSize(),
-                               gpuObj.getMatrixObject().getMetaData(), 
computetime);
-                       // Maintain order for eviction
-                       LineageGPUCacheEviction.addEntry(centry);
+                       switch(centry.getCacheStatus()) {
+                               case EMPTY:  //first hit
+                                       // Set the GPUOject in the cache. Will 
be garbage collected
+                                       
centry.setGPUValue(gpuObj.getDensePointer(), gpuObj.getAllocatedSize(),
+                                               
gpuObj.getMatrixObject().getMetaData(), computetime);
+                                       
centry.setCacheStatus(LineageCacheStatus.TOCACHEGPU);
+                                       break;
+                               case TOCACHEGPU:  //second hit
+                                       // Update the total size of lineage 
cached gpu objects
+                                       // The eviction is handled by the 
unified gpu memory manager
+                                       
LineageGPUCacheEviction.updateSize(gpuObj.getAllocatedSize(), true);
+                                       // Set the GPUOject in the cache and 
update the status
+                                       
centry.setGPUValue(gpuObj.getDensePointer(), gpuObj.getAllocatedSize(),
+                                               
gpuObj.getMatrixObject().getMetaData(), computetime);
+                                       
centry.setCacheStatus(LineageCacheStatus.GPUCACHED);
+                                       // Maintain order for eviction
+                                       
LineageGPUCacheEviction.addEntry(centry);
+                                       break;
+                               default:
+                                       throw new 
DMLRuntimeException("Execution should not reach here: "+centry._key);
+                       }
                }
        }
 
diff --git 
a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java 
b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
index 2971c36f16..0925820184 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
@@ -54,7 +54,7 @@ public class LineageCacheConfig
                "^", "uamax", "uark+", "uacmean", "eigen", "ctableexpand", 
"replace",
                "^2", "*2", "uack+", "tak+*", "uacsqk+", "uark+", "n+", 
"uarimax", "qsort",
                "qpick", "transformapply", "uarmax", "n+", "-*", "castdtm", 
"lowertri",
-               "prefetch", "mapmm", "contains", "mmchain", "mapmmchain", "+*"
+               "prefetch", "mapmm", "contains", "mmchain", "mapmmchain", "+*", 
"=="
                //TODO: Reuse everything.
        };
 
@@ -152,6 +152,7 @@ public class LineageCacheConfig
                SPILLED,   //Data is in disk. Empty value. Cannot be evicted.
                RELOADED,  //Reloaded from disk. Can be evicted.
                PINNED,    //Pinned to memory. Cannot be evicted.
+               TOCACHEGPU, //To be cached in GPU if the instruction reoccur
                GPUCACHED, //Points to GPU intermediate
                PERSISTEDRDD, //Persisted at the Spark executors
                TOPERSISTRDD, //To be persisted if the instruction reoccur
diff --git 
a/src/test/java/org/apache/sysds/test/functions/async/LineageReuseSparkTest.java
 
b/src/test/java/org/apache/sysds/test/functions/async/LineageReuseSparkTest.java
index f2cf085838..3ace92b994 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/async/LineageReuseSparkTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/async/LineageReuseSparkTest.java
@@ -30,6 +30,7 @@ import 
org.apache.sysds.runtime.controlprogram.parfor.stat.InfrastructureAnalyze
 import org.apache.sysds.runtime.lineage.Lineage;
 import org.apache.sysds.runtime.lineage.LineageCacheConfig;
 import org.apache.sysds.runtime.lineage.LineageCacheStatistics;
+import org.apache.sysds.runtime.lineage.LineageCacheConfig.ReuseCacheType;
 import org.apache.sysds.runtime.matrix.data.MatrixValue;
 import org.apache.sysds.test.AutomatedTestBase;
 import org.apache.sysds.test.TestConfiguration;
@@ -42,7 +43,7 @@ public class LineageReuseSparkTest extends AutomatedTestBase {
 
        protected static final String TEST_DIR = "functions/async/";
        protected static final String TEST_NAME = "LineageReuseSpark";
-       protected static final int TEST_VARIANTS = 6;
+       protected static final int TEST_VARIANTS = 7;
        protected static String TEST_CLASS_DIR = TEST_DIR + 
LineageReuseSparkTest.class.getSimpleName() + "/";
 
        @Override
@@ -54,44 +55,48 @@ public class LineageReuseSparkTest extends 
AutomatedTestBase {
 
        @Test
        public void testlmdsHB() {
-               runTest(TEST_NAME+"1", ExecMode.HYBRID, 1);
+               runTest(TEST_NAME+"1", ExecMode.HYBRID, 
ReuseCacheType.REUSE_FULL, 1);
        }
 
        @Test
        public void testlmdsSP() {
                // Only reuse the actions
-               runTest(TEST_NAME+"1", ExecMode.SPARK, 1);
+               runTest(TEST_NAME+"1", ExecMode.SPARK, 
ReuseCacheType.REUSE_MULTILEVEL, 1);
        }
 
        @Test
        public void testlmdsRDD() {
                // Cache all RDDs and persist shuffle-based Spark operations 
(eg. rmm, cpmm)
-               runTest(TEST_NAME+"2", ExecMode.HYBRID, 2);
+               runTest(TEST_NAME+"2", ExecMode.HYBRID, 
ReuseCacheType.REUSE_FULL, 2);
        }
 
        @Test
        public void testL2svm() {
-               runTest(TEST_NAME+"3", ExecMode.HYBRID, 3);
+               runTest(TEST_NAME+"3", ExecMode.HYBRID, 
ReuseCacheType.REUSE_FULL, 3);
        }
 
        @Test
        public void testlmdsMultiLevel() {
                // Cache RDD and matrix block function returns and reuse
-               runTest(TEST_NAME+"4", ExecMode.HYBRID, 4);
+               runTest(TEST_NAME+"4", ExecMode.HYBRID, 
ReuseCacheType.REUSE_MULTILEVEL, 4);
        }
 
        @Test
        public void testEnsemble() {
-               runTest(TEST_NAME+"5", ExecMode.HYBRID, 5);
+               runTest(TEST_NAME+"5", ExecMode.HYBRID, 
ReuseCacheType.REUSE_MULTILEVEL, 5);
        }
 
        //FIXME: Collecting a persisted RDD still needs the broadcast vars. 
Debug.
        /*@Test
        public void testHyperband() {
-               runTest(TEST_NAME+"6", ExecMode.HYBRID, 6);
+               runTest(TEST_NAME+"6", ExecMode.HYBRID, 
ReuseCacheType.REUSE_FULL, 6);
+       }*/
+       /*@Test
+       public void testBroadcastBug() {
+               runTest(TEST_NAME+"7", ExecMode.HYBRID, 
ReuseCacheType.REUSE_FULL, 7);
        }*/
 
-       public void runTest(String testname, ExecMode execMode, int testId) {
+       public void runTest(String testname, ExecMode execMode, 
LineageCacheConfig.ReuseCacheType reuse, int testId) {
                boolean old_simplification = 
OptimizerUtils.ALLOW_ALGEBRAIC_SIMPLIFICATION;
                boolean old_sum_product = 
OptimizerUtils.ALLOW_SUM_PRODUCT_REWRITES;
                boolean old_trans_exec_type = 
OptimizerUtils.ALLOW_TRANSITIVE_SPARK_EXEC_TYPE;
@@ -126,7 +131,7 @@ public class LineageReuseSparkTest extends 
AutomatedTestBase {
                        //proArgs.add("recompile_runtime");
                        proArgs.add("-stats");
                        proArgs.add("-lineage");
-                       
proArgs.add(LineageCacheConfig.ReuseCacheType.REUSE_MULTILEVEL.name().toLowerCase());
+                       proArgs.add(reuse.name().toLowerCase());
                        proArgs.add("-args");
                        proArgs.add(output("R"));
                        programArgs = proArgs.toArray(new 
String[proArgs.size()]);
diff --git a/src/test/scripts/functions/lineage/LineageReuseGPU3.dml 
b/src/test/scripts/functions/async/LineageReuseSpark7.dml
similarity index 64%
copy from src/test/scripts/functions/lineage/LineageReuseGPU3.dml
copy to src/test/scripts/functions/async/LineageReuseSpark7.dml
index be15ba2026..a1c0088168 100644
--- a/src/test/scripts/functions/lineage/LineageReuseGPU3.dml
+++ b/src/test/scripts/functions/async/LineageReuseSpark7.dml
@@ -19,28 +19,13 @@
 #
 #-------------------------------------------------------------
 
-# Increase rows and cols for better performance gains
-
-SimLM = function(Matrix[Double] X, Matrix[Double] y, Double lamda=0.0001) 
return (Matrix[Double] beta) 
-{
-  A = t(X) %*% X + diag(matrix(lamda, rows=ncol(X), cols=1));
-  while(FALSE) {}
-  b = t(X) %*% y;
-  beta = cbind(A, b); #gpu
+X = rand(rows=10000, cols=200, seed=42);
+y = rand(rows=10000, cols=1, seed=43);
+
+for (i in 1:3) {
+  s = t(X) %*% y;   #mapmm single-block
+  Xd = X %*% s;     #mapmm multi-block
+  out = 1 - y * Xd; #cp OP collects Xd
+  R = sum(out);
 }
-
-r = 100
-c = 10
-
-X = rand(rows=r, cols=c, seed=42);
-y = rand(rows=r, cols=1, seed=43);
-R = matrix(0, 1, 2);
-
-beta1 = SimLM(X, y, 0.0001);
-R[,1] = sum(beta1);
-
-beta2 = SimLM(X, y, 0.0001);
-R[,2] = sum(beta2); #function reuse
-
 write(R, $1, format="text");
-
diff --git a/src/test/scripts/functions/lineage/LineageReuseGPU3.dml 
b/src/test/scripts/functions/lineage/LineageReuseGPU3.dml
index be15ba2026..71674d426e 100644
--- a/src/test/scripts/functions/lineage/LineageReuseGPU3.dml
+++ b/src/test/scripts/functions/lineage/LineageReuseGPU3.dml
@@ -19,8 +19,6 @@
 #
 #-------------------------------------------------------------
 
-# Increase rows and cols for better performance gains
-
 SimLM = function(Matrix[Double] X, Matrix[Double] y, Double lamda=0.0001) 
return (Matrix[Double] beta) 
 {
   A = t(X) %*% X + diag(matrix(lamda, rows=ncol(X), cols=1));
@@ -34,13 +32,16 @@ c = 10
 
 X = rand(rows=r, cols=c, seed=42);
 y = rand(rows=r, cols=1, seed=43);
-R = matrix(0, 1, 2);
+R = matrix(0, 1, 3);
 
 beta1 = SimLM(X, y, 0.0001);
 R[,1] = sum(beta1);
 
 beta2 = SimLM(X, y, 0.0001);
-R[,2] = sum(beta2); #function reuse
+R[,2] = sum(beta2); #second hit
+
+beta3 = SimLM(X, y, 0.0001);
+R[,3] = sum(beta3); #function reuse
 
 write(R, $1, format="text");
 

Reply via email to