This is an automated email from the ASF dual-hosted git repository. arnabp20 pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push: new e291bb645c [SYSTEMDS-2650] Standardize dedup lineage trace serialization format e291bb645c is described below commit e291bb645c98c9e47d676a2519838bf7ced8f9df Author: Arnab Phani <phaniar...@gmail.com> AuthorDate: Tue Aug 27 13:10:08 2024 +0200 [SYSTEMDS-2650] Standardize dedup lineage trace serialization format This patch standardizes the serialization format of dedup patches, regardless of whether they are captured by the lineage() built-in or via a write operator. Previously, all patches were stored in a separate file. Now, we always create a single file containing the global trace first, followed by the patches. This change also allows us to copy the traces from the console (lineage()) and store them in a single file for future recomputation. --- .../runtime/controlprogram/caching/FrameObject.java | 2 +- .../runtime/controlprogram/caching/MatrixObject.java | 2 +- .../runtime/controlprogram/caching/TensorObject.java | 2 +- .../instructions/cp/AggregateUnaryCPInstruction.java | 2 +- .../org/apache/sysds/runtime/lineage/LineageMap.java | 9 ++++----- .../apache/sysds/runtime/lineage/LineageParser.java | 20 +++++++++++++++++++- .../sysds/runtime/lineage/LineageRecomputeUtils.java | 17 +++++++++-------- .../test/functions/lineage/LineageCodegenTest.java | 2 +- .../functions/lineage/LineageTraceBuiltinTest.java | 2 +- .../functions/lineage/LineageTraceDedupTest.java | 3 +-- .../functions/lineage/LineageTraceExecSparkTest.java | 4 ++-- .../test/functions/lineage/LineageTraceExecTest.java | 2 +- .../functions/lineage/LineageTraceFunctionTest.java | 2 +- .../test/functions/lineage/LineageTraceGPUTest.java | 2 +- .../functions/lineage/LineageTraceParforTest.java | 2 +- 15 files changed, 45 insertions(+), 28 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java index 
945021626e..582bb64dd8 100644 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java @@ -308,7 +308,7 @@ public class FrameObject extends CacheableData<FrameBlock> @Override protected FrameBlock reconstructByLineage(LineageItem li) throws IOException { return ((FrameObject) LineageRecomputeUtils - .parseNComputeLineageTrace(li.getData(), null)) + .parseNComputeLineageTrace(li.getData())) .acquireReadAndRelease(); } } diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java index 29f2c1cb59..f58b315e68 100644 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java @@ -588,7 +588,7 @@ public class MatrixObject extends CacheableData<MatrixBlock> { @Override protected MatrixBlock reconstructByLineage(LineageItem li) throws IOException { return ((MatrixObject) LineageRecomputeUtils - .parseNComputeLineageTrace(Explain.explain(li), null)) + .parseNComputeLineageTrace(Explain.explain(li))) .acquireReadAndRelease(); } diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/TensorObject.java b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/TensorObject.java index 13665f65a2..8908f55d06 100644 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/TensorObject.java +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/TensorObject.java @@ -202,7 +202,7 @@ public class TensorObject extends CacheableData<TensorBlock> { @Override protected TensorBlock reconstructByLineage(LineageItem li) throws IOException { return ((TensorObject) LineageRecomputeUtils - .parseNComputeLineageTrace(li.getData(), null)) + .parseNComputeLineageTrace(li.getData())) .acquireReadAndRelease(); } } 
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/cp/AggregateUnaryCPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/cp/AggregateUnaryCPInstruction.java index 78bc7d132d..920a79b77d 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/cp/AggregateUnaryCPInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/cp/AggregateUnaryCPInstruction.java @@ -185,7 +185,7 @@ public class AggregateUnaryCPInstruction extends UnaryCPInstruction { LineageItem li = ec.getLineageItem(input1); String out = !DMLScript.LINEAGE_DEDUP ? Explain.explain(li) : - Explain.explain(li) + LineageDedupUtils.mergeExplainDedupBlocks(ec); + Explain.explain(li) + "\n" + LineageDedupUtils.mergeExplainDedupBlocks(ec); ec.setScalarOutput(outputName, new StringObject(out)); break; } diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageMap.java b/src/main/java/org/apache/sysds/runtime/lineage/LineageMap.java index df4d0c2e1b..41875bdfdf 100644 --- a/src/main/java/org/apache/sysds/runtime/lineage/LineageMap.java +++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageMap.java @@ -245,10 +245,9 @@ public class LineageMap { LineageItem li = get(input1); String fName = ec.getScalarInput(input2.getName(), Types.ValueType.STRING, input2.isLiteral()).getStringValue(); - if (DMLScript.LINEAGE_DEDUP) { - // gracefully serialize the dedup maps without decompressing - LineageItemUtils.writeTraceToHDFS(LineageDedupUtils.mergeExplainDedupBlocks(ec), fName + ".lineage.dedup"); - } - LineageItemUtils.writeTraceToHDFS(Explain.explain(li), fName + ".lineage"); + // Combine the global trace and dedup patches in a single file. + String out = !DMLScript.LINEAGE_DEDUP ? 
Explain.explain(li) : + Explain.explain(li) + "\n" + LineageDedupUtils.mergeExplainDedupBlocks(ec); + LineageItemUtils.writeTraceToHDFS(out, fName + ".lineage"); } } diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageParser.java b/src/main/java/org/apache/sysds/runtime/lineage/LineageParser.java index 439fe1f10d..8acfbdc9e9 100644 --- a/src/main/java/org/apache/sysds/runtime/lineage/LineageParser.java +++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageParser.java @@ -122,7 +122,7 @@ public class LineageParser } return new LineageItem(id, "", opcode, inputs.toArray(new LineageItem[0]), specialValueBits); } - + protected static void parseLineageTraceDedup(String str) { str.replaceAll("\r\n", "\n"); String[] allPatches = str.split("\n\n"); @@ -145,4 +145,22 @@ public class LineageParser loopItem.patchLiMap.get(pathId).put(parts[1], patchLi); } } + + protected static String[] separateMainAndDedupPatches(String str) { + str.replaceAll("\r\n", "\n"); + String[] allPatches = str.split("\n\n"); + if (allPatches.length == 1) //no dedup patches + return allPatches; + + // Merge the dedup patches into a single string + String[] patches = new String[2]; + patches[0] = allPatches[0]; + StringBuilder sb = new StringBuilder(); + for (int i=1; i<allPatches.length; i++) { + sb.append(allPatches[i]); + sb.append("\n\n"); + } + patches[1] = sb.toString(); + return patches; + } } \ No newline at end of file diff --git a/src/main/java/org/apache/sysds/runtime/lineage/LineageRecomputeUtils.java b/src/main/java/org/apache/sysds/runtime/lineage/LineageRecomputeUtils.java index c5fceed547..6c691c2b8b 100644 --- a/src/main/java/org/apache/sysds/runtime/lineage/LineageRecomputeUtils.java +++ b/src/main/java/org/apache/sysds/runtime/lineage/LineageRecomputeUtils.java @@ -81,17 +81,18 @@ import org.apache.sysds.utils.Statistics; public class LineageRecomputeUtils { private static final String LVARPREFIX = "lvar"; public static final String LPLACEHOLDER = "IN#"; - 
private static final boolean DEBUG = false; + private static final boolean DEBUG = true; public static Map<String, DedupLoopItem> loopPatchMap = new HashMap<>(); - public static Data parseNComputeLineageTrace(String mainTrace, String dedupPatches) { - if (DEBUG) { + public static Data parseNComputeLineageTrace(String mainTrace) { + if (DEBUG) System.out.println(mainTrace); - System.out.println(dedupPatches); - } - LineageItem root = LineageParser.parseLineageTrace(mainTrace); - if (dedupPatches != null) - LineageParser.parseLineageTraceDedup(dedupPatches); + + // Separate the global trace and the dedup patches + String[] patches = LineageParser.separateMainAndDedupPatches(mainTrace); + LineageItem root = LineageParser.parseLineageTrace(patches[0]); //global trace + if (patches.length > 1) + LineageParser.parseLineageTraceDedup(patches[1]); // Disable GPU execution. TODO: Support GPU boolean GPUenabled = false; diff --git a/src/test/java/org/apache/sysds/test/functions/lineage/LineageCodegenTest.java b/src/test/java/org/apache/sysds/test/functions/lineage/LineageCodegenTest.java index cc88317d14..59cda3cc69 100644 --- a/src/test/java/org/apache/sysds/test/functions/lineage/LineageCodegenTest.java +++ b/src/test/java/org/apache/sysds/test/functions/lineage/LineageCodegenTest.java @@ -100,7 +100,7 @@ public class LineageCodegenTest extends LineageBase { //get lineage and generate program String Rtrace = readDMLLineageFromHDFS("R"); - Data ret = LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace, null); + Data ret = LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace); HashMap<CellIndex, Double> dmlfile = readDMLMatrixFromOutputDir("R"); MatrixBlock tmp = ((MatrixObject)ret).acquireReadAndRelease(); TestUtils.compareMatrices(dmlfile, tmp, 1e-6); diff --git a/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceBuiltinTest.java b/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceBuiltinTest.java index 84cea6d74a..3e3845b311 100644 --- 
a/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceBuiltinTest.java +++ b/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceBuiltinTest.java @@ -78,7 +78,7 @@ public class LineageTraceBuiltinTest extends LineageBase { //get lineage and generate program String Rtrace = readDMLLineageFromHDFS("R"); - Data ret = LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace, null); + Data ret = LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace); HashMap<CellIndex, Double> dmlfile = readDMLMatrixFromOutputDir("R"); MatrixBlock tmp = ((MatrixObject)ret).acquireReadAndRelease(); diff --git a/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceDedupTest.java b/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceDedupTest.java index f372cd3aa9..c9a6beb1f0 100644 --- a/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceDedupTest.java +++ b/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceDedupTest.java @@ -152,8 +152,7 @@ public class LineageTraceDedupTest extends LineageBase //deserialize, generate program and execute String Rtrace = readDMLLineageFromHDFS("R"); - String RDedupPatches = readDMLLineageDedupFromHDFS("R"); - Data ret = LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace, RDedupPatches); + Data ret = LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace); //match the original and recomputed results HashMap<CellIndex, Double> orig = readDMLMatrixFromOutputDir("R"); diff --git a/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceExecSparkTest.java b/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceExecSparkTest.java index 88456dd962..4009cea684 100644 --- a/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceExecSparkTest.java +++ b/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceExecSparkTest.java @@ -116,12 +116,12 @@ public class LineageTraceExecSparkTest extends LineageBase { 
TestUtils.compareScalars(Y_lineage, Explain.explain(Y_li)); //generate program - Data X_data = LineageRecomputeUtils.parseNComputeLineageTrace(X_lineage, null); + Data X_data = LineageRecomputeUtils.parseNComputeLineageTrace(X_lineage); HashMap<MatrixValue.CellIndex, Double> X_dmlfile = readDMLMatrixFromOutputDir("X"); MatrixBlock X_tmp = ((MatrixObject)X_data).acquireReadAndRelease(); TestUtils.compareMatrices(X_dmlfile, X_tmp, 1e-6); - Data Y_data = LineageRecomputeUtils.parseNComputeLineageTrace(Y_lineage, null); + Data Y_data = LineageRecomputeUtils.parseNComputeLineageTrace(Y_lineage); HashMap<MatrixValue.CellIndex, Double> Y_dmlfile = readDMLMatrixFromOutputDir("Y"); MatrixBlock Y_tmp = ((MatrixObject)Y_data).acquireReadAndRelease(); TestUtils.compareMatrices(Y_dmlfile, Y_tmp, 1e-6); diff --git a/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceExecTest.java b/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceExecTest.java index 7e19d1db8d..127426b5bd 100644 --- a/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceExecTest.java +++ b/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceExecTest.java @@ -127,7 +127,7 @@ public class LineageTraceExecTest extends LineageBase { //get lineage and generate program String Rtrace = readDMLLineageFromHDFS("R"); - Data ret = LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace, null); + Data ret = LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace); if( testname.equals(TEST_NAME2) || testname.equals(TEST_NAME5)) { double val1 = readDMLScalarFromOutputDir("R").get(new CellIndex(1,1)); diff --git a/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceFunctionTest.java b/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceFunctionTest.java index 763e73591e..53fc1b54ab 100644 --- a/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceFunctionTest.java +++ 
b/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceFunctionTest.java @@ -86,7 +86,7 @@ public class LineageTraceFunctionTest extends LineageBase //get lineage and generate program String Rtrace = readDMLLineageFromHDFS("R"); - Data ret = LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace, null); + Data ret = LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace); HashMap<CellIndex, Double> dmlfile = readDMLMatrixFromOutputDir("R"); MatrixBlock tmp = ((MatrixObject)ret).acquireReadAndRelease(); diff --git a/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceGPUTest.java b/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceGPUTest.java index 031ed65f96..a6cb707b21 100644 --- a/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceGPUTest.java +++ b/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceGPUTest.java @@ -83,7 +83,7 @@ public class LineageTraceGPUTest extends AutomatedTestBase{ String Rtrace = readDMLLineageFromHDFS("R"); AutomatedTestBase.TEST_GPU = false; //NOTE: the generated program is CP-only. 
- Data ret = LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace, null); + Data ret = LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace); HashMap<CellIndex, Double> dmlfile = readDMLMatrixFromOutputDir("R"); MatrixBlock tmp = ((MatrixObject)ret).acquireReadAndRelease(); diff --git a/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceParforTest.java b/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceParforTest.java index 48c39b0bdf..3abd25ce63 100644 --- a/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceParforTest.java +++ b/src/test/java/org/apache/sysds/test/functions/lineage/LineageTraceParforTest.java @@ -160,7 +160,7 @@ public class LineageTraceParforTest extends LineageBase { //get lineage and generate program String Rtrace = readDMLLineageFromHDFS("R"); - Data ret = LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace, null); + Data ret = LineageRecomputeUtils.parseNComputeLineageTrace(Rtrace); HashMap<CellIndex, Double> dmlfile = readDMLMatrixFromOutputDir("R"); MatrixBlock tmp = ((MatrixObject) ret).acquireReadAndRelease();