This is an automated email from the ASF dual-hosted git repository.

baunsgaard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git

commit d8efb74571b8523d48e4fa9aa90b6c14264226b4
Author: Sebastian Baunsgaard <[email protected]>
AuthorDate: Wed Sep 6 16:09:46 2023 +0200

    [SYSTEMDS-3601] Write Parallel Local FileSystem
    
    This commit changes the parallel writing on local file systems to
    create multiple part files for block sizes smaller than the HDFS
    default. This makes writing to local files parallel in much smaller
    cases; for instance, it improves the writing of a 10k by 100 matrix
    from 17 ms to 6.5 ms.
    
    Closes #1886
---
 .../runtime/compress/io/WriterCompressed.java      |  58 +++--
 .../controlprogram/caching/FrameObject.java        |   2 +
 .../parfor/stat/InfrastructureAnalyzer.java        |  12 +
 .../apache/sysds/runtime/io/IOUtilFunctions.java   |   6 +
 .../org/apache/sysds/runtime/io/MatrixWriter.java  |   1 -
 .../apache/sysds/runtime/io/WriterBinaryBlock.java |  75 +++---
 .../runtime/io/WriterBinaryBlockParallel.java      |  58 ++---
 .../sysds/runtime/meta/MatrixCharacteristics.java  |   4 +-
 .../apache/sysds/runtime/util/DataConverter.java   |  32 ---
 .../apache/sysds/runtime/util/UtilFunctions.java   |   2 +-
 .../colgroup/scheme/CLAConstSchemeTest.java        | 207 ----------------
 .../compress/colgroup/scheme/CLADDCSchemeTest.java | 146 ------------
 .../colgroup/scheme/CLAEmptySchemeTest.java        | 263 ---------------------
 .../component/federated/FederatedTestUtils.java    |   4 +
 .../test/component/misc/IOUtilFunctionsTest.java   |  21 +-
 .../test/component/misc/NumPartsFilesTest.java     |  88 +++++++
 .../federated/primitives/FederatedRCBindTest.java  |   4 +-
 17 files changed, 227 insertions(+), 756 deletions(-)

diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/io/WriterCompressed.java 
b/src/main/java/org/apache/sysds/runtime/compress/io/WriterCompressed.java
index 9f9a531a6d..744a5e945a 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/io/WriterCompressed.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/io/WriterCompressed.java
@@ -61,7 +61,12 @@ public final class WriterCompressed extends MatrixWriter {
 
        protected static final Log LOG = 
LogFactory.getLog(WriterCompressed.class.getName());
 
+       protected static int jobUse = 0;
+       protected static  JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());
+
        private String fname;
+
+       private FileSystem fs;
        private Future<Writer>[] writers;
        private Lock[] writerLocks;
 
@@ -128,10 +133,20 @@ public final class WriterCompressed extends MatrixWriter {
        }
 
        private void write(MatrixBlock src, final String fname, final int blen) 
