This is an automated email from the ASF dual-hosted git repository.

baunsgaard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new 2567d2ff2e [SYSTEMDS-3507] Update all HDFS writers
2567d2ff2e is described below

commit 2567d2ff2e2079cc61573bfa161b93fc97f80d8c
Author: baunsgaard <[email protected]>
AuthorDate: Wed Mar 15 15:38:55 2023 +0100

    [SYSTEMDS-3507] Update all HDFS writers
    
    This commit updates all the writers contained in the system to
    use the non-deprecated API. The update seems to have a positive
    influence, but the difference is not consistent: sometimes I
    observe large differences of 1-2 seconds when writing 8GB files
    to disk, and sometimes not.
    
    Closes #1788
---
 .../runtime/compress/io/WriterCompressed.java      |   9 +-
 .../parfor/DataPartitionerLocal.java               |  17 +-
 .../parfor/DataPartitionerRemoteSparkReducer.java  |  14 +-
 .../parfor/ResultMergeLocalFile.java               |  35 +--
 .../sysds/runtime/io/FrameWriterBinaryBlock.java   |   6 +-
 .../apache/sysds/runtime/io/IOUtilFunctions.java   |  36 +++
 .../sysds/runtime/io/TensorWriterBinaryBlock.java  |  20 +-
 .../apache/sysds/runtime/io/WriterBinaryBlock.java | 263 +++++++++------------
 src/test/java/org/apache/sysds/test/TestUtils.java | 101 ++++----
 9 files changed, 235 insertions(+), 266 deletions(-)

diff --git 
a/src/main/java/org/apache/sysds/runtime/compress/io/WriterCompressed.java 
b/src/main/java/org/apache/sysds/runtime/compress/io/WriterCompressed.java
index 82f2be59e4..0c6deecbd2 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/io/WriterCompressed.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/io/WriterCompressed.java
@@ -123,9 +123,9 @@ public final class WriterCompressed extends MatrixWriter {
 
                // Make Writer (New interface)
                final Writer w = SequenceFile.createWriter(job, 
Writer.file(path), Writer.bufferSize(4096),
-                       Writer.blockSize(4096), 
Writer.keyClass(MatrixIndexes.class), 
Writer.valueClass(CompressedWriteBlock.class),
+                       Writer.keyClass(MatrixIndexes.class), 
Writer.valueClass(CompressedWriteBlock.class),
                        Writer.compression(SequenceFile.CompressionType.NONE), 
// No Compression type on disk
-                        Writer.replication((short) 1));
+                       Writer.replication((short) 1));
 
                final int rlen = src.getNumRows();
                final int clen = src.getNumColumns();
