[ https://issues.apache.org/jira/browse/SPARK-27664?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Prashant Sharma updated SPARK-27664:
------------------------------------
    Description: 
In short,

This issue (i.e. degraded performance) surfaces when the number of files is 
large (> 100K) and the files are stored on an object store or other remote 
storage. The actual cause:

The entire listing is inserted as a single entry in the FileStatusCache (a 
guava cache), and it does not fit unless the cache is configured to be very 
large, roughly 4x what is actually needed. Reason: 
[https://github.com/google/guava/issues/3462].

 

The full story, with possible solutions:

When we read a directory in Spark with
{code:java}
spark.read.parquet("/dir/data/test").limit(1).show()
{code}
behind the scenes, Spark fetches the FileStatus objects and caches them in a 
FileStatusCache, so that it does not need to re-fetch them later. Internally, 
it scans using the listLeafFiles function at the driver. 
 Inside the cache, the entire listing, as an array of FileStatus objects, is 
inserted as a single entry with the key "/dir/data/test". The default size of 
this cache is 250MB, and it is configurable. The underlying cache is a guava 
cache.
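
For illustration, here is a minimal sketch of such a weight-bounded guava 
cache keyed by the root path (not Spark's exact code; the weigher and the 
~100 bytes/FileStatus estimate are assumptions):
{code:scala}
import com.google.common.cache.{Cache, CacheBuilder, Weigher}
import org.apache.hadoop.fs.{FileStatus, Path}

// A weight-bounded guava cache, similar in spirit to the one backing
// FileStatusCache: a ~250MB budget, with entries weighed by estimated size.
val cache: Cache[Path, Array[FileStatus]] = CacheBuilder.newBuilder()
  .maximumWeight(250L * 1024 * 1024)
  .weigher(new Weigher[Path, Array[FileStatus]] {
    override def weigh(key: Path, value: Array[FileStatus]): Int =
      value.length * 100 // rough ~100 bytes per FileStatus (assumption)
  })
  .build[Path, Array[FileStatus]]()

// The entire listing goes in as ONE entry under ONE key:
// cache.put(new Path("/dir/data/test"), allLeafFileStatuses)
{code}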

The guava cache has one interesting property: a single entry can only be as 
large as
{code:java}
maximumSize/concurrencyLevel{code}
see [https://github.com/google/guava/issues/3462] for details. So for a cache 
size of 250MB, a single entry can only be as large as 250MB/4, since guava's 
default concurrency level is 4. That is around 62MB, which is good enough for 
most datasets, but not for directories with larger listings. The effect is 
especially evident when such listings come from object stores such as Amazon 
S3 or IBM Cloud Object Storage.
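
The effect is easy to reproduce with guava directly (a self-contained sketch, 
using small weights in place of megabytes):
{code:scala}
import com.google.common.cache.{CacheBuilder, Weigher}

// With maximumWeight = 100 and concurrencyLevel = 4, each segment gets a
// budget of ~100/4 = 25. A single entry of weight 30 is therefore evicted
// immediately, even though 30 is well below the overall limit of 100.
val cache = CacheBuilder.newBuilder()
  .maximumWeight(100L)
  .concurrencyLevel(4)
  .weigher(new Weigher[String, Integer] {
    override def weigh(key: String, value: Integer): Int = value
  })
  .build[String, Integer]()

cache.put("big", 30)
println(cache.getIfPresent("big")) // expected: null, evicted on insert
{code}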

Currently, one can work around this problem by setting the cache size (i.e. 
`spark.sql.hive.filesourcePartitionFileCacheSize`) very high, as it needs to 
be much more than 4x of what is actually required.
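
For example (a sketch; the 2GB figure is arbitrary and purely illustrative):
{code:scala}
import org.apache.spark.sql.SparkSession

// Raise the FileStatusCache budget well beyond 4x of the estimated listing
// size. The 2GB value here is an assumption for illustration only.
val spark = SparkSession.builder()
  .config("spark.sql.hive.filesourcePartitionFileCacheSize",
    2L * 1024 * 1024 * 1024)
  .getOrCreate()
{code}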

To fix this issue, we can take three different approaches:

1) As a stop-gap fix, reduce the concurrency level of the guava cache to just 
1: if everything is inserted as one single entry per job anyway, concurrency 
is not helpful.
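
A minimal sketch of this stop-gap (same illustrative sizes and weigher as the 
sketch above):
{code:scala}
import com.google.common.cache.{Cache, CacheBuilder, Weigher}
import org.apache.hadoop.fs.{FileStatus, Path}

// With concurrencyLevel(1) the single segment owns the full 250MB budget,
// so one entry may use all of it.
val singleSegment: Cache[Path, Array[FileStatus]] = CacheBuilder.newBuilder()
  .concurrencyLevel(1)
  .maximumWeight(250L * 1024 * 1024)
  .weigher(new Weigher[Path, Array[FileStatus]] {
    override def weigh(key: Path, value: Array[FileStatus]): Int =
      value.length * 100 // rough size estimate (assumption)
  })
  .build[Path, Array[FileStatus]]()
{code}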

2) Alternatively, divide the input array into multiple entries in the cache, 
instead of inserting everything against a single key. This can be done by 
using directories as keys when there are multiple nested directories under 
the root. But if a user keeps everything under a single directory, this 
solution does not help either, and we cannot depend on users creating 
partitions in their hive/sql tables.
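
A rough sketch of keying by directory (hypothetical helper, not Spark's 
actual code):
{code:scala}
import com.google.common.cache.Cache
import org.apache.hadoop.fs.{FileStatus, Path}

// One cache entry per parent directory instead of one giant entry under the
// root path. Only helps when the listing spans many directories.
def putPerDirectory(cache: Cache[Path, Array[FileStatus]],
                    listing: Array[FileStatus]): Unit = {
  listing.groupBy(_.getPath.getParent).foreach { case (dir, statuses) =>
    cache.put(dir, statuses)
  }
}
{code}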

3) Another alternative: make the concurrency level configurable for those who 
want to change it, and, while inserting an entry into the cache, divide it 
into `concurrencyLevel` (or even 2x or 3x that many) parts before inserting. 
This way the cache performs optimally, and if an eviction happens, only some 
of the parts are evicted, rather than all the entries as in the current 
implementation. How many entries are evicted due to size depends on the 
configured concurrencyLevel. This approach can be taken even without making 
`concurrencyLevel` configurable.

The problem with this approach is that the parts in the cache are of no use 
individually: if even one part is evicted, all the other parts for that key 
must be evicted too, otherwise the results would be wrong.
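
A rough sketch of the sharded insertion from approach 3 (hypothetical helper 
names; the eviction-consistency caveat above still applies):
{code:scala}
import com.google.common.cache.Cache
import org.apache.hadoop.fs.{FileStatus, Path}

// Shard one listing into `parts` chunks, each small enough to fit into a
// single guava segment. Caveat: if any one shard is evicted, every shard of
// that path must be treated as invalid, or lookups would return partial
// (wrong) results.
def putSharded(cache: Cache[(Path, Int), Array[FileStatus]],
               path: Path,
               listing: Array[FileStatus],
               parts: Int): Unit = {
  require(parts > 0)
  val chunkSize = math.max(1, (listing.length + parts - 1) / parts)
  listing.grouped(chunkSize).zipWithIndex.foreach { case (chunk, i) =>
    cache.put((path, i), chunk)
  }
}
{code}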


> Performance issue with FileStatusCache, while reading from object stores.
> -------------------------------------------------------------------------
>
>                 Key: SPARK-27664
>                 URL: https://issues.apache.org/jira/browse/SPARK-27664
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.0.0, 2.4.3
>            Reporter: Prashant Sharma
>            Priority: Major