throws IOException {
+               jobUse ++;
+               if(jobUse > 30){
+                       job = new 
JobConf(ConfigurationManager.getCachedJobConf());
+                       jobUse = 0;
+               }
+               
+               
                if(this.fname != fname) {
                        this.fname = fname;
                        this.writers = null;
                }
+
+               fs = IOUtilFunctions.getFileSystem(new Path(fname), job);
+
                final int k = 
OptimizerUtils.getParallelBinaryWriteParallelism();
                final int rlen = src.getNumRows();
                final int clen = src.getNumColumns();
@@ -149,18 +164,20 @@ public final class WriterCompressed extends MatrixWriter {
        }
 
        private void writeSingleBlock(MatrixBlock b, int k) throws IOException {
-               Writer w = getWriter(fname);
+               final Path path = new Path(fname);
+               Writer w = generateWriter(job, path, fs);
                MatrixIndexes idx = new MatrixIndexes(1, 1);
                if(!(b instanceof CompressedMatrixBlock))
                        b = CompressedMatrixBlockFactory.compress(b, 
k).getLeft();
                w.append(idx, new CompressedWriteBlock(b));
                IOUtilFunctions.closeSilently(w);
-               cleanup();
+               cleanup(path);
        }
 
        private void writeMultiBlockUncompressed(MatrixBlock b, final int rlen, 
final int clen, final int blen, int k)
                throws IOException {
-               Writer w = getWriter(fname);
+               final Path path = new Path(fname);
+               Writer w = generateWriter(job, path, fs);
                final MatrixIndexes indexes = new MatrixIndexes();
                LOG.warn("Writing compressed format with non identical 
compression scheme");
 
@@ -178,7 +195,7 @@ public final class WriterCompressed extends MatrixWriter {
                        }
                }
                IOUtilFunctions.closeSilently(w);
-               cleanup();
+               cleanup(path);
        }
 
        private void writeMultiBlockCompressed(MatrixBlock b, final int rlen, 
final int clen, final int blen, int k)
@@ -196,7 +213,8 @@ public final class WriterCompressed extends MatrixWriter {
                try {
 
                        setupWrite();
-                       Writer w = getWriter(fname);
+                       final Path path = new Path(fname);
+                       Writer w = generateWriter(job, path, fs);
                        for(int bc = 0; bc * blen < clen; bc++) {// column 
blocks
                                final int sC = bc * blen;
                                final int mC = Math.min(sC + blen, clen) - 1;
@@ -212,6 +230,7 @@ public final class WriterCompressed extends MatrixWriter {
 
                        }
                        IOUtilFunctions.closeSilently(w);
+                       cleanup(path);
                }
                catch(Exception e) {
                        throw new IOException(e);
@@ -235,7 +254,7 @@ public final class WriterCompressed extends MatrixWriter {
                                final int j = i;
                                if(writers[i] == null) {
                                        writers[i] = pool.submit(() -> {
-                                               return 
generateWriter(getPath(j));
+                                               return generateWriter(job, 
getPath(j), fs);
                                        });
                                }
                                writerLocks[i] = new ReentrantLock();
@@ -273,7 +292,7 @@ public final class WriterCompressed extends MatrixWriter {
                                pool.submit(() -> {
                                        try {
                                                
IOUtilFunctions.closeSilently(writers[l].get());
-                                               cleanup(getPath(l));
+                                               cleanup(job, getPath(l), fs);
                                        }
                                        catch(Exception e) {
                                                throw new RuntimeException(e);
@@ -301,28 +320,24 @@ public final class WriterCompressed extends MatrixWriter {
                return new Path(fname, IOUtilFunctions.getPartFileName(id));
        }
 
-       private Writer getWriter(String fname) throws IOException {
-               final Path path = new Path(fname);
-               return generateWriter(path);
-       }
+       // private Writer getWriter(String fname) throws IOException {
+       //      final Path path = new Path(fname);
+       //      return generateWriter(job, path);
+       // }
 
-       private static Writer generateWriter(Path path) throws IOException {
-               final JobConf job = ConfigurationManager.getCachedJobConf();
-               // HDFSTool.deleteFileIfExistOnHDFS(path, job);
+       private static Writer generateWriter(JobConf job, Path path, FileSystem 
fs) throws IOException {
+               
                return SequenceFile.createWriter(job, Writer.file(path), 
Writer.bufferSize(4096),
                        Writer.keyClass(MatrixIndexes.class), 
Writer.valueClass(CompressedWriteBlock.class),
                        Writer.compression(SequenceFile.CompressionType.NONE), 
// No Compression type on disk
                        Writer.replication((short) 1));
        }
 
-       private void cleanup() throws IOException {
-               final Path path = new Path(fname);
-               cleanup(path);
+       private void cleanup(Path p) throws IOException {
+               cleanup(job, p, fs);
        }
 
-       private static void cleanup(Path path) throws IOException {
-               final JobConf job = ConfigurationManager.getCachedJobConf();
-               final FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
+       private static void cleanup(JobConf job, Path path, FileSystem fs) 
throws IOException {
                IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(fs, path);
        }
 
@@ -384,7 +399,6 @@ public final class WriterCompressed extends MatrixWriter {
                @Override
                public Object call() throws Exception {
 
-                       final JobConf job = 
ConfigurationManager.getCachedJobConf();
                        Path p = new Path(fname + ".dict", 
IOUtilFunctions.getPartFileName(id));
                        try(Writer w = SequenceFile.createWriter(job, 
Writer.file(p), //
                                Writer.bufferSize(4096), //
@@ -394,7 +408,7 @@ public final class WriterCompressed extends MatrixWriter {
                                Writer.replication((short) 1))) {
                                w.append(new DictWritable.K(id), new 
DictWritable(dicts));
                        }
-                       cleanup(p);
+                       cleanup(job, p, fs);
                        return null;
 
                }
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java
index d3e18f9e4e..945021626e 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/FrameObject.java
@@ -138,6 +138,8 @@ public class FrameObject extends CacheableData<FrameBlock>
        }
 
        public static ValueType[] parseSchema(String schema) {
+               if(schema == null)
+                       return new ValueType[]{ValueType.STRING};
                // parse given schema
                String[] parts = 
schema.split(DataExpression.DEFAULT_DELIM_DELIMITER);
                ValueType[] ret = new ValueType[parts.length];
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/stat/InfrastructureAnalyzer.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/stat/InfrastructureAnalyzer.java
index d39c470a7a..3dd47d5faa 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/stat/InfrastructureAnalyzer.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/stat/InfrastructureAnalyzer.java
@@ -22,12 +22,16 @@ package org.apache.sysds.runtime.controlprogram.parfor.stat;
 import java.io.IOException;
 import java.util.StringTokenizer;
 
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.ClusterStatus;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.sysds.conf.ConfigurationManager;
 import org.apache.sysds.hops.OptimizerUtils;
 import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
+import org.apache.sysds.runtime.io.IOUtilFunctions;
 import org.apache.sysds.runtime.util.HDFSTool;
 import org.apache.sysds.runtime.util.UtilFunctions;
 
@@ -194,6 +198,14 @@ public class InfrastructureAnalyzer
                return getLocalMaxMemory();
        }
 
+       public static long getBlockSize(FileSystem fs){
+               if(fs instanceof LocalFileSystem)
+                       // 4 blocks per file at least
+                       return 4096 * 4; 
+               else 
+                       return getHDFSBlockSize();
+       }
+
        /**
         * Gets the HDFS blocksize of the used cluster in bytes.
         * 
diff --git a/src/main/java/org/apache/sysds/runtime/io/IOUtilFunctions.java 
b/src/main/java/org/apache/sysds/runtime/io/IOUtilFunctions.java
index b66f6a00b5..3735f9d7db 100644
--- a/src/main/java/org/apache/sysds/runtime/io/IOUtilFunctions.java
+++ b/src/main/java/org/apache/sysds/runtime/io/IOUtilFunctions.java
@@ -610,6 +610,12 @@ public class IOUtilFunctions
                return ret;
        }
        
+       public static void deleteCrcFilesFromLocalFileSystem( JobConf job, Path 
path) throws IOException {
+               final FileSystem fs = getFileSystem(path,job );
+               deleteCrcFilesFromLocalFileSystem(fs, path);
+       }
+       
+
        /**
         * Delete the CRC files from the local file system associated with a
         * particular file and its metadata file.
diff --git a/src/main/java/org/apache/sysds/runtime/io/MatrixWriter.java 
b/src/main/java/org/apache/sysds/runtime/io/MatrixWriter.java
index ae47e99e70..faddd66466 100644
--- a/src/main/java/org/apache/sysds/runtime/io/MatrixWriter.java
+++ b/src/main/java/org/apache/sysds/runtime/io/MatrixWriter.java
@@ -30,7 +30,6 @@ import org.apache.sysds.runtime.matrix.data.MatrixBlock;
  * write functionality but might provide additional custom functionality. Any 
non-default parameters
  * (e.g., CSV read properties) should be passed into custom constructors. 
There is also a factory
  * for creating format-specific writers. 
- * 
  */
 public abstract class MatrixWriter {
        protected static final Log LOG = 
LogFactory.getLog(MatrixWriter.class.getName());
diff --git a/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java 
b/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java
index 7991c5701f..e3dd3935c6 100644
--- a/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java
@@ -37,27 +37,35 @@ import org.apache.sysds.runtime.util.HDFSTool;
 public class WriterBinaryBlock extends MatrixWriter {
        protected int _replication = -1;
 
+       protected static int jobUse = 0;
+       protected static JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());
+
        public WriterBinaryBlock(int replication) {
                _replication = replication;
+
+               jobUse ++;
+               if(jobUse > 15){
+                       // job =  new JobConf();
+                       job = new 
JobConf(ConfigurationManager.getCachedJobConf());
+                       jobUse = 0;
+               }
        }
 
        @Override
        public final void writeMatrixToHDFS(MatrixBlock src, String fname, long 
rlen, long clen, int blen, long nnz,
                boolean diag) throws IOException, DMLRuntimeException {
                // prepare file access
-               JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());
                Path path = new Path(fname);
-               FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
 
                // if the file already exists on HDFS, remove it.
-               HDFSTool.deleteFileIfExistOnHDFS(fname);
+               HDFSTool.deleteFileIfExistOnHDFS(path, job);
 
                // set up preferred custom serialization framework for binary 
block format
                if(HDFSTool.USE_BINARYBLOCK_SERIALIZATION)
                        HDFSTool.addBinaryBlockSerializationFramework(job);
 
                if(src instanceof CompressedMatrixBlock) {
-                       
if(ConfigurationManager.getCompilerConfigFlag(ConfigType.PARALLEL_CP_WRITE_BINARYFORMATS)){
+                       
if(ConfigurationManager.getCompilerConfigFlag(ConfigType.PARALLEL_CP_WRITE_BINARYFORMATS))
 {
                                LOG.debug("Multi threaded decompression");
                                // parallel
                                src = 
CompressedMatrixBlock.getUncompressed(src, "binary write",
@@ -71,17 +79,15 @@ public class WriterBinaryBlock extends MatrixWriter {
 
                // core write sequential/parallel
                if(diag)
-                       writeDiagBinaryBlockMatrixToHDFS(path, job, fs, src, 
rlen, clen, blen);
+                       writeDiagBinaryBlockMatrixToHDFS(path, job, src, rlen, 
clen, blen);
                else
-                       writeBinaryBlockMatrixToHDFS(path, job, fs, src, rlen, 
clen, blen);
+                       writeBinaryBlockMatrixToHDFS(path, job, src, rlen, 
clen, blen);
 
-               IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(fs, path);
        }
 
        @Override
        public final void writeEmptyMatrixToHDFS(String fname, long rlen, long 
clen, int blen)
                throws IOException, DMLRuntimeException {
-               JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());
                Path path = new Path(fname);
                FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
                final Writer writer = IOUtilFunctions.getSeqWriter(path, job, 
_replication);
@@ -98,26 +104,22 @@ public class WriterBinaryBlock extends MatrixWriter {
                IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(fs, path);
        }
 
-       protected void writeBinaryBlockMatrixToHDFS(Path path, JobConf job, 
FileSystem fs, MatrixBlock src, long rlen,
+       protected void writeBinaryBlockMatrixToHDFS(Path path, JobConf job,  
MatrixBlock src, long rlen,
                long clen, int blen) throws IOException, DMLRuntimeException {
                // sequential write
-               writeBinaryBlockMatrixToSequenceFile(path, job, fs, src, blen, 
0, (int) rlen);
+               writeBinaryBlockMatrixToSequenceFile(path, job, src, blen, 0, 
(int) rlen);
+               IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(job, path);
        }
 
-       protected final void writeBinaryBlockMatrixToSequenceFile(Path path, 
JobConf job, FileSystem fs, MatrixBlock src,
+       protected final void writeBinaryBlockMatrixToSequenceFile(Path path, 
JobConf job,  MatrixBlock src,
                int blen, int rl, int ru) throws IOException {
                boolean sparse = src.isInSparseFormat();
-               int rlen = src.getNumRows();
-               int clen = src.getNumColumns();
-
+               final int rlen = src.getNumRows();
+               final int clen = src.getNumColumns();
                final Writer writer = IOUtilFunctions.getSeqWriter(path, job, 
_replication);
 
-               try { // 2) bound check for src block
-                       if(src.getNumRows() > rlen || src.getNumColumns() > 
clen) {
-                               throw new IOException("Matrix block [1:" + 
src.getNumRows() + ",1:" + src.getNumColumns() + "] "
-                                       + "out of overall matrix range [1:" + 
rlen + ",1:" + clen + "].");
-                       }
-
+               try { 
+       
                        // 3) reblock and write
                        MatrixIndexes indexes = new MatrixIndexes();
 
@@ -133,10 +135,9 @@ public class WriterBinaryBlock extends MatrixWriter {
 
                                // create and write sub-blocks of matrix
                                for(int blockRow = rl / blen; blockRow < (int) 
Math.ceil(ru / (double) blen); blockRow++) {
-                                       for(int blockCol = 0; blockCol < (int) 
Math.ceil(src.getNumColumns() / (double) blen); blockCol++) {
-                                               int maxRow = (blockRow * blen + 
blen < src.getNumRows()) ? blen : src.getNumRows() - blockRow * blen;
-                                               int maxCol = (blockCol * blen + 
blen < src.getNumColumns()) ? blen : src.getNumColumns() -
-                                                       blockCol * blen;
+                                       for(int blockCol = 0; blockCol < (int) 
Math.ceil(clen / (double) blen); blockCol++) {
+                                               int maxRow = (blockRow * blen + 
blen < rlen) ? blen : rlen - blockRow * blen;
+                                               int maxCol = (blockCol * blen + 
blen < clen) ? blen : clen - blockCol * blen;
 
                                                int row_offset = blockRow * 
blen;
                                                int col_offset = blockCol * 
blen;
@@ -144,7 +145,7 @@ public class WriterBinaryBlock extends MatrixWriter {
                                                // get reuse matrix block
                                                MatrixBlock block = 
getMatrixBlockForReuse(blocks, maxRow, maxCol, blen);
 
-                                               // copy submatrix to block
+                                               // copy sub matrix to block
                                                src.slice(row_offset, 
row_offset + maxRow - 1, col_offset, col_offset + maxCol - 1, block);
 
                                                // append block to sequence file
@@ -162,16 +163,17 @@ public class WriterBinaryBlock extends MatrixWriter {
                }
        }
 
-       protected final void writeDiagBinaryBlockMatrixToHDFS(Path path, 
JobConf job, FileSystem fs, MatrixBlock src,
-               long rlen, long clen, int blen) throws IOException, 
DMLRuntimeException {
+       protected final void writeDiagBinaryBlockMatrixToHDFS(Path path, 
JobConf job,  MatrixBlock src,
+               long rlen, long clen, int blen) throws IOException {
                boolean sparse = src.isInSparseFormat();
-
+               final int nRow = src.getNumRows();
+               final int nCol = src.getNumColumns();
                final Writer writer = IOUtilFunctions.getSeqWriter(path, job, 
_replication);
 
                try {
                        // 2) bound check for src block
-                       if(src.getNumRows() > rlen || src.getNumColumns() > 
clen) {
-                               throw new IOException("Matrix block [1:" + 
src.getNumRows() + ",1:" + src.getNumColumns() + "] "
+                       if(nRow > rlen || nCol > clen) {
+                               throw new IOException("Matrix block [1:" + nRow 
+ ",1:" + nCol + "] "
                                        + "out of overall matrix range [1:" + 
rlen + ",1:" + clen + "].");
                        }
 
@@ -188,13 +190,12 @@ public class WriterBinaryBlock extends MatrixWriter {
                                MatrixBlock[] blocks = 
createMatrixBlocksForReuse(rlen, clen, blen, sparse, src.getNonZeros());
                                MatrixBlock emptyBlock = new MatrixBlock();
 
-                               // create and write subblocks of matrix
-                               for(int blockRow = 0; blockRow < (int) 
Math.ceil(src.getNumRows() / (double) blen); blockRow++) {
+                               // create and write sub blocks of the matrix
+                               for(int blockRow = 0; blockRow < (int) 
Math.ceil(nRow / (double) blen); blockRow++) {
 
-                                       for(int blockCol = 0; blockCol < (int) 
Math.ceil(src.getNumColumns() / (double) blen); blockCol++) {
-                                               int maxRow = (blockRow * blen + 
blen < src.getNumRows()) ? blen : src.getNumRows() - blockRow * blen;
-                                               int maxCol = (blockCol * blen + 
blen < src.getNumColumns()) ? blen : src.getNumColumns() -
-                                                       blockCol * blen;
+                                       for(int blockCol = 0; blockCol < (int) 
Math.ceil(nCol / (double) blen); blockCol++) {
+                                               int maxRow = (blockRow * blen + 
blen < nRow) ? blen : nRow - blockRow * blen;
+                                               int maxCol = (blockCol * blen + 
blen < nCol) ? blen : nCol - blockCol * blen;
                                                MatrixBlock block = null;
 
                                                if(blockRow == blockCol) { // 
block on diagonal
@@ -204,7 +205,7 @@ public class WriterBinaryBlock extends MatrixWriter {
                                                        // get reuse matrix 
block
                                                        block = 
getMatrixBlockForReuse(blocks, maxRow, maxCol, blen);
 
-                                                       // copy submatrix to 
block
+                                                       // copy sub matrix to 
block
                                                        src.slice(row_offset, 
row_offset + maxRow - 1, col_offset, col_offset + maxCol - 1, block);
                                                }
                                                else { // empty block (not on 
diagonal)
diff --git 
a/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlockParallel.java 
b/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlockParallel.java
index e483bbd6ff..ca79c8daf3 100644
--- a/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlockParallel.java
+++ b/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlockParallel.java
@@ -21,22 +21,18 @@ package org.apache.sysds.runtime.io;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.sysds.conf.DMLConfig;
 import org.apache.sysds.hops.OptimizerUtils;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import 
org.apache.sysds.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.util.CommonThreadPool;
-import org.apache.sysds.runtime.util.HDFSTool;
 
 public class WriterBinaryBlockParallel extends WriterBinaryBlock
 {
@@ -45,71 +41,68 @@ public class WriterBinaryBlockParallel extends 
WriterBinaryBlock
        }
        
        @Override
-       protected void writeBinaryBlockMatrixToHDFS( Path path, JobConf job, 
FileSystem fs, MatrixBlock src, long rlen, long clen, int blen )
+       protected void writeBinaryBlockMatrixToHDFS( Path path, JobConf job, 
MatrixBlock src, long rlen, long clen, int blen )
                throws IOException, DMLRuntimeException
        {
                //estimate output size and number of output blocks (min 1)
-               int numPartFiles = 
(int)(OptimizerUtils.estimatePartitionedSizeExactSparsity(rlen, clen, 
-                               blen, src.getNonZeros()) / 
InfrastructureAnalyzer.getHDFSBlockSize());
-               numPartFiles = Math.max(numPartFiles, 1);
-               
+               int numPartFiles = numPartsFiles(path.getFileSystem(job), rlen, 
clen, blen , src.getNonZeros());
+
                //determine degree of parallelism
                int numThreads = 
OptimizerUtils.getParallelBinaryWriteParallelism();
                numThreads = Math.min(numThreads, numPartFiles);
                
                //fall back to sequential write if dop is 1 (e.g., <128MB) in 
order to create single file
                if( numThreads <= 1 ) {
-                       super.writeBinaryBlockMatrixToHDFS(path, job, fs, src, 
rlen, clen, blen);
+                       super.writeBinaryBlockMatrixToHDFS(path, job,  src, 
rlen, clen, blen);
                        return;
                }
 
                //create directory for concurrent tasks
-               HDFSTool.createDirIfNotExistOnHDFS(path, 
DMLConfig.DEFAULT_SHARED_DIR_PERMISSION);
+               // HDFSTool.createDirIfNotExistOnHDFS(path, 
DMLConfig.DEFAULT_SHARED_DIR_PERMISSION);
                
                //create and execute write tasks
-               try 
-               {
-                       ExecutorService pool = CommonThreadPool.get(numThreads);
+               final ExecutorService pool = CommonThreadPool.get(numThreads);
+               try {
                        ArrayList<WriteFileTask> tasks = new ArrayList<>();
                        int blklen = (int)Math.ceil((double)rlen / blen / 
numThreads) * blen;
                        for(int i=0; i<numThreads & i*blklen<rlen; i++) {
                                Path newPath = new Path(path, 
IOUtilFunctions.getPartFileName(i));
-                               tasks.add(new WriteFileTask(newPath, job, fs, 
src, i*blklen, Math.min((i+1)*blklen, rlen), blen));
+                               tasks.add(new WriteFileTask(newPath, job,  src, 
i*blklen, Math.min((i+1)*blklen, rlen), blen));
                        }
 
-                       //wait until all tasks have been executed
-                       List<Future<Object>> rt = pool.invokeAll(tasks);        
-                       pool.shutdown();
-                       
                        //check for exceptions 
-                       for( Future<Object> task : rt )
+                       for( Future<Object> task : pool.invokeAll(tasks) )
                                task.get();
                        
-                       // delete crc files if written to local file system
-                       if (fs instanceof LocalFileSystem) {
-                               for(int i=0; i<numThreads & i*blklen<rlen; i++) 
-                                       
IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(fs,
-                                               new Path(path, 
IOUtilFunctions.getPartFileName(i)));
-                       }
                } 
                catch (Exception e) {
                        throw new IOException("Failed parallel write of binary 
block input.", e);
                }
+               finally{
+                       pool.shutdown();
+               }
+       }
+
+       public static int numPartsFiles(FileSystem fs, long rlen, long clen, 
long blen, long nZeros) {
+               int numPartFiles = (int) 
(OptimizerUtils.estimatePartitionedSizeExactSparsity(rlen, clen, blen, nZeros) /
+                       InfrastructureAnalyzer.getBlockSize(fs));
+               numPartFiles = Math.max(numPartFiles, 1);
+               numPartFiles = Math.min(numPartFiles,
+                       (int) (Math.ceil((double) rlen / blen) * 
Math.ceil((double) clen / blen)));
+               return numPartFiles;
        }
 
        private class WriteFileTask implements Callable<Object> 
        {
                private Path _path = null;
                private JobConf _job = null;
-               private FileSystem _fs = null;
                private MatrixBlock _src = null;
                private long _rl = -1;
                private long _ru = -1;
                private int _blen = -1;
                
-               public WriteFileTask(Path path, JobConf job, FileSystem fs, 
MatrixBlock src, long rl, long ru, int blen) {
+               public WriteFileTask(Path path, JobConf job, MatrixBlock src, 
long rl, long ru, int blen) {
                        _path = path;
-                       _fs = fs;
                        _job = job;
                        _src = src;
                        _rl = rl;
@@ -118,10 +111,9 @@ public class WriterBinaryBlockParallel extends 
WriterBinaryBlock
                }
        
                @Override
-               public Object call() 
-                       throws Exception 
-               {
-                       writeBinaryBlockMatrixToSequenceFile(_path, _job, _fs, 
_src, _blen, (int)_rl, (int)_ru);
+               public Object call() throws Exception {
+                       writeBinaryBlockMatrixToSequenceFile(_path, _job,  
_src, _blen, (int) _rl, (int) _ru);
+                       IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(_job, 
_path);
                        return null;
                }
        }
diff --git 
a/src/main/java/org/apache/sysds/runtime/meta/MatrixCharacteristics.java 
b/src/main/java/org/apache/sysds/runtime/meta/MatrixCharacteristics.java
index aeacc1c064..82c3aba08d 100644
--- a/src/main/java/org/apache/sysds/runtime/meta/MatrixCharacteristics.java
+++ b/src/main/java/org/apache/sysds/runtime/meta/MatrixCharacteristics.java
@@ -43,11 +43,11 @@ public class MatrixCharacteristics extends 
DataCharacteristics
        public MatrixCharacteristics() {}
        
        public MatrixCharacteristics(long nr, long nc) {
-               set(nr, nc, -1, -1);
+               set(nr, nc, ConfigurationManager.getBlocksize(), -1);
        }
        
        public MatrixCharacteristics(long nr, long nc, long nnz) {
-               set(nr, nc, -1, nnz);
+               set(nr, nc, ConfigurationManager.getBlocksize(), nnz);
        }
        
        public MatrixCharacteristics(long nr, long nc, int blen) {
diff --git a/src/main/java/org/apache/sysds/runtime/util/DataConverter.java 
b/src/main/java/org/apache/sysds/runtime/util/DataConverter.java
index 240c5ba976..72428163f4 100644
--- a/src/main/java/org/apache/sysds/runtime/util/DataConverter.java
+++ b/src/main/java/org/apache/sysds/runtime/util/DataConverter.java
@@ -79,7 +79,6 @@ import org.apache.sysds.runtime.meta.DataCharacteristics;
  * This class provides methods to read and write matrix blocks from to HDFS 
using different data formats.
  * Those functionalities are used especially for CP read/write and exporting 
in-memory matrices to HDFS
  * (before executing MR jobs).
- *
  */
 public class DataConverter {
        private static final String DELIM = " ";
@@ -112,20 +111,6 @@ public class DataConverter {
                writer.writeTensorToHDFS(tensor, dir, blen);
        }
 
-       public static MatrixBlock readMatrixFromHDFS(String dir, FileFormat 
fmt, long rlen, long clen, int blen, boolean localFS)
-               throws IOException
-       {
-               ReadProperties prop = new ReadProperties();
-
-               prop.path = dir;
-               prop.fmt = fmt;
-               prop.rlen = rlen;
-               prop.clen = clen;
-               prop.blen = blen;
-               prop.localFS = localFS;
-
-               return readMatrixFromHDFS(prop);
-       }
 
        public static MatrixBlock readMatrixFromHDFS(String dir, FileFormat 
fmt, long rlen, long clen, int blen)
                throws IOException
@@ -156,23 +141,6 @@ public class DataConverter {
                return readMatrixFromHDFS(prop);
        }
 
-       public static MatrixBlock readMatrixFromHDFS(String dir, FileFormat 
fmt, long rlen, long clen,
-               int blen, long expectedNnz, boolean localFS)
-               throws IOException
-       {
-               ReadProperties prop = new ReadProperties();
-
-               prop.path = dir;
-               prop.fmt = fmt;
-               prop.rlen = rlen;
-               prop.clen = clen;
-               prop.blen = blen;
-               prop.expectedNnz = expectedNnz;
-               prop.localFS = localFS;
-
-               return readMatrixFromHDFS(prop);
-       }
-
        public static MatrixBlock readMatrixFromHDFS(String dir, FileFormat 
fmt, long rlen, long clen,
                int blen, long expectedNnz, FileFormatProperties 
formatProperties)
                throws IOException
diff --git a/src/main/java/org/apache/sysds/runtime/util/UtilFunctions.java 
b/src/main/java/org/apache/sysds/runtime/util/UtilFunctions.java
index 38d8458a0e..74cf1d45ef 100644
--- a/src/main/java/org/apache/sysds/runtime/util/UtilFunctions.java
+++ b/src/main/java/org/apache/sysds/runtime/util/UtilFunctions.java
@@ -1296,7 +1296,7 @@ public class UtilFunctions {
        }
 
        public static int getEndIndex(int arrayLength, int startIndex, int 
blockSize){
-               return (blockSize <= 0)? arrayLength: Math.min(arrayLength, 
startIndex + blockSize);
+               return blockSize <= 0 ? arrayLength : Math.min(arrayLength, 
startIndex + blockSize);
        }
 
        public static int[] getBlockSizes(int num, int numBlocks){
diff --git 
a/src/test/java/org/apache/sysds/test/component/compress/colgroup/scheme/CLAConstSchemeTest.java
 
b/src/test/java/org/apache/sysds/test/component/compress/colgroup/scheme/CLAConstSchemeTest.java
deleted file mode 100644
index 31b78c959d..0000000000
--- 
a/src/test/java/org/apache/sysds/test/component/compress/colgroup/scheme/CLAConstSchemeTest.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysds.test.component.compress.colgroup.scheme;
-
-import static org.junit.Assert.assertTrue;
-
-import org.apache.sysds.runtime.compress.colgroup.AColGroup;
-import org.apache.sysds.runtime.compress.colgroup.ColGroupConst;
-import org.apache.sysds.runtime.compress.colgroup.indexes.ColIndexFactory;
-import org.apache.sysds.runtime.compress.colgroup.scheme.ICLAScheme;
-import org.apache.sysds.runtime.data.DenseBlockFP64;
-import org.apache.sysds.runtime.matrix.data.MatrixBlock;
-import org.junit.Test;
-
-public class CLAConstSchemeTest {
-
-       private final AColGroup g;
-       private final ICLAScheme sh;
-
-       public CLAConstSchemeTest() {
-               g = ColGroupConst.create(//
-                       ColIndexFactory.create(new int[] {1, 3, 5}), // Columns
-                       new double[] {1.1, 1.2, 1.3} // Values
-               );
-               sh = g.getCompressionScheme();
-       }
-
-       @Test
-       public void testConstValid() {
-               assertTrue(sh != null);
-       }
-
-
-
-       @Test
-       public void testValidEncodeSingleRow() {
-               assertTrue(sh.encode(new MatrixBlock(1, 5, new double[] {//
-                       0.0, 1.1, 0.2, 1.2, 0.2, 1.3})) != null);
-       }
-
-       @Test
-       public void testValidEncodeMultiRow() {
-               assertTrue(sh.encode(new MatrixBlock(2, 6, new double[] {//
-                       0.0, 1.1, 0.2, 1.2, 0.2, 1.3, //
-                       0.0, 1.1, 0.2, 1.2, 0.2, 1.3, //
-               })) != null);
-       }
-
-       @Test
-       public void testValidEncodeMultiRowsLarger() {
-               assertTrue(sh.encode(new MatrixBlock(2, 10, new double[] {//
-                       0.0, 1.1, 0.2, 1.2, 0.2, 1.3, 1, 1, 1, 1, //
-                       0.0, 1.1, 0.2, 1.2, 0.2, 1.3, 1, 1, 1, 1, //
-               })) != null);
-       }
-
-       @Test
-       public void testInvalidEncodeMultiRowsValue() {
-               assertTrue(sh.encode(new MatrixBlock(4, 6, new double[] {//
-                       0.0, 1.1, 0.2, 1.2, 0.2, 1.3, //
-                       0.0, 1.1, 0.2, 1.2, 0.2, 1.3, //
-                       0.0, 1.1, 0.2, 1.2, 0.2, 1.3, //
-                       0.0, 1.1, 0.2, 1.2, 0.2, 1.3, //
-               })) != null);
-       }
-
-       @Test
-       public void testValidEncodeMultiRowDifferentValuesOtherColumns() {
-               assertTrue(sh.encode(new MatrixBlock(4, 6, new double[] {//
-                       0.2, 1.1, 0.4, 1.2, 0.3, 1.3, //
-                       0.0, 1.1, 0.2, 1.2, 0.2, 1.3, //
-                       0.0, 1.1, 0.2, 1.2, 0.1, 1.3, //
-                       0.2, 1.1, 0.4, 1.2, 0.1, 1.3, //
-               })) != null);
-       }
-
-
-
-
-       @Test
-       public void testEncodeOtherColumns() {
-               assertTrue(sh.encode(new MatrixBlock(4, 5, new double[] {//
-                       1.1, 0.2, 1.2, 0.2, 1.3, //
-                       1.1, 0.2, 1.2, 0.2, 1.3, //
-                       1.1, 0.2, 1.2, 0.2, 1.3, //
-                       1.1, 0.2, 1.2, 0.2, 1.3, //
-               }), ColIndexFactory.create(new int[] {0, 2, 4})) != null);
-       }
-
-
-
-       @Test(expected = IllegalArgumentException.class)
-       public void testInvalidArgument_1() {
-               sh.encode(null, ColIndexFactory.create(new int[] {0, 2, 4, 5}));
-       }
-
-       @Test(expected = IllegalArgumentException.class)
-       public void testInvalidArgument_2() {
-               sh.encode(null, ColIndexFactory.create(new int[] {0, 2}));
-       }
-
-       @Test
-       public void testSparse() {
-               MatrixBlock mb = new MatrixBlock(4, 6, new double[] {//
-                       0.0, 1.1, 0.2, 1.2, 0.2, 1.3, //
-                       0.0, 1.1, 0.2, 1.2, 0.2, 1.3, //
-                       0.0, 1.1, 0.2, 1.2, 0.2, 1.3, //
-                       0.0, 1.1, 0.2, 1.2, 0.2, 1.3, //
-               });
-
-               MatrixBlock empty = new MatrixBlock(4, 1000, 0.0);
-               mb = mb.append(empty);
-
-               assertTrue(sh.encode(mb) != null);
-       }
-
-
-       @Test
-       public void testSparseValidCustom() {
-               MatrixBlock mb = new MatrixBlock(4, 6, new double[] {//
-                       0.0, 1.1, 0.2, 1.2, 0.2, 1.3, //
-                       0.0, 1.1, 0.2, 1.2, 0.2, 1.3, //
-                       0.0, 1.1, 0.2, 1.2, 0.2, 1.3, //
-                       0.0, 1.1, 0.2, 1.2, 0.2, 1.3, //
-               });
-
-               MatrixBlock empty = new MatrixBlock(4, 1000, 0.0);
-               mb = empty.append(mb);
-
-               assertTrue(sh.encode(mb, ColIndexFactory.create(new int[] 
{1001, 1003, 1005})) != null);
-       }
-
-       @Test
-       public void testSparseValidCustom2() {
-               MatrixBlock mb = new MatrixBlock(4, 6, new double[] {//
-                       0.0, 1.1, 0.2, 1.2, 0.2, 1.3, //
-                       0.0, 1.1, 0.2, 1.2, 0.2, 1.3, //
-                       0.0, 1.1, 0.2, 1.2, 0.2, 1.3, //
-                       0.0, 1.1, 0.2, 1.2, 0.2, 1.3, //
-               });
-
-               MatrixBlock empty = new MatrixBlock(4, 1000, 0.0);
-               MatrixBlock comb = empty.append(mb).append(mb);
-
-               assertTrue(sh.encode(comb, ColIndexFactory.create(new int[] 
{1001, 1003, 1005})) != null);
-       }
-
-
-
-
-
-       @Test
-       public void testGenericNonContinuosBlockValid() {
-               MatrixBlock mb = new MatrixBlock(4, 6, //
-                       new DenseBlockFP64Mock(new int[] {4, 6}, new double[] 
{//
-                               0.2, 1.1, 0.4, 1.2, 0.3, 1.3, //
-                               0.0, 1.1, 0.2, 1.2, 0.2, 1.3, //
-                               0.0, 1.1, 0.2, 1.2, 0.1, 1.3, //
-                               0.2, 1.1, 0.4, 1.2, 0.1, 1.3, //
-                       }));
-               mb.recomputeNonZeros();
-               assertTrue(sh.encode(mb) != null);
-       }
-
-
-
-       @Test(expected = NullPointerException.class)
-       public void testNull() {
-               sh.encode(null, null);
-       }
-
-       private class DenseBlockFP64Mock extends DenseBlockFP64 {
-               private static final long serialVersionUID = 
-3601232958390554672L;
-
-               public DenseBlockFP64Mock(int[] dims, double[] data) {
-                       super(dims, data);
-               }
-
-               @Override
-               public boolean isContiguous() {
-                       return false;
-               }
-
-               @Override
-               public int numBlocks() {
-                       return 2;
-               }
-       }
-
-}
diff --git 
a/src/test/java/org/apache/sysds/test/component/compress/colgroup/scheme/CLADDCSchemeTest.java
 
b/src/test/java/org/apache/sysds/test/component/compress/colgroup/scheme/CLADDCSchemeTest.java
deleted file mode 100644
index 0c40209f9c..0000000000
--- 
a/src/test/java/org/apache/sysds/test/component/compress/colgroup/scheme/CLADDCSchemeTest.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysds.test.component.compress.colgroup.scheme;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.util.EnumSet;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.sysds.runtime.compress.CompressionSettings;
-import org.apache.sysds.runtime.compress.CompressionSettingsBuilder;
-import org.apache.sysds.runtime.compress.colgroup.AColGroup;
-import org.apache.sysds.runtime.compress.colgroup.AColGroup.CompressionType;
-import org.apache.sysds.runtime.compress.colgroup.ColGroupDDC;
-import org.apache.sysds.runtime.compress.colgroup.ColGroupFactory;
-import org.apache.sysds.runtime.compress.colgroup.indexes.ColIndexFactory;
-import org.apache.sysds.runtime.compress.colgroup.indexes.IColIndex;
-import org.apache.sysds.runtime.compress.colgroup.indexes.RangeIndex;
-import org.apache.sysds.runtime.compress.colgroup.scheme.DDCScheme;
-import org.apache.sysds.runtime.compress.colgroup.scheme.ICLAScheme;
-import org.apache.sysds.runtime.compress.estim.ComEstExact;
-import org.apache.sysds.runtime.compress.estim.CompressedSizeInfo;
-import org.apache.sysds.runtime.compress.estim.CompressedSizeInfoColGroup;
-import org.apache.sysds.runtime.matrix.data.MatrixBlock;
-import org.apache.sysds.test.TestUtils;
-import org.junit.Test;
-
-public class CLADDCSchemeTest {
-       protected final Log LOG = 
LogFactory.getLog(CLADDCSchemeTest.class.getName());
-
-       final MatrixBlock src;
-       final AColGroup g;
-       final ICLAScheme sh;
-
-       public CLADDCSchemeTest() {
-               src = TestUtils.round(TestUtils.generateTestMatrixBlock(1023, 
3, 0, 3, 0.9, 7));
-
-               IColIndex colIndexes = ColIndexFactory.create(3);
-               CompressionSettings cs = new 
CompressionSettingsBuilder().setSamplingRatio(1.0)
-                       
.setValidCompressions(EnumSet.of(CompressionType.DDC)).create();
-               final CompressedSizeInfoColGroup cgi = new ComEstExact(src, 
cs).getColGroupInfo(colIndexes);
-
-               final CompressedSizeInfo csi = new CompressedSizeInfo(cgi);
-               final List<AColGroup> groups = 
ColGroupFactory.compressColGroups(src, csi, cs, 1);
-               g = groups.get(0);
-               sh = g.getCompressionScheme();
-       }
-
-       @Test(expected = IllegalArgumentException.class)
-       public void testInvalidColumnApply() {
-               sh.encode(null, ColIndexFactory.create(new int[] {1, 2}));
-       }
-
-       @Test(expected = IllegalArgumentException.class)
-       public void testInvalidColumnApply_2() {
-               sh.encode(null, ColIndexFactory.create(new int[] {1, 2, 5, 5}));
-       }
-
-       @Test(expected = NullPointerException.class)
-       public void testNull() {
-               sh.encode(null, null);
-       }
-
-       @Test
-       public void testEncode() {
-               
assertTrue(sh.encode(TestUtils.round(TestUtils.generateTestMatrixBlock(2, 3, 0, 
3, 0.9, 7))) != null);
-       }
-
-       @Test
-       public void testEncode_sparse() {
-               
assertTrue(sh.encode(TestUtils.round(TestUtils.generateTestMatrixBlock(2, 100, 
0, 3, 0.01, 7))) != null);
-       }
-
-       @Test
-       public void testEncodeSparseDifferentColumns() {
-               
assertTrue(sh.encode(TestUtils.round(TestUtils.generateTestMatrixBlock(2, 100, 
0, 3, 0.01, 7)),
-                       ColIndexFactory.create(new int[] {13, 16, 30})) != 
null);
-       }
-
-       @Test
-       public void testEncodeSparseDifferentColumns2() {
-               
assertTrue(sh.encode(TestUtils.round(TestUtils.generateTestMatrixBlock(2, 100, 
0, 3, 0.01, 7)),
-                       ColIndexFactory.create(new int[] {15, 16, 99})) != 
null);
-       }
-
-       @Test
-       public void testEncodeSparseDifferentColumns3() {
-               
assertTrue(sh.encode(TestUtils.round(TestUtils.generateTestMatrixBlock(2, 100, 
0, 3, 0.01, 7)),
-                       ColIndexFactory.create(new int[] {15, 86, 99})) != 
null);
-       }
-
-       @Test
-       public void testEncodeDenseDifferentColumns() {
-               
assertTrue(sh.encode(TestUtils.round(TestUtils.generateTestMatrixBlock(2, 100, 
0, 3, 0.86, 7)),
-                       ColIndexFactory.create(new int[] {13, 16, 30})) != 
null);
-       }
-
-       @Test
-       public void testEncodeDenseDifferentColumns2() {
-               
assertTrue(sh.encode(TestUtils.round(TestUtils.generateTestMatrixBlock(2, 100, 
0, 3, 0.86, 7)),
-                       ColIndexFactory.create(new int[] {15, 16, 99})) != 
null);
-       }
-
-       @Test
-       public void testEncodeDenseDifferentColumns3() {
-               
assertTrue(sh.encode(TestUtils.round(TestUtils.generateTestMatrixBlock(2, 100, 
0, 3, 0.86, 7)),
-                       ColIndexFactory.create(new int[] {15, 86, 99})) != 
null);
-       }
-
-       @Test
-       public void testEncodeFromColumns() {
-               try {
-
-                       DDCScheme s = DDCScheme.create(new RangeIndex(2));
-                       MatrixBlock m = 
TestUtils.round(TestUtils.generateTestMatrixBlock(50, 3, 0, 2, 0.9, 7));
-                       s.update(m);
-                       AColGroup g = s.encode(m);
-                       assertTrue(g instanceof ColGroupDDC);
-               }
-
-               catch(Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-}
diff --git 
a/src/test/java/org/apache/sysds/test/component/compress/colgroup/scheme/CLAEmptySchemeTest.java
 
b/src/test/java/org/apache/sysds/test/component/compress/colgroup/scheme/CLAEmptySchemeTest.java
deleted file mode 100644
index b27dd42c99..0000000000
--- 
a/src/test/java/org/apache/sysds/test/component/compress/colgroup/scheme/CLAEmptySchemeTest.java
+++ /dev/null
@@ -1,263 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysds.test.component.compress.colgroup.scheme;
-
-import static org.junit.Assert.assertTrue;
-
-import org.apache.sysds.runtime.compress.colgroup.AColGroup;
-import org.apache.sysds.runtime.compress.colgroup.ColGroupEmpty;
-import org.apache.sysds.runtime.compress.colgroup.indexes.ColIndexFactory;
-import org.apache.sysds.runtime.compress.colgroup.scheme.ICLAScheme;
-import org.apache.sysds.runtime.data.DenseBlockFP64;
-import org.apache.sysds.runtime.matrix.data.MatrixBlock;
-import org.junit.Test;
-
-public class CLAEmptySchemeTest {
-
-       private final AColGroup g;
-       private final ICLAScheme sh;
-
-       public CLAEmptySchemeTest() {
-               g = new ColGroupEmpty(//
-                       ColIndexFactory.create(new int[] {1, 3, 5}) // Columns
-               );
-               sh = g.getCompressionScheme();
-       }
-
-       @Test
-       public void testConstValid() {
-               assertTrue(sh != null);
-       }
-
-       @Test(expected = IllegalArgumentException.class)
-       public void testToSmallMatrix() {
-               sh.encode(new MatrixBlock(1, 3, new double[] {//
-                       1.1, 1.2, 1.3}));
-       }
-
-       @Test
-       public void testValidEncodeSingleRow() {
-               assertTrue(sh.encode(new MatrixBlock(1, 6, new double[] {//
-                       0.1, 0.0, 0.04, 0.0, 0.03, 0.0})) != null);
-       }
-
-       @Test
-       public void testValidEncodeMultiRow() {
-               assertTrue(sh.encode(new MatrixBlock(2, 6, new double[] {//
-                       132, 0.0, 241, 0.0, 142, 0.0, //
-                       132, 0.0, 241, 0.0, 142, 0.0, //
-               })) != null);
-       }
-
-       @Test
-       public void testValidEncodeMultiRowsLarger() {
-               assertTrue(sh.encode(new MatrixBlock(2, 10, new double[] {//
-                       0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1, 1, 1, 1, //
-                       0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1, 1, 1, 1, //
-               })) != null);
-       }
-
-       @Test
-       public void testValidEncodeMultiRowDifferentValuesOtherColumns() {
-               assertTrue(sh.encode(new MatrixBlock(4, 12, new double[] {//
-                       0.2, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.1, 0.4, 1.2, 0.3, 
1.3, //
-                       0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.1, 0.2, 1.2, 0.2, 
1.3, //
-                       0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.1, 0.2, 1.2, 0.1, 
1.3, //
-                       0.2, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.1, 0.4, 1.2, 0.1, 
1.3, //
-               })) != null);
-       }
-
-       @Test
-       public void testEncodeOtherColumnsValid() {
-               assertTrue(sh.encode(new MatrixBlock(4, 8, new double[] {//
-                       0.0, 1.1, 0.0, 0.2, 0.0, 1.2, 0.2, 1.3, //
-                       0.0, 1.1, 0.0, 0.2, 0.0, 1.2, 0.2, 1.3, //
-                       0.0, 1.1, 0.0, 0.2, 0.0, 1.2, 0.2, 1.3, //
-                       0.0, 1.1, 0.0, 0.2, 0.0, 1.2, 0.2, 1.3, //
-               }), ColIndexFactory.create(new int[] {0, 2, 4})// other columns
-               ) != null);
-       }
-
-       @Test(expected = IllegalArgumentException.class)
-       public void testInvalidArgument_1() {
-               sh.encode(null, ColIndexFactory.create(new int[] {0, 2, 4, 5}));
-       }
-
-       @Test(expected = IllegalArgumentException.class)
-       public void testInvalidArgument_2() {
-               sh.encode(null, ColIndexFactory.create(new int[] {0, 2}));
-       }
-
-       @Test
-       public void testSparse() {
-               MatrixBlock mb = new MatrixBlock(4, 6, new double[] {//
-                       0.01, 0.0, 0.2, 0.0, 0.2, 0.0, //
-                       0.01, 0.0, 0.2, 0.0, 0.2, 0.0, //
-                       0.01, 0.0, 0.2, 0.0, 0.2, 0.0, //
-                       0.01, 0.0, 0.2, 0.0, 0.2, 0.0, //
-               });
-
-               MatrixBlock empty = new MatrixBlock(4, 1000, 0.0);
-               mb = mb.append(empty);
-
-               assertTrue(sh.encode(mb) != null);
-       }
-
-       @Test
-       public void testSpars_AllCosOver() {
-               MatrixBlock mb = new MatrixBlock(4, 6, new double[] {//
-                       0.01, 0.0, 0.2, 0.0, 0.2, 0.0, //
-                       0.01, 0.0, 0.2, 0.0, 0.2, 0.0, //
-                       0.01, 0.0, 0.2, 0.0, 0.2, 0.0, //
-                       0.01, 0.0, 0.2, 0.0, 0.2, 0.0, //
-               });
-
-               MatrixBlock empty = new MatrixBlock(4, 1000, 0.0);
-               mb = mb.append(empty);
-
-               assertTrue(sh.encode(mb, ColIndexFactory.create(new int[] {100, 
102, 999})) != null);
-       }
-
-       @Test
-       public void testSparse_Append() {
-               MatrixBlock mb = new MatrixBlock(4, 6, new double[] {//
-                       0.0, 0.0, 0.2, 0.0, 0.2, 1.3, //
-                       0.0, 0.0, 0.2, 0.0, 0.2, 1.3, //
-                       0.0, 0.0, 0.2, 0.0, 0.2, 1.3, //
-                       0.0, 0.0, 0.2, 0.0, 0.2, 1.3, //
-               });
-
-               MatrixBlock empty = new MatrixBlock(4, 1000, 0.0);
-               mb = empty.append(mb);
-
-               assertTrue(sh.encode(mb) != null);
-       }
-
-       @Test
-       public void testSparseValidCustom() {
-               MatrixBlock mb = new MatrixBlock(4, 9, new double[] {//
-                       0.0, 0.0, 1.1, 0.0, 0.2, 0.0, 1.2, 0.2, 1.3, //
-                       0.0, 0.0, 1.1, 0.0, 0.2, 0.0, 1.2, 0.2, 1.3, //
-                       0.0, 0.0, 1.1, 0.0, 0.2, 0.0, 1.2, 0.2, 1.3, //
-                       0.0, 0.0, 1.1, 0.0, 0.2, 0.0, 1.2, 0.2, 1.3, //
-               });
-
-               MatrixBlock empty = new MatrixBlock(4, 1000, 0.0);
-               mb = empty.append(mb);
-
-               assertTrue(sh.encode(mb, ColIndexFactory.create(new int[] 
{1001, 1003, 1005})) != null);
-       }
-
-       @Test
-       public void testSparseValidCustom2() {
-               MatrixBlock mb = new MatrixBlock(4, 9, new double[] {//
-                       0.0, 0.0, 1.1, 0.0, 0.2, 0.0, 1.2, 0.2, 1.3, //
-                       0.0, 0.0, 1.1, 0.0, 0.2, 0.0, 1.2, 0.2, 1.3, //
-                       0.0, 0.0, 1.1, 0.0, 0.2, 0.0, 1.2, 0.2, 1.3, //
-                       0.0, 0.0, 1.1, 0.0, 0.2, 0.0, 1.2, 0.2, 1.3, //
-               });
-
-               MatrixBlock empty = new MatrixBlock(4, 1000, 0.0);
-               MatrixBlock comb = empty.append(mb).append(mb);
-
-               assertTrue(sh.encode(comb, ColIndexFactory.create(new int[] 
{1001, 1003, 1005})) != null);
-       }
-
-       @Test
-       public void testSparseValidCustom3Valid() {
-               MatrixBlock mb = new MatrixBlock(4, 9, new double[] {//
-                       0.0, 0.0, 1.1, 0.0, 0.2, 0.0, 1.2, 0.2, 1.3, //
-                       0.0, 0.0, 1.1, 0.0, 0.2, 0.0, 1.33, 0.2, 1.3, //
-                       0.0, 0.0, 1.1, 0.0, 0.2, 0.0, 1.2, 0.2, 1.3, //
-                       0.0, 0.0, 1.1, 0.0, 0.2, 0.0, 1.2, 0.2, 1.3, //
-               });
-
-               MatrixBlock empty = new MatrixBlock(4, 1000, 0.0);
-               MatrixBlock comb = empty.append(mb).append(mb);
-
-               assertTrue(sh.encode(comb, ColIndexFactory.create(new int[] 
{1001, 1003, 1005})) != null);
-       }
-
-       @Test
-       public void testSparseEmptyRow() {
-               MatrixBlock mb = new MatrixBlock(4, 6, new double[] {//
-                       0.0, 1.1, 0.2, 1.2, 0.2, 1.3, //
-                       0.0, 1.1, 0.2, 1.2, 0.2, 1.3, //
-                       0.0, 1.1, 0.2, 1.2, 0.2, 1.3, //
-                       0.0, 1.1, 0.2, 1.2, 0.2, 1.3, //
-               });
-
-               MatrixBlock empty = new MatrixBlock(4, 1000, 0.0);
-               mb = empty.append(mb);
-               MatrixBlock emptyRow = new MatrixBlock(1, 1006, 0.0);
-               mb = mb.append(emptyRow, false);
-
-               assertTrue(sh.encode(mb, ColIndexFactory.create(new int[] {44, 
45, 999})) != null);
-       }
-
-       @Test
-       public void testEmpty() {
-               MatrixBlock empty = new MatrixBlock(4, 1000, 0.0);
-               assertTrue(sh.encode(empty) != null);
-       }
-
-       @Test
-       public void testEmptyOtherColumns() {
-               MatrixBlock empty = new MatrixBlock(4, 1000, 0.0);
-               assertTrue(sh.encode(empty, ColIndexFactory.create(new int[] 
{33, 34, 99})) != null);
-       }
-
-       @Test
-       public void testGenericNonContinuosBlockValid() {
-               MatrixBlock mb = new MatrixBlock(4, 6, //
-                       new DenseBlockFP64Mock(new int[] {4, 9}, new double[] 
{//
-                               0.2, 0.0, 1.1, 0.0, 0.4, 0.0, 1.2, 0.3, 1.3, //
-                               0.0, 0.0, 1.1, 0.0, 0.2, 0.0, 1.2, 0.2, 1.3, //
-                               0.0, 0.0, 1.1, 0.0, 0.2, 0.0, 1.2, 0.1, 1.3, //
-                               0.2, 0.0, 1.1, 0.0, 0.4, 0.0, 1.2, 0.1, 1.3, //
-                       }));
-               mb.recomputeNonZeros();
-               assertTrue(sh.encode(mb) != null);
-       }
-
-       @Test(expected = NullPointerException.class)
-       public void testNull() {
-               sh.encode(null, null);
-       }
-
-       private class DenseBlockFP64Mock extends DenseBlockFP64 {
-               private static final long serialVersionUID = 
-3601232958390554672L;
-
-               public DenseBlockFP64Mock(int[] dims, double[] data) {
-                       super(dims, data);
-               }
-
-               @Override
-               public boolean isContiguous() {
-                       return false;
-               }
-
-               @Override
-               public int numBlocks() {
-                       return 2;
-               }
-       }
-
-}
diff --git 
a/src/test/java/org/apache/sysds/test/component/federated/FederatedTestUtils.java
 
b/src/test/java/org/apache/sysds/test/component/federated/FederatedTestUtils.java
index 8bacebdef8..595fa6799f 100644
--- 
a/src/test/java/org/apache/sysds/test/component/federated/FederatedTestUtils.java
+++ 
b/src/test/java/org/apache/sysds/test/component/federated/FederatedTestUtils.java
@@ -26,6 +26,8 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.sysds.common.Types.DataType;
 import org.apache.sysds.runtime.controlprogram.federated.FederatedData;
 import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest;
@@ -38,6 +40,7 @@ import org.apache.sysds.runtime.instructions.cp.ScalarObject;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 
 public class FederatedTestUtils {
+       protected static final Log LOG = 
LogFactory.getLog(FederatedTestUtils.class.getName());
 
        public static long putDouble(double v, InetSocketAddress addr) {
                return putDouble(v, addr, 5000);
@@ -96,6 +99,7 @@ public class FederatedTestUtils {
                        final FederatedRequest frq = new FederatedRequest(RequestType.PUT_VAR, null, id, mb);
                        final Future<FederatedResponse> fr = FederatedData.executeFederatedOperation(addr, frq);
                        final FederatedResponse r = fr.get(timeout, TimeUnit.MILLISECONDS);
+                       LOG.error(r);
                        if(r.isSuccessful())
                                return id;
                        else
diff --git a/src/test/java/org/apache/sysds/test/component/misc/IOUtilFunctionsTest.java b/src/test/java/org/apache/sysds/test/component/misc/IOUtilFunctionsTest.java
index 92d18af25b..4aa16db209 100644
--- a/src/test/java/org/apache/sysds/test/component/misc/IOUtilFunctionsTest.java
+++ b/src/test/java/org/apache/sysds/test/component/misc/IOUtilFunctionsTest.java
@@ -80,15 +80,15 @@ public class IOUtilFunctionsTest {
        @Test(expected = StringIndexOutOfBoundsException.class)
        public void splitCustom_3() {
                // try{
-                       String in = "aaaaaa \"\"\" abb";
-                       String[] ret = IOUtilFunctions.splitCSV(in, " ");
-                       assertArrayEquals(new String[] {"aaaaaa", "\"\"\"", "abb"}, ret);
+               String in = "aaaaaa \"\"\" abb";
+               String[] ret = IOUtilFunctions.splitCSV(in, " ");
+               assertArrayEquals(new String[] {"aaaaaa", "\"\"\"", "abb"}, ret);
 
                // }
                // catch(Exception e){
-                       // e.printStackTrace();
-                       // throw e;
-                       // fail(e.getMessage());
+               // e.printStackTrace();
+               // throw e;
+               // fail(e.getMessage());
                // }
        }
 
@@ -108,9 +108,9 @@ public class IOUtilFunctionsTest {
 
        // @Test
        // public void splitCustom_6() {
-       //      String in = "aaaaaa \"\"\"";
-       //      String[] ret = IOUtilFunctions.splitCSV(in, " ");
-       //      assertArrayEquals(new String[] {"aaaaaa", "\""}, ret);
+       // String in = "aaaaaa \"\"\"";
+       // String[] ret = IOUtilFunctions.splitCSV(in, " ");
+       // assertArrayEquals(new String[] {"aaaaaa", "\""}, ret);
        // }
 
        @Test
@@ -212,10 +212,11 @@ public class IOUtilFunctionsTest {
        }
 
        @Test
-       public void splitCustom_fromRddTest(){
+       public void splitCustom_fromRddTest() {
                String in = "aaa,\"\"\",,,b,,\",\"c,c,c\"";
 
                String[] ret = IOUtilFunctions.splitCSV(in, ",", null);
                assertArrayEquals(new String[] {"aaa", "\"\"\",,,b,,\"", "\"c,c,c\""}, ret);
        }
+
 }
diff --git a/src/test/java/org/apache/sysds/test/component/misc/NumPartsFilesTest.java b/src/test/java/org/apache/sysds/test/component/misc/NumPartsFilesTest.java
new file mode 100644
index 0000000000..dbc9a321c8
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/component/misc/NumPartsFilesTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.component.misc;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.sysds.conf.ConfigurationManager;
+import org.apache.sysds.runtime.io.WriterBinaryBlockParallel;
+import org.junit.Test;
+
+public class NumPartsFilesTest {
+
+    private final Path path;
+    private final FileSystem fs;
+
+    public NumPartsFilesTest() throws Exception {
+        path = new Path("/tmp/test.someEnding");
+        fs = path.getFileSystem(ConfigurationManager.getCachedJobConf());
+    }
+
+    @Test
+    public void numPartsTest1() {
+
+        int p = WriterBinaryBlockParallel.numPartsFiles(fs, 1000, 1000, 1000, 1000 * 1000);
+        assertEquals(1, p);
+    }
+
+    @Test
+    public void numPartsTest2() {
+        int p = WriterBinaryBlockParallel.numPartsFiles(fs, 1200, 1000, 1000, 1000 * 1000);
+        assertEquals(2, p);
+    }
+
+    @Test
+    public void numPartsTest3() {
+        int p = WriterBinaryBlockParallel.numPartsFiles(fs, 10000, 1000, 1000, 10000L * 1000);
+        assertEquals(10, p);
+    }
+
+    @Test
+    public void numPartsTest4() {
+        int p = WriterBinaryBlockParallel.numPartsFiles(fs, 100000, 1000, 1000, 100000L * 1000);
+        assertEquals(100, p);
+    }
+
+    @Test
+    public void numPartsTest5() {
+        int p = WriterBinaryBlockParallel.numPartsFiles(fs, 1000000, 1000, 1000, 1000000L * 1000);
+        assertEquals(1000, p);
+    }
+
+    @Test
+    public void numPartsTest6() {
+        int p = WriterBinaryBlockParallel.numPartsFiles(fs, 10000000L, 1000, 1000, 10000000L * 1000);
+        assertEquals(10000, p);
+    }
+
+    @Test
+    public void numPartsTest7() {
+        int p = WriterBinaryBlockParallel.numPartsFiles(fs, 100000000L, 1000, 1000, 100000000L * 1000);
+        assertEquals(100000, p);
+    }
+
+    @Test
+    public void numPartsTest8() {
+        int p = WriterBinaryBlockParallel.numPartsFiles(fs, 1000000000L, 1000, 1000, 1000000000L * 1000);
+        assertEquals(1000000, p);
+    }
+}
diff --git a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRCBindTest.java b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRCBindTest.java
index fb07a58d8a..48c5b69a7a 100644
--- a/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRCBindTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/federated/primitives/FederatedRCBindTest.java
@@ -113,8 +113,8 @@ public class FederatedRCBindTest extends AutomatedTestBase {
                int port3 = getRandomAvailablePort();
                int port4 = getRandomAvailablePort();
                Thread t1 = startLocalFedWorkerThread(port1, FED_WORKER_WAIT_S);
-               Thread t2 = startLocalFedWorkerThread(port2);
-               Thread t3 = startLocalFedWorkerThread(port3);
+               Thread t2 = startLocalFedWorkerThread(port2, FED_WORKER_WAIT_S);
+               Thread t3 = startLocalFedWorkerThread(port3, FED_WORKER_WAIT_S);
                Thread t4 = startLocalFedWorkerThread(port4);
 
                // we need the reference file to not be written to hdfs, so we get the correct format

Reply via email to