This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 86011519b5 fix: flaky cache test (#19140)
86011519b5 is described below

commit 86011519b5a2e223ff3b31d2aa7765ff08bf4c86
Author: xonx <[email protected]>
AuthorDate: Fri Dec 12 04:25:08 2025 +0530

    fix: flaky cache test (#19140)
    
    ## Which issue does this PR close?
    - Closes https://github.com/apache/datafusion/issues/19114.
    
    ## Rationale for this change
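    The tests test_cache_with_ttl and test_cache_with_ttl_and_lru were
    flaky, failing intermittently in CI, and had been marked #[ignore].
    test_cache_with_ttl_and_lru relied on std::thread::sleep and
    Instant::now(), so its expiry assertions depended on real elapsed time
    and could fail when the test environment was slow or under load. This
    PR makes that test deterministic by injecting a controllable mock clock
    instead of reading the system clock and sleeping.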
    
    ## What changes are included in this PR?
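    - Introduced a `TimeProvider` trait to abstract time retrieval, plus a
      `SystemTimeProvider` implementation backed by `Instant::now()`.
    
    - Refactored `DefaultListFilesCache` to obtain the current time from its
      time provider and pass it (`now: Instant`) to the internal cache state
      and `ListFilesEntry::try_new`.
    
    - Replaced `#[derive(Default)]` on `DefaultListFilesCache` with a manual
      `Default` impl (`DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT`, no TTL).
    
    - Added the test-only (`#[cfg(test)]`) builder method
      `DefaultListFilesCache::with_time_provider` for injecting a custom
      time provider.
    
    - Updated `test_cache_with_ttl_and_lru` to use a `MockTimeProvider`.
    
    - Removed `#[ignore]` from `test_cache_with_ttl` and
      `test_cache_with_ttl_and_lru`. Note that `test_cache_with_ttl` still
      uses the default system clock (`SystemTimeProvider`).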
    
    ## Are these changes tested?
    Yes. I verified the fix by running the updated test locally, and it
    passed.
    
    ## Are there any user-facing changes?
    No.
    
    ---------
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 datafusion/execution/src/cache/list_files_cache.rs | 128 +++++++++++++++------
 1 file changed, 96 insertions(+), 32 deletions(-)

diff --git a/datafusion/execution/src/cache/list_files_cache.rs b/datafusion/execution/src/cache/list_files_cache.rs
index 8ab6d4b173..c209a01274 100644
--- a/datafusion/execution/src/cache/list_files_cache.rs
+++ b/datafusion/execution/src/cache/list_files_cache.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::mem::size_of;
 use std::{
     sync::{Arc, Mutex},
     time::Duration,
@@ -25,6 +26,19 @@ use object_store::{ObjectMeta, path::Path};
 
 use crate::cache::{CacheAccessor, cache_manager::ListFilesCache, lru_queue::LruQueue};
 
+pub trait TimeProvider: Send + Sync + 'static {
+    fn now(&self) -> Instant;
+}
+
+#[derive(Debug, Default)]
+pub struct SystemTimeProvider;
+
+impl TimeProvider for SystemTimeProvider {
+    fn now(&self) -> Instant {
+        Instant::now()
+    }
+}
+
 /// Default implementation of [`ListFilesCache`]
 ///
 /// Caches file metadata for file listing operations.
@@ -41,9 +55,15 @@ use crate::cache::{CacheAccessor, cache_manager::ListFilesCache, lru_queue::LruQ
 /// Users should use the [`Self::get`] and [`Self::put`] methods. The
 /// [`Self::get_with_extra`] and [`Self::put_with_extra`] methods simply call
 /// `get` and `put`, respectively.
-#[derive(Default)]
 pub struct DefaultListFilesCache {
     state: Mutex<DefaultListFilesCacheState>,
+    time_provider: Arc<dyn TimeProvider>,
+}
+
+impl Default for DefaultListFilesCache {
+    fn default() -> Self {
+        Self::new(DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT, None)
+    }
 }
 
 impl DefaultListFilesCache {
@@ -55,9 +75,16 @@ impl DefaultListFilesCache {
     pub fn new(memory_limit: usize, ttl: Option<Duration>) -> Self {
         Self {
             state: Mutex::new(DefaultListFilesCacheState::new(memory_limit, ttl)),
+            time_provider: Arc::new(SystemTimeProvider),
         }
     }
 
+    #[cfg(test)]
+    pub(crate) fn with_time_provider(mut self, provider: Arc<dyn TimeProvider>) -> Self {
+        self.time_provider = provider;
+        self
+    }
+
     /// Returns the cache's memory limit in bytes.
     pub fn cache_limit(&self) -> usize {
         self.state.lock().unwrap().memory_limit
@@ -83,14 +110,18 @@ struct ListFilesEntry {
 }
 
 impl ListFilesEntry {
-    fn try_new(metas: Arc<Vec<ObjectMeta>>, ttl: Option<Duration>) -> Option<Self> {
+    fn try_new(
+        metas: Arc<Vec<ObjectMeta>>,
+        ttl: Option<Duration>,
+        now: Instant,
+    ) -> Option<Self> {
         let size_bytes = (metas.capacity() * size_of::<ObjectMeta>())
             + metas.iter().map(meta_heap_bytes).reduce(|acc, b| acc + b)?;
 
         Some(Self {
             metas,
             size_bytes,
-            expires: ttl.map(|t| Instant::now() + t),
+            expires: ttl.map(|t| now + t),
         })
     }
 }
@@ -141,14 +172,16 @@ impl DefaultListFilesCacheState {
         }
     }
 
-    /// Returns the respective entry from the cache, if it exists and the entry has not expired.
+    /// Returns the respective entry from the cache, if it exists and the entry
+    /// has not expired by `now`.
+    ///
     /// If the entry exists it becomes the most recently used. If the entry has expired it is
     /// removed from the cache
-    fn get(&mut self, key: &Path) -> Option<Arc<Vec<ObjectMeta>>> {
+    fn get(&mut self, key: &Path, now: Instant) -> Option<Arc<Vec<ObjectMeta>>> {
         let entry = self.lru_queue.get(key)?;
 
         match entry.expires {
-            Some(exp) if Instant::now() > exp => {
+            Some(exp) if now > exp => {
                 self.remove(key);
                 None
             }
@@ -156,16 +189,18 @@ impl DefaultListFilesCacheState {
         }
     }
 
-    /// Checks if the respective entry is currently cached. If the entry has expired it is removed
-    /// from the cache.
+    /// Checks if the respective entry is currently cached.
+    ///
+    /// If the entry has expired by `now` it is removed from the cache.
+    ///
     /// The LRU queue is not updated.
-    fn contains_key(&mut self, k: &Path) -> bool {
+    fn contains_key(&mut self, k: &Path, now: Instant) -> bool {
         let Some(entry) = self.lru_queue.peek(k) else {
             return false;
         };
 
         match entry.expires {
-            Some(exp) if Instant::now() > exp => {
+            Some(exp) if now > exp => {
                 self.remove(k);
                 false
             }
@@ -173,15 +208,18 @@ impl DefaultListFilesCacheState {
         }
     }
 
-    /// Adds a new key-value pair to cache, meaning LRU entries might be evicted if required.
+    /// Adds a new key-value pair to cache expiring at `now` + the TTL.
+    ///
+    /// This means that LRU entries might be evicted if required.
     /// If the key is already in the cache, the previous entry is returned.
     /// If the size of the entry is greater than the `memory_limit`, the value is not inserted.
     fn put(
         &mut self,
         key: &Path,
         value: Arc<Vec<ObjectMeta>>,
+        now: Instant,
     ) -> Option<Arc<Vec<ObjectMeta>>> {
-        let entry = ListFilesEntry::try_new(value, self.ttl)?;
+        let entry = ListFilesEntry::try_new(value, self.ttl, now)?;
         let entry_size = entry.size_bytes;
 
         // no point in trying to add this value to the cache if it cannot fit entirely
@@ -263,7 +301,8 @@ impl CacheAccessor<Path, Arc<Vec<ObjectMeta>>> for DefaultListFilesCache {
 
     fn get(&self, k: &Path) -> Option<Arc<Vec<ObjectMeta>>> {
         let mut state = self.state.lock().unwrap();
-        state.get(k)
+        let now = self.time_provider.now();
+        state.get(k, now)
     }
 
     fn get_with_extra(&self, k: &Path, _e: &Self::Extra) -> Option<Arc<Vec<ObjectMeta>>> {
@@ -276,7 +315,8 @@ impl CacheAccessor<Path, Arc<Vec<ObjectMeta>>> for DefaultListFilesCache {
         value: Arc<Vec<ObjectMeta>>,
     ) -> Option<Arc<Vec<ObjectMeta>>> {
         let mut state = self.state.lock().unwrap();
-        state.put(key, value)
+        let now = self.time_provider.now();
+        state.put(key, value, now)
     }
 
     fn put_with_extra(
@@ -295,7 +335,8 @@ impl CacheAccessor<Path, Arc<Vec<ObjectMeta>>> for DefaultListFilesCache {
 
     fn contains_key(&self, k: &Path) -> bool {
         let mut state = self.state.lock().unwrap();
-        state.contains_key(k)
+        let now = self.time_provider.now();
+        state.contains_key(k, now)
     }
 
     fn len(&self) -> usize {
@@ -319,6 +360,31 @@ mod tests {
     use chrono::DateTime;
     use std::thread;
 
+    struct MockTimeProvider {
+        base: Instant,
+        offset: Mutex<Duration>,
+    }
+
+    impl MockTimeProvider {
+        fn new() -> Self {
+            Self {
+                base: Instant::now(),
+                offset: Mutex::new(Duration::ZERO),
+            }
+        }
+
+        fn inc(&self, duration: Duration) {
+            let mut offset = self.offset.lock().unwrap();
+            *offset += duration;
+        }
+    }
+
+    impl TimeProvider for MockTimeProvider {
+        fn now(&self) -> Instant {
+            self.base + *self.offset.lock().unwrap()
+        }
+    }
+
     /// Helper function to create a test ObjectMeta with a specific path and location string size
     fn create_test_object_meta(path: &str, location_size: usize) -> ObjectMeta {
         // Create a location string of the desired size by padding with zeros
@@ -565,9 +631,6 @@ mod tests {
     }
 
     #[test]
-    // Ignored due to flakiness in CI. See
-    // https://github.com/apache/datafusion/issues/19114
-    #[ignore]
     fn test_cache_with_ttl() {
         let ttl = Duration::from_millis(100);
         let cache = DefaultListFilesCache::new(10000, Some(ttl));
@@ -596,21 +659,21 @@ mod tests {
     }
 
     #[test]
-    // Ignored due to flakiness in CI. See
-    // https://github.com/apache/datafusion/issues/19114
-    #[ignore]
     fn test_cache_with_ttl_and_lru() {
         let ttl = Duration::from_millis(200);
-        let cache = DefaultListFilesCache::new(1000, Some(ttl));
+
+        let mock_time = Arc::new(MockTimeProvider::new());
+        let cache = DefaultListFilesCache::new(1000, Some(ttl))
+            .with_time_provider(Arc::clone(&mock_time) as Arc<dyn TimeProvider>);
 
         let (path1, value1, _) = create_test_list_files_entry("path1", 1, 400);
         let (path2, value2, _) = create_test_list_files_entry("path2", 1, 400);
         let (path3, value3, _) = create_test_list_files_entry("path3", 1, 400);
 
         cache.put(&path1, value1);
-        thread::sleep(Duration::from_millis(50));
+        mock_time.inc(Duration::from_millis(50));
         cache.put(&path2, value2);
-        thread::sleep(Duration::from_millis(50));
+        mock_time.inc(Duration::from_millis(50));
 
         // path3 should evict path1 due to size limit
         cache.put(&path3, value3);
@@ -618,10 +681,10 @@ mod tests {
         assert!(cache.contains_key(&path2));
         assert!(cache.contains_key(&path3));
 
-        // Wait for path2 to expire
-        thread::sleep(Duration::from_millis(150));
+        mock_time.inc(Duration::from_millis(151));
+
         assert!(!cache.contains_key(&path2)); // Expired
-        assert!(cache.contains_key(&path3)); // Still valid
+        assert!(cache.contains_key(&path3)); // Still valid 
     }
 
     #[test]
@@ -671,7 +734,8 @@ mod tests {
     fn test_entry_creation() {
         // Test with empty vector
         let empty_vec: Arc<Vec<ObjectMeta>> = Arc::new(vec![]);
-        let entry = ListFilesEntry::try_new(empty_vec, None);
+        let now = Instant::now();
+        let entry = ListFilesEntry::try_new(empty_vec, None, now);
         assert!(entry.is_none());
 
         // Validate entry size
@@ -679,7 +743,7 @@ mod tests {
             .map(|i| create_test_object_meta(&format!("file{i}"), 30))
             .collect();
         let metas = Arc::new(metas);
-        let entry = ListFilesEntry::try_new(metas, None).unwrap();
+        let entry = ListFilesEntry::try_new(metas, None, now).unwrap();
         assert_eq!(entry.metas.len(), 5);
         // Size should be: capacity * sizeof(ObjectMeta) + (5 * 30) for heap bytes
         let expected_size =
@@ -689,9 +753,9 @@ mod tests {
         // Test with TTL
         let meta = create_test_object_meta("file", 50);
         let ttl = Duration::from_secs(10);
-        let entry = ListFilesEntry::try_new(Arc::new(vec![meta]), Some(ttl)).unwrap();
-        let created = Instant::now();
-        assert!(entry.expires.unwrap() > created);
+        let entry =
+            ListFilesEntry::try_new(Arc::new(vec![meta]), Some(ttl), now).unwrap();
+        assert!(entry.expires.unwrap() > now);
     }
 
     #[test]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to