This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git

commit dc9fca930cf4e1b5f49459a5902f564c6afee9ca
Author: Matthias Boehm <[email protected]>
AuthorDate: Sat Aug 9 12:25:07 2025 +0200

    [SYSTEMDS-3900] Improved integration of OOC binary stream writer
---
 .../controlprogram/caching/CacheableData.java      | 85 +++++++++++++---------
 .../controlprogram/caching/FrameObject.java        |  8 ++
 .../controlprogram/caching/MatrixObject.java       | 13 ++++
 .../controlprogram/caching/TensorObject.java       |  7 ++
 .../instructions/cp/VariableCPInstruction.java     | 30 --------
 .../apache/sysds/runtime/io/WriterBinaryBlock.java | 14 ++--
 .../apache/sysds/test/functions/ooc/UnaryTest.java | 24 +++---
 .../functions/ooc/{Unary.dml => UnaryWrite.dml}    |  4 -
 8 files changed, 95 insertions(+), 90 deletions(-)

diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
index e075b55e17..7517bf0e35 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
@@ -909,44 +909,58 @@ public abstract class CacheableData<T extends 
CacheBlock<?>> extends Data
                        // a) get the matrix
                        boolean federatedWrite = (outputFormat != null ) &&  
outputFormat.contains("federated");
 
