This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git

commit 54a90adcb9dcd6c400db07ca8a641de8c04c184d
Author: Janardhan Pulivarthi <[email protected]>
AuthorDate: Sat Aug 9 11:38:09 2025 +0200

    [SYSTEMDS-3900] New OOC block stream binary writer
    
    Closes #2301.
    Closes #2302.
---
 .../runtime/compress/io/WriterCompressed.java      |  7 ++++
 .../instructions/cp/VariableCPInstruction.java     | 35 ++++++++++++++++++--
 .../org/apache/sysds/runtime/io/MatrixWriter.java  | 16 +++++++++
 .../apache/sysds/runtime/io/WriterBinaryBlock.java | 38 ++++++++++++++++++++++
 .../org/apache/sysds/runtime/io/WriterHDF5.java    |  7 ++++
 .../sysds/runtime/io/WriterMatrixMarket.java       |  7 ++++
 .../org/apache/sysds/runtime/io/WriterTextCSV.java |  7 ++++
 .../apache/sysds/runtime/io/WriterTextCell.java    |  7 ++++
 .../apache/sysds/runtime/io/WriterTextLIBSVM.java  |  7 ++++
 .../apache/sysds/test/functions/ooc/UnaryTest.java | 22 ++++++++++---
 src/test/scripts/functions/ooc/Unary.dml           |  6 ++--
 11 files changed, 149 insertions(+), 10 deletions(-)

diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/io/WriterCompressed.java 
b/src/main/java/org/apache/sysds/runtime/compress/io/WriterCompressed.java
index cf39ca6fba..c4d9db367b 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/io/WriterCompressed.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/io/WriterCompressed.java
@@ -46,7 +46,9 @@ import 
org.apache.sysds.runtime.compress.colgroup.dictionary.IDictionary;
 import org.apache.sysds.runtime.compress.lib.CLALibSeparator;
 import org.apache.sysds.runtime.compress.lib.CLALibSeparator.SeparatedGroups;
 import org.apache.sysds.runtime.compress.lib.CLALibSlice;
+import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue;
 import 
org.apache.sysds.runtime.instructions.spark.CompressionSPInstruction.CompressionFunction;
+import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue;
 import org.apache.sysds.runtime.instructions.spark.utils.RDDConverterUtils;
 import org.apache.sysds.runtime.io.FileFormatProperties;
 import org.apache.sysds.runtime.io.IOUtilFunctions;
@@ -407,4 +409,9 @@ public final class WriterCompressed extends MatrixWriter {
 
        }
 
+       @Override
+       public long writeMatrixFromStream(String fname, 
LocalTaskQueue<IndexedMatrixValue> stream, long rlen, long clen, int blen) {
+               throw new UnsupportedOperationException("Writing from an OOC 
stream is not supported for the HDF5 format.");
+       };
+
 }
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java
 
b/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java
index 8400ec54e6..bd40f253b4 100644
--- 
a/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java
@@ -41,11 +41,13 @@ import 
org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysds.runtime.controlprogram.caching.MatrixObject.UpdateType;
 import org.apache.sysds.runtime.controlprogram.caching.TensorObject;
 import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue;
 import org.apache.sysds.runtime.controlprogram.parfor.util.IDSequence;
 import org.apache.sysds.runtime.data.TensorBlock;
 import org.apache.sysds.runtime.frame.data.FrameBlock;
 import org.apache.sysds.runtime.instructions.Instruction;
 import org.apache.sysds.runtime.instructions.InstructionUtils;
+import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue;
 import org.apache.sysds.runtime.io.FileFormatProperties;
 import org.apache.sysds.runtime.io.FileFormatPropertiesCSV;
 import org.apache.sysds.runtime.io.FileFormatPropertiesHDF5;
@@ -55,6 +57,8 @@ import org.apache.sysds.runtime.io.ListWriter;
 import org.apache.sysds.runtime.io.WriterHDF5;
 import org.apache.sysds.runtime.io.WriterMatrixMarket;
 import org.apache.sysds.runtime.io.WriterTextCSV;
