This is an automated email from the ASF dual-hosted git repository.
mboehm7 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new c44bcca816 [SYSTEMDS-3612] Fix lineage-based reuse of Spark RDDs
c44bcca816 is described below
commit c44bcca81678859c12feb3aa6b2b9db2348cff78
Author: Matthias Boehm <[email protected]>
AuthorDate: Fri Sep 1 16:52:00 2023 +0200
[SYSTEMDS-3612] Fix lineage-based reuse of Spark RDDs
Some use cases ran into errors when reusing a cached and materialized
RDD after its input RDDs and broadcasts had been cleaned up and thus
destroyed. We believe the cause is how Spark creates task closures: it
fails when serializing destroyed broadcasts that are not separated from
the RDD by a shuffle dependency. We now create an explicit Spark
checkpoint, which cuts off the Spark lineage and thereby prevents this
serialization issue.
---
.../org/apache/sysds/runtime/lineage/LineageCache.java | 5 ++++-
.../sysds/test/functions/async/LineageReuseSparkTest.java | 14 ++++++++------
2 files changed, 12 insertions(+), 7 deletions(-)
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java b/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
index 026411fd34..e6e959b130 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
@@ -1203,9 +1203,12 @@ public class LineageCache
RDDObject rddObj = centry.getRDDObject();
JavaPairRDD<?,?> rdd = rddObj.getRDD();
rdd = rdd.persist(StorageLevel.MEMORY_AND_DISK());
+ //cut-off RDD lineage & broadcasts to prevent errors on
+ // task closure serialization with destroyed broadcasts
+ rdd.checkpoint();
rddObj.setRDD(rdd);
rddObj.setCheckpointRDD(true);
-
+
// Make space based on the estimated size
if(!LineageSparkCacheEviction.isBelowThreshold(estimatedSize))
LineageSparkCacheEviction.makeSpace(_cache,
estimatedSize);
diff --git a/src/test/java/org/apache/sysds/test/functions/async/LineageReuseSparkTest.java b/src/test/java/org/apache/sysds/test/functions/async/LineageReuseSparkTest.java
index 12558b1412..02f6d12465 100644
--- a/src/test/java/org/apache/sysds/test/functions/async/LineageReuseSparkTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/async/LineageReuseSparkTest.java
@@ -81,20 +81,22 @@ public class LineageReuseSparkTest extends AutomatedTestBase {
runTest(TEST_NAME+"4", ExecMode.HYBRID,
ReuseCacheType.REUSE_MULTILEVEL, 4);
}
- /*@Test
+ @Test
public void testEnsemble() {
runTest(TEST_NAME+"5", ExecMode.HYBRID,
ReuseCacheType.REUSE_MULTILEVEL, 5);
}
//FIXME: Collecting a persisted RDD still needs the broadcast vars.
Debug.
- @Test
- public void testHyperband() {
- runTest(TEST_NAME+"6", ExecMode.HYBRID,
ReuseCacheType.REUSE_FULL, 6);
- }
+ //@Test
+ //public void testHyperband() {
+ // runTest(TEST_NAME+"6", ExecMode.HYBRID,
ReuseCacheType.REUSE_FULL, 6);
+ //}
+
@Test
public void testBroadcastBug() {
runTest(TEST_NAME+"7", ExecMode.HYBRID,
ReuseCacheType.REUSE_FULL, 7);
- }*/
+ }
+
@Test
public void testTopKClean() {
// Multiple cleaning pipelines with real dataset (Nashville
accident)