This is an automated email from the ASF dual-hosted git repository.

baunsgaard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
commit d8efb74571b8523d48e4fa9aa90b6c14264226b4
Author: Sebastian Baunsgaard <[email protected]>
AuthorDate: Wed Sep 6 16:09:46 2023 +0200

    [SYSTEMDS-3601] Write Parallel Local FileSystem

    This commit changes parallel writing on local file systems to create
    multiple part files based on a smaller block-size threshold than the
    HDFS default. This makes local file writes parallel for much smaller
    matrices; for instance, it improves the write time of a 10k-by-100
    matrix from 17 ms to 6.5 ms.

    Closes #1886
---
 .../runtime/compress/io/WriterCompressed.java      |  58 +++--
 .../controlprogram/caching/FrameObject.java        |   2 +
 .../parfor/stat/InfrastructureAnalyzer.java        |  12 +
 .../apache/sysds/runtime/io/IOUtilFunctions.java   |   6 +
 .../org/apache/sysds/runtime/io/MatrixWriter.java  |   1 -
 .../apache/sysds/runtime/io/WriterBinaryBlock.java |  75 +++---
 .../runtime/io/WriterBinaryBlockParallel.java      |  58 ++---
 .../sysds/runtime/meta/MatrixCharacteristics.java  |   4 +-
 .../apache/sysds/runtime/util/DataConverter.java   |  32 ---
 .../apache/sysds/runtime/util/UtilFunctions.java   |   2 +-
 .../colgroup/scheme/CLAConstSchemeTest.java        | 207 ----------------
 .../compress/colgroup/scheme/CLADDCSchemeTest.java | 146 ------------
 .../colgroup/scheme/CLAEmptySchemeTest.java        | 263 ---------------------
 .../component/federated/FederatedTestUtils.java    |   4 +
 .../test/component/misc/IOUtilFunctionsTest.java   |  21 +-
 .../test/component/misc/NumPartsFilesTest.java     |  88 +++++++
 .../federated/primitives/FederatedRCBindTest.java  |   4 +-
 17 files changed, 227 insertions(+), 756 deletions(-)

diff --git a/src/main/java/org/apache/sysds/runtime/compress/io/WriterCompressed.java b/src/main/java/org/apache/sysds/runtime/compress/io/WriterCompressed.java index 9f9a531a6d..744a5e945a 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/io/WriterCompressed.java +++ b/src/main/java/org/apache/sysds/runtime/compress/io/WriterCompressed.java @@ -61,7 +61,12 @@ public final class WriterCompressed extends MatrixWriter { protected static final Log LOG = LogFactory.getLog(WriterCompressed.class.getName()); + protected static int jobUse = 0; + protected static JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); + private String fname; + + private FileSystem fs; private Future<Writer>[] writers; private Lock[] writerLocks; @@ -128,10 +133,20 @@ public final class WriterCompressed extends MatrixWriter { } private void write(MatrixBlock src, final String fname, final int blen) throws IOException { + jobUse ++; + if(jobUse > 30){ + job = new JobConf(ConfigurationManager.getCachedJobConf()); + jobUse = 0; + } + + if(this.fname != fname) { this.fname = fname; this.writers = null; } + + fs = IOUtilFunctions.getFileSystem(new Path(fname), job); + final int k = OptimizerUtils.getParallelBinaryWriteParallelism(); final int rlen = src.getNumRows(); final int clen = src.getNumColumns(); @@ -149,18 +164,20 @@ public final class WriterCompressed extends MatrixWriter { } private void writeSingleBlock(MatrixBlock b, int k) throws IOException { - Writer w = getWriter(fname); + final Path path = new Path(fname); + Writer w = generateWriter(job, path, fs); MatrixIndexes idx = new MatrixIndexes(1, 1); if(!(b instanceof CompressedMatrixBlock)) b = CompressedMatrixBlockFactory.compress(b, k).getLeft(); w.append(idx, new CompressedWriteBlock(b)); IOUtilFunctions.closeSilently(w); - cleanup(); + cleanup(path); } private void writeMultiBlockUncompressed(MatrixBlock b, final int rlen, final int clen, final int blen, int k) throws IOException { - Writer w = getWriter(fname); + final
Path path = new Path(fname); + Writer w = generateWriter(job, path, fs); final MatrixIndexes indexes = new MatrixIndexes(); LOG.warn("Writing compressed format with non identical compression scheme"); @@ -178,7 +195,7 @@ public final class WriterCompressed extends MatrixWriter { } } IOUtilFunctions.closeSilently(w); - cleanup(); + cleanup(path); } private void writeMultiBlockCompressed(MatrixBlock b, final int rlen, final int clen, final int blen, int k) @@ -196,7 +213,8 @@ public final class WriterCompressed extends MatrixWriter { try { setupWrite(); - Writer w = getWriter(fname); + final Path path = new Path(fname); + Writer w = generateWriter(job, path, fs); for(int bc = 0; bc * blen < clen; bc++) {// column blocks final int sC = bc * blen; final int mC = Math.min(sC + blen, clen) - 1; @@ -212,6 +230,7 @@ public final class WriterCompressed extends MatrixWriter { } IOUtilFunctions.closeSilently(w); + cleanup(path); } catch(Exception e) { throw new IOException(e); @@ -235,7 +254,7 @@ public final class WriterCompressed extends MatrixWriter { final int j = i; if(writers[i] == null) { writers[i] = pool.submit(() -> { - return generateWriter(getPath(j)); + return generateWriter(job, getPath(j), fs); }); } writerLocks[i] = new ReentrantLock(); @@ -273,7 +292,7 @@ public final class WriterCompressed extends MatrixWriter { pool.submit(() -> { try { IOUtilFunctions.closeSilently(writers[l].get()); - cleanup(getPath(l)); + cleanup(job, getPath(l), fs); } catch(Exception e) { throw new RuntimeException(e); @@ -301,28 +320,24 @@ public final class WriterCompressed extends MatrixWriter { return new Path(fname, IOUtilFunctions.getPartFileName(id)); } - private Writer getWriter(String fname) throws IOException { - final Path path = new Path(fname); - return generateWriter(path); - } + // private Writer getWriter(String fname) throws IOException { + // final Path path = new Path(fname); + // return generateWriter(job, path); + // } - private static Writer generateWriter(Path path) throws IOException { - final JobConf job = ConfigurationManager.getCachedJobConf(); - // HDFSTool.deleteFileIfExistOnHDFS(path, job); + private static Writer generateWriter(JobConf job, Path path, FileSystem fs) throws IOException { + return SequenceFile.createWriter(job, Writer.file(path), Writer.bufferSize(4096), Writer.keyClass(MatrixIndexes.class), Writer.valueClass(CompressedWriteBlock.class), Writer.compression(SequenceFile.CompressionType.NONE), // No Compression type on disk Writer.replication((short) 1)); } - private void cleanup() throws IOException { - final Path path = new Path(fname); - cleanup(path); + private void cleanup(Path p) throws IOException { + cleanup(job, p, fs); } - private static void cleanup(Path path) throws IOException { - final JobConf job = ConfigurationManager.getCachedJobConf(); - final FileSystem fs = IOUtilFunctions.getFileSystem(path, job); + private static void cleanup(JobConf job, Path path, FileSystem fs) throws IOException { IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(fs, path); } @@ -384,7 +399,6 @@ public final class WriterCompressed extends MatrixWriter { @Override public Object call() throws Exception { - final JobConf job = ConfigurationManager.getCachedJobConf(); Path p = new Path(fname + ".dict", IOUtilFunctions.getPartFileName(id)); try(Writer w = SequenceFile.createWriter(job, Writer.file(p), // Writer.bufferSize(4096), // @@ -394,7 +408,7 @@ public final class WriterCompressed extends MatrixWriter { Writer.replication((short) 1))) { w.append(new DictWritable.K(id), new 
DictWritable(dicts)); } - cleanup(p); + cleanup(job, p, fs); return null; } diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java index d3e18f9e4e..945021626e 100644 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java @@ -138,6 +138,8 @@ public class FrameObject extends CacheableData<FrameBlock> } public static ValueType[] parseSchema(String schema) { + if(schema == null) + return new ValueType[]{ValueType.STRING}; // parse given schema String[] parts = schema.split(DataExpression.DEFAULT_DELIM_DELIMITER); ValueType[] ret = new ValueType[parts.length]; diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/stat/InfrastructureAnalyzer.java b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/stat/InfrastructureAnalyzer.java index d39c470a7a..3dd47d5faa 100644 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/stat/InfrastructureAnalyzer.java +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/stat/InfrastructureAnalyzer.java @@ -22,12 +22,16 @@ package org.apache.sysds.runtime.controlprogram.parfor.stat; import java.io.IOException; import java.util.StringTokenizer; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.ClusterStatus; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.sysds.conf.ConfigurationManager; import org.apache.sysds.hops.OptimizerUtils; import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext; +import org.apache.sysds.runtime.io.IOUtilFunctions; import org.apache.sysds.runtime.util.HDFSTool; import org.apache.sysds.runtime.util.UtilFunctions; @@ -194,6 +198,14 @@ public class InfrastructureAnalyzer return getLocalMaxMemory(); } + public static long getBlockSize(FileSystem fs){ + if(fs instanceof LocalFileSystem) + // 4 blocks per file at least + return 4096 * 4; + else + return getHDFSBlockSize(); + } + /** * Gets the HDFS blocksize of the used cluster in bytes. * diff --git a/src/main/java/org/apache/sysds/runtime/io/IOUtilFunctions.java b/src/main/java/org/apache/sysds/runtime/io/IOUtilFunctions.java index b66f6a00b5..3735f9d7db 100644 --- a/src/main/java/org/apache/sysds/runtime/io/IOUtilFunctions.java +++ b/src/main/java/org/apache/sysds/runtime/io/IOUtilFunctions.java @@ -610,6 +610,12 @@ public class IOUtilFunctions return ret; } + public static void deleteCrcFilesFromLocalFileSystem( JobConf job, Path path) throws IOException { + final FileSystem fs = getFileSystem(path,job ); + deleteCrcFilesFromLocalFileSystem(fs, path); + } + + /** * Delete the CRC files from the local file system associated with a * particular file and its metadata file. diff --git a/src/main/java/org/apache/sysds/runtime/io/MatrixWriter.java b/src/main/java/org/apache/sysds/runtime/io/MatrixWriter.java index ae47e99e70..faddd66466 100644 --- a/src/main/java/org/apache/sysds/runtime/io/MatrixWriter.java +++ b/src/main/java/org/apache/sysds/runtime/io/MatrixWriter.java @@ -30,7 +30,6 @@ import org.apache.sysds.runtime.matrix.data.MatrixBlock; * write functionality but might provide additional custom functionality. Any non-default parameters * (e.g., CSV read properties) should be passed into custom constructors. 
There is also a factory * for creating format-specific writers. - * */ public abstract class MatrixWriter { protected static final Log LOG = LogFactory.getLog(MatrixWriter.class.getName()); diff --git a/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java b/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java index 7991c5701f..e3dd3935c6 100644 --- a/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java +++ b/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java @@ -37,27 +37,35 @@ import org.apache.sysds.runtime.util.HDFSTool; public class WriterBinaryBlock extends MatrixWriter { protected int _replication = -1; + protected static int jobUse = 0; + protected static JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); + public WriterBinaryBlock(int replication) { _replication = replication; + + jobUse ++; + if(jobUse > 15){ + // job = new JobConf(); + job = new JobConf(ConfigurationManager.getCachedJobConf()); + jobUse = 0; + } } @Override public final void writeMatrixToHDFS(MatrixBlock src, String fname, long rlen, long clen, int blen, long nnz, boolean diag) throws IOException, DMLRuntimeException { // prepare file access - JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); Path path = new Path(fname); - FileSystem fs = IOUtilFunctions.getFileSystem(path, job); // if the file already exists on HDFS, remove it. - HDFSTool.deleteFileIfExistOnHDFS(fname); + HDFSTool.deleteFileIfExistOnHDFS(path, job); // set up preferred custom serialization framework for binary block format if(HDFSTool.USE_BINARYBLOCK_SERIALIZATION) HDFSTool.addBinaryBlockSerializationFramework(job); if(src instanceof CompressedMatrixBlock) { - if(ConfigurationManager.getCompilerConfigFlag(ConfigType.PARALLEL_CP_WRITE_BINARYFORMATS)){ + if(ConfigurationManager.getCompilerConfigFlag(ConfigType.PARALLEL_CP_WRITE_BINARYFORMATS)) { LOG.debug("Multi threaded decompression"); // parallel src = CompressedMatrixBlock.getUncompressed(src, "binary write", @@ -71,17 +79,15 @@ public class WriterBinaryBlock extends MatrixWriter { // core write sequential/parallel if(diag) - writeDiagBinaryBlockMatrixToHDFS(path, job, fs, src, rlen, clen, blen); + writeDiagBinaryBlockMatrixToHDFS(path, job, src, rlen, clen, blen); else - writeBinaryBlockMatrixToHDFS(path, job, fs, src, rlen, clen, blen); + writeBinaryBlockMatrixToHDFS(path, job, src, rlen, clen, blen); - IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(fs, path); } @Override public final void writeEmptyMatrixToHDFS(String fname, long rlen, long clen, int blen) throws IOException, DMLRuntimeException { - JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); Path path = new Path(fname); FileSystem fs = IOUtilFunctions.getFileSystem(path, job); final Writer writer = IOUtilFunctions.getSeqWriter(path, job, _replication); @@ -98,26 +104,22 @@ public class WriterBinaryBlock extends MatrixWriter { IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(fs, path); } - protected void writeBinaryBlockMatrixToHDFS(Path path, JobConf job, FileSystem fs, MatrixBlock src, long rlen, + protected void writeBinaryBlockMatrixToHDFS(Path path, JobConf job, MatrixBlock src, long rlen, long clen, int blen) throws IOException, DMLRuntimeException { // sequential write - writeBinaryBlockMatrixToSequenceFile(path, job, fs, src, blen, 0, (int) rlen); + writeBinaryBlockMatrixToSequenceFile(path, job, src, blen, 0, (int) rlen); + IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(job, path); } - protected final void 
writeBinaryBlockMatrixToSequenceFile(Path path, JobConf job, FileSystem fs, MatrixBlock src, + protected final void writeBinaryBlockMatrixToSequenceFile(Path path, JobConf job, MatrixBlock src, int blen, int rl, int ru) throws IOException { boolean sparse = src.isInSparseFormat(); - int rlen = src.getNumRows(); - int clen = src.getNumColumns(); - + final int rlen = src.getNumRows(); + final int clen = src.getNumColumns(); final Writer writer = IOUtilFunctions.getSeqWriter(path, job, _replication); - try { // 2) bound check for src block - if(src.getNumRows() > rlen || src.getNumColumns() > clen) { - throw new IOException("Matrix block [1:" + src.getNumRows() + ",1:" + src.getNumColumns() + "] " - + "out of overall matrix range [1:" + rlen + ",1:" + clen + "]."); - } - + try { + // 3) reblock and write MatrixIndexes indexes = new MatrixIndexes(); @@ -133,10 +135,9 @@ public class WriterBinaryBlock extends MatrixWriter { // create and write sub-blocks of matrix for(int blockRow = rl / blen; blockRow < (int) Math.ceil(ru / (double) blen); blockRow++) { - for(int blockCol = 0; blockCol < (int) Math.ceil(src.getNumColumns() / (double) blen); blockCol++) { - int maxRow = (blockRow * blen + blen < src.getNumRows()) ? blen : src.getNumRows() - blockRow * blen; - int maxCol = (blockCol * blen + blen < src.getNumColumns()) ? blen : src.getNumColumns() - - blockCol * blen; + for(int blockCol = 0; blockCol < (int) Math.ceil(clen / (double) blen); blockCol++) { + int maxRow = (blockRow * blen + blen < rlen) ? blen : rlen - blockRow * blen; + int maxCol = (blockCol * blen + blen < clen) ? blen : clen - blockCol * blen; int row_offset = blockRow * blen; int col_offset = blockCol * blen; @@ -144,7 +145,7 @@ public class WriterBinaryBlock extends MatrixWriter { // get reuse matrix block MatrixBlock block = getMatrixBlockForReuse(blocks, maxRow, maxCol, blen); - // copy submatrix to block + // copy sub matrix to block src.slice(row_offset, row_offset + maxRow - 1, col_offset, col_offset + maxCol - 1, block); // append block to sequence file @@ -162,16 +163,17 @@ public class WriterBinaryBlock extends MatrixWriter { } } - protected final void writeDiagBinaryBlockMatrixToHDFS(Path path, JobConf job, FileSystem fs, MatrixBlock src, - long rlen, long clen, int blen) throws IOException, DMLRuntimeException { + protected final void writeDiagBinaryBlockMatrixToHDFS(Path path, JobConf job, MatrixBlock src, + long rlen, long clen, int blen) throws IOException { boolean sparse = src.isInSparseFormat(); - + final int nRow = src.getNumRows(); + final int nCol = src.getNumColumns(); final Writer writer = IOUtilFunctions.getSeqWriter(path, job, _replication); try { // 2) bound check for src block - if(src.getNumRows() > rlen || src.getNumColumns() > clen) { - throw new IOException("Matrix block [1:" + src.getNumRows() + ",1:" + src.getNumColumns() + "] " + if(nRow > rlen || nCol > clen) { + throw new IOException("Matrix block [1:" + nRow + ",1:" + nCol + "] " + "out of overall matrix range [1:" + rlen + ",1:" + clen + "]."); } @@ -188,13 +190,12 @@ public class WriterBinaryBlock extends MatrixWriter { MatrixBlock[] blocks = createMatrixBlocksForReuse(rlen, clen, blen, sparse, src.getNonZeros()); MatrixBlock emptyBlock = new MatrixBlock(); - // create and write subblocks of matrix - for(int blockRow = 0; blockRow < (int) Math.ceil(src.getNumRows() / (double) blen); blockRow++) { + // create and write sub blocks of the matrix + for(int blockRow = 0; blockRow < (int) Math.ceil(nRow / (double) blen); blockRow++) { - for(int 
blockCol = 0; blockCol < (int) Math.ceil(src.getNumColumns() / (double) blen); blockCol++) { - int maxRow = (blockRow * blen + blen < src.getNumRows()) ? blen : src.getNumRows() - blockRow * blen; - int maxCol = (blockCol * blen + blen < src.getNumColumns()) ? blen : src.getNumColumns() - - blockCol * blen; + for(int blockCol = 0; blockCol < (int) Math.ceil(nCol / (double) blen); blockCol++) { + int maxRow = (blockRow * blen + blen < nRow) ? blen : nRow - blockRow * blen; + int maxCol = (blockCol * blen + blen < nCol) ? blen : nCol - blockCol * blen; MatrixBlock block = null; if(blockRow == blockCol) { // block on diagonal @@ -204,7 +205,7 @@ public class WriterBinaryBlock extends MatrixWriter { // get reuse matrix block block = getMatrixBlockForReuse(blocks, maxRow, maxCol, blen); - // copy submatrix to block + // copy sub matrix to block src.slice(row_offset, row_offset + maxRow - 1, col_offset, col_offset + maxCol - 1, block); } else { // empty block (not on diagonal) diff --git a/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlockParallel.java b/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlockParallel.java index e483bbd6ff..ca79c8daf3 100644 --- a/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlockParallel.java +++ b/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlockParallel.java @@ -21,22 +21,18 @@ package org.apache.sysds.runtime.io; import java.io.IOException; import java.util.ArrayList; -import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.JobConf; -import org.apache.sysds.conf.DMLConfig; import org.apache.sysds.hops.OptimizerUtils; import org.apache.sysds.runtime.DMLRuntimeException; import org.apache.sysds.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer; import org.apache.sysds.runtime.matrix.data.MatrixBlock; import org.apache.sysds.runtime.util.CommonThreadPool; -import org.apache.sysds.runtime.util.HDFSTool; public class WriterBinaryBlockParallel extends WriterBinaryBlock { @@ -45,71 +41,68 @@ public class WriterBinaryBlockParallel extends WriterBinaryBlock } @Override - protected void writeBinaryBlockMatrixToHDFS( Path path, JobConf job, FileSystem fs, MatrixBlock src, long rlen, long clen, int blen ) + protected void writeBinaryBlockMatrixToHDFS( Path path, JobConf job, MatrixBlock src, long rlen, long clen, int blen ) throws IOException, DMLRuntimeException { //estimate output size and number of output blocks (min 1) - int numPartFiles = (int)(OptimizerUtils.estimatePartitionedSizeExactSparsity(rlen, clen, - blen, src.getNonZeros()) / InfrastructureAnalyzer.getHDFSBlockSize()); - numPartFiles = Math.max(numPartFiles, 1); - + int numPartFiles = numPartsFiles(path.getFileSystem(job), rlen, clen, blen , src.getNonZeros()); + //determine degree of parallelism int numThreads = OptimizerUtils.getParallelBinaryWriteParallelism(); numThreads = Math.min(numThreads, numPartFiles); //fall back to sequential write if dop is 1 (e.g., <128MB) in order to create single file if( numThreads <= 1 ) { - super.writeBinaryBlockMatrixToHDFS(path, job, fs, src, rlen, clen, blen); + super.writeBinaryBlockMatrixToHDFS(path, job, src, rlen, clen, blen); return; } //create directory for concurrent tasks - HDFSTool.createDirIfNotExistOnHDFS(path, DMLConfig.DEFAULT_SHARED_DIR_PERMISSION); + // 
HDFSTool.createDirIfNotExistOnHDFS(path, DMLConfig.DEFAULT_SHARED_DIR_PERMISSION); //create and execute write tasks - try - { - ExecutorService pool = CommonThreadPool.get(numThreads); + final ExecutorService pool = CommonThreadPool.get(numThreads); + try { ArrayList<WriteFileTask> tasks = new ArrayList<>(); int blklen = (int)Math.ceil((double)rlen / blen / numThreads) * blen; for(int i=0; i<numThreads & i*blklen<rlen; i++) { Path newPath = new Path(path, IOUtilFunctions.getPartFileName(i)); - tasks.add(new WriteFileTask(newPath, job, fs, src, i*blklen, Math.min((i+1)*blklen, rlen), blen)); + tasks.add(new WriteFileTask(newPath, job, src, i*blklen, Math.min((i+1)*blklen, rlen), blen)); } - //wait until all tasks have been executed - List<Future<Object>> rt = pool.invokeAll(tasks); - pool.shutdown(); - //check for exceptions - for( Future<Object> task : rt ) + for( Future<Object> task : pool.invokeAll(tasks) ) task.get(); - // delete crc files if written to local file system - if (fs instanceof LocalFileSystem) { - for(int i=0; i<numThreads & i*blklen<rlen; i++) - IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(fs, - new Path(path, IOUtilFunctions.getPartFileName(i))); - } } catch (Exception e) { throw new IOException("Failed parallel write of binary block input.", e); } + finally{ + pool.shutdown(); + } + } + + public static int numPartsFiles(FileSystem fs, long rlen, long clen, long blen, long nZeros) { + int numPartFiles = (int) (OptimizerUtils.estimatePartitionedSizeExactSparsity(rlen, clen, blen, nZeros) / + InfrastructureAnalyzer.getBlockSize(fs)); + numPartFiles = Math.max(numPartFiles, 1); + numPartFiles = Math.min(numPartFiles, + (int) (Math.ceil((double) rlen / blen) * Math.ceil((double) clen / blen))); + return numPartFiles; } private class WriteFileTask implements Callable<Object> { private Path _path = null; private JobConf _job = null; - private FileSystem _fs = null; private MatrixBlock _src = null; private long _rl = -1; private long _ru = -1; private int _blen = -1; - public WriteFileTask(Path path, JobConf job, FileSystem fs, MatrixBlock src, long rl, long ru, int blen) { + public WriteFileTask(Path path, JobConf job, MatrixBlock src, long rl, long ru, int blen) { _path = path; - _fs = fs; _job = job; _src = src; _rl = rl; @@ -118,10 +111,9 @@ public class WriterBinaryBlockParallel extends WriterBinaryBlock } @Override - public Object call() - throws Exception - { - writeBinaryBlockMatrixToSequenceFile(_path, _job, _fs, _src, _blen, (int)_rl, (int)_ru); + public Object call() throws Exception { + writeBinaryBlockMatrixToSequenceFile(_path, _job, _src, _blen, (int) _rl, (int) _ru); + IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(_job, _path); return null; } } diff --git a/src/main/java/org/apache/sysds/runtime/meta/MatrixCharacteristics.java b/src/main/java/org/apache/sysds/runtime/meta/MatrixCharacteristics.java index aeacc1c064..82c3aba08d 100644 --- a/src/main/java/org/apache/sysds/runtime/meta/MatrixCharacteristics.java +++ b/src/main/java/org/apache/sysds/runtime/meta/MatrixCharacteristics.java @@ -43,11 +43,11 @@ public class MatrixCharacteristics extends DataCharacteristics public MatrixCharacteristics() {} public MatrixCharacteristics(long nr, long nc) { - set(nr, nc, -1, -1); + set(nr, nc, ConfigurationManager.getBlocksize(), -1); } public MatrixCharacteristics(long nr, long nc, long nnz) { - set(nr, nc, -1, nnz); + set(nr, nc, ConfigurationManager.getBlocksize(), nnz); } public MatrixCharacteristics(long nr, long nc, int blen) { diff --git 
a/src/main/java/org/apache/sysds/runtime/util/DataConverter.java b/src/main/java/org/apache/sysds/runtime/util/DataConverter.java index 240c5ba976..72428163f4 100644 --- a/src/main/java/org/apache/sysds/runtime/util/DataConverter.java +++ b/src/main/java/org/apache/sysds/runtime/util/DataConverter.java @@ -79,7 +79,6 @@ import org.apache.sysds.runtime.meta.DataCharacteristics; * This class provides methods to read and write matrix blocks from to HDFS using different data formats. * Those functionalities are used especially for CP read/write and exporting in-memory matrices to HDFS * (before executing MR jobs). - * */ public class DataConverter { private static final String DELIM = " "; @@ -112,20 +111,6 @@ public class DataConverter { writer.writeTensorToHDFS(tensor, dir, blen); } - public static MatrixBlock readMatrixFromHDFS(String dir, FileFormat fmt, long rlen, long clen, int blen, boolean localFS) - throws IOException - { - ReadProperties prop = new ReadProperties(); - - prop.path = dir; - prop.fmt = fmt; - prop.rlen = rlen; - prop.clen = clen; - prop.blen = blen; - prop.localFS = localFS; - - return readMatrixFromHDFS(prop); - } public static MatrixBlock readMatrixFromHDFS(String dir, FileFormat fmt, long rlen, long clen, int blen) throws IOException @@ -156,23 +141,6 @@ public class DataConverter { return readMatrixFromHDFS(prop); } - public static MatrixBlock readMatrixFromHDFS(String dir, FileFormat fmt, long rlen, long clen, - int blen, long expectedNnz, boolean localFS) - throws IOException - { - ReadProperties prop = new ReadProperties(); - - prop.path = dir; - prop.fmt = fmt; - prop.rlen = rlen; - prop.clen = clen; - prop.blen = blen; - prop.expectedNnz = expectedNnz; - prop.localFS = localFS; - - return readMatrixFromHDFS(prop); - } - public static MatrixBlock readMatrixFromHDFS(String dir, FileFormat fmt, long rlen, long clen, int blen, long expectedNnz, FileFormatProperties formatProperties) throws IOException diff --git a/src/main/java/org/apache/sysds/runtime/util/UtilFunctions.java b/src/main/java/org/apache/sysds/runtime/util/UtilFunctions.java index 38d8458a0e..74cf1d45ef 100644 --- a/src/main/java/org/apache/sysds/runtime/util/UtilFunctions.java +++ b/src/main/java/org/apache/sysds/runtime/util/UtilFunctions.java @@ -1296,7 +1296,7 @@ public class UtilFunctions { } public static int getEndIndex(int arrayLength, int startIndex, int blockSize){ - return (blockSize <= 0)? arrayLength: Math.min(arrayLength, startIndex + blockSize); + return blockSize <= 0 ? arrayLength : Math.min(arrayLength, startIndex + blockSize); } public static int[] getBlockSizes(int num, int numBlocks){ diff --git a/src/test/java/org/apache/sysds/test/component/compress/colgroup/scheme/CLAConstSchemeTest.java b/src/test/java/org/apache/sysds/test/component/compress/colgroup/scheme/CLAConstSchemeTest.java deleted file mode 100644 index 31b78c959d..0000000000 --- a/src/test/java/org/apache/sysds/test/component/compress/colgroup/scheme/CLAConstSchemeTest.java +++ /dev/null @@ -1,207 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.sysds.test.component.compress.colgroup.scheme; - -import static org.junit.Assert.assertTrue; - -import org.apache.sysds.runtime.compress.colgroup.AColGroup; -import org.apache.sysds.runtime.compress.colgroup.ColGroupConst; -import org.apache.sysds.runtime.compress.colgroup.indexes.ColIndexFactory; -import org.apache.sysds.runtime.compress.colgroup.scheme.ICLAScheme; -import org.apache.sysds.runtime.data.DenseBlockFP64; -import org.apache.sysds.runtime.matrix.data.MatrixBlock; -import org.junit.Test; - -public class CLAConstSchemeTest { - - private final AColGroup g; - private final ICLAScheme sh; - - public CLAConstSchemeTest() { - g = ColGroupConst.create(// - ColIndexFactory.create(new int[] {1, 3, 5}), // Columns - new double[] {1.1, 1.2, 1.3} // Values - ); - sh = g.getCompressionScheme(); - } - - @Test - public void testConstValid() { - assertTrue(sh != null); - } - - - - @Test - public void testValidEncodeSingleRow() { - assertTrue(sh.encode(new MatrixBlock(1, 5, new double[] {// - 0.0, 1.1, 0.2, 1.2, 0.2, 1.3})) != null); - } - - @Test - public void testValidEncodeMultiRow() { - assertTrue(sh.encode(new MatrixBlock(2, 6, new double[] {// - 0.0, 1.1, 0.2, 1.2, 0.2, 1.3, // - 0.0, 1.1, 0.2, 1.2, 0.2, 1.3, // - })) != null); - } - - @Test - public void testValidEncodeMultiRowsLarger() { - assertTrue(sh.encode(new MatrixBlock(2, 10, new double[] {// - 0.0, 1.1, 0.2, 1.2, 0.2, 1.3, 1, 1, 1, 1, // - 0.0, 1.1, 0.2, 1.2, 0.2, 1.3, 1, 1, 1, 1, // - })) != null); - } - - @Test - public void testInvalidEncodeMultiRowsValue() { - assertTrue(sh.encode(new MatrixBlock(4, 6, new double[] {// - 0.0, 1.1, 0.2, 1.2, 0.2, 1.3, // - 0.0, 1.1, 0.2, 1.2, 0.2, 1.3, // - 0.0, 1.1, 0.2, 1.2, 0.2, 1.3, // - 0.0, 1.1, 0.2, 1.2, 0.2, 1.3, // - })) != null); - } - - @Test - public void testValidEncodeMultiRowDifferentValuesOtherColumns() { - assertTrue(sh.encode(new MatrixBlock(4, 6, new double[] {// - 0.2, 1.1, 0.4, 1.2, 0.3, 1.3, // - 0.0, 1.1, 0.2, 1.2, 0.2, 1.3, // - 0.0, 1.1, 0.2, 1.2, 0.1, 1.3, // - 0.2, 1.1, 0.4, 1.2, 0.1, 1.3, // - })) != null); - } - - - - - @Test - public void testEncodeOtherColumns() { - assertTrue(sh.encode(new MatrixBlock(4, 5, new double[] {// - 1.1, 0.2, 1.2, 0.2, 1.3, // - 1.1, 0.2, 1.2, 0.2, 1.3, // - 1.1, 0.2, 1.2, 0.2, 1.3, // - 1.1, 0.2, 1.2, 0.2, 1.3, // - }), ColIndexFactory.create(new int[] {0, 2, 4})) != null); - } - - - - @Test(expected = IllegalArgumentException.class) - public void testInvalidArgument_1() { - sh.encode(null, ColIndexFactory.create(new int[] {0, 2, 4, 5})); - } - - @Test(expected = IllegalArgumentException.class) - public void testInvalidArgument_2() { - sh.encode(null, ColIndexFactory.create(new int[] {0, 2})); - } - - @Test - public void testSparse() { - MatrixBlock mb = new MatrixBlock(4, 6, new double[] {// - 0.0, 1.1, 0.2, 1.2, 0.2, 1.3, // - 0.0, 1.1, 0.2, 1.2, 0.2, 1.3, // - 0.0, 1.1, 0.2, 1.2, 0.2, 1.3, // - 0.0, 1.1, 0.2, 1.2, 0.2, 1.3, // - }); - - MatrixBlock empty = new MatrixBlock(4, 1000, 0.0); - mb = mb.append(empty); - - assertTrue(sh.encode(mb) != null); - } - - - @Test - public void 
testSparseValidCustom() { - MatrixBlock mb = new MatrixBlock(4, 6, new double[] {// - 0.0, 1.1, 0.2, 1.2, 0.2, 1.3, // - 0.0, 1.1, 0.2, 1.2, 0.2, 1.3, // - 0.0, 1.1, 0.2, 1.2, 0.2, 1.3, // - 0.0, 1.1, 0.2, 1.2, 0.2, 1.3, // - }); - - MatrixBlock empty = new MatrixBlock(4, 1000, 0.0); - mb = empty.append(mb); - - assertTrue(sh.encode(mb, ColIndexFactory.create(new int[] {1001, 1003, 1005})) != null); - } - - @Test - public void testSparseValidCustom2() { - MatrixBlock mb = new MatrixBlock(4, 6, new double[] {// - 0.0, 1.1, 0.2, 1.2, 0.2, 1.3, // - 0.0, 1.1, 0.2, 1.2, 0.2, 1.3, // - 0.0, 1.1, 0.2, 1.2, 0.2, 1.3, // - 0.0, 1.1, 0.2, 1.2, 0.2, 1.3, // - }); - - MatrixBlock empty = new MatrixBlock(4, 1000, 0.0); - MatrixBlock comb = empty.append(mb).append(mb); - - assertTrue(sh.encode(comb, ColIndexFactory.create(new int[] {1001, 1003, 1005})) != null); - } - - - - - - @Test - public void testGenericNonContinuosBlockValid() { - MatrixBlock mb = new MatrixBlock(4, 6, // - new DenseBlockFP64Mock(new int[] {4, 6}, new double[] {// - 0.2, 1.1, 0.4, 1.2, 0.3, 1.3, // - 0.0, 1.1, 0.2, 1.2, 0.2, 1.3, // - 0.0, 1.1, 0.2, 1.2, 0.1, 1.3, // - 0.2, 1.1, 0.4, 1.2, 0.1, 1.3, // - })); - mb.recomputeNonZeros(); - assertTrue(sh.encode(mb) != null); - } - - - - @Test(expected = NullPointerException.class) - public void testNull() { - sh.encode(null, null); - } - - private class DenseBlockFP64Mock extends DenseBlockFP64 { - private static final long serialVersionUID = -3601232958390554672L; - - public DenseBlockFP64Mock(int[] dims, double[] data) { - super(dims, data); - } - - @Override - public boolean isContiguous() { - return false; - } - - @Override - public int numBlocks() { - return 2; - } - } - -} diff --git a/src/test/java/org/apache/sysds/test/component/compress/colgroup/scheme/CLADDCSchemeTest.java b/src/test/java/org/apache/sysds/test/component/compress/colgroup/scheme/CLADDCSchemeTest.java deleted file mode 100644 index 0c40209f9c..0000000000 --- a/src/test/java/org/apache/sysds/test/component/compress/colgroup/scheme/CLADDCSchemeTest.java +++ /dev/null @@ -1,146 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.sysds.test.component.compress.colgroup.scheme; - -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import java.util.EnumSet; -import java.util.List; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.sysds.runtime.compress.CompressionSettings; -import org.apache.sysds.runtime.compress.CompressionSettingsBuilder; -import org.apache.sysds.runtime.compress.colgroup.AColGroup; -import org.apache.sysds.runtime.compress.colgroup.AColGroup.CompressionType; -import org.apache.sysds.runtime.compress.colgroup.ColGroupDDC; -import org.apache.sysds.runtime.compress.colgroup.ColGroupFactory; -import org.apache.sysds.runtime.compress.colgroup.indexes.ColIndexFactory; -import org.apache.sysds.runtime.compress.colgroup.indexes.IColIndex; -import org.apache.sysds.runtime.compress.colgroup.indexes.RangeIndex; -import org.apache.sysds.runtime.compress.colgroup.scheme.DDCScheme; -import org.apache.sysds.runtime.compress.colgroup.scheme.ICLAScheme; -import org.apache.sysds.runtime.compress.estim.ComEstExact; -import org.apache.sysds.runtime.compress.estim.CompressedSizeInfo; -import org.apache.sysds.runtime.compress.estim.CompressedSizeInfoColGroup; -import org.apache.sysds.runtime.matrix.data.MatrixBlock; -import org.apache.sysds.test.TestUtils; -import org.junit.Test; - -public class CLADDCSchemeTest { - protected final Log LOG = LogFactory.getLog(CLADDCSchemeTest.class.getName()); - - final MatrixBlock src; - final AColGroup g; - final ICLAScheme sh; - - public CLADDCSchemeTest() { - src = TestUtils.round(TestUtils.generateTestMatrixBlock(1023, 3, 0, 3, 0.9, 7)); - - IColIndex colIndexes = ColIndexFactory.create(3); - CompressionSettings cs = new CompressionSettingsBuilder().setSamplingRatio(1.0) - .setValidCompressions(EnumSet.of(CompressionType.DDC)).create(); - final CompressedSizeInfoColGroup cgi = new ComEstExact(src, cs).getColGroupInfo(colIndexes); - - final CompressedSizeInfo csi = new CompressedSizeInfo(cgi); - final List<AColGroup> groups = ColGroupFactory.compressColGroups(src, csi, cs, 1); - g = groups.get(0); - sh = g.getCompressionScheme(); - } - - @Test(expected = IllegalArgumentException.class) - public void testInvalidColumnApply() { - sh.encode(null, ColIndexFactory.create(new int[] {1, 2})); - } - - @Test(expected = IllegalArgumentException.class) - public void testInvalidColumnApply_2() { - sh.encode(null, ColIndexFactory.create(new int[] {1, 2, 5, 5})); - } - - @Test(expected = NullPointerException.class) - public void testNull() { - sh.encode(null, null); - } - - @Test - public void testEncode() { - assertTrue(sh.encode(TestUtils.round(TestUtils.generateTestMatrixBlock(2, 3, 0, 3, 0.9, 7))) != null); - } - - @Test - public void testEncode_sparse() { - assertTrue(sh.encode(TestUtils.round(TestUtils.generateTestMatrixBlock(2, 100, 0, 3, 0.01, 7))) != null); - } - - @Test - public void testEncodeSparseDifferentColumns() { - assertTrue(sh.encode(TestUtils.round(TestUtils.generateTestMatrixBlock(2, 100, 0, 3, 0.01, 7)), - ColIndexFactory.create(new int[] {13, 16, 30})) != null); - } - - @Test - public void testEncodeSparseDifferentColumns2() { - assertTrue(sh.encode(TestUtils.round(TestUtils.generateTestMatrixBlock(2, 100, 0, 3, 0.01, 7)), - ColIndexFactory.create(new int[] {15, 16, 99})) != null); - } - - @Test - public void testEncodeSparseDifferentColumns3() { - assertTrue(sh.encode(TestUtils.round(TestUtils.generateTestMatrixBlock(2, 100, 0, 3, 0.01, 7)), - 
ColIndexFactory.create(new int[] {15, 86, 99})) != null); - } - - @Test - public void testEncodeDenseDifferentColumns() { - assertTrue(sh.encode(TestUtils.round(TestUtils.generateTestMatrixBlock(2, 100, 0, 3, 0.86, 7)), - ColIndexFactory.create(new int[] {13, 16, 30})) != null); - } - - @Test - public void testEncodeDenseDifferentColumns2() { - assertTrue(sh.encode(TestUtils.round(TestUtils.generateTestMatrixBlock(2, 100, 0, 3, 0.86, 7)), - ColIndexFactory.create(new int[] {15, 16, 99})) != null); - } - - @Test - public void testEncodeDenseDifferentColumns3() { - assertTrue(sh.encode(TestUtils.round(TestUtils.generateTestMatrixBlock(2, 100, 0, 3, 0.86, 7)), - ColIndexFactory.create(new int[] {15, 86, 99})) != null); - } - - @Test - public void testEncodeFromColumns() { - try { - - DDCScheme s = DDCScheme.create(new RangeIndex(2)); - MatrixBlock m = TestUtils.round(TestUtils.generateTestMatrixBlock(50, 3, 0, 2, 0.9, 7)); - s.update(m); - AColGroup g = s.encode(m); - assertTrue(g instanceof ColGroupDDC); - } - - catch(Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } -} diff --git a/src/test/java/org/apache/sysds/test/component/compress/colgroup/scheme/CLAEmptySchemeTest.java b/src/test/java/org/apache/sysds/test/component/compress/colgroup/scheme/CLAEmptySchemeTest.java deleted file mode 100644 index b27dd42c99..0000000000 --- a/src/test/java/org/apache/sysds/test/component/compress/colgroup/scheme/CLAEmptySchemeTest.java +++ /dev/null @@ -1,263 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.sysds.test.component.compress.colgroup.scheme; - -import static org.junit.Assert.assertTrue; - -import org.apache.sysds.runtime.compress.colgroup.AColGroup; -import org.apache.sysds.runtime.compress.colgroup.ColGroupEmpty; -import org.apache.sysds.runtime.compress.colgroup.indexes.ColIndexFactory; -import org.apache.sysds.runtime.compress.colgroup.scheme.ICLAScheme; -import org.apache.sysds.runtime.data.DenseBlockFP64; -import org.apache.sysds.runtime.matrix.data.MatrixBlock; -import org.junit.Test; - -public class CLAEmptySchemeTest { - - private final AColGroup g; - private final ICLAScheme sh; - - public CLAEmptySchemeTest() { - g = new ColGroupEmpty(// - ColIndexFactory.create(new int[] {1, 3, 5}) // Columns - ); - sh = g.getCompressionScheme(); - } - - @Test - public void testConstValid() { - assertTrue(sh != null); - } - - @Test(expected = IllegalArgumentException.class) - public void testToSmallMatrix() { - sh.encode(new MatrixBlock(1, 3, new double[] {// - 1.1, 1.2, 1.3})); - } - - @Test - public void testValidEncodeSingleRow() { - assertTrue(sh.encode(new MatrixBlock(1, 6, new double[] {// - 0.1, 0.0, 0.04, 0.0, 0.03, 0.0})) != null); - } - - @Test - public void testValidEncodeMultiRow() { - assertTrue(sh.encode(new MatrixBlock(2, 6, new double[] {// - 132, 0.0, 241, 0.0, 142, 0.0, // - 132, 0.0, 241, 0.0, 142, 0.0, // - })) != null); - } - - @Test - public void testValidEncodeMultiRowsLarger() { - assertTrue(sh.encode(new MatrixBlock(2, 10, new double[] {// - 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1, 1, 1, 1, // - 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1, 1, 1, 1, // - })) != null); - } - - @Test - public void testValidEncodeMultiRowDifferentValuesOtherColumns() { - assertTrue(sh.encode(new MatrixBlock(4, 12, new double[] {// - 0.2, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.1, 0.4, 1.2, 0.3, 1.3, // - 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.1, 0.2, 1.2, 0.2, 1.3, // - 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.1, 0.2, 1.2, 0.1, 1.3, // - 0.2, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.1, 0.4, 1.2, 0.1, 1.3, // - })) != null); - } - - @Test - public void testEncodeOtherColumnsValid() { - assertTrue(sh.encode(new MatrixBlock(4, 8, new double[] {// - 0.0, 1.1, 0.0, 0.2, 0.0, 1.2, 0.2, 1.3, // - 0.0, 1.1, 0.0, 0.2, 0.0, 1.2, 0.2, 1.3, // - 0.0, 1.1, 0.0, 0.2, 0.0, 1.2, 0.2, 1.3, // - 0.0, 1.1, 0.0, 0.2, 0.0, 1.2, 0.2, 1.3, // - }), ColIndexFactory.create(new int[] {0, 2, 4})// other columns - ) != null); - } - - @Test(expected = IllegalArgumentException.class) - public void testInvalidArgument_1() { - sh.encode(null, ColIndexFactory.create(new int[] {0, 2, 4, 5})); - } - - @Test(expected = IllegalArgumentException.class) - public void testInvalidArgument_2() { - sh.encode(null, ColIndexFactory.create(new int[] {0, 2})); - } - - @Test - public void testSparse() { - MatrixBlock mb = new MatrixBlock(4, 6, new double[] {// - 0.01, 0.0, 0.2, 0.0, 0.2, 0.0, // - 0.01, 0.0, 0.2, 0.0, 0.2, 0.0, // - 0.01, 0.0, 0.2, 0.0, 0.2, 0.0, // - 0.01, 0.0, 0.2, 0.0, 0.2, 0.0, // - }); - - MatrixBlock empty = new MatrixBlock(4, 1000, 0.0); - mb = mb.append(empty); - - assertTrue(sh.encode(mb) != null); - } - - @Test - public void testSpars_AllCosOver() { - MatrixBlock mb = new MatrixBlock(4, 6, new double[] {// - 0.01, 0.0, 0.2, 0.0, 0.2, 0.0, // - 0.01, 0.0, 0.2, 0.0, 0.2, 0.0, // - 0.01, 0.0, 0.2, 0.0, 0.2, 0.0, // - 0.01, 0.0, 0.2, 0.0, 0.2, 0.0, // - }); - - MatrixBlock empty = new MatrixBlock(4, 1000, 0.0); - mb = mb.append(empty); - - assertTrue(sh.encode(mb, ColIndexFactory.create(new int[] {100, 102, 999})) != 
null); - } - - @Test - public void testSparse_Append() { - MatrixBlock mb = new MatrixBlock(4, 6, new double[] {// - 0.0, 0.0, 0.2, 0.0, 0.2, 1.3, // - 0.0, 0.0, 0.2, 0.0, 0.2, 1.3, // - 0.0, 0.0, 0.2, 0.0, 0.2, 1.3, // - 0.0, 0.0, 0.2, 0.0, 0.2, 1.3, // - }); - - MatrixBlock empty = new MatrixBlock(4, 1000, 0.0); - mb = empty.append(mb); - - assertTrue(sh.encode(mb) != null); - } - - @Test - public void testSparseValidCustom() { - MatrixBlock mb = new MatrixBlock(4, 9, new double[] {// - 0.0, 0.0, 1.1, 0.0, 0.2, 0.0, 1.2, 0.2, 1.3, // - 0.0, 0.0, 1.1, 0.0, 0.2, 0.0, 1.2, 0.2, 1.3, // - 0.0, 0.0, 1.1, 0.0, 0.2, 0.0, 1.2, 0.2, 1.3, // - 0.0, 0.0, 1.1, 0.0, 0.2, 0.0, 1.2, 0.2, 1.3, // - }); - - MatrixBlock empty = new MatrixBlock(4, 1000, 0.0); - mb = empty.append(mb); - - assertTrue(sh.encode(mb, ColIndexFactory.create(new int[] {1001, 1003, 1005})) != null); - } - - @Test - public void testSparseValidCustom2() { - MatrixBlock mb = new MatrixBlock(4, 9, new double[] {// - 0.0, 0.0, 1.1, 0.0, 0.2, 0.0, 1.2, 0.2, 1.3, // - 0.0, 0.0, 1.1, 0.0, 0.2, 0.0, 1.2, 0.2, 1.3, // - 0.0, 0.0, 1.1, 0.0, 0.2, 0.0, 1.2, 0.2, 1.3, // - 0.0, 0.0, 1.1, 0.0, 0.2, 0.0, 1.2, 0.2, 1.3, // - }); - - MatrixBlock empty = new MatrixBlock(4, 1000, 0.0); - MatrixBlock comb = empty.append(mb).append(mb); - - assertTrue(sh.encode(comb, ColIndexFactory.create(new int[] {1001, 1003, 1005})) != null); - } - - @Test - public void testSparseValidCustom3Valid() { - MatrixBlock mb = new MatrixBlock(4, 9, new double[] {// - 0.0, 0.0, 1.1, 0.0, 0.2, 0.0, 1.2, 0.2, 1.3, // - 0.0, 0.0, 1.1, 0.0, 0.2, 0.0, 1.33, 0.2, 1.3, // - 0.0, 0.0, 1.1, 0.0, 0.2, 0.0, 1.2, 0.2, 1.3, // - 0.0, 0.0, 1.1, 0.0, 0.2, 0.0, 1.2, 0.2, 1.3, // - }); - - MatrixBlock empty = new MatrixBlock(4, 1000, 0.0); - MatrixBlock comb = empty.append(mb).append(mb); - - assertTrue(sh.encode(comb, ColIndexFactory.create(new int[] {1001, 1003, 1005})) != null); - } - - @Test - public void testSparseEmptyRow() { - MatrixBlock mb = new MatrixBlock(4, 6, new double[] {// - 0.0, 1.1, 0.2, 1.2, 0.2, 1.3, // - 0.0, 1.1, 0.2, 1.2, 0.2, 1.3, // - 0.0, 1.1, 0.2, 1.2, 0.2, 1.3, // - 0.0, 1.1, 0.2, 1.2, 0.2, 1.3, // - }); - - MatrixBlock empty = new MatrixBlock(4, 1000, 0.0); - mb = empty.append(mb); - MatrixBlock emptyRow = new MatrixBlock(1, 1006, 0.0); - mb = mb.append(emptyRow, false); - - assertTrue(sh.encode(mb, ColIndexFactory.create(new int[] {44, 45, 999})) != null); - } - - @Test - public void testEmpty() { - MatrixBlock empty = new MatrixBlock(4, 1000, 0.0); - assertTrue(sh.encode(empty) != null); - } - - @Test - public void testEmptyOtherColumns() { - MatrixBlock empty = new MatrixBlock(4, 1000, 0.0); - assertTrue(sh.encode(empty, ColIndexFactory.create(new int[] {33, 34, 99})) != null); - } - - @Test - public void testGenericNonContinuosBlockValid() { - MatrixBlock mb = new MatrixBlock(4, 6, // - new DenseBlockFP64Mock(new int[] {4, 9}, new double[] {// - 0.2, 0.0, 1.1, 0.0, 0.4, 0.0, 1.2, 0.3, 1.3, // - 0.0, 0.0, 1.1, 0.0, 0.2, 0.0, 1.2, 0.2, 1.3, // - 0.0, 0.0, 1.1, 0.0, 0.2, 0.0, 1.2, 0.1, 1.3, // - 0.2, 0.0, 1.1, 0.0, 0.4, 0.0, 1.2, 0.1, 1.3, // - })); - mb.recomputeNonZeros(); - assertTrue(sh.encode(mb) != null); - } - - @Test(expected = NullPointerException.class) - public void testNull() { - sh.encode(null, null); - } - - private class DenseBlockFP64Mock extends DenseBlockFP64 { - private static final long serialVersionUID = -3601232958390554672L; - - public DenseBlockFP64Mock(int[] dims, double[] data) { - super(dims, data); - } - - @Override - public boolean 
isContiguous() { - return false; - } - - @Override - public int numBlocks() { - return 2; - } - } - -} diff --git a/src/test/java/org/apache/sysds/test/component/federated/FederatedTestUtils.java b/src/test/java/org/apache/sysds/test/component/federated/FederatedTestUtils.java index 8bacebdef8..595fa6799f 100644 --- a/src/test/java/org/apache/sysds/test/component/federated/FederatedTestUtils.java +++ b/src/test/java/org/apache/sysds/test/component/federated/FederatedTestUtils.java @@ -26,6 +26,8 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.sysds.common.Types.DataType; import org.apache.sysds.runtime.controlprogram.federated.FederatedData; import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest; @@ -38,6 +40,7 @@ import org.apache.sysds.runtime.instructions.cp.ScalarObject; import org.apache.sysds.runtime.matrix.data.MatrixBlock; public class FederatedTestUtils { + protected static final Log LOG = LogFactory.getLog(FederatedTestUtils.class.getName()); public static long putDouble(double v, InetSocketAddress addr) { return putDouble(v, addr, 5000); @@ -96,6 +99,7 @@ public class FederatedTestUtils { final FederatedRequest frq = new FederatedRequest(RequestType.PUT_VAR, null, id, mb); final Future<FederatedResponse> fr = FederatedData.executeFederatedOperation(addr, frq); final FederatedResponse r = fr.get(timeout, TimeUnit.MILLISECONDS); + LOG.error(r); if(r.isSuccessful()) return id; else diff --git a/src/test/java/org/apache/sysds/test/component/misc/IOUtilFunctionsTest.java b/src/test/java/org/apache/sysds/test/component/misc/IOUtilFunctionsTest.java index 92d18af25b..4aa16db209 100644 --- a/src/test/java/org/apache/sysds/test/component/misc/IOUtilFunctionsTest.java +++ b/src/test/java/org/apache/sysds/test/component/misc/IOUtilFunctionsTest.java @@ -80,15 +80,15 @@ public class IOUtilFunctionsTest { @Test(expected = StringIndexOutOfBoundsException.class) public void splitCustom_3() { // try{ - String in = "aaaaaa \"\"\" abb"; - String[] ret = IOUtilFunctions.splitCSV(in, " "); - assertArrayEquals(new String[] {"aaaaaa", "\"\"\"", "abb"}, ret); + String in = "aaaaaa \"\"\" abb"; + String[] ret = IOUtilFunctions.splitCSV(in, " "); + assertArrayEquals(new String[] {"aaaaaa", "\"\"\"", "abb"}, ret); // } // catch(Exception e){ - // e.printStackTrace(); - // throw e; - // fail(e.getMessage()); + // e.printStackTrace(); + // throw e; + // fail(e.getMessage()); // } } @@ -108,9 +108,9 @@ public class IOUtilFunctionsTest { // @Test // public void splitCustom_6() { - // String in = "aaaaaa \"\"\""; - // String[] ret = IOUtilFunctions.splitCSV(in, " "); - // assertArrayEquals(new String[] {"aaaaaa", "\""}, ret); + // String in = "aaaaaa \"\"\""; + // String[] ret = IOUtilFunctions.splitCSV(in, " "); + // assertArrayEquals(new String[] {"aaaaaa", "\""}, ret); // } @Test @@ -212,10 +212,11 @@ public class IOUtilFunctionsTest { } @Test - public void splitCustom_fromRddTest(){ + public void splitCustom_fromRddTest() { String in = "aaa,\"\"\",,,b,,\",\"c,c,c\""; String[] ret = IOUtilFunctions.splitCSV(in, ",", null); assertArrayEquals(new String[] {"aaa", "\"\"\",,,b,,\"", "\"c,c,c\""}, ret); } + } diff --git a/src/test/java/org/apache/sysds/test/component/misc/NumPartsFilesTest.java b/src/test/java/org/apache/sysds/test/component/misc/NumPartsFilesTest.java new file mode 100644 index 
0000000000..dbc9a321c8 --- /dev/null +++ b/src/test/java/org/apache/sysds/test/component/misc/NumPartsFilesTest.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.test.component.misc; + +import static org.junit.Assert.assertEquals; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.sysds.conf.ConfigurationManager; +import org.apache.sysds.runtime.io.WriterBinaryBlockParallel; +import org.junit.Test; + +public class NumPartsFilesTest { + + private final Path path; + private final FileSystem fs; + + public NumPartsFilesTest() throws Exception { + path = new Path("/tmp/test.someEnding"); + fs = path.getFileSystem(ConfigurationManager.getCachedJobConf()); + } + + @Test + public void numPartsTest1() { + + int p = WriterBinaryBlockParallel.numPartsFiles(fs, 1000, 1000, 1000, 1000 * 1000); + assertEquals(1, p); + } + + @Test + public void numPartsTest2() { + int p = WriterBinaryBlockParallel.numPartsFiles(fs, 1200, 1000, 1000, 1000 * 1000); + assertEquals(2, p); + } + + @Test + public void numPartsTest3() { + int p = WriterBinaryBlockParallel.numPartsFiles(fs, 10000, 1000, 1000, 10000L * 1000); + assertEquals(10, p); + } + + @Test + public void numPartsTest4() { + int p = WriterBinaryBlockParallel.numPartsFiles(fs, 100000, 1000, 1000, 100000L * 1000); + assertEquals(100, p); + } + + @Test + public void numPartsTest5() { + int p = WriterBinaryBlockParallel.numPartsFiles(fs, 1000000, 1000, 1000, 1000000L * 1000); + assertEquals(1000, p); + } + + @Test + public void numPartsTest6() { + int p = WriterBinaryBlockParallel.numPartsFiles(fs, 10000000L, 1000, 1000, 10000000L * 1000); + assertEquals(10000, p); + } + + @Test + public void numPartsTest7() { + int p = WriterBinaryBlockParallel.numPartsFiles(fs, 100000000L, 1000, 1000, 100000000L * 1000); + assertEquals(100000, p); + } + + @Test + public void numPartsTest8() { + int p = WriterBinaryBlockParallel.numPartsFiles(fs, 1000000000L, 1000, 1000, 1000000000L * 1000); + assertEquals(1000000, p); + } +} diff --git a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRCBindTest.java b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRCBindTest.java index fb07a58d8a..48c5b69a7a 100644 --- a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRCBindTest.java +++ b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRCBindTest.java @@ -113,8 +113,8 @@ public class FederatedRCBindTest extends AutomatedTestBase { int port3 = getRandomAvailablePort(); int port4 = getRandomAvailablePort(); Thread t1 = startLocalFedWorkerThread(port1, FED_WORKER_WAIT_S); - Thread t2 = startLocalFedWorkerThread(port2); - Thread t3 = 
startLocalFedWorkerThread(port3); + Thread t2 = startLocalFedWorkerThread(port2, FED_WORKER_WAIT_S); + Thread t3 = startLocalFedWorkerThread(port3, FED_WORKER_WAIT_S); Thread t4 = startLocalFedWorkerThread(port4); // we need the reference file to not be written to hdfs, so we get the correct format
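
For reference, the heart of this change is the new numPartsFiles heuristic in WriterBinaryBlockParallel combined with InfrastructureAnalyzer.getBlockSize. Below is a minimal, self-contained sketch of that heuristic. The estimateSize helper and the boolean localFs flag are illustrative stand-ins: the actual code calls OptimizerUtils.estimatePartitionedSizeExactSparsity and checks "fs instanceof LocalFileSystem", and the HDFS block size is read from the cluster configuration (assumed here to be the common 128MB default).

public class NumPartsSketch {
	// Local FS threshold from InfrastructureAnalyzer.getBlockSize: 4 blocks of 4KB.
	private static final long LOCAL_BLOCK_SIZE = 4096 * 4;
	// Assumed HDFS default block size; the real code reads it from the config.
	private static final long HDFS_BLOCK_SIZE = 128L * 1024 * 1024;

	// Crude stand-in for OptimizerUtils.estimatePartitionedSizeExactSparsity:
	// roughly 8 bytes per stored value.
	static long estimateSize(long rlen, long clen, long nnz) {
		return Math.min(nnz, rlen * clen) * 8L;
	}

	static int numPartsFiles(boolean localFs, long rlen, long clen, long blen, long nnz) {
		long blockSize = localFs ? LOCAL_BLOCK_SIZE : HDFS_BLOCK_SIZE;
		int numPartFiles = (int) (estimateSize(rlen, clen, nnz) / blockSize);
		numPartFiles = Math.max(numPartFiles, 1); // at least one part file
		// never more part files than blocks in the matrix
		long numBlocks = (long) (Math.ceil((double) rlen / blen) * Math.ceil((double) clen / blen));
		return (int) Math.min(numPartFiles, numBlocks);
	}

	public static void main(String[] args) {
		// The 10k x 100 dense matrix from the commit message (~8MB, blen=1000):
		// below the HDFS threshold -> 1 part file, i.e., sequential write;
		// far above the 16KB local threshold -> capped at the 10 row blocks.
		System.out.println(numPartsFiles(false, 10000, 100, 1000, 10000L * 100)); // 1
		System.out.println(numPartsFiles(true, 10000, 100, 1000, 10000L * 100));  // 10
	}
}

With up to 10 part files on the local file system, the parallel writer can use up to 10 threads for this matrix instead of falling back to a single sequential file, which is where the 17 ms to 6.5 ms improvement quoted in the commit message comes from.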
