This is an automated email from the ASF dual-hosted git repository.
mboehm7 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new a4b3de305c [MINOR] Updated reader/write tests to force parallel writers
a4b3de305c is described below
commit a4b3de305cb77dd43d4c3cc11808f4d2c865a7ee
Author: Matthias Boehm <[email protected]>
AuthorDate: Mon Nov 25 18:58:04 2024 +0100
[MINOR] Updated reader/write tests to force parallel writers
---
src/main/java/org/apache/sysds/runtime/io/FrameWriter.java | 6 ++++++
.../org/apache/sysds/runtime/io/FrameWriterBinaryBlockParallel.java | 6 +++---
.../org/apache/sysds/runtime/io/FrameWriterTextCSVParallel.java | 2 +-
.../org/apache/sysds/runtime/io/FrameWriterTextCellParallel.java | 2 +-
src/main/java/org/apache/sysds/runtime/io/MatrixWriter.java | 6 ++++++
src/main/java/org/apache/sysds/runtime/io/WriterHDF5Parallel.java | 2 +-
.../org/apache/sysds/runtime/io/WriterMatrixMarketParallel.java | 2 +-
.../java/org/apache/sysds/runtime/io/WriterTextCSVParallel.java | 2 +-
.../java/org/apache/sysds/runtime/io/WriterTextCellParallel.java | 2 +-
.../java/org/apache/sysds/runtime/io/WriterTextLIBSVMParallel.java | 2 +-
.../java/org/apache/sysds/test/functions/io/SeqParReadTest2.java | 6 ++++--
11 files changed, 26 insertions(+), 12 deletions(-)
diff --git a/src/main/java/org/apache/sysds/runtime/io/FrameWriter.java
b/src/main/java/org/apache/sysds/runtime/io/FrameWriter.java
index 6e4b52449d..be71a6316c 100644
--- a/src/main/java/org/apache/sysds/runtime/io/FrameWriter.java
+++ b/src/main/java/org/apache/sysds/runtime/io/FrameWriter.java
@@ -35,7 +35,13 @@ import org.apache.sysds.runtime.frame.data.FrameBlock;
*/
public abstract class FrameWriter {
protected static final Log LOG =
LogFactory.getLog(FrameWriter.class.getName());
+
+ protected boolean _forcedParallel = false;
+
public abstract void writeFrameToHDFS( FrameBlock src, String fname,
long rlen, long clen )
throws IOException, DMLRuntimeException;
+ public void setForcedParallel(boolean par) {
+ _forcedParallel = par;
+ }
}
diff --git
a/src/main/java/org/apache/sysds/runtime/io/FrameWriterBinaryBlockParallel.java
b/src/main/java/org/apache/sysds/runtime/io/FrameWriterBinaryBlockParallel.java
index 331897ea41..531465b61c 100644
---
a/src/main/java/org/apache/sysds/runtime/io/FrameWriterBinaryBlockParallel.java
+++
b/src/main/java/org/apache/sysds/runtime/io/FrameWriterBinaryBlockParallel.java
@@ -44,7 +44,7 @@ import org.apache.sysds.utils.stats.InfrastructureAnalyzer;
*
*/
public class FrameWriterBinaryBlockParallel extends FrameWriterBinaryBlock
-{
+{
@Override
protected void writeBinaryBlockFrameToHDFS( Path path, JobConf job,
FrameBlock src, long rlen, long clen )
throws IOException, DMLRuntimeException
@@ -59,11 +59,11 @@ public class FrameWriterBinaryBlockParallel extends
FrameWriterBinaryBlock
numThreads = Math.min(numThreads, numPartFiles);
//fall back to sequential write if dop is 1 (e.g., <128MB) in
order to create single file
- if( numThreads <= 1 ) {
+ if( !_forcedParallel && numThreads <= 1 ) {
super.writeBinaryBlockFrameToHDFS(path, job, src, rlen,
clen);
return;
}
-
+
//create directory for concurrent tasks
HDFSTool.createDirIfNotExistOnHDFS(path,
DMLConfig.DEFAULT_SHARED_DIR_PERMISSION);
FileSystem fs = IOUtilFunctions.getFileSystem(path);
diff --git
a/src/main/java/org/apache/sysds/runtime/io/FrameWriterTextCSVParallel.java
b/src/main/java/org/apache/sysds/runtime/io/FrameWriterTextCSVParallel.java
index 65f7751a2b..e20d1290de 100644
--- a/src/main/java/org/apache/sysds/runtime/io/FrameWriterTextCSVParallel.java
+++ b/src/main/java/org/apache/sysds/runtime/io/FrameWriterTextCSVParallel.java
@@ -60,7 +60,7 @@ public class FrameWriterTextCSVParallel extends
FrameWriterTextCSV
numThreads = Math.min(numThreads, numPartFiles);
//fall back to sequential write if dop is 1 (e.g., <128MB) in
order to create single file
- if( numThreads <= 1 ) {
+ if( !_forcedParallel && numThreads <= 1 ) {
super.writeCSVFrameToHDFS(path, job, src, rlen, clen,
csvprops);
return;
}
diff --git
a/src/main/java/org/apache/sysds/runtime/io/FrameWriterTextCellParallel.java
b/src/main/java/org/apache/sysds/runtime/io/FrameWriterTextCellParallel.java
index c605e30528..e746152392 100644
--- a/src/main/java/org/apache/sysds/runtime/io/FrameWriterTextCellParallel.java
+++ b/src/main/java/org/apache/sysds/runtime/io/FrameWriterTextCellParallel.java
@@ -56,7 +56,7 @@ public class FrameWriterTextCellParallel extends
FrameWriterTextCell
numThreads = Math.min(numThreads, numPartFiles);
//fall back to sequential write if dop is 1 (e.g., <128MB) in
order to create single file
- if( numThreads <= 1 ) {
+ if( !_forcedParallel && numThreads <= 1 ) {
super.writeTextCellFrameToHDFS(path, job, src, rlen,
clen);
return;
}
diff --git a/src/main/java/org/apache/sysds/runtime/io/MatrixWriter.java
b/src/main/java/org/apache/sysds/runtime/io/MatrixWriter.java
index faddd66466..0f335477bd 100644
--- a/src/main/java/org/apache/sysds/runtime/io/MatrixWriter.java
+++ b/src/main/java/org/apache/sysds/runtime/io/MatrixWriter.java
@@ -34,6 +34,8 @@ import org.apache.sysds.runtime.matrix.data.MatrixBlock;
public abstract class MatrixWriter {
protected static final Log LOG =
LogFactory.getLog(MatrixWriter.class.getName());
+ protected boolean _forcedParallel = false;
+
public void writeMatrixToHDFS( MatrixBlock src, String fname, long
rlen, long clen, int blen, long nnz ) throws IOException {
writeMatrixToHDFS(src, fname, rlen, clen, blen, nnz, false);
}
@@ -41,6 +43,10 @@ public abstract class MatrixWriter {
public abstract void writeMatrixToHDFS( MatrixBlock src, String fname,
long rlen, long clen, int blen, long nnz, boolean diag )
throws IOException;
+ public void setForcedParallel(boolean par) {
+ _forcedParallel = par;
+ }
+
/**
* Writes a minimal entry to represent an empty matrix on hdfs.
diff --git a/src/main/java/org/apache/sysds/runtime/io/WriterHDF5Parallel.java
b/src/main/java/org/apache/sysds/runtime/io/WriterHDF5Parallel.java
index ddcd069734..e136c496b3 100644
--- a/src/main/java/org/apache/sysds/runtime/io/WriterHDF5Parallel.java
+++ b/src/main/java/org/apache/sysds/runtime/io/WriterHDF5Parallel.java
@@ -59,7 +59,7 @@ public class WriterHDF5Parallel extends WriterHDF5 {
numThreads = Math.min(numThreads, numPartFiles);
//fall back to sequential write if dop is 1 (e.g., <128MB) in
order to create single file
- if(numThreads <= 1) {
+ if( !_forcedParallel && numThreads <= 1 ) {
super.writeHDF5MatrixToHDFS(path, job, fs, src);
return;
}
diff --git
a/src/main/java/org/apache/sysds/runtime/io/WriterMatrixMarketParallel.java
b/src/main/java/org/apache/sysds/runtime/io/WriterMatrixMarketParallel.java
index d17a88412d..28acb17c0d 100644
--- a/src/main/java/org/apache/sysds/runtime/io/WriterMatrixMarketParallel.java
+++ b/src/main/java/org/apache/sysds/runtime/io/WriterMatrixMarketParallel.java
@@ -55,7 +55,7 @@ public class WriterMatrixMarketParallel extends
WriterMatrixMarket
numThreads = Math.min(numThreads, numPartFiles);
//fall back to sequential write if dop is 1 (e.g., <128MB) in
order to create single file
- if( numThreads <= 1 ) {
+ if( !_forcedParallel && numThreads <= 1 ) {
super.writeMatrixMarketMatrixToHDFS(path, job, fs, src);
return;
}
diff --git
a/src/main/java/org/apache/sysds/runtime/io/WriterTextCSVParallel.java
b/src/main/java/org/apache/sysds/runtime/io/WriterTextCSVParallel.java
index ead26c3fb7..c2f2eed55f 100644
--- a/src/main/java/org/apache/sysds/runtime/io/WriterTextCSVParallel.java
+++ b/src/main/java/org/apache/sysds/runtime/io/WriterTextCSVParallel.java
@@ -57,7 +57,7 @@ public class WriterTextCSVParallel extends WriterTextCSV
numThreads = Math.min(numThreads, numPartFiles);
//fall back to sequential write if dop is 1 (e.g., <128MB) in
order to create single file
- if( numThreads <= 1 ) {
+ if( !_forcedParallel && numThreads <= 1 ) {
super.writeCSVMatrixToHDFS(path, job, fs, src,
csvprops);
return;
}
diff --git
a/src/main/java/org/apache/sysds/runtime/io/WriterTextCellParallel.java
b/src/main/java/org/apache/sysds/runtime/io/WriterTextCellParallel.java
index 3091cb224b..8ab439884d 100644
--- a/src/main/java/org/apache/sysds/runtime/io/WriterTextCellParallel.java
+++ b/src/main/java/org/apache/sysds/runtime/io/WriterTextCellParallel.java
@@ -54,7 +54,7 @@ public class WriterTextCellParallel extends WriterTextCell
numThreads = Math.min(numThreads, numPartFiles);
//fall back to sequential write if dop is 1 (e.g., <128MB) in
order to create single file
- if( numThreads <= 1 || src.getNonZeros()==0 ) {
+ if( !_forcedParallel && (numThreads <= 1 ||
src.getNonZeros()==0) ) {
super.writeTextCellMatrixToHDFS(path, job, fs, src,
rlen, clen);
return;
}
diff --git
a/src/main/java/org/apache/sysds/runtime/io/WriterTextLIBSVMParallel.java
b/src/main/java/org/apache/sysds/runtime/io/WriterTextLIBSVMParallel.java
index 60d5aa87ed..4111a882ee 100644
--- a/src/main/java/org/apache/sysds/runtime/io/WriterTextLIBSVMParallel.java
+++ b/src/main/java/org/apache/sysds/runtime/io/WriterTextLIBSVMParallel.java
@@ -57,7 +57,7 @@ public class WriterTextLIBSVMParallel extends WriterTextLIBSVM
numThreads = Math.min(numThreads, numPartFiles);
//fall back to sequential write if dop is 1 (e.g., <128MB) in
order to create single file
- if( numThreads <= 1 ) {
+ if( !_forcedParallel && numThreads <= 1 ) {
super.writeLIBSVMMatrixToHDFS(path, job, fs, src);
return;
}
diff --git
a/src/test/java/org/apache/sysds/test/functions/io/SeqParReadTest2.java
b/src/test/java/org/apache/sysds/test/functions/io/SeqParReadTest2.java
index a0ed411ef6..a36e678b99 100644
--- a/src/test/java/org/apache/sysds/test/functions/io/SeqParReadTest2.java
+++ b/src/test/java/org/apache/sysds/test/functions/io/SeqParReadTest2.java
@@ -169,13 +169,15 @@ public class SeqParReadTest2 extends AutomatedTestBase {
try {
if( _matrix ) {
MatrixWriter writer =
createMatrixWriter(FileFormat.safeValueOf(_format), _par);
- writer.writeMatrixToHDFS(data, fname, rows,
cols, 1000, data.getNonZeros());
+ writer.setForcedParallel(_par); //must be set before the write to take effect
+ writer.writeMatrixToHDFS(data, fname, rows,
cols, 100, data.getNonZeros());
MatrixReader reader =
createMatrixReader(FileFormat.safeValueOf(_format), _par);
- data2 = reader.readMatrixFromHDFS(fname, rows,
cols, 1000, data.getNonZeros());
+ data2 = reader.readMatrixFromHDFS(fname, rows,
cols, 100, data.getNonZeros());
}
else {
FrameBlock fdata =
DataConverter.convertToFrameBlock(data);
FrameWriter writer =
createFrameWriter(FileFormat.safeValueOf(_format), _par);
+ writer.setForcedParallel(_par);
writer.writeFrameToHDFS(fdata, fname, rows,
cols);
FrameReader reader =
createFrameReader(FileFormat.safeValueOf(_format), _par);
FrameBlock fdata2 =
reader.readFrameFromHDFS(fname, schema, rows, cols);