This is an automated email from the ASF dual-hosted git repository.
mboehm7 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new 350957e923 [SYSTEMDS-3795] Fix missing sparse block support in HDF5 reader/writer
350957e923 is described below
commit 350957e9239572ac5c90e9e6faf603d79090ca01
Author: Matthias Boehm <[email protected]>
AuthorDate: Thu Nov 28 11:25:29 2024 +0100
[SYSTEMDS-3795] Fix missing sparse block support in HDF5 reader/writer
HDF5 only supports dense matrices, and so far our HDF5 writers and
readers simply ignored sparse blocks. This patch adds the missing
support and writes/reads sparse blocks as dense rows (converted in a
streaming fashion, row by row).
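As context, here is a minimal standalone sketch of the streaming conversion on
the write path. The class and method names are hypothetical; it assumes a
sparse row given by CSR-style column indexes (aix) and values (avals), and a
printed line stands in for the actual H5.H5Dwrite call in WriterHDF5:

    import java.util.Arrays;

    public class SparseRowStreamingSketch {
        // Scatter one sparse row (aix: column indexes, avals: values)
        // into a reused dense buffer and "write" it; zero entries need
        // no work because the buffer is reset up front.
        static void writeRow(double[] buf, int[] aix, double[] avals) {
            Arrays.fill(buf, 0);                       //reset reused dense row buffer
            for(int k = 0; k < aix.length; k++)
                buf[aix[k]] = avals[k];                //scatter non-zeros
            System.out.println(Arrays.toString(buf));  //stand-in for H5.H5Dwrite
        }

        public static void main(String[] args) {
            double[] buf = new double[5];              //one buffer for all rows (clen=5)
            writeRow(buf, new int[]{1, 4}, new double[]{2.0, 7.0});
            writeRow(buf, new int[]{}, new double[]{}); //empty row stays all-zero
        }
    }

Only one dense row buffer is ever materialized, so memory stays O(clen)
regardless of the matrix size; the read path in ReaderHDF5 mirrors this with a
per-row non-zero count to pre-allocate each sparse row.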
---
.../org/apache/sysds/runtime/io/ReaderHDF5.java | 50 ++++++++++++------
.../org/apache/sysds/runtime/io/WriterHDF5.java | 60 ++++++++++++++--------
.../sysds/test/functions/io/SeqParReadTest2.java | 10 ++--
3 files changed, 79 insertions(+), 41 deletions(-)
diff --git a/src/main/java/org/apache/sysds/runtime/io/ReaderHDF5.java b/src/main/java/org/apache/sysds/runtime/io/ReaderHDF5.java
index 2dc6bb1fbf..ff2e6945d2 100644
--- a/src/main/java/org/apache/sysds/runtime/io/ReaderHDF5.java
+++ b/src/main/java/org/apache/sysds/runtime/io/ReaderHDF5.java
@@ -32,11 +32,13 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.sysds.conf.ConfigurationManager;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.data.DenseBlock;
+import org.apache.sysds.runtime.data.SparseBlock;
import org.apache.sysds.runtime.io.hdf5.H5;
import org.apache.sysds.runtime.io.hdf5.H5Constants;
import org.apache.sysds.runtime.io.hdf5.H5ContiguousDataset;
import org.apache.sysds.runtime.io.hdf5.H5RootObject;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.util.UtilFunctions;
public class ReaderHDF5 extends MatrixReader {
protected final FileFormatPropertiesHDF5 _props;
@@ -51,7 +53,7 @@ public class ReaderHDF5 extends MatrixReader {
//allocate output matrix block
MatrixBlock ret = null;
if(rlen >= 0 && clen >= 0) //otherwise allocated on read
- ret = createOutputMatrixBlock(rlen, clen, (int) rlen, estnnz, true, false);
+ ret = createOutputMatrixBlock(rlen, clen, (int) rlen, estnnz, true, true);
//prepare file access
JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
@@ -92,7 +94,8 @@ public class ReaderHDF5 extends MatrixReader {
private static MatrixBlock readHDF5MatrixFromHDFS(Path path, JobConf job,
 FileSystem fs, MatrixBlock dest, long rlen, long clen, int blen, String datasetName)
- throws IOException, DMLRuntimeException {
+ throws IOException, DMLRuntimeException
+ {
//prepare file paths in alphanumeric order
ArrayList<Path> files = new ArrayList<>();
if(fs.getFileStatus(path).isDirectory()) {
@@ -124,7 +127,8 @@ public class ReaderHDF5 extends MatrixReader {
}
public static long readMatrixFromHDF5(BufferedInputStream bis, String datasetName, MatrixBlock dest,
- int row, long rlen, long clen, int blen) {
+ int rl, long ru, long clen, int blen)
+ {
bis.mark(0);
long lnnz = 0;
H5RootObject rootObject = H5.H5Fopen(bis);
@@ -133,28 +137,44 @@ public class ReaderHDF5 extends MatrixReader {
int[] dims = rootObject.getDimensions();
int ncol = dims[1];
- DenseBlock denseBlock = dest.getDenseBlock();
- double[] data = new double[ncol];
- for(int i = row; i < rlen; i++) {
- H5.H5Dread(contiguousDataset, i, data);
- for(int j = 0; j < ncol; j++) {
- if(data[j] != 0) {
- denseBlock.set(i, j, data[j]);
- lnnz++;
+ try {
+ double[] row = new double[ncol];
+ if( dest.isInSparseFormat() ) {
+ SparseBlock sb = dest.getSparseBlock();
+ for(int i = rl; i < ru; i++) {
+ H5.H5Dread(contiguousDataset, i, row);
+ int lnnzi = UtilFunctions.computeNnz(row, 0, (int)clen);
+ sb.allocate(i, lnnzi); //avoid row reallocations
+ for(int j = 0; j < ncol; j++)
+ sb.append(i, j, row[j]); //prunes zeros
+ lnnz += lnnzi;
+ }
+ }
+ else {
+ DenseBlock denseBlock = dest.getDenseBlock();
+ for(int i = rl; i < ru; i++) {
+ H5.H5Dread(contiguousDataset, i, row);
+ for(int j = 0; j < ncol; j++) {
+ if(row[j] != 0) {
+ denseBlock.set(i, j, row[j]);
+ lnnz++;
+ }
+ }
}
}
- row++;
}
- IOUtilFunctions.closeSilently(bis);
+ finally {
+ IOUtilFunctions.closeSilently(bis);
+ }
return lnnz;
}
public static MatrixBlock computeHDF5Size(List<Path> files, FileSystem fs, String datasetName)
- throws IOException, DMLRuntimeException {
+ throws IOException, DMLRuntimeException
+ {
int nrow = 0;
int ncol = 0;
for(int fileNo = 0; fileNo < files.size(); fileNo++) {
-
BufferedInputStream bis = new BufferedInputStream(fs.open(files.get(fileNo)));
H5RootObject rootObject = H5.H5Fopen(bis);
H5.H5Dopen(rootObject, datasetName);
diff --git a/src/main/java/org/apache/sysds/runtime/io/WriterHDF5.java b/src/main/java/org/apache/sysds/runtime/io/WriterHDF5.java
index a2fcd2c5b6..34b34333e6 100644
--- a/src/main/java/org/apache/sysds/runtime/io/WriterHDF5.java
+++ b/src/main/java/org/apache/sysds/runtime/io/WriterHDF5.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.sysds.conf.ConfigurationManager;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.data.DenseBlock;
+import org.apache.sysds.runtime.data.SparseBlock;
import org.apache.sysds.runtime.io.hdf5.H5;
import org.apache.sysds.runtime.io.hdf5.H5RootObject;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
@@ -32,6 +33,7 @@ import org.apache.sysds.runtime.util.HDFSTool;
import java.io.BufferedOutputStream;
import java.io.IOException;
+import java.util.Arrays;
public class WriterHDF5 extends MatrixWriter {
@@ -42,9 +44,9 @@ public class WriterHDF5 extends MatrixWriter {
}
@Override
- public void writeMatrixToHDFS(MatrixBlock src, String fname, long rlen, long clen, int blen, long nnz,
- boolean diag) throws IOException, DMLRuntimeException {
-
+ public void writeMatrixToHDFS(MatrixBlock src, String fname, long rlen, long clen, int blen, long nnz, boolean diag)
+ throws IOException, DMLRuntimeException
+ {
//validity check matrix dimensions
if(src.getNumRows() != rlen || src.getNumColumns() != clen)
throw new IOException("Matrix dimensions mismatch with metadata: " + src.getNumRows() + "x" + src
@@ -65,23 +67,24 @@ public class WriterHDF5 extends MatrixWriter {
writeHDF5MatrixToHDFS(path, job, fs, src);
IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(fs, path);
-
}
@Override
public final void writeEmptyMatrixToHDFS(String fname, long rlen, long clen, int blen)
- throws IOException, DMLRuntimeException {
-
+ throws IOException, DMLRuntimeException
+ {
+ throw new DMLRuntimeException("writing empty HDF5 matrices not supported yet");
}
- protected void writeHDF5MatrixToHDFS(Path path, JobConf job, FileSystem fs, MatrixBlock src) throws IOException {
- //sequential write HDF5 file
+ protected void writeHDF5MatrixToHDFS(Path path, JobConf job, FileSystem fs, MatrixBlock src)
+ throws IOException
+ {
writeHDF5MatrixToFile(path, job, fs, src, 0, src.getNumRows());
}
- protected static void writeHDF5MatrixToFile(Path path, JobConf job, FileSystem fs, MatrixBlock src, int rl,
- int rlen) throws IOException {
-
+ protected static void writeHDF5MatrixToFile(Path path, JobConf job, FileSystem fs, MatrixBlock src, int rl, int rlen)
+ throws IOException
+ {
int clen = src.getNumColumns();
BufferedOutputStream bos = new BufferedOutputStream(fs.create(path, true));
String datasetName = _props.getDatasetName();
@@ -94,23 +97,36 @@ public class WriterHDF5 extends MatrixWriter {
}
try {
- //TODO: HDF5 format don't support spars matrix
- // How to store spars matrix in HDF5 format?
-
// Write the data to the datasets.
- double[] data = new double[clen];
- DenseBlock d = src.getDenseBlock();
- for(int i = rl; i < rlen; i++) {
- for(int j = 0; j < clen;j++) {
- double lvalue = d!=null ? d.get(i, j) : 0;
- data[j] = lvalue;
+ double[] row = new double[clen];
+ if( src.isInSparseFormat() ) {
+ SparseBlock sb = src.getSparseBlock();
+ for(int i = rl; i < rlen; i++) {
+ Arrays.fill(row, 0);
+ if( !sb.isEmpty(i) ) {
+ int apos = sb.pos(i);
+ int alen = sb.size(i);
+ double[] avals = sb.values(i);
+ int[] aix = sb.indexes(i);
+ for(int j = apos; j < apos+alen; j++)
+ row[aix[j]] = avals[j];
+ }
+ H5.H5Dwrite(rootObject, row);
+ }
+ }
+ else {
+ DenseBlock db = src.getDenseBlock();
+ for(int i = rl; i < rlen; i++) {
+ for(int j = 0; j < clen;j++) {
+ double lvalue = db!=null ? db.get(i, j) : 0;
+ row[j] = lvalue;
+ }
+ H5.H5Dwrite(rootObject, row);
}
- H5.H5Dwrite(rootObject, data);
}
}
finally {
IOUtilFunctions.closeSilently(bos);
}
-
}
}
diff --git a/src/test/java/org/apache/sysds/test/functions/io/SeqParReadTest2.java b/src/test/java/org/apache/sysds/test/functions/io/SeqParReadTest2.java
index a36e678b99..84dc72be0e 100644
--- a/src/test/java/org/apache/sysds/test/functions/io/SeqParReadTest2.java
+++ b/src/test/java/org/apache/sysds/test/functions/io/SeqParReadTest2.java
@@ -145,9 +145,9 @@ public class SeqParReadTest2 extends AutomatedTestBase {
{false, "binary", false, 0.1},
{false, "binary", true, 0.7},
{false, "binary", true, 0.1},
- {true, "hdf5", false, 0.7},
- //{true, "hdf5", false, 0.1}, //FIXME
- //{true, "hdf5", true, 0.7},
+ {true, "hdf5", false, 0.7},
+ {true, "hdf5", false, 0.1},
+ //{true, "hdf5", true, 0.7}, //FIXME
//{true, "hdf5", true, 0.1},
{true, "libsvm", false, 0.7},
{true, "libsvm", false, 0.1},
@@ -190,8 +190,10 @@ public class SeqParReadTest2 extends AutomatedTestBase {
}
//compare read content is equivalent to original
- if( data2 != null )
+ if( data2 != null ) {
+ Assert.assertEquals(data.getNonZeros(), data2.getNonZeros());
TestUtils.compareMatrices(data, data2, eps);
+ }
}
@SuppressWarnings("incomplete-switch")