This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new c44bcca816 [SYSTEMDS-3612] Fix lineage-based reuse of Spark RDDs
c44bcca816 is described below

commit c44bcca81678859c12feb3aa6b2b9db2348cff78
Author: Matthias Boehm <[email protected]>
AuthorDate: Fri Sep 1 16:52:00 2023 +0200

    [SYSTEMDS-3612] Fix lineage-based reuse of Spark RDDs
    
    Some use cases ran into errors when reusing a cached and materialized
    RDD after its input RDDs and broadcasts had been cleaned up and thus
    destroyed. We believe the reason is how Spark creates task closures,
    erroneously serializing destroyed broadcasts that are not covered by
    shuffle dependencies. We now create an explicit Spark checkpoint, which
    cuts off the Spark lineage, preventing this serialization issue.
---
 .../org/apache/sysds/runtime/lineage/LineageCache.java     |  5 ++++-
 .../sysds/test/functions/async/LineageReuseSparkTest.java  | 14 ++++++++------
 2 files changed, 12 insertions(+), 7 deletions(-)

diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java 
b/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
index 026411fd34..e6e959b130 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
@@ -1203,9 +1203,12 @@ public class LineageCache
                RDDObject rddObj = centry.getRDDObject();
                JavaPairRDD<?,?> rdd = rddObj.getRDD();
                rdd = rdd.persist(StorageLevel.MEMORY_AND_DISK());
+               //cut-off RDD lineage & broadcasts to prevent errors on
+               // task closure serialization with destroyed broadcasts
+               rdd.checkpoint();
                rddObj.setRDD(rdd);
                rddObj.setCheckpointRDD(true);
-
+               
                // Make space based on the estimated size
                if(!LineageSparkCacheEviction.isBelowThreshold(estimatedSize))
                        LineageSparkCacheEviction.makeSpace(_cache, 
estimatedSize);
diff --git 
a/src/test/java/org/apache/sysds/test/functions/async/LineageReuseSparkTest.java
 
b/src/test/java/org/apache/sysds/test/functions/async/LineageReuseSparkTest.java
index 12558b1412..02f6d12465 100644
--- 
a/src/test/java/org/apache/sysds/test/functions/async/LineageReuseSparkTest.java
+++ 
b/src/test/java/org/apache/sysds/test/functions/async/LineageReuseSparkTest.java
@@ -81,20 +81,22 @@ public class LineageReuseSparkTest extends 
AutomatedTestBase {
                runTest(TEST_NAME+"4", ExecMode.HYBRID, 
ReuseCacheType.REUSE_MULTILEVEL, 4);
        }
 
-       /*@Test
+       @Test
        public void testEnsemble() {
                runTest(TEST_NAME+"5", ExecMode.HYBRID, 
ReuseCacheType.REUSE_MULTILEVEL, 5);
        }
 
        //FIXME: Collecting a persisted RDD still needs the broadcast vars. 
Debug.
-       @Test
-       public void testHyperband() {
-               runTest(TEST_NAME+"6", ExecMode.HYBRID, 
ReuseCacheType.REUSE_FULL, 6);
-       }
+       //@Test
+       //public void testHyperband() {
+       //      runTest(TEST_NAME+"6", ExecMode.HYBRID, 
ReuseCacheType.REUSE_FULL, 6);
+       //}
+       
        @Test
        public void testBroadcastBug() {
                runTest(TEST_NAME+"7", ExecMode.HYBRID, 
ReuseCacheType.REUSE_FULL, 7);
-       }*/
+       }
+       
        @Test
        public void testTopKClean() {
                // Multiple cleaning pipelines with real dataset (Nashville 
accident)

Reply via email to