This is an automated email from the ASF dual-hosted git repository.
arnabp20 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new e291bb645c [SYSTEMDS-2650] Standardize dedup lineage trace
serialization format
e291bb645c is described below
commit e291bb645c98c9e47d676a2519838bf7ced8f9df
Author: Arnab Phani <[email protected]>
AuthorDate: Tue Aug 27 13:10:08 2024 +0200
[SYSTEMDS-2650] Standardize dedup lineage trace serialization format
This patch standardizes the dedup patch serialization format, regardless of
whether the traces are captured by the lineage() built-in or via a write
operator. Previously, the write operator stored the dedup patches in a
separate file (<name>.lineage.dedup). Now, we always create a single file
containing first the global trace, followed by the dedup patches. This change
also allows us to copy the traces from the console (lineage()) and store them
in a single file for future recomputation.
---
.../runtime/controlprogram/caching/FrameObject.java | 2 +-
.../runtime/controlprogram/caching/MatrixObject.java | 2 +-
.../runtime/controlprogram/caching/TensorObject.java | 2 +-
.../instructions/cp/AggregateUnaryCPInstruction.java | 2 +-
.../org/apache/sysds/runtime/lineage/LineageMap.java | 9 ++++-----
.../apache/sysds/runtime/lineage/LineageParser.java | 20 +++++++++++++++++++-
.../sysds/runtime/lineage/LineageRecomputeUtils.java | 17 +++++++++--------
.../test/functions/lineage/LineageCodegenTest.java | 2 +-
.../functions/lineage/LineageTraceBuiltinTest.java | 2 +-
.../functions/lineage/LineageTraceDedupTest.java | 3 +--
.../functions/lineage/LineageTraceExecSparkTest.java | 4 ++--
.../test/functions/lineage/LineageTraceExecTest.java | 2 +-
.../functions/lineage/LineageTraceFunctionTest.java | 2 +-
.../test/functions/lineage/LineageTraceGPUTest.java | 2 +-
.../functions/lineage/LineageTraceParforTest.java | 2 +-
15 files changed, 45 insertions(+), 28 deletions(-)
diff --git
a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java
b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java
index 945021626e..582bb64dd8 100644
---
a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java
+++
b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java
@@ -308,7 +308,7 @@ public class FrameObject extends CacheableData<FrameBlock>
@Override
protected FrameBlock reconstructByLineage(LineageItem li) throws
IOException {
return ((FrameObject) LineageRecomputeUtils
- .parseNComputeLineageTrace(li.getData(), null))
+ .parseNComputeLineageTrace(li.getData()))
.acquireReadAndRelease();
}
}
diff --git
a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java
b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java
index 29f2c1cb59..f58b315e68 100644
---
a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java
+++
b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java
@@ -588,7 +588,7 @@ public class MatrixObject extends
CacheableData<MatrixBlock> {
@Override
protected MatrixBlock reconstructByLineage(LineageItem li) throws
IOException {
return ((MatrixObject) LineageRecomputeUtils
- .parseNComputeLineageTrace(Explain.explain(li), null))
+ .parseNComputeLineageTrace(Explain.explain(li)))
.acquireReadAndRelease();
}
diff --git
a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/TensorObject.java
b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/TensorObject.java
index 13665f65a2..8908f55d06 100644
---
a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/TensorObject.java
+++
b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/TensorObject.java
@@ -202,7 +202,7 @@ public class TensorObject extends
CacheableData<TensorBlock> {
@Override
protected TensorBlock reconstructByLineage(LineageItem li) throws
IOException {
return ((TensorObject) LineageRecomputeUtils
- .parseNComputeLineageTrace(li.getData(), null))
+ .parseNComputeLineageTrace(li.getData()))
.acquireReadAndRelease();
}
}
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/cp/AggregateUnaryCPInstruction.java
b/src/main/java/org/apache/sysds/runtime/instructions/cp/AggregateUnaryCPInstruction.java
index 78bc7d132d..920a79b77d 100644
---
a/src/main/java/org/apache/sysds/runtime/instructions/cp/AggregateUnaryCPInstruction.java
+++
b/src/main/java/org/apache/sysds/runtime/instructions/cp/AggregateUnaryCPInstruction.java
@@ -185,7 +185,7 @@ public class AggregateUnaryCPInstruction extends
UnaryCPInstruction {
LineageItem li = ec.getLineageItem(input1);
String out = !DMLScript.LINEAGE_DEDUP ?
Explain.explain(li) :
- Explain.explain(li) +
LineageDedupUtils.mergeExplainDedupBlocks(ec);
+ Explain.explain(li) + "\n" +
LineageDedupUtils.mergeExplainDedupBlocks(ec);
ec.setScalarOutput(outputName, new
StringObject(out));
break;
}
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageMap.java
b/src/main/java/org/apache/sysds/runtime/lineage/LineageMap.java
index df4d0c2e1b..41875bdfdf 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageMap.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageMap.java
@@ -245,10 +245,9 @@ public class LineageMap {
LineageItem li = get(input1);
String fName = ec.getScalarInput(input2.getName(),
Types.ValueType.STRING, input2.isLiteral()).getStringValue();
- if (DMLScript.LINEAGE_DEDUP) {
- // gracefully serialize the dedup maps without
decompressing
-
LineageItemUtils.writeTraceToHDFS(LineageDedupUtils.mergeExplainDedupBlocks(ec),
fName + ".lineage.dedup");
- }
- LineageItemUtils.writeTraceToHDFS(Explain.explain(li), fName +
".lineage");
+ // Combine the global trace and dedup patches in a single file.
+ String out = !DMLScript.LINEAGE_DEDUP ? Explain.explain(li) :
+ Explain.explain(li) + "\n" +
LineageDedupUtils.mergeExplainDedupBlocks(ec);
+ LineageItemUtils.writeTraceToHDFS(out, fName + ".lineage");
}
}
diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageParser.java
b/src/main/java/org/apache/sysds/runtime/lineage/LineageParser.java
index 439fe1f10d..8acfbdc9e9 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageParser.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageParser.java
@@ -122,7 +122,7 @@ public class LineageParser
}
return new LineageItem(id, "", opcode, inputs.toArray(new
LineageItem[0]), specialValueBits);
}
-
+
protected static void parseLineageTraceDedup(String str) {
str.replaceAll("\r\n", "\n");
String[] allPatches = str.split("\n\n");
@@ -145,4 +145,22 @@ public class LineageParser
loopItem.patchLiMap.get(pathId).put(parts[1], patchLi);
}
}
+
+ protected static String[] separateMainAndDedupPatches(String str) {
+ str = str.replaceAll("\r\n", "\n"); // Strings are immutable: keep the CRLF-normalized copy
+ String[] allPatches = str.split("\n\n");
+ if (allPatches.length == 1) //no dedup patches, only the global trace
+ return allPatches;
+
+ // Merge the dedup patches into a single string, keeping the blank-line separators
+ String[] patches = new String[2];
+ patches[0] = allPatches[0];
+ StringBuilder sb = new StringBuilder();
+ for (int i=1; i<allPatches.length; i++) {
+ sb.append(allPatches[i]);
+ sb.append("\n\n");
+ }
+ patches[1] = sb.toString();
+ return patches;
+ }
}
\ No newline at end of file
diff --git
a/src/main/java/org/apache/sysds/runtime/lineage/LineageRecomputeUtils.java
b/src/main/java/org/apache/sysds/runtime/lineage/LineageRecomputeUtils.java
index c5fceed547..6c691c2b8b 100644
--- a/src/main/java/org/apache/sysds/runtime/lineage/LineageRecomputeUtils.java
+++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageRecomputeUtils.java
@@ -81,17 +81,18 @@ import org.apache.sysds.utils.Statistics;
public class LineageRecomputeUtils {
private static final String LVARPREFIX = "lvar";
public static final String LPLACEHOLDER = "IN#";
private static final boolean DEBUG = false;
public static Map<String, DedupLoopItem> loopPatchMap = new HashMap<>();
- public static Data parseNComputeLineageTrace(String mainTrace, String
dedupPatches) {
- if (DEBUG) {
+ public static Data parseNComputeLineageTrace(String mainTrace) {
+ if (DEBUG)
System.out.println(mainTrace);
- System.out.println(dedupPatches);
- }
- LineageItem root = LineageParser.parseLineageTrace(mainTrace);
- if (dedupPatches != null)
- LineageParser.parseLineageTraceDedup(dedupPatches);
+
+ // Separate the global trace and the dedup patches
+ String[] patches =
LineageParser.separateMainAndDedupPatches(mainTrace);
+ LineageItem root = LineageParser.parseLineageTrace(patches[0]);
//global trace
+ if (patches.length > 1)
+ LineageParser.parseLineageTraceDedup(patches[1]);
// Disable GPU execution. TODO: Support GPU
boolean GPUenabled = false;
diff --git
a/src/test/java/org/apache/sysds/test/functions/lineage/LineageCodegenTest.java
b/src/test/java/org/apache/sysds/test/functions/lineage/LineageCodegenTest.java
index cc88317d14..59cda3cc69 100644
---
a/src/test/java/org/apache/sysds/test/functions/lineage/LineageCodegenTest.java
+++
b/src/test/java/org/apache/sysds/test/functions/lineage/LineageCodegenTest.java
@@ -100,7 +100,7 @@ public class LineageCodegenTest extends LineageBase {
//get lineage and generate program
String Rtrace = readDMLLineageFromHDFS("R");
- Data ret =
LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace, null);
+ Data ret =
LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace);
HashMap<CellIndex, Double> dmlfile =
readDMLMatrixFromOutputDir("R");
MatrixBlock tmp =
((MatrixObject)ret).acquireReadAndRelease();
TestUtils.compareMatrices(dmlfile, tmp, 1e-6);
diff --git
a/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceBuiltinTest.java
b/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceBuiltinTest.java
index 84cea6d74a..3e3845b311 100644
---
a/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceBuiltinTest.java
+++
b/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceBuiltinTest.java
@@ -78,7 +78,7 @@ public class LineageTraceBuiltinTest extends LineageBase {
//get lineage and generate program
String Rtrace = readDMLLineageFromHDFS("R");
- Data ret =
LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace, null);
+ Data ret =
LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace);
HashMap<CellIndex, Double> dmlfile =
readDMLMatrixFromOutputDir("R");
MatrixBlock tmp = ((MatrixObject)ret).acquireReadAndRelease();
diff --git
a/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceDedupTest.java
b/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceDedupTest.java
index f372cd3aa9..c9a6beb1f0 100644
---
a/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceDedupTest.java
+++
b/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceDedupTest.java
@@ -152,8 +152,7 @@ public class LineageTraceDedupTest extends LineageBase
//deserialize, generate program and execute
String Rtrace = readDMLLineageFromHDFS("R");
- String RDedupPatches = readDMLLineageDedupFromHDFS("R");
- Data ret =
LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace, RDedupPatches);
+ Data ret =
LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace);
//match the original and recomputed results
HashMap<CellIndex, Double> orig =
readDMLMatrixFromOutputDir("R");
diff --git
a/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceExecSparkTest.java
b/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceExecSparkTest.java
index 88456dd962..4009cea684 100644
---
a/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceExecSparkTest.java
+++
b/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceExecSparkTest.java
@@ -116,12 +116,12 @@ public class LineageTraceExecSparkTest extends
LineageBase {
TestUtils.compareScalars(Y_lineage,
Explain.explain(Y_li));
//generate program
- Data X_data =
LineageRecomputeUtils.parseNComputeLineageTrace(X_lineage, null);
+ Data X_data =
LineageRecomputeUtils.parseNComputeLineageTrace(X_lineage);
HashMap<MatrixValue.CellIndex, Double> X_dmlfile =
readDMLMatrixFromOutputDir("X");
MatrixBlock X_tmp =
((MatrixObject)X_data).acquireReadAndRelease();
TestUtils.compareMatrices(X_dmlfile, X_tmp, 1e-6);
- Data Y_data =
LineageRecomputeUtils.parseNComputeLineageTrace(Y_lineage, null);
+ Data Y_data =
LineageRecomputeUtils.parseNComputeLineageTrace(Y_lineage);
HashMap<MatrixValue.CellIndex, Double> Y_dmlfile =
readDMLMatrixFromOutputDir("Y");
MatrixBlock Y_tmp =
((MatrixObject)Y_data).acquireReadAndRelease();
TestUtils.compareMatrices(Y_dmlfile, Y_tmp, 1e-6);
diff --git
a/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceExecTest.java
b/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceExecTest.java
index 7e19d1db8d..127426b5bd 100644
---
a/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceExecTest.java
+++
b/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceExecTest.java
@@ -127,7 +127,7 @@ public class LineageTraceExecTest extends LineageBase {
//get lineage and generate program
String Rtrace = readDMLLineageFromHDFS("R");
- Data ret =
LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace, null);
+ Data ret =
LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace);
if( testname.equals(TEST_NAME2) || testname.equals(TEST_NAME5))
{
double val1 = readDMLScalarFromOutputDir("R").get(new
CellIndex(1,1));
diff --git
a/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceFunctionTest.java
b/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceFunctionTest.java
index 763e73591e..53fc1b54ab 100644
---
a/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceFunctionTest.java
+++
b/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceFunctionTest.java
@@ -86,7 +86,7 @@ public class LineageTraceFunctionTest extends LineageBase
//get lineage and generate program
String Rtrace = readDMLLineageFromHDFS("R");
- Data ret =
LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace, null);
+ Data ret =
LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace);
HashMap<CellIndex, Double> dmlfile =
readDMLMatrixFromOutputDir("R");
MatrixBlock tmp = ((MatrixObject)ret).acquireReadAndRelease();
diff --git
a/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceGPUTest.java
b/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceGPUTest.java
index 031ed65f96..a6cb707b21 100644
---
a/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceGPUTest.java
+++
b/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceGPUTest.java
@@ -83,7 +83,7 @@ public class LineageTraceGPUTest extends AutomatedTestBase{
String Rtrace = readDMLLineageFromHDFS("R");
AutomatedTestBase.TEST_GPU = false;
//NOTE: the generated program is CP-only.
- Data ret =
LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace, null);
+ Data ret =
LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace);
HashMap<CellIndex, Double> dmlfile =
readDMLMatrixFromOutputDir("R");
MatrixBlock tmp = ((MatrixObject)ret).acquireReadAndRelease();
diff --git
a/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceParforTest.java
b/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceParforTest.java
index 48c39b0bdf..3abd25ce63 100644
---
a/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceParforTest.java
+++
b/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceParforTest.java
@@ -160,7 +160,7 @@ public class LineageTraceParforTest extends LineageBase {
//get lineage and generate program
String Rtrace = readDMLLineageFromHDFS("R");
- Data ret =
LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace, null);
+ Data ret =
LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace);
HashMap<CellIndex, Double> dmlfile =
readDMLMatrixFromOutputDir("R");
MatrixBlock tmp = ((MatrixObject)
ret).acquireReadAndRelease();