This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new 37c241d83a [SYSTEMDS-3724] Fix csv frame reader performance issues
37c241d83a is described below

commit 37c241d83ac0000af0e8e1c1951fa4e35deecdc6
Author: Matthias Boehm <mboe...@gmail.com>
AuthorDate: Wed Aug 21 18:43:32 2024 +0200

    [SYSTEMDS-3724] Fix csv frame reader performance issues
    
    This patch addresses an issue of very slow csv frame reader
    performance. The reader uses two passes to (1) compute the number of
    rows and split offsets, and (2) actually read the data. In a scenario
    with a 10M x 10 frame of double values, step one took >40s, while
    step two (the main part) ran in ~1s. The underlying reason is a JIT
    compilation issue, where hadoop internals are apparently deoptimized
    to a really poor version. Example per-split task execution times in
    milliseconds are:
    
    count-rows task: 246.127235
    count-rows task: 322.513615
    count-rows task: 98.376842
    ...
    count-rows task: 96.397212
    count-rows task: 99.228203
    count-rows task: 103.718725
    count-rows task: 3338.762098
    count-rows task: 23294.995156
    count-rows task: 25480.077678
    count-rows task: 26676.858852
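    
    For reference, the first pass essentially iterates over all input
    splits and counts rows via Hadoop record readers. Below is a minimal,
    self-contained sketch of this pattern (a hypothetical standalone
    harness, not SystemDS code) that can be used to reproduce such
    per-split timings:
    
        import org.apache.hadoop.fs.Path;
        import org.apache.hadoop.io.LongWritable;
        import org.apache.hadoop.io.Text;
        import org.apache.hadoop.mapred.FileInputFormat;
        import org.apache.hadoop.mapred.InputSplit;
        import org.apache.hadoop.mapred.JobConf;
        import org.apache.hadoop.mapred.RecordReader;
        import org.apache.hadoop.mapred.Reporter;
        import org.apache.hadoop.mapred.TextInputFormat;
    
        public class CountRowsSketch {
            public static void main(String[] args) throws Exception {
                JobConf job = new JobConf();
                FileInputFormat.addInputPath(job, new Path(args[0]));
                TextInputFormat informat = new TextInputFormat();
                informat.configure(job);
                long nrow = 0;
                for(InputSplit split : informat.getSplits(job, 1)) {
                    long t0 = System.nanoTime();
                    // count the rows of a single split, analogous to
                    // the count-rows tasks timed above
                    RecordReader<LongWritable, Text> reader =
                        informat.getRecordReader(split, job, Reporter.NULL);
                    try {
                        LongWritable key = new LongWritable();
                        Text value = new Text();
                        while(reader.next(key, value))
                            nrow++;
                    }
                    finally {
                        reader.close();
                    }
                    System.out.println("count-rows task: "
                        + (System.nanoTime() - t0) / 1e6);
                }
                System.out.println("total rows: " + nrow);
            }
        }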
    
    We now simplified the code, changed the signature of the count tasks,
    and reduced unnecessary indirections. Together, these changes
    improved the total read time from 42.3s to 2.4s. However, there is
    still some variance (every 5th repetition falls into the 40s JIT
    compilation issue), and it seems we can't do anything about it.
---
 .../sysds/runtime/io/FrameReaderTextCSV.java       | 55 ++++++++------------
 .../runtime/io/FrameReaderTextCSVParallel.java     | 58 +++++++++-------------
 .../apache/sysds/runtime/io/IOUtilFunctions.java   |  1 +
 .../org/apache/sysds/runtime/util/HDFSTool.java    |  7 ++-
 4 files changed, 51 insertions(+), 70 deletions(-)

diff --git a/src/main/java/org/apache/sysds/runtime/io/FrameReaderTextCSV.java b/src/main/java/org/apache/sysds/runtime/io/FrameReaderTextCSV.java
index cfe4a5e45b..6a94bcfd50 100644
--- a/src/main/java/org/apache/sysds/runtime/io/FrameReaderTextCSV.java
+++ b/src/main/java/org/apache/sysds/runtime/io/FrameReaderTextCSV.java
@@ -40,6 +40,7 @@ import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.frame.data.FrameBlock;
 import org.apache.sysds.runtime.matrix.data.Pair;
 import org.apache.sysds.runtime.transform.TfUtils;
+import org.apache.sysds.runtime.util.HDFSTool;
 import org.apache.sysds.runtime.util.InputStreamInputFormat;
 
 /**
@@ -57,7 +58,6 @@ public class FrameReaderTextCSV extends FrameReader {
        @Override
        public final FrameBlock readFrameFromHDFS(String fname, ValueType[] schema, String[] names, long rlen, long clen)
                throws IOException, DMLRuntimeException {
-               LOG.debug("readFrameFromHDFS csv");
                // prepare file access
                JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
                Path path = new Path(fname);
@@ -87,7 +87,8 @@ public class FrameReaderTextCSV extends FrameReader {
 
        @Override
        public FrameBlock readFrameFromInputStream(InputStream is, ValueType[] schema, String[] names, long rlen, long clen)
-               throws IOException, DMLRuntimeException {
+               throws IOException, DMLRuntimeException
+       {
                // allocate output frame block
                ValueType[] lschema = createOutputSchema(schema, clen);
                String[] lnames = createOutputNames(names, clen);
@@ -102,12 +103,13 @@ public class FrameReaderTextCSV extends FrameReader {
        }
 
        protected void readCSVFrameFromHDFS(Path path, JobConf job, FileSystem fs, FrameBlock dest, ValueType[] schema,
-               String[] names, long rlen, long clen) throws IOException {
-               LOG.debug("readCSVFrameFromHDFS csv");
+               String[] names, long rlen, long clen) throws IOException
+       {
                TextInputFormat informat = new TextInputFormat();
                informat.configure(job);
                InputSplit[] splits = informat.getSplits(job, 1);
-               splits = IOUtilFunctions.sortInputSplits(splits);
+               if(HDFSTool.isDirectory(fs, path))
+                       splits = IOUtilFunctions.sortInputSplits(splits);
                for(int i = 0, rpos = 0; i < splits.length; i++)
                        rpos = readCSVFrameFromInputSplit(splits[i], informat, job, dest, schema, names, rlen, clen, rpos, i == 0);
        }
@@ -223,7 +225,6 @@ public class FrameReaderTextCSV extends FrameReader {
                return emptyValuesFound;
        }
 
-
        protected Pair<Integer, Integer> computeCSVSize(Path path, JobConf job, FileSystem fs) throws IOException {
                TextInputFormat informat = new TextInputFormat();
                informat.configure(job);
@@ -237,50 +238,36 @@ public class FrameReaderTextCSV extends FrameReader {
                int nrow = 0;
                for(int i = 0; i < splits.length; i++) {
                        boolean header = i == 0 && _props.hasHeader();
-                       nrow += countLinesInReader(splits[i], informat, job, ncol, header);
+                       nrow += countLinesInSplit(splits[i], informat, job, header);
                }
 
                return new Pair<>(nrow, ncol);
        }
 
-
-       protected static int countLinesInReader(InputSplit split, TextInputFormat inFormat, JobConf job, long ncol,
-               boolean header) throws IOException {
+       protected static long countLinesInSplit(InputSplit split, TextInputFormat inFormat, JobConf job, boolean header)
+               throws IOException
+       {
                RecordReader<LongWritable, Text> reader = inFormat.getRecordReader(split, job, Reporter.NULL);
-               try {
-                       return countLinesInReader(reader, ncol, header);
-               }
-               finally {
-                       IOUtilFunctions.closeSilently(reader);
-               }
-       }
-
-       protected static int countLinesInReader(RecordReader<LongWritable, Text> reader, long ncol, boolean header)
-               throws IOException {
-               final LongWritable key = new LongWritable();
-               final Text value = new Text();
-
                int nrow = 0;
                try {
+                       LongWritable key = new LongWritable();
+                       Text value = new Text();
                        // ignore header of first split
                        if(header)
                                reader.next(key, value);
                        while(reader.next(key, value)) {
-                               // note the metadata can be located at any row when spark.
-                               nrow += containsMetaTag(value) ? 0 : 1;
+                               // note the metadata can be located at any row when spark
+                               // (but only at beginning of individual part files)
+                               String sval = IOUtilFunctions.trim(value.toString());
+                               boolean containsMTD = nrow<3 &&
+                                       (sval.startsWith(TfUtils.TXMTD_MVPREFIX)
+                                       || sval.startsWith(TfUtils.TXMTD_NDPREFIX));
+                               nrow += containsMTD ? 0 : 1;
                        }
-                       return nrow;
                }
                finally {
                        IOUtilFunctions.closeSilently(reader);
                }
-       }
-
-       private final static boolean containsMetaTag(Text val) {
-               if(val.charAt(0) == '#')
-                       return val.find(TfUtils.TXMTD_MVPREFIX) > -1//
-                               || val.find(TfUtils.TXMTD_NDPREFIX) > -1;
-               else 
-                       return false;
+               return nrow;
        }
 }
diff --git a/src/main/java/org/apache/sysds/runtime/io/FrameReaderTextCSVParallel.java b/src/main/java/org/apache/sysds/runtime/io/FrameReaderTextCSVParallel.java
index 13464ca075..05a259bf6a 100644
--- a/src/main/java/org/apache/sysds/runtime/io/FrameReaderTextCSVParallel.java
+++ b/src/main/java/org/apache/sysds/runtime/io/FrameReaderTextCSVParallel.java
@@ -37,6 +37,7 @@ import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.frame.data.FrameBlock;
 import org.apache.sysds.runtime.matrix.data.Pair;
 import org.apache.sysds.runtime.util.CommonThreadPool;
+import org.apache.sysds.runtime.util.HDFSTool;
 
 /**
  * Multi-threaded frame text csv reader.
@@ -57,8 +58,9 @@ public class FrameReaderTextCSVParallel extends FrameReaderTextCSV
                
                TextInputFormat informat = new TextInputFormat();
                informat.configure(job);
-               InputSplit[] splits = informat.getSplits(job, numThreads); 
-               splits = IOUtilFunctions.sortInputSplits(splits);
+               InputSplit[] splits = informat.getSplits(job, numThreads);
+               if(HDFSTool.isDirectory(fs, path))
+                       splits = IOUtilFunctions.sortInputSplits(splits);
 
                ExecutorService pool = CommonThreadPool.get(numThreads);
                try {
@@ -67,13 +69,13 @@ public class FrameReaderTextCSVParallel extends FrameReaderTextCSV
                        //compute num rows per split
                        ArrayList<CountRowsTask> tasks = new ArrayList<>();
                        for( int i=0; i<splits.length; i++ )
-                               tasks.add(new CountRowsTask(splits[i], informat, job, _props.hasHeader() && i==0, clen));
-                       List<Future<Integer>> cret = pool.invokeAll(tasks);
+                               tasks.add(new CountRowsTask(splits[i], informat, job, _props.hasHeader() && i==0));
+                       List<Future<Long>> cret = pool.invokeAll(tasks);
 
                        //compute row offset per split via cumsum on row counts
                        long offset = 0;
                        List<Long> offsets = new ArrayList<>();
-                       for( Future<Integer> count : cret ) {
+                       for( Future<Long> count : cret ) {
                                offsets.add(offset);
                                offset += count.get();
                        }
@@ -111,10 +113,10 @@ public class FrameReaderTextCSVParallel extends FrameReaderTextCSV
                try {
                        ArrayList<CountRowsTask> tasks = new ArrayList<>();
                        for( int i=0; i<splits.length; i++ )
-                               tasks.add(new CountRowsTask(splits[i], informat, job, _props.hasHeader()&& i==0, ncol));
-                       List<Future<Integer>> cret = pool.invokeAll(tasks);
-                       for( Future<Integer> count : cret ) 
-                               nrow += count.get().intValue();
+                               tasks.add(new CountRowsTask(splits[i], informat, job, _props.hasHeader()&& i==0));
+                       List<Future<Long>> cret = pool.invokeAll(tasks);
+                       for( Future<Long> count : cret ) 
+                               nrow += count.get().longValue();
                        
                        if(nrow > Integer.MAX_VALUE)
                                throw new DMLRuntimeException("invalid read 
with over Integer number of rows");
@@ -129,25 +131,22 @@ public class FrameReaderTextCSVParallel extends FrameReaderTextCSV
                }
        }
 
-       private static class CountRowsTask implements Callable<Integer> {
-               private final InputSplit _split;
-               private final TextInputFormat _informat;
-               private final JobConf _job;
-               private final boolean _hasHeader;
-               private final long _nCol;
+       private static class CountRowsTask implements Callable<Long> {
+               private InputSplit _split;
+               private TextInputFormat _informat;
+               private JobConf _job;
+               private boolean _hasHeader;
 
-               public CountRowsTask(InputSplit split, TextInputFormat informat, JobConf job, boolean hasHeader, long nCol) {
+               public CountRowsTask(InputSplit split, TextInputFormat informat, JobConf job, boolean hasHeader) {
                        _split = split;
                        _informat = informat;
                        _job = job;
                        _hasHeader = hasHeader;
-                       _nCol = nCol;
                }
 
                @Override
-               public Integer call() throws Exception {
-                       return countLinesInReader(_split, _informat, _job, _nCol, _hasHeader);
-
+               public Long call() throws Exception {
+                       return countLinesInSplit(_split, _informat, _job, _hasHeader);
                }
        }
 
@@ -162,7 +161,7 @@ public class FrameReaderTextCSVParallel extends FrameReaderTextCSV
                
                
                public ReadRowsTask(InputSplit split, TextInputFormat informat, JobConf job,
-                               FrameBlock dest, int offset, boolean first) 
+                       FrameBlock dest, int offset, boolean first) 
                {
                        _split = split;
                        _informat = informat;
@@ -173,19 +172,10 @@ public class FrameReaderTextCSVParallel extends FrameReaderTextCSV
                }
 
                @Override
-               public Object call() 
-                       throws Exception 
-               {
-                       try{
-
-                               readCSVFrameFromInputSplit(_split, _informat, _job, _dest, _dest.getSchema(),
-                                               _dest.getColumnNames(), _dest.getNumRows(), _dest.getNumColumns(), _offset, _isFirstSplit);
-                               return null;
-                       }
-                       catch(Exception e){
-                               e.printStackTrace();
-                               throw e;
-                       }
+               public Object call() throws Exception {
+                       readCSVFrameFromInputSplit(_split, _informat, _job, _dest, _dest.getSchema(),
+                               _dest.getColumnNames(), _dest.getNumRows(), _dest.getNumColumns(), _offset, _isFirstSplit);
+                       return null;
                }
        }
 }
diff --git a/src/main/java/org/apache/sysds/runtime/io/IOUtilFunctions.java b/src/main/java/org/apache/sysds/runtime/io/IOUtilFunctions.java
index 491fbcc050..9aa948acf4 100644
--- a/src/main/java/org/apache/sysds/runtime/io/IOUtilFunctions.java
+++ b/src/main/java/org/apache/sysds/runtime/io/IOUtilFunctions.java
@@ -593,6 +593,7 @@ public class IOUtilFunctions {
                return size;
        }
 
+       @SuppressWarnings("deprecation")
        public static InputStream toInputStream(String input) {
                if( input == null ) 
                        return null;
diff --git a/src/main/java/org/apache/sysds/runtime/util/HDFSTool.java b/src/main/java/org/apache/sysds/runtime/util/HDFSTool.java
index 5e9de1e39d..51342fca02 100644
--- a/src/main/java/org/apache/sysds/runtime/util/HDFSTool.java
+++ b/src/main/java/org/apache/sysds/runtime/util/HDFSTool.java
@@ -119,14 +119,17 @@ public class HDFSTool
                
                try {
                        Path path = new Path(fname);
-                       return IOUtilFunctions
-                               .getFileSystem(path).getFileStatus(path).isDirectory();
+                       return isDirectory(IOUtilFunctions.getFileSystem(path), path);
                }
                catch(Exception ex) {
                        throw new DMLRuntimeException("Failed to check if file 
is directory", ex);
                }
        }
        
+       public static boolean isDirectory(FileSystem fs, Path path) throws IOException {
+               return fs.getFileStatus(path).isDirectory();
+       }
+       
        public static FileStatus[] getDirectoryListing(String fname) {
                try {
                        Path path = new Path(fname);