-                       if( isEmpty(true) && !federatedWrite)
-                       {
-                               //read data from HDFS if required (never read 
before), this applies only to pWrite w/ different output formats
-                               //note: for large rdd outputs, we compile 
dedicated writespinstructions (no need to handle this here) 
+                       if(getStreamHandle()!=null) {
                                try {
-                                       if( getRDDHandle()==null || 
getRDDHandle().allowsShortCircuitRead() )
-                                               _data = readBlobFromHDFS( 
_hdfsFileName );
-                                       else if( getRDDHandle() != null )
-                                               _data = readBlobFromRDD( 
getRDDHandle(), new MutableBoolean() );
-                                       else if(!federatedWrite)
-                                               _data = readBlobFromFederated( 
getFedMapping() );
-                                       setDirty(false);
-                                       refreshMetaData(); //e.g., after 
unknown csv read
+                                       long totalNnz = 
writeStreamToHDFS(fName, outputFormat, replication, formatProperties);
+                                       updateDataCharacteristics(new 
MatrixCharacteristics(
+                                               getNumRows(), getNumColumns(), 
blen, totalNnz));
+                                       writeMetaData(fName, outputFormat, 
formatProperties);
                                }
-                               catch (IOException e) {
-                                       throw new DMLRuntimeException("Reading 
of " + _hdfsFileName + " ("+hashCode()+") failed.", e);
+                               catch(Exception ex) {
+                                       throw new DMLRuntimeException("Failed 
to write OOC stream to " + fName, ex);
                                }
                        }
-                       //get object from cache
-                       if(!federatedWrite) {
-                               if( _data == null )
-                                       getCache();
-                               acquire( false, _data==null ); //incl. read 
matrix if evicted
-                       }
-
-                       // b) write the matrix 
-                       try {
-                               writeMetaData( fName, outputFormat, 
formatProperties );
-                               writeBlobToHDFS( fName, outputFormat, 
replication, formatProperties );
-                               if ( !pWrite )
-                                       setDirty(false);
-                       }
-                       catch (Exception e) {
-                               throw new DMLRuntimeException("Export to " + 
fName + " failed.", e);
-                       }
-                       finally {
-                               if(!federatedWrite)
-                                       release();
+                       else {
+                               if( isEmpty(true) && !federatedWrite)
+                               {
+                                       //read data from HDFS if required 
(never read before), this applies only to pWrite w/ different output formats
+                                       //note: for large rdd outputs, we 
compile dedicated writespinstructions (no need to handle this here) 
+                                       try {
+                                               if( getRDDHandle()==null || 
getRDDHandle().allowsShortCircuitRead() )
+                                                       _data = 
readBlobFromHDFS( _hdfsFileName );
+                                               else if( getRDDHandle() != null 
)
+                                                       _data = 
readBlobFromRDD( getRDDHandle(), new MutableBoolean() );
+                                               else if(!federatedWrite)
+                                                       _data = 
readBlobFromFederated( getFedMapping() );
+                                               setDirty(false);
+                                               refreshMetaData(); //e.g., 
after unknown csv read
+                                       }
+                                       catch (IOException e) {
+                                               throw new 
DMLRuntimeException("Reading of " + _hdfsFileName + " ("+hashCode()+") 
failed.", e);
+                                       }
+                               }
+                               
+                               //get object from cache
+                               if(!federatedWrite) {
+                                       if( _data == null )
+                                               getCache();
+                                       acquire( false, _data==null ); //incl. 
read matrix if evicted
+                               }
+       
+                               // b) write the matrix 
+                               try {
+                                       writeMetaData( fName, outputFormat, 
formatProperties );
+                                       writeBlobToHDFS( fName, outputFormat, 
replication, formatProperties );
+                                       if ( !pWrite )
+                                               setDirty(false);
+                               }
+                               catch (Exception e) {
+                                       throw new DMLRuntimeException("Export 
to " + fName + " failed.", e);
+                               }
+                               finally {
+                                       if(!federatedWrite)
+                                               release();
+                               }
                        }
                }
                else if( pWrite ) // pwrite with same output format
@@ -1132,6 +1146,9 @@ public abstract class CacheableData<T extends 
CacheBlock<?>> extends Data
        protected abstract void writeBlobToHDFS(String fname, String ofmt, int 
rep, FileFormatProperties fprop)
                throws IOException;
 
+       protected abstract long writeStreamToHDFS(String fname, String ofmt, 
int rep, FileFormatProperties fprop)
+               throws IOException;
+       
        protected abstract void writeBlobFromRDDtoHDFS(RDDObject rdd, String 
fname, String ofmt)
                throws IOException;
 
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java
index 56cc276cd8..f4d20bb55a 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java
@@ -295,6 +295,14 @@ public class FrameObject extends CacheableData<FrameBlock>
                writer.writeFrameToHDFS(_data, fname, getNumRows(), 
getNumColumns());
        }
 
+       @Override
+       protected long writeStreamToHDFS(String fname, String ofmt, int rep, 
FileFormatProperties fprop)
+               throws IOException, DMLRuntimeException 
+       {
+               throw new UnsupportedOperationException();
+       }
+
+       
        @Override
        protected void writeBlobFromRDDtoHDFS(RDDObject rdd, String fname, 
String ofmt)
                throws IOException, DMLRuntimeException 
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java
index e9204bdaed..9f4ca12dd7 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/MatrixObject.java
@@ -47,6 +47,8 @@ import 
org.apache.sysds.runtime.instructions.fed.InitFEDInstruction;
 import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue;
 import org.apache.sysds.runtime.instructions.spark.data.RDDObject;
 import org.apache.sysds.runtime.io.FileFormatProperties;
+import org.apache.sysds.runtime.io.MatrixWriter;
+import org.apache.sysds.runtime.io.MatrixWriterFactory;
 import org.apache.sysds.runtime.io.ReaderWriterFederated;
 import org.apache.sysds.runtime.lineage.LineageItem;
 import org.apache.sysds.runtime.lineage.LineageRecomputeUtils;
@@ -601,6 +603,17 @@ public class MatrixObject extends 
CacheableData<MatrixBlock> {
                if(DMLScript.STATISTICS)
                        CacheStatistics.incrementHDFSWrites();
        }
+       
+       @Override
+       protected long writeStreamToHDFS(String fname, String ofmt, int rep, 
FileFormatProperties fprop)
+               throws IOException, DMLRuntimeException 
+       {
+               MetaDataFormat iimd = (MetaDataFormat) _metaData;
+               FileFormat fmt = (ofmt != null ? FileFormat.safeValueOf(ofmt) : 
iimd.getFileFormat());
+               MatrixWriter writer = 
MatrixWriterFactory.createMatrixWriter(fmt, rep, fprop);
+               return writer.writeMatrixFromStream(fname, getStreamHandle(), 
+                       getNumRows(), getNumColumns(), 
ConfigurationManager.getBlocksize());
+       }
 
        @Override
        protected void writeBlobFromRDDtoHDFS(RDDObject rdd, String fname, 
String outputFormat)
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/TensorObject.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/TensorObject.java
index d39ed8c8a9..d0111a3430 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/TensorObject.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/TensorObject.java
@@ -189,6 +189,13 @@ public class TensorObject extends 
CacheableData<TensorBlock> {
                if( DMLScript.STATISTICS )
                        CacheStatistics.incrementHDFSWrites();
        }
+       
+       @Override
+       protected long writeStreamToHDFS(String fname, String ofmt, int rep, 
FileFormatProperties fprop)
+               throws IOException, DMLRuntimeException 
+       {
+               throw new UnsupportedOperationException();
+       }
 
        @Override
        protected ValueType[] getSchema() {
diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java
 
b/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java
index bd40f253b4..d3be925027 100644
--- 
a/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java
+++ 
b/src/main/java/org/apache/sysds/runtime/instructions/cp/VariableCPInstruction.java
@@ -41,13 +41,11 @@ import 
org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysds.runtime.controlprogram.caching.MatrixObject.UpdateType;
 import org.apache.sysds.runtime.controlprogram.caching.TensorObject;
 import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
-import org.apache.sysds.runtime.controlprogram.parfor.LocalTaskQueue;
 import org.apache.sysds.runtime.controlprogram.parfor.util.IDSequence;
 import org.apache.sysds.runtime.data.TensorBlock;
 import org.apache.sysds.runtime.frame.data.FrameBlock;
 import org.apache.sysds.runtime.instructions.Instruction;
 import org.apache.sysds.runtime.instructions.InstructionUtils;
-import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue;
 import org.apache.sysds.runtime.io.FileFormatProperties;
 import org.apache.sysds.runtime.io.FileFormatPropertiesCSV;
 import org.apache.sysds.runtime.io.FileFormatPropertiesHDF5;
@@ -57,8 +55,6 @@ import org.apache.sysds.runtime.io.ListWriter;
 import org.apache.sysds.runtime.io.WriterHDF5;
 import org.apache.sysds.runtime.io.WriterMatrixMarket;
 import org.apache.sysds.runtime.io.WriterTextCSV;
-import org.apache.sysds.runtime.io.MatrixWriterFactory;
-import org.apache.sysds.runtime.io.MatrixWriter;
 import org.apache.sysds.runtime.lineage.LineageItem;
 import org.apache.sysds.runtime.lineage.LineageItemUtils;
 import org.apache.sysds.runtime.lineage.LineageTraceable;
@@ -1066,33 +1062,7 @@ public class VariableCPInstruction extends CPInstruction 
implements LineageTrace
                else if( getInput1().getDataType() == DataType.MATRIX ) {
                        MatrixObject mo = 
ec.getMatrixObject(getInput1().getName());
                        int blen = Integer.parseInt(getInput4().getName());
-                       LocalTaskQueue<IndexedMatrixValue> stream = 
mo.getStreamHandle();
 
-                       if (stream != null) {
-
-                               try {
-                                       MatrixWriter writer = 
MatrixWriterFactory.createMatrixWriter(fmt);
-                    long nrows = mo.getNumRows();
-                    long ncols = mo.getNumColumns();
-
-                                       long totalNnz = 
writer.writeMatrixFromStream(fname, stream, nrows, ncols, blen);
-                                       MatrixCharacteristics mc = new 
MatrixCharacteristics(nrows, ncols, blen, totalNnz);
-                    HDFSTool.writeMetaDataFile(fname + ".mtd", 
mo.getValueType(), mc, fmt);
-
-                                       // 1. Update the metadata of the 
MatrixObject in the symbol table.
-                                       mo.updateDataCharacteristics(mc);
-                                       System.out.println("MO characterstics 
updated to avoid recompilation");
-
-                                       // 2. Clear its dirty flag and update 
its file path to the result we just wrote.
-                                       // This tells the system that the data 
for this variable now lives in 'fname'.
-                                       HDFSTool.copyFileOnHDFS(fname, 
mo.getFileName());
-                                       mo.setDirty(false);
-
-                               }
-                               catch(Exception ex) {
-                                       throw new DMLRuntimeException("Failed 
to write OOC stream to " + fname, ex);
-                               }
-                       }
                        if( fmt == FileFormat.MM )
                                writeMMFile(ec, fname);
                        else if( fmt == FileFormat.CSV )
diff --git a/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java 
b/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java
index a3798a4513..82c994eb7a 100644
--- a/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java
@@ -235,16 +235,13 @@ public class WriterBinaryBlock extends MatrixWriter {
 
        @Override
        public long writeMatrixFromStream(String fname, 
LocalTaskQueue<IndexedMatrixValue> stream, long rlen, long clen, int blen) 
throws IOException {
-               JobConf conf = ConfigurationManager.getCachedJobConf();
                Path path = new Path(fname);
-               FileSystem fs = IOUtilFunctions.getFileSystem(path, conf);
-
                SequenceFile.Writer writer = null;
 
                long totalNnz = 0;
                try {
-                       // 1. Create Sequence file writer for the final 
destination file                        writer = new SequenceFile.Writer(fs, 
conf, path, MatrixIndexes.class, MatrixBlock.class);
-            writer = SequenceFile.createWriter(fs, conf, path, 
MatrixIndexes.class, MatrixBlock.class);
+                       // 1. Create Sequence file writer for the final 
destination file
+                       writer = IOUtilFunctions.getSeqWriter(path, job, 
_replication);
 
                        // 2. Loop through OOC stream
                        IndexedMatrixValue i_val =  null;
@@ -257,13 +254,12 @@ public class WriterBinaryBlock extends MatrixWriter {
 
                                totalNnz += mb.getNonZeros();
                        }
-
-               } catch (IOException | InterruptedException e) {
+               } catch (Exception e) {
                        throw new DMLRuntimeException(e);
-        } finally {
+               } finally {
                        IOUtilFunctions.closeSilently(writer);
                }
 
-        return totalNnz;
+               return totalNnz;
        }
 }
diff --git a/src/test/java/org/apache/sysds/test/functions/ooc/UnaryTest.java 
b/src/test/java/org/apache/sysds/test/functions/ooc/UnaryTest.java
index fc6f01a37f..5e203b5bf7 100644
--- a/src/test/java/org/apache/sysds/test/functions/ooc/UnaryTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/ooc/UnaryTest.java
@@ -29,7 +29,6 @@ import org.apache.sysds.runtime.instructions.Instruction;
 import org.apache.sysds.runtime.io.MatrixWriter;
 import org.apache.sysds.runtime.io.MatrixWriterFactory;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
-import org.apache.sysds.runtime.matrix.data.MatrixValue;
 import org.apache.sysds.runtime.meta.MatrixCharacteristics;
 import org.apache.sysds.runtime.util.DataConverter;
 import org.apache.sysds.runtime.util.HDFSTool;
@@ -40,13 +39,10 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.IOException;
-import java.util.HashMap;
-
-import static org.apache.sysds.test.TestUtils.readDMLMatrixFromHDFS;
 
 public class UnaryTest extends AutomatedTestBase {
 
-       private static final String TEST_NAME = "Unary";
+       private static final String TEST_NAME = "UnaryWrite";
        private static final String TEST_DIR = "functions/ooc/";
        private static final String TEST_CLASS_DIR = TEST_DIR + 
UnaryTest.class.getSimpleName() + "/";
        private static final String INPUT_NAME = "X";
@@ -55,18 +51,19 @@ public class UnaryTest extends AutomatedTestBase {
        @Override
        public void setUp() {
                TestUtils.clearAssertionInformation();
-               TestConfiguration config = new 
TestConfiguration(TEST_CLASS_DIR, TEST_NAME);
-               addTestConfiguration(TEST_NAME, config);
+               addTestConfiguration(TEST_NAME, new 
TestConfiguration(TEST_CLASS_DIR, TEST_NAME));
        }
 
-       /**
-        * Test the sum of scalar multiplication, "sum(X*7)", with OOC backend.
-        */
        @Test
-       public void testUnary() {
+       public void testWriteNoRewrite() {
                testUnaryOperation(false);
        }
        
+       @Test
+       public void testWriteRewrite() {
+               testUnaryOperation(true);
+       }
+       
        
        public void testUnaryOperation(boolean rewrite)
        {
@@ -116,8 +113,9 @@ public class UnaryTest extends AutomatedTestBase {
                }
        }
 
-       private static double[][] readMatrix( String fname, FileFormat fmt, 
long rows, long cols, int brows, int bcols )
-                       throws IOException
+       private static double[][] readMatrix( String fname, FileFormat fmt, 
+               long rows, long cols, int brows, int bcols )
+               throws IOException
        {
                MatrixBlock mb = DataConverter.readMatrixFromHDFS(fname, fmt, 
rows, cols, brows, bcols);
                double[][] C = DataConverter.convertToDoubleMatrix(mb);
diff --git a/src/test/scripts/functions/ooc/Unary.dml 
b/src/test/scripts/functions/ooc/UnaryWrite.dml
similarity index 91%
rename from src/test/scripts/functions/ooc/Unary.dml
rename to src/test/scripts/functions/ooc/UnaryWrite.dml
index 24c0d98fb4..da2d262206 100644
--- a/src/test/scripts/functions/ooc/Unary.dml
+++ b/src/test/scripts/functions/ooc/UnaryWrite.dml
@@ -21,9 +21,5 @@
 
 # Read input matrix and operator from command line args
 X = read($1);
-#print(toString(X))
 res = ceil(X);
-#print(toString(Y))
-#res = as.matrix(sum(Y));
-# Write the final matrix result
 write(res, $2, format="binary");

Reply via email to