This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new a4b3de305c [MINOR] Updated reader/write tests to force parallel writers
a4b3de305c is described below

commit a4b3de305cb77dd43d4c3cc11808f4d2c865a7ee
Author: Matthias Boehm <[email protected]>
AuthorDate: Mon Nov 25 18:58:04 2024 +0100

    [MINOR] Updated reader/write tests to force parallel writers
---
 src/main/java/org/apache/sysds/runtime/io/FrameWriter.java          | 6 ++++++
 .../org/apache/sysds/runtime/io/FrameWriterBinaryBlockParallel.java | 6 +++---
 .../org/apache/sysds/runtime/io/FrameWriterTextCSVParallel.java     | 2 +-
 .../org/apache/sysds/runtime/io/FrameWriterTextCellParallel.java    | 2 +-
 src/main/java/org/apache/sysds/runtime/io/MatrixWriter.java         | 6 ++++++
 src/main/java/org/apache/sysds/runtime/io/WriterHDF5Parallel.java   | 2 +-
 .../org/apache/sysds/runtime/io/WriterMatrixMarketParallel.java     | 2 +-
 .../java/org/apache/sysds/runtime/io/WriterTextCSVParallel.java     | 2 +-
 .../java/org/apache/sysds/runtime/io/WriterTextCellParallel.java    | 2 +-
 .../java/org/apache/sysds/runtime/io/WriterTextLIBSVMParallel.java  | 2 +-
 .../java/org/apache/sysds/test/functions/io/SeqParReadTest2.java    | 6 ++++--
 11 files changed, 26 insertions(+), 12 deletions(-)

diff --git a/src/main/java/org/apache/sysds/runtime/io/FrameWriter.java 
b/src/main/java/org/apache/sysds/runtime/io/FrameWriter.java
index 6e4b52449d..be71a6316c 100644
--- a/src/main/java/org/apache/sysds/runtime/io/FrameWriter.java
+++ b/src/main/java/org/apache/sysds/runtime/io/FrameWriter.java
@@ -35,7 +35,13 @@ import org.apache.sysds.runtime.frame.data.FrameBlock;
  */
 public abstract class FrameWriter {
        protected static final Log LOG = 
LogFactory.getLog(FrameWriter.class.getName());
+       
+       protected boolean _forcedParallel = false;
+       
        public abstract void writeFrameToHDFS( FrameBlock src, String fname, 
long rlen, long clen )
                throws IOException, DMLRuntimeException;
 
+       public void setForcedParallel(boolean par) {
+               _forcedParallel = par;
+       }
 }
diff --git 
a/src/main/java/org/apache/sysds/runtime/io/FrameWriterBinaryBlockParallel.java 
b/src/main/java/org/apache/sysds/runtime/io/FrameWriterBinaryBlockParallel.java
index 331897ea41..531465b61c 100644
--- 
a/src/main/java/org/apache/sysds/runtime/io/FrameWriterBinaryBlockParallel.java
+++ 
b/src/main/java/org/apache/sysds/runtime/io/FrameWriterBinaryBlockParallel.java
@@ -44,7 +44,7 @@ import org.apache.sysds.utils.stats.InfrastructureAnalyzer;
  * 
  */
 public class FrameWriterBinaryBlockParallel extends FrameWriterBinaryBlock
