This is an automated email from the ASF dual-hosted git repository.
mboehm7 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new f3e09266f0 [SYSTEMDS-3711] Fix slow csv reading performance (split
serialization)
f3e09266f0 is described below
commit f3e09266f02b923f37c66f55752b7ea336dbe20e
Author: Matthias Boehm <[email protected]>
AuthorDate: Sat Jun 22 20:26:18 2024 +0200
[SYSTEMDS-3711] Fix slow csv reading performance (split serialization)
This patch fixes a performance bug that made CSV reads unnecessarily
slow. In detail, for error checking, a string-serialized version of the
input split was created and passed for every row. By moving this
serialization to the exception case only (and adding some minor
specializations for vectors), the time for reading a 100Mx1 matrix from
csv (~2GB) was reduced from 17.937s to 9.483s.
---
.../apache/sysds/runtime/io/IOUtilFunctions.java | 14 ++++++++++---
.../sysds/runtime/io/ReaderTextCSVParallel.java | 24 ++++++++++++----------
2 files changed, 24 insertions(+), 14 deletions(-)
diff --git a/src/main/java/org/apache/sysds/runtime/io/IOUtilFunctions.java
b/src/main/java/org/apache/sysds/runtime/io/IOUtilFunctions.java
index 9ce18d11b8..491fbcc050 100644
--- a/src/main/java/org/apache/sysds/runtime/io/IOUtilFunctions.java
+++ b/src/main/java/org/apache/sysds/runtime/io/IOUtilFunctions.java
@@ -176,14 +176,22 @@ public class IOUtilFunctions {
}
}
- public static void checkAndRaiseErrorCSVNumColumns(String fname, String
line, String[] parts, long ncol)
+ public static void checkAndRaiseErrorCSVNumColumns(InputSplit split,
String line, String[] parts, long ncol)
+ throws IOException
+ {
+ int realncol = parts==null ? 1 : parts.length;
+ if( realncol != ncol ) {
+ checkAndRaiseErrorCSVNumColumns(split.toString(), line,
parts, realncol);
+ }
+ }
+
+ public static void checkAndRaiseErrorCSVNumColumns(String src, String
line, String[] parts, long ncol)
throws IOException
{
int realncol = parts.length;
-
if( realncol != ncol ) {
throw new IOException("Invalid number of columns (" +
realncol + ", expected=" + ncol + ") "
- + "found in delimited file (" + fname +
") for line: " + line + "\n" + Arrays.toString(parts));
+ + "found in delimited file (" + src +
") for line: " + line + "\n" + Arrays.toString(parts));
}
}
diff --git
a/src/main/java/org/apache/sysds/runtime/io/ReaderTextCSVParallel.java
b/src/main/java/org/apache/sysds/runtime/io/ReaderTextCSVParallel.java
index b3e1680748..5b297a5d53 100644
--- a/src/main/java/org/apache/sysds/runtime/io/ReaderTextCSVParallel.java
+++ b/src/main/java/org/apache/sysds/runtime/io/ReaderTextCSVParallel.java
@@ -88,22 +88,22 @@ public class ReaderTextCSVParallel extends MatrixReader {
InputSplit[] splits = informat.getSplits(_job, _numThreads);
splits = IOUtilFunctions.sortInputSplits(splits);
-
+
// check existence and non-empty file
checkValidInputFile(fs, path);
// allocate output matrix block
// First Read Pass (count rows/cols, determine offsets,
allocate matrix block)
MatrixBlock ret =
computeCSVSizeAndCreateOutputMatrixBlock(splits, path, rlen, clen, blen,
estnnz);
-
+
// Second Read Pass (read, parse strings, append to matrix
block)
readCSVMatrixFromHDFS(splits, path, ret);
-
+
// post-processing (representation-specific, change of
sparse/dense block representation)
// - no sorting required for CSV because it is read in sorted
order per row
// - nnz explicitly maintained in parallel for the individual
splits
ret.examSparsity();
-
+
// sanity check for parallel row count (since determined
internally)
if(rlen >= 0 && rlen != ret.getNumRows())
throw new DMLRuntimeException("Read matrix inconsistent
with given meta data: " + "expected nrow=" + rlen
@@ -351,11 +351,13 @@ public class ReaderTextCSVParallel extends MatrixReader {
while(reader.next(key, value)) { // foreach line
final String cellStr = value.toString().trim();
- final String[] parts =
IOUtilFunctions.split(cellStr, _props.getDelim());
double[] avals = a.values(_row);
int apos = a.pos(_row);
+
+ final String[] parts = _cLen == 1 ? null :
+ IOUtilFunctions.split(cellStr,
_props.getDelim());
for(int j = 0; j < _cLen; j++) { // foreach cell
- String part = parts[j].trim();
+ String part = _cLen == 1 ? cellStr :
parts[j].trim();
if(part.isEmpty()) {
noFillEmpty |= !_props.isFill();
cellValue =
_props.getFillValue();
@@ -370,7 +372,7 @@ public class ReaderTextCSVParallel extends MatrixReader {
}
// sanity checks (number of columns, fill
values)
IOUtilFunctions.checkAndRaiseErrorCSVEmptyField(cellStr, _props.isFill(),
noFillEmpty);
-
IOUtilFunctions.checkAndRaiseErrorCSVNumColumns(_split.toString(), cellStr,
parts, _cLen);
+
IOUtilFunctions.checkAndRaiseErrorCSVNumColumns(_split, cellStr, parts, _cLen);
_row++;
}
@@ -411,7 +413,7 @@ public class ReaderTextCSVParallel extends MatrixReader {
}
// sanity checks (number of columns, fill
values)
IOUtilFunctions.checkAndRaiseErrorCSVEmptyField(cellStr, _props.isFill(),
noFillEmpty);
-
IOUtilFunctions.checkAndRaiseErrorCSVNumColumns(_split.toString(), cellStr,
parts, _cLen);
+
IOUtilFunctions.checkAndRaiseErrorCSVNumColumns(_split, cellStr, parts, _cLen);
_row++;
}
return nnz;
@@ -456,7 +458,7 @@ public class ReaderTextCSVParallel extends MatrixReader {
// sanity checks (number of columns, fill
values)
IOUtilFunctions.checkAndRaiseErrorCSVEmptyField(cellStr, _props.isFill(),
noFillEmpty);
-
IOUtilFunctions.checkAndRaiseErrorCSVNumColumns(_split.toString(), cellStr,
parts, _cLen);
+
IOUtilFunctions.checkAndRaiseErrorCSVNumColumns(_split, cellStr, parts, _cLen);
_row++;
}
@@ -499,7 +501,7 @@ public class ReaderTextCSVParallel extends MatrixReader {
// sanity checks (number of columns, fill
values)
IOUtilFunctions.checkAndRaiseErrorCSVEmptyField(cellStr, _props.isFill(),
noFillEmpty);
-
IOUtilFunctions.checkAndRaiseErrorCSVNumColumns(_split.toString(), cellStr,
parts, _cLen);
+
IOUtilFunctions.checkAndRaiseErrorCSVNumColumns(_split, cellStr, parts, _cLen);
_row++;
}
@@ -534,7 +536,7 @@ public class ReaderTextCSVParallel extends MatrixReader {
_col++;
}
-
IOUtilFunctions.checkAndRaiseErrorCSVNumColumns(_split.toString(), cellStr,
parts, _cLen);
+
IOUtilFunctions.checkAndRaiseErrorCSVNumColumns(_split, cellStr, parts, _cLen);
_row++;
}