This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new cee72fcd62 [SYSTEMDS-3795] Fix multi-threaded HDF5 readers/writers
cee72fcd62 is described below

commit cee72fcd62524cd6afb29208bdefdd54fa5fe5d0
Author: Matthias Boehm <[email protected]>
AuthorDate: Thu Nov 28 13:18:01 2024 +0100

    [SYSTEMDS-3795] Fix multi-threaded HDF5 readers/writers
    
    This patch fixes the existing multi-threaded HDF5 readers/writers by
    adding (1) proper NNZ maintenance of the overall block, and (2) handling
    of both single- and multi-part HDF5 files/directories.
---
 .../org/apache/sysds/runtime/io/ReaderHDF5.java    |  6 ++--
 .../sysds/runtime/io/ReaderHDF5Parallel.java       | 41 ++++++++++++++--------
 .../sysds/runtime/io/WriterHDF5Parallel.java       |  9 +----
 .../sysds/test/functions/io/SeqParReadTest2.java   |  4 +--
 4 files changed, 32 insertions(+), 28 deletions(-)
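
For context on fix (1): each read task now returns its local non-zero count and the caller sums these into the destination block, instead of leaving the block's nnz stale. Below is a minimal, self-contained Java sketch of that aggregation pattern only; DenseBlockSketch and ParallelReadNnzSketch are hypothetical stand-ins for illustration, not the actual SystemDS MatrixBlock or reader classes.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Simplified stand-in for the destination block (hypothetical, not the SystemDS MatrixBlock API).
class DenseBlockSketch {
	final double[][] data;
	long nnz;
	DenseBlockSketch(int rows, int cols) { data = new double[rows][cols]; }
}

public class ParallelReadNnzSketch {
	// Each task fills its row range [rl, ru) and returns its local non-zero count.
	static Callable<Long> makeTask(DenseBlockSketch dest, double[][] input, int rl, int ru) {
		return () -> {
			long lnnz = 0;
			for(int i = rl; i < ru; i++)
				for(int j = 0; j < input[i].length; j++) {
					dest.data[i][j] = input[i][j];
					lnnz += (input[i][j] != 0) ? 1 : 0;
				}
			return lnnz;
		};
	}

	public static void main(String[] args) throws Exception {
		int rows = 1000, cols = 10, numThreads = 4;
		double[][] input = new double[rows][cols];
		input[0][0] = 7; input[999][9] = 3; // two non-zeros in otherwise empty data
		DenseBlockSketch dest = new DenseBlockSketch(rows, cols);

		ExecutorService pool = Executors.newFixedThreadPool(numThreads);
		try {
			int blklen = (int) Math.ceil((double) rows / numThreads);
			List<Callable<Long>> tasks = new ArrayList<>();
			for(int i = 0; i < numThreads && i * blklen < rows; i++)
				tasks.add(makeTask(dest, input, i * blklen, Math.min((i + 1) * blklen, rows)));

			// Aggregate per-task counts into the overall block's nnz (the pattern of fix (1)).
			long nnz = 0;
			for(Future<Long> f : pool.invokeAll(tasks))
				nnz += f.get();
			dest.nnz = nnz;
			System.out.println("total nnz = " + dest.nnz); // prints: total nnz = 2
		}
		finally {
			pool.shutdown();
		}
	}
}

Returning the count from each Callable<Long> and summing the futures keeps the tasks free of shared-state synchronization, which mirrors the approach the patch takes in ReaderHDF5Parallel.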

diff --git a/src/main/java/org/apache/sysds/runtime/io/ReaderHDF5.java b/src/main/java/org/apache/sysds/runtime/io/ReaderHDF5.java
index ff2e6945d2..f65887a2cb 100644
--- a/src/main/java/org/apache/sysds/runtime/io/ReaderHDF5.java
+++ b/src/main/java/org/apache/sysds/runtime/io/ReaderHDF5.java
@@ -108,7 +108,7 @@ public class ReaderHDF5 extends MatrixReader {
 
                //determine matrix size via additional pass if required
                if(dest == null) {
-                       dest = computeHDF5Size(files, fs, datasetName);
+                       dest = computeHDF5Size(files, fs, datasetName, rlen*clen);
                        clen = dest.getNumColumns();
                        rlen = dest.getNumRows();
                }
@@ -169,7 +169,7 @@ public class ReaderHDF5 extends MatrixReader {
                return lnnz;
        }
 
-       public static MatrixBlock computeHDF5Size(List<Path> files, FileSystem fs, String datasetName)
+       public static MatrixBlock computeHDF5Size(List<Path> files, FileSystem fs, String datasetName, long estnnz)
                throws IOException, DMLRuntimeException
        {
                int nrow = 0;
@@ -186,6 +186,6 @@ public class ReaderHDF5 extends MatrixReader {
                        IOUtilFunctions.closeSilently(bis);
                }
                // allocate target matrix block based on given size;
-               return createOutputMatrixBlock(nrow, ncol, nrow, (long) nrow * ncol, true, false);
+               return createOutputMatrixBlock(nrow, ncol, nrow, estnnz, true, true);
        }
 }
diff --git a/src/main/java/org/apache/sysds/runtime/io/ReaderHDF5Parallel.java b/src/main/java/org/apache/sysds/runtime/io/ReaderHDF5Parallel.java
index 56c211d54c..658eb53826 100644
--- a/src/main/java/org/apache/sysds/runtime/io/ReaderHDF5Parallel.java
+++ b/src/main/java/org/apache/sysds/runtime/io/ReaderHDF5Parallel.java
@@ -38,6 +38,7 @@ import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.io.hdf5.H5Constants;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.util.CommonThreadPool;
+import org.apache.sysds.runtime.util.HDFSTool;
 
 public class ReaderHDF5Parallel extends ReaderHDF5 {
 
@@ -46,7 +47,7 @@ public class ReaderHDF5Parallel extends ReaderHDF5 {
 
        public ReaderHDF5Parallel(FileFormatPropertiesHDF5 props) {
                super(props);
-               _numThreads = OptimizerUtils.getParallelTextReadParallelism();
+               _numThreads = OptimizerUtils.getParallelBinaryReadParallelism();
        }
 
        @Override
@@ -69,26 +70,31 @@ public class ReaderHDF5Parallel extends ReaderHDF5 {
                // allocate output matrix block
                ArrayList<Path> files = new ArrayList<>();
                files.add(path);
-               MatrixBlock src = computeHDF5Size(files, fs, _props.getDatasetName());
-
+               MatrixBlock src = computeHDF5Size(files, fs, _props.getDatasetName(), estnnz);
+               int numParts = Math.min(files.size(), _numThreads);
+               
                //create and execute tasks
                ExecutorService pool = CommonThreadPool.get(_numThreads);
                try {
                        int bufferSize = (src.getNumColumns() * src.getNumRows()) * 8 + H5Constants.STATIC_HEADER_SIZE;
                        ArrayList<ReadHDF5Task> tasks = new ArrayList<>();
                        rlen = src.getNumRows();
-                       int blklen = (int) Math.ceil((double) rlen / _numThreads);
+                       int blklen = (int) Math.ceil((double) rlen / numParts);
                        for(int i = 0; i < _numThreads & i * blklen < rlen; i++) {
                                int rl = i * blklen;
                                int ru = (int) Math.min((i + 1) * blklen, rlen);
-                               BufferedInputStream bis = new BufferedInputStream(fs.open(path), bufferSize);
+                               Path newPath = HDFSTool.isDirectory(fs, path) ?
+                                       new Path(path, IOUtilFunctions.getPartFileName(i)) : path;
+                               BufferedInputStream bis = new BufferedInputStream(fs.open(newPath), bufferSize);
 
                                //BufferedInputStream bis, String datasetName, MatrixBlock src, MutableInt rl, int ru
-                               tasks.add(new ReadHDF5Task(bis, _props.getDatasetName(), src, rl, ru));
+                               tasks.add(new ReadHDF5Task(bis, _props.getDatasetName(), src, rl, ru, clen, blklen));
                        }
 
-                       for(Future<Object> task : pool.invokeAll(tasks))
-                               task.get();
+                       long nnz = 0;
+                       for(Future<Long> task : pool.invokeAll(tasks))
+                               nnz += task.get();
+                       src.setNonZeros(nnz);
                        
                        return src;
                }
@@ -102,31 +108,36 @@ public class ReaderHDF5Parallel extends ReaderHDF5 {
 
        @Override
        public MatrixBlock readMatrixFromInputStream(InputStream is, long rlen, long clen, int blen, long estnnz)
-               throws IOException, DMLRuntimeException {
-
+               throws IOException, DMLRuntimeException
+       {
                return new ReaderHDF5(_props).readMatrixFromInputStream(is, rlen, clen, blen, estnnz);
        }
 
-       private static class ReadHDF5Task implements Callable<Object> {
+       private static class ReadHDF5Task implements Callable<Long> {
 
                private final BufferedInputStream _bis;
                private final String _datasetName;
                private final MatrixBlock _src;
                private final int _rl;
                private final int _ru;
+               private final long _clen;
+               private final int _blen;
 
-               public ReadHDF5Task(BufferedInputStream bis, String datasetName, MatrixBlock src, int rl, int ru) {
+               public ReadHDF5Task(BufferedInputStream bis, String datasetName, MatrixBlock src,
+                       int rl, int ru, long clen, int blen)
+               {
                        _bis = bis;
                        _datasetName = datasetName;
                        _src = src;
                        _rl = rl;
                        _ru = ru;
+                       _clen = clen;
+                       _blen = blen;
                }
 
                @Override
-               public Object call() throws IOException {
-                       readMatrixFromHDF5(_bis, _datasetName, _src, _rl, _ru, 0, 0);
-                       return null;
+               public Long call() throws IOException {
+                       return readMatrixFromHDF5(_bis, _datasetName, _src, _rl, _ru, _clen, _blen);
                }
        }
 }
diff --git a/src/main/java/org/apache/sysds/runtime/io/WriterHDF5Parallel.java b/src/main/java/org/apache/sysds/runtime/io/WriterHDF5Parallel.java
index e136c496b3..f8cb47acb2 100644
--- a/src/main/java/org/apache/sysds/runtime/io/WriterHDF5Parallel.java
+++ b/src/main/java/org/apache/sysds/runtime/io/WriterHDF5Parallel.java
@@ -26,7 +26,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.sysds.common.Types;
@@ -80,13 +79,6 @@ public class WriterHDF5Parallel extends WriterHDF5 {
 
                        for(Future<Object> task : pool.invokeAll(tasks))
                                task.get();
-
-                       // delete crc files if written to local file system
-                       if(fs instanceof LocalFileSystem) {
-                               for(int i = 0; i < numThreads & i * blklen < rlen; i++)
-                                       IOUtilFunctions
-                                               .deleteCrcFilesFromLocalFileSystem(fs, new Path(path, IOUtilFunctions.getPartFileName(i)));
-                       }
                }
                catch(Exception e) {
                        throw new IOException("Failed parallel write of HDF5 output.", e);
@@ -115,6 +107,7 @@ public class WriterHDF5Parallel extends WriterHDF5 {
                @Override
                public Object call() throws IOException {
                        writeHDF5MatrixToFile(_path, _job, _fs, _src, _rl, _ru);
+                       IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(_job, _path);
                        return null;
                }
        }
diff --git a/src/test/java/org/apache/sysds/test/functions/io/SeqParReadTest2.java b/src/test/java/org/apache/sysds/test/functions/io/SeqParReadTest2.java
index 84dc72be0e..7f2aedfe0f 100644
--- a/src/test/java/org/apache/sysds/test/functions/io/SeqParReadTest2.java
+++ b/src/test/java/org/apache/sysds/test/functions/io/SeqParReadTest2.java
@@ -147,8 +147,8 @@ public class SeqParReadTest2 extends AutomatedTestBase {
                        {false, "binary", true, 0.1},
                        {true, "hdf5", false, 0.7},
                        {true, "hdf5", false, 0.1},
-                       //{true, "hdf5", true, 0.7}, //FIXME
-                       //{true, "hdf5", true, 0.1},
+                       {true, "hdf5", true, 0.7},
+                       {true, "hdf5", true, 0.1},
                        {true, "libsvm", false, 0.7},
                        {true, "libsvm", false, 0.1},
                        {true, "libsvm", true, 0.7},
