This is an automated email from the ASF dual-hosted git repository. mboehm7 pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push: new 37c241d83a [SYSTEMDS-3724] Fix csv frame reader performance issues 37c241d83a is described below commit 37c241d83ac0000af0e8e1c1951fa4e35deecdc6 Author: Matthias Boehm <mboe...@gmail.com> AuthorDate: Wed Aug 21 18:43:32 2024 +0200 [SYSTEMDS-3724] Fix csv frame reader performance issues This patch addresses an issue of very slow csv reader performance. This reader uses two passes to (1) compute the number of rows and split offsets, and (2) actually read the data. On a scenario of a 10M x 10 frame with double values, step one took >40s, while step two (the main part) ran in ~1s. The underlying reason is JIT compilation issues, where hadoop internals are apparently deoptimized to a really poor version. Example split task execution times in milliseconds are count-rows task: 246.127235 count-rows task: 322.513615 count-rows task: 98.376842 ... count-rows task: 96.397212 count-rows task: 99.228203 count-rows task: 103.718725 count-rows task: 3338.762098 count-rows task: 23294.995156 count-rows task: 25480.077678 count-rows task: 26676.858852 We now simplified the code, changed the signature of the count tasks, and reduced unnecessary indirections. Together these changes improved the total read time from 42.3s to 2.4s. However, there is still some variance (where every 5th repetition falls into the 40s JIT compilation issue, but it seems we can't do anything about it). 
--- .../sysds/runtime/io/FrameReaderTextCSV.java | 55 ++++++++------------ .../runtime/io/FrameReaderTextCSVParallel.java | 58 +++++++++------------- .../apache/sysds/runtime/io/IOUtilFunctions.java | 1 + .../org/apache/sysds/runtime/util/HDFSTool.java | 7 ++- 4 files changed, 51 insertions(+), 70 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/io/FrameReaderTextCSV.java b/src/main/java/org/apache/sysds/runtime/io/FrameReaderTextCSV.java index cfe4a5e45b..6a94bcfd50 100644 --- a/src/main/java/org/apache/sysds/runtime/io/FrameReaderTextCSV.java +++ b/src/main/java/org/apache/sysds/runtime/io/FrameReaderTextCSV.java @@ -40,6 +40,7 @@ import org.apache.sysds.runtime.DMLRuntimeException; import org.apache.sysds.runtime.frame.data.FrameBlock; import org.apache.sysds.runtime.matrix.data.Pair; import org.apache.sysds.runtime.transform.TfUtils; +import org.apache.sysds.runtime.util.HDFSTool; import org.apache.sysds.runtime.util.InputStreamInputFormat; /** @@ -57,7 +58,6 @@ public class FrameReaderTextCSV extends FrameReader { @Override public final FrameBlock readFrameFromHDFS(String fname, ValueType[] schema, String[] names, long rlen, long clen) throws IOException, DMLRuntimeException { - LOG.debug("readFrameFromHDFS csv"); // prepare file access JobConf job = new JobConf(ConfigurationManager.getCachedJobConf()); Path path = new Path(fname); @@ -87,7 +87,8 @@ public class FrameReaderTextCSV extends FrameReader { @Override public FrameBlock readFrameFromInputStream(InputStream is, ValueType[] schema, String[] names, long rlen, long clen) - throws IOException, DMLRuntimeException { + throws IOException, DMLRuntimeException + { // allocate output frame block ValueType[] lschema = createOutputSchema(schema, clen); String[] lnames = createOutputNames(names, clen); @@ -102,12 +103,13 @@ public class FrameReaderTextCSV extends FrameReader { } protected void readCSVFrameFromHDFS(Path path, JobConf job, FileSystem fs, FrameBlock dest, ValueType[] schema, - 
String[] names, long rlen, long clen) throws IOException { - LOG.debug("readCSVFrameFromHDFS csv"); + String[] names, long rlen, long clen) throws IOException + { TextInputFormat informat = new TextInputFormat(); informat.configure(job); InputSplit[] splits = informat.getSplits(job, 1); - splits = IOUtilFunctions.sortInputSplits(splits); + if(HDFSTool.isDirectory(fs, path)) + splits = IOUtilFunctions.sortInputSplits(splits); for(int i = 0, rpos = 0; i < splits.length; i++) rpos = readCSVFrameFromInputSplit(splits[i], informat, job, dest, schema, names, rlen, clen, rpos, i == 0); } @@ -223,7 +225,6 @@ public class FrameReaderTextCSV extends FrameReader { return emptyValuesFound; } - protected Pair<Integer, Integer> computeCSVSize(Path path, JobConf job, FileSystem fs) throws IOException { TextInputFormat informat = new TextInputFormat(); informat.configure(job); @@ -237,50 +238,36 @@ public class FrameReaderTextCSV extends FrameReader { int nrow = 0; for(int i = 0; i < splits.length; i++) { boolean header = i == 0 && _props.hasHeader(); - nrow += countLinesInReader(splits[i], informat, job, ncol, header); + nrow += countLinesInSplit(splits[i], informat, job, header); } return new Pair<>(nrow, ncol); } - - protected static int countLinesInReader(InputSplit split, TextInputFormat inFormat, JobConf job, long ncol, - boolean header) throws IOException { + protected static long countLinesInSplit(InputSplit split, TextInputFormat inFormat, JobConf job, boolean header) + throws IOException + { RecordReader<LongWritable, Text> reader = inFormat.getRecordReader(split, job, Reporter.NULL); - try { - return countLinesInReader(reader, ncol, header); - } - finally { - IOUtilFunctions.closeSilently(reader); - } - } - - protected static int countLinesInReader(RecordReader<LongWritable, Text> reader, long ncol, boolean header) - throws IOException { - final LongWritable key = new LongWritable(); - final Text value = new Text(); - int nrow = 0; try { + LongWritable key = new 
LongWritable(); + Text value = new Text(); // ignore header of first split if(header) reader.next(key, value); while(reader.next(key, value)) { - // note the metadata can be located at any row when spark. - nrow += containsMetaTag(value) ? 0 : 1; + // note the metadata can be located at any row when spark + // (but only at beginning of individual part files) + String sval = IOUtilFunctions.trim(value.toString()); + boolean containsMTD = nrow<3 && + (sval.startsWith(TfUtils.TXMTD_MVPREFIX) + || sval.startsWith(TfUtils.TXMTD_NDPREFIX)); + nrow += containsMTD ? 0 : 1; } - return nrow; } finally { IOUtilFunctions.closeSilently(reader); } - } - - private final static boolean containsMetaTag(Text val) { - if(val.charAt(0) == '#') - return val.find(TfUtils.TXMTD_MVPREFIX) > -1// - || val.find(TfUtils.TXMTD_NDPREFIX) > -1; - else - return false; + return nrow; } } diff --git a/src/main/java/org/apache/sysds/runtime/io/FrameReaderTextCSVParallel.java b/src/main/java/org/apache/sysds/runtime/io/FrameReaderTextCSVParallel.java index 13464ca075..05a259bf6a 100644 --- a/src/main/java/org/apache/sysds/runtime/io/FrameReaderTextCSVParallel.java +++ b/src/main/java/org/apache/sysds/runtime/io/FrameReaderTextCSVParallel.java @@ -37,6 +37,7 @@ import org.apache.sysds.runtime.DMLRuntimeException; import org.apache.sysds.runtime.frame.data.FrameBlock; import org.apache.sysds.runtime.matrix.data.Pair; import org.apache.sysds.runtime.util.CommonThreadPool; +import org.apache.sysds.runtime.util.HDFSTool; /** * Multi-threaded frame text csv reader. 
@@ -57,8 +58,9 @@ public class FrameReaderTextCSVParallel extends FrameReaderTextCSV TextInputFormat informat = new TextInputFormat(); informat.configure(job); - InputSplit[] splits = informat.getSplits(job, numThreads); - splits = IOUtilFunctions.sortInputSplits(splits); + InputSplit[] splits = informat.getSplits(job, numThreads); + if(HDFSTool.isDirectory(fs, path)) + splits = IOUtilFunctions.sortInputSplits(splits); ExecutorService pool = CommonThreadPool.get(numThreads); try { @@ -67,13 +69,13 @@ public class FrameReaderTextCSVParallel extends FrameReaderTextCSV //compute num rows per split ArrayList<CountRowsTask> tasks = new ArrayList<>(); for( int i=0; i<splits.length; i++ ) - tasks.add(new CountRowsTask(splits[i], informat, job, _props.hasHeader() && i==0, clen)); - List<Future<Integer>> cret = pool.invokeAll(tasks); + tasks.add(new CountRowsTask(splits[i], informat, job, _props.hasHeader() && i==0)); + List<Future<Long>> cret = pool.invokeAll(tasks); //compute row offset per split via cumsum on row counts long offset = 0; List<Long> offsets = new ArrayList<>(); - for( Future<Integer> count : cret ) { + for( Future<Long> count : cret ) { offsets.add(offset); offset += count.get(); } @@ -111,10 +113,10 @@ public class FrameReaderTextCSVParallel extends FrameReaderTextCSV try { ArrayList<CountRowsTask> tasks = new ArrayList<>(); for( int i=0; i<splits.length; i++ ) - tasks.add(new CountRowsTask(splits[i], informat, job, _props.hasHeader()&& i==0, ncol)); - List<Future<Integer>> cret = pool.invokeAll(tasks); - for( Future<Integer> count : cret ) - nrow += count.get().intValue(); + tasks.add(new CountRowsTask(splits[i], informat, job, _props.hasHeader()&& i==0)); + List<Future<Long>> cret = pool.invokeAll(tasks); + for( Future<Long> count : cret ) + nrow += count.get().longValue(); if(nrow > Integer.MAX_VALUE) throw new DMLRuntimeException("invalid read with over Integer number of rows"); @@ -129,25 +131,22 @@ public class FrameReaderTextCSVParallel extends 
FrameReaderTextCSV } } - private static class CountRowsTask implements Callable<Integer> { - private final InputSplit _split; - private final TextInputFormat _informat; - private final JobConf _job; - private final boolean _hasHeader; - private final long _nCol; + private static class CountRowsTask implements Callable<Long> { + private InputSplit _split; + private TextInputFormat _informat; + private JobConf _job; + private boolean _hasHeader; - public CountRowsTask(InputSplit split, TextInputFormat informat, JobConf job, boolean hasHeader, long nCol) { + public CountRowsTask(InputSplit split, TextInputFormat informat, JobConf job, boolean hasHeader) { _split = split; _informat = informat; _job = job; _hasHeader = hasHeader; - _nCol = nCol; } @Override - public Integer call() throws Exception { - return countLinesInReader(_split, _informat, _job, _nCol, _hasHeader); - + public Long call() throws Exception { + return countLinesInSplit(_split, _informat, _job, _hasHeader); } } @@ -162,7 +161,7 @@ public class FrameReaderTextCSVParallel extends FrameReaderTextCSV public ReadRowsTask(InputSplit split, TextInputFormat informat, JobConf job, - FrameBlock dest, int offset, boolean first) + FrameBlock dest, int offset, boolean first) { _split = split; _informat = informat; @@ -173,19 +172,10 @@ public class FrameReaderTextCSVParallel extends FrameReaderTextCSV } @Override - public Object call() - throws Exception - { - try{ - - readCSVFrameFromInputSplit(_split, _informat, _job, _dest, _dest.getSchema(), - _dest.getColumnNames(), _dest.getNumRows(), _dest.getNumColumns(), _offset, _isFirstSplit); - return null; - } - catch(Exception e){ - e.printStackTrace(); - throw e; - } + public Object call() throws Exception { + readCSVFrameFromInputSplit(_split, _informat, _job, _dest, _dest.getSchema(), + _dest.getColumnNames(), _dest.getNumRows(), _dest.getNumColumns(), _offset, _isFirstSplit); + return null; } } } diff --git 
a/src/main/java/org/apache/sysds/runtime/io/IOUtilFunctions.java b/src/main/java/org/apache/sysds/runtime/io/IOUtilFunctions.java index 491fbcc050..9aa948acf4 100644 --- a/src/main/java/org/apache/sysds/runtime/io/IOUtilFunctions.java +++ b/src/main/java/org/apache/sysds/runtime/io/IOUtilFunctions.java @@ -593,6 +593,7 @@ public class IOUtilFunctions { return size; } + @SuppressWarnings("deprecation") public static InputStream toInputStream(String input) { if( input == null ) return null; diff --git a/src/main/java/org/apache/sysds/runtime/util/HDFSTool.java b/src/main/java/org/apache/sysds/runtime/util/HDFSTool.java index 5e9de1e39d..51342fca02 100644 --- a/src/main/java/org/apache/sysds/runtime/util/HDFSTool.java +++ b/src/main/java/org/apache/sysds/runtime/util/HDFSTool.java @@ -119,14 +119,17 @@ public class HDFSTool try { Path path = new Path(fname); - return IOUtilFunctions - .getFileSystem(path).getFileStatus(path).isDirectory(); + return isDirectory(IOUtilFunctions.getFileSystem(path), path); } catch(Exception ex) { throw new DMLRuntimeException("Failed to check if file is directory", ex); } } + public static boolean isDirectory(FileSystem fs, Path path) throws IOException { + return fs.getFileStatus(path).isDirectory(); + } + public static FileStatus[] getDirectoryListing(String fname) { try { Path path = new Path(fname);