This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 86011519b5 fix: flaky cache test (#19140)
86011519b5 is described below
commit 86011519b5a2e223ff3b31d2aa7765ff08bf4c86
Author: xonx <[email protected]>
AuthorDate: Fri Dec 12 04:25:08 2025 +0530
fix: flaky cache test (#19140)
## Which issue does this PR close?
- Closes https://github.com/apache/datafusion/issues/19114.
## Rationale for this change
The test `test_cache_with_ttl_and_lru` was flaky and failed
intermittently in CI. It relied on `std::thread::sleep` and
`Instant::now()`, so its timing-sensitive assertions could fail when the
test environment was slow or under load. This PR makes the test
deterministic by removing its reliance on the system clock and on thread
sleeping.
## What changes are included in this PR?
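- Introduced a `TimeProvider` trait to abstract time retrieval, with a
  `SystemTimeProvider` implementation backed by `Instant::now()`.
- Refactored `DefaultListFilesCache` to obtain the current time from its
  provider and pass it into the cache state's `get`, `put`, and
  `contains_key` methods.
- Added a test-only `DefaultListFilesCache::with_time_provider` builder
  method for injecting a custom time provider.
- Updated `test_cache_with_ttl_and_lru` to use a `MockTimeProvider`, and
  removed the `#[ignore]` attributes from `test_cache_with_ttl` and
  `test_cache_with_ttl_and_lru`.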
## Are these changes tested?
Yes. I verified the fix by running `test_cache_with_ttl_and_lru`
locally; it passed without failures.
## Are there any user-facing changes?
No.
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
datafusion/execution/src/cache/list_files_cache.rs | 128 +++++++++++++++------
1 file changed, 96 insertions(+), 32 deletions(-)
diff --git a/datafusion/execution/src/cache/list_files_cache.rs
b/datafusion/execution/src/cache/list_files_cache.rs
index 8ab6d4b173..c209a01274 100644
--- a/datafusion/execution/src/cache/list_files_cache.rs
+++ b/datafusion/execution/src/cache/list_files_cache.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+use std::mem::size_of;
use std::{
sync::{Arc, Mutex},
time::Duration,
@@ -25,6 +26,19 @@ use object_store::{ObjectMeta, path::Path};
use crate::cache::{CacheAccessor, cache_manager::ListFilesCache,
lru_queue::LruQueue};
+pub trait TimeProvider: Send + Sync + 'static {
+ fn now(&self) -> Instant;
+}
+
+#[derive(Debug, Default)]
+pub struct SystemTimeProvider;
+
+impl TimeProvider for SystemTimeProvider {
+ fn now(&self) -> Instant {
+ Instant::now()
+ }
+}
+
/// Default implementation of [`ListFilesCache`]
///
/// Caches file metadata for file listing operations.
@@ -41,9 +55,15 @@ use crate::cache::{CacheAccessor,
cache_manager::ListFilesCache, lru_queue::LruQ
/// Users should use the [`Self::get`] and [`Self::put`] methods. The
/// [`Self::get_with_extra`] and [`Self::put_with_extra`] methods simply call
/// `get` and `put`, respectively.
-#[derive(Default)]
pub struct DefaultListFilesCache {
state: Mutex<DefaultListFilesCacheState>,
+ time_provider: Arc<dyn TimeProvider>,
+}
+
+impl Default for DefaultListFilesCache {
+ fn default() -> Self {
+ Self::new(DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT, None)
+ }
}
impl DefaultListFilesCache {
@@ -55,9 +75,16 @@ impl DefaultListFilesCache {
pub fn new(memory_limit: usize, ttl: Option<Duration>) -> Self {
Self {
state: Mutex::new(DefaultListFilesCacheState::new(memory_limit,
ttl)),
+ time_provider: Arc::new(SystemTimeProvider),
}
}
+ #[cfg(test)]
+ pub(crate) fn with_time_provider(mut self, provider: Arc<dyn
TimeProvider>) -> Self {
+ self.time_provider = provider;
+ self
+ }
+
/// Returns the cache's memory limit in bytes.
pub fn cache_limit(&self) -> usize {
self.state.lock().unwrap().memory_limit
@@ -83,14 +110,18 @@ struct ListFilesEntry {
}
impl ListFilesEntry {
- fn try_new(metas: Arc<Vec<ObjectMeta>>, ttl: Option<Duration>) ->
Option<Self> {
+ fn try_new(
+ metas: Arc<Vec<ObjectMeta>>,
+ ttl: Option<Duration>,
+ now: Instant,
+ ) -> Option<Self> {
let size_bytes = (metas.capacity() * size_of::<ObjectMeta>())
+ metas.iter().map(meta_heap_bytes).reduce(|acc, b| acc + b)?;
Some(Self {
metas,
size_bytes,
- expires: ttl.map(|t| Instant::now() + t),
+ expires: ttl.map(|t| now + t),
})
}
}
@@ -141,14 +172,16 @@ impl DefaultListFilesCacheState {
}
}
- /// Returns the respective entry from the cache, if it exists and the
entry has not expired.
+ /// Returns the respective entry from the cache, if it exists and the entry
+ /// has not expired by `now`.
+ ///
/// If the entry exists it becomes the most recently used. If the entry
has expired it is
/// removed from the cache
- fn get(&mut self, key: &Path) -> Option<Arc<Vec<ObjectMeta>>> {
+ fn get(&mut self, key: &Path, now: Instant) ->
Option<Arc<Vec<ObjectMeta>>> {
let entry = self.lru_queue.get(key)?;
match entry.expires {
- Some(exp) if Instant::now() > exp => {
+ Some(exp) if now > exp => {
self.remove(key);
None
}
@@ -156,16 +189,18 @@ impl DefaultListFilesCacheState {
}
}
- /// Checks if the respective entry is currently cached. If the entry has
expired it is removed
- /// from the cache.
+ /// Checks if the respective entry is currently cached.
+ ///
+ /// If the entry has expired by `now` it is removed from the cache.
+ ///
/// The LRU queue is not updated.
- fn contains_key(&mut self, k: &Path) -> bool {
+ fn contains_key(&mut self, k: &Path, now: Instant) -> bool {
let Some(entry) = self.lru_queue.peek(k) else {
return false;
};
match entry.expires {
- Some(exp) if Instant::now() > exp => {
+ Some(exp) if now > exp => {
self.remove(k);
false
}
@@ -173,15 +208,18 @@ impl DefaultListFilesCacheState {
}
}
- /// Adds a new key-value pair to cache, meaning LRU entries might be
evicted if required.
+ /// Adds a new key-value pair to cache expiring at `now` + the TTL.
+ ///
+ /// This means that LRU entries might be evicted if required.
/// If the key is already in the cache, the previous entry is returned.
/// If the size of the entry is greater than the `memory_limit`, the value
is not inserted.
fn put(
&mut self,
key: &Path,
value: Arc<Vec<ObjectMeta>>,
+ now: Instant,
) -> Option<Arc<Vec<ObjectMeta>>> {
- let entry = ListFilesEntry::try_new(value, self.ttl)?;
+ let entry = ListFilesEntry::try_new(value, self.ttl, now)?;
let entry_size = entry.size_bytes;
// no point in trying to add this value to the cache if it cannot fit
entirely
@@ -263,7 +301,8 @@ impl CacheAccessor<Path, Arc<Vec<ObjectMeta>>> for
DefaultListFilesCache {
fn get(&self, k: &Path) -> Option<Arc<Vec<ObjectMeta>>> {
let mut state = self.state.lock().unwrap();
- state.get(k)
+ let now = self.time_provider.now();
+ state.get(k, now)
}
fn get_with_extra(&self, k: &Path, _e: &Self::Extra) ->
Option<Arc<Vec<ObjectMeta>>> {
@@ -276,7 +315,8 @@ impl CacheAccessor<Path, Arc<Vec<ObjectMeta>>> for
DefaultListFilesCache {
value: Arc<Vec<ObjectMeta>>,
) -> Option<Arc<Vec<ObjectMeta>>> {
let mut state = self.state.lock().unwrap();
- state.put(key, value)
+ let now = self.time_provider.now();
+ state.put(key, value, now)
}
fn put_with_extra(
@@ -295,7 +335,8 @@ impl CacheAccessor<Path, Arc<Vec<ObjectMeta>>> for
DefaultListFilesCache {
fn contains_key(&self, k: &Path) -> bool {
let mut state = self.state.lock().unwrap();
- state.contains_key(k)
+ let now = self.time_provider.now();
+ state.contains_key(k, now)
}
fn len(&self) -> usize {
@@ -319,6 +360,31 @@ mod tests {
use chrono::DateTime;
use std::thread;
+ struct MockTimeProvider {
+ base: Instant,
+ offset: Mutex<Duration>,
+ }
+
+ impl MockTimeProvider {
+ fn new() -> Self {
+ Self {
+ base: Instant::now(),
+ offset: Mutex::new(Duration::ZERO),
+ }
+ }
+
+ fn inc(&self, duration: Duration) {
+ let mut offset = self.offset.lock().unwrap();
+ *offset += duration;
+ }
+ }
+
+ impl TimeProvider for MockTimeProvider {
+ fn now(&self) -> Instant {
+ self.base + *self.offset.lock().unwrap()
+ }
+ }
+
/// Helper function to create a test ObjectMeta with a specific path and
location string size
fn create_test_object_meta(path: &str, location_size: usize) -> ObjectMeta
{
// Create a location string of the desired size by padding with zeros
@@ -565,9 +631,6 @@ mod tests {
}
#[test]
- // Ignored due to flakiness in CI. See
- // https://github.com/apache/datafusion/issues/19114
- #[ignore]
fn test_cache_with_ttl() {
let ttl = Duration::from_millis(100);
let cache = DefaultListFilesCache::new(10000, Some(ttl));
@@ -596,21 +659,21 @@ mod tests {
}
#[test]
- // Ignored due to flakiness in CI. See
- // https://github.com/apache/datafusion/issues/19114
- #[ignore]
fn test_cache_with_ttl_and_lru() {
let ttl = Duration::from_millis(200);
- let cache = DefaultListFilesCache::new(1000, Some(ttl));
+
+ let mock_time = Arc::new(MockTimeProvider::new());
+ let cache = DefaultListFilesCache::new(1000, Some(ttl))
+ .with_time_provider(Arc::clone(&mock_time) as Arc<dyn
TimeProvider>);
let (path1, value1, _) = create_test_list_files_entry("path1", 1, 400);
let (path2, value2, _) = create_test_list_files_entry("path2", 1, 400);
let (path3, value3, _) = create_test_list_files_entry("path3", 1, 400);
cache.put(&path1, value1);
- thread::sleep(Duration::from_millis(50));
+ mock_time.inc(Duration::from_millis(50));
cache.put(&path2, value2);
- thread::sleep(Duration::from_millis(50));
+ mock_time.inc(Duration::from_millis(50));
// path3 should evict path1 due to size limit
cache.put(&path3, value3);
@@ -618,10 +681,10 @@ mod tests {
assert!(cache.contains_key(&path2));
assert!(cache.contains_key(&path3));
- // Wait for path2 to expire
- thread::sleep(Duration::from_millis(150));
+ mock_time.inc(Duration::from_millis(151));
+
assert!(!cache.contains_key(&path2)); // Expired
- assert!(cache.contains_key(&path3)); // Still valid
+ assert!(cache.contains_key(&path3)); // Still valid
}
#[test]
@@ -671,7 +734,8 @@ mod tests {
fn test_entry_creation() {
// Test with empty vector
let empty_vec: Arc<Vec<ObjectMeta>> = Arc::new(vec![]);
- let entry = ListFilesEntry::try_new(empty_vec, None);
+ let now = Instant::now();
+ let entry = ListFilesEntry::try_new(empty_vec, None, now);
assert!(entry.is_none());
// Validate entry size
@@ -679,7 +743,7 @@ mod tests {
.map(|i| create_test_object_meta(&format!("file{i}"), 30))
.collect();
let metas = Arc::new(metas);
- let entry = ListFilesEntry::try_new(metas, None).unwrap();
+ let entry = ListFilesEntry::try_new(metas, None, now).unwrap();
assert_eq!(entry.metas.len(), 5);
// Size should be: capacity * sizeof(ObjectMeta) + (5 * 30) for heap
bytes
let expected_size =
@@ -689,9 +753,9 @@ mod tests {
// Test with TTL
let meta = create_test_object_meta("file", 50);
let ttl = Duration::from_secs(10);
- let entry = ListFilesEntry::try_new(Arc::new(vec![meta]),
Some(ttl)).unwrap();
- let created = Instant::now();
- assert!(entry.expires.unwrap() > created);
+ let entry =
+ ListFilesEntry::try_new(Arc::new(vec![meta]), Some(ttl),
now).unwrap();
+ assert!(entry.expires.unwrap() > now);
}
#[test]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]