This is an automated email from the ASF dual-hosted git repository.
arnabp20 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new 03c546fe33 [SYSTEMDS-3470] Generalize selecting reusable Spark instructions
03c546fe33 is described below
commit 03c546fe3392759b18529db0ccb3d26328b02e0f
Author: Arnab Phani <[email protected]>
AuthorDate: Tue Nov 29 00:05:01 2022 +0100
[SYSTEMDS-3470] Generalize selecting reusable Spark instructions
This patch allows putting any MatrixObject that does not have a valid RDD
into the lineage cache. This change makes it easier to separate out Spark
instructions that return intermediates back to local.
Closes #1742
---
.../sysds/runtime/controlprogram/caching/CacheableData.java | 4 ++++
.../java/org/apache/sysds/runtime/lineage/LineageCache.java | 6 ++++++
.../apache/sysds/runtime/lineage/LineageCacheConfig.java | 2 +-
.../sysds/test/functions/async/LineageReuseSparkTest.java | 13 ++++++++++---
.../test/functions/builtin/part2/BuiltinNaLocfTest.java | 2 ++
5 files changed, 23 insertions(+), 4 deletions(-)
diff --git
a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
index 57cd49791b..9d195ea7fd 100644
---
a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
+++
b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
@@ -442,6 +442,10 @@ public abstract class CacheableData<T extends
CacheBlock<?>> extends Data
if( _rddHandle != null )
rdd.setBackReference(this);
}
+
+ public boolean hasRDDHandle() { // true only when an RDD handle is set AND that handle reports a back reference
+ return _rddHandle != null && _rddHandle.hasBackReference(); // a handle without a back reference does not count
+ }
public BroadcastObject<T> getBroadcastHandle() {
return _bcHandle;
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
b/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
index 4c79b885b7..ecd734c588 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
@@ -579,6 +579,12 @@ public class LineageCache
continue;
}
+ if (data instanceof MatrixObject &&
((MatrixObject) data).hasRDDHandle()) {
+ // Avoid triggering pre-matured Spark
instruction chains
+ removePlaceholder(item);
+ continue;
+ }
+
if (LineageCacheConfig.isOutputFederated(inst,
data)) {
// Do not cache federated outputs (in
the coordinator)
// Cannot skip putting the placeholder
as the above is only known after execution
diff --git
a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
index 4259481444..5a7c46dfe7 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
@@ -206,7 +206,7 @@ public class LineageCacheConfig
boolean insttype = (inst instanceof ComputationCPInstruction
|| inst instanceof ComputationFEDInstruction
|| inst instanceof GPUInstruction
- || (inst instanceof ComputationSPInstruction &&
isRightSparkOp(inst)))
+ || inst instanceof ComputationSPInstruction)
&& !(inst instanceof ListIndexingCPInstruction);
boolean rightop = (ArrayUtils.contains(REUSE_OPCODES,
inst.getOpcode())
|| (inst.getOpcode().equals("append") &&
isVectorAppend(inst, ec))
diff --git
a/src/test/java/org/apache/sysds/test/functions/async/LineageReuseSparkTest.java
b/src/test/java/org/apache/sysds/test/functions/async/LineageReuseSparkTest.java
index 83992c5772..e958093122 100644
---
a/src/test/java/org/apache/sysds/test/functions/async/LineageReuseSparkTest.java
+++
b/src/test/java/org/apache/sysds/test/functions/async/LineageReuseSparkTest.java
@@ -51,15 +51,22 @@ public class LineageReuseSparkTest extends
AutomatedTestBase {
}
@Test
- public void testlmds() {
- runTest(TEST_NAME+"1");
+ public void testlmdsHB() {
+ runTest(TEST_NAME+"1", ExecMode.HYBRID); // same script as testlmdsSP, but in HYBRID exec mode
}
- public void runTest(String testname) {
+ @Test
+ public void testlmdsSP() {
+ // SPARK mode: matrix outputs still backed by an RDD handle are not cached, so only actions (results brought back to local) are reused
+ runTest(TEST_NAME+"1", ExecMode.SPARK);
+ }
+
+ public void runTest(String testname, ExecMode execMode) {
boolean old_simplification =
OptimizerUtils.ALLOW_ALGEBRAIC_SIMPLIFICATION;
boolean old_sum_product =
OptimizerUtils.ALLOW_SUM_PRODUCT_REWRITES;
boolean old_trans_exec_type =
OptimizerUtils.ALLOW_TRANSITIVE_SPARK_EXEC_TYPE;
ExecMode oldPlatform = setExecMode(ExecMode.HYBRID);
+ rtplatform = execMode;
long oldmem = InfrastructureAnalyzer.getLocalMaxMemory();
long mem = 1024*1024*8;
diff --git
a/src/test/java/org/apache/sysds/test/functions/builtin/part2/BuiltinNaLocfTest.java
b/src/test/java/org/apache/sysds/test/functions/builtin/part2/BuiltinNaLocfTest.java
index 4d9ddac658..64999e46ae 100644
---
a/src/test/java/org/apache/sysds/test/functions/builtin/part2/BuiltinNaLocfTest.java
+++
b/src/test/java/org/apache/sysds/test/functions/builtin/part2/BuiltinNaLocfTest.java
@@ -22,6 +22,7 @@ package org.apache.sysds.test.functions.builtin.part2;
import org.apache.commons.lang.ArrayUtils;
import org.apache.sysds.common.Types;
import org.apache.sysds.common.Types.ExecType;
+import org.apache.sysds.runtime.lineage.Lineage;
import org.apache.sysds.runtime.lineage.LineageCacheConfig.ReuseCacheType;
import org.apache.sysds.runtime.matrix.data.MatrixValue;
import org.apache.sysds.test.AutomatedTestBase;
@@ -105,6 +106,7 @@ public class BuiltinNaLocfTest extends AutomatedTestBase {
double[][] A = getRandomMatrix(rows, cols, -10, 10,
0.6, 7);
writeInputMatrixWithMTD("A", A, true);
+ Lineage.resetInternalState();
runTest(true, false, null, -1);
runRScript(true);
//compare matrices