This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new f3e09266f0 [SYSTEMDS-3711] Fix slow csv reading performance (split 
serialization)
f3e09266f0 is described below

commit f3e09266f02b923f37c66f55752b7ea336dbe20e
Author: Matthias Boehm <mboe...@gmail.com>
AuthorDate: Sat Jun 22 20:26:18 2024 +0200

    [SYSTEMDS-3711] Fix slow csv reading performance (split serialization)
    
    This patch fixes a performance bug that made CSV reads unnecessarily
    slow. In detail, for error checking, a string-serialized version
    of the input split was created and passed for every line. By moving
    this code to the exception case only (and some minor specializations
    for vectors), the time for reading a 100Mx1 matrix from csv (~2GB)
    was reduced from 17.937s to 9.483s.
---
 .../apache/sysds/runtime/io/IOUtilFunctions.java   | 14 ++++++++++---
 .../sysds/runtime/io/ReaderTextCSVParallel.java    | 24 ++++++++++++----------
 2 files changed, 24 insertions(+), 14 deletions(-)

diff --git a/src/main/java/org/apache/sysds/runtime/io/IOUtilFunctions.java 
b/src/main/java/org/apache/sysds/runtime/io/IOUtilFunctions.java
index 9ce18d11b8..491fbcc050 100644
--- a/src/main/java/org/apache/sysds/runtime/io/IOUtilFunctions.java
+++ b/src/main/java/org/apache/sysds/runtime/io/IOUtilFunctions.java
@@ -176,14 +176,22 @@ public class IOUtilFunctions {
                }
        }
 
-       public static void checkAndRaiseErrorCSVNumColumns(String fname, String 
line, String[] parts, long ncol) 
+       public static void checkAndRaiseErrorCSVNumColumns(InputSplit split, 
String line, String[] parts, long ncol) 
+               throws IOException
+       {
+               int realncol = parts==null ? 1 : parts.length; // null parts: caller skipped the split (single-column case)
+               if( realncol != ncol ) { // serialize the split only on the error path; pass the EXPECTED ncol so the delegate actually throws
+                       checkAndRaiseErrorCSVNumColumns(split.toString(), line, 
parts, ncol);
+               }
+       }
+       
+       public static void checkAndRaiseErrorCSVNumColumns(String src, String 
line, String[] parts, long ncol) 
                throws IOException
        {
                int realncol = parts.length;
-               
                if( realncol != ncol ) {
                        throw new IOException("Invalid number of columns (" + 
realncol + ", expected=" + ncol + ") "
-                                       + "found in delimited file (" + fname + 
") for line: " + line + "\n" + Arrays.toString(parts));
+                                       + "found in delimited file (" + src + 
") for line: " + line + "\n" + Arrays.toString(parts));
                }
        }
        
diff --git 
a/src/main/java/org/apache/sysds/runtime/io/ReaderTextCSVParallel.java 
b/src/main/java/org/apache/sysds/runtime/io/ReaderTextCSVParallel.java
index b3e1680748..5b297a5d53 100644
--- a/src/main/java/org/apache/sysds/runtime/io/ReaderTextCSVParallel.java
+++ b/src/main/java/org/apache/sysds/runtime/io/ReaderTextCSVParallel.java
@@ -88,22 +88,22 @@ public class ReaderTextCSVParallel extends MatrixReader {
 
                InputSplit[] splits = informat.getSplits(_job, _numThreads);
                splits = IOUtilFunctions.sortInputSplits(splits);
-               
+
                // check existence and non-empty file
                checkValidInputFile(fs, path);
 
                // allocate output matrix block
                // First Read Pass (count rows/cols, determine offsets, 
allocate matrix block)
                MatrixBlock ret = 
computeCSVSizeAndCreateOutputMatrixBlock(splits, path, rlen, clen, blen, 
estnnz);
-
+               
                // Second Read Pass (read, parse strings, append to matrix 
block)
                readCSVMatrixFromHDFS(splits, path, ret);
-
+               
                // post-processing (representation-specific, change of 
sparse/dense block representation)
                // - no sorting required for CSV because it is read in sorted 
