This is an automated email from the ASF dual-hosted git repository.
baunsgaard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new 2567d2ff2e [SYSTEMDS-3507] Update all HDFS writers
2567d2ff2e is described below
commit 2567d2ff2e2079cc61573bfa161b93fc97f80d8c
Author: baunsgaard <[email protected]>
AuthorDate: Wed Mar 15 15:38:55 2023 +0100
[SYSTEMDS-3507] Update all HDFS writers
This commit updates all the writers contained in the system to
use the non-deprecated SequenceFile API. The update seems to have
some positive influence, but the difference is not consistent:
when writing 8GB files to disk, I sometimes observe large
differences of 1-2 seconds, and sometimes none.
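The core of the change is sketched below; the writer options mirror
the new helpers added to IOUtilFunctions, and MatrixIndexes/MatrixBlock
stand in for the key/value types of the respective writer (a minimal
sketch, not the exact code of every call site):

    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.SequenceFile.Writer;

    // Old, deprecated constructor (removed in this commit):
    //   writer = new SequenceFile.Writer(fs, job, path,
    //       MatrixIndexes.class, MatrixBlock.class);
    // New, option-based factory (see IOUtilFunctions.getSeqWriter):
    Writer writer = SequenceFile.createWriter(job,
        Writer.file(path),               // target path on HDFS
        Writer.bufferSize(4096),         // write buffer size
        Writer.replication((short) 1),   // example replication factor
        Writer.compression(SequenceFile.CompressionType.NONE),
        Writer.keyClass(MatrixIndexes.class),
        Writer.valueClass(MatrixBlock.class));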
Closes #1788
---
.../runtime/compress/io/WriterCompressed.java | 9 +-
.../parfor/DataPartitionerLocal.java | 17 +-
.../parfor/DataPartitionerRemoteSparkReducer.java | 14 +-
.../parfor/ResultMergeLocalFile.java | 35 +--
.../sysds/runtime/io/FrameWriterBinaryBlock.java | 6 +-
.../apache/sysds/runtime/io/IOUtilFunctions.java | 36 +++
.../sysds/runtime/io/TensorWriterBinaryBlock.java | 20 +-
.../apache/sysds/runtime/io/WriterBinaryBlock.java | 263 +++++++++------------
src/test/java/org/apache/sysds/test/TestUtils.java | 101 ++++----
9 files changed, 235 insertions(+), 266 deletions(-)
diff --git a/src/main/java/org/apache/sysds/runtime/compress/io/WriterCompressed.java b/src/main/java/org/apache/sysds/runtime/compress/io/WriterCompressed.java
index 82f2be59e4..0c6deecbd2 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/io/WriterCompressed.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/io/WriterCompressed.java
@@ -123,9 +123,9 @@ public final class WriterCompressed extends MatrixWriter {
 		// Make Writer (New interface)
 		final Writer w = SequenceFile.createWriter(job, Writer.file(path), Writer.bufferSize(4096),
-			Writer.blockSize(4096), Writer.keyClass(MatrixIndexes.class), Writer.valueClass(CompressedWriteBlock.class),
+			Writer.keyClass(MatrixIndexes.class), Writer.valueClass(CompressedWriteBlock.class),
 			Writer.compression(SequenceFile.CompressionType.NONE), // No Compression type on disk
-		Writer.replication((short) 1));
+			Writer.replication((short) 1));
 		final int rlen = src.getNumRows();
 		final int clen = src.getNumColumns();
@@ -162,9 +162,8 @@ public final class WriterCompressed extends MatrixWriter {
 			final int sC = bc * blen;
 			final int mC = Math.min(sC + blen, clen) - 1;
 			if(b instanceof CompressedMatrixBlock) {
-				final CompressedMatrixBlock mc = //mC == clen - 1 ? (CompressedMatrixBlock) b :
-					CLALibSlice
-						.sliceColumns((CompressedMatrixBlock) b, sC, mC); // slice columns!
+				final CompressedMatrixBlock mc = // mC == clen - 1 ? (CompressedMatrixBlock) b :
+					CLALibSlice.sliceColumns((CompressedMatrixBlock) b, sC, mC); // slice columns!
 				final List<MatrixBlock> blocks = CLALibSlice.sliceBlocks(mc, blen); // Slice compressed blocks
 				for(int br = 0; br * blen < rlen; br++) {
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/DataPartitionerLocal.java b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/DataPartitionerLocal.java
index 3ec4ecb1dc..0644b4d9d3 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/DataPartitionerLocal.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/DataPartitionerLocal.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
@@ -374,17 +375,17 @@ public class DataPartitionerLocal extends DataPartitioner
 	// read/write in different formats //
 	/////////////////////////////////////
 
-	@SuppressWarnings({ "deprecation"})
+	// @SuppressWarnings({ "deprecation"})
 	public void writeBinaryBlockSequenceFileToHDFS( JobConf job, String dir, String lpdir, boolean threadsafe )
 		throws IOException
 	{
 		long key = getKeyFromFilePath(lpdir);
 		Path path = new Path(dir+"/"+key);
-		SequenceFile.Writer writer = null;
-		try {
-			FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
-			writer = new SequenceFile.Writer(fs, job, path, MatrixIndexes.class, MatrixBlock.class); //beware ca 50ms
+		//beware ca 50ms
+		final Writer writer = IOUtilFunctions.getSeqWriter(path, job, 1);
+
+		try {
 			String[] fnameBlocks = new File( lpdir ).list();
 			for( String fnameBlock : fnameBlocks )
 			{
@@ -410,18 +411,14 @@ public class DataPartitionerLocal extends DataPartitioner
 		}
 	}
 
-	@SuppressWarnings({ "deprecation" })
 	public void writeBinaryCellSequenceFileToHDFS( JobConf job, String dir, String lpdir )
 		throws IOException
 	{
 		long key = getKeyFromFilePath(lpdir);
 		Path path = new Path(dir+"/"+key);
-		SequenceFile.Writer writer = null;
+		final Writer writer = IOUtilFunctions.getSeqWriterCell(path, job, 1);
 		try {
-			FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
-			writer = new SequenceFile.Writer(fs, job, path, MatrixIndexes.class, MatrixCell.class); //beware ca 50ms
-
 			MatrixIndexes indexes = new MatrixIndexes();
 			MatrixCell cell = new MatrixCell();
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/DataPartitionerRemoteSparkReducer.java b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/DataPartitionerRemoteSparkReducer.java
index a41f9b68c0..2e925aaf6c 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/DataPartitionerRemoteSparkReducer.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/DataPartitionerRemoteSparkReducer.java
@@ -23,18 +23,14 @@ import java.io.File;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.hadoop.io.Writable;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.sysds.common.Types.FileFormat;
import org.apache.sysds.conf.ConfigurationManager;
import org.apache.sysds.runtime.controlprogram.parfor.util.PairWritableBlock;
import org.apache.sysds.runtime.io.IOUtilFunctions;
-import org.apache.sysds.runtime.matrix.data.MatrixBlock;
-import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
-import org.apache.sysds.runtime.util.HDFSTool;
import scala.Tuple2;
@@ -51,7 +47,6 @@ public class DataPartitionerRemoteSparkReducer implements VoidFunction<Tuple2<Lo
 	}
 
 	@Override
-	@SuppressWarnings({ "deprecation" })
 	public void call(Tuple2<Long, Iterable<Writable>> arg0)
 		throws Exception
 	{
@@ -62,14 +57,9 @@ public class DataPartitionerRemoteSparkReducer implements VoidFunction<Tuple2<Lo
 			//write entire partition to binary block sequence file
 			//create sequence file writer
 			Configuration job = new Configuration(ConfigurationManager.getCachedJobConf());
-			job.setInt(HDFSTool.DFS_REPLICATION, _replication);
 			Path path = new Path(_fnameNew + File.separator + key);
-
-			SequenceFile.Writer writer = null;
+			final Writer writer = IOUtilFunctions.getSeqWriter(path, job, _replication);
 			try {
-				FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
-				writer = new SequenceFile.Writer(fs, job, path, MatrixIndexes.class, MatrixBlock.class);
-
 				//write individual blocks unordered to output
 				while( valueList.hasNext() ) {
 					PairWritableBlock pair = (PairWritableBlock) valueList.next();
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeLocalFile.java b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeLocalFile.java
index 84847bfc4f..1799258e48 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeLocalFile.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeLocalFile.java
@@ -19,10 +19,21 @@
package org.apache.sysds.runtime.controlprogram.parfor;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map.Entry;
+
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
@@ -50,16 +61,6 @@ import org.apache.sysds.runtime.util.FastStringTokenizer;
import org.apache.sysds.runtime.util.HDFSTool;
import org.apache.sysds.runtime.util.LocalFileUtils;
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.Map.Entry;
-
/**
*
* TODO potential extension: parallel merge (create individual staging files concurrently)
@@ -475,23 +476,23 @@ public class ResultMergeLocalFile extends ResultMergeMatrix
 		}
 	}
 
-	@SuppressWarnings("deprecation")
 	private void createBinaryBlockResultFile( String fnameStaging, String fnameStagingCompare, String fnameNew, MetaDataFormat metadata, boolean withCompare )
 		throws IOException, DMLRuntimeException
 	{
 		JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
 		Path path = new Path( fnameNew );
-		FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
 		DataCharacteristics mc = metadata.getDataCharacteristics();
 		long rlen = mc.getRows();
 		long clen = mc.getCols();
 		int blen = mc.getBlocksize();
-		try(SequenceFile.Writer writer = new SequenceFile.Writer(fs, job, path, MatrixIndexes.class, MatrixBlock.class))
-		{
+		Writer writer = IOUtilFunctions.getSeqWriter(path, job, 1);
+
+		try {
 			MatrixIndexes indexes = new MatrixIndexes();
-			for(long brow = 1; brow <= (long)Math.ceil(rlen/(double)blen); brow++)
+			for(long brow = 1; brow <= (long)Math.ceil(rlen/(double)blen); brow++){
+
 				for(long bcol = 1; bcol <= (long)Math.ceil(clen/(double)blen); bcol++)
 				{
 					File dir = new File(fnameStaging+"/"+brow+"_"+bcol);
@@ -555,6 +556,10 @@ public class ResultMergeLocalFile extends ResultMergeMatrix
 					indexes.setIndexes(brow, bcol);
 					writer.append(indexes, mb);
 				}
+			}
+		}
+		finally {
+			IOUtilFunctions.closeSilently(writer);
 		}
 	}
diff --git a/src/main/java/org/apache/sysds/runtime/io/FrameWriterBinaryBlock.java b/src/main/java/org/apache/sysds/runtime/io/FrameWriterBinaryBlock.java
index 43bb85a8db..859cbe028c 100644
--- a/src/main/java/org/apache/sysds/runtime/io/FrameWriterBinaryBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/io/FrameWriterBinaryBlock.java
@@ -24,7 +24,6 @@ import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.hadoop.mapred.JobConf;
import org.apache.sysds.conf.ConfigurationManager;
@@ -84,10 +83,7 @@ public class FrameWriterBinaryBlock extends FrameWriter {
 	protected static void writeBinaryBlockFrameToSequenceFile(Path path, JobConf job, FileSystem fs, FrameBlock src,
 		int blen, int rl, int ru) throws IOException {
 		// 1) create sequence file writer
-		SequenceFile.Writer writer = SequenceFile.createWriter(job, Writer.file(path), Writer.bufferSize(4096),
-			Writer.blockSize(4096), Writer.keyClass(LongWritable.class), Writer.valueClass(FrameBlock.class),
-			Writer.compression(SequenceFile.CompressionType.NONE), Writer.replication((short) 1));
-
+		final Writer writer = IOUtilFunctions.getSeqWriterFrame(path, job, 1);
final int rlen = src.getNumRows();
final int clen = src.getNumColumns();
try {
diff --git a/src/main/java/org/apache/sysds/runtime/io/IOUtilFunctions.java b/src/main/java/org/apache/sysds/runtime/io/IOUtilFunctions.java
index b36f4497d1..bf1e545098 100644
--- a/src/main/java/org/apache/sysds/runtime/io/IOUtilFunctions.java
+++ b/src/main/java/org/apache/sysds/runtime/io/IOUtilFunctions.java
@@ -46,6 +46,8 @@ import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputFormat;
@@ -56,6 +58,12 @@ import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.sysds.conf.ConfigurationManager;
import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.data.TensorBlock;
+import org.apache.sysds.runtime.data.TensorIndexes;
+import org.apache.sysds.runtime.frame.data.FrameBlock;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.matrix.data.MatrixCell;
+import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
import org.apache.sysds.runtime.transform.TfUtils;
import org.apache.sysds.runtime.util.LocalFileUtils;
import org.apache.sysds.runtime.util.UtilFunctions;
@@ -729,4 +737,32 @@ public class IOUtilFunctions
 			return nrows;
 		}
 	}
+
+	public static Writer getSeqWriter(Path path, Configuration job, int replication) throws IOException {
+		return SequenceFile.createWriter(job, Writer.file(path), Writer.bufferSize(4096),
+			Writer.replication((short) (replication > 0 ? replication : 1)),
+			Writer.compression(SequenceFile.CompressionType.NONE), Writer.keyClass(MatrixIndexes.class),
+			Writer.valueClass(MatrixBlock.class));
+	}
+
+	public static Writer getSeqWriterFrame(Path path, Configuration job, int replication) throws IOException {
+		return SequenceFile.createWriter(job, Writer.file(path), Writer.bufferSize(4096),
+			Writer.keyClass(LongWritable.class), Writer.valueClass(FrameBlock.class),
+			Writer.compression(SequenceFile.CompressionType.NONE),
+			Writer.replication((short) (replication > 0 ? replication : 1)));
+	}
+
+	public static Writer getSeqWriterTensor(Path path, Configuration job, int replication) throws IOException {
+		return SequenceFile.createWriter(job, Writer.file(path), Writer.bufferSize(4096),
+			Writer.replication((short) (replication > 0 ? replication : 1)),
+			Writer.compression(SequenceFile.CompressionType.NONE), Writer.keyClass(TensorIndexes.class),
+			Writer.valueClass(TensorBlock.class));
+	}
+
+	public static Writer getSeqWriterCell(Path path, Configuration job, int replication) throws IOException {
+		return SequenceFile.createWriter(job, Writer.file(path), Writer.bufferSize(4096),
+			Writer.replication((short) (replication > 0 ? replication : 1)),
+			Writer.compression(SequenceFile.CompressionType.NONE), Writer.keyClass(MatrixIndexes.class),
+			Writer.valueClass(MatrixCell.class));
+	}
 }
diff --git a/src/main/java/org/apache/sysds/runtime/io/TensorWriterBinaryBlock.java b/src/main/java/org/apache/sysds/runtime/io/TensorWriterBinaryBlock.java
index 7c74dfc146..fbb316e816 100644
--- a/src/main/java/org/apache/sysds/runtime/io/TensorWriterBinaryBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/io/TensorWriterBinaryBlock.java
@@ -19,9 +19,12 @@
package org.apache.sysds.runtime.io;
+import java.io.IOException;
+import java.util.Arrays;
+
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.hadoop.mapred.JobConf;
import org.apache.sysds.common.Types.ValueType;
import org.apache.sysds.conf.ConfigurationManager;
@@ -29,9 +32,6 @@ import org.apache.sysds.runtime.data.TensorBlock;
import org.apache.sysds.runtime.data.TensorIndexes;
import org.apache.sysds.runtime.util.HDFSTool;
-import java.io.IOException;
-import java.util.Arrays;
-
public class TensorWriterBinaryBlock extends TensorWriter {
//TODO replication
@@ -60,12 +60,12 @@ public class TensorWriterBinaryBlock extends TensorWriter {
 		writeBinaryBlockTensorToSequenceFile(path, job, fs, src, blen, 0, src.getNumRows());
 	}
 
-	@SuppressWarnings("deprecation")
 	protected static void writeBinaryBlockTensorToSequenceFile(Path path, JobConf job, FileSystem fs, TensorBlock src,
-		int blen, int rl, int ru)
-		throws IOException
-	{
-		try(SequenceFile.Writer writer = new SequenceFile.Writer(fs, job, path, TensorIndexes.class, TensorBlock.class)) {
+		int blen, int rl, int ru) throws IOException {
+
+		final Writer writer = IOUtilFunctions.getSeqWriterTensor(path, job,1);
+
+		try{
 			int[] dims = src.getDims();
 			// bound check
 			for (int i = 0; i < dims.length; i++) {
@@ -107,6 +107,8 @@ public class TensorWriterBinaryBlock extends TensorWriter {
 				writer.append(indx, block);
 			}
+		}finally{
+			IOUtilFunctions.closeSilently(writer);
 		}
 	}
 }
diff --git a/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java b/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java
index 7cd8b9b599..718584acfe 100644
--- a/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/io/WriterBinaryBlock.java
@@ -23,7 +23,7 @@ import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.hadoop.mapred.JobConf;
import org.apache.sysds.conf.ConfigurationManager;
import org.apache.sysds.runtime.DMLRuntimeException;
@@ -32,35 +32,33 @@ import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
import org.apache.sysds.runtime.util.HDFSTool;
-public class WriterBinaryBlock extends MatrixWriter
-{
+public class WriterBinaryBlock extends MatrixWriter {
 	protected int _replication = -1;
-
-	public WriterBinaryBlock( int replication ) {
-		_replication = replication;
+
+	public WriterBinaryBlock(int replication) {
+		_replication = replication;
 	}
 
 	@Override
-	public final void writeMatrixToHDFS(MatrixBlock src, String fname, long rlen, long clen, int blen, long nnz, boolean diag)
-		throws IOException, DMLRuntimeException
-	{
-		//prepare file access
+	public final void writeMatrixToHDFS(MatrixBlock src, String fname, long rlen, long clen, int blen, long nnz,
+		boolean diag) throws IOException, DMLRuntimeException {
+		// prepare file access
 		JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
-		Path path = new Path( fname );
+		Path path = new Path(fname);
 		FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
-
-		//if the file already exists on HDFS, remove it.
-		HDFSTool.deleteFileIfExistOnHDFS( fname );
-
-		//set up preferred custom serialization framework for binary block format
-		if( HDFSTool.USE_BINARYBLOCK_SERIALIZATION )
-			HDFSTool.addBinaryBlockSerializationFramework( job );
-
+
+		// if the file already exists on HDFS, remove it.
+		HDFSTool.deleteFileIfExistOnHDFS(fname);
+
+		// set up preferred custom serialization framework for binary block format
+		if(HDFSTool.USE_BINARYBLOCK_SERIALIZATION)
+			HDFSTool.addBinaryBlockSerializationFramework(job);
+
 		if(src instanceof CompressedMatrixBlock)
 			src = CompressedMatrixBlock.getUncompressed(src, "Decompressing for binary write");
 
-		//core write sequential/parallel
-		if( diag )
+		// core write sequential/parallel
+		if(diag)
 			writeDiagBinaryBlockMatrixToHDFS(path, job, fs, src, rlen, clen, blen);
 		else
 			writeBinaryBlockMatrixToHDFS(path, job, fs, src, rlen, clen, blen);
@@ -69,107 +67,82 @@ public class WriterBinaryBlock extends MatrixWriter
}
 	@Override
-	@SuppressWarnings("deprecation")
-	public final void writeEmptyMatrixToHDFS(String fname, long rlen, long clen, int blen)
-		throws IOException, DMLRuntimeException
-	{
+	public final void writeEmptyMatrixToHDFS(String fname, long rlen, long clen, int blen)
+		throws IOException, DMLRuntimeException {
 		JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
-		Path path = new Path( fname );
+		Path path = new Path(fname);
 		FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
-		SequenceFile.Writer writer = null;
+		final Writer writer = IOUtilFunctions.getSeqWriter(path, job, _replication);
 		try {
-			writer = new SequenceFile.Writer(fs, job, path,
-				MatrixIndexes.class, MatrixBlock.class);
 			MatrixIndexes index = new MatrixIndexes(1, 1);
-			MatrixBlock block = new MatrixBlock(
-				(int)Math.max(Math.min(rlen, blen),1),
-				(int)Math.max(Math.min(clen, blen),1), true);
+			MatrixBlock block = new MatrixBlock((int) Math.max(Math.min(rlen, blen), 1),
+				(int) Math.max(Math.min(clen, blen), 1), true);
 			writer.append(index, block);
 		}
 		finally {
 			IOUtilFunctions.closeSilently(writer);
 		}
-
+
 		IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(fs, path);
 	}
-	protected void writeBinaryBlockMatrixToHDFS( Path path, JobConf job, FileSystem fs, MatrixBlock src, long rlen, long clen, int blen )
-		throws IOException, DMLRuntimeException
-	{
-		//sequential write
-		writeBinaryBlockMatrixToSequenceFile(path, job, fs, src, blen, 0, (int)rlen);
+	protected void writeBinaryBlockMatrixToHDFS(Path path, JobConf job, FileSystem fs, MatrixBlock src, long rlen,
+		long clen, int blen) throws IOException, DMLRuntimeException {
+		// sequential write
+		writeBinaryBlockMatrixToSequenceFile(path, job, fs, src, blen, 0, (int) rlen);
 	}
-	@SuppressWarnings("deprecation")
-	protected final void writeBinaryBlockMatrixToSequenceFile( Path path, JobConf job, FileSystem fs, MatrixBlock src, int blen, int rl, int ru )
-		throws IOException
-	{
+	protected final void writeBinaryBlockMatrixToSequenceFile(Path path, JobConf job, FileSystem fs, MatrixBlock src,
+		int blen, int rl, int ru) throws IOException {
 		boolean sparse = src.isInSparseFormat();
 		int rlen = src.getNumRows();
 		int clen = src.getNumColumns();
-
-		// 1) create sequence file writer, with right replication factor
-		// (config via MRConfigurationNames.DFS_REPLICATION not possible since sequence file internally calls fs.getDefaultReplication())
-		SequenceFile.Writer writer = null;
-		if( _replication > 0 ) //if replication specified (otherwise default)
-		{
-			//copy of SequenceFile.Writer(fs, job, path, MatrixIndexes.class, MatrixBlock.class), except for replication
-			writer = new SequenceFile.Writer(fs, job, path, MatrixIndexes.class, MatrixBlock.class,
-				job.getInt(HDFSTool.IO_FILE_BUFFER_SIZE, 4096),
-				(short)_replication, fs.getDefaultBlockSize(), null, new SequenceFile.Metadata());
-		}
-		else
-		{
-			writer = new SequenceFile.Writer(fs, job, path, MatrixIndexes.class, MatrixBlock.class);
-		}
-
-		try
-		{
-			// 2) bound check for src block
-			if( src.getNumRows() > rlen || src.getNumColumns() > clen )
-			{
-				throw new IOException("Matrix block [1:"+src.getNumRows()+",1:"+src.getNumColumns()+"] " +
-						"out of overall matrix range [1:"+rlen+",1:"+clen+"].");
+
+		final Writer writer = IOUtilFunctions.getSeqWriter(path, job, _replication);
+
+		try { // 2) bound check for src block
+			if(src.getNumRows() > rlen || src.getNumColumns() > clen) {
+				throw new IOException("Matrix block [1:" + src.getNumRows() + ",1:" + src.getNumColumns() + "] "
+					+ "out of overall matrix range [1:" + rlen + ",1:" + clen + "].");
 			}
-
-			//3) reblock and write
+
+			// 3) reblock and write
 			MatrixIndexes indexes = new MatrixIndexes();
-			if( rlen <= blen && clen <= blen && rl == 0 ) //opt for single block
-			{
-				//directly write single block
+			if(rlen <= blen && clen <= blen && rl == 0) { // opt for single block
+				// directly write single block
 				indexes.setIndexes(1, 1);
 				writer.append(indexes, src);
 			}
-			else //general case
-			{
-				//initialize blocks for reuse (at most 4 different blocks required)
+			else {
+				// general case
+				// initialize blocks for reuse (at most 4 different blocks required)
 				MatrixBlock[] blocks = createMatrixBlocksForReuse(rlen, clen, blen, sparse, src.getNonZeros());
-
-				//create and write subblocks of matrix
-				for(int blockRow = rl/blen; blockRow < (int)Math.ceil(ru/(double)blen); blockRow++)
-					for(int blockCol = 0; blockCol < (int)Math.ceil(src.getNumColumns()/(double)blen); blockCol++)
-					{
-						int maxRow = (blockRow*blen + blen < src.getNumRows()) ? blen : src.getNumRows() - blockRow*blen;
-						int maxCol = (blockCol*blen + blen < src.getNumColumns()) ? blen : src.getNumColumns() - blockCol*blen;
-
-						int row_offset = blockRow*blen;
-						int col_offset = blockCol*blen;
-
-						//get reuse matrix block
+
+				// create and write sub-blocks of matrix
+				for(int blockRow = rl / blen; blockRow < (int) Math.ceil(ru / (double) blen); blockRow++) {
+					for(int blockCol = 0; blockCol < (int) Math.ceil(src.getNumColumns() / (double) blen); blockCol++) {
+						int maxRow = (blockRow * blen + blen < src.getNumRows()) ? blen : src.getNumRows() - blockRow * blen;
+						int maxCol = (blockCol * blen + blen < src.getNumColumns()) ? blen : src.getNumColumns() -
+							blockCol * blen;
+
+						int row_offset = blockRow * blen;
+						int col_offset = blockCol * blen;
+
+						// get reuse matrix block
 						MatrixBlock block = getMatrixBlockForReuse(blocks, maxRow, maxCol, blen);
-
-						//copy submatrix to block
-						src.slice( row_offset, row_offset+maxRow-1,
-							col_offset, col_offset+maxCol-1, block );
-
-						//append block to sequence file
-						indexes.setIndexes(blockRow+1, blockCol+1);
+
+						// copy submatrix to block
+						src.slice(row_offset, row_offset + maxRow - 1, col_offset, col_offset + maxCol - 1, block);
+
+						// append block to sequence file
+						indexes.setIndexes(blockRow + 1, blockCol + 1);
 						writer.append(indexes, block);
-
-						//reset block for later reuse
+
+						// reset block for later reuse
 						block.reset();
 					}
+				}
 			}
 		}
 		finally {
@@ -177,86 +150,66 @@ public class WriterBinaryBlock extends MatrixWriter
}
}
-	@SuppressWarnings("deprecation")
-	protected final void writeDiagBinaryBlockMatrixToHDFS( Path path, JobConf job, FileSystem fs, MatrixBlock src, long rlen, long clen, int blen )
-		throws IOException, DMLRuntimeException
-	{
+	protected final void writeDiagBinaryBlockMatrixToHDFS(Path path, JobConf job, FileSystem fs, MatrixBlock src,
+		long rlen, long clen, int blen) throws IOException, DMLRuntimeException {
 		boolean sparse = src.isInSparseFormat();
-
-		// 1) create sequence file writer, with right replication factor
-		// (config via MRConfigurationNames.DFS_REPLICATION not possible since sequence file internally calls fs.getDefaultReplication())
-		SequenceFile.Writer writer = null;
-		if( _replication > 0 ) //if replication specified (otherwise default)
-		{
-			//copy of SequenceFile.Writer(fs, job, path, MatrixIndexes.class, MatrixBlock.class), except for replication
-			writer = new SequenceFile.Writer(fs, job, path, MatrixIndexes.class, MatrixBlock.class,
-				job.getInt(HDFSTool.IO_FILE_BUFFER_SIZE, 4096),
-				(short)_replication, fs.getDefaultBlockSize(), null, new SequenceFile.Metadata());
-		}
-		else
-		{
-			writer = new SequenceFile.Writer(fs, job, path, MatrixIndexes.class, MatrixBlock.class);
-		}
-
-		try
-		{
+
+		final Writer writer = IOUtilFunctions.getSeqWriter(path, job, _replication);
+
+		try {
 			// 2) bound check for src block
-			if( src.getNumRows() > rlen || src.getNumColumns() > clen )
-			{
-				throw new IOException("Matrix block [1:"+src.getNumRows()+",1:"+src.getNumColumns()+"] " +
-						"out of overall matrix range [1:"+rlen+",1:"+clen+"].");
+			if(src.getNumRows() > rlen || src.getNumColumns() > clen) {
+				throw new IOException("Matrix block [1:" + src.getNumRows() + ",1:" + src.getNumColumns() + "] "
+					+ "out of overall matrix range [1:" + rlen + ",1:" + clen + "].");
 			}
-
-			//3) reblock and write
+
+			// 3) reblock and write
 			MatrixIndexes indexes = new MatrixIndexes();
-			if( rlen <= blen && clen <= blen ) //opt for single block
-			{
-				//directly write single block
+			if(rlen <= blen && clen <= blen) { // opt for single block
+				// directly write single block
 				indexes.setIndexes(1, 1);
 				writer.append(indexes, src);
 			}
-			else //general case
-			{
-				//initialize blocks for reuse (at most 4 different blocks required)
+			else { // general case
+				// initialize blocks for reuse (at most 4 different blocks required)
 				MatrixBlock[] blocks = createMatrixBlocksForReuse(rlen, clen, blen, sparse, src.getNonZeros());
 				MatrixBlock emptyBlock = new MatrixBlock();
-
-				//create and write subblocks of matrix
-				for(int blockRow = 0; blockRow < (int)Math.ceil(src.getNumRows()/(double)blen); blockRow++)
-					for(int blockCol = 0; blockCol < (int)Math.ceil(src.getNumColumns()/(double)blen); blockCol++)
-					{
-						int maxRow = (blockRow*blen + blen < src.getNumRows()) ? blen : src.getNumRows() - blockRow*blen;
-						int maxCol = (blockCol*blen + blen < src.getNumColumns()) ? blen : src.getNumColumns() - blockCol*blen;
+
+				// create and write subblocks of matrix
+				for(int blockRow = 0; blockRow < (int) Math.ceil(src.getNumRows() / (double) blen); blockRow++) {
+
+					for(int blockCol = 0; blockCol < (int) Math.ceil(src.getNumColumns() / (double) blen); blockCol++) {
+						int maxRow = (blockRow * blen + blen < src.getNumRows()) ? blen : src.getNumRows() - blockRow * blen;
+						int maxCol = (blockCol * blen + blen < src.getNumColumns()) ? blen : src.getNumColumns() -
+							blockCol * blen;
 						MatrixBlock block = null;
-
-						if( blockRow==blockCol ) //block on diagonal
-						{
-							int row_offset = blockRow*blen;
-							int col_offset = blockCol*blen;
-
-							//get reuse matrix block
+
+						if(blockRow == blockCol) { // block on diagonal
+							int row_offset = blockRow * blen;
+							int col_offset = blockCol * blen;
+
+							// get reuse matrix block
 							block = getMatrixBlockForReuse(blocks, maxRow, maxCol, blen);
-
-							//copy submatrix to block
-							src.slice( row_offset, row_offset+maxRow-1,
-								col_offset, col_offset+maxCol-1, block );
+
+							// copy submatrix to block
+							src.slice(row_offset, row_offset + maxRow - 1, col_offset, col_offset + maxCol - 1, block);
 						}
-						else //empty block (not on diagonal)
-						{
+						else { // empty block (not on diagonal)
 							block = emptyBlock;
 							block.reset(maxRow, maxCol);
 						}
-
-						//append block to sequence file
-						indexes.setIndexes(blockRow+1, blockCol+1);
+
+						// append block to sequence file
+						indexes.setIndexes(blockRow + 1, blockCol + 1);
 						writer.append(indexes, block);
-
-						//reset block for later reuse
-						if( blockRow!=blockCol )
+
+						// reset block for later reuse
+						if(blockRow != blockCol)
 							block.reset();
 					}
-			}
+				}
+			}
 		}
 		finally {
 			IOUtilFunctions.closeSilently(writer);
diff --git a/src/test/java/org/apache/sysds/test/TestUtils.java b/src/test/java/org/apache/sysds/test/TestUtils.java
index bade9ddbe8..554e1f5a05 100644
--- a/src/test/java/org/apache/sysds/test/TestUtils.java
+++ b/src/test/java/org/apache/sysds/test/TestUtils.java
@@ -59,7 +59,6 @@ import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.sysds.common.Types.FileFormat;
import org.apache.sysds.common.Types.ValueType;
@@ -2647,49 +2646,52 @@ public class TestUtils
}
-	/* Write a scalar value to a file */
+	/**
+	 * Write scalar to file
+	 * 
+	 * @param file  File to write to
+	 * @param value Value to write
+	 */
 	public static void writeTestScalar(String file, double value) {
 		try {
 			DataOutputStream out = new DataOutputStream(new FileOutputStream(file));
-			try( PrintWriter pw = new PrintWriter(out) ) {
+			try(PrintWriter pw = new PrintWriter(out)) {
 				pw.println(value);
 			}
-		} catch (IOException e) {
+		}
+		catch(IOException e) {
 			fail("unable to write test scalar (" + file + "): " + e.getMessage());
 		}
 	}
+	/**
+	 * Write scalar to file
+	 * 
+	 * @param file  File to write to
+	 * @param value Value to write
+	 */
 	public static void writeTestScalar(String file, long value) {
 		try {
 			DataOutputStream out = new DataOutputStream(new FileOutputStream(file));
-			try( PrintWriter pw = new PrintWriter(out) ) {
+			try(PrintWriter pw = new PrintWriter(out)) {
 				pw.println(value);
 			}
-		} catch (IOException e) {
+		}
+		catch(IOException e) {
 			fail("unable to write test scalar (" + file + "): " + e.getMessage());
 		}
 	}
 	/**
-	 * <p>
 	 * Writes a matrix to a file using the binary cells format.
-	 * </p>
-	 * 
-	 * @param file
-	 *            file name
-	 * @param matrix
-	 *            matrix
+	 * 
+	 * @param file   file name
+	 * @param matrix matrix
 	 */
 	public static void writeBinaryTestMatrixCells(String file, double[][] matrix) {
 		try {
-			SequenceFile.Writer writer = null;
+			final Writer writer = IOUtilFunctions.getSeqWriterCell(new Path(file), conf, 1);
 			try {
-				Path path = new Path(file);
-				Writer.Option filePath = Writer.file(path);
-				Writer.Option keyClass = Writer.keyClass(MatrixIndexes.class);
-				Writer.Option valueClass = Writer.valueClass(MatrixBlock.class);
-				Writer.Option compression = Writer.compression(SequenceFile.CompressionType.NONE);
-				writer = SequenceFile.createWriter(conf, filePath, keyClass, valueClass, compression);
 				MatrixIndexes index = new MatrixIndexes();
 				MatrixCell value = new MatrixCell();
 				for (int i = 0; i < matrix.length; i++) {
@@ -2712,56 +2714,45 @@ public class TestUtils
}
 	/**
-	 * <p>
-	 * Writes a matrix to a file using the binary blocks format.
-	 * </p>
+	 * Writes a matrix to a file using the binary blocks format.
 	 *
-	 * @param file
-	 *            file name
-	 * @param matrix
-	 *            matrix
-	 * @param rowsInBlock
-	 *            rows in block
-	 * @param colsInBlock
-	 *            columns in block
-	 * @param sparseFormat
-	 *            sparse format
+	 * @param file         file name
+	 * @param matrix       matrix
+	 * @param rowsInBlock  rows in block
+	 * @param colsInBlock  columns in block
+	 * @param sparseFormat sparse format
 	 */
 	public static void writeBinaryTestMatrixBlocks(String file, double[][] matrix, int rowsInBlock, int colsInBlock,
 		boolean sparseFormat) {
-		SequenceFile.Writer writer = null;
 		try {
-			Path path = new Path(file);
-			Writer.Option filePath = Writer.file(path);
-			Writer.Option keyClass = Writer.keyClass(MatrixIndexes.class);
-			Writer.Option valueClass = Writer.valueClass(MatrixBlock.class);
-			Writer.Option compression = Writer.compression(SequenceFile.CompressionType.NONE);
-			writer = SequenceFile.createWriter(conf, filePath, keyClass, valueClass, compression);
-			MatrixIndexes index = new MatrixIndexes();
-			MatrixBlock value = new MatrixBlock();
-			for (int i = 0; i < matrix.length; i += rowsInBlock) {
-				int rows = Math.min(rowsInBlock, (matrix.length - i));
-				for (int j = 0; j < matrix[i].length; j += colsInBlock) {
-					int cols = Math.min(colsInBlock, (matrix[i].length - j));
-					index.setIndexes(((i / rowsInBlock) + 1), ((j / colsInBlock) + 1));
-					value.reset(rows, cols, sparseFormat);
-					for (int k = 0; k < rows; k++) {
-						for (int l = 0; l < cols; l++) {
-							value.setValue(k, l, matrix[i + k][j + l]);
+			final Writer writer = IOUtilFunctions.getSeqWriter(new Path(file), conf, 1);
+			try{
+				MatrixIndexes index = new MatrixIndexes();
+				MatrixBlock value = new MatrixBlock();
+				for (int i = 0; i < matrix.length; i += rowsInBlock) {
+					int rows = Math.min(rowsInBlock, (matrix.length - i));
+					for (int j = 0; j < matrix[i].length; j += colsInBlock) {
+						int cols = Math.min(colsInBlock, (matrix[i].length - j));
+						index.setIndexes(((i / rowsInBlock) + 1), ((j / colsInBlock) + 1));
+						value.reset(rows, cols, sparseFormat);
+						for (int k = 0; k < rows; k++) {
+							for (int l = 0; l < cols; l++) {
+								value.setValue(k, l, matrix[i + k][j + l]);
+							}
 						}
+						writer.append(index, value);
 					}
-					writer.append(index, value);
 				}
 			}
+			finally {
+				IOUtilFunctions.closeSilently(writer);
+			}
 		}
 		catch (IOException e) {
 			e.printStackTrace();
 			fail("unable to write test matrix: " + e.getMessage());
 		}
-		finally {
-			IOUtilFunctions.closeSilently(writer);
-		}
 	}
/**