This is an automated email from the ASF dual-hosted git repository.

xudong963 pushed a commit to branch branch-52
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/branch-52 by this push:
     new 1ac18a3aff [branch-52] Backport `list_files_cache`, and make default ListingFilesCache table scoped (#19704)
1ac18a3aff is described below

commit 1ac18a3affa740051b7e0c6375a2aa1b42fce2d9
Author: Andrew Lamb <[email protected]>
AuthorDate: Fri Jan 9 04:14:16 2026 -0500

    [branch-52] Backport `list_files_cache`, and make default ListingFilesCache table scoped (#19704)
    
    ## Which issue does this PR close?
    
    - part of https://github.com/apache/datafusion/issues/18566
    
    ## Rationale for this change
    
    Backport the fix for this regression into the 52 release branch:
    -  https://github.com/apache/datafusion/issues/19573
    
    ## What changes are included in this PR?
    
    Backport these two commits to `branch-52` (cherry-pick was clean)
    - 1037f0a / #19388
    - e6049de / #19616
    
    <details><summary>Commands</summary>
    <p>
    
    ```shell
    andrewlamb@Andrews-MacBook-Pro-3:~/Software/datafusion$ git cherry-pick 1037f0a
    [branch-52 1fc70ac20] feat: add list_files_cache table function for `datafusion-cli` (#19388)
     Author: jizezhang <[email protected]>
     Date: Tue Jan 6 05:23:39 2026 -0800
     5 files changed, 446 insertions(+), 31 deletions(-)
    andrewlamb@Andrews-MacBook-Pro-3:~/Software/datafusion$ git cherry-pick e6049de
    Auto-merging datafusion/core/src/execution/context/mod.rs
    [branch-52 aa3d413f0] Make default ListingFilesCache table scoped (#19616)
     Author: jizezhang <[email protected]>
     Date: Thu Jan 8 06:34:10 2026 -0800
     10 files changed, 474 insertions(+), 184 deletions(-)
    ```
    
    </p>
    </details>
    
    ## Are these changes tested?
    
    By CI and new tests
    
    ## Are there any user-facing changes?
    
    A new datafusion-cli function, and dropping an external table now
    clears the listing cache
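
    For example, in a `datafusion-cli` session (a sketch only; `my_table` is a
    placeholder name, and the columns follow the schema documented in
    `functions.rs` below):

    ```sql
    -- inspect the cached LIST results ("table" is quoted because it is a keyword)
    SELECT "table", path, metadata_size_bytes, expires_in FROM list_files_cache();
    -- dropping an external table now also clears its cached entries
    DROP TABLE my_table;
    ```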
    
    ---------
    
    Co-authored-by: jizezhang <[email protected]>
---
 datafusion-cli/src/functions.rs                    | 185 ++++++-
 datafusion-cli/src/main.rs                         | 113 +++-
 datafusion/catalog-listing/src/table.rs            |   7 +-
 .../core/src/datasource/listing_table_factory.rs   |   7 +-
 datafusion/core/src/execution/context/mod.rs       |   7 +-
 datafusion/datasource/src/url.rs                   |  49 +-
 datafusion/execution/src/cache/cache_manager.rs    |  10 +-
 datafusion/execution/src/cache/list_files_cache.rs | 566 ++++++++++++++++-----
 datafusion/execution/src/cache/mod.rs              |   1 +
 docs/source/user-guide/cli/functions.md            |  50 ++
 10 files changed, 850 insertions(+), 145 deletions(-)

diff --git a/datafusion-cli/src/functions.rs b/datafusion-cli/src/functions.rs
index a45d57e8e9..8a6ad448d8 100644
--- a/datafusion-cli/src/functions.rs
+++ b/datafusion-cli/src/functions.rs
@@ -17,13 +17,18 @@
 
 //! Functions that are query-able and searchable via the `\h` command
 
+use datafusion_common::instant::Instant;
 use std::fmt;
 use std::fs::File;
 use std::str::FromStr;
 use std::sync::Arc;
 
-use arrow::array::{Int64Array, StringArray, TimestampMillisecondArray, UInt64Array};
-use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
+use arrow::array::{
+    DurationMillisecondArray, GenericListArray, Int64Array, StringArray, StructArray,
+    TimestampMillisecondArray, UInt64Array,
+};
+use arrow::buffer::{Buffer, OffsetBuffer, ScalarBuffer};
+use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef, TimeUnit};
 use arrow::record_batch::RecordBatch;
 use arrow::util::pretty::pretty_format_batches;
 use datafusion::catalog::{Session, TableFunctionImpl};
@@ -697,3 +702,179 @@ impl TableFunctionImpl for StatisticsCacheFunc {
         Ok(Arc::new(statistics_cache))
     }
 }
+
+/// Implementation of the `list_files_cache` table function in datafusion-cli.
+///
+/// This function returns the cached results of running a LIST command on a particular object store path for a table. The object metadata is returned as a List of Structs, with one Struct for each object.
+/// DataFusion uses these cached results to plan queries against external tables.
+/// # Schema
+/// ```sql
+/// > describe select * from list_files_cache();
+/// +---------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+
+/// | column_name         | data_type                                                                                                                                                                        | is_nullable |
+/// +---------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+
+/// | table               | Utf8                                                                                                                                                                             | NO          |
+/// | path                | Utf8                                                                                                                                                                             | NO          |
+/// | metadata_size_bytes | UInt64                                                                                                                                                                           | NO          |
+/// | expires_in          | Duration(ms)                                                                                                                                                                     | YES         |
+/// | metadata_list       | List(Struct("file_path": non-null Utf8, "file_modified": non-null Timestamp(ms), "file_size_bytes": non-null UInt64, "e_tag": Utf8, "version": Utf8), field: 'metadata')        | YES         |
+/// +---------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+
+/// ```
+#[derive(Debug)]
+struct ListFilesCacheTable {
+    schema: SchemaRef,
+    batch: RecordBatch,
+}
+
+#[async_trait]
+impl TableProvider for ListFilesCacheTable {
+    fn as_any(&self) -> &dyn std::any::Any {
+        self
+    }
+
+    fn schema(&self) -> arrow::datatypes::SchemaRef {
+        self.schema.clone()
+    }
+
+    fn table_type(&self) -> datafusion::logical_expr::TableType {
+        datafusion::logical_expr::TableType::Base
+    }
+
+    async fn scan(
+        &self,
+        _state: &dyn Session,
+        projection: Option<&Vec<usize>>,
+        _filters: &[Expr],
+        _limit: Option<usize>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        Ok(MemorySourceConfig::try_new_exec(
+            &[vec![self.batch.clone()]],
+            TableProvider::schema(self),
+            projection.cloned(),
+        )?)
+    }
+}
+
+#[derive(Debug)]
+pub struct ListFilesCacheFunc {
+    cache_manager: Arc<CacheManager>,
+}
+
+impl ListFilesCacheFunc {
+    pub fn new(cache_manager: Arc<CacheManager>) -> Self {
+        Self { cache_manager }
+    }
+}
+
+impl TableFunctionImpl for ListFilesCacheFunc {
+    fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
+        if !exprs.is_empty() {
+            return plan_err!("list_files_cache should have no arguments");
+        }
+
+        let nested_fields = Fields::from(vec![
+            Field::new("file_path", DataType::Utf8, false),
+            Field::new(
+                "file_modified",
+                DataType::Timestamp(TimeUnit::Millisecond, None),
+                false,
+            ),
+            Field::new("file_size_bytes", DataType::UInt64, false),
+            Field::new("e_tag", DataType::Utf8, true),
+            Field::new("version", DataType::Utf8, true),
+        ]);
+
+        let metadata_field =
+            Field::new("metadata", DataType::Struct(nested_fields.clone()), 
true);
+
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("table", DataType::Utf8, false),
+            Field::new("path", DataType::Utf8, false),
+            Field::new("metadata_size_bytes", DataType::UInt64, false),
+            // expires field in ListFilesEntry has type Instant when set, from which we cannot get "the number of seconds", hence using Duration instead of Timestamp as data type.
+            Field::new(
+                "expires_in",
+                DataType::Duration(TimeUnit::Millisecond),
+                true,
+            ),
+            Field::new(
+                "metadata_list",
+                DataType::List(Arc::new(metadata_field.clone())),
+                true,
+            ),
+        ]));
+
+        let mut table_arr = vec![];
+        let mut path_arr = vec![];
+        let mut metadata_size_bytes_arr = vec![];
+        let mut expires_arr = vec![];
+
+        let mut file_path_arr = vec![];
+        let mut file_modified_arr = vec![];
+        let mut file_size_bytes_arr = vec![];
+        let mut etag_arr = vec![];
+        let mut version_arr = vec![];
+        let mut offsets: Vec<i32> = vec![0];
+
+        if let Some(list_files_cache) = self.cache_manager.get_list_files_cache() {
+            let now = Instant::now();
+            let mut current_offset: i32 = 0;
+
+            for (path, entry) in list_files_cache.list_entries() {
+                table_arr.push(path.table.map_or("NULL".to_string(), |t| t.to_string()));
+                path_arr.push(path.path.to_string());
+                metadata_size_bytes_arr.push(entry.size_bytes as u64);
+                // calculates time left before entry expires
+                expires_arr.push(
+                    entry
+                        .expires
+                        .map(|t| t.duration_since(now).as_millis() as i64),
+                );
+
+                for meta in entry.metas.iter() {
+                    file_path_arr.push(meta.location.to_string());
+                    file_modified_arr.push(meta.last_modified.timestamp_millis());
+                    file_size_bytes_arr.push(meta.size);
+                    etag_arr.push(meta.e_tag.clone());
+                    version_arr.push(meta.version.clone());
+                }
+                current_offset += entry.metas.len() as i32;
+                offsets.push(current_offset);
+            }
+        }
+
+        let struct_arr = StructArray::new(
+            nested_fields,
+            vec![
+                Arc::new(StringArray::from(file_path_arr)),
+                Arc::new(TimestampMillisecondArray::from(file_modified_arr)),
+                Arc::new(UInt64Array::from(file_size_bytes_arr)),
+                Arc::new(StringArray::from(etag_arr)),
+                Arc::new(StringArray::from(version_arr)),
+            ],
+            None,
+        );
+
+        let offsets_buffer: OffsetBuffer<i32> =
+            OffsetBuffer::new(ScalarBuffer::from(Buffer::from_vec(offsets)));
+
+        let batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![
+                Arc::new(StringArray::from(table_arr)),
+                Arc::new(StringArray::from(path_arr)),
+                Arc::new(UInt64Array::from(metadata_size_bytes_arr)),
+                Arc::new(DurationMillisecondArray::from(expires_arr)),
+                Arc::new(GenericListArray::new(
+                    Arc::new(metadata_field),
+                    offsets_buffer,
+                    Arc::new(struct_arr),
+                    None,
+                )),
+            ],
+        )?;
+
+        let list_files_cache = ListFilesCacheTable { schema, batch };
+        Ok(Arc::new(list_files_cache))
+    }
+}
diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs
index 8f69ae4779..9e53260e42 100644
--- a/datafusion-cli/src/main.rs
+++ b/datafusion-cli/src/main.rs
@@ -32,7 +32,7 @@ use datafusion::logical_expr::ExplainFormat;
 use datafusion::prelude::SessionContext;
 use datafusion_cli::catalog::DynamicObjectStoreCatalog;
 use datafusion_cli::functions::{
-    MetadataCacheFunc, ParquetMetadataFunc, StatisticsCacheFunc,
+    ListFilesCacheFunc, MetadataCacheFunc, ParquetMetadataFunc, StatisticsCacheFunc,
 };
 use datafusion_cli::object_storage::instrumented::{
     InstrumentedObjectStoreMode, InstrumentedObjectStoreRegistry,
@@ -253,6 +253,13 @@ async fn main_inner() -> Result<()> {
         )),
     );
 
+    ctx.register_udtf(
+        "list_files_cache",
+        Arc::new(ListFilesCacheFunc::new(
+            ctx.task_ctx().runtime_env().cache_manager.clone(),
+        )),
+    );
+
     let mut print_options = PrintOptions {
         format: args.format,
         quiet: args.quiet,
@@ -431,15 +438,20 @@ pub fn extract_disk_limit(size: &str) -> Result<usize, String> {
 
 #[cfg(test)]
 mod tests {
+    use std::time::Duration;
+
     use super::*;
     use datafusion::{
         common::test_util::batches_to_string,
         execution::cache::{
-            cache_manager::CacheManagerConfig, cache_unit::DefaultFileStatisticsCache,
+            DefaultListFilesCache, cache_manager::CacheManagerConfig,
+            cache_unit::DefaultFileStatisticsCache,
         },
-        prelude::ParquetReadOptions,
+        prelude::{ParquetReadOptions, col, lit, split_part},
     };
     use insta::assert_snapshot;
+    use object_store::memory::InMemory;
+    use url::Url;
 
     fn assert_conversion(input: &str, expected: Result<usize, String>) {
         let result = extract_memory_pool_size(input);
@@ -741,4 +753,99 @@ mod tests {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn test_list_files_cache() -> Result<(), DataFusionError> {
+        let list_files_cache = Arc::new(DefaultListFilesCache::new(
+            1024,
+            Some(Duration::from_secs(1)),
+        ));
+
+        let rt = RuntimeEnvBuilder::new()
+            .with_cache_manager(
+                CacheManagerConfig::default()
+                    .with_list_files_cache(Some(list_files_cache)),
+            )
+            .build_arc()
+            .unwrap();
+
+        let ctx = SessionContext::new_with_config_rt(SessionConfig::default(), rt);
+
+        ctx.register_object_store(
+            &Url::parse("mem://test_table").unwrap(),
+            Arc::new(InMemory::new()),
+        );
+
+        ctx.register_udtf(
+            "list_files_cache",
+            Arc::new(ListFilesCacheFunc::new(
+                ctx.task_ctx().runtime_env().cache_manager.clone(),
+            )),
+        );
+
+        ctx.sql(
+            "CREATE EXTERNAL TABLE src_table
+            STORED AS PARQUET
+            LOCATION '../parquet-testing/data/alltypes_plain.parquet'",
+        )
+        .await?
+        .collect()
+        .await?;
+
+        ctx.sql("COPY (SELECT * FROM src_table) TO 
'mem://test_table/0.parquet' STORED AS PARQUET").await?.collect().await?;
+
+        ctx.sql("COPY (SELECT * FROM src_table) TO 
'mem://test_table/1.parquet' STORED AS PARQUET").await?.collect().await?;
+
+        ctx.sql(
+            "CREATE EXTERNAL TABLE test_table 
+            STORED AS PARQUET
+            LOCATION 'mem://test_table/'
+        ",
+        )
+        .await?
+        .collect()
+        .await?;
+
+        let sql = "SELECT metadata_size_bytes, expires_in, metadata_list FROM 
list_files_cache()";
+        let df = ctx
+            .sql(sql)
+            .await?
+            .unnest_columns(&["metadata_list"])?
+            .with_column_renamed("metadata_list", "metadata")?
+            .unnest_columns(&["metadata"])?;
+
+        assert_eq!(
+            2,
+            df.clone()
+                .filter(col("expires_in").is_not_null())?
+                .count()
+                .await?
+        );
+
+        let df = df
+            .with_column_renamed(r#""metadata.file_size_bytes""#, "file_size_bytes")?
+            .with_column_renamed(r#""metadata.e_tag""#, "etag")?
+            .with_column(
+                "filename",
+                split_part(col(r#""metadata.file_path""#), lit("/"), lit(-1)),
+            )?
+            .select_columns(&[
+                "metadata_size_bytes",
+                "filename",
+                "file_size_bytes",
+                "etag",
+            ])?
+            .sort(vec![col("filename").sort(true, false)])?;
+        let rbs = df.collect().await?;
+        assert_snapshot!(batches_to_string(&rbs),@r"
+        +---------------------+-----------+-----------------+------+
+        | metadata_size_bytes | filename  | file_size_bytes | etag |
+        +---------------------+-----------+-----------------+------+
+        | 212                 | 0.parquet | 3645            | 0    |
+        | 212                 | 1.parquet | 3645            | 1    |
+        +---------------------+-----------+-----------------+------+
+        ");
+
+        Ok(())
+    }
 }
diff --git a/datafusion/catalog-listing/src/table.rs b/datafusion/catalog-listing/src/table.rs
index 9fb2dd2dce..a175d47f4d 100644
--- a/datafusion/catalog-listing/src/table.rs
+++ b/datafusion/catalog-listing/src/table.rs
@@ -34,6 +34,7 @@ use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
 use datafusion_datasource::{
     ListingTableUrl, PartitionedFile, TableSchema, compute_all_files_statistics,
 };
+use datafusion_execution::cache::TableScopedPath;
 use datafusion_execution::cache::cache_manager::FileStatisticsCache;
 use datafusion_execution::cache::cache_unit::DefaultFileStatisticsCache;
 use datafusion_expr::dml::InsertOp;
@@ -565,7 +566,11 @@ impl TableProvider for ListingTable {
 
         // Invalidate cache entries for this table if they exist
         if let Some(lfc) = state.runtime_env().cache_manager.get_list_files_cache() {
-            let _ = lfc.remove(table_path.prefix());
+            let key = TableScopedPath {
+                table: table_path.get_table_ref().clone(),
+                path: table_path.prefix().clone(),
+            };
+            let _ = lfc.remove(&key);
         }
 
         // Sink related option, apart from format
diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs
index 3ca388af0c..86af691fd7 100644
--- a/datafusion/core/src/datasource/listing_table_factory.rs
+++ b/datafusion/core/src/datasource/listing_table_factory.rs
@@ -63,7 +63,8 @@ impl TableProviderFactory for ListingTableFactory {
             ))?
             .create(session_state, &cmd.options)?;
 
-        let mut table_path = ListingTableUrl::parse(&cmd.location)?;
+        let mut table_path =
+            ListingTableUrl::parse(&cmd.location)?.with_table_ref(cmd.name.clone());
         let file_extension = match table_path.is_collection() {
             // Setting the extension to be empty instead of allowing the default extension seems
             // odd, but was done to ensure existing behavior isn't modified. It seems like this
@@ -160,7 +161,9 @@ impl TableProviderFactory for ListingTableFactory {
                         }
                         None => format!("*.{}", cmd.file_type.to_lowercase()),
                     };
-                    table_path = table_path.with_glob(glob.as_ref())?;
+                    table_path = table_path
+                        .with_glob(glob.as_ref())?
+                        .with_table_ref(cmd.name.clone());
                 }
                 let schema = options.infer_schema(session_state, &table_path).await?;
                 let df_schema = Arc::clone(&schema).to_dfschema()?;
diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs
index a769bb01b4..6df90b205c 100644
--- a/datafusion/core/src/execution/context/mod.rs
+++ b/datafusion/core/src/execution/context/mod.rs
@@ -1315,7 +1315,7 @@ impl SessionContext {
         let table = table_ref.table().to_owned();
         let maybe_schema = {
             let state = self.state.read();
-            let resolved = state.resolve_table_ref(table_ref);
+            let resolved = state.resolve_table_ref(table_ref.clone());
             state
                 .catalog_list()
                 .catalog(&resolved.catalog)
@@ -1327,6 +1327,11 @@ impl SessionContext {
             && table_provider.table_type() == table_type
         {
             schema.deregister_table(&table)?;
+            if table_type == TableType::Base
+                && let Some(lfc) = 
self.runtime_env().cache_manager.get_list_files_cache()
+            {
+                lfc.drop_table_entries(&Some(table_ref))?;
+            }
             return Ok(true);
         }
 
diff --git a/datafusion/datasource/src/url.rs b/datafusion/datasource/src/url.rs
index 155d6efe46..2428275ac3 100644
--- a/datafusion/datasource/src/url.rs
+++ b/datafusion/datasource/src/url.rs
@@ -17,7 +17,8 @@
 
 use std::sync::Arc;
 
-use datafusion_common::{DataFusionError, Result};
+use datafusion_common::{DataFusionError, Result, TableReference};
+use datafusion_execution::cache::TableScopedPath;
 use datafusion_execution::object_store::ObjectStoreUrl;
 use datafusion_session::Session;
 
@@ -41,6 +42,8 @@ pub struct ListingTableUrl {
     prefix: Path,
     /// An optional glob expression used to filter files
     glob: Option<Pattern>,
+
+    table_ref: Option<TableReference>,
 }
 
 impl ListingTableUrl {
@@ -145,7 +148,12 @@ impl ListingTableUrl {
     /// to create a [`ListingTableUrl`].
     pub fn try_new(url: Url, glob: Option<Pattern>) -> Result<Self> {
         let prefix = Path::from_url_path(url.path())?;
-        Ok(Self { url, prefix, glob })
+        Ok(Self {
+            url,
+            prefix,
+            glob,
+            table_ref: None,
+        })
     }
 
     /// Returns the URL scheme
@@ -255,7 +263,14 @@ impl ListingTableUrl {
         };
 
         let list: BoxStream<'a, Result<ObjectMeta>> = if self.is_collection() {
-            list_with_cache(ctx, store, &self.prefix, prefix.as_ref()).await?
+            list_with_cache(
+                ctx,
+                store,
+                self.table_ref.as_ref(),
+                &self.prefix,
+                prefix.as_ref(),
+            )
+            .await?
         } else {
             match store.head(&full_prefix).await {
                 Ok(meta) => futures::stream::once(async { Ok(meta) })
@@ -264,7 +279,14 @@ impl ListingTableUrl {
                 // If the head command fails, it is likely that object doesn't exist.
                 // Retry as though it were a prefix (aka a collection)
                 Err(object_store::Error::NotFound { .. }) => {
-                    list_with_cache(ctx, store, &self.prefix, prefix.as_ref()).await?
+                    list_with_cache(
+                        ctx,
+                        store,
+                        self.table_ref.as_ref(),
+                        &self.prefix,
+                        prefix.as_ref(),
+                    )
+                    .await?
                 }
                 Err(e) => return Err(e.into()),
             }
@@ -323,6 +345,15 @@ impl ListingTableUrl {
             Pattern::new(glob).map_err(|e| DataFusionError::External(Box::new(e)))?;
         Self::try_new(self.url, Some(glob))
     }
+
+    pub fn with_table_ref(mut self, table_ref: TableReference) -> Self {
+        self.table_ref = Some(table_ref);
+        self
+    }
+
+    pub fn get_table_ref(&self) -> &Option<TableReference> {
+        &self.table_ref
+    }
 }
 
 /// Lists files with cache support, using prefix-aware lookups.
@@ -345,6 +376,7 @@ impl ListingTableUrl {
 async fn list_with_cache<'b>(
     ctx: &'b dyn Session,
     store: &'b dyn ObjectStore,
+    table_ref: Option<&TableReference>,
     table_base_path: &Path,
     prefix: Option<&Path>,
 ) -> Result<BoxStream<'b, Result<ObjectMeta>>> {
@@ -367,9 +399,14 @@ async fn list_with_cache<'b>(
             // Convert prefix to Option<Path> for cache lookup
             let prefix_filter = prefix.cloned();
 
+            let table_scoped_base_path = TableScopedPath {
+                table: table_ref.cloned(),
+                path: table_base_path.clone(),
+            };
+
             // Try cache lookup with optional prefix filter
             let vec = if let Some(res) =
-                cache.get_with_extra(table_base_path, &prefix_filter)
+                cache.get_with_extra(&table_scoped_base_path, &prefix_filter)
             {
                 debug!("Hit list files cache");
                 res.as_ref().clone()
@@ -380,7 +417,7 @@ async fn list_with_cache<'b>(
                     .list(Some(table_base_path))
                     .try_collect::<Vec<ObjectMeta>>()
                     .await?;
-                cache.put(table_base_path, Arc::new(vec.clone()));
+                cache.put(&table_scoped_base_path, Arc::new(vec.clone()));
 
                 // If a prefix filter was requested, apply it to the results
                 if prefix.is_some() {
diff --git a/datafusion/execution/src/cache/cache_manager.rs b/datafusion/execution/src/cache/cache_manager.rs
index c76a68c651..162074d909 100644
--- a/datafusion/execution/src/cache/cache_manager.rs
+++ b/datafusion/execution/src/cache/cache_manager.rs
@@ -16,7 +16,10 @@
 // under the License.
 
 use crate::cache::cache_unit::DefaultFilesMetadataCache;
+use crate::cache::list_files_cache::ListFilesEntry;
+use crate::cache::list_files_cache::TableScopedPath;
 use crate::cache::{CacheAccessor, DefaultListFilesCache};
+use datafusion_common::TableReference;
 use datafusion_common::stats::Precision;
 use datafusion_common::{Result, Statistics};
 use object_store::ObjectMeta;
@@ -80,7 +83,7 @@ pub struct FileStatisticsCacheEntry {
 ///
 /// See [`crate::runtime_env::RuntimeEnv`] for more details.
 pub trait ListFilesCache:
-    CacheAccessor<Path, Arc<Vec<ObjectMeta>>, Extra = Option<Path>>
+    CacheAccessor<TableScopedPath, Arc<Vec<ObjectMeta>>, Extra = Option<Path>>
 {
     /// Returns the cache's memory limit in bytes.
     fn cache_limit(&self) -> usize;
@@ -93,6 +96,11 @@ pub trait ListFilesCache:
 
     /// Updates the cache with a new TTL (time-to-live).
     fn update_cache_ttl(&self, ttl: Option<Duration>);
+
+    /// Retrieves the information about the entries currently cached.
+    fn list_entries(&self) -> HashMap<TableScopedPath, ListFilesEntry>;
+
+    fn drop_table_entries(&self, table_ref: &Option<TableReference>) -> Result<()>;
 }
 
 /// Generic file-embedded metadata used with [`FileMetadataCache`].
diff --git a/datafusion/execution/src/cache/list_files_cache.rs b/datafusion/execution/src/cache/list_files_cache.rs
index 661bc47b54..858219e5b8 100644
--- a/datafusion/execution/src/cache/list_files_cache.rs
+++ b/datafusion/execution/src/cache/list_files_cache.rs
@@ -17,10 +17,12 @@
 
 use std::mem::size_of;
 use std::{
+    collections::HashMap,
     sync::{Arc, Mutex},
     time::Duration,
 };
 
+use datafusion_common::TableReference;
 use datafusion_common::instant::Instant;
 use object_store::{ObjectMeta, path::Path};
 
@@ -103,10 +105,11 @@ impl DefaultListFilesCache {
     }
 }
 
-struct ListFilesEntry {
-    metas: Arc<Vec<ObjectMeta>>,
-    size_bytes: usize,
-    expires: Option<Instant>,
+#[derive(Clone, PartialEq, Debug)]
+pub struct ListFilesEntry {
+    pub metas: Arc<Vec<ObjectMeta>>,
+    pub size_bytes: usize,
+    pub expires: Option<Instant>,
 }
 
 impl ListFilesEntry {
@@ -146,9 +149,15 @@ pub const DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT: usize = 1024 * 1024; // 1MiB
 /// The default cache TTL for the [`DefaultListFilesCache`]
 pub const DEFAULT_LIST_FILES_CACHE_TTL: Option<Duration> = None; // Infinite
 
+#[derive(PartialEq, Eq, Hash, Clone, Debug)]
+pub struct TableScopedPath {
+    pub table: Option<TableReference>,
+    pub path: Path,
+}
+
 /// Handles the inner state of the [`DefaultListFilesCache`] struct.
 pub struct DefaultListFilesCacheState {
-    lru_queue: LruQueue<Path, ListFilesEntry>,
+    lru_queue: LruQueue<TableScopedPath, ListFilesEntry>,
     memory_limit: usize,
     memory_used: usize,
     ttl: Option<Duration>,
@@ -196,17 +205,17 @@ impl DefaultListFilesCacheState {
     /// ```
     fn get_with_prefix(
         &mut self,
-        table_base: &Path,
+        table_scoped_base_path: &TableScopedPath,
         prefix: Option<&Path>,
         now: Instant,
     ) -> Option<Arc<Vec<ObjectMeta>>> {
-        let entry = self.lru_queue.get(table_base)?;
+        let entry = self.lru_queue.get(table_scoped_base_path)?;
 
         // Check expiration
         if let Some(exp) = entry.expires
             && now > exp
         {
-            self.remove(table_base);
+            self.remove(table_scoped_base_path);
             return None;
         }
 
@@ -216,6 +225,7 @@ impl DefaultListFilesCacheState {
         };
 
         // Build the full prefix path: table_base/prefix
+        let table_base = &table_scoped_base_path.path;
         let mut parts: Vec<_> = table_base.parts().collect();
         parts.extend(prefix.parts());
         let full_prefix = Path::from_iter(parts);
@@ -241,7 +251,7 @@ impl DefaultListFilesCacheState {
     /// If the entry has expired by `now` it is removed from the cache.
     ///
     /// The LRU queue is not updated.
-    fn contains_key(&mut self, k: &Path, now: Instant) -> bool {
+    fn contains_key(&mut self, k: &TableScopedPath, now: Instant) -> bool {
         let Some(entry) = self.lru_queue.peek(k) else {
             return false;
         };
@@ -262,7 +272,7 @@ impl DefaultListFilesCacheState {
     /// If the size of the entry is greater than the `memory_limit`, the value is not inserted.
     fn put(
         &mut self,
-        key: &Path,
+        key: &TableScopedPath,
         value: Arc<Vec<ObjectMeta>>,
         now: Instant,
     ) -> Option<Arc<Vec<ObjectMeta>>> {
@@ -304,7 +314,7 @@ impl DefaultListFilesCacheState {
     }
 
     /// Removes an entry from the cache and returns it, if it exists.
-    fn remove(&mut self, k: &Path) -> Option<Arc<Vec<ObjectMeta>>> {
+    fn remove(&mut self, k: &TableScopedPath) -> Option<Arc<Vec<ObjectMeta>>> {
         if let Some(entry) = self.lru_queue.remove(k) {
             self.memory_used -= entry.size_bytes;
             Some(entry.metas)
@@ -347,15 +357,41 @@ impl ListFilesCache for DefaultListFilesCache {
         state.ttl = ttl;
         state.evict_entries();
     }
+
+    fn list_entries(&self) -> HashMap<TableScopedPath, ListFilesEntry> {
+        let state = self.state.lock().unwrap();
+        let mut entries = HashMap::<TableScopedPath, ListFilesEntry>::new();
+        for (path, entry) in state.lru_queue.list_entries() {
+            entries.insert(path.clone(), entry.clone());
+        }
+        entries
+    }
+
+    fn drop_table_entries(
+        &self,
+        table_ref: &Option<TableReference>,
+    ) -> datafusion_common::Result<()> {
+        let mut state = self.state.lock().unwrap();
+        let mut table_paths = vec![];
+        for (path, _) in state.lru_queue.list_entries() {
+            if path.table == *table_ref {
+                table_paths.push(path.clone());
+            }
+        }
+        for path in table_paths {
+            state.remove(&path);
+        }
+        Ok(())
+    }
 }
 
-impl CacheAccessor<Path, Arc<Vec<ObjectMeta>>> for DefaultListFilesCache {
+impl CacheAccessor<TableScopedPath, Arc<Vec<ObjectMeta>>> for 
DefaultListFilesCache {
     type Extra = Option<Path>;
 
     /// Gets all files for the given table base path.
     ///
     /// This is equivalent to calling `get_with_extra(k, &None)`.
-    fn get(&self, k: &Path) -> Option<Arc<Vec<ObjectMeta>>> {
+    fn get(&self, k: &TableScopedPath) -> Option<Arc<Vec<ObjectMeta>>> {
         self.get_with_extra(k, &None)
     }
 
@@ -374,17 +410,17 @@ impl CacheAccessor<Path, Arc<Vec<ObjectMeta>>> for DefaultListFilesCache {
     /// can serve queries for any partition subset without additional storage calls.
     fn get_with_extra(
         &self,
-        table_base: &Path,
+        table_scoped_path: &TableScopedPath,
         prefix: &Self::Extra,
     ) -> Option<Arc<Vec<ObjectMeta>>> {
         let mut state = self.state.lock().unwrap();
         let now = self.time_provider.now();
-        state.get_with_prefix(table_base, prefix.as_ref(), now)
+        state.get_with_prefix(table_scoped_path, prefix.as_ref(), now)
     }
 
     fn put(
         &self,
-        key: &Path,
+        key: &TableScopedPath,
         value: Arc<Vec<ObjectMeta>>,
     ) -> Option<Arc<Vec<ObjectMeta>>> {
         let mut state = self.state.lock().unwrap();
@@ -394,19 +430,19 @@ impl CacheAccessor<Path, Arc<Vec<ObjectMeta>>> for DefaultListFilesCache {
 
     fn put_with_extra(
         &self,
-        key: &Path,
+        key: &TableScopedPath,
         value: Arc<Vec<ObjectMeta>>,
         _e: &Self::Extra,
     ) -> Option<Arc<Vec<ObjectMeta>>> {
         self.put(key, value)
     }
 
-    fn remove(&self, k: &Path) -> Option<Arc<Vec<ObjectMeta>>> {
+    fn remove(&self, k: &TableScopedPath) -> Option<Arc<Vec<ObjectMeta>>> {
         let mut state = self.state.lock().unwrap();
         state.remove(k)
     }
 
-    fn contains_key(&self, k: &Path) -> bool {
+    fn contains_key(&self, k: &TableScopedPath) -> bool {
         let mut state = self.state.lock().unwrap();
         let now = self.time_provider.now();
         state.contains_key(k, now)
@@ -431,7 +467,6 @@ impl CacheAccessor<Path, Arc<Vec<ObjectMeta>>> for DefaultListFilesCache {
 mod tests {
     use super::*;
     use chrono::DateTime;
-    use std::thread;
 
     struct MockTimeProvider {
         base: Instant,
@@ -499,43 +534,79 @@ mod tests {
     #[test]
     fn test_basic_operations() {
         let cache = DefaultListFilesCache::default();
+        let table_ref = Some(TableReference::from("table"));
         let path = Path::from("test_path");
+        let key = TableScopedPath {
+            table: table_ref.clone(),
+            path,
+        };
 
         // Initially cache is empty
-        assert!(cache.get(&path).is_none());
-        assert!(!cache.contains_key(&path));
+        assert!(cache.get(&key).is_none());
+        assert!(!cache.contains_key(&key));
         assert_eq!(cache.len(), 0);
 
         // Put an entry
         let meta = create_test_object_meta("file1", 50);
         let value = Arc::new(vec![meta.clone()]);
-        cache.put(&path, Arc::clone(&value));
+        cache.put(&key, Arc::clone(&value));
 
         // Entry should be retrievable
-        assert!(cache.contains_key(&path));
+        assert!(cache.contains_key(&key));
         assert_eq!(cache.len(), 1);
-        let retrieved = cache.get(&path).unwrap();
+        let retrieved = cache.get(&key).unwrap();
         assert_eq!(retrieved.len(), 1);
         assert_eq!(retrieved[0].location, meta.location);
 
         // Remove the entry
-        let removed = cache.remove(&path).unwrap();
+        let removed = cache.remove(&key).unwrap();
         assert_eq!(removed.len(), 1);
-        assert!(!cache.contains_key(&path));
+        assert!(!cache.contains_key(&key));
         assert_eq!(cache.len(), 0);
 
         // Put multiple entries
-        let (path1, value1, _) = create_test_list_files_entry("path1", 2, 50);
-        let (path2, value2, _) = create_test_list_files_entry("path2", 3, 50);
-        cache.put(&path1, value1);
-        cache.put(&path2, value2);
+        let (path1, value1, size1) = create_test_list_files_entry("path1", 2, 50);
+        let (path2, value2, size2) = create_test_list_files_entry("path2", 2, 50);
+        let key1 = TableScopedPath {
+            table: table_ref.clone(),
+            path: path1,
+        };
+        let key2 = TableScopedPath {
+            table: table_ref,
+            path: path2,
+        };
+        cache.put(&key1, Arc::clone(&value1));
+        cache.put(&key2, Arc::clone(&value2));
         assert_eq!(cache.len(), 2);
 
+        // List cache entries
+        assert_eq!(
+            cache.list_entries(),
+            HashMap::from([
+                (
+                    key1.clone(),
+                    ListFilesEntry {
+                        metas: value1,
+                        size_bytes: size1,
+                        expires: None,
+                    }
+                ),
+                (
+                    key2.clone(),
+                    ListFilesEntry {
+                        metas: value2,
+                        size_bytes: size2,
+                        expires: None,
+                    }
+                )
+            ])
+        );
+
         // Clear all entries
         cache.clear();
         assert_eq!(cache.len(), 0);
-        assert!(!cache.contains_key(&path1));
-        assert!(!cache.contains_key(&path2));
+        assert!(!cache.contains_key(&key1));
+        assert!(!cache.contains_key(&key2));
     }
 
     #[test]
@@ -547,24 +618,42 @@ mod tests {
         // Set cache limit to exactly fit all three entries
         let cache = DefaultListFilesCache::new(size * 3, None);
 
+        let table_ref = Some(TableReference::from("table"));
+        let key1 = TableScopedPath {
+            table: table_ref.clone(),
+            path: path1,
+        };
+        let key2 = TableScopedPath {
+            table: table_ref.clone(),
+            path: path2,
+        };
+        let key3 = TableScopedPath {
+            table: table_ref.clone(),
+            path: path3,
+        };
+
         // All three entries should fit
-        cache.put(&path1, value1);
-        cache.put(&path2, value2);
-        cache.put(&path3, value3);
+        cache.put(&key1, value1);
+        cache.put(&key2, value2);
+        cache.put(&key3, value3);
         assert_eq!(cache.len(), 3);
-        assert!(cache.contains_key(&path1));
-        assert!(cache.contains_key(&path2));
-        assert!(cache.contains_key(&path3));
+        assert!(cache.contains_key(&key1));
+        assert!(cache.contains_key(&key2));
+        assert!(cache.contains_key(&key3));
 
         // Adding a new entry should evict path1 (LRU)
         let (path4, value4, _) = create_test_list_files_entry("path4", 1, 100);
-        cache.put(&path4, value4);
+        let key4 = TableScopedPath {
+            table: table_ref,
+            path: path4,
+        };
+        cache.put(&key4, value4);
 
         assert_eq!(cache.len(), 3);
-        assert!(!cache.contains_key(&path1)); // Evicted
-        assert!(cache.contains_key(&path2));
-        assert!(cache.contains_key(&path3));
-        assert!(cache.contains_key(&path4));
+        assert!(!cache.contains_key(&key1)); // Evicted
+        assert!(cache.contains_key(&key2));
+        assert!(cache.contains_key(&key3));
+        assert!(cache.contains_key(&key4));
     }
 
     #[test]
@@ -576,24 +665,42 @@ mod tests {
         // Set cache limit to fit exactly three entries
         let cache = DefaultListFilesCache::new(size * 3, None);
 
-        cache.put(&path1, value1);
-        cache.put(&path2, value2);
-        cache.put(&path3, value3);
+        let table_ref = Some(TableReference::from("table"));
+        let key1 = TableScopedPath {
+            table: table_ref.clone(),
+            path: path1,
+        };
+        let key2 = TableScopedPath {
+            table: table_ref.clone(),
+            path: path2,
+        };
+        let key3 = TableScopedPath {
+            table: table_ref.clone(),
+            path: path3,
+        };
+
+        cache.put(&key1, value1);
+        cache.put(&key2, value2);
+        cache.put(&key3, value3);
         assert_eq!(cache.len(), 3);
 
         // Access path1 to move it to front (MRU)
         // Order is now: path2 (LRU), path3, path1 (MRU)
-        cache.get(&path1);
+        cache.get(&key1);
 
         // Adding a new entry should evict path2 (the LRU)
         let (path4, value4, _) = create_test_list_files_entry("path4", 1, 100);
-        cache.put(&path4, value4);
+        let key4 = TableScopedPath {
+            table: table_ref,
+            path: path4,
+        };
+        cache.put(&key4, value4);
 
         assert_eq!(cache.len(), 3);
-        assert!(cache.contains_key(&path1)); // Still present (recently 
accessed)
-        assert!(!cache.contains_key(&path2)); // Evicted (was LRU)
-        assert!(cache.contains_key(&path3));
-        assert!(cache.contains_key(&path4));
+        assert!(cache.contains_key(&key1)); // Still present (recently 
accessed)
+        assert!(!cache.contains_key(&key2)); // Evicted (was LRU)
+        assert!(cache.contains_key(&key3));
+        assert!(cache.contains_key(&key4));
     }
 
     #[test]
@@ -604,19 +711,32 @@ mod tests {
         // Set cache limit to fit both entries
         let cache = DefaultListFilesCache::new(size * 2, None);
 
-        cache.put(&path1, value1);
-        cache.put(&path2, value2);
+        let table_ref = Some(TableReference::from("table"));
+        let key1 = TableScopedPath {
+            table: table_ref.clone(),
+            path: path1,
+        };
+        let key2 = TableScopedPath {
+            table: table_ref.clone(),
+            path: path2,
+        };
+        cache.put(&key1, value1);
+        cache.put(&key2, value2);
         assert_eq!(cache.len(), 2);
 
         // Try to add an entry that's too large to fit in the cache
         let (path_large, value_large, _) = create_test_list_files_entry("large", 1, 1000);
-        cache.put(&path_large, value_large);
+        let key_large = TableScopedPath {
+            table: table_ref,
+            path: path_large,
+        };
+        cache.put(&key_large, value_large);
 
         // Large entry should not be added
-        assert!(!cache.contains_key(&path_large));
+        assert!(!cache.contains_key(&key_large));
         assert_eq!(cache.len(), 2);
-        assert!(cache.contains_key(&path1));
-        assert!(cache.contains_key(&path2));
+        assert!(cache.contains_key(&key1));
+        assert!(cache.contains_key(&key2));
     }
 
     #[test]
@@ -628,21 +748,38 @@ mod tests {
         // Set cache limit for exactly 3 entries
         let cache = DefaultListFilesCache::new(size * 3, None);
 
-        cache.put(&path1, value1);
-        cache.put(&path2, value2);
-        cache.put(&path3, value3);
+        let table_ref = Some(TableReference::from("table"));
+        let key1 = TableScopedPath {
+            table: table_ref.clone(),
+            path: path1,
+        };
+        let key2 = TableScopedPath {
+            table: table_ref.clone(),
+            path: path2,
+        };
+        let key3 = TableScopedPath {
+            table: table_ref.clone(),
+            path: path3,
+        };
+        cache.put(&key1, value1);
+        cache.put(&key2, value2);
+        cache.put(&key3, value3);
         assert_eq!(cache.len(), 3);
 
         // Add a large entry that requires evicting 2 entries
         let (path_large, value_large, _) = create_test_list_files_entry("large", 1, 200);
-        cache.put(&path_large, value_large);
+        let key_large = TableScopedPath {
+            table: table_ref,
+            path: path_large,
+        };
+        cache.put(&key_large, value_large);
 
         // path1 and path2 should be evicted (both LRU), path3 and path_large remain
         assert_eq!(cache.len(), 2);
-        assert!(!cache.contains_key(&path1)); // Evicted
-        assert!(!cache.contains_key(&path2)); // Evicted
-        assert!(cache.contains_key(&path3));
-        assert!(cache.contains_key(&path_large));
+        assert!(!cache.contains_key(&key1)); // Evicted
+        assert!(!cache.contains_key(&key2)); // Evicted
+        assert!(cache.contains_key(&key3));
+        assert!(cache.contains_key(&key_large));
     }
 
     #[test]
@@ -653,10 +790,23 @@ mod tests {
 
         let cache = DefaultListFilesCache::new(size * 3, None);
 
+        let table_ref = Some(TableReference::from("table"));
+        let key1 = TableScopedPath {
+            table: table_ref.clone(),
+            path: path1,
+        };
+        let key2 = TableScopedPath {
+            table: table_ref.clone(),
+            path: path2,
+        };
+        let key3 = TableScopedPath {
+            table: table_ref,
+            path: path3,
+        };
         // Add three entries
-        cache.put(&path1, value1);
-        cache.put(&path2, value2);
-        cache.put(&path3, value3);
+        cache.put(&key1, value1);
+        cache.put(&key2, value2);
+        cache.put(&key3, value3);
         assert_eq!(cache.len(), 3);
 
         // Resize cache to only fit one entry
@@ -664,70 +814,136 @@ mod tests {
 
         // Should keep only the most recent entry (path3, the MRU)
         assert_eq!(cache.len(), 1);
-        assert!(cache.contains_key(&path3));
+        assert!(cache.contains_key(&key3));
         // Earlier entries (LRU) should be evicted
-        assert!(!cache.contains_key(&path1));
-        assert!(!cache.contains_key(&path2));
+        assert!(!cache.contains_key(&key1));
+        assert!(!cache.contains_key(&key2));
     }
 
     #[test]
     fn test_entry_update_with_size_change() {
         let (path1, value1, size) = create_test_list_files_entry("path1", 1, 100);
-        let (path2, value2, _) = create_test_list_files_entry("path2", 1, 100);
+        let (path2, value2, size2) = create_test_list_files_entry("path2", 1, 100);
         let (path3, value3_v1, _) = create_test_list_files_entry("path3", 1, 100);
 
         let cache = DefaultListFilesCache::new(size * 3, None);
 
+        let table_ref = Some(TableReference::from("table"));
+        let key1 = TableScopedPath {
+            table: table_ref.clone(),
+            path: path1,
+        };
+        let key2 = TableScopedPath {
+            table: table_ref.clone(),
+            path: path2,
+        };
+        let key3 = TableScopedPath {
+            table: table_ref,
+            path: path3,
+        };
         // Add three entries
-        cache.put(&path1, value1);
-        cache.put(&path2, value2);
-        cache.put(&path3, value3_v1);
+        cache.put(&key1, value1);
+        cache.put(&key2, Arc::clone(&value2));
+        cache.put(&key3, value3_v1);
         assert_eq!(cache.len(), 3);
 
         // Update path3 with same size - should not cause eviction
         let (_, value3_v2, _) = create_test_list_files_entry("path3", 1, 100);
-        cache.put(&path3, value3_v2);
+        cache.put(&key3, value3_v2);
 
         assert_eq!(cache.len(), 3);
-        assert!(cache.contains_key(&path1));
-        assert!(cache.contains_key(&path2));
-        assert!(cache.contains_key(&path3));
+        assert!(cache.contains_key(&key1));
+        assert!(cache.contains_key(&key2));
+        assert!(cache.contains_key(&key3));
 
         // Update path3 with larger size that requires evicting path1 (LRU)
-        let (_, value3_v3, _) = create_test_list_files_entry("path3", 1, 200);
-        cache.put(&path3, value3_v3);
+        let (_, value3_v3, size3_v3) = create_test_list_files_entry("path3", 1, 200);
+        cache.put(&key3, Arc::clone(&value3_v3));
 
         assert_eq!(cache.len(), 2);
-        assert!(!cache.contains_key(&path1)); // Evicted (was LRU)
-        assert!(cache.contains_key(&path2));
-        assert!(cache.contains_key(&path3));
+        assert!(!cache.contains_key(&key1)); // Evicted (was LRU)
+        assert!(cache.contains_key(&key2));
+        assert!(cache.contains_key(&key3));
+
+        // List cache entries
+        assert_eq!(
+            cache.list_entries(),
+            HashMap::from([
+                (
+                    key2,
+                    ListFilesEntry {
+                        metas: value2,
+                        size_bytes: size2,
+                        expires: None,
+                    }
+                ),
+                (
+                    key3,
+                    ListFilesEntry {
+                        metas: value3_v3,
+                        size_bytes: size3_v3,
+                        expires: None,
+                    }
+                )
+            ])
+        );
     }
 
     #[test]
     fn test_cache_with_ttl() {
         let ttl = Duration::from_millis(100);
-        let cache = DefaultListFilesCache::new(10000, Some(ttl));
 
-        let (path1, value1, _) = create_test_list_files_entry("path1", 2, 50);
-        let (path2, value2, _) = create_test_list_files_entry("path2", 2, 50);
+        let mock_time = Arc::new(MockTimeProvider::new());
+        let cache = DefaultListFilesCache::new(10000, Some(ttl))
+            .with_time_provider(Arc::clone(&mock_time) as Arc<dyn TimeProvider>);
 
-        cache.put(&path1, value1);
-        cache.put(&path2, value2);
+        let (path1, value1, size1) = create_test_list_files_entry("path1", 2, 50);
+        let (path2, value2, size2) = create_test_list_files_entry("path2", 2, 50);
 
-        // Entries should be accessible immediately
-        assert!(cache.get(&path1).is_some());
-        assert!(cache.get(&path2).is_some());
-        assert!(cache.contains_key(&path1));
-        assert!(cache.contains_key(&path2));
-        assert_eq!(cache.len(), 2);
+        let table_ref = Some(TableReference::from("table"));
+        let key1 = TableScopedPath {
+            table: table_ref.clone(),
+            path: path1,
+        };
+        let key2 = TableScopedPath {
+            table: table_ref,
+            path: path2,
+        };
+        cache.put(&key1, Arc::clone(&value1));
+        cache.put(&key2, Arc::clone(&value2));
 
+        // Entries should be accessible immediately
+        assert!(cache.get(&key1).is_some());
+        assert!(cache.get(&key2).is_some());
+        // List cache entries
+        assert_eq!(
+            cache.list_entries(),
+            HashMap::from([
+                (
+                    key1.clone(),
+                    ListFilesEntry {
+                        metas: value1,
+                        size_bytes: size1,
+                        expires: mock_time.now().checked_add(ttl),
+                    }
+                ),
+                (
+                    key2.clone(),
+                    ListFilesEntry {
+                        metas: value2,
+                        size_bytes: size2,
+                        expires: mock_time.now().checked_add(ttl),
+                    }
+                )
+            ])
+        );
         // Wait for TTL to expire
-        thread::sleep(Duration::from_millis(150));
+        mock_time.inc(Duration::from_millis(150));
 
         // Entries should now return None and be removed when observed through get or contains_key
-        assert!(cache.get(&path1).is_none());
+        assert!(cache.get(&key1).is_none());
         assert_eq!(cache.len(), 1); // path1 was removed by get()
-        assert!(!cache.contains_key(&path2));
+        assert!(!cache.contains_key(&key2));
         assert_eq!(cache.len(), 0); // path2 was removed by contains_key()
     }
 
@@ -743,21 +959,34 @@ mod tests {
         let (path2, value2, _) = create_test_list_files_entry("path2", 1, 400);
         let (path3, value3, _) = create_test_list_files_entry("path3", 1, 400);
 
-        cache.put(&path1, value1);
+        let table_ref = Some(TableReference::from("table"));
+        let key1 = TableScopedPath {
+            table: table_ref.clone(),
+            path: path1,
+        };
+        let key2 = TableScopedPath {
+            table: table_ref.clone(),
+            path: path2,
+        };
+        let key3 = TableScopedPath {
+            table: table_ref,
+            path: path3,
+        };
+        cache.put(&key1, value1);
         mock_time.inc(Duration::from_millis(50));
-        cache.put(&path2, value2);
+        cache.put(&key2, value2);
         mock_time.inc(Duration::from_millis(50));
 
         // path3 should evict path1 due to size limit
-        cache.put(&path3, value3);
-        assert!(!cache.contains_key(&path1)); // Evicted by LRU
-        assert!(cache.contains_key(&path2));
-        assert!(cache.contains_key(&path3));
+        cache.put(&key3, value3);
+        assert!(!cache.contains_key(&key1)); // Evicted by LRU
+        assert!(cache.contains_key(&key2));
+        assert!(cache.contains_key(&key3));
 
         mock_time.inc(Duration::from_millis(151));
 
-        assert!(!cache.contains_key(&path2)); // Expired
-        assert!(cache.contains_key(&path3)); // Still valid 
+        assert!(!cache.contains_key(&key2)); // Expired
+        assert!(cache.contains_key(&key3)); // Still valid 
     }
 
     #[test]
@@ -843,7 +1072,12 @@ mod tests {
 
         // Add entry and verify memory tracking
         let (path1, value1, size1) = create_test_list_files_entry("path1", 1, 100);
-        cache.put(&path1, value1);
+        let table_ref = Some(TableReference::from("table"));
+        let key1 = TableScopedPath {
+            table: table_ref.clone(),
+            path: path1,
+        };
+        cache.put(&key1, value1);
         {
             let state = cache.state.lock().unwrap();
             assert_eq!(state.memory_used, size1);
@@ -851,14 +1085,18 @@ mod tests {
 
         // Add another entry
         let (path2, value2, size2) = create_test_list_files_entry("path2", 1, 200);
-        cache.put(&path2, value2);
+        let key2 = TableScopedPath {
+            table: table_ref.clone(),
+            path: path2,
+        };
+        cache.put(&key2, value2);
         {
             let state = cache.state.lock().unwrap();
             assert_eq!(state.memory_used, size1 + size2);
         }
 
         // Remove first entry and verify memory decreases
-        cache.remove(&path1);
+        cache.remove(&key1);
         {
             let state = cache.state.lock().unwrap();
             assert_eq!(state.memory_used, size2);
@@ -902,12 +1140,17 @@ mod tests {
         ]);
 
         // Cache the full table listing
-        cache.put(&table_base, files);
+        let table_ref = Some(TableReference::from("table"));
+        let key = TableScopedPath {
+            table: table_ref,
+            path: table_base,
+        };
+        cache.put(&key, files);
 
         // Query for partition a=1 using get_with_extra
         // New API: get_with_extra(table_base, Some(relative_prefix))
         let prefix_a1 = Some(Path::from("a=1"));
-        let result = cache.get_with_extra(&table_base, &prefix_a1);
+        let result = cache.get_with_extra(&key, &prefix_a1);
 
         // Should return filtered results (only files from a=1)
         assert!(result.is_some());
@@ -921,7 +1164,7 @@ mod tests {
 
         // Query for partition a=2
         let prefix_a2 = Some(Path::from("a=2"));
-        let result_2 = cache.get_with_extra(&table_base, &prefix_a2);
+        let result_2 = cache.get_with_extra(&key, &prefix_a2);
 
         assert!(result_2.is_some());
         let filtered_2 = result_2.unwrap();
@@ -947,16 +1190,21 @@ mod tests {
             create_object_meta_with_path("my_table/a=2/file3.parquet"),
             create_object_meta_with_path("my_table/a=2/file4.parquet"),
         ]);
-        cache.put(&table_base, full_files);
+        let table_ref = Some(TableReference::from("table"));
+        let key = TableScopedPath {
+            table: table_ref,
+            path: table_base,
+        };
+        cache.put(&key, full_files);
 
         // Query with no prefix filter (None) should return all 4 files
-        let result = cache.get_with_extra(&table_base, &None);
+        let result = cache.get_with_extra(&key, &None);
         assert!(result.is_some());
         let files = result.unwrap();
         assert_eq!(files.len(), 4);
 
         // Also test using get() which delegates to get_with_extra(&None)
-        let result_get = cache.get(&table_base);
+        let result_get = cache.get(&key);
         assert!(result_get.is_some());
         assert_eq!(result_get.unwrap().len(), 4);
     }
@@ -967,14 +1215,19 @@ mod tests {
         let cache = DefaultListFilesCache::new(100000, None);
 
         let table_base = Path::from("my_table");
+        let table_ref = Some(TableReference::from("table"));
+        let key = TableScopedPath {
+            table: table_ref,
+            path: table_base,
+        };
 
         // Query for full table should miss (nothing cached)
-        let result = cache.get_with_extra(&table_base, &None);
+        let result = cache.get_with_extra(&key, &None);
         assert!(result.is_none());
 
         // Query with prefix should also miss
         let prefix = Some(Path::from("a=1"));
-        let result_2 = cache.get_with_extra(&table_base, &prefix);
+        let result_2 = cache.get_with_extra(&key, &prefix);
         assert!(result_2.is_none());
     }
 
@@ -988,11 +1241,16 @@ mod tests {
             create_object_meta_with_path("my_table/a=1/file1.parquet"),
             create_object_meta_with_path("my_table/a=2/file2.parquet"),
         ]);
-        cache.put(&table_base, files);
+        let table_ref = Some(TableReference::from("table"));
+        let key = TableScopedPath {
+            table: table_ref,
+            path: table_base,
+        };
+        cache.put(&key, files);
 
         // Query for partition a=3 which doesn't exist
         let prefix_a3 = Some(Path::from("a=3"));
-        let result = cache.get_with_extra(&table_base, &prefix_a3);
+        let result = cache.get_with_extra(&key, &prefix_a3);
 
         // Should return None since no files match
         assert!(result.is_none());
@@ -1018,23 +1276,28 @@ mod tests {
                 "events/year=2025/month=01/day=01/file4.parquet",
             ),
         ]);
-        cache.put(&table_base, files);
+        let table_ref = Some(TableReference::from("table"));
+        let key = TableScopedPath {
+            table: table_ref,
+            path: table_base,
+        };
+        cache.put(&key, files);
 
         // Query for year=2024/month=01 (should get 2 files)
         let prefix_month = Some(Path::from("year=2024/month=01"));
-        let result = cache.get_with_extra(&table_base, &prefix_month);
+        let result = cache.get_with_extra(&key, &prefix_month);
         assert!(result.is_some());
         assert_eq!(result.unwrap().len(), 2);
 
         // Query for year=2024 (should get 3 files)
         let prefix_year = Some(Path::from("year=2024"));
-        let result_year = cache.get_with_extra(&table_base, &prefix_year);
+        let result_year = cache.get_with_extra(&key, &prefix_year);
         assert!(result_year.is_some());
         assert_eq!(result_year.unwrap().len(), 3);
 
         // Query for specific day (should get 1 file)
         let prefix_day = Some(Path::from("year=2024/month=01/day=01"));
-        let result_day = cache.get_with_extra(&table_base, &prefix_day);
+        let result_day = cache.get_with_extra(&key, &prefix_day);
         assert!(result_day.is_some());
         assert_eq!(result_day.unwrap().len(), 1);
     }
@@ -1055,18 +1318,63 @@ mod tests {
             create_object_meta_with_path("table_b/part=2/file2.parquet"),
         ]);
 
-        cache.put(&table_a, files_a);
-        cache.put(&table_b, files_b);
+        let table_ref_a = Some(TableReference::from("table_a"));
+        let table_ref_b = Some(TableReference::from("table_b"));
+        let key_a = TableScopedPath {
+            table: table_ref_a,
+            path: table_a,
+        };
+        let key_b = TableScopedPath {
+            table: table_ref_b,
+            path: table_b,
+        };
+        cache.put(&key_a, files_a);
+        cache.put(&key_b, files_b);
 
         // Query table_a should only return table_a files
-        let result_a = cache.get(&table_a);
+        let result_a = cache.get(&key_a);
         assert!(result_a.is_some());
         assert_eq!(result_a.unwrap().len(), 1);
 
         // Query table_b with prefix should only return matching table_b files
         let prefix = Some(Path::from("part=1"));
-        let result_b = cache.get_with_extra(&table_b, &prefix);
+        let result_b = cache.get_with_extra(&key_b, &prefix);
         assert!(result_b.is_some());
         assert_eq!(result_b.unwrap().len(), 1);
     }
+
+    #[test]
+    fn test_drop_table_entries() {
+        let cache = DefaultListFilesCache::default();
+
+        let (path1, value1, _) = create_test_list_files_entry("path1", 1, 100);
+        let (path2, value2, _) = create_test_list_files_entry("path2", 1, 100);
+        let (path3, value3, _) = create_test_list_files_entry("path3", 1, 100);
+
+        let table_ref1 = Some(TableReference::from("table1"));
+        let key1 = TableScopedPath {
+            table: table_ref1.clone(),
+            path: path1,
+        };
+        let key2 = TableScopedPath {
+            table: table_ref1.clone(),
+            path: path2,
+        };
+
+        let table_ref2 = Some(TableReference::from("table2"));
+        let key3 = TableScopedPath {
+            table: table_ref2.clone(),
+            path: path3,
+        };
+
+        cache.put(&key1, value1);
+        cache.put(&key2, value2);
+        cache.put(&key3, value3);
+
+        cache.drop_table_entries(&table_ref1).unwrap();
+
+        assert!(!cache.contains_key(&key1));
+        assert!(!cache.contains_key(&key2));
+        assert!(cache.contains_key(&key3));
+    }
 }
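
For reference, downstream usage of the new key mirrors these tests. A minimal sketch, assuming the listing value is the `Arc`'d vector of `ObjectMeta` that the test fixtures build, that `put`/`get`/`contains_key` come from the `CacheAccessor` trait in the `cache` module below, and that `drop_table_entries` is an inherent method on `DefaultListFilesCache` as exercised above:

```rust
use std::sync::Arc;

use datafusion_common::TableReference;
use datafusion_execution::cache::{CacheAccessor, DefaultListFilesCache, TableScopedPath};
use object_store::path::Path;
use object_store::ObjectMeta;

// Sketch only: `files` stands in for a real listing result.
fn cache_round_trip(cache: &DefaultListFilesCache, files: Arc<Vec<ObjectMeta>>) {
    let table = Some(TableReference::from("my_table"));

    // Keys now pair the owning table with the listed path.
    let key = TableScopedPath {
        table: table.clone(),
        path: Path::from("my_table"),
    };
    cache.put(&key, files);

    // `get` delegates to `get_with_extra(&None)`, so both hit the same entry.
    assert!(cache.get(&key).is_some());

    // Dropping the table evicts every entry scoped to it.
    cache.drop_table_entries(&table).unwrap();
    assert!(!cache.contains_key(&key));
}
```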
diff --git a/datafusion/execution/src/cache/mod.rs b/datafusion/execution/src/cache/mod.rs
index 8172069fdb..93b9f0520b 100644
--- a/datafusion/execution/src/cache/mod.rs
+++ b/datafusion/execution/src/cache/mod.rs
@@ -24,6 +24,7 @@ mod list_files_cache;
 
 pub use file_metadata_cache::DefaultFilesMetadataCache;
 pub use list_files_cache::DefaultListFilesCache;
+pub use list_files_cache::TableScopedPath;
 
 /// A trait that can be implemented to provide custom cache behavior for the caches managed by
 /// [`cache_manager::CacheManager`].
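
With this re-export, callers can name the key type from the crate's cache root rather than the private module; e.g. (crate name assumed from the `datafusion/execution` path):

```rust
use datafusion_execution::cache::{DefaultListFilesCache, TableScopedPath};
```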
diff --git a/docs/source/user-guide/cli/functions.md b/docs/source/user-guide/cli/functions.md
index f3b0163534..ea353d5c8d 100644
--- a/docs/source/user-guide/cli/functions.md
+++ b/docs/source/user-guide/cli/functions.md
@@ -170,5 +170,55 @@ The columns of the returned table are:
 | table_size_bytes      | Utf8      | Size of the table, in bytes                                                 |
 | statistics_size_bytes | UInt64    | Size of the cached statistics in memory                                     |
 
+## `list_files_cache`
+
+The `list_files_cache` function shows information about the `ListFilesCache` that is used by the [`ListingTable`] implementation in DataFusion. When creating a [`ListingTable`], DataFusion lists the files in the table's location and caches the results in the `ListFilesCache`. Subsequent queries against the same table can reuse this cached information instead of re-listing the files. Cache entries are scoped to tables.
+
+You can inspect the cache by querying the `list_files_cache` function. For example,
+
+```sql
+> set datafusion.runtime.list_files_cache_ttl = "30s";
+> create external table overturemaps
+stored as parquet
+location 's3://overturemaps-us-west-2/release/2025-12-17.0/theme=base/type=infrastructure';
+0 row(s) fetched.
+> select table, path, metadata_size_bytes, expires_in, unnest(metadata_list)['file_size_bytes'] as file_size_bytes, unnest(metadata_list)['e_tag'] as e_tag from list_files_cache() limit 10;
++--------------+-----------------------------------------------------+---------------------+-----------------------------------+-----------------+---------------------------------------+
+| table        | path                                                | metadata_size_bytes | expires_in                        | file_size_bytes | e_tag                                 |
++--------------+-----------------------------------------------------+---------------------+-----------------------------------+-----------------+---------------------------------------+
+| overturemaps | release/2025-12-17.0/theme=base/type=infrastructure | 2750                | 0 days 0 hours 0 mins 25.264 secs | 999055952       | "35fc8fbe8400960b54c66fbb408c48e8-60" |
+| overturemaps | release/2025-12-17.0/theme=base/type=infrastructure | 2750                | 0 days 0 hours 0 mins 25.264 secs | 975592768       | "8a16e10b722681cdc00242564b502965-59" |
+| overturemaps | release/2025-12-17.0/theme=base/type=infrastructure | 2750                | 0 days 0 hours 0 mins 25.264 secs | 1082925747      | "24cd13ddb5e0e438952d2499f5dabe06-65" |
+| overturemaps | release/2025-12-17.0/theme=base/type=infrastructure | 2750                | 0 days 0 hours 0 mins 25.264 secs | 1008425557      | "37663e31c7c64d4ef355882bcd47e361-61" |
+| overturemaps | release/2025-12-17.0/theme=base/type=infrastructure | 2750                | 0 days 0 hours 0 mins 25.264 secs | 1065561905      | "4e7c50d2d1b3c5ed7b82b4898f5ac332-64" |
+| overturemaps | release/2025-12-17.0/theme=base/type=infrastructure | 2750                | 0 days 0 hours 0 mins 25.264 secs | 1045655427      | "8fff7e6a72d375eba668727c55d4f103-63" |
+| overturemaps | release/2025-12-17.0/theme=base/type=infrastructure | 2750                | 0 days 0 hours 0 mins 25.264 secs | 1086822683      | "b67167d8022d778936c330a52a5f1922-65" |
+| overturemaps | release/2025-12-17.0/theme=base/type=infrastructure | 2750                | 0 days 0 hours 0 mins 25.264 secs | 1016732378      | "6d70857a0473ed9ed3fc6e149814168b-61" |
+| overturemaps | release/2025-12-17.0/theme=base/type=infrastructure | 2750                | 0 days 0 hours 0 mins 25.264 secs | 991363784       | "c9cafb42fcbb413f851691c895dd7c2b-60" |
+| overturemaps | release/2025-12-17.0/theme=base/type=infrastructure | 2750                | 0 days 0 hours 0 mins 25.264 secs | 1032469715      | "7540252d0d67158297a67038a3365e0f-62" |
++--------------+-----------------------------------------------------+---------------------+-----------------------------------+-----------------+---------------------------------------+
+```
+
+The columns of the returned table are:
+
+| column_name         | data_type    | Description                                                |
+| ------------------- | ------------ | ---------------------------------------------------------- |
+| table               | Utf8         | Name of the table                                          |
+| path                | Utf8         | File path relative to the object store / filesystem root   |
+| metadata_size_bytes | UInt64       | Size of the cached metadata in memory                      |
+| expires_in          | Duration(ms) | Time remaining until the cache entry expires               |
+| metadata_list       | List(Struct) | List of metadata entries, one for each file under the path |
+
+Each metadata struct in `metadata_list` contains the following fields:
+
+```text
+{
+  "file_path": 
"release/2025-12-17.0/theme=base/type=infrastructure/part-00000-d556e455-e0c5-4940-b367-daff3287a952-c000.zstd.parquet",
+  "file_modified": "2025-12-17T22:20:29",
+  "file_size_bytes": 999055952,
+  "e_tag": "35fc8fbe8400960b54c66fbb408c48e8-60",
+  "version": null
+}
+```
+
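+Because entries are table scoped, dropping a table also clears its cached listings. An illustrative continuation of the session above (output abbreviated):
+
+```sql
+> drop table overturemaps;
+0 row(s) fetched.
+> select table, path from list_files_cache();
+0 row(s) fetched.
+```
+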
 [`listingtable`]: https://docs.rs/datafusion/latest/datafusion/datasource/listing/struct.ListingTable.html
 [entity tag]: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/ETag


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