order per row
                // - nnz explicitly maintained in parallel for the individual 
splits
                ret.examSparsity();
-
+               
                // sanity check for parallel row count (since determined 
internally)
                if(rlen >= 0 && rlen != ret.getNumRows())
                        throw new DMLRuntimeException("Read matrix inconsistent 
with given meta data: " + "expected nrow=" + rlen
@@ -351,11 +351,13 @@ public class ReaderTextCSVParallel extends MatrixReader {
 
                        while(reader.next(key, value)) { // foreach line
                                final String cellStr = value.toString().trim();
-                               final String[] parts = 
IOUtilFunctions.split(cellStr, _props.getDelim());
                                double[] avals = a.values(_row);
                                int apos = a.pos(_row);
+                               
+                               final String[] parts = _cLen == 1 ? null :
+                                       IOUtilFunctions.split(cellStr, 
_props.getDelim());
                                for(int j = 0; j < _cLen; j++) { // foreach cell
-                                       String part = parts[j].trim();
+                                       String part = _cLen == 1 ? cellStr : 
parts[j].trim();
                                        if(part.isEmpty()) {
                                                noFillEmpty |= !_props.isFill();
                                                cellValue = 
_props.getFillValue();
@@ -370,7 +372,7 @@ public class ReaderTextCSVParallel extends MatrixReader {
                                }
                                // sanity checks (number of columns, fill 
values)
                                
IOUtilFunctions.checkAndRaiseErrorCSVEmptyField(cellStr, _props.isFill(), 
noFillEmpty);
-                               
IOUtilFunctions.checkAndRaiseErrorCSVNumColumns(_split.toString(), cellStr, 
parts, _cLen);
+                               
IOUtilFunctions.checkAndRaiseErrorCSVNumColumns(_split, cellStr, parts, _cLen);
                                _row++;
                        }
 
@@ -411,7 +413,7 @@ public class ReaderTextCSVParallel extends MatrixReader {
                                }
                                // sanity checks (number of columns, fill 
values)
                                
IOUtilFunctions.checkAndRaiseErrorCSVEmptyField(cellStr, _props.isFill(), 
noFillEmpty);
-                               
IOUtilFunctions.checkAndRaiseErrorCSVNumColumns(_split.toString(), cellStr, 
parts, _cLen);
+                               
IOUtilFunctions.checkAndRaiseErrorCSVNumColumns(_split, cellStr, parts, _cLen);
                                _row++;
                        }
                        return nnz;
@@ -456,7 +458,7 @@ public class ReaderTextCSVParallel extends MatrixReader {
 
                                // sanity checks (number of columns, fill 
values)
                                
IOUtilFunctions.checkAndRaiseErrorCSVEmptyField(cellStr, _props.isFill(), 
noFillEmpty);
-                               
IOUtilFunctions.checkAndRaiseErrorCSVNumColumns(_split.toString(), cellStr, 
parts, _cLen);
+                               
IOUtilFunctions.checkAndRaiseErrorCSVNumColumns(_split, cellStr, parts, _cLen);
 
                                _row++;
                        }
@@ -499,7 +501,7 @@ public class ReaderTextCSVParallel extends MatrixReader {
 
                                // sanity checks (number of columns, fill 
values)
                                
IOUtilFunctions.checkAndRaiseErrorCSVEmptyField(cellStr, _props.isFill(), 
noFillEmpty);
-                               
IOUtilFunctions.checkAndRaiseErrorCSVNumColumns(_split.toString(), cellStr, 
parts, _cLen);
+                               
IOUtilFunctions.checkAndRaiseErrorCSVNumColumns(_split, cellStr, parts, _cLen);
 
                                _row++;
                        }
@@ -534,7 +536,7 @@ public class ReaderTextCSVParallel extends MatrixReader {
                                        _col++;
                                }
 
-                               
IOUtilFunctions.checkAndRaiseErrorCSVNumColumns(_split.toString(), cellStr, 
parts, _cLen);
+                               
IOUtilFunctions.checkAndRaiseErrorCSVNumColumns(_split, cellStr, parts, _cLen);
 
                                _row++;
                        }

Reply via email to