alamb commented on code in PR #17031:
URL: https://github.com/apache/datafusion/pull/17031#discussion_r2360642526


##########
datafusion/execution/src/cache/cache_unit.rs:
##########
@@ -158,33 +159,165 @@ impl CacheAccessor<Path, Arc<Vec<ObjectMeta>>> for 
DefaultListFilesCache {
     }
 }
 
+/// Handles the inner state of the [`DefaultFilesMetadataCache`] struct.
+struct DefaultFilesMetadataCacheState {
+    lru_queue: LruQueue<Path, (ObjectMeta, Arc<dyn FileMetadata>)>,
+    memory_limit: usize,
+    memory_used: usize,
+}
+
+impl DefaultFilesMetadataCacheState {
+    fn new(memory_limit: usize) -> Self {
+        Self {
+            lru_queue: LruQueue::new(),
+            memory_limit,
+            memory_used: 0,
+        }
+    }
+
+    /// Returns the respective entry from the cache, if it exists and the 
`size` and `last_modified`
+    /// properties from [`ObjectMeta`] match.
+    /// If the entry exists, it becomes the most recently used.
+    fn get(&mut self, k: &ObjectMeta) -> Option<Arc<dyn FileMetadata>> {
+        self.lru_queue
+            .get(&k.location)
+            .map(|(object_meta, metadata)| {
+                if object_meta.size != k.size
+                    || object_meta.last_modified != k.last_modified
+                {
+                    None
+                } else {
+                    Some(Arc::clone(metadata))
+                }
+            })
+            .unwrap_or(None)
+    }
+
+    /// Checks if the metadata is currently cached (entry exists and the 
`size` and `last_modified`
+    /// properties of [`ObjectMeta`] match).
+    /// The LRU queue is not updated.
+    fn contains_key(&self, k: &ObjectMeta) -> bool {
+        self.lru_queue
+            .peek(&k.location)
+            .map(|(object_meta, _)| {
+                object_meta.size == k.size && object_meta.last_modified == 
k.last_modified
+            })
+            .unwrap_or(false)
+    }
+
+    /// Adds a new key-value pair to cache, meaning LRU entries might be 
evicted if required.
+    /// If the key is already in the cache, the previous metadata is returned.
+    /// If the size of the metadata is greater than the `memory_limit`, the 
value is not inserted.
+    fn put(
+        &mut self,
+        key: ObjectMeta,
+        value: Arc<dyn FileMetadata>,
+    ) -> Option<Arc<dyn FileMetadata>> {
+        let value_size = value.memory_size();
+
+        // no point in trying to add this value to the cache if it cannot fit 
entirely
+        if value_size > self.memory_limit {
+            return None;
+        }
+
+        // if the key is already in the cache, the old value is removed
+        let old_value = self.lru_queue.put(key.location.clone(), (key, value));
+        self.memory_used += value_size;
+        if let Some((_, ref old_metadata)) = old_value {
+            self.memory_used -= old_metadata.memory_size();
+        }
+
+        self.evict_entries();
+
+        old_value.map(|v| v.1)
+    }
+
+    /// Evicts entries from the LRU cache until `memory_used` is lower than 
`memory_limit`.
+    fn evict_entries(&mut self) {
+        while self.memory_used > self.memory_limit {
+            if let Some(removed) = self.lru_queue.pop() {
+                let metadata: Arc<dyn FileMetadata> = removed.1 .1;
+                self.memory_used -= metadata.memory_size();
+            } else {
+                // cache is empty while memory_used > memory_limit, cannot 
happen
+                debug_assert!(
+                    false,
+                    "cache is empty while memory_used > memory_limit, cannot 
happen"
+                );
+                return;
+            }
+        }
+    }
+
+    /// Removes an entry from the cache and returns it, if it exists.
+    fn remove(&mut self, k: &ObjectMeta) -> Option<Arc<dyn FileMetadata>> {
+        if let Some((_, old_metadata)) = self.lru_queue.remove(&k.location) {
+            self.memory_used -= old_metadata.memory_size();
+            Some(old_metadata)
+        } else {
+            None
+        }
+    }
+
+    /// Returns the number of entries currently cached.
+    fn len(&self) -> usize {
+        self.lru_queue.len()
+    }
+
+    /// Removes all entries from the cache.
+    fn clear(&mut self) {
+        self.lru_queue.clear();
+        self.memory_used = 0;
+    }
+}
+
 /// Collected file embedded metadata cache.
 /// The metadata for some file is invalided when the file size or last 
modification time have been
 /// changed.
+/// The `memory_limit` passed in the constructor controls the maximum size of 
the cache, which uses
+/// a Least Recently Used eviction algorithm.
 /// Users should use the `get` and `put` methods. The `get_with_extra` and 
