Re: DataFrameReader bottleneck in DataSource#checkAndGlobPathIfNecessary when reading S3 files

2019-12-09 Thread Arwin Tio
Hello,

I have a ticket/PR out for this issue:

https://issues.apache.org/jira/browse/SPARK-29089
https://github.com/apache/spark/pull/25899

Can somebody please take a look? Is there anything else I can do to get this 
through the door?

Thanks,

Arwin


From: Steve Loughran 
Sent: September 7, 2019 9:22 AM
To: Arwin Tio 
Cc: Sean Owen ; dev@spark.apache.org 
Subject: Re: DataFrameReader bottleneck in 
DataSource#checkAndGlobPathIfNecessary when reading S3 files



On Fri, Sep 6, 2019 at 10:56 PM Arwin Tio <arwin@hotmail.com> wrote:
I think the problem is calling globStatus to expand all 300K files.
In my particular case I did not use any glob patterns, so my bottleneck came 
from the FileSystem#exists specifically. I do concur that the globStatus 
expansion could also be problematic.

But you might
consider, if possible, running a lot of .csv jobs in parallel to query
subsets of all the files, and union the results. At least there you
parallelize the reading from the object store.
That is a great solution! I think that's what I will do as a workaround for the 
moment. Right now I'm thinking that a potential improvement here is to 
parallelize the SparkHadoopUtil#globPathIfNecessary and FileSystem#exists calls 
whenever possible (i.e. when multiple paths are specified), so that the client 
doesn't have to.
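
For illustration only, a rough Scala sketch of that idea (the helper name, pool 
size, and timeout are assumptions, not what an actual patch would do):

  // Hypothetical sketch: run the per-path FileSystem#exists checks on a small
  // thread pool instead of a sequential loop. Each exists() is a blocking HEAD
  // request against S3, so the checks parallelize well.
  import java.util.concurrent.Executors
  import scala.concurrent.duration._
  import scala.concurrent.{Await, ExecutionContext, Future}
  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.Path

  object ParallelExistsSketch {
    def missingPaths(paths: Seq[Path], conf: Configuration, numThreads: Int = 8): Seq[Path] = {
      val pool = Executors.newFixedThreadPool(numThreads)
      implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
      try {
        val checks = paths.map { p =>
          Future {
            val fs = p.getFileSystem(conf)      // resolves to S3AFileSystem for s3a:// paths
            if (fs.exists(p)) None else Some(p) // one blocking HEAD request per path
          }
        }
        Await.result(Future.sequence(checks), 10.minutes).flatten
      } finally {
        pool.shutdown()
      }
    }
  }

The caller would then fail the query if missingPaths returns anything, mirroring 
what the existing sequential check does.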


The other tactic though it'd go through a lot more of the code would be to 
postpone the exists check until the work is scheduled, which is implicitly in 
open() on the workers, or explicit when the RDD does the split calculation and 
calls getFileBlockLocations(). If you are confident that that always happens 
(and you will have to trace back from those calls in things like 
org.apache.spark.streaming.util.HdfsUtils and ParallelizedWithLocalityRDD) then 
you get those scans in the driver ... but I fear regression handling there gets 
harder.

* have SparkHadoopUtils differentiate between files returned by globStatus(), 
and which therefore exist, and those which it didn't glob for -it will only 
need to check those.
* then worry about parallel execution of the scan, again
Okay sounds good, I will take a crack at this and open a ticket. Any thoughts 
on the parallelism; should it be configurable?

For file input formats (parquet, orc, ...) there is an option, default == 8. 
Though it's also off by default... maybe I should change that.


Another possible QoL improvement here is to show progress log messages - 
something that indicates to the user that the cluster is stuck while the driver 
is listing S3 files, maybe even including the FS getStorageStatistics?

yeah. If you want some examples of this, take a look at 
https://github.com/steveloughran/cloudstore . the locatedfilestatus command 
replicates what happens during FileInputFormat scans, so is how I'm going to 
tune IOPs there. It might also be good to have those bits of the hadoop MR 
classes which spark uses to log internally @ debug, so everything gets this 
logging if they ask for it.

Happy to take contribs there as Hadoop JIRAs & PRs

Thanks,

Arwin

From: Steve Loughran <ste...@cloudera.com>
Sent: September 6, 2019 4:15 PM
To: Sean Owen <sro...@gmail.com>
Cc: Arwin Tio <arwin@hotmail.com>; dev@spark.apache.org
Subject: Re: DataFrameReader bottleneck in 
DataSource#checkAndGlobPathIfNecessary when reading S3 files



On Fri, Sep 6, 2019 at 2:50 PM Sean Owen <sro...@gmail.com> wrote:
I think the problem is calling globStatus to expand all 300K files.
This is a general problem for object stores and huge numbers of files.
Steve L. may have better thoughts on real solutions. But you might
consider, if possible, running a lot of .csv jobs in parallel to query
subsets of all the files, and union the results. At least there you
parallelize the reading from the object store.

yeah, avoid globs and small files, especially small files in deep trees.

I think it's hard to optimize this case from the Spark side as it's
not clear how big a glob like s3://foo/* is going to be. I think it
would take reimplementing some logic to expand the glob incrementally
or something. Or maybe I am overlooking optimizations that have gone
into Spark 3.

A long time ago I actually tried to move FileSystem.globFiles off its own 
recursive treewalk into supporting the option of flat-list-children + filter. 
But while you can get some great speedups in some layouts, you can get 
pathological collapses in perf elsewhere, which makes the people running those 
queries very sad. So I gave up.

Parallelized scans can speed things up; look at the code in 
org.apache.hadoop.mapred.LocatedFileStatusFetcher to see what it does there. 
I've only just started exploring what we can do to tune that, with 
HADOOP-16458 and HADOOP-16465 <https://issues.apache.org/jira/browse/HADOOP-16465>, 
which should speed up ORC/Parquet scans.

Re: DataFrameReader bottleneck in DataSource#checkAndGlobPathIfNecessary when reading S3 files

2019-09-23 Thread Arwin Tio
Hi Steve,

I filed a JIRA and opened a PR for this issue:

https://issues.apache.org/jira/browse/SPARK-29089
https://github.com/apache/spark/pull/25899

Please let me know what you think.

Cheers,

Arwin

From: Steve Loughran 
Sent: September 7, 2019 9:22 AM
To: Arwin Tio 
Cc: Sean Owen ; dev@spark.apache.org 
Subject: Re: DataFrameReader bottleneck in 
DataSource#checkAndGlobPathIfNecessary when reading S3 files



On Fri, Sep 6, 2019 at 10:56 PM Arwin Tio <arwin@hotmail.com> wrote:
I think the problem is calling globStatus to expand all 300K files.
In my particular case I did not use any glob patterns, so my bottleneck came 
from the FileSystem#exists specifically. I do concur that the globStatus 
expansion could also be problematic.

But you might
consider, if possible, running a lot of .csv jobs in parallel to query
subsets of all the files, and union the results. At least there you
parallelize the reading from the object store.
That is a great solution! I think that's what I will do as a workaround for the 
moment. Right now I'm thinking that a potential improvement here is to 
parallelize the SparkHadoopUtil#globPathIfNecessary and FileSystem#exists calls 
whenever possible (i.e. when multiple paths are specified), so that the client 
doesn't have to.


The other tactic though it'd go through a lot more of the code would be to 
postpone the exists check until the work is scheduled, which is implicitly in 
open() on the workers, or explicit when the RDD does the split calculation and 
calls getFileBlockLocations(). If you are confident that that always happens 
(and you will have to trace back from those calls in things like 
org.apache.spark.streaming.util.HdfsUtils and ParallelizedWithLocalityRDD) then 
you get those scans in the driver ... but I fear regression handling there gets 
harder.

* have SparkHadoopUtils differentiate between files returned by globStatus(), 
and which therefore exist, and those which it didn't glob for -it will only 
need to check those.
* then worry about parallel execution of the scan, again
Okay sounds good, I will take a crack at this and open a ticket. Any thoughts 
on the parallelism; should it be configurable?

For file input formats (parquet, orc, ...) there is an option, default == 8. 
Though it's also off by default... maybe I should change that.


Another possible QoL improvement here is to show progress log messages - 
something that indicates to the user that the cluster is stuck while the driver 
is listing S3 files, maybe even including the FS getStorageStatistics?

yeah. If you want some examples of this, take a look at 
https://github.com/steveloughran/cloudstore . the locatedfilestatus command 
replicates what happens during FileInputFormat scans, so is how I'm going to 
tune IOPs there. It might also be good to have those bits of the hadoop MR 
classes which spark uses to log internally @ debug, so everything gets this 
logging if they ask for it.

Happy to take contribs there as Hadoop JIRAs & PRs

Thanks,

Arwin

From: Steve Loughran <ste...@cloudera.com>
Sent: September 6, 2019 4:15 PM
To: Sean Owen <sro...@gmail.com>
Cc: Arwin Tio <arwin@hotmail.com>; dev@spark.apache.org
Subject: Re: DataFrameReader bottleneck in 
DataSource#checkAndGlobPathIfNecessary when reading S3 files



On Fri, Sep 6, 2019 at 2:50 PM Sean Owen <sro...@gmail.com> wrote:
I think the problem is calling globStatus to expand all 300K files.
This is a general problem for object stores and huge numbers of files.
Steve L. may have better thoughts on real solutions. But you might
consider, if possible, running a lot of .csv jobs in parallel to query
subsets of all the files, and union the results. At least there you
parallelize the reading from the object store.

yeah, avoid globs and small files, especially small files in deep trees.

I think it's hard to optimize this case from the Spark side as it's
not clear how big a glob like s3://foo/* is going to be. I think it
would take reimplementing some logic to expand the glob incrementally
or something. Or maybe I am overlooking optimizations that have gone
into Spark 3.

A long time ago I actually tried to move FileSystem.globFiles off its own 
recursive treewalk into supporting the option of flat-list-children + filter. 
But while you can get some great speedups in some layouts, you can get 
pathological collapses in perf elsewhere, which makes the people running those 
queries very sad. So I gave up.

Parallelized scans can speed things up; look at the code in 
org.apache.hadoop.mapred.LocatedFileStatusFetcher to see what it does there. 
I've only just started exploring what we can do to tune that, with 
HADOOP-16458 and HADOOP-16465 <https://issues.apache.org/jira/browse/HADOOP-16465>, 
which should speed up ORC/Parquet scans.

Re: DataFrameReader bottleneck in DataSource#checkAndGlobPathIfNecessary when reading S3 files

2019-09-07 Thread Steve Loughran
On Fri, Sep 6, 2019 at 10:56 PM Arwin Tio  wrote:

> I think the problem is calling globStatus to expand all 300K files.
>
> In my particular case I did not use any glob patterns, so my bottleneck
> came from the FileSystem#exists specifically. I do concur that the
> globStatus expansion could also be problematic.
>
> But you might
> consider, if possible, running a lot of .csv jobs in parallel to query
> subsets of all the files, and union the results. At least there you
> parallelize the reading from the object store.
>
> That is a great solution! I think that's what I will do as a workaround
> for the moment. Right now I'm thinking that a potential improvement here is
> to parallelize the SparkHadoopUtil#globPathIfNecessary and
> FileSystem#exists calls whenever possible (i.e. when multiple paths are
> specified), so that the client doesn't have to.
>
>
The other tactic though it'd go through a lot more of the code would be to
postpone the exists check until the work is scheduled, which is implicitly
in open() on the workers, or explicit when the RDD does the split
calculation and calls getFileBlockLocations(). If you are confident that
that always happens (and you will have to trace back from those calls in
things like org.apache.spark.streaming.util.HdfsUtils and
ParallelizedWithLocalityRDD)
then you get those scans in the driver ... but I fear regression handling
there gets harder.

* have SparkHadoopUtils differentiate between files returned
> by globStatus(), and which therefore exist, and those which it didn't glob
> for -it will only need to check those.
> * then worry about parallel execution of the scan, again
>
> Okay sounds good, I will take a crack at this and open a ticket. Any
> thoughts on the parallelism; should it be configurable?
>

For file input formats (parquet, orc, ...) there is an option, default ==
8. Though it's also off by default... maybe I should change that.


> Another possible QoL improvement here is to show progress log messages -
> something that indicates to the user that the cluster is stuck while the
> driver is listing S3 files, maybe even including the FS
> getStorageStatistics?
>

yeah. If you want some examples of this, take a look at
https://github.com/steveloughran/cloudstore . the locatedfilestatus command
replicates what happens during FileInputFormat scans, so is how I'm going
to tune IOPs there. It might also be good to have those bits of the hadoop
MR classes which spark uses to log internally @ debug, so everything gets
this logging if they ask for it.

Happy to take contribs there as Hadoop JIRAs & PRs

>
> Thanks,
>
> Arwin
> --
> *From:* Steve Loughran 
> *Sent:* September 6, 2019 4:15 PM
> *To:* Sean Owen 
> *Cc:* Arwin Tio ; dev@spark.apache.org <
> dev@spark.apache.org>
> *Subject:* Re: DataFrameReader bottleneck in
> DataSource#checkAndGlobPathIfNecessary when reading S3 files
>
>
>
> On Fri, Sep 6, 2019 at 2:50 PM Sean Owen  wrote:
>
> I think the problem is calling globStatus to expand all 300K files.
> This is a general problem for object stores and huge numbers of files.
> Steve L. may have better thoughts on real solutions. But you might
> consider, if possible, running a lot of .csv jobs in parallel to query
> subsets of all the files, and union the results. At least there you
> parallelize the reading from the object store.
>
>
> yeah, avoid globs and small files, especially small files in deep trees.
>
>
> I think it's hard to optimize this case from the Spark side as it's
> not clear how big a glob like s3://foo/* is going to be. I think it
> would take reimplementing some logic to expand the glob incrementally
> or something. Or maybe I am overlooking optimizations that have gone
> into Spark 3.
>
>
> A long time ago I actually tried to move FileSystem.globFiles off its own
> recursive treewalk into supporting the option of flat-list-children +
> filter. But while you can get some great speedups in some layouts, you can
> get pathological collapses in perf elsewhere, which makes the people
> running those queries very sad. So I gave up.
>
> Parallelized scans can speed things up; look at the code in
> org.apache.hadoop.mapred.LocatedFileStatusFetcher to see what it does
> there. I've only just started exploring what we can do to tune that, with
> HADOOP-16458 and HADOOP-16465
> <https://issues.apache.org/jira/browse/HADOOP-16465>, which should speed
> up ORC/Parquet scans. These are designed to cut 1-2 HEAD requests off per
> directory list, which may seem small but, from my early measurements, can be
> significant.
>
> That's why cutting things like an exists check makes a big difference,
> especially if you are going to call some list() or open() operation
> straight after - just call the operation and rely on the
> FileNotFoundException to tell you when it's not there.

Re: DataFrameReader bottleneck in DataSource#checkAndGlobPathIfNecessary when reading S3 files

2019-09-06 Thread Arwin Tio
I think the problem is calling globStatus to expand all 300K files.
In my particular case I did not use any glob patterns, so my bottleneck came 
from the FileSystem#exists specifically. I do concur that the globStatus 
expansion could also be problematic.

But you might
consider, if possible, running a lot of .csv jobs in parallel to query
subsets of all the files, and union the results. At least there you
parallelize the reading from the object store.
That is a great solution! I think that's what I will do as a workaround for the 
moment. Right now I'm thinking that a potential improvement here is to 
parallelize the SparkHadoopUtil#globPathIfNecessary and FileSystem#exists calls 
whenever possible (i.e. when multiple paths are specified), so that the client 
doesn't have to.

For speedup then
* have SparkHadoopUtils differentiate between files returned by globStatus(), 
and which therefore exist, and those which it didn't glob for -it will only 
need to check those.
* then worry about parallel execution of the scan, again
Okay sounds good, I will take a crack at this and open a ticket. Any thoughts 
on the parallelism; should it be configurable?

Another possible QoL improvement here is to show progress log messages - 
something that indicates to the user that the cluster is stuck while the driver 
is listing S3 files, maybe even including the FS getStorageStatistics?

Thanks,

Arwin

From: Steve Loughran 
Sent: September 6, 2019 4:15 PM
To: Sean Owen 
Cc: Arwin Tio ; dev@spark.apache.org 

Subject: Re: DataFrameReader bottleneck in 
DataSource#checkAndGlobPathIfNecessary when reading S3 files



On Fri, Sep 6, 2019 at 2:50 PM Sean Owen <sro...@gmail.com> wrote:
I think the problem is calling globStatus to expand all 300K files.
This is a general problem for object stores and huge numbers of files.
Steve L. may have better thoughts on real solutions. But you might
consider, if possible, running a lot of .csv jobs in parallel to query
subsets of all the files, and union the results. At least there you
parallelize the reading from the object store.

yeah, avoid globs and small files, especially small files in deep trees.

I think it's hard to optimize this case from the Spark side as it's
not clear how big a glob like s3://foo/* is going to be. I think it
would take reimplementing some logic to expand the glob incrementally
or something. Or maybe I am overlooking optimizations that have gone
into Spark 3.

A long time ago I actually tried to move FileSystem.globFiles off its own 
recursive treewalk into supporting the option of flat-list-children + filter. 
But while you can get some great speedups in some layouts, you can get 
pathological collapses in perf elsewhere, which makes the people running those 
queries very sad. So I gave up.

Parallelized scans can speed things up; look at the code in 
org.apache.hadoop.mapred.LocatedFileStatusFetcher to see what it does there. 
I've only just started exploring what we can do to tune that, with 
HADOOP-16458 and HADOOP-16465 <https://issues.apache.org/jira/browse/HADOOP-16465>, 
which should speed up ORC/Parquet scans. These are designed to cut 1-2 HEAD 
requests off per directory list, which may seem small but, from my early 
measurements, can be significant.

That's why cutting things like an exists check makes a big difference, 
especially if you are going to call some list() or open() operation straight 
after -just call the operation and rely on the FileNotFoundException to tell 
you when it's not there.
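
As a tiny illustration of that pattern (a sketch only, not the actual Spark 
code path):

  // Instead of fs.exists(p) followed by fs.open(p) - two round trips against
  // S3 - just open it and treat FileNotFoundException as the "missing" signal.
  import java.io.FileNotFoundException
  import org.apache.hadoop.fs.{FSDataInputStream, FileSystem, Path}

  def openIfPresent(fs: FileSystem, p: Path): Option[FSDataInputStream] =
    try Some(fs.open(p))                             // raises FileNotFoundException if absent
    catch { case _: FileNotFoundException => None }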

Now, looking at the code, if the list has already come from a real call to 
globPath, then yes, the exists call is wasteful, where waste = 500+ millis per 
file: 
http://steveloughran.blogspot.com/2016/12/how-long-does-filesystemexists-take.html

For speedup then
* have SparkHadoopUtils differentiate between files returned by globStatus(), 
and which therefore exist, and those which it didn't glob for -it will only 
need to check those.
* then worry about parallel execution of the scan, again

Why not file a JIRA on the spark work; send me a ref so I can look at your 
patch.

One thing to know here is that not only does the S3A FS class have counters for 
all the operations you can get from getStorageStatistics, but if you call 
toString() on it, it will print out the current stats. So you can just log the 
fs string value before and after an operation and see what's gone on. We track 
FS API calls (op_*) and actual HTTP requests to the store (object_*); both are 
interesting: the object_* counters show what is expensive (and, in the S3A FS 
code, what we should cut), while the op_* values show which API calls are used 
a lot and should somehow be eliminated or, if you have insights, optimised 
better. Removal is usually the best, as it speeds up everything.
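
To make that concrete, a small spark-shell sketch of the before/after logging 
trick (the bucket name is made up; the counters only show up when the path 
resolves to the S3A filesystem):

  import org.apache.hadoop.fs.Path

  val path = new Path("s3a://some-bucket/some-prefix/")   // hypothetical location
  val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration)

  println(s"Before: $fs")                 // dumps the current op_* / object_* counters
  val df = spark.read.csv(path.toString)
  println(s"After:  $fs")                 // diff against the line above to see what it cost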

Long term, relying on directory trees to list your source data, and on commit 
algorithms which move/instantiate changes, isn't sustainable. Things like Apache 
Iceberg are where data should go.

Re: DataFrameReader bottleneck in DataSource#checkAndGlobPathIfNecessary when reading S3 files

2019-09-06 Thread Steve Loughran
On Fri, Sep 6, 2019 at 2:50 PM Sean Owen  wrote:

> I think the problem is calling globStatus to expand all 300K files.
> This is a general problem for object stores and huge numbers of files.
> Steve L. may have better thoughts on real solutions. But you might
> consider, if possible, running a lot of .csv jobs in parallel to query
> subsets of all the files, and union the results. At least there you
> parallelize the reading from the object store.
>

yeah, avoid globs and small files, especially small files in deep trees.

>
> I think it's hard to optimize this case from the Spark side as it's
> not clear how big a glob like s3://foo/* is going to be. I think it
> would take reimplementing some logic to expand the glob incrementally
> or something. Or maybe I am overlooking optimizations that have gone
> into Spark 3.
>

A long time ago I actually tried to move FileSystem.globFiles off its own
recursive treewalk into supporting the option of flat-list-children +
filter. But while you can get some great speedups in some layouts, you can
get pathological collapses in perf elsewhere, which makes the people
running those queries very sad. So I gave up.

Parallelized scans can speed things up; look at the code in
org.apache.hadoop.mapred.LocatedFileStatusFetcher to see what it does
there. I've only just started exploring what we can do to tune that, with
HADOOP-16458 and HADOOP-16465
<https://issues.apache.org/jira/browse/HADOOP-16465>, which should speed up
ORC/Parquet scans. These are designed to cut 1-2 HEAD requests off per
directory list, which may seem small but, from my early measurements, can be
significant.

That's why cutting things like an exists check makes a big difference,
especially if you are going to call some list() or open() operation
straight after -just call the operation and rely on the
FileNotFoundException to tell you when it's not there.

Now, looking at the code, if the list has already come from a real call to
globPath, then yes, the exists call is wasteful, where waste = 500+ millis
per file:
http://steveloughran.blogspot.com/2016/12/how-long-does-filesystemexists-take.html

For speedup then
* have SparkHadoopUtils differentiate between files returned
by globStatus(), and which therefore exist, and those which it didn't glob
for -it will only need to check those.
* then worry about parallel execution of the scan, again
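
A rough sketch of that split, with made-up names (the real change would live in 
Spark's DataSource / SparkHadoopUtil code):

  // Paths expanded by globStatus() are known to exist; only literal (non-glob)
  // paths still need their own existence check.
  import java.io.FileNotFoundException
  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.Path

  object GlobSplitSketch {
    // crude glob test, good enough for the sketch
    private def hasGlob(p: Path): Boolean = p.toString.exists("{}[]*?\\".toSet.contains)

    def resolve(paths: Seq[Path], conf: Configuration): Seq[Path] = {
      val (globs, literals) = paths.partition(hasGlob)
      val expanded = globs.flatMap { g =>
        val fs = g.getFileSystem(conf)
        Option(fs.globStatus(g)).toSeq.flatten.map(_.getPath)  // glob results already exist
      }
      literals.foreach { p =>                                  // only these need a check
        if (!p.getFileSystem(conf).exists(p)) throw new FileNotFoundException(p.toString)
      }
      expanded ++ literals
    }
  }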

Why not file a JIRA on the spark work; send me a ref so I can look at your
patch.

One thing to know here is that not only does the S3A FS class have counters
for all the operations you can get from getStorageStatistics, but if you call
toString() on it, it will print out the current stats. So you can just log
the fs string value before and after an operation and see what's gone on.
We track FS API calls (op_*) and actual HTTP requests to the store
(object_*); both are interesting: the object_* counters show what is
expensive (and, in the S3A FS code, what we should cut), while the op_*
values show which API calls are used a lot and should somehow be eliminated
or, if you have insights, optimised better. Removal is usually the best, as
it speeds up everything.

Long term, relying on directory trees to list your source data, and on commit
algorithms which move/instantiate changes, isn't sustainable. Things like
Apache Iceberg are where data should go ... things for which S3 can be
viewed as a fault-injecting test infrastructure. It's the Chaos Monkey of
object storage.


>
> On Fri, Sep 6, 2019 at 7:09 AM Arwin Tio  wrote:
> >
> > Hello,
> >
> > On Spark 2.4.4, I am using DataFrameReader#csv to read about 300K
> files on S3, and I've noticed that it takes about an hour for it to load
> the data on the Driver. You can see the timestamp difference when the log
> from InMemoryFileIndex occurs from 7:45 to 8:54:
> >
> > 19/09/06 07:44:42 INFO SparkContext: Running Spark version 2.4.4
> > 19/09/06 07:44:42 INFO SparkContext: Submitted application:
> LoglineParquetGenerator
> > ...
> > 19/09/06 07:45:40 INFO StateStoreCoordinatorRef: Registered
> StateStoreCoordinator endpoint
> > 19/09/06 08:54:57 INFO InMemoryFileIndex: Listing leaf files and
> directories in parallel under: [300K files...]
> >
> >
> > I believe that the issue comes from
> DataSource#checkAndGlobPathIfNecessary [0], specifically from when it is
> calling FileSystem#exists. Unlike bulkListLeafFiles, the existence check
> here happens in a single-threaded flatMap, which is a blocking network call
> if your files are stored on S3.
> >
> > I believe that there is a fairly straightforward opportunity for
> improvement here, which is to parallelize the existence check perhaps with
> a configurable number of threads. If that seems reasonable, I would like to
> create a JIRA ticket and submit a patch. Please let me know!
> >
> > Cheers,
> >
> > Arwin
> >
> > [0]
> https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala#L557
>
> 

Re: DataFrameReader bottleneck in DataSource#checkAndGlobPathIfNecessary when reading S3 files

2019-09-06 Thread Sean Owen
I think the problem is calling globStatus to expand all 300K files.
This is a general problem for object stores and huge numbers of files.
Steve L. may have better thoughts on real solutions. But you might
consider, if possible, running a lot of .csv jobs in parallel to query
subsets of all the files, and union the results. At least there you
parallelize the reading from the object store.
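
For example (a sketch only, with an arbitrary chunk size and assuming all the 
files share one schema): split the input paths into chunks, build the per-chunk 
DataFrames in parallel on the driver so each one lists only its own subset, 
then union the results.

  import scala.concurrent.duration.Duration
  import scala.concurrent.{Await, ExecutionContext, Future}
  import org.apache.spark.sql.{DataFrame, SparkSession}

  def readCsvInChunks(spark: SparkSession, paths: Seq[String], chunkSize: Int = 1000): DataFrame = {
    implicit val ec: ExecutionContext = ExecutionContext.global
    val futures = paths.grouped(chunkSize).toSeq.map { chunk =>
      Future(spark.read.csv(chunk: _*))  // each call lists/validates only its own subset
    }
    Await.result(Future.sequence(futures), Duration.Inf).reduce(_ union _)
  }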

I think it's hard to optimize this case from the Spark side as it's
not clear how big a glob like s3://foo/* is going to be. I think it
would take reimplementing some logic to expand the glob incrementally
or something. Or maybe I am overlooking optimizations that have gone
into Spark 3.

On Fri, Sep 6, 2019 at 7:09 AM Arwin Tio  wrote:
>
> Hello,
>
> On Spark 2.4.4, I am using DataFrameReader#csv to read about 300K files on 
> S3, and I've noticed that it takes about an hour for it to load the data on 
> the Driver. You can see the timestamp difference when the log from 
> InMemoryFileIndex occurs from 7:45 to 8:54:
>
> 19/09/06 07:44:42 INFO SparkContext: Running Spark version 2.4.4
> 19/09/06 07:44:42 INFO SparkContext: Submitted application: 
> LoglineParquetGenerator
> ...
> 19/09/06 07:45:40 INFO StateStoreCoordinatorRef: Registered 
> StateStoreCoordinator endpoint
> 19/09/06 08:54:57 INFO InMemoryFileIndex: Listing leaf files and directories 
> in parallel under: [300K files...]
>
>
> I believe that the issue comes from DataSource#checkAndGlobPathIfNecessary 
> [0], specifically from when it is calling FileSystem#exists. Unlike 
> bulkListLeafFiles, the existence check here happens in a single-threaded 
> flatMap, which is a blocking network call if your files are stored on S3.
>
> I believe that there is a fairly straightforward opportunity for improvement 
> here, which is to parallelize the existence check perhaps with a configurable 
> number of threads. If that seems reasonable, I would like to create a JIRA 
> ticket and submit a patch. Please let me know!
>
> Cheers,
>
> Arwin
>
> [0] 
> https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala#L557




DataFrameReader bottleneck in DataSource#checkAndGlobPathIfNecessary when reading S3 files

2019-09-06 Thread Arwin Tio
Hello,

On Spark 2.4.4, I am using DataFrameReader#csv to read about 300K files on 
S3, and I've noticed that it takes about an hour for it to load the data on the 
Driver. You can see the timestamp difference when the log from 
InMemoryFileIndex occurs from 7:45 to 8:54:
19/09/06 07:44:42 INFO SparkContext: Running Spark version 2.4.4
19/09/06 07:44:42 INFO SparkContext: Submitted application: 
LoglineParquetGenerator
...
19/09/06 07:45:40 INFO StateStoreCoordinatorRef: Registered 
StateStoreCoordinator endpoint
19/09/06 08:54:57 INFO InMemoryFileIndex: Listing leaf files and directories in 
parallel under: [300K files...]

I believe that the issue comes from DataSource#checkAndGlobPathIfNecessary [0], 
specifically from when it is calling FileSystem#exists. Unlike 
bulkListLeafFiles, the existence check here happens in a single-threaded 
flatMap, which is a blocking network call if your files are stored on S3.

I believe that there is a fairly straightforward opportunity for improvement 
here, which is to parallelize the existence check perhaps with a configurable 
number of threads. If that seems reasonable, I would like to create a JIRA 
ticket and submit a patch. Please let me know!

Cheers,

Arwin

[0] 
https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala#L557