This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new ce6f6cd68a [SYSTEMDS-3722] Fix file-based parfor result merge and 
related tests
ce6f6cd68a is described below

commit ce6f6cd68a235db64419dc4b93fcff17b46a575d
Author: Matthias Boehm <mboe...@gmail.com>
AuthorDate: Tue Aug 20 20:06:50 2024 +0200

    [SYSTEMDS-3722] Fix file-based parfor result merge and related tests
    
    This patch fixes a recently discovered bug in the file-based parfor
    result merge (a missing deep copy of the compare block) as well as some
    setup issues in the ResultMergeTest (overwriting of files and compare setup).
    
    Furthermore, we now remove support for the textcell format during
    this file-based result merge, since the sparse block binary
    representations are by now good enough.
---
 .../controlprogram/caching/CacheableData.java      |   6 +-
 .../parfor/ResultMergeLocalFile.java               | 454 ++-------------------
 .../controlprogram/parfor/ResultMergeMatrix.java   |   1 -
 .../test/component/parfor/ResultMergeTest.java     |  23 +-
 4 files changed, 45 insertions(+), 439 deletions(-)

diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
index 8e37dc29a2..f40d9c2cde 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/caching/CacheableData.java
@@ -846,7 +846,8 @@ public abstract class CacheableData<T extends 
CacheBlock<?>> extends Data
                if ( !isAvailableToRead() )
                        throw new DMLRuntimeException("MatrixObject not 
available to read.");
 
-               LOG.trace("Exporting " + this.getDebugName() + " to " + fName + 
" in format " + outputFormat);
+               if( LOG.isTraceEnabled() )
+                       LOG.trace("Exporting " + this.getDebugName() + " to " + 
fName + " in format " + outputFormat);
                
                if( DMLScript.USE_ACCELERATOR && _gpuObjects != null ) {
                        boolean copiedFromGPU = false;
@@ -962,7 +963,8 @@ public abstract class CacheableData<T extends 
CacheBlock<?>> extends Data
                else 
                {
                        //CASE 4: data already in hdfs (do nothing, no need for 
export)
-                       LOG.trace(this.getDebugName() + ": Skip export to hdfs 
since data already exists.");
+                       if( LOG.isTraceEnabled() )
+                               LOG.trace(this.getDebugName() + ": Skip export 
to hdfs since data already exists.");
                }
                
                _hdfsFileExists = true;
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeLocalFile.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeLocalFile.java
index b2340426e6..ce683e455b 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeLocalFile.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeLocalFile.java
@@ -19,55 +19,31 @@
 
 package org.apache.sysds.runtime.controlprogram.parfor;
 
-import java.io.BufferedWriter;
 import java.io.File;
 import java.io.IOException;
-import java.io.OutputStreamWriter;
 import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.Map.Entry;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.SequenceFile.Writer;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.sysds.common.Types.FileFormat;
 import org.apache.sysds.conf.ConfigurationManager;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
-import org.apache.sysds.runtime.controlprogram.parfor.util.Cell;
 import org.apache.sysds.runtime.controlprogram.parfor.util.IDSequence;
-import org.apache.sysds.runtime.controlprogram.parfor.util.StagingFileUtils;
 import org.apache.sysds.runtime.data.DenseBlock;
 import org.apache.sysds.runtime.io.IOUtilFunctions;
-import org.apache.sysds.runtime.matrix.data.IJV;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysds.runtime.meta.DataCharacteristics;
 import org.apache.sysds.runtime.meta.MatrixCharacteristics;
 import org.apache.sysds.runtime.meta.MetaDataFormat;
 import org.apache.sysds.runtime.util.DataConverter;
-import org.apache.sysds.runtime.util.FastStringTokenizer;
 import org.apache.sysds.runtime.util.HDFSTool;
 import org.apache.sysds.runtime.util.LocalFileUtils;
 
-/**
- * 
- * TODO potential extension: parallel merge (create individual staging files 
concurrently)
- *     
- *      NOTE: file merge typically used due to memory constraints - parallel 
merge would increase the memory
- *      consumption again.
- */
 public class ResultMergeLocalFile extends ResultMergeMatrix
 {
        private static final long serialVersionUID = -6905893742840020489L;
@@ -78,14 +54,11 @@ public class ResultMergeLocalFile extends ResultMergeMatrix
        //internal comparison matrix
        private IDSequence _seq = null;
        
-       public ResultMergeLocalFile( MatrixObject out, MatrixObject[] in, 
String outputFilename, boolean accum )
-       {
+       public ResultMergeLocalFile( MatrixObject out, MatrixObject[] in, 
String outputFilename, boolean accum ) {
                super( out, in, outputFilename, accum );
-               
                _seq = new IDSequence();
        }
 
-
        @Override
        public MatrixObject executeSerialMerge() {
                MatrixObject moNew = null; //always create new matrix object 
(required for nested parallelism)
@@ -98,11 +71,9 @@ public class ResultMergeLocalFile extends ResultMergeMatrix
                {
                        //collect all relevant inputs
                        ArrayList<MatrixObject> inMO = new ArrayList<>();
-                       for( MatrixObject in : _inputs )
-                       {
+                       for( MatrixObject in : _inputs ) {
                                //check for empty inputs (no iterations 
executed)
-                               if( in !=null && in != _output ) 
-                               {
+                               if( in !=null && in != _output ) {
                                        //ensure that input file resides on disk
                                        in.exportData();
                                        
@@ -111,8 +82,7 @@ public class ResultMergeLocalFile extends ResultMergeMatrix
                                }
                        }
 
-                       if( !inMO.isEmpty() )
-                       {
+                       if( !inMO.isEmpty() ) {
                                //ensure that outputfile (for comparison) 
resides on disk
                                _output.exportData();
                                
@@ -122,13 +92,11 @@ public class ResultMergeLocalFile extends ResultMergeMatrix
                                //create new output matrix (e.g., to prevent 
potential export<->read file access conflict
                                moNew = createNewMatrixObject( _output, inMO );
                        }
-                       else
-                       {
+                       else {
                                moNew = _output; //return old matrix, to 
prevent copy
                        }
                }
-               catch(Exception ex)
-               {
+               catch(Exception ex) {
                        throw new DMLRuntimeException(ex);
                }
 
@@ -161,13 +129,7 @@ public class ResultMergeLocalFile extends ResultMergeMatrix
                FileFormat fmt = 
((MetaDataFormat)outMo.getMetaData()).getFileFormat();
                boolean withCompare = ( outMo.getNnz() != 0 ); //if nnz exist 
or unknown (-1)
                
-               if( fmt == FileFormat.TEXT ) {
-                       if(withCompare)
-                               mergeTextCellWithComp(fnameNew, outMo, inMO);
-                       else
-                               mergeTextCellWithoutComp( fnameNew, outMo, inMO 
);
-               }
-               else if( fmt == FileFormat.BINARY ) {
+               if( fmt == FileFormat.BINARY ) {
                        if(withCompare)
                                mergeBinaryBlockWithComp( fnameNew, outMo, inMO 
);
                        else
@@ -175,122 +137,16 @@ public class ResultMergeLocalFile extends 
ResultMergeMatrix
                }
        }
 
-       private static void mergeTextCellWithoutComp( String fnameNew, 
MatrixObject outMo, ArrayList<MatrixObject> inMO ) 
-       {
-               try
-               {
-                       //delete target file if already exists
-                       HDFSTool.deleteFileIfExistOnHDFS(fnameNew);
-                       
-                       if( ALLOW_COPY_CELLFILES )
-                       {
-                               copyAllFiles(fnameNew, inMO);
-                               return; //we're done
-                       }
-                       
-                       //actual merge
-                       JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());
-                       Path path = new Path( fnameNew );
-                       FileSystem fs = IOUtilFunctions.getFileSystem(path, 
job);
-                       BufferedWriter out = new BufferedWriter(new 
OutputStreamWriter(fs.create(path,true)));          
-                       
-                       String valueStr = null;
-                       
-                       try
-                       {
-                               for( MatrixObject in : inMO ) //read/write all 
inputs
-                               {
-                                       if( LOG.isTraceEnabled() )
-                                               LOG.trace("ResultMerge (local, 
file): Merge input "+in.hashCode()+" (fname="
-                                                       +in.getFileName()+") 
via stream merge");
-                                       
-                                       JobConf tmpJob = new 
JobConf(ConfigurationManager.getCachedJobConf());
-                                       Path tmpPath = new 
Path(in.getFileName());
-                                       FileInputFormat.addInputPath(tmpJob, 
tmpPath);
-                                       TextInputFormat informat = new 
TextInputFormat();
-                                       informat.configure(tmpJob);
-                                       InputSplit[] splits = 
informat.getSplits(tmpJob, 1);
-                                       
-                                       LongWritable key = new LongWritable();
-                                       Text value = new Text();
-               
-                                       for(InputSplit split: splits)
-                                       {
-                                               RecordReader<LongWritable,Text> 
reader = informat.getRecordReader(split, tmpJob, Reporter.NULL);
-                                               try
-                                               {
-                                                       while(reader.next(key, 
value))
-                                                       {
-                                                               valueStr = 
value.toString().trim();     
-                                                               out.write( 
valueStr+"\n" );
-                                                       }
-                                               }
-                                               finally {
-                                                       
IOUtilFunctions.closeSilently(reader);
-                                               }
-                                       }
-                               }
-                       }
-                       finally {
-                               IOUtilFunctions.closeSilently(out);
-                       }
-               }
-               catch(Exception ex)
-               {
-                       throw new DMLRuntimeException("Unable to merge text 
cell results.", ex);
-               }
-       }
-
-       private void mergeTextCellWithComp( String fnameNew, MatrixObject 
outMo, ArrayList<MatrixObject> inMO ) 
-       {
-               String fnameStaging = 
LocalFileUtils.getUniqueWorkingDir(LocalFileUtils.CATEGORY_RESULTMERGE);
-               String fnameStagingCompare = 
LocalFileUtils.getUniqueWorkingDir(LocalFileUtils.CATEGORY_RESULTMERGE);
-               
-               try
-               {
-                       //delete target file if already exists
-                       HDFSTool.deleteFileIfExistOnHDFS(fnameNew);
-                       
-                       //Step 0) write compare blocks to staging area (if 
necessary)
-                       if( LOG.isTraceEnabled() )
-                               LOG.trace("ResultMerge (local, file): Create 
merge compare matrix for output "
-                                       +outMo.hashCode()+" 
(fname="+outMo.getFileName()+")");
-                       createTextCellStagingFile(fnameStagingCompare, outMo, 
0);
-                       
-                       //Step 1) read and write blocks to staging area
-                       for( MatrixObject in : inMO )
-                       {
-                               if( LOG.isTraceEnabled() )
-                                       LOG.trace("ResultMerge (local, file): 
Merge input "+in.hashCode()+" (fname="+in.getFileName()+")");
-                               
-                               long ID = _seq.getNextID();
-                               createTextCellStagingFile( fnameStaging, in, ID 
);
-                       }
-       
-                       //Step 2) read blocks, consolidate, and write to HDFS
-                       createTextCellResultFile(fnameStaging, 
fnameStagingCompare, fnameNew, (MetaDataFormat)outMo.getMetaData(), true);
-               }       
-               catch(Exception ex)
-               {
-                       throw new DMLRuntimeException("Unable to merge text 
cell results.", ex);
-               }
-               
-               LocalFileUtils.cleanupWorkingDirectory(fnameStaging);
-               LocalFileUtils.cleanupWorkingDirectory(fnameStagingCompare);
-       }
-
        private void mergeBinaryBlockWithoutComp( String fnameNew, MatrixObject 
outMo, ArrayList<MatrixObject> inMO ) 
        {
                String fnameStaging = 
LocalFileUtils.getUniqueWorkingDir(LocalFileUtils.CATEGORY_RESULTMERGE);
                
-               try
-               {
+               try {
                        //delete target file if already exists
                        HDFSTool.deleteFileIfExistOnHDFS(fnameNew);
                        
                        //Step 1) read and write blocks to staging area
-                       for( MatrixObject in : inMO )
-                       {
+                       for( MatrixObject in : inMO ) {
                                if( LOG.isTraceEnabled() )
                                        LOG.trace("ResultMerge (local, file): 
Merge input "+in.hashCode()+" (fname="+in.getFileName()+")");
                                
@@ -300,10 +156,9 @@ public class ResultMergeLocalFile extends ResultMergeMatrix
                        //Step 2) read blocks, consolidate, and write to HDFS
                        createBinaryBlockResultFile(fnameStaging, null, 
fnameNew, (MetaDataFormat)outMo.getMetaData(), false);
                }       
-               catch(Exception ex)
-               {
+               catch(Exception ex) {
                        throw new DMLRuntimeException("Unable to merge binary 
block results.", ex);
-               }       
+               }
                
                LocalFileUtils.cleanupWorkingDirectory(fnameStaging);
        }
@@ -326,20 +181,19 @@ public class ResultMergeLocalFile extends 
ResultMergeMatrix
                        createBinaryBlockStagingFile(fnameStagingCompare, 
outMo);
                        
                        //Step 1) read and write blocks to staging area
-                       for( MatrixObject in : inMO )
-                       {
+                       for( MatrixObject in : inMO ) {
                                if( LOG.isTraceEnabled() )
                                        LOG.trace("ResultMerge (local, file): 
Merge input "+in.hashCode()+" (fname="+in.getFileName()+")");
+                               
                                createBinaryBlockStagingFile( fnameStaging, in 
);
                        }
-       
+                       
                        //Step 2) read blocks, consolidate, and write to HDFS
                        createBinaryBlockResultFile(fnameStaging, 
fnameStagingCompare, fnameNew, (MetaDataFormat)outMo.getMetaData(), true);
                }       
-               catch(Exception ex)
-               {
+               catch(Exception ex) {
                        throw new DMLRuntimeException("Unable to merge binary 
block results.", ex);
-               }       
+               }
                
                LocalFileUtils.cleanupWorkingDirectory(fnameStaging);
                LocalFileUtils.cleanupWorkingDirectory(fnameStagingCompare);
@@ -348,7 +202,7 @@ public class ResultMergeLocalFile extends ResultMergeMatrix
        @SuppressWarnings("deprecation")
        private void createBinaryBlockStagingFile( String fnameStaging, 
MatrixObject mo ) 
                throws IOException
-       {               
+       {
                MatrixIndexes key = new MatrixIndexes(); 
                MatrixBlock value = new MatrixBlock();
                
@@ -356,131 +210,26 @@ public class ResultMergeLocalFile extends 
ResultMergeMatrix
                Path tmpPath = new Path(mo.getFileName());
                FileSystem fs = IOUtilFunctions.getFileSystem(tmpPath, tmpJob);
                
-               for(Path lpath : IOUtilFunctions.getSequenceFilePaths(fs, 
tmpPath))
-               {
-                       SequenceFile.Reader reader = new 
SequenceFile.Reader(fs,lpath,tmpJob);
-                       try
-                       {
-                               while(reader.next(key, value)) //for each block
-                               {                                               
        
+               for(Path lpath : IOUtilFunctions.getSequenceFilePaths(fs, 
tmpPath)) {
+                       try( SequenceFile.Reader reader = new 
SequenceFile.Reader(fs,lpath,tmpJob) ) {
+                               while(reader.next(key, value)) { //for each 
block
                                        String lname = 
key.getRowIndex()+"_"+key.getColumnIndex();
                                        String dir = fnameStaging+"/"+lname;
-                                       if( value.getNonZeros()>0 ) //write 
only non-empty blocks
-                                       {
+                                       
+                                       if( value.getNonZeros()>0 ) { //write 
only non-empty blocks
                                                
LocalFileUtils.checkAndCreateStagingDir( dir );
                                                
LocalFileUtils.writeMatrixBlockToLocal(dir+"/"+_seq.getNextID(), value);
                                        }
                                }
                        }
-                       finally {
-                               IOUtilFunctions.closeSilently(reader);
-                       }
-               }
-       }
-
-       private static void createTextCellStagingFile( String fnameStaging, 
MatrixObject mo, long ID ) 
-               throws IOException, DMLRuntimeException
-       {               
-               JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());
-               Path path = new Path(mo.getFileName());
-               FileInputFormat.addInputPath(job, path);
-               TextInputFormat informat = new TextInputFormat();
-               informat.configure(job);
-               InputSplit[] splits = informat.getSplits(job, 1);
-               
-               LinkedList<Cell> buffer = new LinkedList<>();
-               LongWritable key = new LongWritable();
-               Text value = new Text();
-
-               DataCharacteristics mc = mo.getDataCharacteristics();
-               int blen = mc.getBlocksize(); 
-               //long row = -1, col = -1; //FIXME needs reconsideration 
whenever textcell is used actively
-               //NOTE MB: Originally, we used long row, col but this led 
reproducibly to JIT compilation
-               // errors during runtime; experienced under WINDOWS, Intel 
x86-64, IBM JDK 64bit/32bit.
-               // It works fine with int row, col but we require long for 
larger matrices.
-               // Since, textcell is never used for result merge 
(hybrid/hadoop: binaryblock, singlenode:binarycell)
-               // we just propose the to exclude it with 
-Xjit:exclude={package.method*}(count=0,optLevel=0)
-               
-               FastStringTokenizer st = new FastStringTokenizer(' ');
-               
-               for(InputSplit split : splits)
-               {
-                       RecordReader<LongWritable,Text> reader = 
informat.getRecordReader(split, job, Reporter.NULL);
-                       try
-                       {
-                               while(reader.next(key, value))
-                               {
-                                       st.reset( value.toString() ); //reset 
tokenizer
-                                       long row = st.nextLong();
-                                   long col = st.nextLong();
-                                       double lvalue = Double.parseDouble( 
st.nextToken() );
-                                       
-                                       Cell tmp = new Cell( row, col, lvalue 
); 
-                                       
-                                       buffer.addLast( tmp );
-                                       if( buffer.size() > 
StagingFileUtils.CELL_BUFFER_SIZE ) //periodic flush
-                                       {
-                                               
appendCellBufferToStagingArea(fnameStaging, ID, buffer, blen);
-                                               buffer.clear();
-                                       }
-                               }
-                               
-                               //final flush
-                               if( !buffer.isEmpty() )
-                               {
-                                       
appendCellBufferToStagingArea(fnameStaging, ID, buffer, blen);
-                                       buffer.clear();
-                               }
-                       }
-                       finally {
-                               IOUtilFunctions.closeSilently(reader);
-                       }
                }
        }
 
-       private static void appendCellBufferToStagingArea( String fnameStaging, 
long ID, LinkedList<Cell> buffer, int blen ) 
-               throws IOException
-       {
-               HashMap<Long,HashMap<Long,LinkedList<Cell>>> sortedBuffer = new 
HashMap<>();
-               long brow, bcol, row_offset, col_offset;
-               
-               for( Cell c : buffer )
-               {
-                       brow = (c.getRow()-1)/blen + 1;
-                       bcol = (c.getCol()-1)/blen + 1;
-                       row_offset = (brow-1)*blen + 1;
-                       col_offset = (bcol-1)*blen + 1;
-                       
-                       c.setRow( c.getRow() - row_offset);
-                       c.setCol(c.getCol() - col_offset);
-                       
-                       if( !sortedBuffer.containsKey(brow) )
-                               sortedBuffer.put(brow, new 
HashMap<Long,LinkedList<Cell>>());
-                       if( !sortedBuffer.get(brow).containsKey(bcol) )
-                               sortedBuffer.get(brow).put(bcol, new 
LinkedList<Cell>());
-                       sortedBuffer.get(brow).get(bcol).addLast(c);
-               }       
-               
-               //write lists of cells to local files
-               for( Entry<Long,HashMap<Long,LinkedList<Cell>>> e : 
sortedBuffer.entrySet() )
-               {
-                       brow = e.getKey();
-                       for( Entry<Long,LinkedList<Cell>> e2 : 
e.getValue().entrySet() )
-                       {
-                               bcol = e2.getKey();
-                               String lname = brow+"_"+bcol;
-                               String dir = fnameStaging+"/"+lname;
-                               LocalFileUtils.checkAndCreateStagingDir( dir );
-                               
StagingFileUtils.writeCellListToLocal(dir+"/"+ID, e2.getValue());
-                       }
-               }
-       }       
-
        private void createBinaryBlockResultFile( String fnameStaging, String 
fnameStagingCompare, String fnameNew, MetaDataFormat metadata, boolean 
withCompare ) 
                throws IOException, DMLRuntimeException
        {
                JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());
-               Path path = new Path( fnameNew );       
+               Path path = new Path( fnameNew );
                
                DataCharacteristics mc = metadata.getDataCharacteristics();
                long rlen = mc.getRows();
@@ -491,16 +240,13 @@ public class ResultMergeLocalFile extends 
ResultMergeMatrix
 
                try {
                        MatrixIndexes indexes = new MatrixIndexes();
-                       for(long brow = 1; brow <= 
(long)Math.ceil(rlen/(double)blen); brow++){
-
-                               for(long bcol = 1; bcol <= 
(long)Math.ceil(clen/(double)blen); bcol++)
-                               {
+                       for(long brow = 1; brow <= 
(long)Math.ceil(rlen/(double)blen); brow++) {
+                               for(long bcol = 1; bcol <= 
(long)Math.ceil(clen/(double)blen); bcol++) {
                                        File dir = new 
File(fnameStaging+"/"+brow+"_"+bcol);
                                        File dir2 = new 
File(fnameStagingCompare+"/"+brow+"_"+bcol);
                                        MatrixBlock mb = null;
                                        
-                                       if( dir.exists() )
-                                       {
+                                       if( dir.exists() ) {
                                                if( withCompare && 
dir2.exists() ) //WITH COMPARE BLOCK
                                                {
                                                        //copy only values that 
are different from the original
@@ -509,7 +255,7 @@ public class ResultMergeLocalFile extends ResultMergeMatrix
                                                                throw new 
DMLRuntimeException("Unable to merge results because multiple compare blocks 
found.");
                                                        mb = 
LocalFileUtils.readMatrixBlockFromLocal( dir2+"/"+lnames2[0] );
                                                        boolean appendOnly = 
mb.isInSparseFormat();
-                                                       DenseBlock compare = 
DataConverter.convertToDenseBlock(mb, false);
+                                                       DenseBlock compare = 
DataConverter.convertToDenseBlock(mb, true);
                                                        for( String lname : 
dir.list() ) {
                                                                MatrixBlock tmp 
= LocalFileUtils.readMatrixBlockFromLocal( dir+"/"+lname );
                                                                
mergeWithComp(mb, tmp, compare);
@@ -562,150 +308,4 @@ public class ResultMergeLocalFile extends 
ResultMergeMatrix
                        IOUtilFunctions.closeSilently(writer);
                }
        }
-
-       private void createTextCellResultFile( String fnameStaging, String 
fnameStagingCompare, String fnameNew, MetaDataFormat metadata, boolean 
withCompare ) 
-               throws IOException, DMLRuntimeException
-       {
-               JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());
-               Path path = new Path( fnameNew );       
-               FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
-               
-               DataCharacteristics mc = metadata.getDataCharacteristics();
-               long rlen = mc.getRows();
-               long clen = mc.getCols();
-               int blen = mc.getBlocksize();
-               
-               try( BufferedWriter out = new BufferedWriter(new 
OutputStreamWriter(fs.create(path,true))) ) {
-                       //for obj reuse and preventing repeated buffer 
re-allocations
-                       StringBuilder sb = new StringBuilder();
-                       
-                       boolean written=false;
-                       for(long brow = 1; brow <= 
(long)Math.ceil(rlen/(double)blen); brow++)
-                               for(long bcol = 1; bcol <= 
(long)Math.ceil(clen/(double)blen); bcol++)
-                               {
-                                       File dir = new 
File(fnameStaging+"/"+brow+"_"+bcol);
-                                       File dir2 = new 
File(fnameStagingCompare+"/"+brow+"_"+bcol);
-                                       MatrixBlock mb = null;
-                                       
-                                       long row_offset = (brow-1)*blen + 1;
-                                       long col_offset = (bcol-1)*blen + 1;
-                                       
-                                       
-                                       if( dir.exists() )
-                                       {
-                                               if( withCompare && 
dir2.exists() ) //WITH COMPARE BLOCK
-                                               {
-                                                       //copy only values that 
are different from the original
-                                                       String[] lnames2 = 
dir2.list();
-                                                       if( lnames2.length != 1 
) //there should be exactly 1 compare block
-                                                               throw new 
DMLRuntimeException("Unable to merge results because multiple compare blocks 
found.");
-                                                       mb = 
StagingFileUtils.readCellList2BlockFromLocal( dir2+"/"+lnames2[0], blen );
-                                                       boolean appendOnly = 
mb.isInSparseFormat();
-                                                       DenseBlock compare = 
DataConverter.convertToDenseBlock(mb, false);
-                                                       for( String lname : 
dir.list() ) {
-                                                               MatrixBlock tmp 
= StagingFileUtils.readCellList2BlockFromLocal(  dir+"/"+lname, blen );
-                                                               
mergeWithComp(mb, tmp, compare);
-                                                       }
-                                                       
-                                                       //sort sparse and exam 
sparsity due to append-only
-                                                       if( appendOnly && 
!_isAccum )
-                                                               
mb.sortSparseRows();
-                                                       
-                                                       //change sparsity if 
required after 
-                                                       mb.examSparsity(); 
-                                               }
-                                               else //WITHOUT COMPARE BLOCK
-                                               {
-                                                       //copy all non-zeros 
from all workers
-                                                       boolean appendOnly = 
false;
-                                                       for( String lname : 
dir.list() ) {
-                                                               if( mb == null 
) {
-                                                                       mb = 
StagingFileUtils.readCellList2BlockFromLocal( dir+"/"+lname, blen );
-                                                                       
appendOnly = mb.isInSparseFormat();
-                                                               }
-                                                               else {
-                                                                       
MatrixBlock tmp = StagingFileUtils.readCellList2BlockFromLocal(  dir+"/"+lname, 
blen );
-                                                                       
mergeWithoutComp(mb, tmp, appendOnly);
-                                                               }
-                                                       }       
-                                                       
-                                                       //sort sparse due to 
append-only
-                                                       if( appendOnly && 
!_isAccum )
-                                                               
mb.sortSparseRows();
-                                                       
-                                                       //change sparsity if 
required after 
-                                                       mb.examSparsity(); 
-                                               }
-                                       }
-
-                                       //write the block to text cell
-                                       if( mb!=null )
-                                       {
-                                               if( mb.isInSparseFormat() ) {
-                                                       Iterator<IJV> iter = 
mb.getSparseBlockIterator();
-                                                       while( iter.hasNext() ) 
{
-                                                               IJV lcell = 
iter.next();
-                                                               
sb.append(row_offset+lcell.getI());
-                                                               sb.append(' ');
-                                                               
sb.append(col_offset+lcell.getJ());
-                                                               sb.append(' ');
-                                                               
sb.append(lcell.getV());
-                                                               sb.append('\n');
-                                                               out.write( 
sb.toString() );
-                                                               sb.setLength(0);
-                                                               written = true;
-                                                       }
-                                               }
-                                               else {
-                                                       DenseBlock d = 
mb.getDenseBlock();
-                                                       for( int i=0; i<blen; 
i++ )
-                                                               for( int j=0; 
j<blen; j++ )
-                                                               {
-                                                                       double 
lvalue = d.get(i, j);
-                                                                       if( 
lvalue != 0 ) //for nnz
-                                                                       {
-                                                                               
sb.append(row_offset+i);
-                                                                               
sb.append(' ');
-                                                                               
sb.append(col_offset+j);
-                                                                               
sb.append(' ');
-                                                                               
sb.append(lvalue);
-                                                                               
sb.append('\n');
-                                                                               
out.write( sb.toString() ); 
-                                                                               
sb.setLength(0);
-                                                                               
written = true;
-                                                                       }
-                                                               }
-                                               }
-                                       }
-                               }
-                       
-                       if( !written )
-                               out.write(IOUtilFunctions.EMPTY_TEXT_LINE);
-               }
-       }
-
-       private static void copyAllFiles( String fnameNew, 
ArrayList<MatrixObject> inMO ) 
-               throws IOException
-       {
-               JobConf job = new 
JobConf(ConfigurationManager.getCachedJobConf());
-               Path path = new Path( fnameNew );
-               FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
-               
-               //create output dir
-               fs.mkdirs(path);
-               
-               //merge in all input matrix objects
-               IDSequence seq = new IDSequence();
-               for( MatrixObject in : inMO )
-               {
-                       if( LOG.isTraceEnabled() )
-                               LOG.trace("ResultMerge (local, file): Merge 
input "+in.hashCode()
-                                       +" (fname="+in.getFileName()+") via 
file rename.");
-                       
-                       //copy over files (just rename file or entire dir)
-                       Path tmpPath = new Path(in.getFileName());
-                       String lname = tmpPath.getName();
-                       fs.rename(tmpPath, new 
Path(fnameNew+"/"+lname+seq.getNextID()));
-               }
-       }
 }
diff --git 
a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeMatrix.java
 
b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeMatrix.java
index 9efe624231..181899d954 100644
--- 
a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeMatrix.java
+++ 
b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/ResultMergeMatrix.java
@@ -90,7 +90,6 @@ public abstract class ResultMergeMatrix extends 
ResultMerge<MatrixObject> {
                // NaNs, since NaN != NaN, otherwise we would potentially 
overwrite results
                // * For the case of accumulation, we add out += (new-old) to 
ensure correct results
                // because all inputs have the old values replicated
-
                final int rows = in.getNumRows();
                final int cols = in.getNumColumns();
                if(in.isEmptyBlock(false)) {
diff --git 
a/src/test/java/org/apache/sysds/test/component/parfor/ResultMergeTest.java 
b/src/test/java/org/apache/sysds/test/component/parfor/ResultMergeTest.java
index fabfce69ce..8bf2981a7e 100644
--- a/src/test/java/org/apache/sysds/test/component/parfor/ResultMergeTest.java
+++ b/src/test/java/org/apache/sysds/test/component/parfor/ResultMergeTest.java
@@ -33,7 +33,6 @@ import org.apache.sysds.runtime.meta.MetaDataFormat;
 import org.apache.sysds.test.AutomatedTestBase;
 import org.apache.sysds.test.TestConfiguration;
 import org.apache.sysds.test.TestUtils;
-import org.junit.Ignore;
 import org.junit.Test;
 
 public class ResultMergeTest extends AutomatedTestBase{
@@ -52,7 +51,6 @@ public class ResultMergeTest extends AutomatedTestBase{
        }
        
        @Test
-       @Ignore //FIXME
        public void testLocalFile() {
                testResultMergeAll(PResultMerge.LOCAL_FILE);
        }
@@ -66,6 +64,11 @@ public class ResultMergeTest extends AutomatedTestBase{
                testResultMerge(false, false, false, mtype);
                testResultMerge(false, true, false, mtype);
                testResultMerge(true, false, false, mtype);
+               testResultMerge(false, false, true, mtype);
+               if( mtype != PResultMerge.LOCAL_FILE ) //FIXME
+                       testResultMerge(false, true, true, mtype);
+               testResultMerge(true, false, true, mtype);
+               
                //testResultMerge(true, true, false, mtype); invalid
        }
        
@@ -75,19 +78,21 @@ public class ResultMergeTest extends AutomatedTestBase{
                //create input and output objects
                MatrixBlock A = MatrixBlock.randOperations(1200, 1100, 0.1);
                CacheableData<?> Cobj = compare ?
-                       toMatrixObject(A, output("C")) :
+                       toMatrixObject(new MatrixBlock(1200,1100,1d), 
output("C")) :
                        toMatrixObject(new MatrixBlock(1200,1100,true), 
output("C"));
-               MatrixBlock empty = new MatrixBlock(400,1100,true);
+               MatrixBlock rest = compare ? 
+                       new MatrixBlock(400,1100,1d) :  //constant (also dense)
+                       new MatrixBlock(400,1100,true); //empty (also sparse)
                MatrixObject[] Bobj = new MatrixObject[3];
-               Bobj[0] = 
toMatrixObject(A.slice(0,399).rbind(empty).rbind(empty), output("B0"));
-               Bobj[1] = 
toMatrixObject(empty.rbind(A.slice(400,799)).rbind(empty), output("B1"));
-               Bobj[2] = 
toMatrixObject(empty.rbind(empty).rbind(A.slice(800,1199)), output("B1"));
-               
+               Bobj[0] = 
toMatrixObject(A.slice(0,399).rbind(rest).rbind(rest), output("B0"));
+               Bobj[1] = 
toMatrixObject(rest.rbind(A.slice(400,799)).rbind(rest), output("B1"));
+               Bobj[2] = 
toMatrixObject(rest.rbind(rest).rbind(A.slice(800,1199)), output("B2"));
+       
                //create result merge
                ExecutionContext ec = ExecutionContextFactory.createContext();
                int numThreads = 3;
                ResultMerge<?> rm = ParForProgramBlock.createResultMerge(
-                       mtype, Cobj, Bobj, output("C"), accum, numThreads, ec);
+                       mtype, Cobj, Bobj, output("R"), accum, numThreads, ec);
                        
                //execute results merge
                if( par )

Reply via email to