`put_with_extra` methods
 /// simply call `get` and `put`, respectively.
-#[derive(Default)]
 pub struct DefaultFilesMetadataCache {
-    metadata: DashMap<Path, (ObjectMeta, Arc<dyn FileMetadata>)>,
+    // the state is wrapped in a Mutex to ensure the operations are atomic
+    state: Mutex<DefaultFilesMetadataCacheState>,
+}
+
+impl DefaultFilesMetadataCache {
+    /// The `memory_limit` parameter controls the maximum size of the cache, 
in bytes, using a Least
+    /// Recently Used eviction algorithm.
+    pub fn new(memory_limit: usize) -> Self {
+        Self {
+            state: 
Mutex::new(DefaultFilesMetadataCacheState::new(memory_limit)),
+        }
+    }
+
+    /// Returns the size of the cached memory, in bytes.
+    pub fn memory_used(&self) -> usize {
+        let state = self.state.lock().unwrap();
+        state.memory_used
+    }
 }
 
-impl FileMetadataCache for DefaultFilesMetadataCache {}
+impl FileMetadataCache for DefaultFilesMetadataCache {
+    fn cache_limit(&self) -> usize {
+        let state = self.state.lock().unwrap();
+        state.memory_limit
+    }
+
+    fn update_cache_limit(&self, limit: usize) {
+        let mut state = self.state.lock().unwrap();
+        state.memory_limit = limit;
+        state.evict_entries();
+    }
+}
 
 impl CacheAccessor<ObjectMeta, Arc<dyn FileMetadata>> for 
