This is an automated email from the ASF dual-hosted git repository. mboehm7 pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push: new f3e09266f0 [SYSTEMDS-3711] Fix slow csv reading performance (split serialization) f3e09266f0 is described below commit f3e09266f02b923f37c66f55752b7ea336dbe20e Author: Matthias Boehm <mboe...@gmail.com> AuthorDate: Sat Jun 22 20:26:18 2024 +0200 [SYSTEMDS-3711] Fix slow csv reading performance (split serialization) This patch fixes unnecessarily slow CSV read performance. In detail, for error checking, a string-serialized version of the input split was created and passed for every input line. By moving this serialization to the exception case only (plus some minor specializations for vectors), the time to read a 100Mx1 matrix from csv (~2GB) dropped from 17.937s to 9.483s. --- .../apache/sysds/runtime/io/IOUtilFunctions.java | 14 ++++++++++--- .../sysds/runtime/io/ReaderTextCSVParallel.java | 24 ++++++++++++---------- 2 files changed, 24 insertions(+), 14 deletions(-) diff --git a/src/main/java/org/apache/sysds/runtime/io/IOUtilFunctions.java b/src/main/java/org/apache/sysds/runtime/io/IOUtilFunctions.java index 9ce18d11b8..491fbcc050 100644 --- a/src/main/java/org/apache/sysds/runtime/io/IOUtilFunctions.java +++ b/src/main/java/org/apache/sysds/runtime/io/IOUtilFunctions.java @@ -176,14 +176,22 @@ public class IOUtilFunctions { } } - public static void checkAndRaiseErrorCSVNumColumns(String fname, String line, String[] parts, long ncol) + public static void checkAndRaiseErrorCSVNumColumns(InputSplit split, String line, String[] parts, long ncol) + throws IOException + { + int realncol = parts==null ? 
1 : parts.length; + if( realncol != ncol ) { + checkAndRaiseErrorCSVNumColumns(split.toString(), line, parts, realncol); + } + } + + public static void checkAndRaiseErrorCSVNumColumns(String src, String line, String[] parts, long ncol) throws IOException { int realncol = parts.length; - if( realncol != ncol ) { throw new IOException("Invalid number of columns (" + realncol + ", expected=" + ncol + ") " - + "found in delimited file (" + fname + ") for line: " + line + "\n" + Arrays.toString(parts)); + + "found in delimited file (" + src + ") for line: " + line + "\n" + Arrays.toString(parts)); } } diff --git a/src/main/java/org/apache/sysds/runtime/io/ReaderTextCSVParallel.java b/src/main/java/org/apache/sysds/runtime/io/ReaderTextCSVParallel.java index b3e1680748..5b297a5d53 100644 --- a/src/main/java/org/apache/sysds/runtime/io/ReaderTextCSVParallel.java +++ b/src/main/java/org/apache/sysds/runtime/io/ReaderTextCSVParallel.java @@ -88,22 +88,22 @@ public class ReaderTextCSVParallel extends MatrixReader { InputSplit[] splits = informat.getSplits(_job, _numThreads); splits = IOUtilFunctions.sortInputSplits(splits); - + // check existence and non-empty file checkValidInputFile(fs, path); // allocate output matrix block // First Read Pass (count rows/cols, determine offsets, allocate matrix block) MatrixBlock ret = computeCSVSizeAndCreateOutputMatrixBlock(splits, path, rlen, clen, blen, estnnz); - + // Second Read Pass (read, parse strings, append to matrix block) readCSVMatrixFromHDFS(splits, path, ret); - + // post-processing (representation-specific, change of sparse/dense block representation) // - no sorting required for CSV because it is read in sorted order per row // - nnz explicitly maintained in parallel for the individual splits ret.examSparsity(); - + // sanity check for parallel row count (since determined internally) if(rlen >= 0 && rlen != ret.getNumRows()) throw new DMLRuntimeException("Read matrix inconsistent with given meta data: " + "expected 
nrow=" + rlen @@ -351,11 +351,13 @@ public class ReaderTextCSVParallel extends MatrixReader { while(reader.next(key, value)) { // foreach line final String cellStr = value.toString().trim(); - final String[] parts = IOUtilFunctions.split(cellStr, _props.getDelim()); double[] avals = a.values(_row); int apos = a.pos(_row); + + final String[] parts = _cLen == 1 ? null : + IOUtilFunctions.split(cellStr, _props.getDelim()); for(int j = 0; j < _cLen; j++) { // foreach cell - String part = parts[j].trim(); + String part = _cLen == 1 ? cellStr : parts[j].trim(); if(part.isEmpty()) { noFillEmpty |= !_props.isFill(); cellValue = _props.getFillValue(); @@ -370,7 +372,7 @@ public class ReaderTextCSVParallel extends MatrixReader { } // sanity checks (number of columns, fill values) IOUtilFunctions.checkAndRaiseErrorCSVEmptyField(cellStr, _props.isFill(), noFillEmpty); - IOUtilFunctions.checkAndRaiseErrorCSVNumColumns(_split.toString(), cellStr, parts, _cLen); + IOUtilFunctions.checkAndRaiseErrorCSVNumColumns(_split, cellStr, parts, _cLen); _row++; } @@ -411,7 +413,7 @@ public class ReaderTextCSVParallel extends MatrixReader { } // sanity checks (number of columns, fill values) IOUtilFunctions.checkAndRaiseErrorCSVEmptyField(cellStr, _props.isFill(), noFillEmpty); - IOUtilFunctions.checkAndRaiseErrorCSVNumColumns(_split.toString(), cellStr, parts, _cLen); + IOUtilFunctions.checkAndRaiseErrorCSVNumColumns(_split, cellStr, parts, _cLen); _row++; } return nnz; @@ -456,7 +458,7 @@ public class ReaderTextCSVParallel extends MatrixReader { // sanity checks (number of columns, fill values) IOUtilFunctions.checkAndRaiseErrorCSVEmptyField(cellStr, _props.isFill(), noFillEmpty); - IOUtilFunctions.checkAndRaiseErrorCSVNumColumns(_split.toString(), cellStr, parts, _cLen); + IOUtilFunctions.checkAndRaiseErrorCSVNumColumns(_split, cellStr, parts, _cLen); _row++; } @@ -499,7 +501,7 @@ public class ReaderTextCSVParallel extends MatrixReader { // sanity checks (number of columns, fill values) 
IOUtilFunctions.checkAndRaiseErrorCSVEmptyField(cellStr, _props.isFill(), noFillEmpty); - IOUtilFunctions.checkAndRaiseErrorCSVNumColumns(_split.toString(), cellStr, parts, _cLen); + IOUtilFunctions.checkAndRaiseErrorCSVNumColumns(_split, cellStr, parts, _cLen); _row++; } @@ -534,7 +536,7 @@ public class ReaderTextCSVParallel extends MatrixReader { _col++; } - IOUtilFunctions.checkAndRaiseErrorCSVNumColumns(_split.toString(), cellStr, parts, _cLen); + IOUtilFunctions.checkAndRaiseErrorCSVNumColumns(_split, cellStr, parts, _cLen); _row++; }