This is an automated email from the ASF dual-hosted git repository. mboehm7 pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/systemds.git
commit 54a90adcb9dcd6c400db07ca8a641de8c04c184d Author: Janardhan Pulivarthi <[email protected]> AuthorDate: Sat Aug 9 11:38:09 2025 +0200 [SYSTEMDS-3900] New OOC block stream binary writer Closes #2301. Closes #2302. --- .../runtime/compress/io/WriterCompressed.java | 7 ++++ .../instructions/cp/VariableCPInstruction.java | 35 ++++++++++++++++++-- .../org/apache/sysds/runtime/io/MatrixWriter.java | 16 +++++++++ .../apache/sysds/runtime/io/WriterBinaryBlock.java | 38 ++++++++++++++++++++++ .../org/apache/sysds/runtime/io/WriterHDF5.java | 7 ++++ .../sysds/runtime/io/WriterMatrixMarket.java | 7 ++++ .../org/apache/sysds/runtime/io/WriterTextCSV.java | 7 ++++ .../apache/sysds/runtime/io/WriterTextCell.java | 7 ++++ .../apache/sysds/runtime/io/WriterTextLIBSVM.java | 7 ++++ .../apache/sysds/test/functions/ooc/UnaryTest.java | 22 ++++++++++--- src/test/scripts/functions/ooc/Unary.dml | 6 ++-- 11 files changed, 149 insertions(+), 10 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/compress/io/WriterCompressed.java b/src/main/java/org/apache/sysds/runtime/compress/io/WriterCompressed.java index cf39ca6fba..c4d9db367b 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/io/WriterCompressed.java +++ b/src/main/java/org/apache/sysds/runtime/compress/io/WriterCompressed.java @@ -46,7 +46,9 @@ import org.apache.sysds.runtime.compress.colgroup.dictionary.IDictionary; import org.apache.sysds.runtime.compress.lib.CLALibSeparator; import org.apache.sysds.runtime.compress.lib.CLALibSeparator.SeparatedGroups; import org.apache.sysds.runtime.compress.lib.CLALibSlice; +import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue; import org.apache.sysds.runtime.instructions.spark.CompressionSPInstruction.CompressionFunction; +import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue; import org.apache.sysds.runtime.instructions.spark.utils.RDDConverterUtils; import org.apache.sysds.runtime.io.FileFormatProperties; import 
org.apache.sysds.runtime.io.IOUtilFunctions; @@ -407,4 +409,9 @@ public final class WriterCompressed extends MatrixWriter { } + @Override + public long writeMatrixFromStream(String fname, LocalTaskQueue<IndexedMatrixValue> stream, long rlen, long clen, int blen) { + throw new UnsupportedOperationException("Writing from an OOC stream is not supported for the compressed format."); + }; + } diff --git a/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java index 8400ec54e6..bd40f253b4 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java @@ -41,11 +41,13 @@ import org.apache.sysds.runtime.controlprogram.caching.MatrixObject; import org.apache.sysds.runtime.controlprogram.caching.MatrixObject.UpdateType; import org.apache.sysds.runtime.controlprogram.caching.TensorObject; import org.apache.sysds.runtime.controlprogram.context.ExecutionContext; +import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue; import org.apache.sysds.runtime.controlprogram.parfor.util.IDSequence; import org.apache.sysds.runtime.data.TensorBlock; import org.apache.sysds.runtime.frame.data.FrameBlock; import org.apache.sysds.runtime.instructions.Instruction; import org.apache.sysds.runtime.instructions.InstructionUtils; +import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue; import org.apache.sysds.runtime.io.FileFormatProperties; import org.apache.sysds.runtime.io.FileFormatPropertiesCSV; import org.apache.sysds.runtime.io.FileFormatPropertiesHDF5; @@ -55,6 +57,8 @@ import org.apache.sysds.runtime.io.ListWriter; import org.apache.sysds.runtime.io.WriterHDF5; import org.apache.sysds.runtime.io.WriterMatrixMarket; import org.apache.sysds.runtime.io.WriterTextCSV; +import org.apache.sysds.runtime.io.MatrixWriterFactory; +import 
org.apache.sysds.runtime.io.MatrixWriter; import org.apache.sysds.runtime.lineage.LineageItem; import org.apache.sysds.runtime.lineage.LineageItemUtils; import org.apache.sysds.runtime.lineage.LineageTraceable; @@ -1060,6 +1064,35 @@ public class VariableCPInstruction extends CPInstruction implements LineageTrace HDFSTool.writeScalarToHDFS(ec.getScalarInput(getInput1()), fname); } else if( getInput1().getDataType() == DataType.MATRIX ) { + MatrixObject mo = ec.getMatrixObject(getInput1().getName()); + int blen = Integer.parseInt(getInput4().getName()); + LocalTaskQueue<IndexedMatrixValue> stream = mo.getStreamHandle(); + + if (stream != null) { + + try { + MatrixWriter writer = MatrixWriterFactory.createMatrixWriter(fmt); + long nrows = mo.getNumRows(); + long ncols = mo.getNumColumns(); + + long totalNnz = writer.writeMatrixFromStream(fname, stream, nrows, ncols, blen); + MatrixCharacteristics mc = new MatrixCharacteristics(nrows, ncols, blen, totalNnz); + HDFSTool.writeMetaDataFile(fname + ".mtd", mo.getValueType(), mc, fmt); + + // 1. Update the metadata of the MatrixObject in the symbol table. + mo.updateDataCharacteristics(mc); + System.out.println("MO characteristics updated to avoid recompilation"); + + // 2. Clear its dirty flag and update its file path to the result we just wrote. + // This tells the system that the data for this variable now lives in 'fname'. 
+ HDFSTool.copyFileOnHDFS(fname, mo.getFileName()); + mo.setDirty(false); + + } + catch(Exception ex) { + throw new DMLRuntimeException("Failed to write OOC stream to " + fname, ex); + } + } if( fmt == FileFormat.MM ) writeMMFile(ec, fname); else if( fmt == FileFormat.CSV ) @@ -1070,8 +1103,6 @@ public class VariableCPInstruction extends CPInstruction implements LineageTrace writeHDF5File(ec, fname); else { // Default behavior (text, binary) - MatrixObject mo = ec.getMatrixObject(getInput1().getName()); - int blen = Integer.parseInt(getInput4().getName()); mo.exportData(fname, fmtStr, new FileFormatProperties(blen)); } } diff --git a/src/main/java/org/apache/sysds/runtime/io/MatrixWriter.java b/src/main/java/org/apache/sysds/runtime/io/MatrixWriter.java index 0f335477bd..1844cc1af7 100644 --- a/src/main/java/org/apache/sysds/runtime/io/MatrixWriter.java +++ b/src/main/java/org/apache/sysds/runtime/io/MatrixWriter.java @@ -23,6 +23,8 @@ import java.io.IOException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue; +import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue; import org.apache.sysds.runtime.matrix.data.MatrixBlock; /** @@ -42,6 +44,20 @@ public abstract class MatrixWriter { public abstract void writeMatrixToHDFS( MatrixBlock src, String fname, long rlen, long clen, int blen, long nnz, boolean diag ) throws IOException; + + /** + * Consumes an out-of-core stream of matrix blocks and writes them to a single file. + * This method must be implemented by writers that support OOC streaming output. 
+ * + * @param fname The target output filename + * @param stream The OOC stream of matrix blocks to consume + * @param rlen The total number of rows in the matrix + * @param clen The total number of columns in the matrix + * @param blen The block size + * @throws IOException if an I/O error occurs + */ + public abstract long writeMatrixFromStream(String fname, LocalTaskQueue<IndexedMatrixValue> stream, + long rlen, long clen, int blen) throws IOException; public void setForcedParallel(boolean par) { _forcedParallel = par; diff --git a/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java b/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java index e3dd3935c6..a3798a4513 100644 --- a/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java +++ b/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java @@ -23,6 +23,7 @@ import java.io.IOException; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.SequenceFile.Writer; import org.apache.hadoop.mapred.JobConf; import org.apache.sysds.conf.CompilerConfig.ConfigType; @@ -30,10 +31,13 @@ import org.apache.sysds.conf.ConfigurationManager; import org.apache.sysds.hops.OptimizerUtils; import org.apache.sysds.runtime.DMLRuntimeException; import org.apache.sysds.runtime.compress.CompressedMatrixBlock; +import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue; +import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue; import org.apache.sysds.runtime.matrix.data.MatrixBlock; import org.apache.sysds.runtime.matrix.data.MatrixIndexes; import org.apache.sysds.runtime.util.HDFSTool; + public class WriterBinaryBlock extends MatrixWriter { protected int _replication = -1; @@ -228,4 +232,38 @@ public class WriterBinaryBlock extends MatrixWriter { IOUtilFunctions.closeSilently(writer); } } + + @Override + public long writeMatrixFromStream(String fname, 
LocalTaskQueue<IndexedMatrixValue> stream, long rlen, long clen, int blen) throws IOException { + JobConf conf = ConfigurationManager.getCachedJobConf(); + Path path = new Path(fname); + FileSystem fs = IOUtilFunctions.getFileSystem(path, conf); + + SequenceFile.Writer writer = null; + + long totalNnz = 0; + try { + // 1. Create Sequence file writer for the final destination file + writer = SequenceFile.createWriter(fs, conf, path, MatrixIndexes.class, MatrixBlock.class); + + // 2. Loop through OOC stream + IndexedMatrixValue i_val = null; + while((i_val = stream.dequeueTask()) != LocalTaskQueue.NO_MORE_TASKS) { + MatrixBlock mb = (MatrixBlock) i_val.getValue(); + MatrixIndexes ix = i_val.getIndexes(); + + // 3. Append (key, value) record as a new value in the file + writer.append(ix, mb); + + totalNnz += mb.getNonZeros(); + } + + } catch (IOException | InterruptedException e) { + throw new DMLRuntimeException(e); + } finally { + IOUtilFunctions.closeSilently(writer); + } + + return totalNnz; + } } diff --git a/src/main/java/org/apache/sysds/runtime/io/WriterHDF5.java b/src/main/java/org/apache/sysds/runtime/io/WriterHDF5.java index 34b34333e6..ba3bbcda54 100644 --- a/src/main/java/org/apache/sysds/runtime/io/WriterHDF5.java +++ b/src/main/java/org/apache/sysds/runtime/io/WriterHDF5.java @@ -24,8 +24,10 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.JobConf; import org.apache.sysds.conf.ConfigurationManager; import org.apache.sysds.runtime.DMLRuntimeException; +import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue; import org.apache.sysds.runtime.data.DenseBlock; import org.apache.sysds.runtime.data.SparseBlock; +import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue; import org.apache.sysds.runtime.io.hdf5.H5; import org.apache.sysds.runtime.io.hdf5.H5RootObject; import org.apache.sysds.runtime.matrix.data.MatrixBlock; 
@@ -129,4 +131,9 @@ public class WriterHDF5 extends MatrixWriter { IOUtilFunctions.closeSilently(bos); } } + + @Override + public long writeMatrixFromStream(String fname, LocalTaskQueue<IndexedMatrixValue> stream, long rlen, long clen, int blen) { + throw new UnsupportedOperationException("Writing from an OOC stream is not supported for the HDF5 format."); + }; } diff --git a/src/main/java/org/apache/sysds/runtime/io/WriterMatrixMarket.java b/src/main/java/org/apache/sysds/runtime/io/WriterMatrixMarket.java index d5eeabeb50..3985596820 100644 --- a/src/main/java/org/apache/sysds/runtime/io/WriterMatrixMarket.java +++ b/src/main/java/org/apache/sysds/runtime/io/WriterMatrixMarket.java @@ -35,7 +35,9 @@ import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.mapred.JobConf; import org.apache.sysds.conf.ConfigurationManager; import org.apache.sysds.runtime.DMLRuntimeException; +import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue; import org.apache.sysds.runtime.data.DenseBlock; +import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue; import org.apache.sysds.runtime.matrix.data.IJV; import org.apache.sysds.runtime.matrix.data.MatrixBlock; import org.apache.sysds.runtime.util.HDFSTool; @@ -220,4 +222,9 @@ public class WriterMatrixMarket extends MatrixWriter throw new IOException(src.toString() + ": No such file or directory"); } } + + @Override + public long writeMatrixFromStream(String fname, LocalTaskQueue<IndexedMatrixValue> stream, long rlen, long clen, int blen) { + throw new UnsupportedOperationException("Writing from an OOC stream is not supported for the MatrixMarket format."); + }; } diff --git a/src/main/java/org/apache/sysds/runtime/io/WriterTextCSV.java b/src/main/java/org/apache/sysds/runtime/io/WriterTextCSV.java index 4e3cf74239..9bc1edace9 100644 --- a/src/main/java/org/apache/sysds/runtime/io/WriterTextCSV.java +++ b/src/main/java/org/apache/sysds/runtime/io/WriterTextCSV.java @@ -35,8 +35,10 @@ import 
org.apache.hadoop.io.IOUtils; import org.apache.hadoop.mapred.JobConf; import org.apache.sysds.conf.ConfigurationManager; import org.apache.sysds.runtime.DMLRuntimeException; +import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue; import org.apache.sysds.runtime.data.DenseBlock; import org.apache.sysds.runtime.data.SparseBlock; +import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue; import org.apache.sysds.runtime.matrix.data.MatrixBlock; import org.apache.sysds.runtime.util.HDFSTool; @@ -341,4 +343,9 @@ public class WriterTextCSV extends MatrixWriter throw new IOException(srcFilePath.toString() + ": No such file or directory"); } } + + @Override + public long writeMatrixFromStream(String fname, LocalTaskQueue<IndexedMatrixValue> stream, long rlen, long clen, int blen) { + throw new UnsupportedOperationException("Writing from an OOC stream is not supported for the TextCSV format."); + }; } diff --git a/src/main/java/org/apache/sysds/runtime/io/WriterTextCell.java b/src/main/java/org/apache/sysds/runtime/io/WriterTextCell.java index f5dd64af8f..b876f21752 100644 --- a/src/main/java/org/apache/sysds/runtime/io/WriterTextCell.java +++ b/src/main/java/org/apache/sysds/runtime/io/WriterTextCell.java @@ -30,7 +30,9 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.JobConf; import org.apache.sysds.conf.ConfigurationManager; import org.apache.sysds.runtime.DMLRuntimeException; +import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue; import org.apache.sysds.runtime.data.DenseBlock; +import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue; import org.apache.sysds.runtime.matrix.data.IJV; import org.apache.sysds.runtime.matrix.data.MatrixBlock; import org.apache.sysds.runtime.util.HDFSTool; @@ -137,4 +139,9 @@ public class WriterTextCell extends MatrixWriter br.write(IOUtilFunctions.EMPTY_TEXT_LINE); } } + + @Override + public long writeMatrixFromStream(String fname, 
LocalTaskQueue<IndexedMatrixValue> stream, long rlen, long clen, int blen) { + throw new UnsupportedOperationException("Writing from an OOC stream is not supported for the TextCell format."); + }; } diff --git a/src/main/java/org/apache/sysds/runtime/io/WriterTextLIBSVM.java b/src/main/java/org/apache/sysds/runtime/io/WriterTextLIBSVM.java index 125217a2a0..4a97abefc5 100644 --- a/src/main/java/org/apache/sysds/runtime/io/WriterTextLIBSVM.java +++ b/src/main/java/org/apache/sysds/runtime/io/WriterTextLIBSVM.java @@ -28,8 +28,10 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.JobConf; import org.apache.sysds.conf.ConfigurationManager; import org.apache.sysds.runtime.DMLRuntimeException; +import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue; import org.apache.sysds.runtime.data.DenseBlock; import org.apache.sysds.runtime.data.SparseBlock; +import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue; import org.apache.sysds.runtime.matrix.data.MatrixBlock; import org.apache.sysds.runtime.util.HDFSTool; @@ -156,4 +158,9 @@ public class WriterTextLIBSVM extends MatrixWriter { sb.append(_props.getIndexDelim()); sb.append(value); } + + @Override + public long writeMatrixFromStream(String fname, LocalTaskQueue<IndexedMatrixValue> stream, long rlen, long clen, int blen) { + throw new UnsupportedOperationException("Writing from an OOC stream is not supported for the LIBSVM format."); + }; } diff --git a/src/test/java/org/apache/sysds/test/functions/ooc/UnaryTest.java b/src/test/java/org/apache/sysds/test/functions/ooc/UnaryTest.java index a81689af37..fc6f01a37f 100644 --- a/src/test/java/org/apache/sysds/test/functions/ooc/UnaryTest.java +++ b/src/test/java/org/apache/sysds/test/functions/ooc/UnaryTest.java @@ -31,6 +31,7 @@ import org.apache.sysds.runtime.io.MatrixWriterFactory; import org.apache.sysds.runtime.matrix.data.MatrixBlock; import org.apache.sysds.runtime.matrix.data.MatrixValue; import 
org.apache.sysds.runtime.meta.MatrixCharacteristics; +import org.apache.sysds.runtime.util.DataConverter; import org.apache.sysds.runtime.util.HDFSTool; import org.apache.sysds.test.AutomatedTestBase; import org.apache.sysds.test.TestConfiguration; @@ -38,8 +39,11 @@ import org.apache.sysds.test.TestUtils; import org.junit.Assert; import org.junit.Test; +import java.io.IOException; import java.util.HashMap; +import static org.apache.sysds.test.TestUtils.readDMLMatrixFromHDFS; + public class UnaryTest extends AutomatedTestBase { private static final String TEST_NAME = "Unary"; @@ -86,17 +90,17 @@ public class UnaryTest extends AutomatedTestBase { runTest(true, false, null, -1); - HashMap<MatrixValue.CellIndex, Double> dmlfile = readDMLMatrixFromOutputDir(OUTPUT_NAME); - Double result = dmlfile.get(new MatrixValue.CellIndex(1, 1)); + double[][] C1 = readMatrix(output(OUTPUT_NAME), FileFormat.BINARY, rows, cols, 1000, 1000); double expected = 0.0; + double result = 0.0; for(int i = 0; i < rows; i++) { for(int j = 0; j < cols; j++) { - expected += Math.ceil(mb.get(i, j)); + expected = Math.ceil(mb.get(i, j)); + result = C1[i][j]; + Assert.assertEquals(expected, result, 1e-10); } } - Assert.assertEquals(expected, result, 1e-10); - String prefix = Instruction.OOC_INST_PREFIX; Assert.assertTrue("OOC wasn't used for RBLK", heavyHittersContainsString(prefix + Opcodes.RBLK)); @@ -111,4 +115,12 @@ public class UnaryTest extends AutomatedTestBase { resetExecMode(platformOld); } } + + private static double[][] readMatrix( String fname, FileFormat fmt, long rows, long cols, int brows, int bcols ) + throws IOException + { + MatrixBlock mb = DataConverter.readMatrixFromHDFS(fname, fmt, rows, cols, brows, bcols); + double[][] C = DataConverter.convertToDoubleMatrix(mb); + return C; + } } diff --git a/src/test/scripts/functions/ooc/Unary.dml b/src/test/scripts/functions/ooc/Unary.dml index 6d34e8fd76..24c0d98fb4 100644 --- a/src/test/scripts/functions/ooc/Unary.dml +++ 
b/src/test/scripts/functions/ooc/Unary.dml @@ -22,8 +22,8 @@ # Read input matrix and operator from command line args X = read($1); #print(toString(X)) -Y = ceil(X); +res = ceil(X); #print(toString(Y)) -res = as.matrix(sum(Y)); +#res = as.matrix(sum(Y)); # Write the final matrix result -write(res, $2); +write(res, $2, format="binary");
