This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new 350957e923 [SYSTEMDS-3795] Fix missing sparse block support in HDF5 reader/writer
350957e923 is described below

commit 350957e9239572ac5c90e9e6faf603d79090ca01
Author: Matthias Boehm <[email protected]>
AuthorDate: Thu Nov 28 11:25:29 2024 +0100

    [SYSTEMDS-3795] Fix missing sparse block support in HDF5 reader/writer
    
    HDF5 only supports dense matrices, and so far our HDF5 writers and
    readers simply ignored sparse blocks. This patch adds the missing
    support and writes/reads sparse blocks as dense rows (converted in a
    streaming fashion, row by row).
---
 .../org/apache/sysds/runtime/io/ReaderHDF5.java    | 50 ++++++++++++------
 .../org/apache/sysds/runtime/io/WriterHDF5.java    | 60 ++++++++++++++--------
 .../sysds/test/functions/io/SeqParReadTest2.java   | 10 ++--
 3 files changed, 79 insertions(+), 41 deletions(-)

diff --git a/src/main/java/org/apache/sysds/runtime/io/ReaderHDF5.java 
b/src/main/java/org/apache/sysds/runtime/io/ReaderHDF5.java
index 2dc6bb1fbf..ff2e6945d2 100644
--- a/src/main/java/org/apache/sysds/runtime/io/ReaderHDF5.java
+++ b/src/main/java/org/apache/sysds/runtime/io/ReaderHDF5.java
@@ -32,11 +32,13 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.sysds.conf.ConfigurationManager;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.data.DenseBlock;
+import org.apache.sysds.runtime.data.SparseBlock;
 import org.apache.sysds.runtime.io.hdf5.H5;
 import org.apache.sysds.runtime.io.hdf5.H5Constants;
 import org.apache.sysds.runtime.io.hdf5.H5ContiguousDataset;
 import org.apache.sysds.runtime.io.hdf5.H5RootObject;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.util.UtilFunctions;
 
 public class ReaderHDF5 extends MatrixReader {
        protected final FileFormatPropertiesHDF5 _props;
@@ -51,7 +53,7 @@ public class ReaderHDF5 extends MatrixReader {
                //allocate output matrix block
                MatrixBlock ret = null;
                if(rlen >= 0 && clen >= 0) //otherwise allocated on read
-                       ret = createOutputMatrixBlock(rlen, clen, (int) rlen, 
estnnz, true, false);
+                       ret = createOutputMatrixBlock(rlen, clen, (int) rlen, 
estnnz, true, true);
 
                //prepare file access
                JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());
@@ -92,7 +94,8 @@ public class ReaderHDF5 extends MatrixReader {
 
        private static MatrixBlock readHDF5MatrixFromHDFS(Path path, JobConf 
job,
                FileSystem fs, MatrixBlock dest, long rlen, long clen, int 
blen, String datasetName)
-               throws IOException, DMLRuntimeException {
+               throws IOException, DMLRuntimeException
+       {
                //prepare file paths in alphanumeric order
                ArrayList<Path> files = new ArrayList<>();
                if(fs.getFileStatus(path).isDirectory()) {
@@ -124,7 +127,8 @@ public class ReaderHDF5 extends MatrixReader {
        }
 
        public static long readMatrixFromHDF5(BufferedInputStream bis, String 
datasetName, MatrixBlock dest,
-               int row, long rlen, long clen, int blen) {
+               int rl, long ru, long clen, int blen)
+       {
                bis.mark(0);
                long lnnz = 0;
                H5RootObject rootObject = H5.H5Fopen(bis);
@@ -133,28 +137,44 @@ public class ReaderHDF5 extends MatrixReader {
                int[] dims = rootObject.getDimensions();
                int ncol = dims[1];
 
-               DenseBlock denseBlock = dest.getDenseBlock();
-               double[] data = new double[ncol];
-               for(int i = row; i < rlen; i++) {
-                       H5.H5Dread(contiguousDataset, i, data);
-                       for(int j = 0; j < ncol; j++) {
-                               if(data[j] != 0) {
-                                       denseBlock.set(i, j, data[j]);
-                                       lnnz++;
+               try {
+                       double[] row = new double[ncol];
+                       if( dest.isInSparseFormat() ) {
+                               SparseBlock sb = dest.getSparseBlock();
+                               for(int i = rl; i < ru; i++) {
+                                       H5.H5Dread(contiguousDataset, i, row);
+                                       int lnnzi = 
UtilFunctions.computeNnz(row, 0, (int)clen);
+                                       sb.allocate(i, lnnzi); //avoid row 
reallocations
+                                       for(int j = 0; j < ncol; j++) 
+                                               sb.append(i, j, row[j]); 
//prunes zeros
+                                       lnnz += lnnzi;
+                               }
+                       }
+                       else {
+                               DenseBlock denseBlock = dest.getDenseBlock();
+                               for(int i = rl; i < ru; i++) {
+                                       H5.H5Dread(contiguousDataset, i, row);
+                                       for(int j = 0; j < ncol; j++) {
+                                               if(row[j] != 0) {
+                                                       denseBlock.set(i, j, 
row[j]);
+                                                       lnnz++;
+                                               }
+                                       }
                                }
                        }
-                       row++;
                }
-               IOUtilFunctions.closeSilently(bis);
+               finally {
+                       IOUtilFunctions.closeSilently(bis);
+               }
                return lnnz;
        }
 
        public static MatrixBlock computeHDF5Size(List<Path> files, FileSystem 
fs, String datasetName)
-               throws IOException, DMLRuntimeException {
+               throws IOException, DMLRuntimeException
+       {
                int nrow = 0;
                int ncol = 0;
                for(int fileNo = 0; fileNo < files.size(); fileNo++) {
-
                        BufferedInputStream bis = new 
BufferedInputStream(fs.open(files.get(fileNo)));
                        H5RootObject rootObject = H5.H5Fopen(bis);
                        H5.H5Dopen(rootObject, datasetName);
diff --git a/src/main/java/org/apache/sysds/runtime/io/WriterHDF5.java 
b/src/main/java/org/apache/sysds/runtime/io/WriterHDF5.java
index a2fcd2c5b6..34b34333e6 100644
--- a/src/main/java/org/apache/sysds/runtime/io/WriterHDF5.java
+++ b/src/main/java/org/apache/sysds/runtime/io/WriterHDF5.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.sysds.conf.ConfigurationManager;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.data.DenseBlock;
+import org.apache.sysds.runtime.data.SparseBlock;
 import org.apache.sysds.runtime.io.hdf5.H5;
 import org.apache.sysds.runtime.io.hdf5.H5RootObject;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
@@ -32,6 +33,7 @@ import org.apache.sysds.runtime.util.HDFSTool;
 
 import java.io.BufferedOutputStream;
 import java.io.IOException;
+import java.util.Arrays;
 
 public class WriterHDF5 extends MatrixWriter {
 
@@ -42,9 +44,9 @@ public class WriterHDF5 extends MatrixWriter {
        }
 
        @Override
-       public void writeMatrixToHDFS(MatrixBlock src, String fname, long rlen, 
long clen, int blen, long nnz,
-               boolean diag) throws IOException, DMLRuntimeException {
-
+       public void writeMatrixToHDFS(MatrixBlock src, String fname, long rlen, 
long clen, int blen, long nnz, boolean diag)
+               throws IOException, DMLRuntimeException
+       {
                //validity check matrix dimensions
                if(src.getNumRows() != rlen || src.getNumColumns() != clen)
                        throw new IOException("Matrix dimensions mismatch with 
metadata: " + src.getNumRows() + "x" + src
@@ -65,23 +67,24 @@ public class WriterHDF5 extends MatrixWriter {
                writeHDF5MatrixToHDFS(path, job, fs, src);
 
                IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(fs, path);
-
        }
 
        @Override
        public final void writeEmptyMatrixToHDFS(String fname, long rlen, long 
clen, int blen)
-               throws IOException, DMLRuntimeException {
-
+               throws IOException, DMLRuntimeException 
+       {
+               throw new DMLRuntimeException("writing empty HDF5 matrices not 
supported yet");
        }
 
-       protected void writeHDF5MatrixToHDFS(Path path, JobConf job, FileSystem 
fs, MatrixBlock src) throws IOException {
-               //sequential write HDF5 file
+       protected void writeHDF5MatrixToHDFS(Path path, JobConf job, FileSystem 
fs, MatrixBlock src) 
+               throws IOException
+       {
                writeHDF5MatrixToFile(path, job, fs, src, 0, src.getNumRows());
        }
 
-       protected static void writeHDF5MatrixToFile(Path path, JobConf job, 
FileSystem fs, MatrixBlock src, int rl,
-               int rlen) throws IOException {
-
+       protected static void writeHDF5MatrixToFile(Path path, JobConf job, 
FileSystem fs, MatrixBlock src, int rl, int rlen) 
+               throws IOException 
+       {
                int clen = src.getNumColumns();
                BufferedOutputStream bos = new 
BufferedOutputStream(fs.create(path, true));
                String datasetName = _props.getDatasetName();
@@ -94,23 +97,36 @@ public class WriterHDF5 extends MatrixWriter {
                }
 
                try {
-                       //TODO: HDF5 format don't support spars matrix
-                       // How to store spars matrix in HDF5 format?
-
                        // Write the data to the datasets.
-                       double[] data = new double[clen];
-                       DenseBlock d = src.getDenseBlock();
-                       for(int i = rl; i < rlen; i++) {
-                               for(int j = 0; j < clen;j++) {
-                                       double lvalue = d!=null ? d.get(i, j) : 
0;
-                                       data[j] = lvalue;
+                       double[] row = new double[clen];
+                       if( src.isInSparseFormat() ) {
+                               SparseBlock sb = src.getSparseBlock();
+                               for(int i = rl; i < rlen; i++) {
+                                       Arrays.fill(row, 0);
+                                       if( !sb.isEmpty(i) ) {
+                                               int apos = sb.pos(i);
+                                               int alen = sb.size(i);
+                                               double[] avals = sb.values(i);
+                                               int[] aix = sb.indexes(i);
+                                               for(int j = apos; j < 
apos+alen; j++)
+                                                       row[aix[j]] = avals[j];
+                                       }
+                                       H5.H5Dwrite(rootObject, row);
+                               }
+                       }
+                       else {
+                               DenseBlock db = src.getDenseBlock();
+                               for(int i = rl; i < rlen; i++) {
+                                       for(int j = 0; j < clen;j++) {
+                                               double lvalue = db!=null ? 
db.get(i, j) : 0;
+                                               row[j] = lvalue;
+                                       }
+                                       H5.H5Dwrite(rootObject, row);
                                }
-                               H5.H5Dwrite(rootObject, data);
                        }
                }
                finally {
                        IOUtilFunctions.closeSilently(bos);
                }
-
        }
 }
diff --git 
a/src/test/java/org/apache/sysds/test/functions/io/SeqParReadTest2.java 
b/src/test/java/org/apache/sysds/test/functions/io/SeqParReadTest2.java
index a36e678b99..84dc72be0e 100644
--- a/src/test/java/org/apache/sysds/test/functions/io/SeqParReadTest2.java
+++ b/src/test/java/org/apache/sysds/test/functions/io/SeqParReadTest2.java
@@ -145,9 +145,9 @@ public class SeqParReadTest2 extends AutomatedTestBase {
                        {false, "binary", false, 0.1},
                        {false, "binary", true, 0.7},
                        {false, "binary", true, 0.1},
-                       {true, "hdf5", false, 0.7}, 
-                       //{true, "hdf5", false, 0.1}, //FIXME
-                       //{true, "hdf5", true, 0.7},
+                       {true, "hdf5", false, 0.7},
+                       {true, "hdf5", false, 0.1},
+                       //{true, "hdf5", true, 0.7}, //FIXME
                        //{true, "hdf5", true, 0.1},
                        {true, "libsvm", false, 0.7},
                        {true, "libsvm", false, 0.1},
@@ -190,8 +190,10 @@ public class SeqParReadTest2 extends AutomatedTestBase {
                }
                
                //compare read content is equivalent to original
-               if( data2 != null )
+               if( data2 != null ) {
+                       Assert.assertEquals(data.getNonZeros(), 
data2.getNonZeros());
                        TestUtils.compareMatrices(data, data2, eps);
+               }
        }
        
        @SuppressWarnings("incomplete-switch")

Reply via email to