Repository: systemml Updated Branches: refs/heads/master f418c4460 -> 988366de0
[MINOR] Fix consistency matrix/frame writers (crc files, part names) Project: http://git-wip-us.apache.org/repos/asf/systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/f0cb8cc8 Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/f0cb8cc8 Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/f0cb8cc8 Branch: refs/heads/master Commit: f0cb8cc86feae0d0b5825f01cf85b47337336fa7 Parents: f418c44 Author: Matthias Boehm <mboe...@gmail.com> Authored: Wed Jul 5 23:46:11 2017 -0700 Committer: Matthias Boehm <mboe...@gmail.com> Committed: Wed Jul 5 23:46:11 2017 -0700 ---------------------------------------------------------------------- .../sysml/runtime/io/FrameWriterBinaryBlock.java | 3 ++- .../io/FrameWriterBinaryBlockParallel.java | 14 +++++++++++--- .../sysml/runtime/io/FrameWriterTextCSV.java | 3 ++- .../runtime/io/FrameWriterTextCSVParallel.java | 10 +++++++++- .../sysml/runtime/io/FrameWriterTextCell.java | 5 +++-- .../runtime/io/FrameWriterTextCellParallel.java | 11 +++++++++-- .../apache/sysml/runtime/io/IOUtilFunctions.java | 6 +++++- .../runtime/io/WriterBinaryBlockParallel.java | 18 ++++++++---------- .../runtime/io/WriterMatrixMarketParallel.java | 18 ++++++++---------- .../sysml/runtime/io/WriterTextCSVParallel.java | 19 ++++++++----------- .../sysml/runtime/io/WriterTextCellParallel.java | 18 ++++++++---------- 11 files changed, 73 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/systemml/blob/f0cb8cc8/src/main/java/org/apache/sysml/runtime/io/FrameWriterBinaryBlock.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameWriterBinaryBlock.java b/src/main/java/org/apache/sysml/runtime/io/FrameWriterBinaryBlock.java index 819b7d0..e208bbe 100644 --- a/src/main/java/org/apache/sysml/runtime/io/FrameWriterBinaryBlock.java +++ b/src/main/java/org/apache/sysml/runtime/io/FrameWriterBinaryBlock.java @@ -67,7 +67,8 @@ public class FrameWriterBinaryBlock extends FrameWriter int blen = ConfigurationManager.getBlocksize(); //sequential write to single file - writeBinaryBlockFrameToSequenceFile(path, job, fs, src, blen, 0, (int)rlen); + writeBinaryBlockFrameToSequenceFile(path, job, fs, src, blen, 0, (int)rlen); + IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(fs, path); } /** http://git-wip-us.apache.org/repos/asf/systemml/blob/f0cb8cc8/src/main/java/org/apache/sysml/runtime/io/FrameWriterBinaryBlockParallel.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameWriterBinaryBlockParallel.java b/src/main/java/org/apache/sysml/runtime/io/FrameWriterBinaryBlockParallel.java index a25fe75..52f1ed0 100644 --- a/src/main/java/org/apache/sysml/runtime/io/FrameWriterBinaryBlockParallel.java +++ b/src/main/java/org/apache/sysml/runtime/io/FrameWriterBinaryBlockParallel.java @@ -28,6 +28,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.JobConf; import org.apache.sysml.conf.ConfigurationManager; @@ -45,7 +46,7 @@ import org.apache.sysml.runtime.util.MapReduceTool; */ public class FrameWriterBinaryBlockParallel extends FrameWriterBinaryBlock { - + @Override protected void writeBinaryBlockFrameToHDFS( Path path, JobConf job, FrameBlock src, long rlen, long clen ) throws IOException, DMLRuntimeException { @@ -75,7 +76,7 @@ public class FrameWriterBinaryBlockParallel extends FrameWriterBinaryBlock ArrayList<WriteFileTask> tasks = new ArrayList<WriteFileTask>(); int blklen = (int)Math.ceil((double)rlen / blen / numThreads) * blen; for(int i=0; i<numThreads & i*blklen<rlen; i++) { - Path newPath = new Path(path, String.format("0-m-%05d",i)); + Path newPath = new Path(path, IOUtilFunctions.getPartFileName(i)); tasks.add(new WriteFileTask(newPath, job, fs, src, i*blklen, Math.min((i+1)*blklen, (int)rlen), blen)); } @@ -86,10 +87,17 @@ public class FrameWriterBinaryBlockParallel extends FrameWriterBinaryBlock //check for exceptions for( Future<Object> task : rt ) task.get(); + + // delete crc files if written to local file system + if (fs instanceof LocalFileSystem) { + for(int i=0; i<numThreads & i*blklen<rlen; i++) + IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(fs, + new Path(path, IOUtilFunctions.getPartFileName(i))); + } } catch (Exception e) { throw new IOException("Failed parallel write of binary block input.", e); - } + } } private class WriteFileTask implements Callable<Object> http://git-wip-us.apache.org/repos/asf/systemml/blob/f0cb8cc8/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSV.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSV.java b/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSV.java index 83f3861..7d15d9b 100644 --- a/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSV.java +++ b/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSV.java @@ -77,7 +77,8 @@ public class FrameWriterTextCSV extends FrameWriter FileSystem fs = IOUtilFunctions.getFileSystem(path, job); //sequential write to single text file - writeCSVFrameToFile(path, job, fs, src, 0, (int)rlen, csvprops); + writeCSVFrameToFile(path, job, fs, src, 0, (int)rlen, csvprops); + IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(fs, path); } protected final void writeCSVFrameToFile( Path path, JobConf job, FileSystem fs, FrameBlock src, int rl, int ru, CSVFileFormatProperties props ) http://git-wip-us.apache.org/repos/asf/systemml/blob/f0cb8cc8/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSVParallel.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSVParallel.java b/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSVParallel.java index 10f0827..fe4fd39 100644 --- a/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSVParallel.java +++ b/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCSVParallel.java @@ -28,6 +28,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.JobConf; import org.apache.sysml.conf.DMLConfig; @@ -77,7 +78,7 @@ public class FrameWriterTextCSVParallel extends FrameWriterTextCSV ArrayList<WriteFileTask> tasks = new ArrayList<WriteFileTask>(); int blklen = (int)Math.ceil((double)rlen / numThreads); for(int i=0; i<numThreads & i*blklen<rlen; i++) { - Path newPath = new Path(path, String.format("0-m-%05d",i)); + Path newPath = new Path(path, IOUtilFunctions.getPartFileName(i)); tasks.add(new WriteFileTask(newPath, job, fs, src, i*blklen, (int)Math.min((i+1)*blklen, rlen), csvprops)); } @@ -88,6 +89,13 @@ public class FrameWriterTextCSVParallel extends FrameWriterTextCSV //check for exceptions for( Future<Object> task : rt ) task.get(); + + // delete crc files if written to local file system + if (fs instanceof LocalFileSystem) { + for(int i=0; i<numThreads & i*blklen<rlen; i++) + IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(fs, + new Path(path, IOUtilFunctions.getPartFileName(i))); + } } catch (Exception e) { throw new IOException("Failed parallel write of csv output.", e); http://git-wip-us.apache.org/repos/asf/systemml/blob/f0cb8cc8/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCell.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCell.java b/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCell.java index 7263e7a..134d961 100644 --- a/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCell.java +++ b/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCell.java @@ -66,8 +66,9 @@ public class FrameWriterTextCell extends FrameWriter FileSystem fs = IOUtilFunctions.getFileSystem(path, job); //sequential write to single text file - writeTextCellFrameToFile(path, job, fs, src, 0, (int)rlen); - } + writeTextCellFrameToFile(path, job, fs, src, 0, (int)rlen); + IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(fs, path); + } /** * Internal primitive to write a row range of a frame to a single text file, http://git-wip-us.apache.org/repos/asf/systemml/blob/f0cb8cc8/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCellParallel.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCellParallel.java b/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCellParallel.java index 8eed53c..f42ca41 100644 --- a/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCellParallel.java +++ b/src/main/java/org/apache/sysml/runtime/io/FrameWriterTextCellParallel.java @@ -28,6 +28,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.JobConf; import org.apache.sysml.conf.DMLConfig; @@ -43,7 +44,6 @@ import org.apache.sysml.runtime.util.MapReduceTool; */ public class FrameWriterTextCellParallel extends FrameWriterTextCell { - @Override protected void writeTextCellFrameToHDFS( Path path, JobConf job, FrameBlock src, long rlen, long clen ) throws IOException @@ -73,7 +73,7 @@ public class FrameWriterTextCellParallel extends FrameWriterTextCell ArrayList<WriteFileTask> tasks = new ArrayList<WriteFileTask>(); int blklen = (int)Math.ceil((double)rlen / numThreads); for(int i=0; i<numThreads & i*blklen<rlen; i++) { - Path newPath = new Path(path, String.format("0-m-%05d",i)); + Path newPath = new Path(path, IOUtilFunctions.getPartFileName(i)); tasks.add(new WriteFileTask(newPath, job, fs, src, i*blklen, (int)Math.min((i+1)*blklen, rlen))); } @@ -84,6 +84,13 @@ public class FrameWriterTextCellParallel extends FrameWriterTextCell //check for exceptions for( Future<Object> task : rt ) task.get(); + + // delete crc files if written to local file system + if (fs instanceof LocalFileSystem) { + for(int i=0; i<numThreads & i*blklen<rlen; i++) + IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(fs, + new Path(path, IOUtilFunctions.getPartFileName(i))); + } } catch (Exception e) { throw new IOException("Failed parallel write of text output.", e); http://git-wip-us.apache.org/repos/asf/systemml/blob/f0cb8cc8/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java b/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java index 040da01..ecbf7e4 100644 --- a/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java +++ b/src/main/java/org/apache/sysml/runtime/io/IOUtilFunctions.java @@ -56,7 +56,7 @@ public class IOUtilFunctions private static final Log LOG = LogFactory.getLog(UtilFunctions.class.getName()); private static final char CSV_QUOTE_CHAR = '"'; - + public static FileSystem getFileSystem(String fname) throws IOException { return getFileSystem(new Path(fname), ConfigurationManager.getCachedJobConf()); @@ -88,6 +88,10 @@ public class IOUtilFunctions return scheme.startsWith("s3") || scheme.startsWith("swift"); } + public static String getPartFileName(int pos) { + return String.format("0-m-%05d", pos); + } + public static void closeSilently( Closeable io ) { try { if( io != null ) http://git-wip-us.apache.org/repos/asf/systemml/blob/f0cb8cc8/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlockParallel.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlockParallel.java b/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlockParallel.java index 6f33011..b5aec06 100644 --- a/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlockParallel.java +++ b/src/main/java/org/apache/sysml/runtime/io/WriterBinaryBlockParallel.java @@ -73,7 +73,7 @@ public class WriterBinaryBlockParallel extends WriterBinaryBlock ArrayList<WriteFileTask> tasks = new ArrayList<WriteFileTask>(); int blklen = (int)Math.ceil((double)rlen / brlen / numThreads) * brlen; for(int i=0; i<numThreads & i*blklen<rlen; i++) { - Path newPath = new Path(path, String.format("0-m-%05d",i)); + Path newPath = new Path(path, IOUtilFunctions.getPartFileName(i)); tasks.add(new WriteFileTask(newPath, job, fs, src, i*blklen, Math.min((i+1)*blklen, rlen), brlen, bclen)); } @@ -84,19 +84,17 @@ public class WriterBinaryBlockParallel extends WriterBinaryBlock //check for exceptions for( Future<Object> task : rt ) task.get(); + + // delete crc files if written to local file system + if (fs instanceof LocalFileSystem) { + for(int i=0; i<numThreads & i*blklen<rlen; i++) + IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(fs, + new Path(path, IOUtilFunctions.getPartFileName(i))); + } } catch (Exception e) { throw new IOException("Failed parallel write of binary block input.", e); } - - // delete crc files if written to local file system - if (fs instanceof LocalFileSystem) { - int blklen = (int)Math.ceil((double)rlen / numThreads); - for(int i=0; i<numThreads & i*blklen<rlen; i++) { - Path newPath = new Path(path, String.format("0-m-%05d",i)); - IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(fs, newPath); - } - } } private class WriteFileTask implements Callable<Object> http://git-wip-us.apache.org/repos/asf/systemml/blob/f0cb8cc8/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarketParallel.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarketParallel.java b/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarketParallel.java index 1fed377..afc1bdb 100644 --- a/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarketParallel.java +++ b/src/main/java/org/apache/sysml/runtime/io/WriterMatrixMarketParallel.java @@ -71,7 +71,7 @@ public class WriterMatrixMarketParallel extends WriterMatrixMarket ArrayList<WriteMMTask> tasks = new ArrayList<WriteMMTask>(); int blklen = (int)Math.ceil((double)rlen / numThreads); for(int i=0; i<numThreads & i*blklen<rlen; i++) { - Path newPath = new Path(path, String.format("0-m-%05d",i)); + Path newPath = new Path(path, IOUtilFunctions.getPartFileName(i)); tasks.add(new WriteMMTask(newPath, job, fs, src, i*blklen, (int)Math.min((i+1)*blklen, rlen))); } @@ -82,19 +82,17 @@ public class WriterMatrixMarketParallel extends WriterMatrixMarket //check for exceptions for( Future<Object> task : rt ) task.get(); + + // delete crc files if written to local file system + if (fs instanceof LocalFileSystem) { + for(int i=0; i<numThreads & i*blklen<rlen; i++) + IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(fs, + new Path(path, IOUtilFunctions.getPartFileName(i))); + } } catch (Exception e) { throw new IOException("Failed parallel write of text output.", e); } - - // delete crc files if written to local file system - if (fs instanceof LocalFileSystem) { - int blklen = (int)Math.ceil((double)rlen / numThreads); - for(int i=0; i<numThreads & i*blklen<rlen; i++) { - Path newPath = new Path(path, String.format("0-m-%05d",i)); - IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(fs, newPath); - } - } } private class WriteMMTask implements Callable<Object> http://git-wip-us.apache.org/repos/asf/systemml/blob/f0cb8cc8/src/main/java/org/apache/sysml/runtime/io/WriterTextCSVParallel.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/WriterTextCSVParallel.java b/src/main/java/org/apache/sysml/runtime/io/WriterTextCSVParallel.java index 581225f..173f602 100644 --- a/src/main/java/org/apache/sysml/runtime/io/WriterTextCSVParallel.java +++ b/src/main/java/org/apache/sysml/runtime/io/WriterTextCSVParallel.java @@ -74,7 +74,7 @@ public class WriterTextCSVParallel extends WriterTextCSV int rlen = src.getNumRows(); int blklen = (int)Math.ceil((double)rlen / numThreads); for(int i=0; i<numThreads & i*blklen<rlen; i++) { - Path newPath = new Path(path, String.format("0-m-%05d",i)); + Path newPath = new Path(path, IOUtilFunctions.getPartFileName(i)); tasks.add(new WriteCSVTask(newPath, job, fs, src, i*blklen, (int)Math.min((i+1)*blklen, rlen), csvprops)); } @@ -85,20 +85,17 @@ public class WriterTextCSVParallel extends WriterTextCSV //check for exceptions for( Future<Object> task : rt ) task.get(); + + // delete crc files if written to local file system + if (fs instanceof LocalFileSystem) { + for(int i=0; i<numThreads & i*blklen<rlen; i++) + IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(fs, + new Path(path, IOUtilFunctions.getPartFileName(i))); + } } catch (Exception e) { throw new IOException("Failed parallel write of csv output.", e); } - - // delete crc files if written to local file system - if (fs instanceof LocalFileSystem) { - int rlen = src.getNumRows(); - int blklen = (int)Math.ceil((double)rlen / numThreads); - for(int i=0; i<numThreads & i*blklen<rlen; i++) { - Path newPath = new Path(path, String.format("0-m-%05d",i)); - IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(fs, newPath); - } - } } private class WriteCSVTask implements Callable<Object> http://git-wip-us.apache.org/repos/asf/systemml/blob/f0cb8cc8/src/main/java/org/apache/sysml/runtime/io/WriterTextCellParallel.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/runtime/io/WriterTextCellParallel.java b/src/main/java/org/apache/sysml/runtime/io/WriterTextCellParallel.java index 1c08459..ec4b042 100644 --- a/src/main/java/org/apache/sysml/runtime/io/WriterTextCellParallel.java +++ b/src/main/java/org/apache/sysml/runtime/io/WriterTextCellParallel.java @@ -70,7 +70,7 @@ public class WriterTextCellParallel extends WriterTextCell ArrayList<WriteTextTask> tasks = new ArrayList<WriteTextTask>(); int blklen = (int)Math.ceil((double)rlen / numThreads); for(int i=0; i<numThreads & i*blklen<rlen; i++) { - Path newPath = new Path(path, String.format("0-m-%05d",i)); + Path newPath = new Path(path, IOUtilFunctions.getPartFileName(i)); tasks.add(new WriteTextTask(newPath, job, fs, src, i*blklen, (int)Math.min((i+1)*blklen, rlen))); } @@ -81,19 +81,17 @@ public class WriterTextCellParallel extends WriterTextCell //check for exceptions for( Future<Object> task : rt ) task.get(); + + // delete crc files if written to local file system + if (fs instanceof LocalFileSystem) { + for(int i=0; i<numThreads & i*blklen<rlen; i++) + IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(fs, + new Path(path, IOUtilFunctions.getPartFileName(i))); + } } catch (Exception e) { throw new IOException("Failed parallel write of text output.", e); } - - // delete crc files if written to local file system - if (fs instanceof LocalFileSystem) { - int blklen = (int)Math.ceil((double)rlen / numThreads); - for(int i=0; i<numThreads & i*blklen<rlen; i++) { - Path newPath = new Path(path, String.format("0-m-%05d",i)); - IOUtilFunctions.deleteCrcFilesFromLocalFileSystem(fs, newPath); - } - } } private class WriteTextTask implements Callable<Object>