+import org.apache.sysds.runtime.io.MatrixWriterFactory;
+import org.apache.sysds.runtime.io.MatrixWriter;
 import org.apache.sysds.runtime.lineage.LineageItem;
 import org.apache.sysds.runtime.lineage.LineageItemUtils;
 import org.apache.sysds.runtime.lineage.LineageTraceable;
@@ -1060,6 +1064,35 @@ public class VariableCPInstruction extends CPInstruction 
implements LineageTrace
                        
HDFSTool.writeScalarToHDFS(ec.getScalarInput(getInput1()), fname);
                }
                else if( getInput1().getDataType() == DataType.MATRIX ) {
+                       MatrixObject mo = 
ec.getMatrixObject(getInput1().getName());
+                       int blen = Integer.parseInt(getInput4().getName());
+                       LocalTaskQueue<IndexedMatrixValue> stream = 
mo.getStreamHandle();
+
+                       if (stream != null) {
+
+                               try {
+                                       MatrixWriter writer = 
MatrixWriterFactory.createMatrixWriter(fmt);
+                    long nrows = mo.getNumRows();
+                    long ncols = mo.getNumColumns();
+
+                                       long totalNnz = 
writer.writeMatrixFromStream(fname, stream, nrows, ncols, blen);
+                                       MatrixCharacteristics mc = new 
MatrixCharacteristics(nrows, ncols, blen, totalNnz);
+                    HDFSTool.writeMetaDataFile(fname + ".mtd", 
mo.getValueType(), mc, fmt);
+
+                                       // 1. Update the metadata of the 
MatrixObject in the symbol table.
+                                       mo.updateDataCharacteristics(mc);
+                                       System.out.println("MO characterstics 
updated to avoid recompilation");
+
+                                       // 2. Clear its dirty flag and update 
its file path to the result we just wrote.
+                                       // This tells the system that the data 
for this variable now lives in 'fname'.
+                                       HDFSTool.copyFileOnHDFS(fname, 
mo.getFileName());
+                                       mo.setDirty(false);
+
+                               }
+                               catch(Exception ex) {
+                                       throw new DMLRuntimeException("Failed 
to write OOC stream to " + fname, ex);
+                               }
+                       }
                        if( fmt == FileFormat.MM )
                                writeMMFile(ec, fname);
                        else if( fmt == FileFormat.CSV )
@@ -1070,8 +1103,6 @@ public class VariableCPInstruction extends CPInstruction 
implements LineageTrace
                                writeHDF5File(ec, fname);
                        else {
                                // Default behavior (text, binary)
-                               MatrixObject mo = 
ec.getMatrixObject(getInput1().getName());
-                               int blen = 
Integer.parseInt(getInput4().getName());
                                mo.exportData(fname, fmtStr, new 
FileFormatProperties(blen));
                        }
                }
diff --git a/src/main/java/org/apache/sysds/runtime/io/MatrixWriter.java 
b/src/main/java/org/apache/sysds/runtime/io/MatrixWriter.java
index 0f335477bd..1844cc1af7 100644
--- a/src/main/java/org/apache/sysds/runtime/io/MatrixWriter.java
+++ b/src/main/java/org/apache/sysds/runtime/io/MatrixWriter.java
@@ -23,6 +23,8 @@ import java.io.IOException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue;
+import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 
 /**
@@ -42,6 +44,20 @@ public abstract class MatrixWriter {
 
        public abstract void writeMatrixToHDFS( MatrixBlock src, String fname, 
long rlen, long clen, int blen, long nnz, boolean diag )
                throws IOException;
+
+       /**
+        * Consumes an out-of-core stream of matrix blocks and writes them to a single file.
+        * Writers without OOC streaming support throw {@link UnsupportedOperationException}.
+        *
+        * @param fname The target output filename
+        * @param stream The OOC stream of matrix blocks to consume
+        * @param rlen The total number of rows in the matrix
+        * @param clen The total number of columns in the matrix
+        * @param blen The block size
+        * @return the total number of non-zeros written
+        * @throws IOException if an I/O error occurs
+        */
+       public abstract long writeMatrixFromStream(String fname, LocalTaskQueue<IndexedMatrixValue> stream, long rlen, long clen, int blen) throws IOException;
        
        public void setForcedParallel(boolean par) {
                _forcedParallel = par;
diff --git a/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java 
b/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java
index e3dd3935c6..a3798a4513 100644
--- a/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.SequenceFile.Writer;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.sysds.conf.CompilerConfig.ConfigType;
@@ -30,10 +31,13 @@ import org.apache.sysds.conf.ConfigurationManager;
 import org.apache.sysds.hops.OptimizerUtils;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.compress.CompressedMatrixBlock;
+import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue;
+import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysds.runtime.util.HDFSTool;
 
+
 public class WriterBinaryBlock extends MatrixWriter {
        protected int _replication = -1;
 
@@ -228,4 +232,38 @@ public class WriterBinaryBlock extends MatrixWriter {
                        IOUtilFunctions.closeSilently(writer);
                }
        }
+
+       /**
+        * Consumes the OOC stream until NO_MORE_TASKS and appends every block as one
+        * (MatrixIndexes, MatrixBlock) record to a single binary-block sequence file.
+        * No metadata file is written here; rlen, clen, and blen are currently unused.
+        * @return the total number of non-zeros across all written blocks
+        */
+       @Override
+       public long writeMatrixFromStream(String fname, LocalTaskQueue<IndexedMatrixValue> stream, long rlen, long clen, int blen) throws IOException {
+               JobConf conf = ConfigurationManager.getCachedJobConf();
+               Path path = new Path(fname);
+               FileSystem fs = IOUtilFunctions.getFileSystem(path, conf);
+               // delete previous output so a stale file or directory at fname cannot conflict with the new file
+               HDFSTool.deleteFileIfExistOnHDFS(fname);
+
+               long totalNnz = 0;
+               // try-with-resources: close() flushes buffered records, so a failing close must surface
+               try(SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, path, MatrixIndexes.class, MatrixBlock.class)) {
+                       IndexedMatrixValue ival;
+                       while((ival = stream.dequeueTask()) != LocalTaskQueue.NO_MORE_TASKS) {
+                               MatrixBlock mb = (MatrixBlock) ival.getValue();
+                               writer.append(ival.getIndexes(), mb);
+                               totalNnz += mb.getNonZeros();
+                       }
+               }
+               catch(InterruptedException e) {
+                       // restore the interrupt flag before converting to the declared IOException
+                       Thread.currentThread().interrupt();
+                       throw new IOException("Interrupted while writing OOC stream to " + fname, e);
+               }
+
+               IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(fs, path);
+               return totalNnz;
+       }
 }
diff --git a/src/main/java/org/apache/sysds/runtime/io/WriterHDF5.java 
b/src/main/java/org/apache/sysds/runtime/io/WriterHDF5.java
index 34b34333e6..ba3bbcda54 100644
--- a/src/main/java/org/apache/sysds/runtime/io/WriterHDF5.java
+++ b/src/main/java/org/apache/sysds/runtime/io/WriterHDF5.java
@@ -24,8 +24,10 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.sysds.conf.ConfigurationManager;
 import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue;
 import org.apache.sysds.runtime.data.DenseBlock;
 import org.apache.sysds.runtime.data.SparseBlock;
+import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue;
 import org.apache.sysds.runtime.io.hdf5.H5;
 import org.apache.sysds.runtime.io.hdf5.H5RootObject;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
@@ -129,4 +131,9 @@ public class WriterHDF5 extends MatrixWriter {
                        IOUtilFunctions.closeSilently(bos);
                }
        }
