This is an automated email from the ASF dual-hosted git repository. mboehm7 pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push: new ce6f6cd68a [SYSTEMDS-3722] Fix file-based parfor result merge and related tests ce6f6cd68a is described below commit ce6f6cd68a235db64419dc4b93fcff17b46a575d Author: Matthias Boehm <mboe...@gmail.com> AuthorDate: Tue Aug 20 20:06:50 2024 +0200 [SYSTEMDS-3722] Fix file-based parfor result merge and related tests This patch fixes a recently discovered bug of the file-based parfor result merge (missing deep copy of compare block) as well as some setup of the ResultMergeTest (overwrite of files and compare setup). Furthermore, we now remove the support for textcell format during this file-based result merge, as meanwhile the sparse block binary representations are good enough. --- .../controlprogram/caching/CacheableData.java | 6 +- .../parfor/ResultMergeLocalFile.java | 454 ++------------------- .../controlprogram/parfor/ResultMergeMatrix.java | 1 - .../test/component/parfor/ResultMergeTest.java | 23 +- 4 files changed, 45 insertions(+), 439 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java index 8e37dc29a2..f40d9c2cde 100644 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java @@ -846,7 +846,8 @@ public abstract class CacheableData<T extends CacheBlock<?>> extends Data if ( !isAvailableToRead() ) throw new DMLRuntimeException("MatrixObject not available to read."); - LOG.trace("Exporting " + this.getDebugName() + " to " + fName + " in format " + outputFormat); + if( LOG.isTraceEnabled() ) + LOG.trace("Exporting " + this.getDebugName() + " to " + fName + " in format " + outputFormat); if( DMLScript.USE_ACCELERATOR && _gpuObjects != null ) { boolean copiedFromGPU = false; @@ -962,7 +963,8 @@ public abstract class CacheableData<T extends CacheBlock<?>> extends Data else { //CASE 4: data already in hdfs (do nothing, no need for export) - LOG.trace(this.getDebugName() + ": Skip export to hdfs since data already exists."); + if( LOG.isTraceEnabled() ) + LOG.trace(this.getDebugName() + ": Skip export to hdfs since data already exists."); } _hdfsFileExists = true; diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeLocalFile.java b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeLocalFile.java index b2340426e6..ce683e455b 100644 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeLocalFile.java +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeLocalFile.java @@ -19,55 +19,31 @@ package org.apache.sysds.runtime.controlprogram.parfor; -import java.io.BufferedWriter; import java.io.File; import java.io.IOException; -import java.io.OutputStreamWriter; import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.Map.Entry; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.SequenceFile.Writer; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.FileInputFormat; -import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.RecordReader; -import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.mapred.TextInputFormat; import org.apache.sysds.common.Types.FileFormat; import org.apache.sysds.conf.ConfigurationManager; import org.apache.sysds.runtime.DMLRuntimeException; import org.apache.sysds.runtime.controlprogram.caching.MatrixObject; -import org.apache.sysds.runtime.controlprogram.parfor.util.Cell; import org.apache.sysds.runtime.controlprogram.parfor.util.IDSequence; -import org.apache.sysds.runtime.controlprogram.parfor.util.StagingFileUtils; import org.apache.sysds.runtime.data.DenseBlock; import org.apache.sysds.runtime.io.IOUtilFunctions; -import org.apache.sysds.runtime.matrix.data.IJV; import org.apache.sysds.runtime.matrix.data.MatrixBlock; import org.apache.sysds.runtime.matrix.data.MatrixIndexes; import org.apache.sysds.runtime.meta.DataCharacteristics; import org.apache.sysds.runtime.meta.MatrixCharacteristics; import org.apache.sysds.runtime.meta.MetaDataFormat; import org.apache.sysds.runtime.util.DataConverter; -import org.apache.sysds.runtime.util.FastStringTokenizer; import org.apache.sysds.runtime.util.HDFSTool; import org.apache.sysds.runtime.util.LocalFileUtils; -/** - * - * TODO potential extension: parallel merge (create individual staging files concurrently) - * - * NOTE: file merge typically used due to memory constraints - parallel merge would increase the memory - * consumption again. - */ public class ResultMergeLocalFile extends ResultMergeMatrix { private static final long serialVersionUID = -6905893742840020489L; @@ -78,14 +54,11 @@ public class ResultMergeLocalFile extends ResultMergeMatrix //internal comparison matrix private IDSequence _seq = null; - public ResultMergeLocalFile( MatrixObject out, MatrixObject[] in, String outputFilename, boolean accum ) - { + public ResultMergeLocalFile( MatrixObject out, MatrixObject[] in, String outputFilename, boolean accum ) { super( out, in, outputFilename, accum ); - _seq = new IDSequence(); } - @Override public MatrixObject executeSerialMerge() { MatrixObject moNew = null; //always create new matrix object (required for nested parallelism) @@ -98,11 +71,9 @@ public class ResultMergeLocalFile extends ResultMergeMatrix { //collect all relevant inputs ArrayList<MatrixObject> inMO = new ArrayList<>(); - for( MatrixObject in : _inputs ) - { + for( MatrixObject in : _inputs ) { //check for empty inputs (no iterations executed) - if( in !=null && in != _output ) - { + if( in !=null && in != _output ) { //ensure that input file resides on disk in.exportData(); @@ -111,8 +82,7 @@ public class ResultMergeLocalFile extends ResultMergeMatrix } } - if( !inMO.isEmpty() ) - { + if( !inMO.isEmpty() ) { //ensure that outputfile (for comparison) resides on disk _output.exportData(); @@ -122,13 +92,11 @@ public class ResultMergeLocalFile extends ResultMergeMatrix //create new output matrix (e.g., to prevent potential export<->read file access conflict moNew = createNewMatrixObject( _output, inMO ); } - else - { + else { moNew = _output; //return old matrix, to prevent copy } } - catch(Exception ex) - { + catch(Exception ex) { throw new DMLRuntimeException(ex); } @@ -161,13 +129,7 @@ public class ResultMergeLocalFile extends ResultMergeMatrix FileFormat fmt = ((MetaDataFormat)outMo.getMetaData()).getFileFormat(); boolean withCompare = ( outMo.getNnz() != 0 ); //if nnz exist or unknown (-1) - if( fmt == FileFormat.TEXT ) { - if(withCompare) - mergeTextCellWithComp(fnameNew, outMo, inMO); - else - mergeTextCellWithoutComp( fnameNew, outMo, inMO ); - } - else if( fmt == FileFormat.BINARY ) { + if( fmt == FileFormat.BINARY ) { if(withCompare) mergeBinaryBlockWithComp( fnameNew, outMo, inMO ); else @@ -175,122 +137,16 @@ public class ResultMergeLocalFile extends ResultMergeMatrix } } - private static void mergeTextCellWithoutComp( String fnameNew, MatrixObject outMo, ArrayList<MatrixObject> inMO ) - { - try - { - //delete target file if already exists - HDFSTool.deleteFileIfExistOnHDFS(fnameNew); - - if( ALLOW_COPY_CELLFILES ) - { - copyAllFiles(fnameNew, inMO); - return; //we're done - } - - //actual merge - JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); - Path path = new Path( fnameNew ); - FileSystem fs = IOUtilFunctions.getFileSystem(path, job); - BufferedWriter out = new BufferedWriter(new OutputStreamWriter(fs.create(path,true))); - - String valueStr = null; - - try - { - for( MatrixObject in : inMO ) //read/write all inputs - { - if( LOG.isTraceEnabled() ) - LOG.trace("ResultMerge (local, file): Merge input "+in.hashCode()+" (fname=" - +in.getFileName()+") via stream merge"); - - JobConf tmpJob = new JobConf(ConfigurationManager.getCachedJobConf()); - Path tmpPath = new Path(in.getFileName()); - FileInputFormat.addInputPath(tmpJob, tmpPath); - TextInputFormat informat = new TextInputFormat(); - informat.configure(tmpJob); - InputSplit[] splits = informat.getSplits(tmpJob, 1); - - LongWritable key = new LongWritable(); - Text value = new Text(); - - for(InputSplit split: splits) - { - RecordReader<LongWritable,Text> reader = informat.getRecordReader(split, tmpJob, Reporter.NULL); - try - { - while(reader.next(key, value)) - { - valueStr = value.toString().trim(); - out.write( valueStr+"\n" ); - } - } - finally { - IOUtilFunctions.closeSilently(reader); - } - } - } - } - finally { - IOUtilFunctions.closeSilently(out); - } - } - catch(Exception ex) - { - throw new DMLRuntimeException("Unable to merge text cell results.", ex); - } - } - - private void mergeTextCellWithComp( String fnameNew, MatrixObject outMo, ArrayList<MatrixObject> inMO ) - { - String fnameStaging = LocalFileUtils.getUniqueWorkingDir(LocalFileUtils.CATEGORY_RESULTMERGE); - String fnameStagingCompare = LocalFileUtils.getUniqueWorkingDir(LocalFileUtils.CATEGORY_RESULTMERGE); - - try - { - //delete target file if already exists - HDFSTool.deleteFileIfExistOnHDFS(fnameNew); - - //Step 0) write compare blocks to staging area (if necessary) - if( LOG.isTraceEnabled() ) - LOG.trace("ResultMerge (local, file): Create merge compare matrix for output " - +outMo.hashCode()+" (fname="+outMo.getFileName()+")"); - createTextCellStagingFile(fnameStagingCompare, outMo, 0); - - //Step 1) read and write blocks to staging area - for( MatrixObject in : inMO ) - { - if( LOG.isTraceEnabled() ) - LOG.trace("ResultMerge (local, file): Merge input "+in.hashCode()+" (fname="+in.getFileName()+")"); - - long ID = _seq.getNextID(); - createTextCellStagingFile( fnameStaging, in, ID ); - } - - //Step 2) read blocks, consolidate, and write to HDFS - createTextCellResultFile(fnameStaging, fnameStagingCompare, fnameNew, (MetaDataFormat)outMo.getMetaData(), true); - } - catch(Exception ex) - { - throw new DMLRuntimeException("Unable to merge text cell results.", ex); - } - - LocalFileUtils.cleanupWorkingDirectory(fnameStaging); - LocalFileUtils.cleanupWorkingDirectory(fnameStagingCompare); - } - private void mergeBinaryBlockWithoutComp( String fnameNew, MatrixObject outMo, ArrayList<MatrixObject> inMO ) { String fnameStaging = LocalFileUtils.getUniqueWorkingDir(LocalFileUtils.CATEGORY_RESULTMERGE); - try - { + try { //delete target file if already exists HDFSTool.deleteFileIfExistOnHDFS(fnameNew); //Step 1) read and write blocks to staging area - for( MatrixObject in : inMO ) - { + for( MatrixObject in : inMO ) { if( LOG.isTraceEnabled() ) LOG.trace("ResultMerge (local, file): Merge input "+in.hashCode()+" (fname="+in.getFileName()+")"); @@ -300,10 +156,9 @@ public class ResultMergeLocalFile extends ResultMergeMatrix //Step 2) read blocks, consolidate, and write to HDFS createBinaryBlockResultFile(fnameStaging, null, fnameNew, (MetaDataFormat)outMo.getMetaData(), false); } - catch(Exception ex) - { + catch(Exception ex) { throw new DMLRuntimeException("Unable to merge binary block results.", ex); - } + } LocalFileUtils.cleanupWorkingDirectory(fnameStaging); } @@ -326,20 +181,19 @@ public class ResultMergeLocalFile extends ResultMergeMatrix createBinaryBlockStagingFile(fnameStagingCompare, outMo); //Step 1) read and write blocks to staging area - for( MatrixObject in : inMO ) - { + for( MatrixObject in : inMO ) { if( LOG.isTraceEnabled() ) LOG.trace("ResultMerge (local, file): Merge input "+in.hashCode()+" (fname="+in.getFileName()+")"); + createBinaryBlockStagingFile( fnameStaging, in ); } - + //Step 2) read blocks, consolidate, and write to HDFS createBinaryBlockResultFile(fnameStaging, fnameStagingCompare, fnameNew, (MetaDataFormat)outMo.getMetaData(), true); } - catch(Exception ex) - { + catch(Exception ex) { throw new DMLRuntimeException("Unable to merge binary block results.", ex); - } + } LocalFileUtils.cleanupWorkingDirectory(fnameStaging); LocalFileUtils.cleanupWorkingDirectory(fnameStagingCompare); @@ -348,7 +202,7 @@ public class ResultMergeLocalFile extends ResultMergeMatrix @SuppressWarnings("deprecation") private void createBinaryBlockStagingFile( String fnameStaging, MatrixObject mo ) throws IOException - { + { MatrixIndexes key = new MatrixIndexes(); MatrixBlock value = new MatrixBlock(); @@ -356,131 +210,26 @@ public class ResultMergeLocalFile extends ResultMergeMatrix Path tmpPath = new Path(mo.getFileName()); FileSystem fs = IOUtilFunctions.getFileSystem(tmpPath, tmpJob); - for(Path lpath : IOUtilFunctions.getSequenceFilePaths(fs, tmpPath)) - { - SequenceFile.Reader reader = new SequenceFile.Reader(fs,lpath,tmpJob); - try - { - while(reader.next(key, value)) //for each block - { + for(Path lpath : IOUtilFunctions.getSequenceFilePaths(fs, tmpPath)) { + try( SequenceFile.Reader reader = new SequenceFile.Reader(fs,lpath,tmpJob) ) { + while(reader.next(key, value)) { //for each block String lname = key.getRowIndex()+"_"+key.getColumnIndex(); String dir = fnameStaging+"/"+lname; - if( value.getNonZeros()>0 ) //write only non-empty blocks - { + + if( value.getNonZeros()>0 ) { //write only non-empty blocks LocalFileUtils.checkAndCreateStagingDir( dir ); LocalFileUtils.writeMatrixBlockToLocal(dir+"/"+_seq.getNextID(), value); } } } - finally { - IOUtilFunctions.closeSilently(reader); - } - } - } - - private static void createTextCellStagingFile( String fnameStaging, MatrixObject mo, long ID ) - throws IOException, DMLRuntimeException - { - JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); - Path path = new Path(mo.getFileName()); - FileInputFormat.addInputPath(job, path); - TextInputFormat informat = new TextInputFormat(); - informat.configure(job); - InputSplit[] splits = informat.getSplits(job, 1); - - LinkedList<Cell> buffer = new LinkedList<>(); - LongWritable key = new LongWritable(); - Text value = new Text(); - - DataCharacteristics mc = mo.getDataCharacteristics(); - int blen = mc.getBlocksize(); - //long row = -1, col = -1; //FIXME needs reconsideration whenever textcell is used actively - //NOTE MB: Originally, we used long row, col but this led reproducibly to JIT compilation - // errors during runtime; experienced under WINDOWS, Intel x86-64, IBM JDK 64bit/32bit. - // It works fine with int row, col but we require long for larger matrices. - // Since, textcell is never used for result merge (hybrid/hadoop: binaryblock, singlenode:binarycell) - // we just propose the to exclude it with -Xjit:exclude={package.method*}(count=0,optLevel=0) - - FastStringTokenizer st = new FastStringTokenizer(' '); - - for(InputSplit split : splits) - { - RecordReader<LongWritable,Text> reader = informat.getRecordReader(split, job, Reporter.NULL); - try - { - while(reader.next(key, value)) - { - st.reset( value.toString() ); //reset tokenizer - long row = st.nextLong(); - long col = st.nextLong(); - double lvalue = Double.parseDouble( st.nextToken() ); - - Cell tmp = new Cell( row, col, lvalue ); - - buffer.addLast( tmp ); - if( buffer.size() > StagingFileUtils.CELL_BUFFER_SIZE ) //periodic flush - { - appendCellBufferToStagingArea(fnameStaging, ID, buffer, blen); - buffer.clear(); - } - } - - //final flush - if( !buffer.isEmpty() ) - { - appendCellBufferToStagingArea(fnameStaging, ID, buffer, blen); - buffer.clear(); - } - } - finally { - IOUtilFunctions.closeSilently(reader); - } } } - private static void appendCellBufferToStagingArea( String fnameStaging, long ID, LinkedList<Cell> buffer, int blen ) - throws IOException - { - HashMap<Long,HashMap<Long,LinkedList<Cell>>> sortedBuffer = new HashMap<>(); - long brow, bcol, row_offset, col_offset; - - for( Cell c : buffer ) - { - brow = (c.getRow()-1)/blen + 1; - bcol = (c.getCol()-1)/blen + 1; - row_offset = (brow-1)*blen + 1; - col_offset = (bcol-1)*blen + 1; - - c.setRow( c.getRow() - row_offset); - c.setCol(c.getCol() - col_offset); - - if( !sortedBuffer.containsKey(brow) ) - sortedBuffer.put(brow, new HashMap<Long,LinkedList<Cell>>()); - if( !sortedBuffer.get(brow).containsKey(bcol) ) - sortedBuffer.get(brow).put(bcol, new LinkedList<Cell>()); - sortedBuffer.get(brow).get(bcol).addLast(c); - } - - //write lists of cells to local files - for( Entry<Long,HashMap<Long,LinkedList<Cell>>> e : sortedBuffer.entrySet() ) - { - brow = e.getKey(); - for( Entry<Long,LinkedList<Cell>> e2 : e.getValue().entrySet() ) - { - bcol = e2.getKey(); - String lname = brow+"_"+bcol; - String dir = fnameStaging+"/"+lname; - LocalFileUtils.checkAndCreateStagingDir( dir ); - StagingFileUtils.writeCellListToLocal(dir+"/"+ID, e2.getValue()); - } - } - } - private void createBinaryBlockResultFile( String fnameStaging, String fnameStagingCompare, String fnameNew, MetaDataFormat metadata, boolean withCompare ) throws IOException, DMLRuntimeException { JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); - Path path = new Path( fnameNew ); + Path path = new Path( fnameNew ); DataCharacteristics mc = metadata.getDataCharacteristics(); long rlen = mc.getRows(); @@ -491,16 +240,13 @@ public class ResultMergeLocalFile extends ResultMergeMatrix try { MatrixIndexes indexes = new MatrixIndexes(); - for(long brow = 1; brow <= (long)Math.ceil(rlen/(double)blen); brow++){ - - for(long bcol = 1; bcol <= (long)Math.ceil(clen/(double)blen); bcol++) - { + for(long brow = 1; brow <= (long)Math.ceil(rlen/(double)blen); brow++) { + for(long bcol = 1; bcol <= (long)Math.ceil(clen/(double)blen); bcol++) { File dir = new File(fnameStaging+"/"+brow+"_"+bcol); File dir2 = new File(fnameStagingCompare+"/"+brow+"_"+bcol); MatrixBlock mb = null; - if( dir.exists() ) - { + if( dir.exists() ) { if( withCompare && dir2.exists() ) //WITH COMPARE BLOCK { //copy only values that are different from the original @@ -509,7 +255,7 @@ public class ResultMergeLocalFile extends ResultMergeMatrix throw new DMLRuntimeException("Unable to merge results because multiple compare blocks found."); mb = LocalFileUtils.readMatrixBlockFromLocal( dir2+"/"+lnames2[0] ); boolean appendOnly = mb.isInSparseFormat(); - DenseBlock compare = DataConverter.convertToDenseBlock(mb, false); + DenseBlock compare = DataConverter.convertToDenseBlock(mb, true); for( String lname : dir.list() ) { MatrixBlock tmp = LocalFileUtils.readMatrixBlockFromLocal( dir+"/"+lname ); mergeWithComp(mb, tmp, compare); @@ -562,150 +308,4 @@ public class ResultMergeLocalFile extends ResultMergeMatrix IOUtilFunctions.closeSilently(writer); } } - - private void createTextCellResultFile( String fnameStaging, String fnameStagingCompare, String fnameNew, MetaDataFormat metadata, boolean withCompare ) - throws IOException, DMLRuntimeException - { - JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); - Path path = new Path( fnameNew ); - FileSystem fs = IOUtilFunctions.getFileSystem(path, job); - - DataCharacteristics mc = metadata.getDataCharacteristics(); - long rlen = mc.getRows(); - long clen = mc.getCols(); - int blen = mc.getBlocksize(); - - try( BufferedWriter out = new BufferedWriter(new OutputStreamWriter(fs.create(path,true))) ) { - //for obj reuse and preventing repeated buffer re-allocations - StringBuilder sb = new StringBuilder(); - - boolean written=false; - for(long brow = 1; brow <= (long)Math.ceil(rlen/(double)blen); brow++) - for(long bcol = 1; bcol <= (long)Math.ceil(clen/(double)blen); bcol++) - { - File dir = new File(fnameStaging+"/"+brow+"_"+bcol); - File dir2 = new File(fnameStagingCompare+"/"+brow+"_"+bcol); - MatrixBlock mb = null; - - long row_offset = (brow-1)*blen + 1; - long col_offset = (bcol-1)*blen + 1; - - - if( dir.exists() ) - { - if( withCompare && dir2.exists() ) //WITH COMPARE BLOCK - { - //copy only values that are different from the original - String[] lnames2 = dir2.list(); - if( lnames2.length != 1 ) //there should be exactly 1 compare block - throw new DMLRuntimeException("Unable to merge results because multiple compare blocks found."); - mb = StagingFileUtils.readCellList2BlockFromLocal( dir2+"/"+lnames2[0], blen ); - boolean appendOnly = mb.isInSparseFormat(); - DenseBlock compare = DataConverter.convertToDenseBlock(mb, false); - for( String lname : dir.list() ) { - MatrixBlock tmp = StagingFileUtils.readCellList2BlockFromLocal( dir+"/"+lname, blen ); - mergeWithComp(mb, tmp, compare); - } - - //sort sparse and exam sparsity due to append-only - if( appendOnly && !_isAccum ) - mb.sortSparseRows(); - - //change sparsity if required after - mb.examSparsity(); - } - else //WITHOUT COMPARE BLOCK - { - //copy all non-zeros from all workers - boolean appendOnly = false; - for( String lname : dir.list() ) { - if( mb == null ) { - mb = StagingFileUtils.readCellList2BlockFromLocal( dir+"/"+lname, blen ); - appendOnly = mb.isInSparseFormat(); - } - else { - MatrixBlock tmp = StagingFileUtils.readCellList2BlockFromLocal( dir+"/"+lname, blen ); - mergeWithoutComp(mb, tmp, appendOnly); - } - } - - //sort sparse due to append-only - if( appendOnly && !_isAccum ) - mb.sortSparseRows(); - - //change sparsity if required after - mb.examSparsity(); - } - } - - //write the block to text cell - if( mb!=null ) - { - if( mb.isInSparseFormat() ) { - Iterator<IJV> iter = mb.getSparseBlockIterator(); - while( iter.hasNext() ) { - IJV lcell = iter.next(); - sb.append(row_offset+lcell.getI()); - sb.append(' '); - sb.append(col_offset+lcell.getJ()); - sb.append(' '); - sb.append(lcell.getV()); - sb.append('\n'); - out.write( sb.toString() ); - sb.setLength(0); - written = true; - } - } - else { - DenseBlock d = mb.getDenseBlock(); - for( int i=0; i<blen; i++ ) - for( int j=0; j<blen; j++ ) - { - double lvalue = d.get(i, j); - if( lvalue != 0 ) //for nnz - { - sb.append(row_offset+i); - sb.append(' '); - sb.append(col_offset+j); - sb.append(' '); - sb.append(lvalue); - sb.append('\n'); - out.write( sb.toString() ); - sb.setLength(0); - written = true; - } - } - } - } - } - - if( !written ) - out.write(IOUtilFunctions.EMPTY_TEXT_LINE); - } - } - - private static void copyAllFiles( String fnameNew, ArrayList<MatrixObject> inMO ) - throws IOException - { - JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); - Path path = new Path( fnameNew ); - FileSystem fs = IOUtilFunctions.getFileSystem(path, job); - - //create output dir - fs.mkdirs(path); - - //merge in all input matrix objects - IDSequence seq = new IDSequence(); - for( MatrixObject in : inMO ) - { - if( LOG.isTraceEnabled() ) - LOG.trace("ResultMerge (local, file): Merge input "+in.hashCode() - +" (fname="+in.getFileName()+") via file rename."); - - //copy over files (just rename file or entire dir) - Path tmpPath = new Path(in.getFileName()); - String lname = tmpPath.getName(); - fs.rename(tmpPath, new Path(fnameNew+"/"+lname+seq.getNextID())); - } - } } diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeMatrix.java b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeMatrix.java index 9efe624231..181899d954 100644 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeMatrix.java +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeMatrix.java @@ -90,7 +90,6 @@ public abstract class ResultMergeMatrix extends ResultMerge<MatrixObject> { // NaNs, since NaN != NaN, otherwise we would potentially overwrite results // * For the case of accumulation, we add out += (new-old) to ensure correct results // because all inputs have the old values replicated - final int rows = in.getNumRows(); final int cols = in.getNumColumns(); if(in.isEmptyBlock(false)) { diff --git a/src/test/java/org/apache/sysds/test/component/parfor/ResultMergeTest.java b/src/test/java/org/apache/sysds/test/component/parfor/ResultMergeTest.java index fabfce69ce..8bf2981a7e 100644 --- a/src/test/java/org/apache/sysds/test/component/parfor/ResultMergeTest.java +++ b/src/test/java/org/apache/sysds/test/component/parfor/ResultMergeTest.java @@ -33,7 +33,6 @@ import org.apache.sysds.runtime.meta.MetaDataFormat; import org.apache.sysds.test.AutomatedTestBase; import org.apache.sysds.test.TestConfiguration; import org.apache.sysds.test.TestUtils; -import org.junit.Ignore; import org.junit.Test; public class ResultMergeTest extends AutomatedTestBase{ @@ -52,7 +51,6 @@ public class ResultMergeTest extends AutomatedTestBase{ } @Test - @Ignore //FIXME public void testLocalFile() { testResultMergeAll(PResultMerge.LOCAL_FILE); } @@ -66,6 +64,11 @@ public class ResultMergeTest extends AutomatedTestBase{ testResultMerge(false, false, false, mtype); testResultMerge(false, true, false, mtype); testResultMerge(true, false, false, mtype); + testResultMerge(false, false, true, mtype); + if( mtype != PResultMerge.LOCAL_FILE ) //FIXME + testResultMerge(false, true, true, mtype); + testResultMerge(true, false, true, mtype); + //testResultMerge(true, true, false, mtype); invalid } @@ -75,19 +78,21 @@ public class ResultMergeTest extends AutomatedTestBase{ //create input and output objects MatrixBlock A = MatrixBlock.randOperations(1200, 1100, 0.1); CacheableData<?> Cobj = compare ? - toMatrixObject(A, output("C")) : + toMatrixObject(new MatrixBlock(1200,1100,1d), output("C")) : toMatrixObject(new MatrixBlock(1200,1100,true), output("C")); - MatrixBlock empty = new MatrixBlock(400,1100,true); + MatrixBlock rest = compare ? + new MatrixBlock(400,1100,1d) : //constant (also dense) + new MatrixBlock(400,1100,true); //empty (also sparse) MatrixObject[] Bobj = new MatrixObject[3]; - Bobj[0] = toMatrixObject(A.slice(0,399).rbind(empty).rbind(empty), output("B0")); - Bobj[1] = toMatrixObject(empty.rbind(A.slice(400,799)).rbind(empty), output("B1")); - Bobj[2] = toMatrixObject(empty.rbind(empty).rbind(A.slice(800,1199)), output("B1")); - + Bobj[0] = toMatrixObject(A.slice(0,399).rbind(rest).rbind(rest), output("B0")); + Bobj[1] = toMatrixObject(rest.rbind(A.slice(400,799)).rbind(rest), output("B1")); + Bobj[2] = toMatrixObject(rest.rbind(rest).rbind(A.slice(800,1199)), output("B2")); + //create result merge ExecutionContext ec = ExecutionContextFactory.createContext(); int numThreads = 3; ResultMerge<?> rm = ParForProgramBlock.createResultMerge( - mtype, Cobj, Bobj, output("C"), accum, numThreads, ec); + mtype, Cobj, Bobj, output("R"), accum, numThreads, ec); //execute results merge if( par )