-{      
+{
        @Override
        protected void writeBinaryBlockFrameToHDFS( Path path, JobConf job, 
FrameBlock src, long rlen, long clen )
                throws IOException, DMLRuntimeException
@@ -59,11 +59,11 @@ public class FrameWriterBinaryBlockParallel extends 
FrameWriterBinaryBlock
                numThreads = Math.min(numThreads, numPartFiles);
 
                //fall back to sequential write if dop is 1 (e.g., <128MB) in 
order to create single file
-               if( numThreads <= 1 ) {
+               if( !_forcedParallel && numThreads <= 1 ) {
                        super.writeBinaryBlockFrameToHDFS(path, job, src, rlen, 
clen);
                        return;
                }
-               
+       
                //create directory for concurrent tasks
                HDFSTool.createDirIfNotExistOnHDFS(path, 
DMLConfig.DEFAULT_SHARED_DIR_PERMISSION);
                FileSystem fs = IOUtilFunctions.getFileSystem(path);
diff --git 
a/src/main/java/org/apache/sysds/runtime/io/FrameWriterTextCSVParallel.java 
b/src/main/java/org/apache/sysds/runtime/io/FrameWriterTextCSVParallel.java
index 65f7751a2b..e20d1290de 100644
--- a/src/main/java/org/apache/sysds/runtime/io/FrameWriterTextCSVParallel.java
+++ b/src/main/java/org/apache/sysds/runtime/io/FrameWriterTextCSVParallel.java
@@ -60,7 +60,7 @@ public class FrameWriterTextCSVParallel extends 
FrameWriterTextCSV
                numThreads = Math.min(numThreads, numPartFiles);
                
                //fall back to sequential write if dop is 1 (e.g., <128MB) in 
order to create single file
-               if( numThreads <= 1 ) {
+               if( !_forcedParallel && numThreads <= 1 ) {
                        super.writeCSVFrameToHDFS(path, job, src, rlen, clen, 
csvprops);
                        return;
                }
diff --git 
a/src/main/java/org/apache/sysds/runtime/io/FrameWriterTextCellParallel.java 
b/src/main/java/org/apache/sysds/runtime/io/FrameWriterTextCellParallel.java
index c605e30528..e746152392 100644
--- a/src/main/java/org/apache/sysds/runtime/io/FrameWriterTextCellParallel.java
+++ b/src/main/java/org/apache/sysds/runtime/io/FrameWriterTextCellParallel.java
@@ -56,7 +56,7 @@ public class FrameWriterTextCellParallel extends 
FrameWriterTextCell
                numThreads = Math.min(numThreads, numPartFiles);
                
                //fall back to sequential write if dop is 1 (e.g., <128MB) in 
order to create single file
-               if( numThreads <= 1 ) {
+               if( !_forcedParallel && numThreads <= 1 ) {
                        super.writeTextCellFrameToHDFS(path, job, src, rlen, 
clen);
                        return;
                }
diff --git a/src/main/java/org/apache/sysds/runtime/io/MatrixWriter.java 
b/src/main/java/org/apache/sysds/runtime/io/MatrixWriter.java
index faddd66466..0f335477bd 100644
--- a/src/main/java/org/apache/sysds/runtime/io/MatrixWriter.java
+++ b/src/main/java/org/apache/sysds/runtime/io/MatrixWriter.java
@@ -34,6 +34,8 @@ import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 public abstract class MatrixWriter {
        protected static final Log LOG = 
LogFactory.getLog(MatrixWriter.class.getName());
        
+       protected boolean _forcedParallel = false;
+       
        public void writeMatrixToHDFS( MatrixBlock src, String fname, long 
rlen, long clen, int blen, long nnz ) throws IOException {
                writeMatrixToHDFS(src, fname, rlen, clen, blen, nnz, false);
        }
@@ -41,6 +43,10 @@ public abstract class MatrixWriter {
        public abstract void writeMatrixToHDFS( MatrixBlock src, String fname, 
long rlen, long clen, int blen, long nnz, boolean diag )
                throws IOException;
        
+       public void setForcedParallel(boolean par) {
+               _forcedParallel = par;
+       }
+       
        
        /**
         * Writes a minimal entry to represent an empty matrix on hdfs.
diff --git a/src/main/java/org/apache/sysds/runtime/io/WriterHDF5Parallel.java 
b/src/main/java/org/apache/sysds/runtime/io/WriterHDF5Parallel.java
index ddcd069734..e136c496b3 100644
--- a/src/main/java/org/apache/sysds/runtime/io/WriterHDF5Parallel.java
+++ b/src/main/java/org/apache/sysds/runtime/io/WriterHDF5Parallel.java
@@ -59,7 +59,7 @@ public class WriterHDF5Parallel extends WriterHDF5 {
                numThreads = Math.min(numThreads, numPartFiles);
 
                //fall back to sequential write if dop is 1 (e.g., <128MB) in 
order to create single file
-               if(numThreads <= 1) {
+               if( !_forcedParallel && numThreads <= 1 ) {
                        super.writeHDF5MatrixToHDFS(path, job, fs, src);
                        return;
                }
diff --git 
a/src/main/java/org/apache/sysds/runtime/io/WriterMatrixMarketParallel.java 
b/src/main/java/org/apache/sysds/runtime/io/WriterMatrixMarketParallel.java
index d17a88412d..28acb17c0d 100644
--- a/src/main/java/org/apache/sysds/runtime/io/WriterMatrixMarketParallel.java
+++ b/src/main/java/org/apache/sysds/runtime/io/WriterMatrixMarketParallel.java
@@ -55,7 +55,7 @@ public class WriterMatrixMarketParallel extends 
WriterMatrixMarket
                numThreads = Math.min(numThreads, numPartFiles);
                
                //fall back to sequential write if dop is 1 (e.g., <128MB) in 
order to create single file
-               if( numThreads <= 1 ) {
+               if( !_forcedParallel && numThreads <= 1 ) {
                        super.writeMatrixMarketMatrixToHDFS(path, job, fs, src);
                        return;
                }
diff --git 
a/src/main/java/org/apache/sysds/runtime/io/WriterTextCSVParallel.java 
b/src/main/java/org/apache/sysds/runtime/io/WriterTextCSVParallel.java
index ead26c3fb7..c2f2eed55f 100644
--- a/src/main/java/org/apache/sysds/runtime/io/WriterTextCSVParallel.java
+++ b/src/main/java/org/apache/sysds/runtime/io/WriterTextCSVParallel.java
@@ -57,7 +57,7 @@ public class WriterTextCSVParallel extends WriterTextCSV
                numThreads = Math.min(numThreads, numPartFiles);
        
                //fall back to sequential write if dop is 1 (e.g., <128MB) in 
order to create single file
-               if( numThreads <= 1 ) {
+               if( !_forcedParallel && numThreads <= 1 ) {
                        super.writeCSVMatrixToHDFS(path, job, fs, src, 
csvprops);
                        return;
                }
diff --git 
a/src/main/java/org/apache/sysds/runtime/io/WriterTextCellParallel.java 
b/src/main/java/org/apache/sysds/runtime/io/WriterTextCellParallel.java
index 3091cb224b..8ab439884d 100644
--- a/src/main/java/org/apache/sysds/runtime/io/WriterTextCellParallel.java
+++ b/src/main/java/org/apache/sysds/runtime/io/WriterTextCellParallel.java
@@ -54,7 +54,7 @@ public class WriterTextCellParallel extends WriterTextCell
                numThreads = Math.min(numThreads, numPartFiles);
                
                //fall back to sequential write if dop is 1 (e.g., <128MB) in 
order to create single file
-               if( numThreads <= 1 || src.getNonZeros()==0 ) {
+               if( !_forcedParallel && (numThreads <= 1 || 
src.getNonZeros()==0) ) {
                        super.writeTextCellMatrixToHDFS(path, job, fs, src, 
rlen, clen);
                        return;
                }
diff --git 
a/src/main/java/org/apache/sysds/runtime/io/WriterTextLIBSVMParallel.java 
b/src/main/java/org/apache/sysds/runtime/io/WriterTextLIBSVMParallel.java
index 60d5aa87ed..4111a882ee 100644
--- a/src/main/java/org/apache/sysds/runtime/io/WriterTextLIBSVMParallel.java
+++ b/src/main/java/org/apache/sysds/runtime/io/WriterTextLIBSVMParallel.java
@@ -57,7 +57,7 @@ public class WriterTextLIBSVMParallel extends WriterTextLIBSVM
                numThreads = Math.min(numThreads, numPartFiles);
 
                //fall back to sequential write if dop is 1 (e.g., <128MB) in 
order to create single file
-               if( numThreads <= 1 ) {
+               if( !_forcedParallel && numThreads <= 1 ) {
                        super.writeLIBSVMMatrixToHDFS(path, job, fs, src);
                        return;
                }
diff --git 
a/src/test/java/org/apache/sysds/test/functions/io/SeqParReadTest2.java 
b/src/test/java/org/apache/sysds/test/functions/io/SeqParReadTest2.java
index a0ed411ef6..a36e678b99 100644
--- a/src/test/java/org/apache/sysds/test/functions/io/SeqParReadTest2.java
+++ b/src/test/java/org/apache/sysds/test/functions/io/SeqParReadTest2.java
@@ -169,13 +169,15 @@ public class SeqParReadTest2 extends AutomatedTestBase {
                try {
                        if( _matrix ) {
                                MatrixWriter writer = 
createMatrixWriter(FileFormat.safeValueOf(_format), _par);
-                               writer.writeMatrixToHDFS(data, fname, rows, 
cols, 1000, data.getNonZeros());
+                               writer.setForcedParallel(_par); //must be set before writing, otherwise no effect
+                               writer.writeMatrixToHDFS(data, fname, rows, 
cols, 100, data.getNonZeros());
                                MatrixReader reader = 
createMatrixReader(FileFormat.safeValueOf(_format), _par);
-                               data2 = reader.readMatrixFromHDFS(fname, rows, 
cols, 1000, data.getNonZeros());
+                               data2 = reader.readMatrixFromHDFS(fname, rows, 
cols, 100, data.getNonZeros());
                        }
                        else {
                                FrameBlock fdata = 
DataConverter.convertToFrameBlock(data);
                                FrameWriter writer = 
createFrameWriter(FileFormat.safeValueOf(_format), _par);
+                               writer.setForcedParallel(_par);
                                writer.writeFrameToHDFS(fdata, fname, rows, 
cols);
                                FrameReader reader = 
createFrameReader(FileFormat.safeValueOf(_format), _par);
                                FrameBlock fdata2 = 
reader.readFrameFromHDFS(fname, schema, rows, cols);

Reply via email to