+
+       @Override
+       public long writeMatrixFromStream(String fname, 
LocalTaskQueue<IndexedMatrixValue> stream, long rlen, long clen, int blen) {
+               throw new UnsupportedOperationException("Writing from an OOC 
stream is not supported for the HDF5 format.");
+       };
 }
diff --git a/src/main/java/org/apache/sysds/runtime/io/WriterMatrixMarket.java 
b/src/main/java/org/apache/sysds/runtime/io/WriterMatrixMarket.java
index d5eeabeb50..3985596820 100644
--- a/src/main/java/org/apache/sysds/runtime/io/WriterMatrixMarket.java
+++ b/src/main/java/org/apache/sysds/runtime/io/WriterMatrixMarket.java
@@ -35,7 +35,9 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.sysds.conf.ConfigurationManager;
 import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue;
 import org.apache.sysds.runtime.data.DenseBlock;
+import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue;
 import org.apache.sysds.runtime.matrix.data.IJV;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.util.HDFSTool;
@@ -220,4 +222,9 @@ public class WriterMatrixMarket extends MatrixWriter
                        throw new IOException(src.toString() + ": No such file 
or directory");
                }
        }
+
+       @Override
+       public long writeMatrixFromStream(String fname, 
LocalTaskQueue<IndexedMatrixValue> stream, long rlen, long clen, int blen) {
+               throw new UnsupportedOperationException("Writing from an OOC 
stream is not supported for the MatrixMarket format.");
+       };
 }
