This is an automated email from the ASF dual-hosted git repository.

baunsgaard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
commit 4cebe8ed343801e565c769fc8208001251242970
Author: baunsgaard <[email protected]>
AuthorDate: Wed Oct 12 23:51:33 2022 +0200

    [SYSTEMDS-3444] Spark Write compressed format

    This commit fixes and adds support for writing the compressed format
    from Spark. The previous versions did not properly support the edge
    cases; this commit fixes those inconsistencies and allows local reading
    of folders containing sub-blocks.
---
 .../org/apache/sysds/parser/DMLTranslator.java     |   6 +-
 .../org/apache/sysds/parser/DataExpression.java    |   2 +-
 .../sysds/runtime/compress/io/CompressWrap.java    |  36 ++++
 .../runtime/compress/io/ReaderCompressed.java      |  20 +-
 .../runtime/compress/io/WriterCompressed.java      |  18 ++
 .../runtime/compress/lib/CLALibDecompress.java     |  20 +-
 .../instructions/spark/WriteSPInstruction.java     |  24 ++-
 .../functions/ExtractBlockForBinaryReblock.java    |  19 +-
 .../spark/utils/RDDAggregateUtils.java             |  23 +-
 .../sysds/runtime/matrix/data/MatrixBlock.java     |   2 +
 ...OTestUtils.java => IOCompressionTestUtils.java} |   6 +-
 .../sysds/test/component/compress/io/IOEmpty.java  |  14 +-
 .../sysds/test/component/compress/io/IOSpark.java  | 240 ++++++++++++++++++---
 .../sysds/test/component/compress/io/IOTest.java   |  14 +-
 .../io/compressed/WriteCompressedTest.java         |  95 ++++++++
 .../io/compressed/WriteCompressedTest.dml          |  26 +++
 16 files changed, 485 insertions(+), 80 deletions(-)

diff --git a/src/main/java/org/apache/sysds/parser/DMLTranslator.java b/src/main/java/org/apache/sysds/parser/DMLTranslator.java
index c196f780c3..315f54ff72 100644
--- a/src/main/java/org/apache/sysds/parser/DMLTranslator.java
+++ b/src/main/java/org/apache/sysds/parser/DMLTranslator.java
@@ -1053,20 +1053,18 @@ public class DMLTranslator
             case MM:
             case CSV:
             case LIBSVM:
+            case HDF5:
                 // write output in textcell format
                 ae.setOutputParams(ae.getDim1(), ae.getDim2(), ae.getNnz(), ae.getUpdateType(), -1);
                 break;
             case BINARY:
+            case COMPRESSED:
                 // write output in binary block format
                 ae.setOutputParams(ae.getDim1(), ae.getDim2(), ae.getNnz(), ae.getUpdateType(), ae.getBlocksize());
                 break;
             case FEDERATED:
                 ae.setOutputParams(ae.getDim1(), ae.getDim2(), -1, ae.getUpdateType(), -1);
                 break;
-            case HDF5:
-                // write output in HDF5 format
-                ae.setOutputParams(ae.getDim1(), ae.getDim2(), ae.getNnz(), ae.getUpdateType(), -1);
-                break;
             default:
                 throw new LanguageException("Unrecognized file format: " + ae.getFileFormat());
             }
diff --git a/src/main/java/org/apache/sysds/parser/DataExpression.java b/src/main/java/org/apache/sysds/parser/DataExpression.java
index c3d6edd5e3..a76aa3dd4c 100644
--- a/src/main/java/org/apache/sysds/parser/DataExpression.java
+++ b/src/main/java/org/apache/sysds/parser/DataExpression.java
@@ -1327,7 +1327,7 @@ public class DataExpression extends DataIdentifier
         //validate read filename
         if (getVarParam(FORMAT_TYPE) == null || FileFormat.isTextFormat(getVarParam(FORMAT_TYPE).toString()))
             getOutput().setBlocksize(-1);
-        else if (getVarParam(FORMAT_TYPE).toString().equalsIgnoreCase(FileFormat.BINARY.toString())) {
+        else if (getVarParam(FORMAT_TYPE).toString().equalsIgnoreCase(FileFormat.BINARY.toString()) || getVarParam(FORMAT_TYPE).toString().equalsIgnoreCase(FileFormat.COMPRESSED.toString())) {
             if( getVarParam(ROWBLOCKCOUNTPARAM)!=null )
                 getOutput().setBlocksize(Integer.parseInt(getVarParam(ROWBLOCKCOUNTPARAM).toString()));
             else
diff --git a/src/main/java/org/apache/sysds/runtime/compress/io/CompressWrap.java b/src/main/java/org/apache/sysds/runtime/compress/io/CompressWrap.java
new file mode 100644
index 0000000000..e3e370b694
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/compress/io/CompressWrap.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.compress.io;
+
+import org.apache.spark.api.java.function.Function;
+import org.apache.sysds.runtime.compress.CompressedMatrixBlock;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+
+public class CompressWrap implements Function<MatrixBlock, CompressedWriteBlock> {
+    private static final long serialVersionUID = 966405324406154236L;
+
+    public CompressWrap() {
+    }
+
+    @Override
+    public CompressedWriteBlock call(MatrixBlock arg0) throws Exception {
+        return new CompressedWriteBlock(arg0);
+    }
+}
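
Note: CompressWrap is the glue between the RDD values and the Hadoop writable stored on disk. A minimal sketch (not part of this patch) of the intended pipeline, mirroring the new WriterCompressed.writeRDDToHDFS further down; the RDD "blocks" and "path" are placeholder inputs:

    import org.apache.hadoop.mapred.SequenceFileOutputFormat;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.sysds.runtime.compress.io.CompressWrap;
    import org.apache.sysds.runtime.compress.io.CompressedWriteBlock;
    import org.apache.sysds.runtime.instructions.spark.CompressionSPInstruction.CompressionFunction;
    import org.apache.sysds.runtime.matrix.data.MatrixBlock;
    import org.apache.sysds.runtime.matrix.data.MatrixIndexes;

    public class CompressedWriteSketch {
        // Compress each block (best effort), wrap it in the on-disk writable,
        // and save the pair RDD as one sequence file per partition.
        static void write(JavaPairRDD<MatrixIndexes, MatrixBlock> blocks, String path) {
            blocks.mapValues(new CompressionFunction()) // per-block compression, may keep a block uncompressed
                .mapValues(new CompressWrap()) // MatrixBlock -> CompressedWriteBlock writable
                .saveAsHadoopFile(path, MatrixIndexes.class, CompressedWriteBlock.class, SequenceFileOutputFormat.class);
        }
    }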
diff --git a/src/main/java/org/apache/sysds/runtime/compress/io/ReaderCompressed.java b/src/main/java/org/apache/sysds/runtime/compress/io/ReaderCompressed.java
index 0b57a8ec02..eb5fa08d60 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/io/ReaderCompressed.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/io/ReaderCompressed.java
@@ -69,11 +69,23 @@ public final class ReaderCompressed extends MatrixReader {
 
     private static MatrixBlock readCompressedMatrix(Path path, JobConf job, FileSystem fs, int rlen, int clen, int blen)
         throws IOException {
-        final Reader reader = new SequenceFile.Reader(job, SequenceFile.Reader.file(path));
+        final Map<MatrixIndexes, MatrixBlock> data = new HashMap<>();
+
+        for(Path subPath : IOUtilFunctions.getSequenceFilePaths(fs, path))
+            read(subPath, job, data);
+
+        if(data.size() == 1)
+            return data.entrySet().iterator().next().getValue();
+        else
+            return CLALibCombine.combine(data);
+    }
+
+    private static void read(Path path, JobConf job, Map<MatrixIndexes, MatrixBlock> data) throws IOException {
+
+        final Reader reader = new SequenceFile.Reader(job, SequenceFile.Reader.file(path));
         try {
             // Materialize all sub blocks.
-            Map<MatrixIndexes, MatrixBlock> data = new HashMap<>();
 
             // Use write and read interface to read and write this object.
             MatrixIndexes key = new MatrixIndexes();
@@ -84,10 +96,6 @@ public final class ReaderCompressed extends MatrixReader {
                 key = new MatrixIndexes();
                 value = new CompressedWriteBlock();
             }
-            if(data.size() == 1)
-                return data.entrySet().iterator().next().getValue();
-            else
-                return CLALibCombine.combine(data);
         }
         finally {
             IOUtilFunctions.closeSilently(reader);
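
Note: since the reader now iterates over all sequence files under the given path (IOUtilFunctions.getSequenceFilePaths) and combines the collected sub-blocks via CLALibCombine, a directory written by many Spark tasks reads back as a single matrix. A minimal local read-back sketch, as the new tests below exercise it; the path is a placeholder:

    import org.apache.sysds.runtime.compress.io.ReaderCompressed;
    import org.apache.sysds.runtime.matrix.data.MatrixBlock;

    public class CompressedReadSketch {
        public static void main(String[] args) throws Exception {
            // Works for a single file and for a folder of part files alike.
            MatrixBlock mb = ReaderCompressed.readCompressedMatrixFromHDFS("out.cla");
            System.out.println("sum: " + mb.sum());
        }
    }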
diff --git a/src/main/java/org/apache/sysds/runtime/compress/io/WriterCompressed.java b/src/main/java/org/apache/sysds/runtime/compress/io/WriterCompressed.java
index 7a3e07cf3c..207c6961fa 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/io/WriterCompressed.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/io/WriterCompressed.java
@@ -28,16 +28,22 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.SequenceFile.Writer;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.sysds.conf.ConfigurationManager;
 import org.apache.sysds.hops.OptimizerUtils;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.compress.CompressedMatrixBlock;
 import org.apache.sysds.runtime.compress.CompressedMatrixBlockFactory;
+import org.apache.sysds.runtime.instructions.spark.CompressionSPInstruction.CompressionFunction;
+import org.apache.sysds.runtime.instructions.spark.utils.RDDConverterUtils;
 import org.apache.sysds.runtime.io.FileFormatProperties;
 import org.apache.sysds.runtime.io.IOUtilFunctions;
 import org.apache.sysds.runtime.io.MatrixWriter;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
+import org.apache.sysds.runtime.meta.DataCharacteristics;
+import org.apache.sysds.runtime.meta.MatrixCharacteristics;
 import org.apache.sysds.runtime.util.HDFSTool;
 
 public final class WriterCompressed extends MatrixWriter {
@@ -62,6 +68,18 @@ public final class WriterCompressed extends MatrixWriter {
         create(null).writeMatrixToHDFS(src, fname, rlen, clen, blen, nnz, diag);
     }
 
+    public static void writeRDDToHDFS(JavaPairRDD<MatrixIndexes, MatrixBlock> src, String path, int blen,
+        DataCharacteristics mc) {
+        final DataCharacteristics outC = new MatrixCharacteristics(mc).setBlocksize(blen);
+        writeRDDToHDFS(RDDConverterUtils.binaryBlockToBinaryBlock(src, mc, outC), path);
+    }
+
+    public static void writeRDDToHDFS(JavaPairRDD<MatrixIndexes, MatrixBlock> src, String path) {
+        src.mapValues(new CompressionFunction()) // Try to compress each block.
+            .mapValues(new CompressWrap()) // Wrap in writable
+            .saveAsHadoopFile(path, MatrixIndexes.class, CompressedWriteBlock.class, SequenceFileOutputFormat.class);
+    }
+
     @Override
     public void writeMatrixToHDFS(MatrixBlock src, String fname, long rlen, long clen, int blen, long nnz, boolean diag)
         throws IOException {
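
Note: the two new entry points cover the aligned and the re-blocked case: when the requested block size differs from the RDD's current one, the overload taking DataCharacteristics re-blocks first. A sketch of how a caller picks between them, mirroring the WriteSPInstruction change below; names and dimensions are illustrative:

    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.sysds.runtime.compress.io.WriterCompressed;
    import org.apache.sysds.runtime.matrix.data.MatrixBlock;
    import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
    import org.apache.sysds.runtime.meta.DataCharacteristics;
    import org.apache.sysds.runtime.meta.MatrixCharacteristics;

    public class WriteDispatchSketch {
        static void write(JavaPairRDD<MatrixIndexes, MatrixBlock> in, String path,
            long rows, long cols, int curBlen, int outBlen) {
            if(curBlen != outBlen) { // re-block to the requested block size, then write
                DataCharacteristics mc = new MatrixCharacteristics(rows, cols, curBlen);
                WriterCompressed.writeRDDToHDFS(in, path, outBlen, mc);
            }
            else // block sizes align, write directly
                WriterCompressed.writeRDDToHDFS(in, path);
        }
    }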
diff --git a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibDecompress.java b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibDecompress.java
index 30ea5de820..b4a38b1ce7 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibDecompress.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibDecompress.java
@@ -61,6 +61,21 @@ public class CLALibDecompress {
     public static void decompressTo(CompressedMatrixBlock cmb, MatrixBlock ret, int rowOffset, int colOffset, int k) {
         Timing time = new Timing(true);
 
+        if(cmb.getNumColumns() + colOffset > ret.getNumColumns() || cmb.getNumRows() + rowOffset > ret.getNumRows()) {
+            LOG.warn(
+                "Slow slicing off excess parts for decompressTo because decompression into is implemented for fitting blocks");
+            MatrixBlock mbSliced = cmb.slice( //
+                Math.min(Math.abs(rowOffset), 0), Math.min(cmb.getNumRows(), ret.getNumRows() - rowOffset) - 1, // Rows
+                Math.min(Math.abs(colOffset), 0), Math.min(cmb.getNumColumns(), ret.getNumColumns() - colOffset) - 1); // Cols
+            if(mbSliced instanceof MatrixBlock) {
+                mbSliced.putInto(ret, rowOffset, colOffset, false);
+                return;
+            }
+
+            cmb = (CompressedMatrixBlock) mbSliced;
+            decompress(cmb, 1);
+        }
+
         final boolean outSparse = ret.isInSparseFormat();
         if(!cmb.isEmpty()) {
             if(outSparse && cmb.isOverlapping())
@@ -78,8 +93,7 @@ public class CLALibDecompress {
             LOG.trace("decompressed block w/ k=" + k + " in " + t + "ms.");
         }
 
-        if(ret.getNonZeros() <= 0)
-            ret.setNonZeros(cmb.getNonZeros());
+        ret.recomputeNonZeros();
     }
 
     private static void decompressToSparseBlock(CompressedMatrixBlock cmb, MatrixBlock ret, int rowOffset,
@@ -96,6 +110,8 @@ public class CLALibDecompress {
         else
             for(AColGroup g : groups)
                 g.decompressToSparseBlock(sb, 0, nRows, rowOffset, colOffset);
+        sb.sort();
+        ret.checkSparseRows();
     }
 
     private static void decompressToDenseBlock(CompressedMatrixBlock cmb, DenseBlock ret, int rowOffset, int colOffset) {
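
Note: the new guard handles source blocks that overhang the target block: anything outside the target is sliced off before decompressing into it. The slice bounds follow directly from the target dimensions; a small standalone arithmetic check of the row range, with illustrative numbers only:

    public class DecompressBoundsSketch {
        public static void main(String[] args) {
            int cmbRows = 100, retRows = 120, rowOffset = 50;
            // the source overhangs the target: 100 + 50 > 120
            boolean overhang = cmbRows + rowOffset > retRows;
            // slice keeps rows [0, min(cmbRows, retRows - rowOffset) - 1] = [0, 69],
            // exactly the 70 rows that still fit below the offset
            int rl = Math.min(Math.abs(rowOffset), 0); // 0 for non-negative offsets
            int ru = Math.min(cmbRows, retRows - rowOffset) - 1;
            System.out.println(overhang + ": keep rows " + rl + ".." + ru);
        }
    }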
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/WriteSPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/WriteSPInstruction.java
index bad0d2fde5..97371a477f 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/spark/WriteSPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/WriteSPInstruction.java
@@ -19,6 +19,10 @@
 
 package org.apache.sysds.runtime.instructions.spark;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Random;
+
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.io.LongWritable;
@@ -31,6 +35,7 @@ import org.apache.sysds.common.Types.FileFormat;
 import org.apache.sysds.common.Types.ValueType;
 import org.apache.sysds.conf.ConfigurationManager;
 import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.compress.io.WriterCompressed;
 import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
 import org.apache.sysds.runtime.instructions.InstructionUtils;
@@ -52,10 +57,6 @@ import org.apache.sysds.runtime.meta.DataCharacteristics;
 import org.apache.sysds.runtime.meta.MatrixCharacteristics;
 import org.apache.sysds.runtime.util.HDFSTool;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Random;
-
 public class WriteSPInstruction extends SPInstruction implements LineageTraceable {
     public CPOperand input1 = null;
     private CPOperand input2 = null;
@@ -252,6 +253,21 @@ public class WriteSPInstruction extends SPInstruction implements LineageTraceabl
             if( nonDefaultBlen )
                 mcOut = new MatrixCharacteristics(mc).setBlocksize(blen);
         }
+        else if(fmt == FileFormat.COMPRESSED) {
+            // reblock output if needed
+            final int blen = Integer.parseInt(input4.getName());
+            final boolean nonDefaultBlen = ConfigurationManager.getBlocksize() != blen;
+            mc.setNonZeros(-1); // default to unknown non zeros for compressed matrix block
+
+            if(nonDefaultBlen)
+                WriterCompressed.writeRDDToHDFS(in1, fname, blen, mc);
+            else
+                WriterCompressed.writeRDDToHDFS(in1, fname);
+
+            if(nonDefaultBlen)
+                mcOut = new MatrixCharacteristics(mc).setBlocksize(blen);
+
+        }
         else if(fmt == FileFormat.LIBSVM) {
             if(mc.getRows() == 0 || mc.getCols() == 0) {
                 throw new IOException(
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/functions/ExtractBlockForBinaryReblock.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/functions/ExtractBlockForBinaryReblock.java
index f9b735e39c..4f20216760 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/spark/functions/ExtractBlockForBinaryReblock.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/functions/ExtractBlockForBinaryReblock.java
@@ -23,7 +23,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
 
-import org.apache.commons.lang.NotImplementedException;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.compress.CompressedMatrixBlock;
@@ -92,15 +91,19 @@ public class ExtractBlockForBinaryReblock implements PairFlatMapFunction<Tuple2<
             final int cixi = UtilFunctions.computeCellInBlock(rowLower, out_blen);
             final int cixj = UtilFunctions.computeCellInBlock(colLower, out_blen);
 
-            if(in instanceof CompressedMatrixBlock){
-                blk.allocateSparseRowsBlock(false);
-                CLALibDecompress.decompressTo((CompressedMatrixBlock) in, blk, cixi, cixj, 1);
-            }
-            else if( aligned ) {
-                blk.appendToSparse(in, cixi, cixj);
-                blk.setNonZeros(in.getNonZeros());
+            if( aligned ) {
+                if(in instanceof CompressedMatrixBlock){
+                    blk.allocateSparseRowsBlock(false);
+                    CLALibDecompress.decompressTo((CompressedMatrixBlock) in, blk, cixi- aixi, cixj-aixj, 1);
+                }else{
+                    blk.appendToSparse(in, cixi, cixj);
+                    blk.setNonZeros(in.getNonZeros());
+                }
             }
             else { //general case
+                if(in instanceof CompressedMatrixBlock){
+                    in = CompressedMatrixBlock.getUncompressed(in);
+                }
                 for(int i2 = 0; i2 <= (int)(rowUpper-rowLower); i2++)
                     for(int j2 = 0; j2 <= (int)(colUpper-colLower); j2++)
                         blk.appendValue(cixi+i2, cixj+j2, in.quickGetValue(aixi+i2, aixj+j2));
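
Note: re-blocking hinges on converting a global (1-based) cell index into a block index plus a (0-based) offset within that block, once per blocking. A standalone sketch of that arithmetic under the usual SystemDS indexing convention (the helper methods in UtilFunctions encapsulate this; the values below are illustrative):

    public class ReblockIndexSketch {
        public static void main(String[] args) {
            long cell = 250; // global 1-based row (or column) index
            int inBlen = 100, outBlen = 80;
            long inBlock = (cell - 1) / inBlen + 1; // 3
            int inCell = (int) ((cell - 1) % inBlen); // 49
            long outBlock = (cell - 1) / outBlen + 1; // 4
            int outCell = (int) ((cell - 1) % outBlen); // 9
            // cell 250 sits in input block 3 at offset 49, and in output block 4 at offset 9
            System.out.println(inBlock + "/" + inCell + " -> " + outBlock + "/" + outCell);
        }
    }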
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/utils/RDDAggregateUtils.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/utils/RDDAggregateUtils.java
index 3a3e9602cb..22b313f8e6 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/spark/utils/RDDAggregateUtils.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/utils/RDDAggregateUtils.java
@@ -704,32 +704,27 @@ public class RDDAggregateUtils
         }
 
         @Override
-        public MatrixBlock call(MatrixBlock b1, MatrixBlock b2)
-            throws Exception
-        {
+        public MatrixBlock call(MatrixBlock b1, MatrixBlock b2) throws Exception {
             long b1nnz = b1.getNonZeros();
             long b2nnz = b2.getNonZeros();
-            
+
             // sanity check input dimensions
-            if (b1.getNumRows() != b2.getNumRows() || b1.getNumColumns() != b2.getNumColumns()) {
-                throw new DMLRuntimeException("Mismatched block sizes for: "
-                    + b1.getNumRows() + " " + b1.getNumColumns() + " "
-                    + b2.getNumRows() + " " + b2.getNumColumns());
+            if(b1.getNumRows() != b2.getNumRows() || b1.getNumColumns() != b2.getNumColumns()) {
+                throw new DMLRuntimeException("Mismatched block sizes for: " + b1.getNumRows() + " " + b1.getNumColumns()
+                    + " " + b2.getNumRows() + " " + b2.getNumColumns());
             }
 
             // execute merge (never pass by reference)
             MatrixBlock ret = _deep ? new MatrixBlock(b1) : b1;
             ret.merge(b2, false, false, _deep);
             ret.examSparsity();
-            
+
             // sanity check output number of non-zeros
-            if (ret.getNonZeros() != b1nnz + b2nnz) {
-                throw new DMLRuntimeException("Number of non-zeros does not match: "
-                    + ret.getNonZeros() + " != " + b1nnz + " + " + b2nnz);
+            if(ret.getNonZeros() != b1nnz + b2nnz) {
+                throw new DMLRuntimeException(
+                    "Number of non-zeros does not match: " + ret.getNonZeros() + " != " + b1nnz + " + " + b2nnz);
             }
-
             return ret;
         }
-
     }
 }
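
Note: the merge function above assumes blocks with disjoint non-zero cells, which is why nnz(ret) == nnz(b1) + nnz(b2) is a valid sanity check. A minimal sketch of that invariant using the public MatrixBlock API (not part of this patch; method names assumed from the surrounding code):

    import org.apache.sysds.runtime.matrix.data.MatrixBlock;

    public class MergeInvariantSketch {
        public static void main(String[] args) {
            // two 2x2 sparse blocks with disjoint non-zero cells
            MatrixBlock b1 = new MatrixBlock(2, 2, true);
            b1.quickSetValue(0, 0, 1);
            MatrixBlock b2 = new MatrixBlock(2, 2, true);
            b2.quickSetValue(1, 1, 2);
            MatrixBlock ret = new MatrixBlock(b1); // deep copy, never merge by reference
            ret.merge(b2, false);
            // the invariant checked above: nnz adds up for disjoint blocks
            System.out.println(ret.getNonZeros() == b1.getNonZeros() + b2.getNonZeros()); // true
        }
    }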
diff --git a/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java
index ad0ca1c776..b69fb99873 100644
--- a/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java
@@ -1435,6 +1435,8 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
                 for(int k = apos; k < apos + alen; k++)
                     if(avals[k] == 0)
                         throw new RuntimeException("Wrong sparse row: zero at " + k);
+                if(aix[apos + alen-1] > clen)
+                    throw new RuntimeException("Invalid offset outside of matrix");
             }
         }
diff --git a/src/test/java/org/apache/sysds/test/component/compress/io/IOTestUtils.java b/src/test/java/org/apache/sysds/test/component/compress/io/IOCompressionTestUtils.java
similarity index 94%
rename from src/test/java/org/apache/sysds/test/component/compress/io/IOTestUtils.java
rename to src/test/java/org/apache/sysds/test/component/compress/io/IOCompressionTestUtils.java
index fe5605ec31..785fc4444d 100644
--- a/src/test/java/org/apache/sysds/test/component/compress/io/IOTestUtils.java
+++ b/src/test/java/org/apache/sysds/test/component/compress/io/IOCompressionTestUtils.java
@@ -28,14 +28,14 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.sysds.runtime.compress.io.ReaderCompressed;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 
-public class IOTestUtils {
+public class IOCompressionTestUtils {
 
     final static Object lock = new Object();
     static final AtomicInteger id = new AtomicInteger(0);
 
     protected static void deleteDirectory(File file) {
-        synchronized(IOTestUtils.lock) {
+        synchronized(IOCompressionTestUtils.lock) {
             for(File subfile : file.listFiles()) {
                 if(subfile.isDirectory())
                     deleteDirectory(subfile);
@@ -59,7 +59,7 @@ public class IOCompressionTestUtils {
         // assertTrue("Disk size is not equivalent", a.getExactSizeOnDisk() > b.getExactSizeOnDisk());
     }
 
-    protected static MatrixBlock read(String path) {
+    public static MatrixBlock read(String path) {
         try {
             return ReaderCompressed.readCompressedMatrixFromHDFS(path);
         }
diff --git a/src/test/java/org/apache/sysds/test/component/compress/io/IOEmpty.java b/src/test/java/org/apache/sysds/test/component/compress/io/IOEmpty.java
index 5f913c70d3..41e4f2e904 100644
--- a/src/test/java/org/apache/sysds/test/component/compress/io/IOEmpty.java
+++ b/src/test/java/org/apache/sysds/test/component/compress/io/IOEmpty.java
@@ -39,18 +39,18 @@ public class IOEmpty {
         + IOEmpty.class.getSimpleName() + "/";
 
     public IOEmpty() {
-        synchronized(IOTestUtils.lock) {
+        synchronized(IOCompressionTestUtils.lock) {
             new File(nameBeginning).mkdirs();
         }
     }
 
     @AfterClass
     public static void cleanup() {
-        IOTestUtils.deleteDirectory(new File(nameBeginning));
+        IOCompressionTestUtils.deleteDirectory(new File(nameBeginning));
    }
 
     public static String getName() {
-        return IOTestUtils.getName(nameBeginning);
+        return IOCompressionTestUtils.getName(nameBeginning);
     }
 
     @Test
@@ -65,8 +65,8 @@ public class IOEmpty {
     public void writeEmptyAndRead() {
         String n = getName();
         write(n, 10, 10, 1000);
-        MatrixBlock mb = IOTestUtils.read(n);
-        IOTestUtils.verifyEquivalence(mb, new MatrixBlock(10, 10, 0.0));
+        MatrixBlock mb = IOCompressionTestUtils.read(n);
+        IOCompressionTestUtils.verifyEquivalence(mb, new MatrixBlock(10, 10, 0.0));
     }
 
     @Test
@@ -83,8 +83,8 @@ public class IOEmpty {
         write(n, 1000, 10, 100);
         File f = new File(n);
         assertTrue(f.isDirectory() || f.isFile());
-        MatrixBlock mb = IOTestUtils.read(n);
-        IOTestUtils.verifyEquivalence(mb, new MatrixBlock(1000, 10, 0.0));
+        MatrixBlock mb = IOCompressionTestUtils.read(n);
+        IOCompressionTestUtils.verifyEquivalence(mb, new MatrixBlock(1000, 10, 0.0));
     }
 
     protected static void write(String path, int nRows, int nCols, int blen) {
assertEquals("Expected same number of blocks on re-blocked", nBlocksExpected2, nBlocksActual2); + + double val = mb.sum(); + verifySum(m, val, 0.0000001); + verifySum(m2, val, 0.0000001); + + } + + private static JavaPairRDD<MatrixIndexes, MatrixBlock> reblock(JavaPairRDD<MatrixIndexes, MatrixBlock> in, + DataCharacteristics mc, int blen) { + final DataCharacteristics outC = new MatrixCharacteristics(mc).setBlocksize(blen); + return RDDConverterUtils.binaryBlockToBinaryBlock(in, mc, outC); + } + + private List<Tuple2<MatrixIndexes, MatrixBlock>> read(String n) { + return getRDD(n).collect(); } @SuppressWarnings({"unchecked"}) - private JavaPairRDD<MatrixIndexes, CompressedWriteBlock> getRDD(String path) { + private JavaPairRDD<MatrixIndexes, MatrixBlock> getRDD(String path) { InputOutputInfo inf = InputOutputInfo.CompressedInputOutputInfo; JavaSparkContext sc = SparkExecutionContext.getSparkContextStatic(); - return (JavaPairRDD<MatrixIndexes, CompressedWriteBlock>) sc.hadoopFile(path, inf.inputFormatClass, inf.keyClass, - inf.valueClass); + return ((JavaPairRDD<MatrixIndexes, CompressedWriteBlock>) sc.hadoopFile(path, inf.inputFormatClass, inf.keyClass, + inf.valueClass)).mapValues(new CompressUnwrap()); } @SuppressWarnings({"unchecked"}) @@ -158,13 +327,36 @@ public class IOSpark { JavaPairRDD<MatrixIndexes, MatrixBlock> m = (JavaPairRDD<MatrixIndexes, MatrixBlock>) ec .getRDDHandleForMatrixObject(obj, fmt); + List<Tuple2<MatrixIndexes, MatrixBlock>> c = m.collect(); + verifySum(c, mb.sum(), 0.0001); + } + catch(Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } - verifySum(m.collect(), mb.sum(), 0.0001); + private void readWrite(MatrixBlock mb) { + double sum = mb.sum(); + String n = getName(); + try { + WriterCompressed.writeCompressedMatrixToHDFS(mb, n, 50); } catch(Exception e) { e.printStackTrace(); fail(e.getMessage()); } + verifySum(read(n), sum, 0.0001); + } + + private void verifySum(JavaPairRDD<MatrixIndexes, MatrixBlock> m, double val, double tol) { + verifySum(m.collect(), val, tol); } + private void verifySum(List<Tuple2<MatrixIndexes, MatrixBlock>> c, double val, double tol) { + double sum = 0.0; + for(Tuple2<MatrixIndexes, MatrixBlock> b : c) + sum += b._2().sum(); + assertEquals(val, sum, tol); + } } diff --git a/src/test/java/org/apache/sysds/test/component/compress/io/IOTest.java b/src/test/java/org/apache/sysds/test/component/compress/io/IOTest.java index 90b23e49ab..11f28b6fd5 100644 --- a/src/test/java/org/apache/sysds/test/component/compress/io/IOTest.java +++ b/src/test/java/org/apache/sysds/test/component/compress/io/IOTest.java @@ -41,18 +41,18 @@ public class IOTest { + IOTest.class.getSimpleName() + "/"; public IOTest() { - synchronized(IOTestUtils.lock) { + synchronized(IOCompressionTestUtils.lock) { new File(nameBeginning).mkdirs(); } } @AfterClass public static void cleanup() { - IOTestUtils.deleteDirectory(new File(nameBeginning)); + IOCompressionTestUtils.deleteDirectory(new File(nameBeginning)); } public static String getName() { - return IOTestUtils.getName(nameBeginning); + return IOCompressionTestUtils.getName(nameBeginning); } @Test @@ -101,8 +101,8 @@ public class IOTest { WriterCompressed.writeCompressedMatrixToHDFS(mb, filename); File f = new File(filename); assertTrue(f.isFile() || f.isDirectory()); - MatrixBlock mbr = IOTestUtils.read(filename); - IOTestUtils.verifyEquivalence(mb, mbr); + MatrixBlock mbr = IOCompressionTestUtils.read(filename); + IOCompressionTestUtils.verifyEquivalence(mb, mbr); } protected static void 
diff --git a/src/test/java/org/apache/sysds/test/component/compress/io/IOTest.java b/src/test/java/org/apache/sysds/test/component/compress/io/IOTest.java
index 90b23e49ab..11f28b6fd5 100644
--- a/src/test/java/org/apache/sysds/test/component/compress/io/IOTest.java
+++ b/src/test/java/org/apache/sysds/test/component/compress/io/IOTest.java
@@ -41,18 +41,18 @@ public class IOTest {
         + IOTest.class.getSimpleName() + "/";
 
     public IOTest() {
-        synchronized(IOTestUtils.lock) {
+        synchronized(IOCompressionTestUtils.lock) {
             new File(nameBeginning).mkdirs();
         }
     }
 
     @AfterClass
     public static void cleanup() {
-        IOTestUtils.deleteDirectory(new File(nameBeginning));
+        IOCompressionTestUtils.deleteDirectory(new File(nameBeginning));
     }
 
     public static String getName() {
-        return IOTestUtils.getName(nameBeginning);
+        return IOCompressionTestUtils.getName(nameBeginning);
     }
 
     @Test
@@ -101,8 +101,8 @@ public class IOTest {
         WriterCompressed.writeCompressedMatrixToHDFS(mb, filename);
         File f = new File(filename);
         assertTrue(f.isFile() || f.isDirectory());
-        MatrixBlock mbr = IOTestUtils.read(filename);
-        IOTestUtils.verifyEquivalence(mb, mbr);
+        MatrixBlock mbr = IOCompressionTestUtils.read(filename);
+        IOCompressionTestUtils.verifyEquivalence(mb, mbr);
     }
 
     protected static void write(MatrixBlock src, String path) {
@@ -145,7 +145,7 @@ public class IOTest {
         WriterCompressed.writeCompressedMatrixToHDFS(mb, filename, blen);
         File f = new File(filename);
         assertTrue(f.isFile() || f.isDirectory());
-        MatrixBlock mbr = IOTestUtils.read(filename);
-        IOTestUtils.verifyEquivalence(mb, mbr);
+        MatrixBlock mbr = IOCompressionTestUtils.read(filename);
+        IOCompressionTestUtils.verifyEquivalence(mb, mbr);
     }
 }
diff --git a/src/test/java/org/apache/sysds/test/functions/io/compressed/WriteCompressedTest.java b/src/test/java/org/apache/sysds/test/functions/io/compressed/WriteCompressedTest.java
new file mode 100644
index 0000000000..ba9e6f430f
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/functions/io/compressed/WriteCompressedTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.io.compressed;
+
+import java.io.IOException;
+
+import org.apache.sysds.common.Types.ExecMode;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+import org.apache.sysds.test.component.compress.io.IOCompressionTestUtils;
+import org.junit.Test;
+
+/**
+ * JUnit test cases to evaluate the functionality of writing matrices in compressed format:
+ * the DML script writes a compressed matrix and its sum, and the test reads both back for verification.
+ */
+public class WriteCompressedTest extends AutomatedTestBase {
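+
+    private final static String TEST_NAME = "WriteCompressedTest";
+    private final static String TEST_DIR = "functions/io/compressed/";
+    private final static String TEST_CLASS_DIR = TEST_DIR + WriteCompressedTest.class.getSimpleName() + "/";
+
+    private final static double eps = 1e-9;
+
+    @Override
+    public void setUp() {
+        addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] {"Rout"}));
+    }
+
+    @Test
+    public void testCP() throws IOException {
+        runWriteTest(ExecMode.SINGLE_NODE);
+    }
+
+    @Test
+    public void testHP() throws IOException {
+        runWriteTest(ExecMode.HYBRID);
+    }
+
+    @Test
+    public void testSP() throws IOException {
+        runWriteTest(ExecMode.SPARK);
+    }
+
+    private void runWriteTest(ExecMode platform) throws IOException {
+        runWriteTest(platform, 100, 100, 0, 0, 0.0);
+    }
+
+    private void runWriteTest(ExecMode platform, int rows, int cols, int min, int max, double sparsity)
+        throws IOException {
+
+        ExecMode oldPlatform = rtplatform;
+        rtplatform = platform;
+
+        TestConfiguration config = getTestConfiguration(TEST_NAME);
+        loadTestConfiguration(config);
+
+        String HOME = SCRIPT_DIR + TEST_DIR;
+
+        fullDMLScriptName = HOME + TEST_NAME + ".dml";
+        programArgs = new String[] {"-explain", "-args", "" + rows, "" + cols, "" + min, "" + max, "" + sparsity,
+            output("out.cla"), output("sum.scalar")};
+
+        runTest(null);
+
+        double sumDML = TestUtils.readDMLScalar(output("sum.scalar"));
+        MatrixBlock mbr = IOCompressionTestUtils.read(output("out.cla"));
+
+        TestUtils.compareScalars(sumDML, mbr.sum(), eps);
+
+        rtplatform = oldPlatform;
+    }
+}
diff --git a/src/test/scripts/functions/io/compressed/WriteCompressedTest.dml b/src/test/scripts/functions/io/compressed/WriteCompressedTest.dml
new file mode 100644
index 0000000000..6a51041016
--- /dev/null
+++ b/src/test/scripts/functions/io/compressed/WriteCompressedTest.dml
@@ -0,0 +1,26 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+m = rand(rows= $1, cols=$2, min=$3, max=$4, sparsity=$5)
+
+s = sum(m)
+write(m, $6, format="compressed")
+write(s, $7, format="csv")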
