alamb commented on code in PR #17031:
URL: https://github.com/apache/datafusion/pull/17031#discussion_r2252448318


##########
datafusion/datasource-parquet/src/file_format.rs:
##########
@@ -449,17 +449,14 @@ impl FileFormat for ParquetFormat {
 
        // Use the CachedParquetFileReaderFactory when metadata caching is enabled
         if self.options.global.cache_metadata {

Review Comment:
   What are your thoughts about (in a follow-on PR) removing the 
`options.cache_metadata` option and always trying to save the metadata (which 
will be a no-op if there is no room)?



##########
datafusion/execution/src/cache/cache_manager.rs:
##########
@@ -65,30 +73,38 @@ impl Debug for dyn FileMetadataCache {
     }
 }
 
-#[derive(Default, Debug)]
+#[derive(Debug)]
 pub struct CacheManager {
     file_statistic_cache: Option<FileStatisticsCache>,
     list_files_cache: Option<ListFilesCache>,
-    file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
+    file_metadata_cache: Arc<dyn FileMetadataCache>,

Review Comment:
   Seeing the idea of having a default file_metadata_cache installed got me 
thinking about @BlakeOrth's comment here: 
https://github.com/apache/datafusion/pull/16971#issuecomment-3145516316
   
   After this work to cache file metadata, it seems like we may want to 
consider adding default caches for ListFiles and FileStatistics as well 🤔 (as 
a follow-on PR, of course)



##########
datafusion/execution/src/cache/cache_unit.rs:
##########
@@ -158,33 +159,168 @@ impl CacheAccessor<Path, Arc<Vec<ObjectMeta>>> for DefaultListFilesCache {
     }
 }
 
+/// Handles the inner state of the [`DefaultFilesMetadataCache`] struct.
+struct DefaultFilesMetadataCacheState {
+    lru_cache: LruCache<Path, (ObjectMeta, Arc<dyn FileMetadata>)>,
+    memory_limit: Option<usize>,
+    memory_used: usize,
+}
+
+impl DefaultFilesMetadataCacheState {
+    fn new(memory_limit: Option<usize>) -> Self {
+        Self {
+            lru_cache: LruCache::unbounded(),
+            memory_limit,
+            memory_used: 0,
+        }
+    }
+
+    /// Returns the respective entry from the cache, if it exists and the `size` and `last_modified`
+    /// properties from [`ObjectMeta`] match.
+    /// If the entry exists, it becomes the most recently used.
+    fn get(&mut self, k: &ObjectMeta) -> Option<Arc<dyn FileMetadata>> {
+        self.lru_cache
+            .get(&k.location)
+            .map(|(object_meta, metadata)| {
+                if object_meta.size != k.size
+                    || object_meta.last_modified != k.last_modified
+                {
+                    None
+                } else {
+                    Some(Arc::clone(metadata))
+                }
+            })
+            .unwrap_or(None)
+    }
+
+    /// Checks if the metadata is currently cached (entry exists and the `size` and `last_modified`
+    /// properties of [`ObjectMeta`] match).
+    /// The LRU queue is not updated.
+    fn contains_key(&self, k: &ObjectMeta) -> bool {
+        self.lru_cache
+            .peek(&k.location)
+            .map(|(object_meta, _)| {
+                object_meta.size == k.size && object_meta.last_modified == k.last_modified
+            })
+            .unwrap_or(false)
+    }
+
+    /// Adds a new key-value pair to cache, meaning LRU entries might be evicted if required.
+    /// If the key is already in the cache, the previous metadata is returned.
+    /// If the size of the metadata is greater than the `memory_limit`, the value is not inserted.
+    fn put(
+        &mut self,
+        key: ObjectMeta,
+        value: Arc<dyn FileMetadata>,
+    ) -> Option<Arc<dyn FileMetadata>> {
+        let value_size = value.memory_size();
+
+        if let Some(limit) = self.memory_limit {
+            // no point in trying to add this value to the cache if it cannot fit entirely
+            if value_size > limit {
+                return None;
+            }
+        }
+
+        // if the key is already in the cache, the old value is removed
+        let old_value = self.lru_cache.put(key.location.clone(), (key, value));
+        self.memory_used += value_size;
+        if let Some((_, ref old_metadata)) = old_value {
+            self.memory_used -= old_metadata.memory_size();
+        }
+
+        self.evict_entries();
+
+        old_value.map(|v| v.1)
+    }
+
+    /// Evicts entries from the LRU cache until `memory_used` is lower than `memory_limit`.
+    /// If `memory_limit` is `None`, no entries are removed.
+    fn evict_entries(&mut self) {
+        let Some(memory_limit) = self.memory_limit else {
+            return;
+        };
+
+        while self.memory_used > memory_limit {
+            if let Some(removed) = self.lru_cache.pop_lru() {
+                let metadata: Arc<dyn FileMetadata> = removed.1 .1;
+                self.memory_used -= metadata.memory_size();
+            } else {
+                // cache is empty while memory_used > memory_limit, cannot happen
+                unreachable!();

Review Comment:
   Would it be safer to `return` here rather than `panic!`?
   
   Maybe as a halfway measure we could leave a `debug_assert!` so debug builds 
hit an error but release builds are unaffected:
   
   ```suggestion
                // use debug_assert to find issues in debug builds, but don't panic in release builds
                debug_assert!(false, "cache is empty while memory_used > memory_limit, cannot happen");
                return;
   ```



##########
datafusion/execution/src/runtime_env.rs:
##########
@@ -337,6 +349,11 @@ impl RuntimeEnvBuilder {
                 key: "datafusion.runtime.temp_directory".to_string(),
                 value: None, // Default is system-dependent
                 description: "The path to the temporary file directory.",
+            },
+            ConfigEntry {
+                key: "datafusion.runtime.file_metadata_cache_limit".to_string(),
+                value: Some("1G".to_owned()),
+                description: "Maximum memory limit for the file-embedded metadata cache. Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: '2G' for 2 gigabytes.",

Review Comment:
   What would you think about making this configuration setting more general, 
looking forward to adding more things to this cache over time? For example, how 
about this:
   
   `datafusion.runtime.metadata_cache_limit`
   
   "Maximum memory to use for per-file metadata cache such as Parquet metadata. 
Supports suffixes K (kilobytes), M (megabytes), and G (gigabytes). Example: 
'2G' for 2 gigabytes."



##########
Cargo.toml:
##########
@@ -153,6 +153,7 @@ hex = { version = "0.4.3" }
 indexmap = "2.10.0"
 itertools = "0.14"
 log = "^0.4"
+lru = "0.16.0"

Review Comment:
   I agree it seems to be a reasonable crate. However, I think in general if we 
can avoid new dependencies in DataFusion that would be good -- our dependency 
trail is already quite large, and I realize one new dependency doesn't seem 
like much (but that is what we said when introducing all the existing ones too 
😢 ) 
   
   Note `lru` is also a net new dependency (no existing DataFusion dependency 
uses it)
   
   It also has a bunch of `unsafe`, which isn't necessarily a deal breaker in 
itself, but unless it is performance critical I think we should avoid a 
potential source of crashes / non-deterministic bugs



##########
Cargo.toml:
##########
@@ -153,6 +153,7 @@ hex = { version = "0.4.3" }
 indexmap = "2.10.0"
 itertools = "0.14"
 log = "^0.4"
+lru = "0.16.0"

Review Comment:
   However, I did some research and I think implementing an LRU cache in Rust 
that actually has O(1) properties will be non-trivial (there is a good writeup 
here: https://seanchen1991.github.io/posts/lru-cache/)
   
   My personal preference would be to implement something custom, but I am 
really torn about this, especially given it would be nice to implement other 
LRU caches (like listing and statistics, for example) 🤔 
   
   The best I could come up with was using a `HashMap<Path, usize>` that maps 
to an index into a `VecDeque` holding the linked list, implemented as 
described in the blog. I don't think it would be simple though
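   To make the trade-off concrete, here is a rough, hypothetical sketch of 
that approach with no `unsafe` (all names are illustrative, not proposed 
DataFusion APIs): a `HashMap` maps each key to an index into a plain `Vec` 
(standing in for the `VecDeque`), and `prev`/`next` indices form the doubly 
linked list from the blog post, so `get`, `put`, and `pop_lru` are all O(1):
   
   ```rust
use std::collections::HashMap;
use std::hash::Hash;

// One slot of the index-based doubly linked list.
struct Node<K, V> {
    key: K,
    value: V,
    prev: Option<usize>, // towards the most recently used end
    next: Option<usize>, // towards the least recently used end
}

struct LruSketch<K: Eq + Hash + Clone, V> {
    map: HashMap<K, usize>, // key -> index into `nodes`
    nodes: Vec<Node<K, V>>,
    head: Option<usize>, // most recently used
    tail: Option<usize>, // least recently used
}

impl<K: Eq + Hash + Clone, V> LruSketch<K, V> {
    fn new() -> Self {
        Self { map: HashMap::new(), nodes: Vec::new(), head: None, tail: None }
    }

    // Detach node `i` from the list, patching its neighbours (O(1)).
    fn unlink(&mut self, i: usize) {
        let (prev, next) = (self.nodes[i].prev, self.nodes[i].next);
        match prev { Some(p) => self.nodes[p].next = next, None => self.head = next }
        match next { Some(n) => self.nodes[n].prev = prev, None => self.tail = prev }
        self.nodes[i].prev = None;
        self.nodes[i].next = None;
    }

    // Make node `i` the most recently used (O(1)).
    fn push_front(&mut self, i: usize) {
        self.nodes[i].next = self.head;
        if let Some(h) = self.head { self.nodes[h].prev = Some(i); }
        self.head = Some(i);
        if self.tail.is_none() { self.tail = Some(i); }
    }

    fn get(&mut self, k: &K) -> Option<&V> {
        let i = *self.map.get(k)?;
        self.unlink(i);
        self.push_front(i);
        Some(&self.nodes[i].value)
    }

    fn put(&mut self, k: K, v: V) {
        if let Some(&i) = self.map.get(&k) {
            self.nodes[i].value = v;
            self.unlink(i);
            self.push_front(i);
            return;
        }
        let i = self.nodes.len();
        self.nodes.push(Node { key: k.clone(), value: v, prev: None, next: None });
        self.map.insert(k, i);
        self.push_front(i);
    }

    // Evict the least recently used entry. `swap_remove` keeps `nodes`
    // compact, so the slot that moved needs its index re-pointed.
    fn pop_lru(&mut self) -> Option<(K, V)> {
        let t = self.tail?;
        self.unlink(t);
        let evicted_key = self.nodes[t].key.clone();
        self.map.remove(&evicted_key);
        let node = self.nodes.swap_remove(t);
        if t < self.nodes.len() {
            // the former last node now lives at `t`: update map and neighbours
            self.map.insert(self.nodes[t].key.clone(), t);
            let (prev, next) = (self.nodes[t].prev, self.nodes[t].next);
            match prev { Some(p) => self.nodes[p].next = Some(t), None => self.head = Some(t) }
            match next { Some(n) => self.nodes[n].prev = Some(t), None => self.tail = Some(t) }
        }
        Some((node.key, node.value))
    }
}

fn main() {
    let mut lru = LruSketch::new();
    lru.put("a", 1);
    lru.put("b", 2);
    lru.put("c", 3);
    assert_eq!(lru.get(&"a"), Some(&1)); // "a" becomes most recently used
    assert_eq!(lru.pop_lru(), Some(("b", 2)));
    assert_eq!(lru.pop_lru(), Some(("c", 3)));
    assert_eq!(lru.pop_lru(), Some(("a", 1)));
    assert_eq!(lru.pop_lru(), None);
}
   ```
   
   The index-juggling in `pop_lru` is exactly the "not simple" part: avoiding 
it is what pushes crates like `lru` towards `unsafe` pointer links.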
   



##########
datafusion/core/tests/sql/runtime_config.rs:
##########
@@ -200,6 +200,40 @@ async fn test_max_temp_directory_size_enforcement() {
     );
 }
 
+#[tokio::test]
+async fn test_file_metadata_cache_limit() {
+    let ctx = SessionContext::new();
+
+    let update_limit = async |ctx: &SessionContext, limit: &str| {
+        ctx.sql(
+            format!("SET datafusion.runtime.file_metadata_cache_limit = '{limit}'")
+                .as_str(),
+        )
+        .await
+        .unwrap()
+        .collect()
+        .await
+        .unwrap();
+    };
+
+    let get_limit = |ctx: &SessionContext| -> Option<usize> {
+        ctx.task_ctx()
+            .runtime_env()
+            .cache_manager
+            .get_file_metadata_cache()
+            .cache_limit()
+    };
+
+    update_limit(&ctx, "100M").await;
+    assert_eq!(get_limit(&ctx), Some(100 * 1024 * 1024));
+
+    update_limit(&ctx, "2G").await;
+    assert_eq!(get_limit(&ctx), Some(2 * 1024 * 1024 * 1024));
+
+    update_limit(&ctx, "123K").await;

Review Comment:
   nice



##########
datafusion/execution/src/cache/cache_manager.rs:
##########
@@ -39,12 +39,20 @@ pub trait FileMetadata: Any + Send + Sync {
    /// Returns the file metadata as [`Any`] so that it can be downcasted to a specific
     /// implementation.
     fn as_any(&self) -> &dyn Any;
+
+    /// Returns the size of the metadata in bytes.
+    fn memory_size(&self) -> usize;
 }
 
 /// Cache to store file-embedded metadata.
 pub trait FileMetadataCache:
     CacheAccessor<ObjectMeta, Arc<dyn FileMetadata>, Extra = ObjectMeta>
 {
+    // Returns the cache's memory limit in bytes, or `None` for no limit.

Review Comment:
   I don't think the API should allow for unlimited memory usage, personally, 
as memory is finite -- if a user has to set a giant number to get an unlimited 
cache, it will at least be clear they are doing so.
   
   With this `Option` API I worry that someone will set it to `None` by mistake 
thinking that will mean "don't cache anything" and end up with a memory 
explosion on their hands
   
   ```suggestion
       // Returns the cache's memory limit in bytes, or `None` for no limit.
   ```
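   As an illustration of the non-`Option` shape this argues for (hypothetical 
names, not the PR's API), the limit is always a concrete byte count and 
"unbounded" must be spelled out explicitly by the implementer:
   
   ```rust
// Hypothetical non-Option variant: every cache reports a concrete limit,
// and "unbounded" must be requested explicitly with a huge number.
trait CacheLimit {
    /// Returns the cache's memory limit in bytes.
    fn cache_limit(&self) -> usize;
}

struct ExplicitlyUnbounded;

impl CacheLimit for ExplicitlyUnbounded {
    fn cache_limit(&self) -> usize {
        // the caller opts in to unbounded memory usage on purpose
        usize::MAX
    }
}

fn main() {
    assert_eq!(ExplicitlyUnbounded.cache_limit(), usize::MAX);
}
   ```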



##########
datafusion/core/src/execution/context/mod.rs:
##########
@@ -1068,6 +1068,10 @@ impl SessionContext {
                 builder.with_max_temp_directory_size(directory_size as u64)
             }
             "temp_directory" => builder.with_temp_file_path(value),
+            "file_metadata_cache_limit" => {
+                let limit = Self::parse_memory_limit(value)?;
+                builder.with_file_metadata_cache_limit(Some(limit))
+            }

Review Comment:
   I think having people set the size to something giant (for example, 
`u64::MAX`) if they really want to allow unbounded memory usage is a reasonable 
UX. It gives a hint that using an unbounded memory consumer maybe isn't a great 
idea
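   For reference, a hedged sketch of what a `parse_memory_limit` helper like 
the one referenced in the diff could look like (this is an assumption based on 
the K/M/G suffixes documented above, not DataFusion's actual implementation; 
error handling is simplified to `String`):
   
   ```rust
// Hypothetical suffix-aware parser: "100M" -> 100 * 1024 * 1024, etc.
fn parse_memory_limit(limit: &str) -> Result<usize, String> {
    if limit.len() < 2 {
        return Err(format!("Memory limit '{limit}' is too short"));
    }
    // split the trailing unit character from the digits (ASCII assumed)
    let (digits, unit) = limit.split_at(limit.len() - 1);
    let number: usize = digits
        .parse()
        .map_err(|_| format!("Failed to parse number from memory limit '{limit}'"))?;
    match unit {
        "K" => Ok(number * 1024),
        "M" => Ok(number * 1024 * 1024),
        "G" => Ok(number * 1024 * 1024 * 1024),
        other => Err(format!("Unsupported unit '{other}' in memory limit '{limit}'")),
    }
}

fn main() {
    assert_eq!(parse_memory_limit("100M"), Ok(100 * 1024 * 1024));
    assert_eq!(parse_memory_limit("2G"), Ok(2 * 1024 * 1024 * 1024));
    assert!(parse_memory_limit("9Z").is_err());
}
   ```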



##########
datafusion/execution/src/cache/cache_unit.rs:
##########
@@ -215,25 +350,23 @@ impl CacheAccessor<ObjectMeta, Arc<dyn FileMetadata>> for 
DefaultFilesMetadataCa
     }
 
     fn remove(&mut self, k: &ObjectMeta) -> Option<Arc<dyn FileMetadata>> {

Review Comment:
   Yeah, I suggest we propose this cleanup in a follow-on PR / ticket so we can 
discuss it separately if desired



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

