This is an automated email from the ASF dual-hosted git repository. baunsgaard pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/systemds.git
commit 678b28a2210dcdf11841d9486d1dac4fdacb036d Author: baunsgaard <[email protected]> AuthorDate: Wed Oct 19 16:38:16 2022 +0200 [MINOR] Disable Hadoop Write compression --- .../sysds/runtime/compress/io/CompressWrap.java | 1 - .../runtime/compress/io/WriterCompressed.java | 3 +- .../sysds/runtime/compress/lib/CLALibCombine.java | 93 +++++++++++----------- 3 files changed, 48 insertions(+), 49 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/compress/io/CompressWrap.java b/src/main/java/org/apache/sysds/runtime/compress/io/CompressWrap.java index e3e370b694..c17edc2b6d 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/io/CompressWrap.java +++ b/src/main/java/org/apache/sysds/runtime/compress/io/CompressWrap.java @@ -20,7 +20,6 @@ package org.apache.sysds.runtime.compress.io; import org.apache.spark.api.java.function.Function; -import org.apache.sysds.runtime.compress.CompressedMatrixBlock; import org.apache.sysds.runtime.matrix.data.MatrixBlock; public class CompressWrap implements Function<MatrixBlock, CompressedWriteBlock> { diff --git a/src/main/java/org/apache/sysds/runtime/compress/io/WriterCompressed.java b/src/main/java/org/apache/sysds/runtime/compress/io/WriterCompressed.java index d2194b98f2..82f2be59e4 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/io/WriterCompressed.java +++ b/src/main/java/org/apache/sysds/runtime/compress/io/WriterCompressed.java @@ -124,7 +124,8 @@ public final class WriterCompressed extends MatrixWriter { // Make Writer (New interface) final Writer w = SequenceFile.createWriter(job, Writer.file(path), Writer.bufferSize(4096), Writer.blockSize(4096), Writer.keyClass(MatrixIndexes.class), Writer.valueClass(CompressedWriteBlock.class), - Writer.compression(SequenceFile.CompressionType.RECORD), Writer.replication((short) 1)); + Writer.compression(SequenceFile.CompressionType.NONE), // No Compression type on disk + Writer.replication((short) 1)); final int rlen = src.getNumRows(); final int clen 
= src.getNumColumns(); diff --git a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibCombine.java b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibCombine.java index 7600167250..ca54f7d181 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibCombine.java +++ b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibCombine.java @@ -32,13 +32,14 @@ import org.apache.sysds.runtime.compress.colgroup.AColGroup.CompressionType; import org.apache.sysds.runtime.matrix.data.MatrixBlock; import org.apache.sysds.runtime.matrix.data.MatrixIndexes; +import java.util.Arrays; + public class CLALibCombine { protected static final Log LOG = LogFactory.getLog(CLALibCombine.class.getName()); public static MatrixBlock combine(Map<MatrixIndexes, MatrixBlock> m) { // Dynamically find rlen, clen and blen; - // assume that the blen is the same in all blocks. // assume that all blocks are there ... final MatrixIndexes lookup = new MatrixIndexes(1, 1); @@ -57,15 +58,21 @@ public class CLALibCombine { lookup.setIndexes(1, lookup.getColumnIndex() + 1); } - return combine(m, (int) rows, (int) cols, blen); + return combine(m, lookup, (int) rows, (int) cols, blen); } public static MatrixBlock combine(Map<MatrixIndexes, MatrixBlock> m, int rlen, int clen, int blen) { + final MatrixIndexes lookup = new MatrixIndexes(); + return combine(m, lookup, rlen, clen, blen); + } + + private static MatrixBlock combine(final Map<MatrixIndexes, MatrixBlock> m, final MatrixIndexes lookup, + final int rlen, final int clen, final int blen) { if(rlen < blen) // Shortcut, in case file only contains one block in r length. - return CombiningColumnGroups(m, rlen, clen, blen); + return CombiningColumnGroups(m, lookup, rlen, clen, blen); + final CompressionType[] colTypes = new CompressionType[clen]; - final MatrixIndexes lookup = new MatrixIndexes(); // Look through the first blocks in to the top. 
for(int bc = 0; bc * blen < clen; bc++) { lookup.setIndexes(1, bc + 1); // get first blocks @@ -119,10 +126,11 @@ public class CLALibCombine { } } - return CombiningColumnGroups(m, rlen, clen, blen); + return CombiningColumnGroups(m, lookup, rlen, clen, blen); } - private static MatrixBlock combineViaDecompression(Map<MatrixIndexes, MatrixBlock> m, int rlen, int clen, int blen) { + private static MatrixBlock combineViaDecompression(final Map<MatrixIndexes, MatrixBlock> m, final int rlen, + final int clen, final int blen) { final MatrixBlock out = new MatrixBlock(rlen, clen, false); out.allocateDenseBlock(); for(Entry<MatrixIndexes, MatrixBlock> e : m.entrySet()) { @@ -140,57 +148,48 @@ public class CLALibCombine { } // It is known all of the matrices are Compressed and they are non overlapping. - private static MatrixBlock CombiningColumnGroups(Map<MatrixIndexes, MatrixBlock> m, int rlen, int clen, int blen) { + private static MatrixBlock CombiningColumnGroups(final Map<MatrixIndexes, MatrixBlock> m, final MatrixIndexes lookup, + final int rlen, final int clen, final int blen) { - final AColGroup[] finalCols = new AColGroup[clen]; - final MatrixIndexes lookup = new MatrixIndexes(); - for(int bc = 0; bc * blen < clen; bc++) { - lookup.setIndexes(1, bc + 1); // get first blocks - final MatrixBlock b = m.get(lookup); - final CompressedMatrixBlock cmb = (CompressedMatrixBlock) b; - - final List<AColGroup> gs = cmb.getColGroups(); - for(AColGroup g : gs) { - AColGroup gc = g; - if(bc > 0) - gc = g.shiftColIndices(bc * blen); - final int[] cols = gc.getColIndices(); - finalCols[cols[0]] = gc; // only assign first column of each group. - } - } + final AColGroup[][] finalCols = new AColGroup[clen][]; // temp array for combining + final int blocksInColumn = (rlen - 1) / blen + 1; + final int nGroups = m.size() / blocksInColumn; - for(int br = 1; br * blen < rlen; br++) { + // Add all the blocks into linear structure. 
+ for(int br = 0; br * blen < rlen; br++) { for(int bc = 0; bc * blen < clen; bc++) { - lookup.setIndexes(br + 1, bc + 1); // get first blocks - final MatrixBlock b = m.get(lookup); - - final CompressedMatrixBlock cmb = (CompressedMatrixBlock) b; - - final List<AColGroup> gs = cmb.getColGroups(); - for(AColGroup g : gs) { - AColGroup gc = g; - if(bc > 0) - gc = g.shiftColIndices(bc * blen); + lookup.setIndexes(br + 1, bc + 1); + final CompressedMatrixBlock cmb = (CompressedMatrixBlock) m.get(lookup); + for(AColGroup g : cmb.getColGroups()) { + final AColGroup gc = bc > 0 ? g.shiftColIndices(bc * blen) : g; final int[] cols = gc.getColIndices(); - AColGroup prev = finalCols[cols[0]]; - AColGroup comb = prev.append(gc); - if(comb == null) { - LOG.warn("Combining of columns from group: " + prev.getClass().getSimpleName() + " and " - + gc.getClass().getSimpleName() + " was non trivial, therefore falling back to decompression"); - return combineViaDecompression(m, rlen, clen, blen); - } - finalCols[cols[0]] = comb; + if(finalCols[cols[0]] == null) // group layout may differ between row blocks; missing parts force fallback + finalCols[cols[0]] = new AColGroup[blocksInColumn]; + finalCols[cols[0]][br] = gc; } } } - List<AColGroup> finalGroups = new ArrayList<>(); - - for(AColGroup g : finalCols) - if(g != null) - finalGroups.add(g); + final List<AColGroup> finalGroups = new ArrayList<>(nGroups); + for(AColGroup[] colOfGroups : finalCols) { + if(colOfGroups != null) { // skip null entries + final AColGroup combined = combineN(colOfGroups); + if(combined == null) { + LOG.warn("Combining of columns from group failed: " + Arrays.toString(colOfGroups)); + return combineViaDecompression(m, rlen, clen, blen); + } + finalGroups.add(combined); + } + } return new CompressedMatrixBlock(rlen, clen, -1, false, finalGroups); } + + private static AColGroup combineN(AColGroup[] groups) { + AColGroup r = groups[0]; + for(int i = 1; i < groups.length && r != null; i++) + r = groups[i] == null ? null : r.append(groups[i]); // a missing row-block part cannot be combined + return r; + } }