diff --git a/src/main/java/org/apache/sysds/runtime/io/WriterTextCSV.java 
b/src/main/java/org/apache/sysds/runtime/io/WriterTextCSV.java
index 4e3cf74239..9bc1edace9 100644
--- a/src/main/java/org/apache/sysds/runtime/io/WriterTextCSV.java
+++ b/src/main/java/org/apache/sysds/runtime/io/WriterTextCSV.java
@@ -35,8 +35,10 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.sysds.conf.ConfigurationManager;
 import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue;
 import org.apache.sysds.runtime.data.DenseBlock;
 import org.apache.sysds.runtime.data.SparseBlock;
+import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.util.HDFSTool;
 
@@ -341,4 +343,9 @@ public class WriterTextCSV extends MatrixWriter
                        throw new IOException(srcFilePath.toString() + ": No 
such file or directory");
                }
        }
+
+       @Override
+       public long writeMatrixFromStream(String fname, 
LocalTaskQueue<IndexedMatrixValue> stream, long rlen, long clen, int blen) {
+               throw new UnsupportedOperationException("Writing from an OOC 
stream is not supported for the TextCSV format.");
+       };
 }
diff --git a/src/main/java/org/apache/sysds/runtime/io/WriterTextCell.java 
b/src/main/java/org/apache/sysds/runtime/io/WriterTextCell.java
index f5dd64af8f..b876f21752 100644
--- a/src/main/java/org/apache/sysds/runtime/io/WriterTextCell.java
+++ b/src/main/java/org/apache/sysds/runtime/io/WriterTextCell.java
@@ -30,7 +30,9 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.sysds.conf.ConfigurationManager;
 import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue;
 import org.apache.sysds.runtime.data.DenseBlock;
+import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue;
 import org.apache.sysds.runtime.matrix.data.IJV;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.util.HDFSTool;
@@ -137,4 +139,9 @@ public class WriterTextCell extends MatrixWriter
                                br.write(IOUtilFunctions.EMPTY_TEXT_LINE);
                }
        }
+
+       @Override
+       public long writeMatrixFromStream(String fname, 
LocalTaskQueue<IndexedMatrixValue> stream, long rlen, long clen, int blen) {
+               throw new UnsupportedOperationException("Writing from an OOC 
stream is not supported for the TextCell format.");
+       };
 }
diff --git a/src/main/java/org/apache/sysds/runtime/io/WriterTextLIBSVM.java 
b/src/main/java/org/apache/sysds/runtime/io/WriterTextLIBSVM.java
index 125217a2a0..4a97abefc5 100644
--- a/src/main/java/org/apache/sysds/runtime/io/WriterTextLIBSVM.java
+++ b/src/main/java/org/apache/sysds/runtime/io/WriterTextLIBSVM.java
@@ -28,8 +28,10 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.sysds.conf.ConfigurationManager;
 import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue;
 import org.apache.sysds.runtime.data.DenseBlock;
 import org.apache.sysds.runtime.data.SparseBlock;
+import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.util.HDFSTool;
 
@@ -156,4 +158,9 @@ public class WriterTextLIBSVM extends MatrixWriter {
                sb.append(_props.getIndexDelim());
                sb.append(value);
        }
