alamb commented on code in PR #17031:
URL: https://github.com/apache/datafusion/pull/17031#discussion_r2252448318
##########
datafusion/datasource-parquet/src/file_format.rs:
##########
@@ -449,17 +449,14 @@ impl FileFormat for ParquetFormat {
         // Use the CachedParquetFileReaderFactory when metadata caching is enabled
         if self.options.global.cache_metadata {

Review Comment:
   What are your thoughts about (in a follow-on PR) removing `options.cache_metadata` and always trying to save the metadata (which will be a no-op if there is no room)?

##########
datafusion/execution/src/cache/cache_manager.rs:
##########
@@ -65,30 +73,38 @@ impl Debug for dyn FileMetadataCache {
     }
 }

-#[derive(Default, Debug)]
+#[derive(Debug)]
 pub struct CacheManager {
     file_statistic_cache: Option<FileStatisticsCache>,
     list_files_cache: Option<ListFilesCache>,
-    file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
+    file_metadata_cache: Arc<dyn FileMetadataCache>,

Review Comment:
   Seeing the idea of having a default file_metadata_cache installed got me thinking about @BlakeOrth's comment here: https://github.com/apache/datafusion/pull/16971#issuecomment-3145516316

   After this work to cache file metadata, it seems like we may want to consider adding default caches for ListFiles and FileStatistics as well 🤔 (as a follow-on PR, of course)

##########
datafusion/execution/src/cache/cache_unit.rs:
##########
@@ -158,33 +159,168 @@ impl CacheAccessor<Path, Arc<Vec<ObjectMeta>>> for DefaultListFilesCache {
     }
 }

+/// Handles the inner state of the [`DefaultFilesMetadataCache`] struct.
+struct DefaultFilesMetadataCacheState {
+    lru_cache: LruCache<Path, (ObjectMeta, Arc<dyn FileMetadata>)>,
+    memory_limit: Option<usize>,
+    memory_used: usize,
+}
+
+impl DefaultFilesMetadataCacheState {
+    fn new(memory_limit: Option<usize>) -> Self {
+        Self {
+            lru_cache: LruCache::unbounded(),
+            memory_limit,
+            memory_used: 0,
+        }
+    }
+
+    /// Returns the respective entry from the cache, if it exists and the `size` and `last_modified`
+    /// properties from [`ObjectMeta`] match.
+    /// If the entry exists, it becomes the most recently used.
+    fn get(&mut self, k: &ObjectMeta) -> Option<Arc<dyn FileMetadata>> {
+        self.lru_cache
+            .get(&k.location)
+            .map(|(object_meta, metadata)| {
+                if object_meta.size != k.size
+                    || object_meta.last_modified != k.last_modified
+                {
+                    None
+                } else {
+                    Some(Arc::clone(metadata))
+                }
+            })
+            .unwrap_or(None)
+    }
+
+    /// Checks if the metadata is currently cached (entry exists and the `size` and `last_modified`
+    /// properties of [`ObjectMeta`] match).
+    /// The LRU queue is not updated.
+    fn contains_key(&self, k: &ObjectMeta) -> bool {
+        self.lru_cache
+            .peek(&k.location)
+            .map(|(object_meta, _)| {
+                object_meta.size == k.size && object_meta.last_modified == k.last_modified
+            })
+            .unwrap_or(false)
+    }
+
+    /// Adds a new key-value pair to cache, meaning LRU entries might be evicted if required.
+    /// If the key is already in the cache, the previous metadata is returned.
+    /// If the size of the metadata is greater than the `memory_limit`, the value is not inserted.
+    fn put(
+        &mut self,
+        key: ObjectMeta,
+        value: Arc<dyn FileMetadata>,
+    ) -> Option<Arc<dyn FileMetadata>> {
+        let value_size = value.memory_size();
+
+        if let Some(limit) = self.memory_limit {
+            // no point in trying to add this value to the cache if it cannot fit entirely
+            if value_size > limit {
+                return None;
+            }
+        }
+
+        // if the key is already in the cache, the old value is removed
+        let old_value = self.lru_cache.put(key.location.clone(), (key, value));
+        self.memory_used += value_size;
+        if let Some((_, ref old_metadata)) = old_value {
+            self.memory_used -= old_metadata.memory_size();
+        }
+
+        self.evict_entries();
+
+        old_value.map(|v| v.1)
+    }
+
+    /// Evicts entries from the LRU cache until `memory_used` is lower than `memory_limit`.
+    /// If `memory_limit` is `None`, no entries are removed.
+    fn evict_entries(&mut self) {
+        let Some(memory_limit) = self.memory_limit else {
+            return;
+        };
+
+        while self.memory_used > memory_limit {
+            if let Some(removed) = self.lru_cache.pop_lru() {
+                let metadata: Arc<dyn FileMetadata> = removed.1 .1;
+                self.memory_used -= metadata.memory_size();
+            } else {
+                // cache is empty while memory_used > memory_limit, cannot happen
+                unreachable!();

Review Comment:
   Would it be safer to `return` here rather than panic? Maybe as a halfway measure we could leave a `debug_assert` so debug builds hit an error but release builds are unaffected:
   ```suggestion
                // use a debug_assert to find the issue in debug builds, but don't panic in release builds
                debug_assert!(false, "cache is empty while memory_used > memory_limit, cannot happen");
                return;
   ```

##########
datafusion/execution/src/runtime_env.rs:
##########
@@ -337,6 +349,11 @@ impl RuntimeEnvBuilder {
                 key: "datafusion.runtime.temp_directory".to_string(),
                 value: None, // Default is system-dependent
                 description: "The path to the temporary file directory.",
+            },
+            ConfigEntry {
+                key: "datafusion.runtime.file_metadata_cache_limit".to_string(),
+                value: Some("1G".to_owned()),
+                description: "Maximum memory limit for the file-embedded metadata cache. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes.",

Review Comment:
   What would you think about making this configuration setting more general, looking forward to adding more things to this cache over time? For example, how about this:

   `datafusion.runtime.metadata_cache_limit`

   "Maximum memory to use for the per-file metadata cache, such as Parquet metadata. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes."

##########
Cargo.toml:
##########
@@ -153,6 +153,7 @@ hex = { version = "0.4.3" }
 indexmap = "2.10.0"
 itertools = "0.14"
 log = "^0.4"
+lru = "0.16.0"

Review Comment:
   I agree it seems to be a reasonable crate.
   However, I think that in general, if we can avoid new dependencies in DataFusion, that would be good -- our dependency trail is already quite large, and I realize one new dependency doesn't seem like much (but that is what we said when introducing all the existing ones too 😢).

   Note `lru` is also a net new dependency (no existing DataFusion dependency uses it).

   It also has a bunch of `unsafe`, which isn't necessarily a deal breaker itself, but unless it is performance critical I think we should avoid a potential source of crashes / non-deterministic bugs.

##########
Cargo.toml:
##########
@@ -153,6 +153,7 @@ hex = { version = "0.4.3" }
 indexmap = "2.10.0"
 itertools = "0.14"
 log = "^0.4"
+lru = "0.16.0"

Review Comment:
   However, I did some research and I think implementing an LRU cache in Rust that actually has O(1) properties will be non-trivial (there is a good writeup here: https://seanchen1991.github.io/posts/lru-cache/).

   My personal preference would be to implement something custom, but I am really torn about this, especially given it would be nice to implement other LRU caches (like listing and statistics, for example) 🤔

   The best I could come up with was using a `HashMap<Path, usize>` that maps each key to an index in a `VecDeque` implementing the linked list described in the blog.
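   For illustration only, that HashMap-plus-linked-list approach could be sketched as below. This is hypothetical code, not part of the PR: `SketchLru` and `Node` are made-up names, and it uses a plain `Vec` as the node slab (with the list links stored as indices) rather than a `VecDeque`, which keeps `get`, `put`, and `pop_lru` O(1) without any `unsafe`:

   ```rust
   use std::collections::HashMap;
   use std::hash::Hash;

   // One slot in the slab-backed doubly linked list; prev/next are indices.
   struct Node<K, V> {
       key: K,
       value: V,
       prev: Option<usize>,
       next: Option<usize>,
   }

   struct SketchLru<K: Eq + Hash + Clone, V> {
       map: HashMap<K, usize>,          // key -> slot index, O(1) lookup
       nodes: Vec<Option<Node<K, V>>>,  // slab; None marks a free slot
       free: Vec<usize>,                // reusable slot indices
       head: Option<usize>,             // most recently used
       tail: Option<usize>,             // least recently used
   }

   impl<K: Eq + Hash + Clone, V> SketchLru<K, V> {
       fn new() -> Self {
           Self { map: HashMap::new(), nodes: Vec::new(), free: Vec::new(), head: None, tail: None }
       }

       // Detach a node from the list, patching its neighbors' links.
       fn unlink(&mut self, idx: usize) {
           let (prev, next) = {
               let n = self.nodes[idx].as_ref().unwrap();
               (n.prev, n.next)
           };
           match prev {
               Some(p) => self.nodes[p].as_mut().unwrap().next = next,
               None => self.head = next,
           }
           match next {
               Some(nx) => self.nodes[nx].as_mut().unwrap().prev = prev,
               None => self.tail = prev,
           }
       }

       // Make the node at `idx` the most recently used.
       fn push_front(&mut self, idx: usize) {
           {
               let n = self.nodes[idx].as_mut().unwrap();
               n.prev = None;
               n.next = self.head;
           }
           if let Some(h) = self.head {
               self.nodes[h].as_mut().unwrap().prev = Some(idx);
           }
           self.head = Some(idx);
           if self.tail.is_none() {
               self.tail = Some(idx);
           }
       }

       // O(1): look up the slot via the map, then move the node to the front.
       fn get(&mut self, key: &K) -> Option<&V> {
           let idx = *self.map.get(key)?;
           self.unlink(idx);
           self.push_front(idx);
           self.nodes[idx].as_ref().map(|n| &n.value)
       }

       // O(1): insert or update; the entry becomes most recently used.
       fn put(&mut self, key: K, value: V) {
           if let Some(&idx) = self.map.get(&key) {
               self.nodes[idx].as_mut().unwrap().value = value;
               self.unlink(idx);
               self.push_front(idx);
               return;
           }
           let node = Node { key: key.clone(), value, prev: None, next: None };
           let idx = match self.free.pop() {
               Some(i) => { self.nodes[i] = Some(node); i }
               None => { self.nodes.push(Some(node)); self.nodes.len() - 1 }
           };
           self.map.insert(key, idx);
           self.push_front(idx);
       }

       // O(1): evict the least recently used entry; a caller like
       // evict_entries() would loop on this until under the memory limit.
       fn pop_lru(&mut self) -> Option<(K, V)> {
           let idx = self.tail?;
           self.unlink(idx);
           let node = self.nodes[idx].take().unwrap();
           self.map.remove(&node.key);
           self.free.push(idx);
           Some((node.key, node.value))
       }
   }

   fn main() {
       let mut cache = SketchLru::new();
       cache.put("a", 1);
       cache.put("b", 2);
       cache.put("c", 3);
       cache.get(&"a"); // "a" becomes MRU, so "b" is now LRU
       assert_eq!(cache.pop_lru(), Some(("b", 2)));
       assert_eq!(cache.pop_lru(), Some(("c", 3)));
       assert_eq!(cache.get(&"a"), Some(&1));
   }
   ```

   A production version would still need the size accounting, `ObjectMeta` validation, and locking that the PR's `DefaultFilesMetadataCacheState` provides on top of the raw LRU structure.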
   I don't think it would be simple, though.

##########
datafusion/core/tests/sql/runtime_config.rs:
##########
@@ -200,6 +200,40 @@ async fn test_max_temp_directory_size_enforcement() {
     );
 }

+#[tokio::test]
+async fn test_file_metadata_cache_limit() {
+    let ctx = SessionContext::new();
+
+    let update_limit = async |ctx: &SessionContext, limit: &str| {
+        ctx.sql(
+            format!("SET datafusion.runtime.file_metadata_cache_limit = '{limit}'")
+                .as_str(),
+        )
+        .await
+        .unwrap()
+        .collect()
+        .await
+        .unwrap();
+    };
+
+    let get_limit = |ctx: &SessionContext| -> Option<usize> {
+        ctx.task_ctx()
+            .runtime_env()
+            .cache_manager
+            .get_file_metadata_cache()
+            .cache_limit()
+    };
+
+    update_limit(&ctx, "100M").await;
+    assert_eq!(get_limit(&ctx), Some(100 * 1024 * 1024));
+
+    update_limit(&ctx, "2G").await;
+    assert_eq!(get_limit(&ctx), Some(2 * 1024 * 1024 * 1024));
+
+    update_limit(&ctx, "123K").await;

Review Comment:
   nice

##########
datafusion/execution/src/cache/cache_manager.rs:
##########
@@ -39,12 +39,20 @@ pub trait FileMetadata: Any + Send + Sync {
     /// Returns the file metadata as [`Any`] so that it can be downcasted to a specific
     /// implementation.
     fn as_any(&self) -> &dyn Any;
+
+    /// Returns the size of the metadata in bytes.
+    fn memory_size(&self) -> usize;
 }

 /// Cache to store file-embedded metadata.
 pub trait FileMetadataCache:
     CacheAccessor<ObjectMeta, Arc<dyn FileMetadata>, Extra = ObjectMeta>
 {
+    // Returns the cache's memory limit in bytes, or `None` for no limit.

Review Comment:
   I don't think the API should allow for unlimited memory usage, personally, as memory is finite -- if a user has to set a giant number to get an unlimited cache, it will at least be clear they are doing so. With this `Option` API, I worry that someone will set it to `None` by mistake, thinking that will mean "don't cache anything", and end up with a memory explosion on their hands.
   ```suggestion
   // Returns the cache's memory limit in bytes, or `None` for no limit.
   ```

##########
datafusion/core/src/execution/context/mod.rs:
##########
@@ -1068,6 +1068,10 @@ impl SessionContext {
                 builder.with_max_temp_directory_size(directory_size as u64)
             }
             "temp_directory" => builder.with_temp_file_path(value),
+            "file_metadata_cache_limit" => {
+                let limit = Self::parse_memory_limit(value)?;
+                builder.with_file_metadata_cache_limit(Some(limit))
+            }

Review Comment:
   I think having people set the size to something giant (for example, `u64::MAX`) if they really want to allow unbounded memory usage is a reasonable UX. It gives a hint that maybe it isn't a great idea to use an unbounded memory consumer.

##########
datafusion/execution/src/cache/cache_unit.rs:
##########
@@ -215,25 +350,23 @@ impl CacheAccessor<ObjectMeta, Arc<dyn FileMetadata>> for DefaultFilesMetadataCa
     }

     fn remove(&mut self, k: &ObjectMeta) -> Option<Arc<dyn FileMetadata>> {

Review Comment:
   Yeah, I suggest we propose this cleanup in a follow-on PR / ticket so we can discuss it separately if desired.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
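Editor's note: the `test_file_metadata_cache_limit` test above asserts that "100M" becomes 100 * 1024 * 1024 bytes, "2G" becomes 2 * 1024 * 1024 * 1024, and so on. A parser consistent with those values can be sketched as follows; this is a hypothetical stand-in, not DataFusion's actual `SessionContext::parse_memory_limit`:

```rust
/// Hypothetical stand-in for `SessionContext::parse_memory_limit`: parses a
/// human-readable size with a K/M/G suffix into a byte count.
fn parse_memory_limit(limit: &str) -> Result<usize, String> {
    if limit.len() < 2 {
        return Err(format!("Invalid memory limit '{limit}'"));
    }
    // Split off the single-character unit suffix.
    let (digits, unit) = limit.split_at(limit.len() - 1);
    let number: usize = digits
        .parse()
        .map_err(|_| format!("Failed to parse number from memory limit '{limit}'"))?;
    match unit {
        "K" => Ok(number * 1024),
        "M" => Ok(number * 1024 * 1024),
        "G" => Ok(number * 1024 * 1024 * 1024),
        _ => Err(format!("Unsupported unit '{unit}' in memory limit '{limit}'")),
    }
}

fn main() {
    // The same values the test exercises via SET statements
    assert_eq!(parse_memory_limit("100M"), Ok(100 * 1024 * 1024));
    assert_eq!(parse_memory_limit("2G"), Ok(2 * 1024 * 1024 * 1024));
    assert_eq!(parse_memory_limit("123K"), Ok(123 * 1024));
    assert!(parse_memory_limit("10X").is_err());
}
```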