This is an automated email from the ASF dual-hosted git repository.

baunsgaard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git

commit 9241488e0c2ec3027d6157243f0a755f3cec7e16
Author: baunsgaard <[email protected]>
AuthorDate: Sat Sep 17 23:21:00 2022 +0200

    [SYSTEMDS-2699] CLA IO Compressed Matrix
    
    This commit adds the basic building blocks for writing a compressed
    matrix to disk and reading it back, together with a basic test that
    writes a matrix and reads it back from disk.
    
    Further testing and full integration into DML are still needed, as is
    a mechanism to detect whether the format of the compression groups has
    changed.
---
 .gitignore                                         |   1 +
 src/main/java/org/apache/sysds/common/Types.java   |   1 +
 .../runtime/compress/CompressedMatrixBlock.java    |  28 +++++
 .../runtime/compress/io/ReaderCompressed.java      |  99 ++++++++++++++++
 .../runtime/compress/io/WriterCompressed.java      |  95 +++++++++++++++
 .../sysds/runtime/io/MatrixReaderFactory.java      |   5 +
 .../sysds/runtime/io/MatrixWriterFactory.java      |   6 +-
 .../sysds/test/component/compress/io/IOTest.java   | 131 +++++++++++++++++++++
 8 files changed, 365 insertions(+), 1 deletion(-)

diff --git a/.gitignore b/.gitignore
index f463565a38..89df46df01 100644
--- a/.gitignore
+++ b/.gitignore
@@ -56,6 +56,7 @@ docs/_site
 # Test Artifacts
 src/test/scripts/**/*.dmlt
 src/test/scripts/functions/mlcontextin/
+src/test/java/org/apache/sysds/test/component/compress/io/files
 .factorypath
 
 # Excluded sources
diff --git a/src/main/java/org/apache/sysds/common/Types.java 
b/src/main/java/org/apache/sysds/common/Types.java
index a9b8108600..1edc511cc3 100644
--- a/src/main/java/org/apache/sysds/common/Types.java
+++ b/src/main/java/org/apache/sysds/common/Types.java
@@ -539,6 +539,7 @@ public class Types
                TEXT,   // text cell IJV representation (mm w/o header)
                MM,     // text matrix market IJV representation
                CSV,    // text dense representation
+               COMPRESSED, // Internal SYSTEMDS compressed format 
                LIBSVM, // text libsvm sparse row representation
                JSONL,  // text nested JSON (Line) representation
                BINARY, // binary block representation 
(dense/sparse/ultra-sparse)
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java 
b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java
index 57ab75cae4..6d795963d4 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java
@@ -168,6 +168,25 @@ public class CompressedMatrixBlock extends MatrixBlock {
                decompressedVersion = new 
SoftReference<>(uncompressedMatrixBlock);
        }
 
