BlakeOrth commented on code in PR #19298:
URL: https://github.com/apache/datafusion/pull/19298#discussion_r2615148301
##########
datafusion/datasource/src/url.rs:
##########
@@ -324,33 +324,105 @@ impl ListingTableUrl {
}
}
+/// Lists files with cache support, using prefix-aware lookups.
+///
+/// # Arguments
+/// * `ctx` - The session context
+/// * `store` - The object store to list from
+/// * `full_prefix` - The full prefix to list (table_base + partition prefix)
+/// * `table_base_path` - The table's base path (the stable cache key)
+///
+/// # Cache Behavior :
+/// The cache key is always `table_base_path`. When a partition-specific listing
+/// is requested (full_prefix includes partition path), the cache:
+/// - Looks up `table_base_path` in the cache
+/// - Filters results to match `full_prefix`
+/// - Returns filtered results without a storage call
+///
+/// On cache miss, the full table is always listed and cached, ensuring
+/// subsequent partition queries can be served from cache.
async fn list_with_cache<'b>(
ctx: &'b dyn Session,
store: &'b dyn ObjectStore,
- prefix: &Path,
+ full_prefix: &Path,
+ table_base_path: &Path,
) -> Result<BoxStream<'b, Result<ObjectMeta>>> {
match ctx.runtime_env().cache_manager.get_list_files_cache() {
None => Ok(store
- .list(Some(prefix))
+ .list(Some(full_prefix))
.map(|res| res.map_err(|e| DataFusionError::ObjectStore(Box::new(e))))
.boxed()),
Some(cache) => {
- let vec = if let Some(res) = cache.get(prefix) {
- debug!("Hit list all files cache");
+ // Compute the relative prefix (partition path relative to table base)
+ let relative_prefix = compute_relative_prefix(table_base_path, full_prefix);
Review Comment:
I'm a bit confused by the structure of this code. The calling method already
knows both the table's base path and the user-provided prefix, and it
concatenates them into a full listing path here:
https://github.com/Yuvraj-cyborg/datafusion/blob/47e1bcf9cb91b83fe0b46d7bc0cfa232fa60e2cd/datafusion/datasource/src/url.rs#L248-L254
Rather than concatenating them and then deconstructing the resulting path,
can we just pass `table_base_path` and `prefix` in separately?
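A minimal sketch of the suggested shape, assuming a std-only model: paths are `/`-delimited `String`s rather than `object_store::path::Path`, the store is a hypothetical in-memory `MemStore`, and the cache is a plain `HashMap` keyed by table base. The hypothetical helpers `is_under` and `join` are illustrative, not DataFusion APIs. The caller passes `table_base_path` and the relative `prefix` separately, so no `compute_relative_prefix`-style deconstruction is needed; the full prefix is joined only where it is used.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory object store: just a list of object locations.
pub struct MemStore {
    pub locations: Vec<String>,
}

impl MemStore {
    /// Lists every location at or below `prefix`, segment-aware like a real store.
    pub fn list(&self, prefix: &str) -> Vec<String> {
        self.locations
            .iter()
            .filter(|loc| is_under(loc, prefix))
            .cloned()
            .collect()
    }
}

/// True if `loc` equals `prefix` or lies under it on a `/` boundary,
/// so `t/a=1` does not match `t/a=10/f.parquet`. An empty prefix matches all.
fn is_under(loc: &str, prefix: &str) -> bool {
    prefix.is_empty()
        || loc == prefix
        || (loc.starts_with(prefix) && loc[prefix.len()..].starts_with('/'))
}

/// Joins the table base and an optional relative prefix.
fn join(base: &str, prefix: Option<&str>) -> String {
    match prefix {
        Some(p) if !base.is_empty() => format!("{base}/{p}"),
        Some(p) => p.to_string(),
        None => base.to_string(),
    }
}

/// Hypothetical signature: base path and relative prefix arrive separately,
/// so the cache key is never recovered from an already-joined path.
pub fn list_with_cache(
    store: &MemStore,
    cache: Option<&mut HashMap<String, Vec<String>>>,
    table_base_path: &str,
    prefix: Option<&str>,
) -> Vec<String> {
    let full_prefix = join(table_base_path, prefix);
    let Some(cache) = cache else {
        // No cache configured: list exactly the requested prefix.
        return store.list(&full_prefix);
    };
    // Cache key is the table base; on a miss the whole table is listed and cached.
    let table_listing = cache
        .entry(table_base_path.to_string())
        .or_insert_with(|| store.list(table_base_path));
    // Serve the request by filtering the cached table listing in memory.
    table_listing
        .iter()
        .filter(|loc| is_under(loc, &full_prefix))
        .cloned()
        .collect()
}
```

With this shape the cache key is simply the argument the caller already has, rather than something reconstructed from the joined path.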
##########
datafusion/datasource/src/url.rs:
##########
@@ -324,33 +324,105 @@ impl ListingTableUrl {
}
}
+/// Lists files with cache support, using prefix-aware lookups.
+///
+/// # Arguments
+/// * `ctx` - The session context
+/// * `store` - The object store to list from
+/// * `full_prefix` - The full prefix to list (table_base + partition prefix)
+/// * `table_base_path` - The table's base path (the stable cache key)
+///
+/// # Cache Behavior :
+/// The cache key is always `table_base_path`. When a partition-specific listing
+/// is requested (full_prefix includes partition path), the cache:
+/// - Looks up `table_base_path` in the cache
+/// - Filters results to match `full_prefix`
+/// - Returns filtered results without a storage call
+///
+/// On cache miss, the full table is always listed and cached, ensuring
+/// subsequent partition queries can be served from cache.
async fn list_with_cache<'b>(
ctx: &'b dyn Session,
store: &'b dyn ObjectStore,
- prefix: &Path,
+ full_prefix: &Path,
+ table_base_path: &Path,
) -> Result<BoxStream<'b, Result<ObjectMeta>>> {
match ctx.runtime_env().cache_manager.get_list_files_cache() {
None => Ok(store
- .list(Some(prefix))
+ .list(Some(full_prefix))
.map(|res| res.map_err(|e| DataFusionError::ObjectStore(Box::new(e))))
.boxed()),
Some(cache) => {
Review Comment:
Given how critical this functionality is for correct results from a listing
table, and given that this method is now more complex, I think some
additional tests exercising the `Some(cache)` path would be worthwhile.
Ideally they would assert that the `Some(cache)` and `None` code paths always
produce equivalent outputs.
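One possible shape for such a test, sketched under a std-only model (hypothetical helpers `under`, `list_uncached`, and `check_equivalence`; paths as `/`-delimited strings rather than `Path`/`ObjectMeta`). The uncached listing acts as the reference, and any cached strategy, which receives the full table listing plus the requested prefix, must return the same set of files for every prefix. Outputs are sorted because listing order is not guaranteed.

```rust
/// Segment-aware prefix match on `/`-delimited paths, modeling how an object
/// store interprets a listing prefix: `t/a=1` matches `t/a=1/x`, not `t/a=10/x`.
pub fn under(loc: &str, prefix: &str) -> bool {
    prefix.is_empty()
        || loc == prefix
        || (loc.starts_with(prefix) && loc[prefix.len()..].starts_with('/'))
}

/// Reference behaviour (the `None` branch): what the store returns for `prefix`.
pub fn list_uncached(store: &[&str], prefix: &str) -> Vec<String> {
    store
        .iter()
        .filter(|l| under(l, prefix))
        .map(|l| l.to_string())
        .collect()
}

/// Checks that a cached listing strategy agrees with the uncached one for every
/// prefix. `cached` receives the full table listing (the cache entry keyed by
/// `table_base`) and the requested full prefix.
pub fn check_equivalence<F>(
    store: &[&str],
    table_base: &str,
    prefixes: &[&str],
    cached: F,
) -> Result<(), String>
where
    F: Fn(&[String], &str) -> Vec<String>,
{
    let table_listing = list_uncached(store, table_base);
    for &prefix in prefixes {
        let mut expected = list_uncached(store, prefix);
        let mut actual = cached(table_listing.as_slice(), prefix);
        // Listing order is not guaranteed, so compare sorted results.
        expected.sort();
        actual.sort();
        if expected != actual {
            return Err(format!("{prefix}: expected {expected:?}, got {actual:?}"));
        }
    }
    Ok(())
}
```

A useful case to include is a pair of sibling partitions such as `a=1` and `a=10`: a plain string `starts_with` filter returns extra files there, while a segment-aware match does not.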
##########
datafusion/execution/src/cache/list_files_cache.rs:
##########
@@ -172,20 +172,73 @@ impl DefaultListFilesCacheState {
}
}
- /// Returns the respective entry from the cache, if it exists and the entry
- /// has not expired by `now`.
+ /// Performs a prefix-aware cache lookup.
///
- /// If the entry exists it becomes the most recently used. If the entry has expired it is
- /// removed from the cache
- fn get(&mut self, key: &Path, now: Instant) -> Option<Arc<Vec<ObjectMeta>>> {
- let entry = self.lru_queue.get(key)?;
+ /// # Arguments
+ /// * `table_base` - The table's base path (the cache key)
+ /// * `prefix` - Optional prefix filter relative to the table base path
+ /// * `now` - Current time for expiration checking
+ ///
+ /// # Behavior
+ /// - Fetches the cache entry for `table_base`
+ /// - If `prefix` is `Some`, filters results to only files matching `table_base/prefix`
+ /// - Returns the (potentially filtered) results
+ ///
+ /// # Example
+ /// ```text
+ /// get_with_prefix("my_table", Some("a=1"), now)
+ /// → Fetch cache entry for "my_table"
+ /// → Filter to files matching "my_table/a=1/*"
+ /// → Return filtered results
+ /// ```
+ fn get_with_prefix(
+ &mut self,
+ table_base: &Path,
+ prefix: Option<&Path>,
+ now: Instant,
+ ) -> Option<Arc<Vec<ObjectMeta>>> {
+ let entry = self.lru_queue.get(table_base)?;
- match entry.expires {
- Some(exp) if now > exp => {
- self.remove(key);
- None
+ // Check expiration
+ if let Some(exp) = entry.expires
+ && now > exp
+ {
+ self.remove(table_base);
+ return None;
+ }
+
+ match prefix {
+ None => {
+ // No prefix filter, return all files
+ Some(Arc::clone(&entry.metas))
+ }
+ Some(prefix) => {
+ // Build the full prefix path: table_base/prefix
+ let full_prefix = if table_base.as_ref().is_empty() {
Review Comment:
Style nit -- I think this code would format a bit better using a `let-else`
statement rather than a `match`:
```rust
let Some(prefix) = prefix else {
    // No prefix filter, return all files
    return Some(Arc::clone(&entry.metas));
};
let full_prefix = ...
```
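For reference, a self-contained sketch of how the whole lookup might read with the `let-else`. It is a simplified model, not the PR's code: a `HashMap` stands in for the LRU queue, entries hold location strings instead of `ObjectMeta`, and the `Entry`/`State` types are hypothetical. The `full_prefix` filtering is elided in the snippet above, so the segment-boundary match used here (`table_base/prefix` must be followed by `/` or end the path) is an assumption.

```rust
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;

/// Simplified cache entry: file locations instead of `ObjectMeta`.
pub struct Entry {
    pub metas: Arc<Vec<String>>,
    pub expires: Option<Instant>,
}

/// Simplified cache state: a `HashMap` instead of the LRU queue.
pub struct State {
    pub entries: HashMap<String, Entry>,
}

impl State {
    pub fn get_with_prefix(
        &mut self,
        table_base: &str,
        prefix: Option<&str>,
        now: Instant,
    ) -> Option<Arc<Vec<String>>> {
        let entry = self.entries.get(table_base)?;

        // Expired entries are evicted and reported as a miss.
        if entry.expires.is_some_and(|exp| now > exp) {
            self.entries.remove(table_base);
            return None;
        }

        // No prefix filter: return all cached files without copying them.
        let Some(prefix) = prefix else {
            return Some(Arc::clone(&entry.metas));
        };

        // Build `table_base/prefix` and keep files at or below it.
        let full_prefix = if table_base.is_empty() {
            prefix.to_string()
        } else {
            format!("{table_base}/{prefix}")
        };
        let filtered: Vec<String> = entry
            .metas
            .iter()
            .filter(|loc| {
                loc.strip_prefix(full_prefix.as_str())
                    .is_some_and(|rest| rest.is_empty() || rest.starts_with('/'))
            })
            .cloned()
            .collect();
        Some(Arc::new(filtered))
    }
}
```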
##########
datafusion/execution/src/cache/cache_manager.rs:
##########
@@ -64,9 +64,21 @@ pub struct FileStatisticsCacheEntry {
/// command on the local filesystem. This operation can be expensive,
/// especially when done over remote object stores.
///
+/// The cache key is always the table's base path, ensuring a stable cache key.
+/// The `Extra` type is `Option<Path>`, representing an optional prefix filter
+/// (relative to the table base path) for partition-aware lookups.
+///
+/// When `get_with_extra(key, Some(prefix))` is called:
+/// - The cache entry for `key` (table base path) is fetched
+/// - Results are filtered to only include files matching `key/prefix`
+/// - Filtered results are returned without making a storage call
+///
+/// This enables efficient partition pruning: a single cached listing of the
+/// full table can serve queries for any partition subset.
+///
/// See [`crate::runtime_env::RuntimeEnv`] for more details.
pub trait ListFilesCache:
- CacheAccessor<Path, Arc<Vec<ObjectMeta>>, Extra = ObjectMeta>
+ CacheAccessor<Path, Arc<Vec<ObjectMeta>>, Extra = Option<Path>>
Review Comment:
Does having `Extra` as `Option<Path>` as opposed to just `Path` buy us
anything here?
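To make the question concrete, a std-only sketch with hypothetical `filter_plain` and `filter_opt` helpers (paths as `/`-delimited strings, not `Path`). Assuming an empty prefix joins to the table base itself and matching is segment-aware, an empty prefix and `None` select the same files, provided every cached entry lies under the table base. Under those assumptions a plain `Path`, with the empty path meaning "no filter", could carry the same information as `Option<Path>`.

```rust
/// True if `loc` equals `prefix` or lies under it on a `/` boundary;
/// an empty prefix matches everything.
fn under(loc: &str, prefix: &str) -> bool {
    prefix.is_empty()
        || loc == prefix
        || (loc.starts_with(prefix) && loc[prefix.len()..].starts_with('/'))
}

/// Joins the table base and a relative prefix; an empty prefix yields the base.
fn join(table_base: &str, prefix: &str) -> String {
    match (table_base.is_empty(), prefix.is_empty()) {
        (_, true) => table_base.to_string(),
        (true, false) => prefix.to_string(),
        (false, false) => format!("{table_base}/{prefix}"),
    }
}

/// `Extra = Path`-style lookup: an empty prefix means "no filter".
pub fn filter_plain(listing: &[String], table_base: &str, prefix: &str) -> Vec<String> {
    let full = join(table_base, prefix);
    listing.iter().filter(|l| under(l, &full)).cloned().collect()
}

/// `Extra = Option<Path>`-style lookup: `None` means "no filter".
pub fn filter_opt(listing: &[String], table_base: &str, prefix: Option<&str>) -> Vec<String> {
    match prefix {
        None => listing.to_vec(),
        Some(p) => filter_plain(listing, table_base, p),
    }
}
```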
--
This is an automated message from the Apache Git Service.