alamb commented on code in PR #18855:
URL: https://github.com/apache/datafusion/pull/18855#discussion_r2550693419
##########
datafusion/execution/src/cache/cache_manager.rs:
##########
@@ -42,8 +47,18 @@ pub type FileStatisticsCache =
/// especially when done over remote object stores.
///
/// See [`crate::runtime_env::RuntimeEnv`] for more details
-pub type ListFilesCache =
- Arc<dyn CacheAccessor<Path, Arc<Vec<ObjectMeta>>, Extra = ObjectMeta>>;
+pub trait ListFilesCache:
+ CacheAccessor<Path, Arc<Vec<ObjectMeta>>, Extra = ObjectMeta>
+{
+ // Returns the cache's object limit.
+ fn cache_limit(&self) -> usize;
+
+ // Returns the cache's object ttl.
+ fn cache_ttl(&self) -> Duration;
+
+ // Updates the cache with a new boject limit.
Review Comment:
```suggestion
// Updates the cache with a new object limit.
```
##########
datafusion/core/tests/datasource/object_store_access.rs:
##########
@@ -237,8 +233,7 @@ async fn query_partitioned_csv_file() {
+---------+-------+-------+---+----+-----+
------- Object Store Request Summary -------
RequestCountingObjectStore()
- Total Requests: 2
- - LIST prefix=data
+ Total Requests: 1
Review Comment:
Could we also add a test that shows that once cache eviction happens, a
subsequent query does actually make a new LIST request?
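
A minimal sketch of the shape such a test could take, using only the standard
library. `CountingStore` and `TtlListCache` are hypothetical stand-ins (not the
`RequestCountingObjectStore` or `DefaultListFilesCache` from this PR), and the
clock is passed in explicitly so the test does not need to sleep past the TTL:

```rust
use std::cell::Cell;
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Hypothetical stand-in for an object store that counts LIST requests.
struct CountingStore {
    list_requests: Cell<usize>,
}

impl CountingStore {
    fn list(&self, prefix: &str) -> Vec<String> {
        self.list_requests.set(self.list_requests.get() + 1);
        vec![format!("{prefix}/a.csv"), format!("{prefix}/b.csv")]
    }
}

/// Hypothetical TTL cache; the caller supplies `now` so tests control time.
struct TtlListCache {
    ttl: Duration,
    entries: HashMap<String, (Vec<String>, Instant)>,
}

impl TtlListCache {
    fn new(ttl: Duration) -> Self {
        Self { ttl, entries: HashMap::new() }
    }

    fn list(&mut self, store: &CountingStore, prefix: &str, now: Instant) -> Vec<String> {
        // Serve from the cache only while the entry has not expired.
        if let Some((files, expires)) = self.entries.get(prefix) {
            if now <= *expires {
                return files.clone();
            }
        }
        // Miss or expired entry: issue a fresh LIST and re-cache the result.
        let files = store.list(prefix);
        self.entries
            .insert(prefix.to_string(), (files.clone(), now + self.ttl));
        files
    }
}

/// Returns the LIST request count observed after each of three queries.
fn list_counts_across_expiry() -> [usize; 3] {
    let store = CountingStore { list_requests: Cell::new(0) };
    let mut cache = TtlListCache::new(Duration::from_secs(600));
    let t0 = Instant::now();

    cache.list(&store, "data", t0); // cold cache: one LIST
    let first = store.list_requests.get();
    cache.list(&store, "data", t0 + Duration::from_secs(1)); // served from cache
    let second = store.list_requests.get();
    cache.list(&store, "data", t0 + Duration::from_secs(601)); // entry expired
    let third = store.list_requests.get();
    [first, second, third]
}
```

The real test would presumably assert on the request summary snapshot instead,
but the expectation is the same: no new LIST while the entry is live, and
exactly one more after it expires.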
##########
datafusion/core/tests/datasource/object_store_access.rs:
##########
@@ -117,8 +117,7 @@ async fn query_multi_csv_file() {
+---------+-------+-------+
------- Object Store Request Summary -------
RequestCountingObjectStore()
- Total Requests: 4
- - LIST prefix=data
+ Total Requests: 3
Review Comment:
it is pretty neat to see this in action
##########
datafusion/execution/src/cache/cache_manager.rs:
##########
@@ -42,8 +47,18 @@ pub type FileStatisticsCache =
/// especially when done over remote object stores.
///
/// See [`crate::runtime_env::RuntimeEnv`] for more details
Review Comment:
This is technically a breaking API change (a good one in my mind). I am just
pointing it out.
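
To illustrate what breaks for downstream code, here is a simplified sketch. The
`CacheAccessor` below is a stand-in (the real trait is generic and has more
methods), `MyCache` and `configure` are hypothetical, and only the two accessor
methods visible in this hunk are modelled:

```rust
use std::sync::Arc;
use std::time::Duration;

/// Simplified stand-in for DataFusion's `CacheAccessor`.
trait CacheAccessor {
    fn len(&self) -> usize;
}

/// After the change: a trait with extra required methods. Previously
/// `ListFilesCache` was a type alias for `Arc<dyn CacheAccessor<..>>`.
trait ListFilesCache: CacheAccessor {
    fn cache_limit(&self) -> usize;
    fn cache_ttl(&self) -> Duration;
}

/// Hypothetical downstream cache that used to implement only `CacheAccessor`.
struct MyCache;

impl CacheAccessor for MyCache {
    fn len(&self) -> usize {
        0
    }
}

// New requirement: without this impl, `MyCache` no longer fits the config field.
impl ListFilesCache for MyCache {
    fn cache_limit(&self) -> usize {
        usize::MAX
    }
    fn cache_ttl(&self) -> Duration {
        Duration::MAX
    }
}

/// Mirrors the new field type `Option<Arc<dyn ListFilesCache>>`.
fn configure(cache: MyCache) -> Option<Arc<dyn ListFilesCache>> {
    Some(Arc::new(cache))
}
```

So existing custom caches need one extra `impl` block, and call sites that
named the alias need to spell out `Arc<dyn ListFilesCache>`.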
##########
datafusion/execution/src/cache/cache_unit.rs:
##########
@@ -111,64 +113,224 @@ impl CacheAccessor<Path, Arc<Statistics>> for
DefaultFileStatisticsCache {
///
/// Collected files metadata for listing files.
///
-/// Cache is not invalided until user calls [`Self::remove`] or
[`Self::clear`].
+/// # Internal details
+///
+/// The `capacity` parameter controls the maximum number of entries in the
cache, using a Least
+/// Recently Used eviction algorithm. When adding a new entry, if the total
number of entries in
+/// the cache exceeds `capacity`, the least recently used entries are evicted
until the total
+/// entries are lower than the `capacity`.
///
/// [`ListFilesCache`]: crate::cache::cache_manager::ListFilesCache
#[derive(Default)]
pub struct DefaultListFilesCache {
- statistics: DashMap<Path, Arc<Vec<ObjectMeta>>>,
+ state: Mutex<DefaultListFilesCacheState>,
+}
+
+impl DefaultListFilesCache {
+ pub fn new(capacity: usize, ttl: Duration) -> Self {
+ Self {
+ state: Mutex::new(DefaultListFilesCacheState::new(capacity, ttl)),
+ }
+ }
+
+ pub fn cache_limit(&self) -> usize {
+ self.state.lock().unwrap().capacity
+ }
+
+ pub fn cache_ttl(&self) -> Duration {
+ self.state.lock().unwrap().ttl
+ }
+}
+
+pub(super) const DEFAULT_LIST_FILES_CACHE_LIMIT: usize = 128 * 1024; // ~130k
objects
+pub(super) const DEFAULT_LIST_FILES_CACHE_TTL: Duration = Duration::new(600,
0); // 10min
+
+pub struct DefaultListFilesCacheState {
+ lru_queue: LruQueue<Path, (Arc<Vec<ObjectMeta>>, Instant)>,
+ capacity: usize, // TODO: do "bytes" matter here, or should we stick with
"entries"?
+ size: usize,
+ ttl: Duration,
+}
+
+// TODO: Do we even want to support "default" here?
+impl Default for DefaultListFilesCacheState {
+ fn default() -> Self {
+ Self {
+ lru_queue: LruQueue::new(),
+ capacity: DEFAULT_LIST_FILES_CACHE_LIMIT,
+ size: 0,
+ ttl: DEFAULT_LIST_FILES_CACHE_TTL,
+ }
+ }
+}
+
+impl DefaultListFilesCacheState {
+ fn new(capacity: usize, ttl: Duration) -> Self {
+ Self {
+ lru_queue: LruQueue::new(),
+ capacity,
+ size: 0,
+ ttl,
+ }
+ }
+
+ fn get(&mut self, key: &Path) -> Option<Arc<Vec<ObjectMeta>>> {
Review Comment:
I think we just changed some of these APIs to take `&self` rather than `&mut
self` -- I am not sure if we want to do the same thing here.
- https://github.com/apache/datafusion/pull/18726
##########
datafusion/execution/src/cache/cache_manager.rs:
##########
@@ -189,13 +228,20 @@ pub struct CacheManagerConfig {
/// Avoid get same file statistics repeatedly in same datafusion session.
/// Default is disable. Fow now only supports Parquet files.
pub table_files_statistics_cache: Option<FileStatisticsCache>,
- /// Enable cache of file metadata when listing files.
- /// This setting avoids listing file meta of the same path repeatedly
- /// in same session, which may be expensive in certain situations (e.g.
remote object storage).
+ /// Enable caching of file metadata when listing files.
+ /// Enabling the cache avoids repeat list and metadata fetch operations,
which may be expensive
+ /// in certain situations (e.g. remote object storage), for objects under
paths that are
+ /// cached.
/// Note that if this option is enabled, DataFusion will not see any
updates to the underlying
- /// location.
- /// Default is disable.
- pub list_files_cache: Option<ListFilesCache>,
+ /// storage for at least `list_files_cache_ttl` duration.
+ /// Default is disabled.
+ pub list_files_cache: Option<Arc<dyn ListFilesCache>>,
+ /// Limit the number of objects to keep in the `list_files_cache`.
Default: ~125k objects
+ pub list_files_cache_limit: usize,
Review Comment:
I found it a little strange that the cache size is set on the cache manager
itself rather than an options struct -- though now I see it is consistent with
how metadata_cache_limits work
##########
datafusion/execution/src/cache/cache_unit.rs:
##########
@@ -111,64 +113,224 @@ impl CacheAccessor<Path, Arc<Statistics>> for
DefaultFileStatisticsCache {
///
/// Collected files metadata for listing files.
///
-/// Cache is not invalided until user calls [`Self::remove`] or
[`Self::clear`].
+/// # Internal details
+///
+/// The `capacity` parameter controls the maximum number of entries in the
cache, using a Least
+/// Recently Used eviction algorithm. When adding a new entry, if the total
number of entries in
+/// the cache exceeds `capacity`, the least recently used entries are evicted
until the total
+/// entries are lower than the `capacity`.
///
/// [`ListFilesCache`]: crate::cache::cache_manager::ListFilesCache
#[derive(Default)]
pub struct DefaultListFilesCache {
- statistics: DashMap<Path, Arc<Vec<ObjectMeta>>>,
+ state: Mutex<DefaultListFilesCacheState>,
+}
+
+impl DefaultListFilesCache {
+ pub fn new(capacity: usize, ttl: Duration) -> Self {
+ Self {
+ state: Mutex::new(DefaultListFilesCacheState::new(capacity, ttl)),
+ }
+ }
+
+ pub fn cache_limit(&self) -> usize {
+ self.state.lock().unwrap().capacity
+ }
+
+ pub fn cache_ttl(&self) -> Duration {
+ self.state.lock().unwrap().ttl
+ }
+}
+
+pub(super) const DEFAULT_LIST_FILES_CACHE_LIMIT: usize = 128 * 1024; // ~130k
objects
+pub(super) const DEFAULT_LIST_FILES_CACHE_TTL: Duration = Duration::new(600,
0); // 10min
Review Comment:
My personal opinion is that we should set TTL much longer (maybe even
infinite) by default. Users who know their files are changing are likely going
to have to crank it down from a default anyways, so we might as well make the
default behavior more deterministic
##########
datafusion/execution/src/cache/cache_manager.rs:
##########
@@ -209,6 +255,8 @@ impl Default for CacheManagerConfig {
Self {
table_files_statistics_cache: Default::default(),
list_files_cache: Default::default(),
+ list_files_cache_limit: DEFAULT_LIST_FILES_CACHE_LIMIT,
+ list_files_cache_ttl: DEFAULT_LIST_FILES_CACHE_TTL,
Review Comment:
I do agree that having an "infinite TTL" is an important use case.
I don't have a strong opinion on how that is expressed in the config options
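
One possible way to express it, sketched with the standard library only: model
the TTL as `Option<Duration>`, where `None` means "never expires". The helper
names `expires_at` and `is_expired` are hypothetical and not part of this PR:

```rust
use std::time::{Duration, Instant};

/// Hypothetical helper: `None` means the entry never expires.
fn expires_at(now: Instant, ttl: Option<Duration>) -> Option<Instant> {
    // `checked_add` returns `None` on overflow, whereas `now + ttl` can panic
    // for very large durations such as `Duration::MAX`.
    ttl.and_then(|ttl| now.checked_add(ttl))
}

/// Hypothetical helper: an entry without a deadline is never expired.
fn is_expired(now: Instant, deadline: Option<Instant>) -> bool {
    match deadline {
        Some(deadline) => now > deadline,
        None => false,
    }
}
```

A side effect of this shape is that an unrepresentably large TTL degrades to
"never expires" rather than panicking when the deadline is computed.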
##########
datafusion/execution/src/cache/cache_manager.rs:
##########
@@ -140,7 +155,17 @@ impl CacheManager {
let file_statistic_cache =
config.table_files_statistics_cache.as_ref().map(Arc::clone);
- let list_files_cache =
config.list_files_cache.as_ref().map(Arc::clone);
+ let list_files_cache = config
+ .list_files_cache
+ .as_ref()
+ .map(Arc::clone)
+ .unwrap_or_else(|| {
+ Arc::new(DefaultListFilesCache::new(
+ // TODO: config
+ 512 * 1024,
+ Duration::new(600, 0),
Review Comment:
I agree
Another thing we should do is add some way to introspect the cache as part of
datafusion-cli
##########
datafusion/execution/src/cache/cache_unit.rs:
##########
@@ -111,64 +113,224 @@ impl CacheAccessor<Path, Arc<Statistics>> for
DefaultFileStatisticsCache {
///
/// Collected files metadata for listing files.
///
-/// Cache is not invalided until user calls [`Self::remove`] or
[`Self::clear`].
+/// # Internal details
+///
+/// The `capacity` parameter controls the maximum number of entries in the
cache, using a Least
+/// Recently Used eviction algorithm. When adding a new entry, if the total
number of entries in
+/// the cache exceeds `capacity`, the least recently used entries are evicted
until the total
+/// entries are lower than the `capacity`.
///
/// [`ListFilesCache`]: crate::cache::cache_manager::ListFilesCache
#[derive(Default)]
pub struct DefaultListFilesCache {
- statistics: DashMap<Path, Arc<Vec<ObjectMeta>>>,
+ state: Mutex<DefaultListFilesCacheState>,
+}
+
+impl DefaultListFilesCache {
+ pub fn new(capacity: usize, ttl: Duration) -> Self {
+ Self {
+ state: Mutex::new(DefaultListFilesCacheState::new(capacity, ttl)),
+ }
+ }
+
+ pub fn cache_limit(&self) -> usize {
+ self.state.lock().unwrap().capacity
+ }
+
+ pub fn cache_ttl(&self) -> Duration {
+ self.state.lock().unwrap().ttl
+ }
+}
+
+pub(super) const DEFAULT_LIST_FILES_CACHE_LIMIT: usize = 128 * 1024; // ~130k
objects
+pub(super) const DEFAULT_LIST_FILES_CACHE_TTL: Duration = Duration::new(600,
0); // 10min
+
+pub struct DefaultListFilesCacheState {
+ lru_queue: LruQueue<Path, (Arc<Vec<ObjectMeta>>, Instant)>,
+ capacity: usize, // TODO: do "bytes" matter here, or should we stick with
"entries"?
Review Comment:
I think that bytes makes more sense, mostly because that is the resource
that is actually used. Count of items is only a proxy for resource usage
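
A rough sketch of what byte-based accounting could look like. `FileMeta` is a
hypothetical stand-in for `ObjectMeta` (which has more fields), and the
estimate is an assumption about what is worth counting, not a measured figure:

```rust
use std::mem::size_of;

/// Hypothetical stand-in for `ObjectMeta`; the real struct has more fields.
struct FileMeta {
    location: String,
    e_tag: Option<String>,
}

/// Approximate footprint of one entry: the inline struct plus the heap
/// buffers owned by its strings.
fn estimated_bytes(meta: &FileMeta) -> usize {
    size_of::<FileMeta>()
        + meta.location.capacity()
        + meta.e_tag.as_ref().map_or(0, |tag| tag.capacity())
}

/// Approximate footprint of a cached listing, used in place of `value.len()`.
fn listing_bytes(metas: &[FileMeta]) -> usize {
    metas.iter().map(estimated_bytes).sum()
}
```

With something like this, `put` would add `listing_bytes(&value)` to the
running size instead of the entry count, so long paths weigh more than short
ones.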
##########
datafusion/execution/src/cache/cache_unit.rs:
##########
@@ -111,64 +113,224 @@ impl CacheAccessor<Path, Arc<Statistics>> for
DefaultFileStatisticsCache {
///
/// Collected files metadata for listing files.
///
-/// Cache is not invalided until user calls [`Self::remove`] or
[`Self::clear`].
+/// # Internal details
+///
+/// The `capacity` parameter controls the maximum number of entries in the
cache, using a Least
+/// Recently Used eviction algorithm. When adding a new entry, if the total
number of entries in
+/// the cache exceeds `capacity`, the least recently used entries are evicted
until the total
+/// entries are lower than the `capacity`.
///
/// [`ListFilesCache`]: crate::cache::cache_manager::ListFilesCache
#[derive(Default)]
pub struct DefaultListFilesCache {
- statistics: DashMap<Path, Arc<Vec<ObjectMeta>>>,
+ state: Mutex<DefaultListFilesCacheState>,
+}
+
+impl DefaultListFilesCache {
+ pub fn new(capacity: usize, ttl: Duration) -> Self {
+ Self {
+ state: Mutex::new(DefaultListFilesCacheState::new(capacity, ttl)),
+ }
+ }
+
+ pub fn cache_limit(&self) -> usize {
+ self.state.lock().unwrap().capacity
+ }
+
+ pub fn cache_ttl(&self) -> Duration {
+ self.state.lock().unwrap().ttl
+ }
+}
+
+pub(super) const DEFAULT_LIST_FILES_CACHE_LIMIT: usize = 128 * 1024; // ~130k
objects
+pub(super) const DEFAULT_LIST_FILES_CACHE_TTL: Duration = Duration::new(600,
0); // 10min
+
+pub struct DefaultListFilesCacheState {
+ lru_queue: LruQueue<Path, (Arc<Vec<ObjectMeta>>, Instant)>,
+ capacity: usize, // TODO: do "bytes" matter here, or should we stick with
"entries"?
+ size: usize,
+ ttl: Duration,
+}
+
+// TODO: Do we even want to support "default" here?
+impl Default for DefaultListFilesCacheState {
+ fn default() -> Self {
+ Self {
+ lru_queue: LruQueue::new(),
+ capacity: DEFAULT_LIST_FILES_CACHE_LIMIT,
+ size: 0,
+ ttl: DEFAULT_LIST_FILES_CACHE_TTL,
+ }
+ }
+}
+
+impl DefaultListFilesCacheState {
+ fn new(capacity: usize, ttl: Duration) -> Self {
+ Self {
+ lru_queue: LruQueue::new(),
+ capacity,
+ size: 0,
+ ttl,
+ }
+ }
+
+ fn get(&mut self, key: &Path) -> Option<Arc<Vec<ObjectMeta>>> {
+ let (object_metas, expires) = self.lru_queue.get(key)?;
+
+ if Instant::now() > *expires {
+ self.remove(key);
+ None
+ } else {
+ Some(Arc::clone(object_metas))
+ }
+ }
+
+ fn contains_key(&mut self, k: &Path) -> bool {
+ let Some((_, expires)) = self.lru_queue.peek(k) else {
+ return false;
+ };
+
+ if Instant::now() > *expires {
+ self.remove(k);
+ return false;
+ }
+
+ true
+ }
+
+ fn put(
+ &mut self,
+ key: &Path,
+ value: Arc<Vec<ObjectMeta>>,
+ ) -> Option<Arc<Vec<ObjectMeta>>> {
+ let value_size = value.len();
+
+ // no point in trying to add this value to the cache if it cannot fit
entirely
+ if value_size > self.capacity {
+ return None;
+ }
+
+ // if the key is already in the cache, the old value is removed
+ let expires = Instant::now() + self.ttl;
+ let old_value = self.lru_queue.put(key.clone(), (value, expires));
+ self.size += value_size;
+ if let Some((ref old_metas, _)) = old_value {
+ self.size -= old_metas.len();
+ }
+
+ self.evict_entries();
+
+ old_value.map(|v| v.0)
+ }
+
+ fn evict_entries(&mut self) {
+ while self.size > self.capacity {
+ if let Some(removed) = self.lru_queue.pop() {
+ let metas: Arc<Vec<ObjectMeta>> = removed.1 .0;
+ self.size -= metas.len();
+ } else {
+ // cache is empty while memory_used > memory_limit, cannot
happen
+ debug_assert!(
+ false,
+ "cache is empty while memory_used > memory_limit, cannot
happen"
+ );
+ return;
+ }
+ }
+ }
+
+ /// Removes an entry from the cache and returns it, if it exists.
+ fn remove(&mut self, k: &Path) -> Option<Arc<Vec<ObjectMeta>>> {
+ if let Some((old_metas, _)) = self.lru_queue.remove(k) {
+ self.capacity -= old_metas.len();
+ Some(old_metas)
+ } else {
+ None
+ }
+ }
+
+ /// Returns the number of entries currently cached.
+ fn len(&self) -> usize {
+ self.lru_queue.len()
+ }
+
+ /// Removes all entries from the cache.
+ fn clear(&mut self) {
+ self.lru_queue.clear();
+ self.capacity = 0;
+ }
+}
+
+impl ListFilesCache for DefaultListFilesCache {
+ fn cache_limit(&self) -> usize {
+ let state = self.state.lock().unwrap();
+ state.capacity
+ }
+
+ fn cache_ttl(&self) -> Duration {
+ let state = self.state.lock().unwrap();
+ state.ttl
+ }
+
+ fn update_cache_limit(&self, limit: usize) {
+ let mut state = self.state.lock().unwrap();
+ state.capacity = limit;
+ state.evict_entries();
+ }
}
impl CacheAccessor<Path, Arc<Vec<ObjectMeta>>> for DefaultListFilesCache {
type Extra = ObjectMeta;
fn get(&self, k: &Path) -> Option<Arc<Vec<ObjectMeta>>> {
- self.statistics.get(k).map(|x| Arc::clone(x.value()))
+ let mut state = self.state.lock().unwrap();
+ state.get(k)
}
- fn get_with_extra(
- &self,
- _k: &Path,
- _e: &Self::Extra,
- ) -> Option<Arc<Vec<ObjectMeta>>> {
- panic!("Not supported DefaultListFilesCache get_with_extra")
Review Comment:
I can't remember what the rationale for this panic was. It seems to have
come in via
- https://github.com/apache/datafusion/pull/7620
Maybe @Ted-Jiang or @suremarc have some thoughts here
##########
datafusion/execution/src/cache/cache_manager.rs:
##########
@@ -42,8 +47,18 @@ pub type FileStatisticsCache =
/// especially when done over remote object stores.
///
/// See [`crate::runtime_env::RuntimeEnv`] for more details
-pub type ListFilesCache =
- Arc<dyn CacheAccessor<Path, Arc<Vec<ObjectMeta>>, Extra = ObjectMeta>>;
+pub trait ListFilesCache:
Review Comment:
I am personally in favor of keeping what is in DataFusion as simple to use /
understand as possible (which I think this and the metadata cache do).
In terms of customizable eviction strategy, as we have mentioned elsewhere,
that is already possible today, but it requires effectively copying/forking the
entire cache implementation, which adds to the maintenance burden of downstream
projects.
However, adding more APIs to DataFusion increases the maintenance burden on
the core project.
So I see customizable eviction strategies as a tradeoff. If there are
multiple users who are likely to use a customizable eviction strategy, I agree
it makes sense to put it in the core repo. If there are not that many, I think
it would be better to keep DataFusion simpler and move the burden downstream
for those users who need it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.