DefaultFilesMetadataCache {
     type Extra = ObjectMeta;
 
     fn get(&self, k: &ObjectMeta) -> Option<Arc<dyn FileMetadata>> {
-        self.metadata
-            .get(&k.location)
-            .map(|s| {
-                let (extra, metadata) = s.value();
-                if extra.size != k.size || extra.last_modified != 
k.last_modified {
-                    None
-                } else {
-                    Some(Arc::clone(metadata))
-                }
-            })
-            .unwrap_or(None)
+        let mut state = self.state.lock().unwrap();
+        state.get(k)

Review Comment:
   Thanks @abhita ! I think if you are interested in alternative approaches the 
best thing to do would be 
   1. Create some sort of benchmark (maybe make 10k little parquet files with 
small metadata) that shows cache management consuming significant time
   
   Then you could explore various strategies to improve the performance



##########
datafusion/execution/src/cache/cache_unit.rs:
##########
@@ -158,33 +159,165 @@ impl CacheAccessor<Path, Arc<Vec<ObjectMeta>>> for 
DefaultListFilesCache {
     }
 }
 
+/// Handles the inner state of the [`DefaultFilesMetadataCache`] struct.
+struct DefaultFilesMetadataCacheState {
+    lru_queue: LruQueue<Path, (ObjectMeta, Arc<dyn FileMetadata>)>,
+    memory_limit: usize,
+    memory_used: usize,
+}
+
+impl DefaultFilesMetadataCacheState {
+    fn new(memory_limit: usize) -> Self {
+        Self {
+            lru_queue: LruQueue::new(),
+            memory_limit,
+            memory_used: 0,
+        }
+    }
+
+    /// Returns the respective entry from the cache, if it exists and the 
`size` and `last_modified`
+    /// properties from [`ObjectMeta`] match.
+    /// If the entry exists, it becomes the most recently used.
+    fn get(&mut self, k: &ObjectMeta) -> Option<Arc<dyn FileMetadata>> {
+        self.lru_queue
+            .get(&k.location)
+            .map(|(object_meta, metadata)| {
+                if object_meta.size != k.size
+                    || object_meta.last_modified != k.last_modified
+                {
+                    None
+                } else {
+                    Some(Arc::clone(metadata))
+                }
+            })
+            .unwrap_or(None)
+    }
+
+    /// Checks if the metadata is currently cached (entry exists and the 
`size` and `last_modified`
+    /// properties of [`ObjectMeta`] match).
+    /// The LRU queue is not updated.
+    fn contains_key(&self, k: &ObjectMeta) -> bool {
+        self.lru_queue
+            .peek(&k.location)
+            .map(|(object_meta, _)| {
+                object_meta.size == k.size && object_meta.last_modified == 
k.last_modified
+            })
+            .unwrap_or(false)
+    }
+
+    /// Adds a new key-value pair to cache, meaning LRU entries might be 
evicted if required.
+    /// If the key is already in the cache, the previous metadata is returned.
+    /// If the size of the metadata is greater than the `memory_limit`, the 
value is not inserted.
+    fn put(
+        &mut self,
+        key: ObjectMeta,
+        value: Arc<dyn FileMetadata>,
+    ) -> Option<Arc<dyn FileMetadata>> {
+        let value_size = value.memory_size();
+
+        // no point in trying to add this value to the cache if it cannot fit 
entirely
+        if value_size > self.memory_limit {
+            return None;
+        }
+
+        // if the key is already in the cache, the old value is removed
+        let old_value = self.lru_queue.put(key.location.clone(), (key, value));
+        self.memory_used += value_size;
+        if let Some((_, ref old_metadata)) = old_value {
+            self.memory_used -= old_metadata.memory_size();
+        }
+
+        self.evict_entries();
+
+        old_value.map(|v| v.1)
+    }
+
+    /// Evicts entries from the LRU cache until `memory_used` is lower than 
`memory_limit`.
+    fn evict_entries(&mut self) {
+        while self.memory_used > self.memory_limit {
+            if let Some(removed) = self.lru_queue.pop() {
+                let metadata: Arc<dyn FileMetadata> = removed.1 .1;
+                self.memory_used -= metadata.memory_size();
+            } else {
+                // cache is empty while memory_used > memory_limit, cannot 
happen
+                debug_assert!(
+                    false,
+                    "cache is empty while memory_used > memory_limit, cannot 
happen"
+                );
+                return;
+            }
+        }
+    }
+
+    /// Removes an entry from the cache and returns it, if it exists.
+    fn remove(&mut self, k: &ObjectMeta) -> Option<Arc<dyn FileMetadata>> {
+        if let Some((_, old_metadata)) = self.lru_queue.remove(&k.location) {
+            self.memory_used -= old_metadata.memory_size();
+            Some(old_metadata)
+        } else {
+            None
+        }
+    }
+
+    /// Returns the number of entries currently cached.
+    fn len(&self) -> usize {
+        self.lru_queue.len()
+    }
+
+    /// Removes all entries from the cache.
+    fn clear(&mut self) {
+        self.lru_queue.clear();
+        self.memory_used = 0;
+    }
+}
+
 /// Collected file embedded metadata cache.
 /// The metadata for some file is invalided when the file size or last 
modification time have been
 /// changed.
+/// The `memory_limit` passed in the constructor controls the maximum size of 
the cache, which uses
+/// a Least Recently Used eviction algorithm.
 /// Users should use the `get` and `put` methods. The `get_with_extra` and 
`put_with_extra` methods
 /// simply call `get` and `put`, respectively.
-#[derive(Default)]
 pub struct DefaultFilesMetadataCache {
-    metadata: DashMap<Path, (ObjectMeta, Arc<dyn FileMetadata>)>,
+    // the state is wrapped in a Mutex to ensure the operations are atomic
+    state: Mutex<DefaultFilesMetadataCacheState>,
+}
+
+impl DefaultFilesMetadataCache {
+    /// The `memory_limit` parameter controls the maximum size of the cache, 
in bytes, using a Least
+    /// Recently Used eviction algorithm.
+    pub fn new(memory_limit: usize) -> Self {
+        Self {
+            state: 
Mutex::new(DefaultFilesMetadataCacheState::new(memory_limit)),
+        }
+    }
+
+    /// Returns the size of the cached memory, in bytes.
+    pub fn memory_used(&self) -> usize {
+        let state = self.state.lock().unwrap();
+        state.memory_used
+    }
 }
 
-impl FileMetadataCache for DefaultFilesMetadataCache {}
+impl FileMetadataCache for DefaultFilesMetadataCache {
+    fn cache_limit(&self) -> usize {
+        let state = self.state.lock().unwrap();
+        state.memory_limit
+    }
+
+    fn update_cache_limit(&self, limit: usize) {
+        let mut state = self.state.lock().unwrap();
+        state.memory_limit = limit;
+        state.evict_entries();
+    }
+}
 
 impl CacheAccessor<ObjectMeta, Arc<dyn FileMetadata>> for 
DefaultFilesMetadataCache {
     type Extra = ObjectMeta;
 
     fn get(&self, k: &ObjectMeta) -> Option<Arc<dyn FileMetadata>> {
-        self.metadata
-            .get(&k.location)
-            .map(|s| {
-                let (extra, metadata) = s.value();
-                if extra.size != k.size || extra.last_modified != 
k.last_modified {
-                    None
-                } else {
-                    Some(Arc::clone(metadata))
-                }
-            })
-            .unwrap_or(None)
+        let mut state = self.state.lock().unwrap();
+        state.get(k)

Review Comment:
   Thanks @abhita ! I think if you are interested in alternative approaches the 
best thing to do would be 
   1. Create some sort of benchmark (maybe make 10k little parquet files with 
small metadata) that shows cache management consuming significant time
   
   Then you could explore various strategies to improve the performance



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to