This is an automated email from the ASF dual-hosted git repository.
mboehm7 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new 6272b0eb64 [SYSTEMDS-3814] Fix invalid rename of csv input to output
files
6272b0eb64 is described below
commit 6272b0eb6483d65fb8c02fdfd1871fbcce3b731b
Author: Matthias Boehm <[email protected]>
AuthorDate: Fri Jan 17 19:48:47 2025 +0100
[SYSTEMDS-3814] Fix invalid rename of csv input to output files
This patch fixes a remaining invalid rename of persistently read input
csv files to csv output files, which "deletes" the input file. So far,
we derived this information from the PREAD variable name, but certain
assignments lose this information. We now properly capture this
information at createvar instructions, preserve it inside all
matrices, frames, and tensors, and thus ensure robustness for all
kinds of programs.
---
.../controlprogram/caching/CacheableData.java | 9 ++
.../instructions/cp/VariableCPInstruction.java | 16 +--
.../sysds/test/functions/io/RenameIssueTest.java | 134 +++++++++++++++++++++
src/test/scripts/functions/io/Rename.dml | 31 +++++
4 files changed, 182 insertions(+), 8 deletions(-)
diff --git
a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
index dc8ca3aec6..eba22e7f15 100644
---
a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
+++
b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
@@ -183,6 +183,7 @@ public abstract class CacheableData<T extends
CacheBlock<?>> extends Data
/** The name of HDFS file in which the data is backed up. */
protected String _hdfsFileName = null; // file name and path
+ protected boolean _isPRead = false; //persistent read, must not be
deleted
/**
* Flag that indicates whether or not hdfs file exists.It is used
@@ -285,6 +286,14 @@ public abstract class CacheableData<T extends
CacheBlock<?>> extends Data
return _hdfsFileName;
}
+ /**
+ * Indicates whether this object is backed by a persistently read input
+ * file (flag set via setPersistentRead), which must not be deleted.
+ *
+ * @return true if the backing file is a persistently read input
+ */
+ public boolean isPersistentRead() {
+ return _isPRead;
+ }
+
+ /**
+ * Sets whether this object is backed by a persistently read input file,
+ * which must not be deleted.
+ *
+ * @param pread true if the backing file is a persistently read input
+ */
+ public void setPersistentRead(boolean pread) {
+ _isPRead = pread;
+ }
+
public long getUniqueID() {
return _uniqueID;
}
diff --git
a/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java
b/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java
index 6ed292bbb0..c1397381ba 100644
---
a/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java
+++
b/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java
@@ -706,7 +706,7 @@ public class VariableCPInstruction extends CPInstruction
implements LineageTrace
case MATRIX: {
String fname = createUniqueFilename();
MatrixObject obj = new
MatrixObject(getInput1().getValueType(), fname);
- setCacheableDataFields(obj);
+ setCacheableDataFields(obj,
getInput1().getName());
obj.setUpdateType(_updateType);
obj.setMarkForLinCache(true);
ec.setVariable(getInput1().getName(), obj);
@@ -717,14 +717,14 @@ public class VariableCPInstruction extends CPInstruction
implements LineageTrace
case TENSOR: {
String fname = createUniqueFilename();
TensorObject obj = new
TensorObject(getInput1().getValueType(), fname);
- setCacheableDataFields(obj);
+ setCacheableDataFields(obj,
getInput1().getName());
ec.setVariable(getInput1().getName(), obj);
break;
}
case FRAME: {
String fname = createUniqueFilename();
FrameObject fobj = new FrameObject(fname);
- setCacheableDataFields(fobj);
+ setCacheableDataFields(fobj,
getInput1().getName());
if( _schema != null )
fobj.setSchema(_schema); //after
metadata
ec.setVariable(getInput1().getName(), fobj);
@@ -757,13 +757,14 @@ public class VariableCPInstruction extends CPInstruction
implements LineageTrace
return fname;
}
- private void setCacheableDataFields(CacheableData<?> obj){
+ /**
+ * Initializes the common fields (metadata, cleanup flag, format
+ * properties, persistent-read flag) of a newly created cacheable
+ * object, e.g., a matrix, tensor, or frame created by createvar.
+ *
+ * @param obj newly created cacheable data object
+ * @param varname name of the variable the object is bound to; a name
+ * with the PREAD prefix marks a persistently read input
+ */
+ private void setCacheableDataFields(CacheableData<?> obj, String
varname){
//clone metadata because it is updated on copy-on-write,
otherwise there
//is potential for hidden side effects between variables.
obj.setMetaData((MetaData)metadata.clone());
obj.enableCleanup(!getInput1().getName()
.startsWith(org.apache.sysds.lops.Data.PREAD_PREFIX));
obj.setFileFormatProperties(_formatProperties);
+ //record the PREAD origin on the object itself, because later
+ //assignments may not preserve the PREAD variable name
+
obj.setPersistentRead(varname.startsWith(org.apache.sysds.lops.Data.PREAD_PREFIX));
}
/**
@@ -960,7 +961,7 @@ public class VariableCPInstruction extends CPInstruction
implements LineageTrace
/**
* Handler for CastAsFrameVariable instruction
- *
+ *
* @param ec execution context
*/
private void processCastAsFrameVariableInstruction(ExecutionContext ec){
@@ -1018,6 +1019,7 @@ public class VariableCPInstruction extends CPInstruction
implements LineageTrace
* @param ec execution context
*/
private void processCopyInstruction(ExecutionContext ec) {
+
// get source variable
Data dd = ec.getVariable(getInput1().getName());
@@ -1142,9 +1144,7 @@ public class VariableCPInstruction extends CPInstruction
implements LineageTrace
try {
FileFormat fmt =
((MetaDataFormat)mo.getMetaData()).getFileFormat();
DataCharacteristics dc =
(mo.getMetaData()).getDataCharacteristics();
- if( fmt == FileFormat.CSV
- &&
!getInput1().getName().startsWith(org.apache.sysds.lops.Data.PREAD_PREFIX) )
- {
+ if( fmt == FileFormat.CSV &&
!mo.isPersistentRead() ) {
WriterTextCSV writer = new
WriterTextCSV((FileFormatPropertiesCSV)fprop);
writer.addHeaderToCSV(mo.getFileName(),
fname, dc.getRows(), dc.getCols());
}
diff --git
a/src/test/java/org/apache/sysds/test/functions/io/RenameIssueTest.java
b/src/test/java/org/apache/sysds/test/functions/io/RenameIssueTest.java
new file mode 100644
index 0000000000..e7e975f304
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/functions/io/RenameIssueTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.io;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Locale;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sysds.common.Types.ExecMode;
+import org.apache.sysds.common.Types.FileFormat;
+import org.apache.sysds.common.Types.ValueType;
+import org.apache.sysds.runtime.io.MatrixWriter;
+import org.apache.sysds.runtime.io.MatrixWriterFactory;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.meta.MatrixCharacteristics;
+import org.apache.sysds.runtime.util.DataConverter;
+import org.apache.sysds.runtime.util.HDFSTool;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Regression test for SYSTEMDS-3814: writing an alias of a persistently read
+ * input matrix to a new output file must not rename (and thereby remove) the
+ * input file. Covers CSV, text, and binary formats in all execution modes.
+ */
+public class RenameIssueTest extends AutomatedTestBase {
+
+	protected static final Log LOG = LogFactory.getLog(RenameIssueTest.class.getName());
+
+	private final static String TEST_NAME1 = "Rename";
+	private final static String TEST_DIR = "functions/io/";
+	private final static String TEST_CLASS_DIR = TEST_DIR + RenameIssueTest.class.getSimpleName() + "/";
+	/** Block size used both for writing the input and in its metadata file. */
+	private final static int BLOCKSIZE = 1000;
+
+	@Override
+	public void setUp() {
+		TestUtils.clearAssertionInformation();
+		addTestConfiguration(TEST_NAME1, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[] {"L","R1"}) );
+	}
+
+	@Test
+	public void testCSVSinglenode() {
+		runRenameTest(FileFormat.CSV, ExecMode.SINGLE_NODE);
+	}
+
+	@Test
+	public void testCSVHybrid() {
+		runRenameTest(FileFormat.CSV, ExecMode.HYBRID);
+	}
+
+	@Test
+	public void testCSVSpark() {
+		runRenameTest(FileFormat.CSV, ExecMode.SPARK);
+	}
+
+	@Test
+	public void testTextSinglenode() {
+		runRenameTest(FileFormat.TEXT, ExecMode.SINGLE_NODE);
+	}
+
+	@Test
+	public void testTextHybrid() {
+		runRenameTest(FileFormat.TEXT, ExecMode.HYBRID);
+	}
+
+	@Test
+	public void testTextSpark() {
+		runRenameTest(FileFormat.TEXT, ExecMode.SPARK);
+	}
+
+	@Test
+	public void testBinarySinglenode() {
+		runRenameTest(FileFormat.BINARY, ExecMode.SINGLE_NODE);
+	}
+
+	@Test
+	public void testBinaryHybrid() {
+		runRenameTest(FileFormat.BINARY, ExecMode.HYBRID);
+	}
+
+	@Test
+	public void testBinarySpark() {
+		runRenameTest(FileFormat.BINARY, ExecMode.SPARK);
+	}
+
+	/**
+	 * Writes a small random input matrix (plus metadata) in the given format,
+	 * runs Rename.dml, and checks that the input file still exists and the
+	 * output file was written, i.e., the input was not renamed to the output.
+	 *
+	 * @param fmt  file format of the input and the output
+	 * @param mode execution mode to run the script in
+	 */
+	private void runRenameTest(FileFormat fmt, ExecMode mode) {
+		ExecMode modeOld = setExecMode(mode);
+
+		try {
+			TestConfiguration config = getTestConfiguration(TEST_NAME1);
+			loadTestConfiguration(config);
+
+			MatrixBlock a = DataConverter.convertToMatrixBlock(getRandomMatrix(7, 7, -1, 1, 0.5, -1));
+			MatrixWriter writer = MatrixWriterFactory.createMatrixWriter(fmt);
+			//argument order: rows, cols, blocksize, nnz (previously nnz and blocksize were swapped)
+			writer.writeMatrixToHDFS(a, input("A"),
+				a.getNumRows(), a.getNumColumns(), BLOCKSIZE, a.getNonZeros());
+			HDFSTool.writeMetaDataFile(input("A")+".mtd", ValueType.FP64,
+				new MatrixCharacteristics(a.getNumRows(), a.getNumColumns(), BLOCKSIZE), fmt);
+
+			String HOME = SCRIPT_DIR + TEST_DIR;
+			fullDMLScriptName = HOME + TEST_NAME1 + ".dml";
+			//Locale.ROOT avoids locale-specific casing (e.g., dotless i for BINARY)
+			programArgs = new String[]{"-explain", "-args", input("A"),
+				fmt.toString().toLowerCase(Locale.ROOT), output("B")};
+			runTest(true, false, null, -1);
+
+			//check file existence (no rename of the input to the output)
+			Assert.assertTrue("input file was removed: " + input("A"),
+				new File(input("A")).exists());
+			Assert.assertTrue("output file was not written: " + output("B"),
+				new File(output("B")).exists());
+		}
+		catch (IOException e) {
+			//propagate with cause instead of printing and failing without context
+			throw new RuntimeException(e);
+		}
+		finally {
+			resetExecMode(modeOld);
+		}
+	}
+}
diff --git a/src/test/scripts/functions/io/Rename.dml
b/src/test/scripts/functions/io/Rename.dml
new file mode 100644
index 0000000000..14b114bba4
--- /dev/null
+++ b/src/test/scripts/functions/io/Rename.dml
@@ -0,0 +1,31 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+# Regression script for SYSTEMDS-3814: writes an alias of a persistently
+# read input to a new file; the input file must not be renamed (lost).
+# $1 ... input matrix path, $2 ... output format, $3 ... output path
+X1 = read($1);
+
+Xa = X1; # plain assignment; such assignments may drop the PREAD variable name
+for(i in 1:2) {
+ write(Xa, $3, format=$2); # iteration 1 writes the unmodified input
+ while(FALSE){} #write first (presumably forces a statement-block cut before rbind)
+ Xa = rbind(Xa, X1); # uses the input X1 again after the write
+ print("Creating and writing replicated dataset ["+i+"]");
+}
+