@@ -162,9 +162,8 @@ public final class WriterCompressed extends MatrixWriter {
                        final int sC = bc * blen;
                        final int mC = Math.min(sC + blen, clen) - 1;
                        if(b instanceof CompressedMatrixBlock) {
-                               final CompressedMatrixBlock mc = //mC == clen - 
1 ? (CompressedMatrixBlock) b :
-                                CLALibSlice
-                                       .sliceColumns((CompressedMatrixBlock) 
b, sC, mC); // slice columns!
+                               final CompressedMatrixBlock mc = // mC == clen 
- 1 ? (CompressedMatrixBlock) b :
+                                       
CLALibSlice.sliceColumns((CompressedMatrixBlock) b, sC, mC); // slice columns!
 
                                final List<MatrixBlock> blocks = 
CLALibSlice.sliceBlocks(mc, blen); // Slice compressed blocks
                                for(int br = 0; br * blen < rlen; br++) {
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/DataPartitionerLocal.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/DataPartitionerLocal.java
index 3ec4ecb1dc..0644b4d9d3 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/DataPartitionerLocal.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/DataPartitionerLocal.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Writer;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.InputSplit;
@@ -374,17 +375,17 @@ public class DataPartitionerLocal extends DataPartitioner
        // read/write in different formats //
        /////////////////////////////////////
        
-       @SuppressWarnings({ "deprecation"})
+       // @SuppressWarnings({ "deprecation"})
        public void writeBinaryBlockSequenceFileToHDFS( JobConf job, String 
dir, String lpdir, boolean threadsafe ) 
                throws IOException
        {
                long key = getKeyFromFilePath(lpdir);
                Path path =  new Path(dir+"/"+key);
-               SequenceFile.Writer writer = null;
                
-               try {
-                       FileSystem fs = IOUtilFunctions.getFileSystem(path, 
job);
-                       writer = new SequenceFile.Writer(fs, job, path, 
MatrixIndexes.class, MatrixBlock.class); //beware ca 50ms
+               //beware ca 50ms
+               final Writer writer = IOUtilFunctions.getSeqWriter(path, job, 
1);
+               
+               try {   
                        String[] fnameBlocks = new File( lpdir ).list();
                        for( String fnameBlock : fnameBlocks  )
                        {
@@ -410,18 +411,14 @@ public class DataPartitionerLocal extends DataPartitioner
                }
        }
        
-       @SuppressWarnings({ "deprecation" })
        public void writeBinaryCellSequenceFileToHDFS( JobConf job, String dir, 
String lpdir ) 
                throws IOException
        {
                long key = getKeyFromFilePath(lpdir);
                Path path =  new Path(dir+"/"+key);
-               SequenceFile.Writer writer = null;
+               final Writer writer = IOUtilFunctions.getSeqWriterCell(path, 
job, 1);
                
                try {
-                       FileSystem fs = IOUtilFunctions.getFileSystem(path, 
job);
-                       writer = new SequenceFile.Writer(fs, job, path, 
MatrixIndexes.class, MatrixCell.class); //beware ca 50ms
-                       
                        MatrixIndexes indexes = new MatrixIndexes();
                        MatrixCell cell = new MatrixCell();
                        
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/DataPartitionerRemoteSparkReducer.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/DataPartitionerRemoteSparkReducer.java
index a41f9b68c0..2e925aaf6c 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/DataPartitionerRemoteSparkReducer.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/DataPartitionerRemoteSparkReducer.java
@@ -23,18 +23,14 @@ import java.io.File;
 import java.util.Iterator;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Writer;
 import org.apache.hadoop.io.Writable;
 import org.apache.spark.api.java.function.VoidFunction;
 import org.apache.sysds.common.Types.FileFormat;
 import org.apache.sysds.conf.ConfigurationManager;
 import org.apache.sysds.runtime.controlprogram.parfor.util.PairWritableBlock;
 import org.apache.sysds.runtime.io.IOUtilFunctions;
-import org.apache.sysds.runtime.matrix.data.MatrixBlock;
-import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
-import org.apache.sysds.runtime.util.HDFSTool;
 
 import scala.Tuple2;
 
@@ -51,7 +47,6 @@ public class DataPartitionerRemoteSparkReducer implements 
VoidFunction<Tuple2<Lo
        }
 
        @Override
-       @SuppressWarnings({ "deprecation" })
        public void call(Tuple2<Long, Iterable<Writable>> arg0)
                throws Exception 
        {
@@ -62,14 +57,9 @@ public class DataPartitionerRemoteSparkReducer implements 
VoidFunction<Tuple2<Lo
                //write entire partition to binary block sequence file
                //create sequence file writer
                Configuration job = new 
Configuration(ConfigurationManager.getCachedJobConf());
-               job.setInt(HDFSTool.DFS_REPLICATION, _replication);
                Path path = new Path(_fnameNew + File.separator + key);
-               
-               SequenceFile.Writer writer = null;
+               final Writer writer = IOUtilFunctions.getSeqWriter(path, job, 
_replication);
                try {
-                       FileSystem fs = IOUtilFunctions.getFileSystem(path, 
job);
-                       writer = new SequenceFile.Writer(fs, job, path, 
MatrixIndexes.class, MatrixBlock.class);
-                       
                        //write individual blocks unordered to output
                        while( valueList.hasNext() ) {
                                PairWritableBlock pair = (PairWritableBlock) 
valueList.next();
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeLocalFile.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeLocalFile.java
index 84847bfc4f..1799258e48 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeLocalFile.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeLocalFile.java
@@ -19,10 +19,21 @@
 
 package org.apache.sysds.runtime.controlprogram.parfor;
 
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map.Entry;
+
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Writer;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.InputSplit;
@@ -50,16 +61,6 @@ import org.apache.sysds.runtime.util.FastStringTokenizer;
 import org.apache.sysds.runtime.util.HDFSTool;
 import org.apache.sysds.runtime.util.LocalFileUtils;
 
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.Map.Entry;
-
 /**
  * 
  * TODO potential extension: parallel merge (create individual staging files 
concurrently)
@@ -475,23 +476,23 @@ public class ResultMergeLocalFile extends 
ResultMergeMatrix
                }
        }       
 
-       @SuppressWarnings("deprecation")
        private void createBinaryBlockResultFile( String fnameStaging, String 
fnameStagingCompare, String fnameNew, MetaDataFormat metadata, boolean 
withCompare ) 
                throws IOException, DMLRuntimeException
        {
                JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());
                Path path = new Path( fnameNew );       
-               FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
                
                DataCharacteristics mc = metadata.getDataCharacteristics();
                long rlen = mc.getRows();
                long clen = mc.getCols();
                int blen = mc.getBlocksize();
                
-               try(SequenceFile.Writer writer = new SequenceFile.Writer(fs, 
job, path, MatrixIndexes.class, MatrixBlock.class))
-               {
+               Writer writer = IOUtilFunctions.getSeqWriter(path, job, 1);
+
+               try {
                        MatrixIndexes indexes = new MatrixIndexes();
-                       for(long brow = 1; brow <= 
(long)Math.ceil(rlen/(double)blen); brow++)
+                       for(long brow = 1; brow <= 
(long)Math.ceil(rlen/(double)blen); brow++){
+
                                for(long bcol = 1; bcol <= 
(long)Math.ceil(clen/(double)blen); bcol++)
                                {
                                        File dir = new 
File(fnameStaging+"/"+brow+"_"+bcol);
@@ -555,6 +556,10 @@ public class ResultMergeLocalFile extends ResultMergeMatrix
                                        indexes.setIndexes(brow, bcol);
                                        writer.append(indexes, mb);
                                }
+                       }
+               }
+               finally {
+                       IOUtilFunctions.closeSilently(writer);
                }
        }
 
diff --git 
a/src/main/java/org/apache/sysds/runtime/io/FrameWriterBinaryBlock.java 
b/src/main/java/org/apache/sysds/runtime/io/FrameWriterBinaryBlock.java
index 43bb85a8db..859cbe028c 100644
--- a/src/main/java/org/apache/sysds/runtime/io/FrameWriterBinaryBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/io/FrameWriterBinaryBlock.java
@@ -24,7 +24,6 @@ import java.io.IOException;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.SequenceFile.Writer;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.sysds.conf.ConfigurationManager;
@@ -84,10 +83,7 @@ public class FrameWriterBinaryBlock extends FrameWriter {
        protected static void writeBinaryBlockFrameToSequenceFile(Path path, 
JobConf job, FileSystem fs, FrameBlock src,
                int blen, int rl, int ru) throws IOException {
                // 1) create sequence file writer
-               SequenceFile.Writer writer = SequenceFile.createWriter(job, 
Writer.file(path), Writer.bufferSize(4096),
-                       Writer.blockSize(4096), 
Writer.keyClass(LongWritable.class), Writer.valueClass(FrameBlock.class),
-                       Writer.compression(SequenceFile.CompressionType.NONE), 
Writer.replication((short) 1));
-
+               final Writer writer = IOUtilFunctions.getSeqWriterFrame(path, 
job, 1);
                final int rlen = src.getNumRows();
                final int clen = src.getNumColumns();
                try {
diff --git a/src/main/java/org/apache/sysds/runtime/io/IOUtilFunctions.java 
b/src/main/java/org/apache/sysds/runtime/io/IOUtilFunctions.java
index b36f4497d1..bf1e545098 100644
--- a/src/main/java/org/apache/sysds/runtime/io/IOUtilFunctions.java
+++ b/src/main/java/org/apache/sysds/runtime/io/IOUtilFunctions.java
@@ -46,6 +46,8 @@ import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Writer;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputFormat;
@@ -56,6 +58,12 @@ import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.sysds.conf.ConfigurationManager;
 import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.data.TensorBlock;
+import org.apache.sysds.runtime.data.TensorIndexes;
+import org.apache.sysds.runtime.frame.data.FrameBlock;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.matrix.data.MatrixCell;
+import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysds.runtime.transform.TfUtils;
 import org.apache.sysds.runtime.util.LocalFileUtils;
 import org.apache.sysds.runtime.util.UtilFunctions;
@@ -729,4 +737,32 @@ public class IOUtilFunctions
                        return nrows;
                }
        }
+
+       public static Writer getSeqWriter(Path path, Configuration job, int 
replication) throws IOException {
+               return SequenceFile.createWriter(job, Writer.file(path), 
Writer.bufferSize(4096),
+                       Writer.replication((short) (replication > 0 ? 
replication : 1)),
+                       Writer.compression(SequenceFile.CompressionType.NONE), 
Writer.keyClass(MatrixIndexes.class),
+                       Writer.valueClass(MatrixBlock.class));
+       }
+
+       public static Writer getSeqWriterFrame(Path path, Configuration job, 
int replication) throws IOException {
+               return SequenceFile.createWriter(job, Writer.file(path), 
Writer.bufferSize(4096),
+                       Writer.keyClass(LongWritable.class), 
Writer.valueClass(FrameBlock.class),
+                       Writer.compression(SequenceFile.CompressionType.NONE),
+                       Writer.replication((short) (replication > 0 ? 
replication : 1)));
+       }
+
+       public static Writer getSeqWriterTensor(Path path, Configuration job, 
int replication) throws IOException {
+               return SequenceFile.createWriter(job, Writer.file(path), 
Writer.bufferSize(4096),
+               Writer.replication((short) (replication > 0 ? replication : 1)),
+               Writer.compression(SequenceFile.CompressionType.NONE), 
Writer.keyClass(TensorIndexes.class),
+               Writer.valueClass(TensorBlock.class));
+       }
+
+       public static Writer getSeqWriterCell(Path path, Configuration job, int 
replication) throws IOException {
+               return SequenceFile.createWriter(job, Writer.file(path), 
Writer.bufferSize(4096),
+                       Writer.replication((short) (replication > 0 ? 
replication : 1)),
+                       Writer.compression(SequenceFile.CompressionType.NONE), 
Writer.keyClass(MatrixIndexes.class),
+                       Writer.valueClass(MatrixCell.class));
+       }
 }
diff --git 
a/src/main/java/org/apache/sysds/runtime/io/TensorWriterBinaryBlock.java 
b/src/main/java/org/apache/sysds/runtime/io/TensorWriterBinaryBlock.java
index 7c74dfc146..fbb316e816 100644
--- a/src/main/java/org/apache/sysds/runtime/io/TensorWriterBinaryBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/io/TensorWriterBinaryBlock.java
@@ -19,9 +19,12 @@
 
 package org.apache.sysds.runtime.io;
 
+import java.io.IOException;
+import java.util.Arrays;
+
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Writer;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.sysds.common.Types.ValueType;
 import org.apache.sysds.conf.ConfigurationManager;
@@ -29,9 +32,6 @@ import org.apache.sysds.runtime.data.TensorBlock;
 import org.apache.sysds.runtime.data.TensorIndexes;
 import org.apache.sysds.runtime.util.HDFSTool;
 
-import java.io.IOException;
-import java.util.Arrays;
-
 public class TensorWriterBinaryBlock extends TensorWriter {
        //TODO replication
 
@@ -60,12 +60,12 @@ public class TensorWriterBinaryBlock extends TensorWriter {
                writeBinaryBlockTensorToSequenceFile(path, job, fs, src, blen, 
0, src.getNumRows());
        }
 
-       @SuppressWarnings("deprecation")
        protected static void writeBinaryBlockTensorToSequenceFile(Path path, 
JobConf job, FileSystem fs, TensorBlock src,
-                       int blen, int rl, int ru)
-                       throws IOException
-       {
-               try(SequenceFile.Writer writer = new SequenceFile.Writer(fs, 
job, path, TensorIndexes.class, TensorBlock.class)) {
+               int blen, int rl, int ru) throws IOException {
+               
+               final Writer writer = IOUtilFunctions.getSeqWriterTensor(path,  
job,1);
+
+               try{
                        int[] dims = src.getDims();
                        // bound check
                        for (int i = 0; i < dims.length; i++) {
@@ -107,6 +107,8 @@ public class TensorWriterBinaryBlock extends TensorWriter {
 
                                writer.append(indx, block);
                        }
+               }finally{
+                       IOUtilFunctions.closeSilently(writer);
                }
        }
 }
diff --git a/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java 
b/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java
index 7cd8b9b599..718584acfe 100644
--- a/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java
@@ -23,7 +23,7 @@ import java.io.IOException;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Writer;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.sysds.conf.ConfigurationManager;
 import org.apache.sysds.runtime.DMLRuntimeException;
@@ -32,35 +32,33 @@ import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysds.runtime.util.HDFSTool;
 
-public class WriterBinaryBlock extends MatrixWriter
-{
+public class WriterBinaryBlock extends MatrixWriter {
        protected int _replication = -1;
-       
-       public WriterBinaryBlock( int replication ) {
-               _replication  = replication;
+
+       public WriterBinaryBlock(int replication) {
+               _replication = replication;
        }
 
        @Override
-       public final void writeMatrixToHDFS(MatrixBlock src, String fname, long 
rlen, long clen, int blen, long nnz, boolean diag) 
-               throws IOException, DMLRuntimeException 
-       {
-               //prepare file access
+       public final void writeMatrixToHDFS(MatrixBlock src, String fname, long 
rlen, long clen, int blen, long nnz,
+               boolean diag) throws IOException, DMLRuntimeException {
+               // prepare file access
                JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());
-               Path path = new Path( fname );
+               Path path = new Path(fname);
                FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
-               
-               //if the file already exists on HDFS, remove it.
-               HDFSTool.deleteFileIfExistOnHDFS( fname );
-
-               //set up preferred custom serialization framework for binary 
block format
-               if( HDFSTool.USE_BINARYBLOCK_SERIALIZATION )
-                       HDFSTool.addBinaryBlockSerializationFramework( job );
-               
+
+               // if the file already exists on HDFS, remove it.
+               HDFSTool.deleteFileIfExistOnHDFS(fname);
+
+               // set up preferred custom serialization framework for binary 
block format
+               if(HDFSTool.USE_BINARYBLOCK_SERIALIZATION)
+                       HDFSTool.addBinaryBlockSerializationFramework(job);
+
                if(src instanceof CompressedMatrixBlock)
                        src = CompressedMatrixBlock.getUncompressed(src, 
"Decompressing for binary write");
 
-               //core write sequential/parallel
-               if( diag )
+               // core write sequential/parallel
+               if(diag)
                        writeDiagBinaryBlockMatrixToHDFS(path, job, fs, src, 
rlen, clen, blen);
                else
                        writeBinaryBlockMatrixToHDFS(path, job, fs, src, rlen, 
clen, blen);
@@ -69,107 +67,82 @@ public class WriterBinaryBlock extends MatrixWriter
        }
 
        @Override
-       @SuppressWarnings("deprecation")
-       public final void writeEmptyMatrixToHDFS(String fname, long rlen, long 
clen, int blen) 
-               throws IOException, DMLRuntimeException 
-       {
+       public final void writeEmptyMatrixToHDFS(String fname, long rlen, long 
clen, int blen)
+               throws IOException, DMLRuntimeException {
                JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());
-               Path path = new Path( fname );
+               Path path = new Path(fname);
                FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
-               SequenceFile.Writer writer = null;
+               final Writer writer = IOUtilFunctions.getSeqWriter(path, job, 
_replication);
                try {
-                       writer = new SequenceFile.Writer(fs, job, path,
-                               MatrixIndexes.class, MatrixBlock.class);
                        MatrixIndexes index = new MatrixIndexes(1, 1);
-                       MatrixBlock block = new MatrixBlock(
-                               (int)Math.max(Math.min(rlen, blen),1),
-                               (int)Math.max(Math.min(clen, blen),1), true);
+                       MatrixBlock block = new MatrixBlock((int) 
Math.max(Math.min(rlen, blen), 1),
+                               (int) Math.max(Math.min(clen, blen), 1), true);
                        writer.append(index, block);
                }
                finally {
                        IOUtilFunctions.closeSilently(writer);
                }
-               
+
                IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(fs, path);
        }
 
-       protected void writeBinaryBlockMatrixToHDFS( Path path, JobConf job, 
FileSystem fs, MatrixBlock src, long rlen, long clen, int blen )
-               throws IOException, DMLRuntimeException
-       {
-               //sequential write 
-               writeBinaryBlockMatrixToSequenceFile(path, job, fs, src, blen, 
0, (int)rlen);
+       protected void writeBinaryBlockMatrixToHDFS(Path path, JobConf job, 
FileSystem fs, MatrixBlock src, long rlen,
+               long clen, int blen) throws IOException, DMLRuntimeException {
+               // sequential write
+               writeBinaryBlockMatrixToSequenceFile(path, job, fs, src, blen, 
0, (int) rlen);
        }
 
-       @SuppressWarnings("deprecation")
-       protected final void writeBinaryBlockMatrixToSequenceFile( Path path, 
JobConf job, FileSystem fs, MatrixBlock src, int blen, int rl, int ru ) 
-               throws IOException
-       {
+       protected final void writeBinaryBlockMatrixToSequenceFile(Path path, 
JobConf job, FileSystem fs, MatrixBlock src,
+               int blen, int rl, int ru) throws IOException {
                boolean sparse = src.isInSparseFormat();
                int rlen = src.getNumRows();
                int clen = src.getNumColumns();
-               
-               // 1) create sequence file writer, with right replication 
factor 
-               // (config via MRConfigurationNames.DFS_REPLICATION not 
possible since sequence file internally calls fs.getDefaultReplication())
-               SequenceFile.Writer writer = null;
-               if( _replication > 0 ) //if replication specified (otherwise 
default)
-               {
-                       //copy of SequenceFile.Writer(fs, job, path, 
MatrixIndexes.class, MatrixBlock.class), except for replication
-                       writer = new SequenceFile.Writer(fs, job, path, 
MatrixIndexes.class, MatrixBlock.class,
-                               job.getInt(HDFSTool.IO_FILE_BUFFER_SIZE, 4096),
-                               (short)_replication, fs.getDefaultBlockSize(), 
null, new SequenceFile.Metadata());      
-               }
-               else    
-               {
-                       writer = new SequenceFile.Writer(fs, job, path, 
MatrixIndexes.class, MatrixBlock.class);
-               }
-               
-               try
-               {
-                       // 2) bound check for src block
-                       if( src.getNumRows() > rlen || src.getNumColumns() > 
clen )
-                       {
-                               throw new IOException("Matrix block 
[1:"+src.getNumRows()+",1:"+src.getNumColumns()+"] " +
-                                                             "out of overall 
matrix range [1:"+rlen+",1:"+clen+"].");
+
+               final Writer writer = IOUtilFunctions.getSeqWriter(path, job, 
_replication);
+
+               try { // 2) bound check for src block
+                       if(src.getNumRows() > rlen || src.getNumColumns() > 
clen) {
+                               throw new IOException("Matrix block [1:" + 
src.getNumRows() + ",1:" + src.getNumColumns() + "] "
+                                       + "out of overall matrix range [1:" + 
rlen + ",1:" + clen + "].");
                        }
-               
-                       //3) reblock and write
+
+                       // 3) reblock and write
                        MatrixIndexes indexes = new MatrixIndexes();
 
-                       if( rlen <= blen && clen <= blen && rl == 0 ) //opt for 
single block
-                       {
-                               //directly write single block
+                       if(rlen <= blen && clen <= blen && rl == 0) { // opt 
for single block
+                               // directly write single block
                                indexes.setIndexes(1, 1);
                                writer.append(indexes, src);
                        }
-                       else //general case
-                       {
-                               //initialize blocks for reuse (at most 4 
different blocks required)
+                       else {
+                               // general case
+                               // initialize blocks for reuse (at most 4 
different blocks required)
                                MatrixBlock[] blocks = 
createMatrixBlocksForReuse(rlen, clen, blen, sparse, src.getNonZeros());
-                               
-                               //create and write subblocks of matrix
-                               for(int blockRow = rl/blen; blockRow < 
(int)Math.ceil(ru/(double)blen); blockRow++)
-                                       for(int blockCol = 0; blockCol < 
(int)Math.ceil(src.getNumColumns()/(double)blen); blockCol++)
-                                       {
-                                               int maxRow = (blockRow*blen + 
blen < src.getNumRows()) ? blen : src.getNumRows() - blockRow*blen;
-                                               int maxCol = (blockCol*blen + 
blen < src.getNumColumns()) ? blen : src.getNumColumns() - blockCol*blen;
-                               
-                                               int row_offset = blockRow*blen;
-                                               int col_offset = blockCol*blen;
-                                               
-                                               //get reuse matrix block
+
+                               // create and write sub-blocks of matrix
+                               for(int blockRow = rl / blen; blockRow < (int) 
Math.ceil(ru / (double) blen); blockRow++) {
+                                       for(int blockCol = 0; blockCol < (int) 
Math.ceil(src.getNumColumns() / (double) blen); blockCol++) {
+                                               int maxRow = (blockRow * blen + 
blen < src.getNumRows()) ? blen : src.getNumRows() - blockRow * blen;
+                                               int maxCol = (blockCol * blen + 
blen < src.getNumColumns()) ? blen : src.getNumColumns() -
+                                                       blockCol * blen;
+
+                                               int row_offset = blockRow * 
blen;
+                                               int col_offset = blockCol * 
blen;
+
+                                               // get reuse matrix block
                                                MatrixBlock block = 
getMatrixBlockForReuse(blocks, maxRow, maxCol, blen);
-       
-                                               //copy submatrix to block
-                                               src.slice( row_offset, 
row_offset+maxRow-1, 
-                                                                            
col_offset, col_offset+maxCol-1, block );
-                                               
-                                               //append block to sequence file
-                                               indexes.setIndexes(blockRow+1, 
blockCol+1);
+
+                                               // copy submatrix to block
+                                               src.slice(row_offset, 
row_offset + maxRow - 1, col_offset, col_offset + maxCol - 1, block);
+
+                                               // append block to sequence file
+                                               indexes.setIndexes(blockRow + 
1, blockCol + 1);
                                                writer.append(indexes, block);
-                                                       
-                                               //reset block for later reuse
+
+                                               // reset block for later reuse
                                                block.reset();
                                        }
+                               }
                        }
                }
                finally {
@@ -177,86 +150,66 @@ public class WriterBinaryBlock extends MatrixWriter
                }
        }
 
-       @SuppressWarnings("deprecation")
-       protected final void writeDiagBinaryBlockMatrixToHDFS( Path path, 
JobConf job, FileSystem fs, MatrixBlock src, long rlen, long clen, int blen ) 
-               throws IOException, DMLRuntimeException
-       {
+       protected final void writeDiagBinaryBlockMatrixToHDFS(Path path, 
JobConf job, FileSystem fs, MatrixBlock src,
+               long rlen, long clen, int blen) throws IOException, 
DMLRuntimeException {
                boolean sparse = src.isInSparseFormat();
-               
-               // 1) create sequence file writer, with right replication 
factor 
-               // (config via MRConfigurationNames.DFS_REPLICATION not 
possible since sequence file internally calls fs.getDefaultReplication())
-               SequenceFile.Writer writer = null;
-               if( _replication > 0 ) //if replication specified (otherwise 
default)
-               {
-                       //copy of SequenceFile.Writer(fs, job, path, 
MatrixIndexes.class, MatrixBlock.class), except for replication
-                       writer = new SequenceFile.Writer(fs, job, path, 
MatrixIndexes.class, MatrixBlock.class,
-                               job.getInt(HDFSTool.IO_FILE_BUFFER_SIZE, 4096),
-                               (short)_replication, fs.getDefaultBlockSize(), 
null, new SequenceFile.Metadata());
-               }
-               else    
-               {
-                       writer = new SequenceFile.Writer(fs, job, path, 
MatrixIndexes.class, MatrixBlock.class);
-               }
-               
-               try
-               {
+
+               final Writer writer = IOUtilFunctions.getSeqWriter(path, job, 
_replication);
+
+               try {
                        // 2) bound check for src block
-                       if( src.getNumRows() > rlen || src.getNumColumns() > 
clen )
-                       {
-                               throw new IOException("Matrix block 
[1:"+src.getNumRows()+",1:"+src.getNumColumns()+"] " +
-                                                             "out of overall 
matrix range [1:"+rlen+",1:"+clen+"].");
+                       if(src.getNumRows() > rlen || src.getNumColumns() > 
clen) {
+                               throw new IOException("Matrix block [1:" + 
src.getNumRows() + ",1:" + src.getNumColumns() + "] "
+                                       + "out of overall matrix range [1:" + 
rlen + ",1:" + clen + "].");
                        }
-               
-                       //3) reblock and write
+
+                       // 3) reblock and write
                        MatrixIndexes indexes = new MatrixIndexes();
 
-                       if( rlen <= blen && clen <= blen ) //opt for single 
block
-                       {
-                               //directly write single block
+                       if(rlen <= blen && clen <= blen) { // opt for single 
block
+                               // directly write single block
                                indexes.setIndexes(1, 1);
                                writer.append(indexes, src);
                        }
-                       else //general case
-                       {
-                               //initialize blocks for reuse (at most 4 
different blocks required)
+                       else { // general case
+                               // initialize blocks for reuse (at most 4 
different blocks required)
                                MatrixBlock[] blocks = 
createMatrixBlocksForReuse(rlen, clen, blen, sparse, src.getNonZeros());
                                MatrixBlock emptyBlock = new MatrixBlock();
-                                       
-                               //create and write subblocks of matrix
-                               for(int blockRow = 0; blockRow < 
(int)Math.ceil(src.getNumRows()/(double)blen); blockRow++)
-                                       for(int blockCol = 0; blockCol < 
(int)Math.ceil(src.getNumColumns()/(double)blen); blockCol++)
-                                       {
-                                               int maxRow = (blockRow*blen + 
blen < src.getNumRows()) ? blen : src.getNumRows() - blockRow*blen;
-                                               int maxCol = (blockCol*blen + 
blen < src.getNumColumns()) ? blen : src.getNumColumns() - blockCol*blen;
+
+                               // create and write subblocks of matrix
+                               for(int blockRow = 0; blockRow < (int) 
Math.ceil(src.getNumRows() / (double) blen); blockRow++) {
+
+                                       for(int blockCol = 0; blockCol < (int) 
Math.ceil(src.getNumColumns() / (double) blen); blockCol++) {
+                                               int maxRow = (blockRow * blen + 
blen < src.getNumRows()) ? blen : src.getNumRows() - blockRow * blen;
+                                               int maxCol = (blockCol * blen + 
blen < src.getNumColumns()) ? blen : src.getNumColumns() -
+                                                       blockCol * blen;
                                                MatrixBlock block = null;
-                                               
-                                               if( blockRow==blockCol ) 
//block on diagonal
-                                               {       
-                                                       int row_offset = 
blockRow*blen;
-                                                       int col_offset = 
blockCol*blen;
-                                                       
-                                                       //get reuse matrix block
+
+                                               if(blockRow == blockCol) { // 
block on diagonal
+                                                       int row_offset = 
blockRow * blen;
+                                                       int col_offset = 
blockCol * blen;
+
+                                                       // get reuse matrix 
block
                                                        block = 
getMatrixBlockForReuse(blocks, maxRow, maxCol, blen);
-               
-                                                       //copy submatrix to 
block
-                                                       src.slice( row_offset, 
row_offset+maxRow-1, 
-                                                               col_offset, 
col_offset+maxCol-1, block );
+
+                                                       // copy submatrix to 
block
+                                                       src.slice(row_offset, 
row_offset + maxRow - 1, col_offset, col_offset + maxCol - 1, block);
                                                }
-                                               else //empty block (not on 
diagonal)
-                                               {
+                                               else { // empty block (not on 
diagonal)
                                                        block = emptyBlock;
                                                        block.reset(maxRow, 
maxCol);
                                                }
-                                               
-                                               //append block to sequence file
-                                               indexes.setIndexes(blockRow+1, 
blockCol+1);
+
+                                               // append block to sequence file
+                                               indexes.setIndexes(blockRow + 
1, blockCol + 1);
                                                writer.append(indexes, block);
-                                               
-                                               //reset block for later reuse
-                                               if( blockRow!=blockCol )
+
+                                               // reset block for later reuse
+                                               if(blockRow != blockCol)
                                                        block.reset();
                                        }
-                       }                               
+                               }
+                       }
                }
                finally {
                        IOUtilFunctions.closeSilently(writer);
diff --git a/src/test/java/org/apache/sysds/test/TestUtils.java 
b/src/test/java/org/apache/sysds/test/TestUtils.java
index bade9ddbe8..554e1f5a05 100644
--- a/src/test/java/org/apache/sysds/test/TestUtils.java
+++ b/src/test/java/org/apache/sysds/test/TestUtils.java
@@ -59,7 +59,6 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.SequenceFile.Writer;
 import org.apache.sysds.common.Types.FileFormat;
 import org.apache.sysds.common.Types.ValueType;
@@ -2647,49 +2646,52 @@ public class TestUtils
        }
 
 
-       /* Write a scalar value to a file */
+       /**
+        * Writes a scalar value to the given file.
+        *
+        * @param file  file to write to
+        * @param value value to write
+        */
        public static void writeTestScalar(String file, double value) {
                try {
                        DataOutputStream out = new DataOutputStream(new 
FileOutputStream(file));
-                       try( PrintWriter pw = new PrintWriter(out) ) {
+                       try(PrintWriter pw = new PrintWriter(out)) {
                                pw.println(value);
                        }
-               } catch (IOException e) {
+               }
+               catch(IOException e) {
                        fail("unable to write test scalar (" + file + "): " + 
e.getMessage());
                }
        }
 
+       /**
+        * Writes a scalar value to the given file.
+        *
+        * @param file  file to write to
+        * @param value value to write
+        */
        public static void writeTestScalar(String file, long value) {
                try {
                        DataOutputStream out = new DataOutputStream(new 
FileOutputStream(file));
-                       try( PrintWriter pw = new PrintWriter(out) ) {
+                       try(PrintWriter pw = new PrintWriter(out)) {
                                pw.println(value);
                        }
-               } catch (IOException e) {
+               }
+               catch(IOException e) {
                        fail("unable to write test scalar (" + file + "): " + 
e.getMessage());
                }
        }
 
        /**
-        * <p>
         * Writes a matrix to a file using the binary cells format.
-        * </p>
-        *
-        * @param file
-        *            file name
-        * @param matrix
-        *            matrix
+        * 
+        * @param file   file name
+        * @param matrix matrix
         */
        public static void writeBinaryTestMatrixCells(String file, double[][] 
matrix) {
                try {
-                       SequenceFile.Writer writer = null;
+                       final Writer writer = 
IOUtilFunctions.getSeqWriterCell(new Path(file), conf, 1);
                        try {
-                               Path path = new Path(file);
-                               Writer.Option filePath = Writer.file(path);
-                               Writer.Option keyClass = 
Writer.keyClass(MatrixIndexes.class);
-                               Writer.Option valueClass = 
Writer.valueClass(MatrixBlock.class);
-                               Writer.Option compression = 
Writer.compression(SequenceFile.CompressionType.NONE);
-                               writer = SequenceFile.createWriter(conf, 
filePath, keyClass, valueClass, compression);
                                MatrixIndexes index = new MatrixIndexes();
                                MatrixCell value = new MatrixCell();
                                for (int i = 0; i < matrix.length; i++) {
@@ -2712,56 +2714,45 @@ public class TestUtils
        }
 
        /**
-        * <p>
-        * Writes a matrix to a file using the binary blocks format.
-        * </p>
+        * Writes a matrix to a file using the binary blocks format.
         *
-        * @param file
-        *            file name
-        * @param matrix
-        *            matrix
-        * @param rowsInBlock
-        *            rows in block
-        * @param colsInBlock
-        *            columns in block
-        * @param sparseFormat
-        *            sparse format
+        * @param file         file name
+        * @param matrix       matrix
+        * @param rowsInBlock  rows in block
+        * @param colsInBlock  columns in block
+        * @param sparseFormat sparse format
         */
        public static void writeBinaryTestMatrixBlocks(String file, double[][] 
matrix, int rowsInBlock, int colsInBlock,
                        boolean sparseFormat) {
-               SequenceFile.Writer writer = null;
 
                try {
-                       Path path = new Path(file);
-                       Writer.Option filePath = Writer.file(path);
-                       Writer.Option keyClass = 
Writer.keyClass(MatrixIndexes.class);
-                       Writer.Option valueClass = 
Writer.valueClass(MatrixBlock.class);
-                       Writer.Option compression = 
Writer.compression(SequenceFile.CompressionType.NONE);
-                       writer = SequenceFile.createWriter(conf, filePath, 
keyClass, valueClass, compression);
-                       MatrixIndexes index = new MatrixIndexes();
-                       MatrixBlock value = new MatrixBlock();
-                       for (int i = 0; i < matrix.length; i += rowsInBlock) {
-                               int rows = Math.min(rowsInBlock, (matrix.length 
- i));
-                               for (int j = 0; j < matrix[i].length; j += 
colsInBlock) {
-                                       int cols = Math.min(colsInBlock, 
(matrix[i].length - j));
-                                       index.setIndexes(((i / rowsInBlock) + 
1), ((j / colsInBlock) + 1));
-                                       value.reset(rows, cols, sparseFormat);
-                                       for (int k = 0; k < rows; k++) {
-                                               for (int l = 0; l < cols; l++) {
-                                                       value.setValue(k, l, 
matrix[i + k][j + l]);
+                       final Writer writer = IOUtilFunctions.getSeqWriter(new 
Path(file), conf, 1);
+                       try{
+                               MatrixIndexes index = new MatrixIndexes();
+                               MatrixBlock value = new MatrixBlock();
+                               for (int i = 0; i < matrix.length; i += 
rowsInBlock) {
+                                       int rows = Math.min(rowsInBlock, 
(matrix.length - i));
+                                       for (int j = 0; j < matrix[i].length; j 
+= colsInBlock) {
+                                               int cols = 
Math.min(colsInBlock, (matrix[i].length - j));
+                                               index.setIndexes(((i / 
rowsInBlock) + 1), ((j / colsInBlock) + 1));
+                                               value.reset(rows, cols, 
sparseFormat);
+                                               for (int k = 0; k < rows; k++) {
+                                                       for (int l = 0; l < 
cols; l++) {
+                                                               
value.setValue(k, l, matrix[i + k][j + l]);
+                                                       }
                                                }
+                                               writer.append(index, value);
                                        }
-                                       writer.append(index, value);
                                }
                        }
+                       finally {
+                               IOUtilFunctions.closeSilently(writer);
+                       }
                }
                catch (IOException e) {
                        e.printStackTrace();
                        fail("unable to write test matrix: " + e.getMessage());
                }
-               finally {
-                       IOUtilFunctions.closeSilently(writer);
-               }
        }
 
        /**


Reply via email to