This is an automated email from the ASF dual-hosted git repository.
xudong963 pushed a commit to branch branch-52
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/branch-52 by this push:
new 1ac18a3aff [branch-52] Backport `list_files_cache`, and make default ListingFilesCache table scoped (#19704)
1ac18a3aff is described below
commit 1ac18a3affa740051b7e0c6375a2aa1b42fce2d9
Author: Andrew Lamb <[email protected]>
AuthorDate: Fri Jan 9 04:14:16 2026 -0500
[branch-52] Backport `list_files_cache`, and make default ListingFilesCache table scoped (#19704)
## Which issue does this PR close?
- part of https://github.com/apache/datafusion/issues/18566
## Rationale for this change
Backport the fix for this regression into 52 release branch:
- https://github.com/apache/datafusion/issues/19573
## What changes are included in this PR?
Backport these two commits to `branch-52` (cherry-pick was clean)
- 1037f0a / #19388
- e6049de / #19616
<details><summary>Commands</summary>
<p>
```shell
andrewlamb@Andrews-MacBook-Pro-3:~/Software/datafusion$ git cherry-pick 1037f0a
[branch-52 1fc70ac20] feat: add list_files_cache table function for `datafusion-cli` (#19388)
Author: jizezhang <[email protected]>
Date: Tue Jan 6 05:23:39 2026 -0800
5 files changed, 446 insertions(+), 31 deletions(-)
andrewlamb@Andrews-MacBook-Pro-3:~/Software/datafusion$ git cherry-pick e6049de
Auto-merging datafusion/core/src/execution/context/mod.rs
[branch-52 aa3d413f0] Make default ListingFilesCache table scoped (#19616)
Author: jizezhang <[email protected]>
Date: Thu Jan 8 06:34:10 2026 -0800
10 files changed, 474 insertions(+), 184 deletions(-)
```
</p>
</details>
## Are these changes tested?
By CI and new tests
## Are there any user-facing changes?
A new datafusion-cli function, and dropping an external table now clears the listing cache
---------
Co-authored-by: jizezhang <[email protected]>
---
datafusion-cli/src/functions.rs | 185 ++++++-
datafusion-cli/src/main.rs | 113 +++-
datafusion/catalog-listing/src/table.rs | 7 +-
.../core/src/datasource/listing_table_factory.rs | 7 +-
datafusion/core/src/execution/context/mod.rs | 7 +-
datafusion/datasource/src/url.rs | 49 +-
datafusion/execution/src/cache/cache_manager.rs | 10 +-
datafusion/execution/src/cache/list_files_cache.rs | 566 ++++++++++++++++-----
datafusion/execution/src/cache/mod.rs | 1 +
docs/source/user-guide/cli/functions.md | 50 ++
10 files changed, 850 insertions(+), 145 deletions(-)
diff --git a/datafusion-cli/src/functions.rs b/datafusion-cli/src/functions.rs
index a45d57e8e9..8a6ad448d8 100644
--- a/datafusion-cli/src/functions.rs
+++ b/datafusion-cli/src/functions.rs
@@ -17,13 +17,18 @@
//! Functions that are query-able and searchable via the `\h` command
+use datafusion_common::instant::Instant;
use std::fmt;
use std::fs::File;
use std::str::FromStr;
use std::sync::Arc;
-use arrow::array::{Int64Array, StringArray, TimestampMillisecondArray, UInt64Array};
-use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
+use arrow::array::{
+ DurationMillisecondArray, GenericListArray, Int64Array, StringArray, StructArray,
+ TimestampMillisecondArray, UInt64Array,
+};
+use arrow::buffer::{Buffer, OffsetBuffer, ScalarBuffer};
+use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef, TimeUnit};
use arrow::record_batch::RecordBatch;
use arrow::util::pretty::pretty_format_batches;
use datafusion::catalog::{Session, TableFunctionImpl};
@@ -697,3 +702,179 @@ impl TableFunctionImpl for StatisticsCacheFunc {
Ok(Arc::new(statistics_cache))
}
}
+
+/// Implementation of the `list_files_cache` table function in datafusion-cli.
+///
+/// This function returns the cached results of running a LIST command on a
+/// particular object store path for a table. The object metadata is returned
+/// as a List of Structs, with one Struct for each object.
+///
+/// DataFusion uses these cached results to plan queries against external
+/// tables.
+///
+/// # Schema
+/// ```sql
+/// > describe select * from list_files_cache();
+/// +---------------------+--------------+-------------+
+/// | column_name         | data_type    | is_nullable |
+/// +---------------------+--------------+-------------+
+/// | table               | Utf8         | NO          |
+/// | path                | Utf8         | NO          |
+/// | metadata_size_bytes | UInt64       | NO          |
+/// | expires_in          | Duration(ms) | YES         |
+/// | metadata_list       | List(Struct("file_path": non-null Utf8, "file_modified": non-null Timestamp(ms), "file_size_bytes": non-null UInt64, "e_tag": Utf8, "version": Utf8), field: 'metadata') | YES |
+/// +---------------------+--------------+-------------+
+/// ```
+#[derive(Debug)]
+struct ListFilesCacheTable {
+ schema: SchemaRef,
+ batch: RecordBatch,
+}
+
+#[async_trait]
+impl TableProvider for ListFilesCacheTable {
+ fn as_any(&self) -> &dyn std::any::Any {
+ self
+ }
+
+ fn schema(&self) -> arrow::datatypes::SchemaRef {
+ self.schema.clone()
+ }
+
+ fn table_type(&self) -> datafusion::logical_expr::TableType {
+ datafusion::logical_expr::TableType::Base
+ }
+
+ async fn scan(
+ &self,
+ _state: &dyn Session,
+ projection: Option<&Vec<usize>>,
+ _filters: &[Expr],
+ _limit: Option<usize>,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ Ok(MemorySourceConfig::try_new_exec(
+ &[vec![self.batch.clone()]],
+ TableProvider::schema(self),
+ projection.cloned(),
+ )?)
+ }
+}
+
+#[derive(Debug)]
+pub struct ListFilesCacheFunc {
+ cache_manager: Arc<CacheManager>,
+}
+
+impl ListFilesCacheFunc {
+ pub fn new(cache_manager: Arc<CacheManager>) -> Self {
+ Self { cache_manager }
+ }
+}
+
+impl TableFunctionImpl for ListFilesCacheFunc {
+ fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
+ if !exprs.is_empty() {
+ return plan_err!("list_files_cache should have no arguments");
+ }
+
+ let nested_fields = Fields::from(vec![
+ Field::new("file_path", DataType::Utf8, false),
+ Field::new(
+ "file_modified",
+ DataType::Timestamp(TimeUnit::Millisecond, None),
+ false,
+ ),
+ Field::new("file_size_bytes", DataType::UInt64, false),
+ Field::new("e_tag", DataType::Utf8, true),
+ Field::new("version", DataType::Utf8, true),
+ ]);
+
+ let metadata_field =
+ Field::new("metadata", DataType::Struct(nested_fields.clone()),
true);
+
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("table", DataType::Utf8, false),
+ Field::new("path", DataType::Utf8, false),
+ Field::new("metadata_size_bytes", DataType::UInt64, false),
+ // expires field in ListFilesEntry has type Instant when set, from which we cannot get "the number of seconds", hence using Duration instead of Timestamp as data type.
+ Field::new(
+ "expires_in",
+ DataType::Duration(TimeUnit::Millisecond),
+ true,
+ ),
+ Field::new(
+ "metadata_list",
+ DataType::List(Arc::new(metadata_field.clone())),
+ true,
+ ),
+ ]));
+
+ let mut table_arr = vec![];
+ let mut path_arr = vec![];
+ let mut metadata_size_bytes_arr = vec![];
+ let mut expires_arr = vec![];
+
+ let mut file_path_arr = vec![];
+ let mut file_modified_arr = vec![];
+ let mut file_size_bytes_arr = vec![];
+ let mut etag_arr = vec![];
+ let mut version_arr = vec![];
+ let mut offsets: Vec<i32> = vec![0];
+
+ if let Some(list_files_cache) = self.cache_manager.get_list_files_cache() {
+ let now = Instant::now();
+ let mut current_offset: i32 = 0;
+
+ for (path, entry) in list_files_cache.list_entries() {
+ table_arr.push(path.table.map_or("NULL".to_string(), |t| t.to_string()));
+ path_arr.push(path.path.to_string());
+ metadata_size_bytes_arr.push(entry.size_bytes as u64);
+ // calculates time left before entry expires
+ expires_arr.push(
+ entry
+ .expires
+ .map(|t| t.duration_since(now).as_millis() as i64),
+ );
+
+ for meta in entry.metas.iter() {
+ file_path_arr.push(meta.location.to_string());
+ file_modified_arr.push(meta.last_modified.timestamp_millis());
+ file_size_bytes_arr.push(meta.size);
+ etag_arr.push(meta.e_tag.clone());
+ version_arr.push(meta.version.clone());
+ }
+ current_offset += entry.metas.len() as i32;
+ offsets.push(current_offset);
+ }
+ }
+
+ let struct_arr = StructArray::new(
+ nested_fields,
+ vec![
+ Arc::new(StringArray::from(file_path_arr)),
+ Arc::new(TimestampMillisecondArray::from(file_modified_arr)),
+ Arc::new(UInt64Array::from(file_size_bytes_arr)),
+ Arc::new(StringArray::from(etag_arr)),
+ Arc::new(StringArray::from(version_arr)),
+ ],
+ None,
+ );
+
+ let offsets_buffer: OffsetBuffer<i32> =
+ OffsetBuffer::new(ScalarBuffer::from(Buffer::from_vec(offsets)));
+
+ let batch = RecordBatch::try_new(
+ schema.clone(),
+ vec![
+ Arc::new(StringArray::from(table_arr)),
+ Arc::new(StringArray::from(path_arr)),
+ Arc::new(UInt64Array::from(metadata_size_bytes_arr)),
+ Arc::new(DurationMillisecondArray::from(expires_arr)),
+ Arc::new(GenericListArray::new(
+ Arc::new(metadata_field),
+ offsets_buffer,
+ Arc::new(struct_arr),
+ None,
+ )),
+ ],
+ )?;
+
+ let list_files_cache = ListFilesCacheTable { schema, batch };
+ Ok(Arc::new(list_files_cache))
+ }
+}
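
For readers skimming the diff, the batch construction above relies on arrow's flattened list representation: all child structs live in one `StructArray`, and an `OffsetBuffer` slices them into per-row lists. Below is a minimal, self-contained sketch of the same pattern; the two-field schema and toy values are illustrative, not the five-field schema used by `list_files_cache`:

```rust
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, GenericListArray, StringArray, StructArray, UInt64Array};
use arrow::buffer::{OffsetBuffer, ScalarBuffer};
use arrow::datatypes::{DataType, Field, Fields};

fn main() {
    // Child columns: three files total, flattened across all list rows.
    let fields = Fields::from(vec![
        Field::new("file_path", DataType::Utf8, false),
        Field::new("file_size_bytes", DataType::UInt64, false),
    ]);
    let children: Vec<ArrayRef> = vec![
        Arc::new(StringArray::from(vec!["t/a.parquet", "t/b.parquet", "u/c.parquet"])),
        Arc::new(UInt64Array::from(vec![10u64, 20, 30])),
    ];
    let values = StructArray::new(fields, children, None);

    // Offsets [0, 2, 3] carve the three structs into two list rows:
    // row 0 -> structs 0..2, row 1 -> structs 2..3.
    let offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0i32, 2, 3]));
    let field = Arc::new(Field::new("metadata", values.data_type().clone(), true));
    let list = GenericListArray::new(field, offsets, Arc::new(values), None);
    assert_eq!(list.len(), 2);
}
```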
diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs
index 8f69ae4779..9e53260e42 100644
--- a/datafusion-cli/src/main.rs
+++ b/datafusion-cli/src/main.rs
@@ -32,7 +32,7 @@ use datafusion::logical_expr::ExplainFormat;
use datafusion::prelude::SessionContext;
use datafusion_cli::catalog::DynamicObjectStoreCatalog;
use datafusion_cli::functions::{
- MetadataCacheFunc, ParquetMetadataFunc, StatisticsCacheFunc,
+ ListFilesCacheFunc, MetadataCacheFunc, ParquetMetadataFunc, StatisticsCacheFunc,
};
use datafusion_cli::object_storage::instrumented::{
InstrumentedObjectStoreMode, InstrumentedObjectStoreRegistry,
@@ -253,6 +253,13 @@ async fn main_inner() -> Result<()> {
)),
);
+ ctx.register_udtf(
+ "list_files_cache",
+ Arc::new(ListFilesCacheFunc::new(
+ ctx.task_ctx().runtime_env().cache_manager.clone(),
+ )),
+ );
+
let mut print_options = PrintOptions {
format: args.format,
quiet: args.quiet,
@@ -431,15 +438,20 @@ pub fn extract_disk_limit(size: &str) -> Result<usize, String> {
#[cfg(test)]
mod tests {
+ use std::time::Duration;
+
use super::*;
use datafusion::{
common::test_util::batches_to_string,
execution::cache::{
- cache_manager::CacheManagerConfig, cache_unit::DefaultFileStatisticsCache,
+ DefaultListFilesCache, cache_manager::CacheManagerConfig,
+ cache_unit::DefaultFileStatisticsCache,
},
- prelude::ParquetReadOptions,
+ prelude::{ParquetReadOptions, col, lit, split_part},
};
use insta::assert_snapshot;
+ use object_store::memory::InMemory;
+ use url::Url;
fn assert_conversion(input: &str, expected: Result<usize, String>) {
let result = extract_memory_pool_size(input);
@@ -741,4 +753,99 @@ mod tests {
Ok(())
}
+
+ #[tokio::test]
+ async fn test_list_files_cache() -> Result<(), DataFusionError> {
+ let list_files_cache = Arc::new(DefaultListFilesCache::new(
+ 1024,
+ Some(Duration::from_secs(1)),
+ ));
+
+ let rt = RuntimeEnvBuilder::new()
+ .with_cache_manager(
+ CacheManagerConfig::default()
+ .with_list_files_cache(Some(list_files_cache)),
+ )
+ .build_arc()
+ .unwrap();
+
+ let ctx = SessionContext::new_with_config_rt(SessionConfig::default(), rt);
+
+ ctx.register_object_store(
+ &Url::parse("mem://test_table").unwrap(),
+ Arc::new(InMemory::new()),
+ );
+
+ ctx.register_udtf(
+ "list_files_cache",
+ Arc::new(ListFilesCacheFunc::new(
+ ctx.task_ctx().runtime_env().cache_manager.clone(),
+ )),
+ );
+
+ ctx.sql(
+ "CREATE EXTERNAL TABLE src_table
+ STORED AS PARQUET
+ LOCATION '../parquet-testing/data/alltypes_plain.parquet'",
+ )
+ .await?
+ .collect()
+ .await?;
+
+ ctx.sql("COPY (SELECT * FROM src_table) TO
'mem://test_table/0.parquet' STORED AS PARQUET").await?.collect().await?;
+
+ ctx.sql("COPY (SELECT * FROM src_table) TO
'mem://test_table/1.parquet' STORED AS PARQUET").await?.collect().await?;
+
+ ctx.sql(
+ "CREATE EXTERNAL TABLE test_table
+ STORED AS PARQUET
+ LOCATION 'mem://test_table/'
+ ",
+ )
+ .await?
+ .collect()
+ .await?;
+
+ let sql = "SELECT metadata_size_bytes, expires_in, metadata_list FROM
list_files_cache()";
+ let df = ctx
+ .sql(sql)
+ .await?
+ .unnest_columns(&["metadata_list"])?
+ .with_column_renamed("metadata_list", "metadata")?
+ .unnest_columns(&["metadata"])?;
+
+ assert_eq!(
+ 2,
+ df.clone()
+ .filter(col("expires_in").is_not_null())?
+ .count()
+ .await?
+ );
+
+ let df = df
+ .with_column_renamed(r#""metadata.file_size_bytes""#, "file_size_bytes")?
+ .with_column_renamed(r#""metadata.e_tag""#, "etag")?
+ .with_column(
+ "filename",
+ split_part(col(r#""metadata.file_path""#), lit("/"), lit(-1)),
+ )?
+ .select_columns(&[
+ "metadata_size_bytes",
+ "filename",
+ "file_size_bytes",
+ "etag",
+ ])?
+ .sort(vec![col("filename").sort(true, false)])?;
+ let rbs = df.collect().await?;
+ assert_snapshot!(batches_to_string(&rbs),@r"
+ +---------------------+-----------+-----------------+------+
+ | metadata_size_bytes | filename | file_size_bytes | etag |
+ +---------------------+-----------+-----------------+------+
+ | 212 | 0.parquet | 3645 | 0 |
+ | 212 | 1.parquet | 3645 | 1 |
+ +---------------------+-----------+-----------------+------+
+ ");
+
+ Ok(())
+ }
}
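
Outside of datafusion-cli, an embedding application can wire the same pieces together. A minimal sketch, assuming the crate paths used in the test above and an illustrative 30-second TTL:

```rust
use std::sync::Arc;
use std::time::Duration;

use datafusion::execution::cache::DefaultListFilesCache;
use datafusion::execution::cache::cache_manager::CacheManagerConfig;
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_cli::functions::ListFilesCacheFunc;

fn build_context() -> datafusion::error::Result<SessionContext> {
    // A 1 MiB list-files cache whose entries expire after 30 seconds.
    let list_files_cache = Arc::new(DefaultListFilesCache::new(
        1024 * 1024,
        Some(Duration::from_secs(30)),
    ));
    let rt = RuntimeEnvBuilder::new()
        .with_cache_manager(
            CacheManagerConfig::default().with_list_files_cache(Some(list_files_cache)),
        )
        .build_arc()?;
    let ctx = SessionContext::new_with_config_rt(SessionConfig::default(), rt);
    // Expose the cache contents as a table function, as datafusion-cli does.
    ctx.register_udtf(
        "list_files_cache",
        Arc::new(ListFilesCacheFunc::new(
            ctx.task_ctx().runtime_env().cache_manager.clone(),
        )),
    );
    Ok(ctx)
}
```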
diff --git a/datafusion/catalog-listing/src/table.rs b/datafusion/catalog-listing/src/table.rs
index 9fb2dd2dce..a175d47f4d 100644
--- a/datafusion/catalog-listing/src/table.rs
+++ b/datafusion/catalog-listing/src/table.rs
@@ -34,6 +34,7 @@ use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
use datafusion_datasource::{
ListingTableUrl, PartitionedFile, TableSchema, compute_all_files_statistics,
};
+use datafusion_execution::cache::TableScopedPath;
use datafusion_execution::cache::cache_manager::FileStatisticsCache;
use datafusion_execution::cache::cache_unit::DefaultFileStatisticsCache;
use datafusion_expr::dml::InsertOp;
@@ -565,7 +566,11 @@ impl TableProvider for ListingTable {
// Invalidate cache entries for this table if they exist
if let Some(lfc) = state.runtime_env().cache_manager.get_list_files_cache() {
- let _ = lfc.remove(table_path.prefix());
+ let key = TableScopedPath {
+ table: table_path.get_table_ref().clone(),
+ path: table_path.prefix().clone(),
+ };
+ let _ = lfc.remove(&key);
}
// Sink related option, apart from format
diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs
index 3ca388af0c..86af691fd7 100644
--- a/datafusion/core/src/datasource/listing_table_factory.rs
+++ b/datafusion/core/src/datasource/listing_table_factory.rs
@@ -63,7 +63,8 @@ impl TableProviderFactory for ListingTableFactory {
))?
.create(session_state, &cmd.options)?;
- let mut table_path = ListingTableUrl::parse(&cmd.location)?;
+ let mut table_path =
+ ListingTableUrl::parse(&cmd.location)?.with_table_ref(cmd.name.clone());
let file_extension = match table_path.is_collection() {
// Setting the extension to be empty instead of allowing the default extension seems
// odd, but was done to ensure existing behavior isn't modified. It seems like this
@@ -160,7 +161,9 @@ impl TableProviderFactory for ListingTableFactory {
}
None => format!("*.{}", cmd.file_type.to_lowercase()),
};
- table_path = table_path.with_glob(glob.as_ref())?;
+ table_path = table_path
+ .with_glob(glob.as_ref())?
+ .with_table_ref(cmd.name.clone());
}
let schema = options.infer_schema(session_state, &table_path).await?;
let df_schema = Arc::clone(&schema).to_dfschema()?;
diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs
index a769bb01b4..6df90b205c 100644
--- a/datafusion/core/src/execution/context/mod.rs
+++ b/datafusion/core/src/execution/context/mod.rs
@@ -1315,7 +1315,7 @@ impl SessionContext {
let table = table_ref.table().to_owned();
let maybe_schema = {
let state = self.state.read();
- let resolved = state.resolve_table_ref(table_ref);
+ let resolved = state.resolve_table_ref(table_ref.clone());
state
.catalog_list()
.catalog(&resolved.catalog)
@@ -1327,6 +1327,11 @@ impl SessionContext {
&& table_provider.table_type() == table_type
{
schema.deregister_table(&table)?;
+ if table_type == TableType::Base
+ && let Some(lfc) =
self.runtime_env().cache_manager.get_list_files_cache()
+ {
+ lfc.drop_table_entries(&Some(table_ref))?;
+ }
return Ok(true);
}
diff --git a/datafusion/datasource/src/url.rs b/datafusion/datasource/src/url.rs
index 155d6efe46..2428275ac3 100644
--- a/datafusion/datasource/src/url.rs
+++ b/datafusion/datasource/src/url.rs
@@ -17,7 +17,8 @@
use std::sync::Arc;
-use datafusion_common::{DataFusionError, Result};
+use datafusion_common::{DataFusionError, Result, TableReference};
+use datafusion_execution::cache::TableScopedPath;
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_session::Session;
@@ -41,6 +42,8 @@ pub struct ListingTableUrl {
prefix: Path,
/// An optional glob expression used to filter files
glob: Option<Pattern>,
+
+ table_ref: Option<TableReference>,
}
impl ListingTableUrl {
@@ -145,7 +148,12 @@ impl ListingTableUrl {
/// to create a [`ListingTableUrl`].
pub fn try_new(url: Url, glob: Option<Pattern>) -> Result<Self> {
let prefix = Path::from_url_path(url.path())?;
- Ok(Self { url, prefix, glob })
+ Ok(Self {
+ url,
+ prefix,
+ glob,
+ table_ref: None,
+ })
}
/// Returns the URL scheme
@@ -255,7 +263,14 @@ impl ListingTableUrl {
};
let list: BoxStream<'a, Result<ObjectMeta>> = if self.is_collection() {
- list_with_cache(ctx, store, &self.prefix, prefix.as_ref()).await?
+ list_with_cache(
+ ctx,
+ store,
+ self.table_ref.as_ref(),
+ &self.prefix,
+ prefix.as_ref(),
+ )
+ .await?
} else {
match store.head(&full_prefix).await {
Ok(meta) => futures::stream::once(async { Ok(meta) })
@@ -264,7 +279,14 @@ impl ListingTableUrl {
// If the head command fails, it is likely that object doesn't exist.
// Retry as though it were a prefix (aka a collection)
Err(object_store::Error::NotFound { .. }) => {
- list_with_cache(ctx, store, &self.prefix,
prefix.as_ref()).await?
+ list_with_cache(
+ ctx,
+ store,
+ self.table_ref.as_ref(),
+ &self.prefix,
+ prefix.as_ref(),
+ )
+ .await?
}
Err(e) => return Err(e.into()),
}
@@ -323,6 +345,15 @@ impl ListingTableUrl {
Pattern::new(glob).map_err(|e| DataFusionError::External(Box::new(e)))?;
Self::try_new(self.url, Some(glob))
}
+
+ pub fn with_table_ref(mut self, table_ref: TableReference) -> Self {
+ self.table_ref = Some(table_ref);
+ self
+ }
+
+ pub fn get_table_ref(&self) -> &Option<TableReference> {
+ &self.table_ref
+ }
}
/// Lists files with cache support, using prefix-aware lookups.
@@ -345,6 +376,7 @@ impl ListingTableUrl {
async fn list_with_cache<'b>(
ctx: &'b dyn Session,
store: &'b dyn ObjectStore,
+ table_ref: Option<&TableReference>,
table_base_path: &Path,
prefix: Option<&Path>,
) -> Result<BoxStream<'b, Result<ObjectMeta>>> {
@@ -367,9 +399,14 @@ async fn list_with_cache<'b>(
// Convert prefix to Option<Path> for cache lookup
let prefix_filter = prefix.cloned();
+ let table_scoped_base_path = TableScopedPath {
+ table: table_ref.cloned(),
+ path: table_base_path.clone(),
+ };
+
// Try cache lookup with optional prefix filter
let vec = if let Some(res) =
- cache.get_with_extra(table_base_path, &prefix_filter)
+ cache.get_with_extra(&table_scoped_base_path, &prefix_filter)
{
debug!("Hit list files cache");
res.as_ref().clone()
@@ -380,7 +417,7 @@ async fn list_with_cache<'b>(
.list(Some(table_base_path))
.try_collect::<Vec<ObjectMeta>>()
.await?;
- cache.put(table_base_path, Arc::new(vec.clone()));
+ cache.put(&table_scoped_base_path, Arc::new(vec.clone()));
// If a prefix filter was requested, apply it to the results
if prefix.is_some() {
diff --git a/datafusion/execution/src/cache/cache_manager.rs b/datafusion/execution/src/cache/cache_manager.rs
index c76a68c651..162074d909 100644
--- a/datafusion/execution/src/cache/cache_manager.rs
+++ b/datafusion/execution/src/cache/cache_manager.rs
@@ -16,7 +16,10 @@
// under the License.
use crate::cache::cache_unit::DefaultFilesMetadataCache;
+use crate::cache::list_files_cache::ListFilesEntry;
+use crate::cache::list_files_cache::TableScopedPath;
use crate::cache::{CacheAccessor, DefaultListFilesCache};
+use datafusion_common::TableReference;
use datafusion_common::stats::Precision;
use datafusion_common::{Result, Statistics};
use object_store::ObjectMeta;
@@ -80,7 +83,7 @@ pub struct FileStatisticsCacheEntry {
///
/// See [`crate::runtime_env::RuntimeEnv`] for more details.
pub trait ListFilesCache:
- CacheAccessor<Path, Arc<Vec<ObjectMeta>>, Extra = Option<Path>>
+ CacheAccessor<TableScopedPath, Arc<Vec<ObjectMeta>>, Extra = Option<Path>>
{
/// Returns the cache's memory limit in bytes.
fn cache_limit(&self) -> usize;
@@ -93,6 +96,11 @@ pub trait ListFilesCache:
/// Updates the cache with a new TTL (time-to-live).
fn update_cache_ttl(&self, ttl: Option<Duration>);
+
+ /// Retrieves the information about the entries currently cached.
+ fn list_entries(&self) -> HashMap<TableScopedPath, ListFilesEntry>;
+
+ fn drop_table_entries(&self, table_ref: &Option<TableReference>) -> Result<()>;
}
/// Generic file-embedded metadata used with [`FileMetadataCache`].
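
The two new trait methods also make the cache introspectable from outside datafusion-cli. A sketch of draining `list_entries` for diagnostics; the `dump_list_files_cache` helper is hypothetical, and the `CacheManager` path assumes the `datafusion::execution` re-exports used elsewhere in this patch:

```rust
use datafusion::execution::cache::cache_manager::CacheManager;

/// Print one line per cached listing, grouped under table-scoped keys.
fn dump_list_files_cache(cache_manager: &CacheManager) {
    if let Some(lfc) = cache_manager.get_list_files_cache() {
        for (key, entry) in lfc.list_entries() {
            println!(
                "table={:?} path={} files={} bytes={} expires={:?}",
                key.table,
                key.path,
                entry.metas.len(),
                entry.size_bytes,
                entry.expires,
            );
        }
    }
}
```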
diff --git a/datafusion/execution/src/cache/list_files_cache.rs b/datafusion/execution/src/cache/list_files_cache.rs
index 661bc47b54..858219e5b8 100644
--- a/datafusion/execution/src/cache/list_files_cache.rs
+++ b/datafusion/execution/src/cache/list_files_cache.rs
@@ -17,10 +17,12 @@
use std::mem::size_of;
use std::{
+ collections::HashMap,
sync::{Arc, Mutex},
time::Duration,
};
+use datafusion_common::TableReference;
use datafusion_common::instant::Instant;
use object_store::{ObjectMeta, path::Path};
@@ -103,10 +105,11 @@ impl DefaultListFilesCache {
}
}
-struct ListFilesEntry {
- metas: Arc<Vec<ObjectMeta>>,
- size_bytes: usize,
- expires: Option<Instant>,
+#[derive(Clone, PartialEq, Debug)]
+pub struct ListFilesEntry {
+ pub metas: Arc<Vec<ObjectMeta>>,
+ pub size_bytes: usize,
+ pub expires: Option<Instant>,
}
impl ListFilesEntry {
@@ -146,9 +149,15 @@ pub const DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT: usize = 1024 * 1024; // 1MiB
/// The default cache TTL for the [`DefaultListFilesCache`]
pub const DEFAULT_LIST_FILES_CACHE_TTL: Option<Duration> = None; // Infinite
+#[derive(PartialEq, Eq, Hash, Clone, Debug)]
+pub struct TableScopedPath {
+ pub table: Option<TableReference>,
+ pub path: Path,
+}
+
/// Handles the inner state of the [`DefaultListFilesCache`] struct.
pub struct DefaultListFilesCacheState {
- lru_queue: LruQueue<Path, ListFilesEntry>,
+ lru_queue: LruQueue<TableScopedPath, ListFilesEntry>,
memory_limit: usize,
memory_used: usize,
ttl: Option<Duration>,
@@ -196,17 +205,17 @@ impl DefaultListFilesCacheState {
/// ```
fn get_with_prefix(
&mut self,
- table_base: &Path,
+ table_scoped_base_path: &TableScopedPath,
prefix: Option<&Path>,
now: Instant,
) -> Option<Arc<Vec<ObjectMeta>>> {
- let entry = self.lru_queue.get(table_base)?;
+ let entry = self.lru_queue.get(table_scoped_base_path)?;
// Check expiration
if let Some(exp) = entry.expires
&& now > exp
{
- self.remove(table_base);
+ self.remove(table_scoped_base_path);
return None;
}
@@ -216,6 +225,7 @@ impl DefaultListFilesCacheState {
};
// Build the full prefix path: table_base/prefix
+ let table_base = &table_scoped_base_path.path;
let mut parts: Vec<_> = table_base.parts().collect();
parts.extend(prefix.parts());
let full_prefix = Path::from_iter(parts);
@@ -241,7 +251,7 @@ impl DefaultListFilesCacheState {
/// If the entry has expired by `now` it is removed from the cache.
///
/// The LRU queue is not updated.
- fn contains_key(&mut self, k: &Path, now: Instant) -> bool {
+ fn contains_key(&mut self, k: &TableScopedPath, now: Instant) -> bool {
let Some(entry) = self.lru_queue.peek(k) else {
return false;
};
@@ -262,7 +272,7 @@ impl DefaultListFilesCacheState {
/// If the size of the entry is greater than the `memory_limit`, the value is not inserted.
fn put(
&mut self,
- key: &Path,
+ key: &TableScopedPath,
value: Arc<Vec<ObjectMeta>>,
now: Instant,
) -> Option<Arc<Vec<ObjectMeta>>> {
@@ -304,7 +314,7 @@ impl DefaultListFilesCacheState {
}
/// Removes an entry from the cache and returns it, if it exists.
- fn remove(&mut self, k: &Path) -> Option<Arc<Vec<ObjectMeta>>> {
+ fn remove(&mut self, k: &TableScopedPath) -> Option<Arc<Vec<ObjectMeta>>> {
if let Some(entry) = self.lru_queue.remove(k) {
self.memory_used -= entry.size_bytes;
Some(entry.metas)
@@ -347,15 +357,41 @@ impl ListFilesCache for DefaultListFilesCache {
state.ttl = ttl;
state.evict_entries();
}
+
+ fn list_entries(&self) -> HashMap<TableScopedPath, ListFilesEntry> {
+ let state = self.state.lock().unwrap();
+ let mut entries = HashMap::<TableScopedPath, ListFilesEntry>::new();
+ for (path, entry) in state.lru_queue.list_entries() {
+ entries.insert(path.clone(), entry.clone());
+ }
+ entries
+ }
+
+ fn drop_table_entries(
+ &self,
+ table_ref: &Option<TableReference>,
+ ) -> datafusion_common::Result<()> {
+ let mut state = self.state.lock().unwrap();
+ let mut table_paths = vec![];
+ for (path, _) in state.lru_queue.list_entries() {
+ if path.table == *table_ref {
+ table_paths.push(path.clone());
+ }
+ }
+ for path in table_paths {
+ state.remove(&path);
+ }
+ Ok(())
+ }
}
-impl CacheAccessor<Path, Arc<Vec<ObjectMeta>>> for DefaultListFilesCache {
+impl CacheAccessor<TableScopedPath, Arc<Vec<ObjectMeta>>> for
DefaultListFilesCache {
type Extra = Option<Path>;
/// Gets all files for the given table base path.
///
/// This is equivalent to calling `get_with_extra(k, &None)`.
- fn get(&self, k: &Path) -> Option<Arc<Vec<ObjectMeta>>> {
+ fn get(&self, k: &TableScopedPath) -> Option<Arc<Vec<ObjectMeta>>> {
self.get_with_extra(k, &None)
}
@@ -374,17 +410,17 @@ impl CacheAccessor<Path, Arc<Vec<ObjectMeta>>> for DefaultListFilesCache {
/// can serve queries for any partition subset without additional storage calls.
fn get_with_extra(
&self,
- table_base: &Path,
+ table_scoped_path: &TableScopedPath,
prefix: &Self::Extra,
) -> Option<Arc<Vec<ObjectMeta>>> {
let mut state = self.state.lock().unwrap();
let now = self.time_provider.now();
- state.get_with_prefix(table_base, prefix.as_ref(), now)
+ state.get_with_prefix(table_scoped_path, prefix.as_ref(), now)
}
fn put(
&self,
- key: &Path,
+ key: &TableScopedPath,
value: Arc<Vec<ObjectMeta>>,
) -> Option<Arc<Vec<ObjectMeta>>> {
let mut state = self.state.lock().unwrap();
@@ -394,19 +430,19 @@ impl CacheAccessor<Path, Arc<Vec<ObjectMeta>>> for DefaultListFilesCache {
fn put_with_extra(
&self,
- key: &Path,
+ key: &TableScopedPath,
value: Arc<Vec<ObjectMeta>>,
_e: &Self::Extra,
) -> Option<Arc<Vec<ObjectMeta>>> {
self.put(key, value)
}
- fn remove(&self, k: &Path) -> Option<Arc<Vec<ObjectMeta>>> {
+ fn remove(&self, k: &TableScopedPath) -> Option<Arc<Vec<ObjectMeta>>> {
let mut state = self.state.lock().unwrap();
state.remove(k)
}
- fn contains_key(&self, k: &Path) -> bool {
+ fn contains_key(&self, k: &TableScopedPath) -> bool {
let mut state = self.state.lock().unwrap();
let now = self.time_provider.now();
state.contains_key(k, now)
@@ -431,7 +467,6 @@ impl CacheAccessor<Path, Arc<Vec<ObjectMeta>>> for DefaultListFilesCache {
mod tests {
use super::*;
use chrono::DateTime;
- use std::thread;
struct MockTimeProvider {
base: Instant,
@@ -499,43 +534,79 @@ mod tests {
#[test]
fn test_basic_operations() {
let cache = DefaultListFilesCache::default();
+ let table_ref = Some(TableReference::from("table"));
let path = Path::from("test_path");
+ let key = TableScopedPath {
+ table: table_ref.clone(),
+ path,
+ };
// Initially cache is empty
- assert!(cache.get(&path).is_none());
- assert!(!cache.contains_key(&path));
+ assert!(cache.get(&key).is_none());
+ assert!(!cache.contains_key(&key));
assert_eq!(cache.len(), 0);
// Put an entry
let meta = create_test_object_meta("file1", 50);
let value = Arc::new(vec![meta.clone()]);
- cache.put(&path, Arc::clone(&value));
+ cache.put(&key, Arc::clone(&value));
// Entry should be retrievable
- assert!(cache.contains_key(&path));
+ assert!(cache.contains_key(&key));
assert_eq!(cache.len(), 1);
- let retrieved = cache.get(&path).unwrap();
+ let retrieved = cache.get(&key).unwrap();
assert_eq!(retrieved.len(), 1);
assert_eq!(retrieved[0].location, meta.location);
// Remove the entry
- let removed = cache.remove(&path).unwrap();
+ let removed = cache.remove(&key).unwrap();
assert_eq!(removed.len(), 1);
- assert!(!cache.contains_key(&path));
+ assert!(!cache.contains_key(&key));
assert_eq!(cache.len(), 0);
// Put multiple entries
- let (path1, value1, _) = create_test_list_files_entry("path1", 2, 50);
- let (path2, value2, _) = create_test_list_files_entry("path2", 3, 50);
- cache.put(&path1, value1);
- cache.put(&path2, value2);
+ let (path1, value1, size1) = create_test_list_files_entry("path1", 2, 50);
+ let (path2, value2, size2) = create_test_list_files_entry("path2", 2, 50);
+ let key1 = TableScopedPath {
+ table: table_ref.clone(),
+ path: path1,
+ };
+ let key2 = TableScopedPath {
+ table: table_ref,
+ path: path2,
+ };
+ cache.put(&key1, Arc::clone(&value1));
+ cache.put(&key2, Arc::clone(&value2));
assert_eq!(cache.len(), 2);
+ // List cache entries
+ assert_eq!(
+ cache.list_entries(),
+ HashMap::from([
+ (
+ key1.clone(),
+ ListFilesEntry {
+ metas: value1,
+ size_bytes: size1,
+ expires: None,
+ }
+ ),
+ (
+ key2.clone(),
+ ListFilesEntry {
+ metas: value2,
+ size_bytes: size2,
+ expires: None,
+ }
+ )
+ ])
+ );
+
// Clear all entries
cache.clear();
assert_eq!(cache.len(), 0);
- assert!(!cache.contains_key(&path1));
- assert!(!cache.contains_key(&path2));
+ assert!(!cache.contains_key(&key1));
+ assert!(!cache.contains_key(&key2));
}
#[test]
@@ -547,24 +618,42 @@ mod tests {
// Set cache limit to exactly fit all three entries
let cache = DefaultListFilesCache::new(size * 3, None);
+ let table_ref = Some(TableReference::from("table"));
+ let key1 = TableScopedPath {
+ table: table_ref.clone(),
+ path: path1,
+ };
+ let key2 = TableScopedPath {
+ table: table_ref.clone(),
+ path: path2,
+ };
+ let key3 = TableScopedPath {
+ table: table_ref.clone(),
+ path: path3,
+ };
+
// All three entries should fit
- cache.put(&path1, value1);
- cache.put(&path2, value2);
- cache.put(&path3, value3);
+ cache.put(&key1, value1);
+ cache.put(&key2, value2);
+ cache.put(&key3, value3);
assert_eq!(cache.len(), 3);
- assert!(cache.contains_key(&path1));
- assert!(cache.contains_key(&path2));
- assert!(cache.contains_key(&path3));
+ assert!(cache.contains_key(&key1));
+ assert!(cache.contains_key(&key2));
+ assert!(cache.contains_key(&key3));
// Adding a new entry should evict path1 (LRU)
let (path4, value4, _) = create_test_list_files_entry("path4", 1, 100);
- cache.put(&path4, value4);
+ let key4 = TableScopedPath {
+ table: table_ref,
+ path: path4,
+ };
+ cache.put(&key4, value4);
assert_eq!(cache.len(), 3);
- assert!(!cache.contains_key(&path1)); // Evicted
- assert!(cache.contains_key(&path2));
- assert!(cache.contains_key(&path3));
- assert!(cache.contains_key(&path4));
+ assert!(!cache.contains_key(&key1)); // Evicted
+ assert!(cache.contains_key(&key2));
+ assert!(cache.contains_key(&key3));
+ assert!(cache.contains_key(&key4));
}
#[test]
@@ -576,24 +665,42 @@ mod tests {
// Set cache limit to fit exactly three entries
let cache = DefaultListFilesCache::new(size * 3, None);
- cache.put(&path1, value1);
- cache.put(&path2, value2);
- cache.put(&path3, value3);
+ let table_ref = Some(TableReference::from("table"));
+ let key1 = TableScopedPath {
+ table: table_ref.clone(),
+ path: path1,
+ };
+ let key2 = TableScopedPath {
+ table: table_ref.clone(),
+ path: path2,
+ };
+ let key3 = TableScopedPath {
+ table: table_ref.clone(),
+ path: path3,
+ };
+
+ cache.put(&key1, value1);
+ cache.put(&key2, value2);
+ cache.put(&key3, value3);
assert_eq!(cache.len(), 3);
// Access path1 to move it to front (MRU)
// Order is now: path2 (LRU), path3, path1 (MRU)
- cache.get(&path1);
+ cache.get(&key1);
// Adding a new entry should evict path2 (the LRU)
let (path4, value4, _) = create_test_list_files_entry("path4", 1, 100);
- cache.put(&path4, value4);
+ let key4 = TableScopedPath {
+ table: table_ref,
+ path: path4,
+ };
+ cache.put(&key4, value4);
assert_eq!(cache.len(), 3);
- assert!(cache.contains_key(&path1)); // Still present (recently
accessed)
- assert!(!cache.contains_key(&path2)); // Evicted (was LRU)
- assert!(cache.contains_key(&path3));
- assert!(cache.contains_key(&path4));
+ assert!(cache.contains_key(&key1)); // Still present (recently
accessed)
+ assert!(!cache.contains_key(&key2)); // Evicted (was LRU)
+ assert!(cache.contains_key(&key3));
+ assert!(cache.contains_key(&key4));
}
#[test]
@@ -604,19 +711,32 @@ mod tests {
// Set cache limit to fit both entries
let cache = DefaultListFilesCache::new(size * 2, None);
- cache.put(&path1, value1);
- cache.put(&path2, value2);
+ let table_ref = Some(TableReference::from("table"));
+ let key1 = TableScopedPath {
+ table: table_ref.clone(),
+ path: path1,
+ };
+ let key2 = TableScopedPath {
+ table: table_ref.clone(),
+ path: path2,
+ };
+ cache.put(&key1, value1);
+ cache.put(&key2, value2);
assert_eq!(cache.len(), 2);
// Try to add an entry that's too large to fit in the cache
let (path_large, value_large, _) = create_test_list_files_entry("large", 1, 1000);
- cache.put(&path_large, value_large);
+ let key_large = TableScopedPath {
+ table: table_ref,
+ path: path_large,
+ };
+ cache.put(&key_large, value_large);
// Large entry should not be added
- assert!(!cache.contains_key(&path_large));
+ assert!(!cache.contains_key(&key_large));
assert_eq!(cache.len(), 2);
- assert!(cache.contains_key(&path1));
- assert!(cache.contains_key(&path2));
+ assert!(cache.contains_key(&key1));
+ assert!(cache.contains_key(&key2));
}
#[test]
@@ -628,21 +748,38 @@ mod tests {
// Set cache limit for exactly 3 entries
let cache = DefaultListFilesCache::new(size * 3, None);
- cache.put(&path1, value1);
- cache.put(&path2, value2);
- cache.put(&path3, value3);
+ let table_ref = Some(TableReference::from("table"));
+ let key1 = TableScopedPath {
+ table: table_ref.clone(),
+ path: path1,
+ };
+ let key2 = TableScopedPath {
+ table: table_ref.clone(),
+ path: path2,
+ };
+ let key3 = TableScopedPath {
+ table: table_ref.clone(),
+ path: path3,
+ };
+ cache.put(&key1, value1);
+ cache.put(&key2, value2);
+ cache.put(&key3, value3);
assert_eq!(cache.len(), 3);
// Add a large entry that requires evicting 2 entries
let (path_large, value_large, _) = create_test_list_files_entry("large", 1, 200);
- cache.put(&path_large, value_large);
+ let key_large = TableScopedPath {
+ table: table_ref,
+ path: path_large,
+ };
+ cache.put(&key_large, value_large);
// path1 and path2 should be evicted (both LRU), path3 and path_large remain
assert_eq!(cache.len(), 2);
- assert!(!cache.contains_key(&path1)); // Evicted
- assert!(!cache.contains_key(&path2)); // Evicted
- assert!(cache.contains_key(&path3));
- assert!(cache.contains_key(&path_large));
+ assert!(!cache.contains_key(&key1)); // Evicted
+ assert!(!cache.contains_key(&key2)); // Evicted
+ assert!(cache.contains_key(&key3));
+ assert!(cache.contains_key(&key_large));
}
#[test]
@@ -653,10 +790,23 @@ mod tests {
let cache = DefaultListFilesCache::new(size * 3, None);
+ let table_ref = Some(TableReference::from("table"));
+ let key1 = TableScopedPath {
+ table: table_ref.clone(),
+ path: path1,
+ };
+ let key2 = TableScopedPath {
+ table: table_ref.clone(),
+ path: path2,
+ };
+ let key3 = TableScopedPath {
+ table: table_ref,
+ path: path3,
+ };
// Add three entries
- cache.put(&path1, value1);
- cache.put(&path2, value2);
- cache.put(&path3, value3);
+ cache.put(&key1, value1);
+ cache.put(&key2, value2);
+ cache.put(&key3, value3);
assert_eq!(cache.len(), 3);
// Resize cache to only fit one entry
@@ -664,70 +814,136 @@ mod tests {
// Should keep only the most recent entry (path3, the MRU)
assert_eq!(cache.len(), 1);
- assert!(cache.contains_key(&path3));
+ assert!(cache.contains_key(&key3));
// Earlier entries (LRU) should be evicted
- assert!(!cache.contains_key(&path1));
- assert!(!cache.contains_key(&path2));
+ assert!(!cache.contains_key(&key1));
+ assert!(!cache.contains_key(&key2));
}
#[test]
fn test_entry_update_with_size_change() {
let (path1, value1, size) = create_test_list_files_entry("path1", 1, 100);
- let (path2, value2, _) = create_test_list_files_entry("path2", 1, 100);
+ let (path2, value2, size2) = create_test_list_files_entry("path2", 1, 100);
let (path3, value3_v1, _) = create_test_list_files_entry("path3", 1, 100);
let cache = DefaultListFilesCache::new(size * 3, None);
+ let table_ref = Some(TableReference::from("table"));
+ let key1 = TableScopedPath {
+ table: table_ref.clone(),
+ path: path1,
+ };
+ let key2 = TableScopedPath {
+ table: table_ref.clone(),
+ path: path2,
+ };
+ let key3 = TableScopedPath {
+ table: table_ref,
+ path: path3,
+ };
// Add three entries
- cache.put(&path1, value1);
- cache.put(&path2, value2);
- cache.put(&path3, value3_v1);
+ cache.put(&key1, value1);
+ cache.put(&key2, Arc::clone(&value2));
+ cache.put(&key3, value3_v1);
assert_eq!(cache.len(), 3);
// Update path3 with same size - should not cause eviction
let (_, value3_v2, _) = create_test_list_files_entry("path3", 1, 100);
- cache.put(&path3, value3_v2);
+ cache.put(&key3, value3_v2);
assert_eq!(cache.len(), 3);
- assert!(cache.contains_key(&path1));
- assert!(cache.contains_key(&path2));
- assert!(cache.contains_key(&path3));
+ assert!(cache.contains_key(&key1));
+ assert!(cache.contains_key(&key2));
+ assert!(cache.contains_key(&key3));
// Update path3 with larger size that requires evicting path1 (LRU)
- let (_, value3_v3, _) = create_test_list_files_entry("path3", 1, 200);
- cache.put(&path3, value3_v3);
+ let (_, value3_v3, size3_v3) = create_test_list_files_entry("path3", 1, 200);
+ cache.put(&key3, Arc::clone(&value3_v3));
assert_eq!(cache.len(), 2);
- assert!(!cache.contains_key(&path1)); // Evicted (was LRU)
- assert!(cache.contains_key(&path2));
- assert!(cache.contains_key(&path3));
+ assert!(!cache.contains_key(&key1)); // Evicted (was LRU)
+ assert!(cache.contains_key(&key2));
+ assert!(cache.contains_key(&key3));
+
+ // List cache entries
+ assert_eq!(
+ cache.list_entries(),
+ HashMap::from([
+ (
+ key2,
+ ListFilesEntry {
+ metas: value2,
+ size_bytes: size2,
+ expires: None,
+ }
+ ),
+ (
+ key3,
+ ListFilesEntry {
+ metas: value3_v3,
+ size_bytes: size3_v3,
+ expires: None,
+ }
+ )
+ ])
+ );
}
#[test]
fn test_cache_with_ttl() {
let ttl = Duration::from_millis(100);
- let cache = DefaultListFilesCache::new(10000, Some(ttl));
- let (path1, value1, _) = create_test_list_files_entry("path1", 2, 50);
- let (path2, value2, _) = create_test_list_files_entry("path2", 2, 50);
+ let mock_time = Arc::new(MockTimeProvider::new());
+ let cache = DefaultListFilesCache::new(10000, Some(ttl))
+ .with_time_provider(Arc::clone(&mock_time) as Arc<dyn TimeProvider>);
- cache.put(&path1, value1);
- cache.put(&path2, value2);
+ let (path1, value1, size1) = create_test_list_files_entry("path1", 2, 50);
+ let (path2, value2, size2) = create_test_list_files_entry("path2", 2, 50);
- // Entries should be accessible immediately
- assert!(cache.get(&path1).is_some());
- assert!(cache.get(&path2).is_some());
- assert!(cache.contains_key(&path1));
- assert!(cache.contains_key(&path2));
- assert_eq!(cache.len(), 2);
+ let table_ref = Some(TableReference::from("table"));
+ let key1 = TableScopedPath {
+ table: table_ref.clone(),
+ path: path1,
+ };
+ let key2 = TableScopedPath {
+ table: table_ref,
+ path: path2,
+ };
+ cache.put(&key1, Arc::clone(&value1));
+ cache.put(&key2, Arc::clone(&value2));
+ // Entries should be accessible immediately
+ assert!(cache.get(&key1).is_some());
+ assert!(cache.get(&key2).is_some());
+ // List cache entries
+ assert_eq!(
+ cache.list_entries(),
+ HashMap::from([
+ (
+ key1.clone(),
+ ListFilesEntry {
+ metas: value1,
+ size_bytes: size1,
+ expires: mock_time.now().checked_add(ttl),
+ }
+ ),
+ (
+ key2.clone(),
+ ListFilesEntry {
+ metas: value2,
+ size_bytes: size2,
+ expires: mock_time.now().checked_add(ttl),
+ }
+ )
+ ])
+ );
// Wait for TTL to expire
- thread::sleep(Duration::from_millis(150));
+ mock_time.inc(Duration::from_millis(150));
// Entries should now return None and be removed when observed through get or contains_key
- assert!(cache.get(&path1).is_none());
+ assert!(cache.get(&key1).is_none());
assert_eq!(cache.len(), 1); // path1 was removed by get()
- assert!(!cache.contains_key(&path2));
+ assert!(!cache.contains_key(&key2));
assert_eq!(cache.len(), 0); // path2 was removed by contains_key()
}
@@ -743,21 +959,34 @@ mod tests {
let (path2, value2, _) = create_test_list_files_entry("path2", 1, 400);
let (path3, value3, _) = create_test_list_files_entry("path3", 1, 400);
- cache.put(&path1, value1);
+ let table_ref = Some(TableReference::from("table"));
+ let key1 = TableScopedPath {
+ table: table_ref.clone(),
+ path: path1,
+ };
+ let key2 = TableScopedPath {
+ table: table_ref.clone(),
+ path: path2,
+ };
+ let key3 = TableScopedPath {
+ table: table_ref,
+ path: path3,
+ };
+ cache.put(&key1, value1);
mock_time.inc(Duration::from_millis(50));
- cache.put(&path2, value2);
+ cache.put(&key2, value2);
mock_time.inc(Duration::from_millis(50));
// path3 should evict path1 due to size limit
- cache.put(&path3, value3);
- assert!(!cache.contains_key(&path1)); // Evicted by LRU
- assert!(cache.contains_key(&path2));
- assert!(cache.contains_key(&path3));
+ cache.put(&key3, value3);
+ assert!(!cache.contains_key(&key1)); // Evicted by LRU
+ assert!(cache.contains_key(&key2));
+ assert!(cache.contains_key(&key3));
mock_time.inc(Duration::from_millis(151));
- assert!(!cache.contains_key(&path2)); // Expired
- assert!(cache.contains_key(&path3)); // Still valid
+ assert!(!cache.contains_key(&key2)); // Expired
+ assert!(cache.contains_key(&key3)); // Still valid
}
#[test]
@@ -843,7 +1072,12 @@ mod tests {
// Add entry and verify memory tracking
let (path1, value1, size1) = create_test_list_files_entry("path1", 1, 100);
- cache.put(&path1, value1);
+ let table_ref = Some(TableReference::from("table"));
+ let key1 = TableScopedPath {
+ table: table_ref.clone(),
+ path: path1,
+ };
+ cache.put(&key1, value1);
{
let state = cache.state.lock().unwrap();
assert_eq!(state.memory_used, size1);
@@ -851,14 +1085,18 @@ mod tests {
// Add another entry
let (path2, value2, size2) = create_test_list_files_entry("path2", 1, 200);
- cache.put(&path2, value2);
+ let key2 = TableScopedPath {
+ table: table_ref.clone(),
+ path: path2,
+ };
+ cache.put(&key2, value2);
{
let state = cache.state.lock().unwrap();
assert_eq!(state.memory_used, size1 + size2);
}
// Remove first entry and verify memory decreases
- cache.remove(&path1);
+ cache.remove(&key1);
{
let state = cache.state.lock().unwrap();
assert_eq!(state.memory_used, size2);
@@ -902,12 +1140,17 @@ mod tests {
]);
// Cache the full table listing
- cache.put(&table_base, files);
+ let table_ref = Some(TableReference::from("table"));
+ let key = TableScopedPath {
+ table: table_ref,
+ path: table_base,
+ };
+ cache.put(&key, files);
// Query for partition a=1 using get_with_extra
// New API: get_with_extra(table_base, Some(relative_prefix))
let prefix_a1 = Some(Path::from("a=1"));
- let result = cache.get_with_extra(&table_base, &prefix_a1);
+ let result = cache.get_with_extra(&key, &prefix_a1);
// Should return filtered results (only files from a=1)
assert!(result.is_some());
@@ -921,7 +1164,7 @@ mod tests {
// Query for partition a=2
let prefix_a2 = Some(Path::from("a=2"));
- let result_2 = cache.get_with_extra(&table_base, &prefix_a2);
+ let result_2 = cache.get_with_extra(&key, &prefix_a2);
assert!(result_2.is_some());
let filtered_2 = result_2.unwrap();
@@ -947,16 +1190,21 @@ mod tests {
create_object_meta_with_path("my_table/a=2/file3.parquet"),
create_object_meta_with_path("my_table/a=2/file4.parquet"),
]);
- cache.put(&table_base, full_files);
+ let table_ref = Some(TableReference::from("table"));
+ let key = TableScopedPath {
+ table: table_ref,
+ path: table_base,
+ };
+ cache.put(&key, full_files);
// Query with no prefix filter (None) should return all 4 files
- let result = cache.get_with_extra(&table_base, &None);
+ let result = cache.get_with_extra(&key, &None);
assert!(result.is_some());
let files = result.unwrap();
assert_eq!(files.len(), 4);
// Also test using get() which delegates to get_with_extra(&None)
- let result_get = cache.get(&table_base);
+ let result_get = cache.get(&key);
assert!(result_get.is_some());
assert_eq!(result_get.unwrap().len(), 4);
}
@@ -967,14 +1215,19 @@ mod tests {
let cache = DefaultListFilesCache::new(100000, None);
let table_base = Path::from("my_table");
+ let table_ref = Some(TableReference::from("table"));
+ let key = TableScopedPath {
+ table: table_ref,
+ path: table_base,
+ };
// Query for full table should miss (nothing cached)
- let result = cache.get_with_extra(&table_base, &None);
+ let result = cache.get_with_extra(&key, &None);
assert!(result.is_none());
// Query with prefix should also miss
let prefix = Some(Path::from("a=1"));
- let result_2 = cache.get_with_extra(&table_base, &prefix);
+ let result_2 = cache.get_with_extra(&key, &prefix);
assert!(result_2.is_none());
}
@@ -988,11 +1241,16 @@ mod tests {
create_object_meta_with_path("my_table/a=1/file1.parquet"),
create_object_meta_with_path("my_table/a=2/file2.parquet"),
]);
- cache.put(&table_base, files);
+ let table_ref = Some(TableReference::from("table"));
+ let key = TableScopedPath {
+ table: table_ref,
+ path: table_base,
+ };
+ cache.put(&key, files);
// Query for partition a=3 which doesn't exist
let prefix_a3 = Some(Path::from("a=3"));
- let result = cache.get_with_extra(&table_base, &prefix_a3);
+ let result = cache.get_with_extra(&key, &prefix_a3);
// Should return None since no files match
assert!(result.is_none());
@@ -1018,23 +1276,28 @@ mod tests {
"events/year=2025/month=01/day=01/file4.parquet",
),
]);
- cache.put(&table_base, files);
+ let table_ref = Some(TableReference::from("table"));
+ let key = TableScopedPath {
+ table: table_ref,
+ path: table_base,
+ };
+ cache.put(&key, files);
// Query for year=2024/month=01 (should get 2 files)
let prefix_month = Some(Path::from("year=2024/month=01"));
- let result = cache.get_with_extra(&table_base, &prefix_month);
+ let result = cache.get_with_extra(&key, &prefix_month);
assert!(result.is_some());
assert_eq!(result.unwrap().len(), 2);
// Query for year=2024 (should get 3 files)
let prefix_year = Some(Path::from("year=2024"));
- let result_year = cache.get_with_extra(&table_base, &prefix_year);
+ let result_year = cache.get_with_extra(&key, &prefix_year);
assert!(result_year.is_some());
assert_eq!(result_year.unwrap().len(), 3);
// Query for specific day (should get 1 file)
let prefix_day = Some(Path::from("year=2024/month=01/day=01"));
- let result_day = cache.get_with_extra(&table_base, &prefix_day);
+ let result_day = cache.get_with_extra(&key, &prefix_day);
assert!(result_day.is_some());
assert_eq!(result_day.unwrap().len(), 1);
}
@@ -1055,18 +1318,63 @@ mod tests {
create_object_meta_with_path("table_b/part=2/file2.parquet"),
]);
- cache.put(&table_a, files_a);
- cache.put(&table_b, files_b);
+ let table_ref_a = Some(TableReference::from("table_a"));
+ let table_ref_b = Some(TableReference::from("table_b"));
+ let key_a = TableScopedPath {
+ table: table_ref_a,
+ path: table_a,
+ };
+ let key_b = TableScopedPath {
+ table: table_ref_b,
+ path: table_b,
+ };
+ cache.put(&key_a, files_a);
+ cache.put(&key_b, files_b);
// Query table_a should only return table_a files
- let result_a = cache.get(&table_a);
+ let result_a = cache.get(&key_a);
assert!(result_a.is_some());
assert_eq!(result_a.unwrap().len(), 1);
// Query table_b with prefix should only return matching table_b files
let prefix = Some(Path::from("part=1"));
- let result_b = cache.get_with_extra(&table_b, &prefix);
+ let result_b = cache.get_with_extra(&key_b, &prefix);
assert!(result_b.is_some());
assert_eq!(result_b.unwrap().len(), 1);
}
+
+ #[test]
+ fn test_drop_table_entries() {
+ let cache = DefaultListFilesCache::default();
+
+ let (path1, value1, _) = create_test_list_files_entry("path1", 1, 100);
+ let (path2, value2, _) = create_test_list_files_entry("path2", 1, 100);
+ let (path3, value3, _) = create_test_list_files_entry("path3", 1, 100);
+
+ let table_ref1 = Some(TableReference::from("table1"));
+ let key1 = TableScopedPath {
+ table: table_ref1.clone(),
+ path: path1,
+ };
+ let key2 = TableScopedPath {
+ table: table_ref1.clone(),
+ path: path2,
+ };
+
+ let table_ref2 = Some(TableReference::from("table2"));
+ let key3 = TableScopedPath {
+ table: table_ref2.clone(),
+ path: path3,
+ };
+
+ cache.put(&key1, value1);
+ cache.put(&key2, value2);
+ cache.put(&key3, value3);
+
+ cache.drop_table_entries(&table_ref1).unwrap();
+
+ assert!(!cache.contains_key(&key1));
+ assert!(!cache.contains_key(&key2));
+ assert!(cache.contains_key(&key3));
+ }
}
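
Putting the pieces together, a minimal sketch of the table-scoped behavior against `DefaultListFilesCache` directly; the table name, path, and metadata below are illustrative only:

```rust
use std::sync::Arc;

use chrono::Utc;
use datafusion_common::TableReference;
use datafusion_execution::cache::cache_manager::ListFilesCache;
use datafusion_execution::cache::{CacheAccessor, DefaultListFilesCache, TableScopedPath};
use object_store::{ObjectMeta, path::Path};

fn main() {
    let cache = DefaultListFilesCache::default();

    // Entries are keyed by (table, path), so two tables that point at
    // overlapping locations no longer collide in the cache.
    let key = TableScopedPath {
        table: Some(TableReference::from("my_table")),
        path: Path::from("my_table"),
    };
    let meta = ObjectMeta {
        location: Path::from("my_table/f.parquet"),
        last_modified: Utc::now(),
        size: 42,
        e_tag: None,
        version: None,
    };
    cache.put(&key, Arc::new(vec![meta]));
    assert!(cache.get(&key).is_some());

    // DROP TABLE funnels through drop_table_entries, which removes every
    // entry whose key carries the dropped table's reference.
    cache
        .drop_table_entries(&Some(TableReference::from("my_table")))
        .unwrap();
    assert!(cache.get(&key).is_none());
}
```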
diff --git a/datafusion/execution/src/cache/mod.rs b/datafusion/execution/src/cache/mod.rs
index 8172069fdb..93b9f0520b 100644
--- a/datafusion/execution/src/cache/mod.rs
+++ b/datafusion/execution/src/cache/mod.rs
@@ -24,6 +24,7 @@ mod list_files_cache;
pub use file_metadata_cache::DefaultFilesMetadataCache;
pub use list_files_cache::DefaultListFilesCache;
+pub use list_files_cache::TableScopedPath;
/// A trait that can be implemented to provide custom cache behavior for the caches managed by
/// [`cache_manager::CacheManager`].
diff --git a/docs/source/user-guide/cli/functions.md b/docs/source/user-guide/cli/functions.md
index f3b0163534..ea353d5c8d 100644
--- a/docs/source/user-guide/cli/functions.md
+++ b/docs/source/user-guide/cli/functions.md
@@ -170,5 +170,55 @@ The columns of the returned table are:
| table_size_bytes | Utf8 | Size of the table, in bytes |
| statistics_size_bytes | UInt64 | Size of the cached statistics in memory |
+## `list_files_cache`
+
+The `list_files_cache` function shows information about the `ListFilesCache` that is used by the [`ListingTable`] implementation in DataFusion. When creating a [`ListingTable`], DataFusion lists the files in the table's location and caches results in the `ListFilesCache`. Subsequent queries against the same table can reuse this cached information instead of re-listing the files. Cache entries are scoped to tables.
+
+You can inspect the cache by querying the `list_files_cache` function. For example,
+
+```sql
+> set datafusion.runtime.list_files_cache_ttl = "30s";
+> create external table overturemaps
+stored as parquet
+location 's3://overturemaps-us-west-2/release/2025-12-17.0/theme=base/type=infrastructure';
+0 row(s) fetched.
+> select table, path, metadata_size_bytes, expires_in,
unnest(metadata_list)['file_size_bytes'] as file_size_bytes,
unnest(metadata_list)['e_tag'] as e_tag from list_files_cache() limit 10;
++--------------+------------------------------------------------------+---------------------+-----------------------------------+-----------------+---------------------------------------+
+| table        | path                                                 | metadata_size_bytes | expires_in                        | file_size_bytes | e_tag                                 |
++--------------+------------------------------------------------------+---------------------+-----------------------------------+-----------------+---------------------------------------+
+| overturemaps | release/2025-12-17.0/theme=base/type=infrastructure | 2750                | 0 days 0 hours 0 mins 25.264 secs | 999055952       | "35fc8fbe8400960b54c66fbb408c48e8-60" |
+| overturemaps | release/2025-12-17.0/theme=base/type=infrastructure | 2750                | 0 days 0 hours 0 mins 25.264 secs | 975592768       | "8a16e10b722681cdc00242564b502965-59" |
+| overturemaps | release/2025-12-17.0/theme=base/type=infrastructure | 2750                | 0 days 0 hours 0 mins 25.264 secs | 1082925747      | "24cd13ddb5e0e438952d2499f5dabe06-65" |
+| overturemaps | release/2025-12-17.0/theme=base/type=infrastructure | 2750                | 0 days 0 hours 0 mins 25.264 secs | 1008425557      | "37663e31c7c64d4ef355882bcd47e361-61" |
+| overturemaps | release/2025-12-17.0/theme=base/type=infrastructure | 2750                | 0 days 0 hours 0 mins 25.264 secs | 1065561905      | "4e7c50d2d1b3c5ed7b82b4898f5ac332-64" |
+| overturemaps | release/2025-12-17.0/theme=base/type=infrastructure | 2750                | 0 days 0 hours 0 mins 25.264 secs | 1045655427      | "8fff7e6a72d375eba668727c55d4f103-63" |
+| overturemaps | release/2025-12-17.0/theme=base/type=infrastructure | 2750                | 0 days 0 hours 0 mins 25.264 secs | 1086822683      | "b67167d8022d778936c330a52a5f1922-65" |
+| overturemaps | release/2025-12-17.0/theme=base/type=infrastructure | 2750                | 0 days 0 hours 0 mins 25.264 secs | 1016732378      | "6d70857a0473ed9ed3fc6e149814168b-61" |
+| overturemaps | release/2025-12-17.0/theme=base/type=infrastructure | 2750                | 0 days 0 hours 0 mins 25.264 secs | 991363784       | "c9cafb42fcbb413f851691c895dd7c2b-60" |
+| overturemaps | release/2025-12-17.0/theme=base/type=infrastructure | 2750                | 0 days 0 hours 0 mins 25.264 secs | 1032469715      | "7540252d0d67158297a67038a3365e0f-62" |
++--------------+------------------------------------------------------+---------------------+-----------------------------------+-----------------+---------------------------------------+
+```
+
+The columns of the returned table are:
+
+| column_name         | data_type    | Description                                                |
+| ------------------- | ------------ | ---------------------------------------------------------- |
+| table               | Utf8         | Name of the table                                          |
+| path                | Utf8         | File path relative to the object store / filesystem root   |
+| metadata_size_bytes | UInt64       | Size of the cached metadata in memory                      |
+| expires_in          | Duration(ms) | Time remaining until the cache entry expires               |
+| metadata_list       | List(Struct) | List of metadata structs, one for each file under the path |
+
+A metadata struct in the metadata_list contains the following fields:
+
+```text
+{
+ "file_path":
"release/2025-12-17.0/theme=base/type=infrastructure/part-00000-d556e455-e0c5-4940-b367-daff3287a952-c000.zstd.parquet",
+ "file_modified": "2025-12-17T22:20:29",
+ "file_size_bytes": 999055952,
+ "e_tag": "35fc8fbe8400960b54c66fbb408c48e8-60",
+ "version": null
+}
+```
+
[`listingtable`]: https://docs.rs/datafusion/latest/datafusion/datasource/listing/struct.ListingTable.html
[entity tag]: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/ETag
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]