+       /**
+        * Construct a compressed matrix block directly from already materialized parts. No compression and no
+        * validation is performed; the given values are stored as they are.
+        * 
+        * @param rl          Number of rows
+        * @param cl          Number of columns
+        * @param nnz         Number of non zeros
+        * @param overlapping Whether the column groups are overlapping
+        * @param groups      The column groups of the matrix (stored by reference, not copied)
+        */
+       protected CompressedMatrixBlock(int rl, int cl, long nnz, boolean overlapping, List<AColGroup> groups) {
+               super(true);
+               // dimensions and non zero count
+               rlen = rl;
+               clen = cl;
+               nonZeros = nnz;
+               // this constructor always marks the block as non sparse
+               sparse = false;
+               // the compressed representation itself
+               _colGroups = groups;
+               overlappingColGroups = overlapping;
+       }
+
        @Override
        public void reset(int rl, int cl, boolean sp, long estnnz, double val) {
                throw new DMLCompressionException("Invalid to reset a 
Compressed MatrixBlock");
@@ -370,6 +389,15 @@ public class CompressedMatrixBlock extends MatrixBlock {
                _colGroups = ColGroupIO.readGroups(in, rlen);
        }
 
+       /**
+        * Deserialize a compressed matrix block from the given input. The fields are read in the order: number of
+        * rows, number of columns, number of non zeros, overlapping flag, and finally the column groups.
+        * 
+        * @param in The input to read from
+        * @return A new compressed matrix block holding the read column groups
+        * @throws IOException If reading from the input fails
+        */
+       public static CompressedMatrixBlock read(DataInput in) throws IOException {
+               // the read order is significant, it must follow the serialized layout
+               final int nRows = in.readInt();
+               final int nCols = in.readInt();
+               final long nnz = in.readLong();
+               final boolean overlapping = in.readBoolean();
+               final List<AColGroup> colGroups = ColGroupIO.readGroups(in, nRows);
+               return new CompressedMatrixBlock(nRows, nCols, nnz, overlapping, colGroups);
+       }
+
        @Override
        public void write(DataOutput out) throws IOException {
                if(nonZeros > 0 && getExactSizeOnDisk() > 
MatrixBlock.estimateSizeOnDisk(rlen, clen, nonZeros)) {
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/io/ReaderCompressed.java 
b/src/main/java/org/apache/sysds/runtime/compress/io/ReaderCompressed.java
new file mode 100644
index 0000000000..24db387718
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/compress/io/ReaderCompressed.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.compress.io;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.sysds.conf.ConfigurationManager;
+import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.compress.CompressedMatrixBlock;
+import org.apache.sysds.runtime.io.IOUtilFunctions;
+import org.apache.sysds.runtime.io.MatrixReader;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+
+/**
+ * Reader for matrices stored in the SystemDS compressed format, where a single file contains one serialized
+ * CompressedMatrixBlock. Reading from a folder or from an input stream is not supported yet.
+ */
+public class ReaderCompressed extends MatrixReader {
+
+       /**
+        * Create a new reader for the compressed matrix format.
+        * 
+        * @return A new reader
+        */
+       public static ReaderCompressed create() {
+               return new ReaderCompressed();
+       }
+
+       /**
+        * Read a compressed matrix from the given file, without any meta data to compare the dimensions against.
+        * 
+        * @param fname The path of the file to read
+        * @return The matrix read from the file
+        * @throws IOException If the file could not be read
+        */
+       public static MatrixBlock readCompressedMatrixFromHDFS(String fname) throws IOException {
+               // -1 marks the dimensions as unknown, which skips the meta data consistency warnings
+               return create().readMatrixFromHDFS(fname, -1, -1, 0, 0);
+       }
+
+       /**
+        * Read a compressed matrix from the given file. The dimensions are taken from the file itself; the given
+        * dimensions are only used to warn about inconsistent meta data.
+        * 
+        * @param fname  The path of the file (or folder) to read
+        * @param rlen   Expected number of rows, negative if unknown
+        * @param clen   Expected number of columns, negative if unknown
+        * @param blen   Block size (unused)
+        * @param estnnz Estimated number of non zeros (unused)
+        * @return The matrix read from the file
+        */
+       @Override
+       public MatrixBlock readMatrixFromHDFS(String fname, long rlen, long clen, int blen, long estnnz)
+               throws IOException, DMLRuntimeException {
+
+               final JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
+               final Path path = new Path(fname);
+               final FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
+
+               checkValidInputFile(fs, path);
+
+               final MatrixBlock cmb = readCompressedMatrix(path, job, fs);
+
+               // only compare against the meta data when the expected dimensions are known
+               if(rlen >= 0 && cmb.getNumRows() != rlen)
+                       LOG.warn("Metadata file does not correlate with compressed file, NRows : " + cmb.getNumRows() + " vs " + rlen);
+               if(clen >= 0 && cmb.getNumColumns() != clen)
+                       LOG.warn(
+                               "Metadata file does not correlate with compressed file, NCols : " + cmb.getNumColumns() + " vs " + clen);
+
+               return cmb;
+       }
+
+       @Override
+       public MatrixBlock readMatrixFromInputStream(InputStream is, long rlen, long clen, int blen, long estnnz)
+               throws IOException, DMLRuntimeException {
+               throw new NotImplementedException("Not implemented reading compressedMatrix from input stream");
+       }
+
+       /** Read either a single compressed file or a folder of compressed files (folders are not supported yet). */
+       private static MatrixBlock readCompressedMatrix(Path path, JobConf job, FileSystem fs) throws IOException {
+               if(fs.getFileStatus(path).isDirectory())
+                       return readCompressedMatrixFolder(path, job, fs);
+               else
+                       return readCompressedMatrixSingleFile(path, job, fs);
+       }
+
+       private static MatrixBlock readCompressedMatrixFolder(Path path, JobConf job, FileSystem fs) {
+               throw new NotImplementedException("Not implemented reading compressedMatrix from folder: " + path);
+       }
+
+       /** Read a single file containing one serialized CompressedMatrixBlock. */
+       private static MatrixBlock readCompressedMatrixSingleFile(Path path, JobConf job, FileSystem fs) throws IOException {
+               // try-with-resources closes the stream even if deserialization fails
+               try(InputStream is = fs.open(path)) {
+                       final DataInput in = new DataInputStream(is);
+                       return CompressedMatrixBlock.read(in);
+               }
+       }
+
+}
diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/io/WriterCompressed.java 
b/src/main/java/org/apache/sysds/runtime/compress/io/WriterCompressed.java
new file mode 100644
index 0000000000..458ee5df31
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/compress/io/WriterCompressed.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.compress.io;
+
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.sysds.conf.ConfigurationManager;
+import org.apache.sysds.runtime.compress.CompressedMatrixBlock;
+import org.apache.sysds.runtime.compress.CompressedMatrixBlockFactory;
+import org.apache.sysds.runtime.compress.DMLCompressionException;
+import org.apache.sysds.runtime.io.FileFormatProperties;
+import org.apache.sysds.runtime.io.IOUtilFunctions;
+import org.apache.sysds.runtime.io.MatrixWriter;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.util.HDFSTool;
+
+/**
+ * Writer for matrices in the SystemDS compressed format. The input is compressed if it is not compressed already,
+ * and the resulting CompressedMatrixBlock is serialized into a single file.
+ */
+public class WriterCompressed extends MatrixWriter {
+
+       /**
+        * Create a new writer for the compressed format.
+        * 
+        * @param props File format properties (currently ignored)
+        * @return A new writer
+        */
+       public static WriterCompressed create(FileFormatProperties props) {
+               return new WriterCompressed();
+       }
+
+       /**
+        * Write the given matrix in compressed format to the given path. No meta data arguments are needed, since
+        * the dimensions are serialized as part of the compressed block.
+        * 
+        * @param src   The matrix to write
+        * @param fname The path of the file to write
+        * @throws IOException If writing fails
+        */
+       public static void writeCompressedMatrixToHDFS(MatrixBlock src, String fname) throws IOException {
+               create(null).writeMatrixToHDFS(src, fname, 0, 0, 0, 0, false);
+       }
+
+       @Override
+       public void writeMatrixToHDFS(MatrixBlock src, String fname, long rlen, long clen, int blen, long nnz, boolean diag)
+               throws IOException {
+               final JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
+               final Path path = new Path(fname);
+               final FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
+
+               HDFSTool.deleteFileIfExistOnHDFS(fname);
+               try {
+                       writeCompressedMatrixToHDFS(path, job, fs, src);
+               }
+               catch(DMLCompressionException | IOException e) {
+                       // do not leave a partially written file behind
+                       try {
+                               fs.delete(path, true);
+                       }
+                       catch(IOException deleteFailure) {
+                               e.addSuppressed(deleteFailure);
+                       }
+                       throw e;
+               }
+               finally {
+                       IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(fs, path);
+               }
+       }
+
+       @Override
+       public void writeEmptyMatrixToHDFS(String fname, long rlen, long clen, int blen) throws IOException {
+               throw new NotImplementedException();
+       }
+
+       /**
+        * Compress the input if needed and serialize it to the given path. Compression happens before the file is
+        * created, so an input that cannot be compressed never creates a file.
+        */
+       private void writeCompressedMatrixToHDFS(Path path, JobConf conf, FileSystem fs, MatrixBlock src)
+               throws IOException {
+               final CompressedMatrixBlock cmb = getCompressed(src);
+               try(OutputStream os = fs.create(path, true)) {
+                       final DataOutput out = new DataOutputStream(os);
+                       cmb.write(out);
+               }
+       }
+
+       /**
+        * Get the input as a compressed matrix block, compressing it if it is not compressed already.
+        * 
+        * @param src The matrix to compress
+        * @return The compressed matrix block
+        * @throws DMLCompressionException If the input could not be compressed
+        */
+       private static CompressedMatrixBlock getCompressed(MatrixBlock src) {
+               if(src instanceof CompressedMatrixBlock)
+                       return (CompressedMatrixBlock) src;
+
+               final MatrixBlock mb = CompressedMatrixBlockFactory.compress(src).getLeft();
+               if(!(mb instanceof CompressedMatrixBlock))
+                       throw new DMLCompressionException("Input was not compressed, therefore the file was not saved to disk");
+               return (CompressedMatrixBlock) mb;
+       }
+}
diff --git a/src/main/java/org/apache/sysds/runtime/io/MatrixReaderFactory.java 
b/src/main/java/org/apache/sysds/runtime/io/MatrixReaderFactory.java
index 473f6441a2..bcf5ac6e48 100644
--- a/src/main/java/org/apache/sysds/runtime/io/MatrixReaderFactory.java
+++ b/src/main/java/org/apache/sysds/runtime/io/MatrixReaderFactory.java
@@ -25,6 +25,7 @@ import org.apache.sysds.common.Types.FileFormat;
 import org.apache.sysds.conf.CompilerConfig.ConfigType;
 import org.apache.sysds.conf.ConfigurationManager;
 import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.compress.io.ReaderCompressed;
 import org.apache.sysds.runtime.data.SparseBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 
@@ -66,6 +67,10 @@ public class MatrixReaderFactory {
                                reader = (par & mcsr) ? new ReaderHDF5Parallel(
                                        new FileFormatPropertiesHDF5()) : new 
ReaderHDF5(new FileFormatPropertiesHDF5());
                                break;
+
+                       case COMPRESSED:
+                               reader = ReaderCompressed.create();
+                               break;
+
                        default:
                                throw new DMLRuntimeException("Failed to create 
matrix reader for unknown format: " + fmt.toString());
                }
diff --git a/src/main/java/org/apache/sysds/runtime/io/MatrixWriterFactory.java 
b/src/main/java/org/apache/sysds/runtime/io/MatrixWriterFactory.java
index 10fdb2d8f1..82af28c0bf 100644
--- a/src/main/java/org/apache/sysds/runtime/io/MatrixWriterFactory.java
+++ b/src/main/java/org/apache/sysds/runtime/io/MatrixWriterFactory.java
@@ -19,10 +19,11 @@
 
 package org.apache.sysds.runtime.io;
 
-import org.apache.sysds.conf.ConfigurationManager;
 import org.apache.sysds.common.Types.FileFormat;
 import org.apache.sysds.conf.CompilerConfig.ConfigType;
+import org.apache.sysds.conf.ConfigurationManager;
 import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.compress.io.WriterCompressed;
 
 public class MatrixWriterFactory
 {
@@ -85,6 +86,9 @@ public class MatrixWriterFactory
                                else
                                        return new 
WriterHDF5((FileFormatPropertiesHDF5) props);
 
+                       case COMPRESSED:
+                               return WriterCompressed.create(props);
+
                        default:
                                throw new DMLRuntimeException("Failed to create 
matrix writer for unknown format: " + fmt.toString());
                }
diff --git 
a/src/test/java/org/apache/sysds/test/component/compress/io/IOTest.java 
b/src/test/java/org/apache/sysds/test/component/compress/io/IOTest.java
new file mode 100644
index 0000000000..104794f4ff
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/component/compress/io/IOTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.component.compress.io;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sysds.runtime.compress.CompressedMatrixBlockFactory;
+import org.apache.sysds.runtime.compress.DMLCompressionException;
+import org.apache.sysds.runtime.compress.io.ReaderCompressed;
+import org.apache.sysds.runtime.compress.io.WriterCompressed;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.test.TestUtils;
+import org.junit.AfterClass;
+import org.junit.Test;
+
+public class IOTest {
+
+       protected static final Log LOG = 
LogFactory.getLog(IOTest.class.getName());
+
+       final static Object lock = new Object();
+
+       final static String nameBeginning = 
"src/test/java/org/apache/sysds/test/component/compress/io/files/";
+
+       static AtomicInteger id = new AtomicInteger(0);
+
+       public IOTest() {
+               synchronized(lock) {
+                       new File(nameBeginning).mkdirs();
+               }
+       }
+
+       private static void deleteDirectory(File file) {
+               for(File subfile : file.listFiles()) {
+                       if(subfile.isDirectory())
+                               deleteDirectory(subfile);
+                       subfile.delete();
+               }
+               file.delete();
+       }
+
+       @AfterClass
+       public static void cleanup() {
+               deleteDirectory(new File(nameBeginning));
+       }
+
+       public static String getName() {
+               return nameBeginning + "testWrite" + id.incrementAndGet() + 
".cla";
+       }
+
+       @Test
+       public void testWrite() {
+               MatrixBlock mb = 
TestUtils.ceil(TestUtils.generateTestMatrixBlock(1000, 3, 1, 3, 1.0, 2514));
+               write(mb, getName());
+       }
+
+       @Test
+       public void testWriteAlreadyCompressed() {
+               MatrixBlock mb = 
TestUtils.ceil(TestUtils.generateTestMatrixBlock(1000, 3, 1, 3, 1.0, 2514));
+               MatrixBlock mb2 = 
CompressedMatrixBlockFactory.compress(mb).getLeft();
+               write(mb2, getName());
+       }
+
+       @Test
+       public void testWriteAndRead() {
+               MatrixBlock mb = 
TestUtils.ceil(TestUtils.generateTestMatrixBlock(1000, 3, 1, 3, 1.0, 2514));
+
+               String filename = getName();
+               write(mb, filename);
+               MatrixBlock mbr = read(filename);
+
+               assertEquals(mb.sum(), mbr.sum(), 0.0001);
+               assertEquals(mb.min(), mbr.min(), 0.0001);
+               assertEquals(mb.max(), mbr.max(), 0.0001);
+               assertEquals(mb.getNumRows(), mbr.getNumRows());
+               assertEquals(mb.getNumColumns(), mbr.getNumColumns());
+               assertTrue(mb.getInMemorySize() > mbr.getInMemorySize());
+               assertTrue(mb.getExactSizeOnDisk() > mbr.getExactSizeOnDisk());
+
+       }
+
+       @Test(expected = DMLCompressionException.class)
+       public void testWriteNotCompressable() throws Exception {
+               MatrixBlock mb = 
TestUtils.ceil(TestUtils.generateTestMatrixBlock(3, 3, 1, 3, 1.0, 2514));
+               WriterCompressed.writeCompressedMatrixToHDFS(mb, getName());
+       }
+
+       private static MatrixBlock read(String path) {
+               try {
+                       return 
ReaderCompressed.readCompressedMatrixFromHDFS(path);
+               }
+               catch(Exception e) {
+                       e.printStackTrace();
+                       fail("Failed to read file");
+                       return null;
+               }
+       }
+
+       private static void write(MatrixBlock src, String path) {
+               try {
+                       WriterCompressed.writeCompressedMatrixToHDFS(src, path);
+               }
+               catch(Exception e) {
+                       e.printStackTrace();
+                       fail("Failed to write file");
+               }
+       }
+}

Reply via email to