This is an automated email from the ASF dual-hosted git repository. mboehm7 pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/systemds.git
commit dc9fca930cf4e1b5f49459a5902f564c6afee9ca Author: Matthias Boehm <[email protected]> AuthorDate: Sat Aug 9 12:25:07 2025 +0200 [SYSTEMDS-3900] Improved integration of OOC binary stream writer --- .../controlprogram/caching/CacheableData.java | 85 +++++++++++++--------- .../controlprogram/caching/FrameObject.java | 8 ++ .../controlprogram/caching/MatrixObject.java | 13 ++++ .../controlprogram/caching/TensorObject.java | 7 ++ .../instructions/cp/VariableCPInstruction.java | 30 -------- .../apache/sysds/runtime/io/WriterBinaryBlock.java | 14 ++-- .../apache/sysds/test/functions/ooc/UnaryTest.java | 24 +++--- .../functions/ooc/{Unary.dml => UnaryWrite.dml} | 4 - 8 files changed, 95 insertions(+), 90 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java index e075b55e17..7517bf0e35 100644 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java @@ -909,44 +909,58 @@ public abstract class CacheableData<T extends CacheBlock<?>> extends Data // a) get the matrix boolean federatedWrite = (outputFormat != null ) && outputFormat.contains("federated"); - if( isEmpty(true) && !federatedWrite) - { - //read data from HDFS if required (never read before), this applies only to pWrite w/ different output formats - //note: for large rdd outputs, we compile dedicated writespinstructions (no need to handle this here) + if(getStreamHandle()!=null) { try { - if( getRDDHandle()==null || getRDDHandle().allowsShortCircuitRead() ) - _data = readBlobFromHDFS( _hdfsFileName ); - else if( getRDDHandle() != null ) - _data = readBlobFromRDD( getRDDHandle(), new MutableBoolean() ); - else if(!federatedWrite) - _data = readBlobFromFederated( getFedMapping() ); - setDirty(false); - refreshMetaData(); //e.g., after unknown csv read + long totalNnz = writeStreamToHDFS(fName, outputFormat, replication, formatProperties); + updateDataCharacteristics(new MatrixCharacteristics( + getNumRows(), getNumColumns(), blen, totalNnz)); + writeMetaData(fName, outputFormat, formatProperties); } - catch (IOException e) { - throw new DMLRuntimeException("Reading of " + _hdfsFileName + " ("+hashCode()+") failed.", e); + catch(Exception ex) { + throw new DMLRuntimeException("Failed to write OOC stream to " + fName, ex); } } - //get object from cache - if(!federatedWrite) { - if( _data == null ) - getCache(); - acquire( false, _data==null ); //incl. read matrix if evicted - } - - // b) write the matrix - try { - writeMetaData( fName, outputFormat, formatProperties ); - writeBlobToHDFS( fName, outputFormat, replication, formatProperties ); - if ( !pWrite ) - setDirty(false); - } - catch (Exception e) { - throw new DMLRuntimeException("Export to " + fName + " failed.", e); - } - finally { - if(!federatedWrite) - release(); + else { + if( isEmpty(true) && !federatedWrite) + { + //read data from HDFS if required (never read before), this applies only to pWrite w/ different output formats + //note: for large rdd outputs, we compile dedicated writespinstructions (no need to handle this here) + try { + if( getRDDHandle()==null || getRDDHandle().allowsShortCircuitRead() ) + _data = readBlobFromHDFS( _hdfsFileName ); + else if( getRDDHandle() != null ) + _data = readBlobFromRDD( getRDDHandle(), new MutableBoolean() ); + else if(!federatedWrite) + _data = readBlobFromFederated( getFedMapping() ); + setDirty(false); + refreshMetaData(); //e.g., after unknown csv read + } + catch (IOException e) { + throw new DMLRuntimeException("Reading of " + _hdfsFileName + " ("+hashCode()+") failed.", e); + } + } + + //get object from cache + if(!federatedWrite) { + if( _data == null ) + getCache(); + acquire( false, _data==null ); //incl. read matrix if evicted + } + + // b) write the matrix + try { + writeMetaData( fName, outputFormat, formatProperties ); + writeBlobToHDFS( fName, outputFormat, replication, formatProperties ); + if ( !pWrite ) + setDirty(false); + } + catch (Exception e) { + throw new DMLRuntimeException("Export to " + fName + " failed.", e); + } + finally { + if(!federatedWrite) + release(); + } } } else if( pWrite ) // pwrite with same output format @@ -1132,6 +1146,9 @@ public abstract class CacheableData<T extends CacheBlock<?>> extends Data protected abstract void writeBlobToHDFS(String fname, String ofmt, int rep, FileFormatProperties fprop) throws IOException; + protected abstract long writeStreamToHDFS(String fname, String ofmt, int rep, FileFormatProperties fprop) + throws IOException; + protected abstract void writeBlobFromRDDtoHDFS(RDDObject rdd, String fname, String ofmt) throws IOException; diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java index 56cc276cd8..f4d20bb55a 100644 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java @@ -295,6 +295,14 @@ public class FrameObject extends CacheableData<FrameBlock> writer.writeFrameToHDFS(_data, fname, getNumRows(), getNumColumns()); } + @Override + protected long writeStreamToHDFS(String fname, String ofmt, int rep, FileFormatProperties fprop) + throws IOException, DMLRuntimeException + { + throw new UnsupportedOperationException(); + } + + @Override protected void writeBlobFromRDDtoHDFS(RDDObject rdd, String fname, String ofmt) throws IOException, DMLRuntimeException diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java index e9204bdaed..9f4ca12dd7 100644 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java @@ -47,6 +47,8 @@ import org.apache.sysds.runtime.instructions.fed.InitFEDInstruction; import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue; import org.apache.sysds.runtime.instructions.spark.data.RDDObject; import org.apache.sysds.runtime.io.FileFormatProperties; +import org.apache.sysds.runtime.io.MatrixWriter; +import org.apache.sysds.runtime.io.MatrixWriterFactory; import org.apache.sysds.runtime.io.ReaderWriterFederated; import org.apache.sysds.runtime.lineage.LineageItem; import org.apache.sysds.runtime.lineage.LineageRecomputeUtils; @@ -601,6 +603,17 @@ public class MatrixObject extends CacheableData<MatrixBlock> { if(DMLScript.STATISTICS) CacheStatistics.incrementHDFSWrites(); } + + @Override + protected long writeStreamToHDFS(String fname, String ofmt, int rep, FileFormatProperties fprop) + throws IOException, DMLRuntimeException + { + MetaDataFormat iimd = (MetaDataFormat) _metaData; + FileFormat fmt = (ofmt != null ? FileFormat.safeValueOf(ofmt) : iimd.getFileFormat()); + MatrixWriter writer = MatrixWriterFactory.createMatrixWriter(fmt, rep, fprop); + return writer.writeMatrixFromStream(fname, getStreamHandle(), + getNumRows(), getNumColumns(), ConfigurationManager.getBlocksize()); + } @Override protected void writeBlobFromRDDtoHDFS(RDDObject rdd, String fname, String outputFormat) diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/TensorObject.java b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/TensorObject.java index d39ed8c8a9..d0111a3430 100644 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/TensorObject.java +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/TensorObject.java @@ -189,6 +189,13 @@ public class TensorObject extends CacheableData<TensorBlock> { if( DMLScript.STATISTICS ) CacheStatistics.incrementHDFSWrites(); } + + @Override + protected long writeStreamToHDFS(String fname, String ofmt, int rep, FileFormatProperties fprop) + throws IOException, DMLRuntimeException + { + throw new UnsupportedOperationException(); + } @Override protected ValueType[] getSchema() { diff --git a/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java index bd40f253b4..d3be925027 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java @@ -41,13 +41,11 @@ import org.apache.sysds.runtime.controlprogram.caching.MatrixObject; import org.apache.sysds.runtime.controlprogram.caching.MatrixObject.UpdateType; import org.apache.sysds.runtime.controlprogram.caching.TensorObject; import org.apache.sysds.runtime.controlprogram.context.ExecutionContext; -import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue; import org.apache.sysds.runtime.controlprogram.parfor.util.IDSequence; import org.apache.sysds.runtime.data.TensorBlock; import org.apache.sysds.runtime.frame.data.FrameBlock; import org.apache.sysds.runtime.instructions.Instruction; import org.apache.sysds.runtime.instructions.InstructionUtils; -import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue; import org.apache.sysds.runtime.io.FileFormatProperties; import org.apache.sysds.runtime.io.FileFormatPropertiesCSV; import org.apache.sysds.runtime.io.FileFormatPropertiesHDF5; @@ -57,8 +55,6 @@ import org.apache.sysds.runtime.io.ListWriter; import org.apache.sysds.runtime.io.WriterHDF5; import org.apache.sysds.runtime.io.WriterMatrixMarket; import org.apache.sysds.runtime.io.WriterTextCSV; -import org.apache.sysds.runtime.io.MatrixWriterFactory; -import org.apache.sysds.runtime.io.MatrixWriter; import org.apache.sysds.runtime.lineage.LineageItem; import org.apache.sysds.runtime.lineage.LineageItemUtils; import org.apache.sysds.runtime.lineage.LineageTraceable; @@ -1066,33 +1062,7 @@ public class VariableCPInstruction extends CPInstruction implements LineageTrace else if( getInput1().getDataType() == DataType.MATRIX ) { MatrixObject mo = ec.getMatrixObject(getInput1().getName()); int blen = Integer.parseInt(getInput4().getName()); - LocalTaskQueue<IndexedMatrixValue> stream = mo.getStreamHandle(); - if (stream != null) { - - try { - MatrixWriter writer = MatrixWriterFactory.createMatrixWriter(fmt); - long nrows = mo.getNumRows(); - long ncols = mo.getNumColumns(); - - long totalNnz = writer.writeMatrixFromStream(fname, stream, nrows, ncols, blen); - MatrixCharacteristics mc = new MatrixCharacteristics(nrows, ncols, blen, totalNnz); - HDFSTool.writeMetaDataFile(fname + ".mtd", mo.getValueType(), mc, fmt); - - // 1. Update the metadata of the MatrixObject in the symbol table. - mo.updateDataCharacteristics(mc); - System.out.println("MO characterstics updated to avoid recompilation"); - - // 2. Clear its dirty flag and update its file path to the result we just wrote. - // This tells the system that the data for this variable now lives in 'fname'. - HDFSTool.copyFileOnHDFS(fname, mo.getFileName()); - mo.setDirty(false); - - } - catch(Exception ex) { - throw new DMLRuntimeException("Failed to write OOC stream to " + fname, ex); - } - } if( fmt == FileFormat.MM ) writeMMFile(ec, fname); else if( fmt == FileFormat.CSV ) diff --git a/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java b/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java index a3798a4513..82c994eb7a 100644 --- a/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java +++ b/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java @@ -235,16 +235,13 @@ public class WriterBinaryBlock extends MatrixWriter { @Override public long writeMatrixFromStream(String fname, LocalTaskQueue<IndexedMatrixValue> stream, long rlen, long clen, int blen) throws IOException { - JobConf conf = ConfigurationManager.getCachedJobConf(); Path path = new Path(fname); - FileSystem fs = IOUtilFunctions.getFileSystem(path, conf); - SequenceFile.Writer writer = null; long totalNnz = 0; try { - // 1. Create Sequence file writer for the final destination file writer = new SequenceFile.Writer(fs, conf, path, MatrixIndexes.class, MatrixBlock.class); - writer = SequenceFile.createWriter(fs, conf, path, MatrixIndexes.class, MatrixBlock.class); + // 1. Create Sequence file writer for the final destination file + writer = IOUtilFunctions.getSeqWriter(path, job, _replication); // 2. Loop through OOC stream IndexedMatrixValue i_val = null; @@ -257,13 +254,12 @@ public class WriterBinaryBlock extends MatrixWriter { totalNnz += mb.getNonZeros(); } - - } catch (IOException | InterruptedException e) { + } catch (Exception e) { throw new DMLRuntimeException(e); - } finally { + } finally { IOUtilFunctions.closeSilently(writer); } - return totalNnz; + return totalNnz; } } diff --git a/src/test/java/org/apache/sysds/test/functions/ooc/UnaryTest.java b/src/test/java/org/apache/sysds/test/functions/ooc/UnaryTest.java index fc6f01a37f..5e203b5bf7 100644 --- a/src/test/java/org/apache/sysds/test/functions/ooc/UnaryTest.java +++ b/src/test/java/org/apache/sysds/test/functions/ooc/UnaryTest.java @@ -29,7 +29,6 @@ import org.apache.sysds.runtime.instructions.Instruction; import org.apache.sysds.runtime.io.MatrixWriter; import org.apache.sysds.runtime.io.MatrixWriterFactory; import org.apache.sysds.runtime.matrix.data.MatrixBlock; -import org.apache.sysds.runtime.matrix.data.MatrixValue; import org.apache.sysds.runtime.meta.MatrixCharacteristics; import org.apache.sysds.runtime.util.DataConverter; import org.apache.sysds.runtime.util.HDFSTool; @@ -40,13 +39,10 @@ import org.junit.Assert; import org.junit.Test; import java.io.IOException; -import java.util.HashMap; - -import static org.apache.sysds.test.TestUtils.readDMLMatrixFromHDFS; public class UnaryTest extends AutomatedTestBase { - private static final String TEST_NAME = "Unary"; + private static final String TEST_NAME = "UnaryWrite"; private static final String TEST_DIR = "functions/ooc/"; private static final String TEST_CLASS_DIR = TEST_DIR + UnaryTest.class.getSimpleName() + "/"; private static final String INPUT_NAME = "X"; @@ -55,18 +51,19 @@ public class UnaryTest extends AutomatedTestBase { @Override public void setUp() { TestUtils.clearAssertionInformation(); - TestConfiguration config = new TestConfiguration(TEST_CLASS_DIR, TEST_NAME); - addTestConfiguration(TEST_NAME, config); + addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME)); } - /** - * Test the sum of scalar multiplication, "sum(X*7)", with OOC backend. - */ @Test - public void testUnary() { + public void testWriteNoRewrite() { testUnaryOperation(false); } + @Test + public void testWriteRewrite() { + testUnaryOperation(true); + } + public void testUnaryOperation(boolean rewrite) { @@ -116,8 +113,9 @@ public class UnaryTest extends AutomatedTestBase { } } - private static double[][] readMatrix( String fname, FileFormat fmt, long rows, long cols, int brows, int bcols ) - throws IOException + private static double[][] readMatrix( String fname, FileFormat fmt, + long rows, long cols, int brows, int bcols ) + throws IOException { MatrixBlock mb = DataConverter.readMatrixFromHDFS(fname, fmt, rows, cols, brows, bcols); double[][] C = DataConverter.convertToDoubleMatrix(mb); diff --git a/src/test/scripts/functions/ooc/Unary.dml b/src/test/scripts/functions/ooc/UnaryWrite.dml similarity index 91% rename from src/test/scripts/functions/ooc/Unary.dml rename to src/test/scripts/functions/ooc/UnaryWrite.dml index 24c0d98fb4..da2d262206 100644 --- a/src/test/scripts/functions/ooc/Unary.dml +++ b/src/test/scripts/functions/ooc/UnaryWrite.dml @@ -21,9 +21,5 @@ # Read input matrix and operator from command line args X = read($1); -#print(toString(X)) res = ceil(X); -#print(toString(Y)) -#res = as.matrix(sum(Y)); -# Write the final matrix result write(res, $2, format="binary");
