This is an automated email from the ASF dual-hosted git repository. baunsgaard pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/systemds.git
commit 8e4f06a89a2c82fc2b32bb11106849fe2cd83522 Author: baunsgaard <[email protected]> AuthorDate: Sat Oct 15 16:26:08 2022 +0200 [SYSTEMDS-3448] Uncompressed Append This commit adds support for uncompressed matrices to append to themselves. --- .../runtime/compress/colgroup/ColGroupDDC.java | 20 ++++++-- .../runtime/compress/colgroup/ColGroupSDC.java | 8 ++++ .../compress/colgroup/ColGroupUncompressed.java | 5 ++ .../runtime/compress/colgroup/offset/AOffset.java | 32 ++++++++----- .../runtime/compress/io/CompressedWriteBlock.java | 26 ++++++++--- .../runtime/compress/io/WriterCompressed.java | 54 +++++++++++++++------- .../sysds/runtime/compress/lib/CLALibCombine.java | 17 ++++--- .../sysds/runtime/compress/lib/CLALibSlice.java | 19 +++----- .../sysds/runtime/matrix/data/MatrixBlock.java | 44 ++++++++++++++++++ .../sysds/test/component/compress/io/IOTest.java | 23 +++++---- 10 files changed, 181 insertions(+), 67 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC.java index 008d66ce1b..ce2c691cca 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC.java +++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC.java @@ -486,13 +486,23 @@ public class ColGroupDDC extends APreAgg { @Override public AColGroup append(AColGroup g) { - if(g instanceof ColGroupDDC && Arrays.equals(g.getColIndices(), _colIndexes)) { - ColGroupDDC gDDC = (ColGroupDDC) g; - if(gDDC._dict.eq(_dict)){ - AMapToData nd = _data.append(gDDC._data); - return create(_colIndexes, _dict, nd, null); + if(g instanceof ColGroupDDC) { + if(Arrays.equals(g.getColIndices(), _colIndexes)) { + + ColGroupDDC gDDC = (ColGroupDDC) g; + if(gDDC._dict.eq(_dict)) { + AMapToData nd = _data.append(gDDC._data); + return create(_colIndexes, _dict, nd, null); + } + else + LOG.warn("Not same Dictionaries therefore not appending DDC\n" + _dict + "\n\n" + gDDC._dict); } + else + LOG.warn("Not same columns therefore not appending DDC\n" + Arrays.toString(_colIndexes) + "\n\n" + + Arrays.toString(g.getColIndices())); } + else + LOG.warn("Not DDC but " + g.getClass().getSimpleName() + ", therefore not appending DDC"); return null; } diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSDC.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSDC.java index a40ad71644..f266f9a8fb 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSDC.java +++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSDC.java @@ -571,6 +571,14 @@ public class ColGroupSDC extends ASDC { @Override public AColGroup append(AColGroup g) { + if(g instanceof ColGroupSDC && Arrays.equals(g.getColIndices(), _colIndexes)) { + final ColGroupSDC gSDC = (ColGroupSDC) g; + if(Arrays.equals(_defaultTuple, gSDC._defaultTuple) && gSDC._dict.eq(_dict)) { + final AMapToData nd = _data.append(gSDC._data); + final AOffset ofd = _indexes.append(gSDC._indexes); + return create(_colIndexes, _numRows + gSDC._numRows, _dict, _defaultTuple, ofd, nd, null); + } + } return null; } diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupUncompressed.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupUncompressed.java index fa888261f1..7c173cb6d1 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupUncompressed.java +++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupUncompressed.java @@ -767,6 +767,11 @@ public class ColGroupUncompressed extends AColGroup { @Override public AColGroup append(AColGroup g) { + if(g instanceof ColGroupUncompressed && Arrays.equals(g.getColIndices(), _colIndexes)) { + final ColGroupUncompressed gDDC = (ColGroupUncompressed) g; + final MatrixBlock nd = _data.append(gDDC._data, false); + return create(nd, _colIndexes); + } return null; } diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/offset/AOffset.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/offset/AOffset.java index 43657c15cd..2298e0ec80 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/offset/AOffset.java +++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/offset/AOffset.java @@ -439,18 +439,14 @@ public abstract class AOffset implements Serializable { return ((OffsetChar) this).slice(lowOff, highOff, lowValue, highValue, low, high); } - // protected abstract OffsetSliceInfo slice(int lowOff, int highOff, int lowValue, int highValue, int low, int high); - - public static final class OffsetSliceInfo { - public final int lIndex; - public final int uIndex; - public final AOffset offsetSlice; - - protected OffsetSliceInfo(int l, int u, AOffset off) { - this.lIndex = l; - this.uIndex = u; - this.offsetSlice = off; - } + /** + * Append the offsets from that other offset to the offsets in this. + * + * @param t that offsets + * @return this offsets followed by thats offsets. + */ + public AOffset append(AOffset t){ + throw new NotImplementedException(); } @Override @@ -477,6 +473,18 @@ public abstract class AOffset implements Serializable { return sb.toString(); } + public static final class OffsetSliceInfo { + public final int lIndex; + public final int uIndex; + public final AOffset offsetSlice; + + protected OffsetSliceInfo(int l, int u, AOffset off) { + this.lIndex = l; + this.uIndex = u; + this.offsetSlice = off; + } + } + protected static class OffsetCache { protected final AIterator it; protected final int row; diff --git a/src/main/java/org/apache/sysds/runtime/compress/io/CompressedWriteBlock.java b/src/main/java/org/apache/sysds/runtime/compress/io/CompressedWriteBlock.java index 8c53b3ed59..1ccbec7dcd 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/io/CompressedWriteBlock.java +++ b/src/main/java/org/apache/sysds/runtime/compress/io/CompressedWriteBlock.java @@ -31,10 +31,14 @@ import org.apache.sysds.runtime.matrix.data.MatrixBlock; * Write block for serializing either a instance of MatrixBlock or CompressedMatrixBlock, To allow spark to read in * either or. */ -public class CompressedWriteBlock implements WritableComparable<CompressedWriteBlock> { +public class CompressedWriteBlock implements WritableComparable<CompressedWriteBlock> { public MatrixBlock mb; + private enum CONTENT { + Comp, MB; + } + /** * Write block used to point to a underlying instance of CompressedMatrixBlock or MatrixBlock, Unfortunately spark * require a specific object type to serialize therefore we use this class. @@ -49,17 +53,25 @@ public class CompressedWriteBlock implements WritableComparable<CompressedWrite @Override public void write(DataOutput out) throws IOException { - out.writeBoolean(mb instanceof CompressedMatrixBlock); + + if(mb instanceof CompressedMatrixBlock) + out.writeByte(CONTENT.Comp.ordinal()); + else + out.writeByte(CONTENT.MB.ordinal()); mb.write(out); + } @Override public void readFields(DataInput in) throws IOException { - if(in.readBoolean()) - mb = CompressedMatrixBlock.read(in); - else { - mb = new MatrixBlock(); - mb.readFields(in); + switch(CONTENT.values()[in.readByte()]) { + case Comp: + mb = CompressedMatrixBlock.read(in); + break; + case MB: + mb = new MatrixBlock(); + mb.readFields(in); + break; } } diff --git a/src/main/java/org/apache/sysds/runtime/compress/io/WriterCompressed.java b/src/main/java/org/apache/sysds/runtime/compress/io/WriterCompressed.java index 207c6961fa..d2194b98f2 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/io/WriterCompressed.java +++ b/src/main/java/org/apache/sysds/runtime/compress/io/WriterCompressed.java @@ -20,6 +20,7 @@ package org.apache.sysds.runtime.compress.io; import java.io.IOException; +import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -35,6 +36,7 @@ import org.apache.sysds.hops.OptimizerUtils; import org.apache.sysds.runtime.DMLRuntimeException; import org.apache.sysds.runtime.compress.CompressedMatrixBlock; import org.apache.sysds.runtime.compress.CompressedMatrixBlockFactory; +import org.apache.sysds.runtime.compress.lib.CLALibSlice; import org.apache.sysds.runtime.instructions.spark.CompressionSPInstruction.CompressionFunction; import org.apache.sysds.runtime.instructions.spark.utils.RDDConverterUtils; import org.apache.sysds.runtime.io.FileFormatProperties; @@ -112,7 +114,7 @@ public final class WriterCompressed extends MatrixWriter { write(m, fname, blen); } - private void write(MatrixBlock src, String fname, int blen) throws IOException { + private void write(MatrixBlock src, final String fname, final int blen) throws IOException { final int k = OptimizerUtils.getParallelTextWriteParallelism(); final Path path = new Path(fname); final JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); @@ -120,13 +122,17 @@ public final class WriterCompressed extends MatrixWriter { HDFSTool.deleteFileIfExistOnHDFS(path, job); // Make Writer (New interface) - Writer w = SequenceFile.createWriter(job, Writer.file(path), Writer.bufferSize(4096), Writer.blockSize(4096), - Writer.keyClass(MatrixIndexes.class), Writer.valueClass(CompressedWriteBlock.class), + final Writer w = SequenceFile.createWriter(job, Writer.file(path), Writer.bufferSize(4096), + Writer.blockSize(4096), Writer.keyClass(MatrixIndexes.class), Writer.valueClass(CompressedWriteBlock.class), Writer.compression(SequenceFile.CompressionType.RECORD), Writer.replication((short) 1)); final int rlen = src.getNumRows(); final int clen = src.getNumColumns(); + // Try to compress! + if(!(src instanceof CompressedMatrixBlock)) + src = CompressedMatrixBlockFactory.compress(src, k).getLeft(); + if(rlen <= blen && clen <= blen) writeSingleBlock(w, src, k); else @@ -145,20 +151,36 @@ public final class WriterCompressed extends MatrixWriter { w.append(idx, new CompressedWriteBlock(mc)); } - private void writeMultiBlock(Writer w, MatrixBlock b, int rlen, int clen, int blen, int k) throws IOException { + private void writeMultiBlock(Writer w, MatrixBlock b, final int rlen, final int clen, final int blen, int k) + throws IOException { final MatrixIndexes indexes = new MatrixIndexes(); - for(int br = 0; br * blen < rlen; br++) { - for(int bc = 0; bc * blen < clen; bc++) { - // Max Row and col in block - int sR = br * blen; - int sC = bc * blen; - int mR = Math.min(br * blen + blen, rlen) - 1; - int mC = Math.min(bc * blen + blen, clen) - 1; - - MatrixBlock mb = b.slice(sR, mR, sC, mC); - MatrixBlock mc = CompressedMatrixBlockFactory.compress(mb, k).getLeft(); - indexes.setIndexes(br + 1, bc + 1); - w.append(indexes, new CompressedWriteBlock(mc)); + if(!(b instanceof CompressedMatrixBlock)) + LOG.warn("Writing compressed format with non identical compression scheme"); + + for(int bc = 0; bc * blen < clen; bc++) { + final int sC = bc * blen; + final int mC = Math.min(sC + blen, clen) - 1; + if(b instanceof CompressedMatrixBlock) { + final CompressedMatrixBlock mc = //mC == clen - 1 ? (CompressedMatrixBlock) b : + CLALibSlice + .sliceColumns((CompressedMatrixBlock) b, sC, mC); // slice columns! + + final List<MatrixBlock> blocks = CLALibSlice.sliceBlocks(mc, blen); // Slice compressed blocks + for(int br = 0; br * blen < rlen; br++) { + indexes.setIndexes(br + 1, bc + 1); + w.append(indexes, new CompressedWriteBlock(blocks.get(br))); + } + } + else { + for(int br = 0; br * blen < rlen; br++) { + // Max Row and col in block + final int sR = br * blen; + final int mR = Math.min(sR + blen, rlen) - 1; + MatrixBlock mb = b.slice(sR, mR, sC, mC); + MatrixBlock mc = CompressedMatrixBlockFactory.compress(mb, k).getLeft(); + indexes.setIndexes(br + 1, bc + 1); + w.append(indexes, new CompressedWriteBlock(mc)); + } } } } diff --git a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibCombine.java b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibCombine.java index c63aaef193..7600167250 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibCombine.java +++ b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibCombine.java @@ -151,8 +151,11 @@ public class CLALibCombine { final List<AColGroup> gs = cmb.getColGroups(); for(AColGroup g : gs) { - final int[] cols = g.getColIndices(); - finalCols[cols[0] + bc * blen] = g; // only assign first column of each group. + AColGroup gc = g; + if(bc > 0) + gc = g.shiftColIndices(bc * blen); + final int[] cols = gc.getColIndices(); + finalCols[cols[0]] = gc; // only assign first column of each group. } } @@ -169,12 +172,14 @@ public class CLALibCombine { if(bc > 0) gc = g.shiftColIndices(bc * blen); final int[] cols = gc.getColIndices(); - - finalCols[cols[0]] = finalCols[cols[0]].append(gc); - if(finalCols[cols[0]] == null) { - LOG.warn("Combining of columns was non trivial, therefore falling back to decompression"); + AColGroup prev = finalCols[cols[0]]; + AColGroup comb = prev.append(gc); + if(comb == null) { + LOG.warn("Combining of columns from group: " + prev.getClass().getSimpleName() + " and " + + gc.getClass().getSimpleName() + " was non trivial, therefore falling back to decompression"); return combineViaDecompression(m, rlen, clen, blen); } + finalCols[cols[0]] = comb; } } diff --git a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibSlice.java b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibSlice.java index 52276962b3..32fa6c1b59 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibSlice.java +++ b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibSlice.java @@ -39,18 +39,12 @@ public class CLALibSlice { * * @param cmb The input block to slice. * @param blen The length of the blocks. - * @return A list containing CompressedMatrixBlocks where there is values, and null if there is no values in the sub - * block. + * @return A list containing CompressedMatrixBlocks or MatrixBlocks */ - public static List<CompressedMatrixBlock> sliceBlocks(CompressedMatrixBlock cmb, int blen) { - List<CompressedMatrixBlock> mbs = new ArrayList<>(); - for(int b = 0; b < cmb.getNumRows(); b += blen) { - MatrixBlock mb = sliceRowsCompressed(cmb, b, Math.min(b + blen, cmb.getNumRows())); - if(mb instanceof CompressedMatrixBlock) - mbs.add((CompressedMatrixBlock) mb); - else - mbs.add(null); - } + public static List<MatrixBlock> sliceBlocks(CompressedMatrixBlock cmb, int blen) { + final List<MatrixBlock> mbs = new ArrayList<>(); + for(int b = 0; b < cmb.getNumRows(); b += blen) + mbs.add(sliceRowsCompressed(cmb, b, Math.min(b + blen, cmb.getNumRows()) - 1)); return mbs; } @@ -79,7 +73,6 @@ public class CLALibSlice { return sliceRowsDecompress(cmb, rl, ru); else return sliceRowsCompressed(cmb, rl, ru); - } private static boolean shouldDecompressSliceRows(CompressedMatrixBlock cmb, int rl, int ru) { @@ -139,7 +132,7 @@ public class CLALibSlice { return tmp; } - private static CompressedMatrixBlock sliceColumns(CompressedMatrixBlock cmb, int cl, int cu) { + public static CompressedMatrixBlock sliceColumns(CompressedMatrixBlock cmb, int cl, int cu) { final int cue = cu + 1; final CompressedMatrixBlock ret = new CompressedMatrixBlock(cmb.getNumRows(), cue - cl); diff --git a/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java index b69fb99873..ebd0ff88b0 100644 --- a/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java +++ b/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java @@ -3659,22 +3659,66 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab return result; } + /** + * Append that matrix to this matrix, while allocating a new matrix. + * Default is cbind making the matrix "wider" + * + * @param that the other matrix to append + * @return A new MatrixBlock object with the appended result + */ public final MatrixBlock append(MatrixBlock that) { return append(that, null, true); // default cbind } + /** + * Append that matrix to this matrix, while allocating a new matrix. + * cbind true makes the matrix "wider" while cbind false make it "taller" + * + * @param that the other matrix to append + * @param cbind if binding on columns or rows + * @return a new MatrixBlock object with the appended result + */ public final MatrixBlock append(MatrixBlock that, boolean cbind) { return append(that, null, cbind); } + /** + * Append that matrix to this matrix. + * + * Default is cbind making the matrix "wider" + * + * @param that the other matrix to append + * @param ret the output matrix to modify, (is also returned) + * @return the ret MatrixBlock object with the appended result + */ public final MatrixBlock append( MatrixBlock that, MatrixBlock ret ) { return append(that, ret, true); //default cbind } + /** + * Append that matrix to this matrix. + * + * cbind true makes the matrix "wider" while cbind false make it "taller" + * + * @param that the other matrix to append + * @param ret the output matrix to modify, (is also returned) + * @param cbind if binding on columns or rows + * @return the ret MatrixBlock object with the appended result + */ public final MatrixBlock append( MatrixBlock that, MatrixBlock ret, boolean cbind ) { return append(new MatrixBlock[]{that}, ret, cbind); } + /** + * Append that list of matrixes to this matrix. + * + * cbind true makes the matrix "wider" while cbind false make it "taller" + * + * @param that a list of matrices to append in order + * @param result the output matrix to modify, (is also returned) + * @param cbind if binding on columns or rows + * @return the ret MatrixBlock object with the appended result + */ public MatrixBlock append( MatrixBlock[] that, MatrixBlock result, boolean cbind) { checkDimensionsForAppend(that, cbind); diff --git a/src/test/java/org/apache/sysds/test/component/compress/io/IOTest.java b/src/test/java/org/apache/sysds/test/component/compress/io/IOTest.java index 11f28b6fd5..003ef85d08 100644 --- a/src/test/java/org/apache/sysds/test/component/compress/io/IOTest.java +++ b/src/test/java/org/apache/sysds/test/component/compress/io/IOTest.java @@ -117,12 +117,12 @@ public class IOTest { @Test public void testWriteAndReadSmallBlen() throws Exception { - writeAndRead(TestUtils.ceil(TestUtils.generateTestMatrixBlock(1000, 3, 1, 3, 1.0, 2514)), 100); + writeAndRead(TestUtils.ceil(TestUtils.generateTestMatrixBlock(200, 3, 1, 3, 1.0, 2514)), 100); } @Test public void testWriteAndReadSmallBlenBiggerClen() throws Exception { - writeAndRead(TestUtils.ceil(TestUtils.generateTestMatrixBlock(1000, 51, 1, 3, 1.0, 2514)), 50); + writeAndRead(TestUtils.ceil(TestUtils.generateTestMatrixBlock(200, 51, 1, 3, 1.0, 2514)), 50); } @Test @@ -141,11 +141,18 @@ public class IOTest { } protected static void writeAndRead(MatrixBlock mb, int blen) throws Exception { - String filename = getName(); - WriterCompressed.writeCompressedMatrixToHDFS(mb, filename, blen); - File f = new File(filename); - assertTrue(f.isFile() || f.isDirectory()); - MatrixBlock mbr = IOCompressionTestUtils.read(filename); - IOCompressionTestUtils.verifyEquivalence(mb, mbr); + try{ + + String filename = getName(); + WriterCompressed.writeCompressedMatrixToHDFS(mb, filename, blen); + File f = new File(filename); + assertTrue(f.isFile() || f.isDirectory()); + MatrixBlock mbr = IOCompressionTestUtils.read(filename); + IOCompressionTestUtils.verifyEquivalence(mb, mbr); + } + catch(Exception e){ + e.printStackTrace(); + throw e; + } } }