+
+       @Override
+       public long writeMatrixFromStream(String fname, 
LocalTaskQueue<IndexedMatrixValue> stream, long rlen, long clen, int blen) {
+               throw new UnsupportedOperationException("Writing from an OOC 
stream is not supported for the LIBSVM format.");
+       };
 }
diff --git a/src/test/java/org/apache/sysds/test/functions/ooc/UnaryTest.java 
b/src/test/java/org/apache/sysds/test/functions/ooc/UnaryTest.java
index a81689af37..fc6f01a37f 100644
--- a/src/test/java/org/apache/sysds/test/functions/ooc/UnaryTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/ooc/UnaryTest.java
@@ -31,6 +31,7 @@ import org.apache.sysds.runtime.io.MatrixWriterFactory;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixValue;
 import org.apache.sysds.runtime.meta.MatrixCharacteristics;
+import org.apache.sysds.runtime.util.DataConverter;
 import org.apache.sysds.runtime.util.HDFSTool;
 import org.apache.sysds.test.AutomatedTestBase;
 import org.apache.sysds.test.TestConfiguration;
@@ -38,8 +39,11 @@ import org.apache.sysds.test.TestUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.IOException;
 import java.util.HashMap;
 
+import static org.apache.sysds.test.TestUtils.readDMLMatrixFromHDFS;
+
 public class UnaryTest extends AutomatedTestBase {
 
        private static final String TEST_NAME = "Unary";
@@ -86,17 +90,17 @@ public class UnaryTest extends AutomatedTestBase {
                        
                        runTest(true, false, null, -1);
 
-                       HashMap<MatrixValue.CellIndex, Double> dmlfile = 
readDMLMatrixFromOutputDir(OUTPUT_NAME);
-                       Double result = dmlfile.get(new 
MatrixValue.CellIndex(1, 1));
+                       double[][] C1 = readMatrix(output(OUTPUT_NAME), 
FileFormat.BINARY, rows, cols, 1000, 1000);
                        double expected = 0.0;
+                       double result = 0.0;
                        for(int i = 0; i < rows; i++) {
                                for(int j = 0; j < cols; j++) {
-                                       expected += Math.ceil(mb.get(i, j));
+                                       expected = Math.ceil(mb.get(i, j));
+                                       result = C1[i][j];
+                                       Assert.assertEquals(expected, result, 
1e-10);
                                }
                        }
 
-                       Assert.assertEquals(expected, result, 1e-10);
-
                        String prefix = Instruction.OOC_INST_PREFIX;
                        Assert.assertTrue("OOC wasn't used for RBLK",
                                heavyHittersContainsString(prefix + 
Opcodes.RBLK));
@@ -111,4 +115,12 @@ public class UnaryTest extends AutomatedTestBase {
                        resetExecMode(platformOld);
                }
        }
+
+       /** Reads the matrix stored at {@code fname} and converts it into a dense {@code double[][]}. */
+       private static double[][] readMatrix(String fname, FileFormat fmt, long rows, long cols, int brows, int bcols) throws IOException {
+               // NOTE(review): confirm how this readMatrixFromHDFS overload interprets the trailing int argument (bcols)
+               MatrixBlock block = DataConverter.readMatrixFromHDFS(fname, fmt, rows, cols, brows, bcols);
+               // densify so the caller can compare the result cell by cell
+               return DataConverter.convertToDoubleMatrix(block);
+       }
 }
diff --git a/src/test/scripts/functions/ooc/Unary.dml 
b/src/test/scripts/functions/ooc/Unary.dml
index 6d34e8fd76..24c0d98fb4 100644
--- a/src/test/scripts/functions/ooc/Unary.dml
+++ b/src/test/scripts/functions/ooc/Unary.dml
@@ -22,8 +22,8 @@
 # Read input matrix and operator from command line args
 X = read($1);
 #print(toString(X))
-Y = ceil(X);
+res = ceil(X);
 #print(toString(Y))
-res = as.matrix(sum(Y));
+#res = as.matrix(sum(Y));
 # Write the final matrix result
-write(res, $2);
+write(res, $2, format="binary");

Reply via email to