This is an automated email from the ASF dual-hosted git repository.
mboehm7 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new ce6f6cd68a [SYSTEMDS-3722] Fix file-based parfor result merge and related tests
ce6f6cd68a is described below
commit ce6f6cd68a235db64419dc4b93fcff17b46a575d
Author: Matthias Boehm <[email protected]>
AuthorDate: Tue Aug 20 20:06:50 2024 +0200
[SYSTEMDS-3722] Fix file-based parfor result merge and related tests

This patch fixes a recently discovered bug in the file-based parfor
result merge (a missing deep copy of the compare block) as well as
issues in the ResultMergeTest setup (overwritten output files and an
incorrect compare setup). Furthermore, we remove support for the
textcell format from this file-based result merge, because the binary
sparse-block representations are by now good enough.
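The root cause is easy to reproduce in isolation: without a deep copy,
the compare block aliases the output block, so merging one worker's
result silently rewrites the compare values used for all subsequent
workers. The following minimal, self-contained sketch (hypothetical
stand-ins for DataConverter.convertToDenseBlock and mergeWithComp, not
SystemDS code) shows how the shallow copy loses an update:

    import java.util.Arrays;

    public class CompareAliasSketch {
        //stand-in for DataConverter.convertToDenseBlock(mb, deep):
        //deep=false hands out a view on the same array, deep=true a copy
        static double[] toDense(double[] block, boolean deep) {
            return deep ? block.clone() : block;
        }

        //stand-in for mergeWithComp: copy only cells that differ from compare
        static void merge(double[] out, double[] in, double[] compare) {
            for( int i = 0; i < out.length; i++ )
                if( in[i] != compare[i] )
                    out[i] = in[i];
        }

        public static void main(String[] args) {
            double[] out = {1, 1}; //original output, also the compare basis
            double[] in1 = {5, 1}; //worker 1 updated cell 0
            double[] in2 = {1, 1}; //worker 2 changed nothing

            double[] compare = toDense(out, false); //BUG: alias of out
            merge(out, in1, compare); //out={5,1}, but compare={5,1} as well
            merge(out, in2, compare); //1 != compare[0] reverts cell 0 to 1
            System.out.println(Arrays.toString(out)); //[1.0, 1.0], update lost

            //with toDense(out, true), compare stays {1,1} and out ends {5,1}
        }
    }

This is precisely what the one-flag change from
convertToDenseBlock(mb, false) to convertToDenseBlock(mb, true) in
createBinaryBlockResultFile below fixes. The added LOG.isTraceEnabled()
guards in CacheableData are an unrelated cleanup that avoids
constructing trace messages when tracing is disabled.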
---
.../controlprogram/caching/CacheableData.java | 6 +-
.../parfor/ResultMergeLocalFile.java | 454 ++-------------------
.../controlprogram/parfor/ResultMergeMatrix.java | 1 -
.../test/component/parfor/ResultMergeTest.java | 23 +-
4 files changed, 45 insertions(+), 439 deletions(-)
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
index 8e37dc29a2..f40d9c2cde 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
@@ -846,7 +846,8 @@ public abstract class CacheableData<T extends CacheBlock<?>> extends Data
		if ( !isAvailableToRead() )
			throw new DMLRuntimeException("MatrixObject not available to read.");
-		LOG.trace("Exporting " + this.getDebugName() + " to " + fName + " in format " + outputFormat);
+		if( LOG.isTraceEnabled() )
+			LOG.trace("Exporting " + this.getDebugName() + " to " + fName + " in format " + outputFormat);
		if( DMLScript.USE_ACCELERATOR && _gpuObjects != null ) {
			boolean copiedFromGPU = false;
@@ -962,7 +963,8 @@ public abstract class CacheableData<T extends CacheBlock<?>> extends Data
		else
		{
			//CASE 4: data already in hdfs (do nothing, no need for export)
-			LOG.trace(this.getDebugName() + ": Skip export to hdfs since data already exists.");
+			if( LOG.isTraceEnabled() )
+				LOG.trace(this.getDebugName() + ": Skip export to hdfs since data already exists.");
		}
		_hdfsFileExists = true;
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeLocalFile.java b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeLocalFile.java
index b2340426e6..ce683e455b 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeLocalFile.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeLocalFile.java
@@ -19,55 +19,31 @@
package org.apache.sysds.runtime.controlprogram.parfor;
-import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
-import java.io.OutputStreamWriter;
import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.Map.Entry;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.Writer;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.sysds.common.Types.FileFormat;
import org.apache.sysds.conf.ConfigurationManager;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
-import org.apache.sysds.runtime.controlprogram.parfor.util.Cell;
import org.apache.sysds.runtime.controlprogram.parfor.util.IDSequence;
-import org.apache.sysds.runtime.controlprogram.parfor.util.StagingFileUtils;
import org.apache.sysds.runtime.data.DenseBlock;
import org.apache.sysds.runtime.io.IOUtilFunctions;
-import org.apache.sysds.runtime.matrix.data.IJV;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
import org.apache.sysds.runtime.meta.DataCharacteristics;
import org.apache.sysds.runtime.meta.MatrixCharacteristics;
import org.apache.sysds.runtime.meta.MetaDataFormat;
import org.apache.sysds.runtime.util.DataConverter;
-import org.apache.sysds.runtime.util.FastStringTokenizer;
import org.apache.sysds.runtime.util.HDFSTool;
import org.apache.sysds.runtime.util.LocalFileUtils;
-/**
- *
- * TODO potential extension: parallel merge (create individual staging files concurrently)
- *
- * NOTE: file merge typically used due to memory constraints - parallel merge would increase the memory
- * consumption again.
- */
public class ResultMergeLocalFile extends ResultMergeMatrix
{
	private static final long serialVersionUID = -6905893742840020489L;
@@ -78,14 +54,11 @@ public class ResultMergeLocalFile extends ResultMergeMatrix
	//internal comparison matrix
	private IDSequence _seq = null;
-	public ResultMergeLocalFile( MatrixObject out, MatrixObject[] in, String outputFilename, boolean accum )
-	{
+	public ResultMergeLocalFile( MatrixObject out, MatrixObject[] in, String outputFilename, boolean accum ) {
		super( out, in, outputFilename, accum );
-
		_seq = new IDSequence();
	}
-
	@Override
	public MatrixObject executeSerialMerge() {
		MatrixObject moNew = null; //always create new matrix object (required for nested parallelism)
@@ -98,11 +71,9 @@ public class ResultMergeLocalFile extends ResultMergeMatrix
		{
			//collect all relevant inputs
			ArrayList<MatrixObject> inMO = new ArrayList<>();
-			for( MatrixObject in : _inputs )
-			{
+			for( MatrixObject in : _inputs ) {
				//check for empty inputs (no iterations executed)
-				if( in !=null && in != _output )
-				{
+				if( in !=null && in != _output ) {
					//ensure that input file resides on disk
					in.exportData();
@@ -111,8 +82,7 @@ public class ResultMergeLocalFile extends ResultMergeMatrix
				}
			}
-			if( !inMO.isEmpty() )
-			{
+			if( !inMO.isEmpty() ) {
				//ensure that outputfile (for comparison) resides on disk
				_output.exportData();
@@ -122,13 +92,11 @@ public class ResultMergeLocalFile extends ResultMergeMatrix
				//create new output matrix (e.g., to prevent potential export<->read file access conflict
				moNew = createNewMatrixObject( _output, inMO );
			}
-			else
-			{
+			else {
				moNew = _output; //return old matrix, to prevent copy
			}
		}
-		catch(Exception ex)
-		{
+		catch(Exception ex) {
			throw new DMLRuntimeException(ex);
		}
@@ -161,13 +129,7 @@ public class ResultMergeLocalFile extends ResultMergeMatrix
		FileFormat fmt = ((MetaDataFormat)outMo.getMetaData()).getFileFormat();
		boolean withCompare = ( outMo.getNnz() != 0 ); //if nnz exist or unknown (-1)
-		if( fmt == FileFormat.TEXT ) {
-			if(withCompare)
-				mergeTextCellWithComp(fnameNew, outMo, inMO);
-			else
-				mergeTextCellWithoutComp( fnameNew, outMo, inMO );
-		}
-		else if( fmt == FileFormat.BINARY ) {
+		if( fmt == FileFormat.BINARY ) {
			if(withCompare)
				mergeBinaryBlockWithComp( fnameNew, outMo, inMO );
			else
@@ -175,122 +137,16 @@ public class ResultMergeLocalFile extends ResultMergeMatrix
		}
	}
-	private static void mergeTextCellWithoutComp( String fnameNew, MatrixObject outMo, ArrayList<MatrixObject> inMO )
-	{
-		try
-		{
-			//delete target file if already exists
-			HDFSTool.deleteFileIfExistOnHDFS(fnameNew);
-
-			if( ALLOW_COPY_CELLFILES )
-			{
-				copyAllFiles(fnameNew, inMO);
-				return; //we're done
-			}
-
-			//actual merge
-			JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
-			Path path = new Path( fnameNew );
-			FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
-			BufferedWriter out = new BufferedWriter(new OutputStreamWriter(fs.create(path,true)));
-
-			String valueStr = null;
-
-			try
-			{
-				for( MatrixObject in : inMO ) //read/write all inputs
-				{
-					if( LOG.isTraceEnabled() )
-						LOG.trace("ResultMerge (local, file): Merge input "+in.hashCode()+" (fname="
-							+in.getFileName()+") via stream merge");
-
-					JobConf tmpJob = new JobConf(ConfigurationManager.getCachedJobConf());
-					Path tmpPath = new Path(in.getFileName());
-					FileInputFormat.addInputPath(tmpJob, tmpPath);
-					TextInputFormat informat = new TextInputFormat();
-					informat.configure(tmpJob);
-					InputSplit[] splits = informat.getSplits(tmpJob, 1);
-
-					LongWritable key = new LongWritable();
-					Text value = new Text();
-
-					for(InputSplit split: splits)
-					{
-						RecordReader<LongWritable,Text> reader = informat.getRecordReader(split, tmpJob, Reporter.NULL);
-						try
-						{
-							while(reader.next(key, value))
-							{
-								valueStr = value.toString().trim();
-								out.write( valueStr+"\n" );
-							}
-						}
-						finally {
-							IOUtilFunctions.closeSilently(reader);
-						}
-					}
-				}
-			}
-			finally {
-				IOUtilFunctions.closeSilently(out);
-			}
-		}
-		catch(Exception ex)
-		{
-			throw new DMLRuntimeException("Unable to merge text cell results.", ex);
-		}
-	}
-
-	private void mergeTextCellWithComp( String fnameNew, MatrixObject outMo, ArrayList<MatrixObject> inMO )
-	{
-		String fnameStaging = LocalFileUtils.getUniqueWorkingDir(LocalFileUtils.CATEGORY_RESULTMERGE);
-		String fnameStagingCompare = LocalFileUtils.getUniqueWorkingDir(LocalFileUtils.CATEGORY_RESULTMERGE);
-
-		try
-		{
-			//delete target file if already exists
-			HDFSTool.deleteFileIfExistOnHDFS(fnameNew);
-
-			//Step 0) write compare blocks to staging area (if necessary)
-			if( LOG.isTraceEnabled() )
-				LOG.trace("ResultMerge (local, file): Create merge compare matrix for output "
-					+outMo.hashCode()+" (fname="+outMo.getFileName()+")");
-			createTextCellStagingFile(fnameStagingCompare, outMo, 0);
-
-			//Step 1) read and write blocks to staging area
-			for( MatrixObject in : inMO )
-			{
-				if( LOG.isTraceEnabled() )
-					LOG.trace("ResultMerge (local, file): Merge input "+in.hashCode()+" (fname="+in.getFileName()+")");
-
-				long ID = _seq.getNextID();
-				createTextCellStagingFile( fnameStaging, in, ID );
-			}
-
-			//Step 2) read blocks, consolidate, and write to HDFS
-			createTextCellResultFile(fnameStaging, fnameStagingCompare, fnameNew, (MetaDataFormat)outMo.getMetaData(), true);
-		}
-		catch(Exception ex)
-		{
-			throw new DMLRuntimeException("Unable to merge text cell results.", ex);
-		}
-
-		LocalFileUtils.cleanupWorkingDirectory(fnameStaging);
-		LocalFileUtils.cleanupWorkingDirectory(fnameStagingCompare);
-	}
-
	private void mergeBinaryBlockWithoutComp( String fnameNew, MatrixObject outMo, ArrayList<MatrixObject> inMO )
	{
		String fnameStaging = LocalFileUtils.getUniqueWorkingDir(LocalFileUtils.CATEGORY_RESULTMERGE);
-		try
-		{
+		try {
			//delete target file if already exists
			HDFSTool.deleteFileIfExistOnHDFS(fnameNew);
			//Step 1) read and write blocks to staging area
-			for( MatrixObject in : inMO )
-			{
+			for( MatrixObject in : inMO ) {
				if( LOG.isTraceEnabled() )
					LOG.trace("ResultMerge (local, file): Merge input "+in.hashCode()+" (fname="+in.getFileName()+")");
@@ -300,10 +156,9 @@ public class ResultMergeLocalFile extends ResultMergeMatrix
			//Step 2) read blocks, consolidate, and write to HDFS
			createBinaryBlockResultFile(fnameStaging, null, fnameNew, (MetaDataFormat)outMo.getMetaData(), false);
		}
-		catch(Exception ex)
-		{
+		catch(Exception ex) {
			throw new DMLRuntimeException("Unable to merge binary block results.", ex);
-		}
+		}
		LocalFileUtils.cleanupWorkingDirectory(fnameStaging);
	}
@@ -326,20 +181,19 @@ public class ResultMergeLocalFile extends ResultMergeMatrix
			createBinaryBlockStagingFile(fnameStagingCompare, outMo);
			//Step 1) read and write blocks to staging area
-			for( MatrixObject in : inMO )
-			{
+			for( MatrixObject in : inMO ) {
				if( LOG.isTraceEnabled() )
					LOG.trace("ResultMerge (local, file): Merge input "+in.hashCode()+" (fname="+in.getFileName()+")");
+
				createBinaryBlockStagingFile( fnameStaging, in );
			}
-
+
			//Step 2) read blocks, consolidate, and write to HDFS
			createBinaryBlockResultFile(fnameStaging, fnameStagingCompare, fnameNew, (MetaDataFormat)outMo.getMetaData(), true);
		}
-		catch(Exception ex)
-		{
+		catch(Exception ex) {
			throw new DMLRuntimeException("Unable to merge binary block results.", ex);
-		}
+		}
		LocalFileUtils.cleanupWorkingDirectory(fnameStaging);
		LocalFileUtils.cleanupWorkingDirectory(fnameStagingCompare);
@@ -348,7 +202,7 @@ public class ResultMergeLocalFile extends ResultMergeMatrix
	@SuppressWarnings("deprecation")
	private void createBinaryBlockStagingFile( String fnameStaging, MatrixObject mo )
		throws IOException
-	{
+	{
		MatrixIndexes key = new MatrixIndexes();
		MatrixBlock value = new MatrixBlock();
@@ -356,131 +210,26 @@ public class ResultMergeLocalFile extends ResultMergeMatrix
		Path tmpPath = new Path(mo.getFileName());
		FileSystem fs = IOUtilFunctions.getFileSystem(tmpPath, tmpJob);
-		for(Path lpath : IOUtilFunctions.getSequenceFilePaths(fs, tmpPath))
-		{
-			SequenceFile.Reader reader = new SequenceFile.Reader(fs,lpath,tmpJob);
-			try
-			{
-				while(reader.next(key, value)) //for each block
-				{
+		for(Path lpath : IOUtilFunctions.getSequenceFilePaths(fs, tmpPath)) {
+			try( SequenceFile.Reader reader = new SequenceFile.Reader(fs,lpath,tmpJob) ) {
+				while(reader.next(key, value)) { //for each block
					String lname = key.getRowIndex()+"_"+key.getColumnIndex();
					String dir = fnameStaging+"/"+lname;
-					if( value.getNonZeros()>0 ) //write only non-empty blocks
-					{
+
+					if( value.getNonZeros()>0 ) { //write only non-empty blocks
						LocalFileUtils.checkAndCreateStagingDir( dir );
						LocalFileUtils.writeMatrixBlockToLocal(dir+"/"+_seq.getNextID(), value);
					}
				}
			}
-			finally {
-				IOUtilFunctions.closeSilently(reader);
-			}
-		}
-	}
-
-	private static void createTextCellStagingFile( String fnameStaging, MatrixObject mo, long ID )
-		throws IOException, DMLRuntimeException
-	{
-		JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
-		Path path = new Path(mo.getFileName());
-		FileInputFormat.addInputPath(job, path);
-		TextInputFormat informat = new TextInputFormat();
-		informat.configure(job);
-		InputSplit[] splits = informat.getSplits(job, 1);
-
-		LinkedList<Cell> buffer = new LinkedList<>();
-		LongWritable key = new LongWritable();
-		Text value = new Text();
-
-		DataCharacteristics mc = mo.getDataCharacteristics();
-		int blen = mc.getBlocksize();
-		//long row = -1, col = -1; //FIXME needs reconsideration whenever textcell is used actively
-		//NOTE MB: Originally, we used long row, col but this led reproducibly to JIT compilation
-		// errors during runtime; experienced under WINDOWS, Intel x86-64, IBM JDK 64bit/32bit.
-		// It works fine with int row, col but we require long for larger matrices.
-		// Since, textcell is never used for result merge (hybrid/hadoop: binaryblock, singlenode:binarycell)
-		// we just propose the to exclude it with -Xjit:exclude={package.method*}(count=0,optLevel=0)
-
-		FastStringTokenizer st = new FastStringTokenizer(' ');
-
-		for(InputSplit split : splits)
-		{
-			RecordReader<LongWritable,Text> reader = informat.getRecordReader(split, job, Reporter.NULL);
-			try
-			{
-				while(reader.next(key, value))
-				{
-					st.reset( value.toString() ); //reset tokenizer
-					long row = st.nextLong();
-					long col = st.nextLong();
-					double lvalue = Double.parseDouble( st.nextToken() );
-
-					Cell tmp = new Cell( row, col, lvalue );
-
-					buffer.addLast( tmp );
-					if( buffer.size() > StagingFileUtils.CELL_BUFFER_SIZE ) //periodic flush
-					{
-						appendCellBufferToStagingArea(fnameStaging, ID, buffer, blen);
-						buffer.clear();
-					}
-				}
-
-				//final flush
-				if( !buffer.isEmpty() )
-				{
-					appendCellBufferToStagingArea(fnameStaging, ID, buffer, blen);
-					buffer.clear();
-				}
-			}
-			finally {
-				IOUtilFunctions.closeSilently(reader);
-			}
		}
	}
-	private static void appendCellBufferToStagingArea( String fnameStaging, long ID, LinkedList<Cell> buffer, int blen )
-		throws IOException
-	{
-		HashMap<Long,HashMap<Long,LinkedList<Cell>>> sortedBuffer = new HashMap<>();
-		long brow, bcol, row_offset, col_offset;
-
-		for( Cell c : buffer )
-		{
-			brow = (c.getRow()-1)/blen + 1;
-			bcol = (c.getCol()-1)/blen + 1;
-			row_offset = (brow-1)*blen + 1;
-			col_offset = (bcol-1)*blen + 1;
-
-			c.setRow( c.getRow() - row_offset);
-			c.setCol(c.getCol() - col_offset);
-
-			if( !sortedBuffer.containsKey(brow) )
-				sortedBuffer.put(brow, new HashMap<Long,LinkedList<Cell>>());
-			if( !sortedBuffer.get(brow).containsKey(bcol) )
-				sortedBuffer.get(brow).put(bcol, new LinkedList<Cell>());
-			sortedBuffer.get(brow).get(bcol).addLast(c);
-		}
-
-		//write lists of cells to local files
-		for( Entry<Long,HashMap<Long,LinkedList<Cell>>> e : sortedBuffer.entrySet() )
-		{
-			brow = e.getKey();
-			for( Entry<Long,LinkedList<Cell>> e2 : e.getValue().entrySet() )
-			{
-				bcol = e2.getKey();
-				String lname = brow+"_"+bcol;
-				String dir = fnameStaging+"/"+lname;
-				LocalFileUtils.checkAndCreateStagingDir( dir );
-				StagingFileUtils.writeCellListToLocal(dir+"/"+ID, e2.getValue());
-			}
-		}
-	}
-
	private void createBinaryBlockResultFile( String fnameStaging, String fnameStagingCompare, String fnameNew, MetaDataFormat metadata, boolean withCompare )
		throws IOException, DMLRuntimeException
	{
		JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
-		Path path = new Path( fnameNew );
+		Path path = new Path( fnameNew );
		DataCharacteristics mc = metadata.getDataCharacteristics();
		long rlen = mc.getRows();
@@ -491,16 +240,13 @@ public class ResultMergeLocalFile extends ResultMergeMatrix
		try {
			MatrixIndexes indexes = new MatrixIndexes();
-			for(long brow = 1; brow <= (long)Math.ceil(rlen/(double)blen); brow++){
-
-				for(long bcol = 1; bcol <= (long)Math.ceil(clen/(double)blen); bcol++)
-				{
+			for(long brow = 1; brow <= (long)Math.ceil(rlen/(double)blen); brow++) {
+				for(long bcol = 1; bcol <= (long)Math.ceil(clen/(double)blen); bcol++) {
					File dir = new File(fnameStaging+"/"+brow+"_"+bcol);
					File dir2 = new File(fnameStagingCompare+"/"+brow+"_"+bcol);
					MatrixBlock mb = null;
-					if( dir.exists() )
-					{
+					if( dir.exists() ) {
						if( withCompare && dir2.exists() ) //WITH COMPARE BLOCK
						{
							//copy only values that are different from the original
@@ -509,7 +255,7 @@ public class ResultMergeLocalFile extends ResultMergeMatrix
								throw new DMLRuntimeException("Unable to merge results because multiple compare blocks found.");
							mb = LocalFileUtils.readMatrixBlockFromLocal( dir2+"/"+lnames2[0] );
							boolean appendOnly = mb.isInSparseFormat();
-							DenseBlock compare = DataConverter.convertToDenseBlock(mb, false);
+							DenseBlock compare = DataConverter.convertToDenseBlock(mb, true);
							for( String lname : dir.list() ) {
								MatrixBlock tmp = LocalFileUtils.readMatrixBlockFromLocal( dir+"/"+lname );
								mergeWithComp(mb, tmp, compare);
@@ -562,150 +308,4 @@ public class ResultMergeLocalFile extends ResultMergeMatrix
			IOUtilFunctions.closeSilently(writer);
		}
	}
-
-	private void createTextCellResultFile( String fnameStaging, String fnameStagingCompare, String fnameNew, MetaDataFormat metadata, boolean withCompare )
-		throws IOException, DMLRuntimeException
-	{
-		JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
-		Path path = new Path( fnameNew );
-		FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
-
-		DataCharacteristics mc = metadata.getDataCharacteristics();
-		long rlen = mc.getRows();
-		long clen = mc.getCols();
-		int blen = mc.getBlocksize();
-
-		try( BufferedWriter out = new BufferedWriter(new OutputStreamWriter(fs.create(path,true))) ) {
-			//for obj reuse and preventing repeated buffer re-allocations
-			StringBuilder sb = new StringBuilder();
-
-			boolean written=false;
-			for(long brow = 1; brow <= (long)Math.ceil(rlen/(double)blen); brow++)
-				for(long bcol = 1; bcol <= (long)Math.ceil(clen/(double)blen); bcol++)
-				{
-					File dir = new File(fnameStaging+"/"+brow+"_"+bcol);
-					File dir2 = new File(fnameStagingCompare+"/"+brow+"_"+bcol);
-					MatrixBlock mb = null;
-
-					long row_offset = (brow-1)*blen + 1;
-					long col_offset = (bcol-1)*blen + 1;
-
-
-					if( dir.exists() )
-					{
-						if( withCompare && dir2.exists() ) //WITH COMPARE BLOCK
-						{
-							//copy only values that are different from the original
-							String[] lnames2 = dir2.list();
-							if( lnames2.length != 1 ) //there should be exactly 1 compare block
-								throw new DMLRuntimeException("Unable to merge results because multiple compare blocks found.");
-							mb = StagingFileUtils.readCellList2BlockFromLocal( dir2+"/"+lnames2[0], blen );
-							boolean appendOnly = mb.isInSparseFormat();
-							DenseBlock compare = DataConverter.convertToDenseBlock(mb, false);
-							for( String lname : dir.list() ) {
-								MatrixBlock tmp = StagingFileUtils.readCellList2BlockFromLocal( dir+"/"+lname, blen );
-								mergeWithComp(mb, tmp, compare);
-							}
-
-							//sort sparse and exam sparsity due to append-only
-							if( appendOnly && !_isAccum )
-								mb.sortSparseRows();
-
-							//change sparsity if required after
-							mb.examSparsity();
-						}
-						else //WITHOUT COMPARE BLOCK
-						{
-							//copy all non-zeros from all workers
-							boolean appendOnly = false;
-							for( String lname : dir.list() ) {
-								if( mb == null ) {
-									mb = StagingFileUtils.readCellList2BlockFromLocal( dir+"/"+lname, blen );
-									appendOnly = mb.isInSparseFormat();
-								}
-								else {
-									MatrixBlock tmp = StagingFileUtils.readCellList2BlockFromLocal( dir+"/"+lname, blen );
-									mergeWithoutComp(mb, tmp, appendOnly);
-								}
-							}
-
-							//sort sparse due to append-only
-							if( appendOnly && !_isAccum )
-								mb.sortSparseRows();
-
-							//change sparsity if required after
-							mb.examSparsity();
-						}
-					}
-
-					//write the block to text cell
-					if( mb!=null )
-					{
-						if( mb.isInSparseFormat() ) {
-							Iterator<IJV> iter = mb.getSparseBlockIterator();
-							while( iter.hasNext() ) {
-								IJV lcell = iter.next();
-								sb.append(row_offset+lcell.getI());
-								sb.append(' ');
-								sb.append(col_offset+lcell.getJ());
-								sb.append(' ');
-								sb.append(lcell.getV());
-								sb.append('\n');
-								out.write( sb.toString() );
-								sb.setLength(0);
-								written = true;
-							}
-						}
-						else {
-							DenseBlock d = mb.getDenseBlock();
-							for( int i=0; i<blen; i++ )
-								for( int j=0; j<blen; j++ )
-								{
-									double lvalue = d.get(i, j);
-									if( lvalue != 0 ) //for nnz
-									{
-										sb.append(row_offset+i);
-										sb.append(' ');
-										sb.append(col_offset+j);
-										sb.append(' ');
-										sb.append(lvalue);
-										sb.append('\n');
-										out.write( sb.toString() );
-										sb.setLength(0);
-										written = true;
-									}
-								}
-						}
-					}
-				}
-
-			if( !written )
-				out.write(IOUtilFunctions.EMPTY_TEXT_LINE);
-		}
-	}
-
-	private static void copyAllFiles( String fnameNew, ArrayList<MatrixObject> inMO )
-		throws IOException
-	{
-		JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
-		Path path = new Path( fnameNew );
-		FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
-
-		//create output dir
-		fs.mkdirs(path);
-
-		//merge in all input matrix objects
-		IDSequence seq = new IDSequence();
-		for( MatrixObject in : inMO )
-		{
-			if( LOG.isTraceEnabled() )
-				LOG.trace("ResultMerge (local, file): Merge input "+in.hashCode()
-					+" (fname="+in.getFileName()+") via file rename.");
-
-			//copy over files (just rename file or entire dir)
-			Path tmpPath = new Path(in.getFileName());
-			String lname = tmpPath.getName();
-			fs.rename(tmpPath, new Path(fnameNew+"/"+lname+seq.getNextID()));
-		}
-	}
}
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeMatrix.java b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeMatrix.java
index 9efe624231..181899d954 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeMatrix.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeMatrix.java
@@ -90,7 +90,6 @@ public abstract class ResultMergeMatrix extends ResultMerge<MatrixObject> {
		//   NaNs, since NaN != NaN, otherwise we would potentially overwrite results
		// * For the case of accumulation, we add out += (new-old) to ensure correct results
		//   because all inputs have the old values replicated
-
		final int rows = in.getNumRows();
		final int cols = in.getNumColumns();
		if(in.isEmptyBlock(false)) {
diff --git a/src/test/java/org/apache/sysds/test/component/parfor/ResultMergeTest.java b/src/test/java/org/apache/sysds/test/component/parfor/ResultMergeTest.java
index fabfce69ce..8bf2981a7e 100644
--- a/src/test/java/org/apache/sysds/test/component/parfor/ResultMergeTest.java
+++ b/src/test/java/org/apache/sysds/test/component/parfor/ResultMergeTest.java
@@ -33,7 +33,6 @@ import org.apache.sysds.runtime.meta.MetaDataFormat;
import org.apache.sysds.test.AutomatedTestBase;
import org.apache.sysds.test.TestConfiguration;
import org.apache.sysds.test.TestUtils;
-import org.junit.Ignore;
import org.junit.Test;
public class ResultMergeTest extends AutomatedTestBase{
@@ -52,7 +51,6 @@ public class ResultMergeTest extends AutomatedTestBase{
	}
	@Test
-	@Ignore //FIXME
	public void testLocalFile() {
		testResultMergeAll(PResultMerge.LOCAL_FILE);
	}
@@ -66,6 +64,11 @@ public class ResultMergeTest extends AutomatedTestBase{
		testResultMerge(false, false, false, mtype);
		testResultMerge(false, true, false, mtype);
		testResultMerge(true, false, false, mtype);
+		testResultMerge(false, false, true, mtype);
+		if( mtype != PResultMerge.LOCAL_FILE ) //FIXME
+			testResultMerge(false, true, true, mtype);
+		testResultMerge(true, false, true, mtype);
+
		//testResultMerge(true, true, false, mtype); invalid
	}
@@ -75,19 +78,21 @@ public class ResultMergeTest extends AutomatedTestBase{
		//create input and output objects
		MatrixBlock A = MatrixBlock.randOperations(1200, 1100, 0.1);
		CacheableData<?> Cobj = compare ?
-			toMatrixObject(A, output("C")) :
+			toMatrixObject(new MatrixBlock(1200,1100,1d), output("C")) :
			toMatrixObject(new MatrixBlock(1200,1100,true), output("C"));
-		MatrixBlock empty = new MatrixBlock(400,1100,true);
+		MatrixBlock rest = compare ?
+			new MatrixBlock(400,1100,1d) :  //constant (also dense)
+			new MatrixBlock(400,1100,true); //empty (also sparse)
		MatrixObject[] Bobj = new MatrixObject[3];
-		Bobj[0] = toMatrixObject(A.slice(0,399).rbind(empty).rbind(empty), output("B0"));
-		Bobj[1] = toMatrixObject(empty.rbind(A.slice(400,799)).rbind(empty), output("B1"));
-		Bobj[2] = toMatrixObject(empty.rbind(empty).rbind(A.slice(800,1199)), output("B1"));
-
+		Bobj[0] = toMatrixObject(A.slice(0,399).rbind(rest).rbind(rest), output("B0"));
+		Bobj[1] = toMatrixObject(rest.rbind(A.slice(400,799)).rbind(rest), output("B1"));
+		Bobj[2] = toMatrixObject(rest.rbind(rest).rbind(A.slice(800,1199)), output("B2"));
+
		//create result merge
		ExecutionContext ec = ExecutionContextFactory.createContext();
		int numThreads = 3;
		ResultMerge<?> rm = ParForProgramBlock.createResultMerge(
-			mtype, Cobj, Bobj, output("C"), accum, numThreads, ec);
+			mtype, Cobj, Bobj, output("R"), accum, numThreads, ec);
		//execute results merge
		if( par )