This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 093b775adc [feat] Support cache ListFiles result cache in session level (#7620)
093b775adc is described below

commit 093b775adc3593e9e5cb7343e28406ed458551ad
Author: Yang Jiang <[email protected]>
AuthorDate: Sat Oct 7 18:23:49 2023 +0800

    [feat] Support cache ListFiles result cache in session level (#7620)
    
    * support cache list files under path in session
    
    * add test
    
    * fix clippy
    
    * fix clippy2
    
    * fix comment
    
    * Update datafusion/execution/src/cache/cache_manager.rs
    
    Co-authored-by: Andrew Lamb <[email protected]>
    
    * fix fmt
    
    * fix api change
    
    * fix api change2
    
    ---------
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 datafusion/core/src/datasource/listing/helpers.rs |  16 ++-
 datafusion/core/src/datasource/listing/table.rs   |   8 +-
 datafusion/core/src/datasource/listing/url.rs     |  34 +++--
 datafusion/core/src/test/object_store.rs          |  15 +-
 datafusion/core/tests/parquet/file_statistics.rs  | 160 +++++++++++++++++-----
 datafusion/execution/src/cache/cache_manager.rs   |  30 ++++
 datafusion/execution/src/cache/cache_unit.rs      |  83 ++++++++++-
 7 files changed, 296 insertions(+), 50 deletions(-)

diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs
index 476c58b698..d6a0add9b2 100644
--- a/datafusion/core/src/datasource/listing/helpers.rs
+++ b/datafusion/core/src/datasource/listing/helpers.rs
@@ -36,6 +36,7 @@ use crate::{error::Result, scalar::ScalarValue};
 
 use super::PartitionedFile;
 use crate::datasource::listing::ListingTableUrl;
+use crate::execution::context::SessionState;
 use datafusion_common::tree_node::{TreeNode, VisitRecursion};
 use datafusion_common::{Column, DFField, DFSchema, DataFusionError};
 use datafusion_expr::expr::ScalarUDF;
@@ -315,6 +316,7 @@ async fn prune_partitions(
 /// `filters` might contain expressions that can be resolved only at the
 /// file level (e.g. Parquet row group pruning).
 pub async fn pruned_partition_list<'a>(
+    ctx: &'a SessionState,
     store: &'a dyn ObjectStore,
     table_path: &'a ListingTableUrl,
     filters: &'a [Expr],
@@ -325,7 +327,8 @@ pub async fn pruned_partition_list<'a>(
     if partition_cols.is_empty() {
         return Ok(Box::pin(
             table_path
-                .list_all_files(store, file_extension)
+                .list_all_files(ctx, store, file_extension)
+                .await?
                 .map_ok(|object_meta| object_meta.into()),
         ));
     }
@@ -422,7 +425,7 @@ mod tests {
     use futures::StreamExt;
 
     use crate::logical_expr::{case, col, lit};
-    use crate::test::object_store::make_test_store;
+    use crate::test::object_store::make_test_store_and_state;
 
     use super::*;
 
@@ -468,12 +471,13 @@ mod tests {
 
     #[tokio::test]
     async fn test_pruned_partition_list_empty() {
-        let store = make_test_store(&[
+        let (store, state) = make_test_store_and_state(&[
             ("tablepath/mypartition=val1/notparquetfile", 100),
             ("tablepath/file.parquet", 100),
         ]);
         let filter = Expr::eq(col("mypartition"), lit("val1"));
         let pruned = pruned_partition_list(
+            &state,
             store.as_ref(),
             &ListingTableUrl::parse("file:///tablepath/").unwrap(),
             &[filter],
@@ -490,13 +494,14 @@ mod tests {
 
     #[tokio::test]
     async fn test_pruned_partition_list() {
-        let store = make_test_store(&[
+        let (store, state) = make_test_store_and_state(&[
             ("tablepath/mypartition=val1/file.parquet", 100),
             ("tablepath/mypartition=val2/file.parquet", 100),
             ("tablepath/mypartition=val1/other=val3/file.parquet", 100),
         ]);
         let filter = Expr::eq(col("mypartition"), lit("val1"));
         let pruned = pruned_partition_list(
+            &state,
             store.as_ref(),
             &ListingTableUrl::parse("file:///tablepath/").unwrap(),
             &[filter],
@@ -532,7 +537,7 @@ mod tests {
 
     #[tokio::test]
     async fn test_pruned_partition_list_multi() {
-        let store = make_test_store(&[
+        let (store, state) = make_test_store_and_state(&[
             ("tablepath/part1=p1v1/file.parquet", 100),
             ("tablepath/part1=p1v2/part2=p2v1/file1.parquet", 100),
             ("tablepath/part1=p1v2/part2=p2v1/file2.parquet", 100),
@@ -544,6 +549,7 @@ mod tests {
         // filter3 cannot be resolved at partition pruning
         let filter3 = Expr::eq(col("part2"), col("other"));
         let pruned = pruned_partition_list(
+            &state,
             store.as_ref(),
             &ListingTableUrl::parse("file:///tablepath/").unwrap(),
             &[filter1, filter2, filter3],
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 5b1710d344..7834de2b19 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -165,7 +165,8 @@ impl ListingTableConfig {
             .table_paths
             .get(0)
             .unwrap()
-            .list_all_files(store.as_ref(), "")
+            .list_all_files(state, store.as_ref(), "")
+            .await?
             .next()
             .await
             .ok_or_else(|| DataFusionError::Internal("No files for table".into()))??;
@@ -507,7 +508,8 @@ impl ListingOptions {
         let store = state.runtime_env().object_store(table_path)?;
 
         let files: Vec<_> = table_path
-            .list_all_files(store.as_ref(), &self.file_extension)
+            .list_all_files(state, store.as_ref(), &self.file_extension)
+            .await?
             .try_collect()
             .await?;
 
@@ -844,6 +846,7 @@ impl TableProvider for ListingTable {
         let store = state.runtime_env().object_store(table_path)?;
 
         let file_list_stream = pruned_partition_list(
+            state,
             store.as_ref(),
             table_path,
             &[],
@@ -933,6 +936,7 @@ impl ListingTable {
         // list files (with partitions)
         let file_list = future::try_join_all(self.table_paths.iter().map(|table_path| {
             pruned_partition_list(
+                ctx,
                 store.as_ref(),
                 table_path,
                 filters,
diff --git a/datafusion/core/src/datasource/listing/url.rs b/datafusion/core/src/datasource/listing/url.rs
index 96998de17b..4d1ca4853a 100644
--- a/datafusion/core/src/datasource/listing/url.rs
+++ b/datafusion/core/src/datasource/listing/url.rs
@@ -18,14 +18,17 @@
 use std::fs;
 
 use crate::datasource::object_store::ObjectStoreUrl;
+use crate::execution::context::SessionState;
 use datafusion_common::{DataFusionError, Result};
 use futures::stream::BoxStream;
 use futures::{StreamExt, TryStreamExt};
 use glob::Pattern;
 use itertools::Itertools;
+use log::debug;
 use object_store::path::Path;
 use object_store::{ObjectMeta, ObjectStore};
 use percent_encoding;
+use std::sync::Arc;
 use url::Url;
 
 /// A parsed URL identifying files for a listing table, see [`ListingTableUrl::parse`]
@@ -185,28 +188,43 @@ impl ListingTableUrl {
     }
 
     /// List all files identified by this [`ListingTableUrl`] for the provided `file_extension`
-    pub(crate) fn list_all_files<'a>(
+    pub(crate) async fn list_all_files<'a>(
         &'a self,
+        ctx: &'a SessionState,
         store: &'a dyn ObjectStore,
         file_extension: &'a str,
-    ) -> BoxStream<'a, Result<ObjectMeta>> {
+    ) -> Result<BoxStream<'a, Result<ObjectMeta>>> {
         // If the prefix is a file, use a head request, otherwise list
         let is_dir = self.url.as_str().ends_with('/');
         let list = match is_dir {
-            true => futures::stream::once(store.list(Some(&self.prefix)))
-                .try_flatten()
-                .boxed(),
+            true => match ctx.runtime_env().cache_manager.get_list_files_cache() {
+                None => futures::stream::once(store.list(Some(&self.prefix)))
+                    .try_flatten()
+                    .boxed(),
+                Some(cache) => {
+                    if let Some(res) = cache.get(&self.prefix) {
+                        debug!("Hit list all files cache");
+                        futures::stream::iter(res.as_ref().clone().into_iter().map(Ok))
+                            .boxed()
+                    } else {
+                        let list_res = store.list(Some(&self.prefix)).await;
+                        let vec = list_res?.try_collect::<Vec<ObjectMeta>>().await?;
+                        cache.put(&self.prefix, Arc::new(vec.clone()));
+                        futures::stream::iter(vec.into_iter().map(Ok)).boxed()
+                    }
+                }
+            },
             false => futures::stream::once(store.head(&self.prefix)).boxed(),
         };
-
-        list.map_err(Into::into)
+        Ok(list
             .try_filter(move |meta| {
                 let path = &meta.location;
                 let extension_match = path.as_ref().ends_with(file_extension);
                 let glob_match = self.contains(path);
                 futures::future::ready(extension_match && glob_match)
             })
-            .boxed()
+            .map_err(DataFusionError::ObjectStore)
+            .boxed())
     }
 
     /// Returns this [`ListingTableUrl`] as a string
diff --git a/datafusion/core/src/test/object_store.rs b/datafusion/core/src/test/object_store.rs
index 425d0724ea..08cebb56cc 100644
--- a/datafusion/core/src/test/object_store.rs
+++ b/datafusion/core/src/test/object_store.rs
@@ -15,7 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 //! Object store implementation used for testing
+use crate::execution::context::SessionState;
 use crate::prelude::SessionContext;
+use datafusion_execution::config::SessionConfig;
+use datafusion_execution::runtime_env::RuntimeEnv;
 use futures::FutureExt;
 use object_store::{memory::InMemory, path::Path, ObjectMeta, ObjectStore};
 use std::sync::Arc;
@@ -25,11 +28,11 @@ use url::Url;
 pub fn register_test_store(ctx: &SessionContext, files: &[(&str, u64)]) {
     let url = Url::parse("test://").unwrap();
     ctx.runtime_env()
-        .register_object_store(&url, make_test_store(files));
+        .register_object_store(&url, make_test_store_and_state(files).0);
 }
 
 /// Create a test object store with the provided files
-pub fn make_test_store(files: &[(&str, u64)]) -> Arc<dyn ObjectStore> {
+pub fn make_test_store_and_state(files: &[(&str, u64)]) -> (Arc<InMemory>, SessionState) {
     let memory = InMemory::new();
 
     for (name, size) in files {
@@ -40,7 +43,13 @@ pub fn make_test_store(files: &[(&str, u64)]) -> Arc<dyn ObjectStore> {
             .unwrap();
     }
 
-    Arc::new(memory)
+    (
+        Arc::new(memory),
+        SessionState::new_with_config_rt(
+            SessionConfig::default(),
+            Arc::new(RuntimeEnv::default()),
+        ),
+    )
 }
 
 /// Helper method to fetch the file size and date at given path and create a `ObjectMeta`
diff --git a/datafusion/core/tests/parquet/file_statistics.rs b/datafusion/core/tests/parquet/file_statistics.rs
index 58f81cc571..ac4a91720e 100644
--- a/datafusion/core/tests/parquet/file_statistics.rs
+++ b/datafusion/core/tests/parquet/file_statistics.rs
@@ -19,15 +19,20 @@ use datafusion::datasource::file_format::parquet::ParquetFormat;
 use datafusion::datasource::listing::{
     ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
 };
+use datafusion::datasource::physical_plan::ParquetExec;
 use datafusion::datasource::TableProvider;
 use datafusion::execution::context::SessionState;
 use datafusion::prelude::SessionContext;
 use datafusion_execution::cache::cache_manager::CacheManagerConfig;
 use datafusion_execution::cache::cache_unit;
-use datafusion_execution::cache::cache_unit::DefaultFileStatisticsCache;
+use datafusion_execution::cache::cache_unit::{
+    DefaultFileStatisticsCache, DefaultListFilesCache,
+};
 use datafusion_execution::config::SessionConfig;
 use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};
+use std::fs;
 use std::sync::Arc;
+use tempfile::tempdir;
 
 #[tokio::test]
 async fn load_table_stats_with_session_level_cache() {
@@ -35,78 +40,162 @@ async fn load_table_stats_with_session_level_cache() {
     let filename = format!("{}/{}", testdata, "alltypes_plain.parquet");
     let table_path = ListingTableUrl::parse(filename).unwrap();
 
-    let (cache1, state1) = get_cache_runtime_state();
+    let (cache1, _, state1) = get_cache_runtime_state();
 
     // Create a separate DefaultFileStatisticsCache
-    let (cache2, state2) = get_cache_runtime_state();
+    let (cache2, _, state2) = get_cache_runtime_state();
 
     let opt = ListingOptions::new(Arc::new(ParquetFormat::default()));
 
-    let table1 = get_listing_with_cache(&table_path, cache1, &state1, &opt).await;
-    let table2 = get_listing_with_cache(&table_path, cache2, &state2, &opt).await;
+    let table1 = get_listing_table(&table_path, Some(cache1), &opt).await;
+    let table2 = get_listing_table(&table_path, Some(cache2), &opt).await;
 
     //Session 1 first time list files
-    assert_eq!(get_cache_size(&state1), 0);
+    assert_eq!(get_static_cache_size(&state1), 0);
     let exec1 = table1.scan(&state1, None, &[], None).await.unwrap();
 
     assert_eq!(exec1.statistics().num_rows, Some(8));
     assert_eq!(exec1.statistics().total_byte_size, Some(671));
-    assert_eq!(get_cache_size(&state1), 1);
+    assert_eq!(get_static_cache_size(&state1), 1);
 
     //Session 2 first time list files
     //check session 1 cache result not show in session 2
-    assert_eq!(
-        state2
-            .runtime_env()
-            .cache_manager
-            .get_file_statistic_cache()
-            .unwrap()
-            .len(),
-        0
-    );
+    assert_eq!(get_static_cache_size(&state2), 0);
     let exec2 = table2.scan(&state2, None, &[], None).await.unwrap();
     assert_eq!(exec2.statistics().num_rows, Some(8));
     assert_eq!(exec2.statistics().total_byte_size, Some(671));
-    assert_eq!(get_cache_size(&state2), 1);
+    assert_eq!(get_static_cache_size(&state2), 1);
 
     //Session 1 second time list files
     //check session 1 cache result not show in session 2
-    assert_eq!(get_cache_size(&state1), 1);
+    assert_eq!(get_static_cache_size(&state1), 1);
     let exec3 = table1.scan(&state1, None, &[], None).await.unwrap();
     assert_eq!(exec3.statistics().num_rows, Some(8));
     assert_eq!(exec3.statistics().total_byte_size, Some(671));
     // List same file no increase
-    assert_eq!(get_cache_size(&state1), 1);
+    assert_eq!(get_static_cache_size(&state1), 1);
 }
 
-async fn get_listing_with_cache(
+#[tokio::test]
+async fn list_files_with_session_level_cache() {
+    let p_name = "alltypes_plain.parquet";
+    let testdata = datafusion::test_util::parquet_test_data();
+    let filename = format!("{}/{}", testdata, p_name);
+
+    let temp_path1 = tempdir()
+        .unwrap()
+        .into_path()
+        .into_os_string()
+        .into_string()
+        .unwrap();
+    let temp_filename1 = format!("{}/{}", temp_path1, p_name);
+
+    let temp_path2 = tempdir()
+        .unwrap()
+        .into_path()
+        .into_os_string()
+        .into_string()
+        .unwrap();
+    let temp_filename2 = format!("{}/{}", temp_path2, p_name);
+
+    fs::copy(filename.clone(), temp_filename1).expect("panic");
+    fs::copy(filename, temp_filename2).expect("panic");
+
+    let table_path = ListingTableUrl::parse(temp_path1).unwrap();
+
+    let (_, _, state1) = get_cache_runtime_state();
+
+    // Create a separate DefaultFileStatisticsCache
+    let (_, _, state2) = get_cache_runtime_state();
+
+    let opt = ListingOptions::new(Arc::new(ParquetFormat::default()));
+
+    let table1 = get_listing_table(&table_path, None, &opt).await;
+    let table2 = get_listing_table(&table_path, None, &opt).await;
+
+    //Session 1 first time list files
+    assert_eq!(get_list_file_cache_size(&state1), 0);
+    let exec1 = table1.scan(&state1, None, &[], None).await.unwrap();
+    let parquet1 = exec1.as_any().downcast_ref::<ParquetExec>().unwrap();
+
+    assert_eq!(get_list_file_cache_size(&state1), 1);
+    let fg = &parquet1.base_config().file_groups;
+    assert_eq!(fg.len(), 1);
+    assert_eq!(fg.get(0).unwrap().len(), 1);
+
+    //Session 2 first time list files
+    //check session 1 cache result not show in session 2
+    assert_eq!(get_list_file_cache_size(&state2), 0);
+    let exec2 = table2.scan(&state2, None, &[], None).await.unwrap();
+    let parquet2 = exec2.as_any().downcast_ref::<ParquetExec>().unwrap();
+
+    assert_eq!(get_list_file_cache_size(&state2), 1);
+    let fg2 = &parquet2.base_config().file_groups;
+    assert_eq!(fg2.len(), 1);
+    assert_eq!(fg2.get(0).unwrap().len(), 1);
+
+    //Session 1 second time list files
+    //check session 1 cache result not show in session 2
+    assert_eq!(get_list_file_cache_size(&state1), 1);
+    let exec3 = table1.scan(&state1, None, &[], None).await.unwrap();
+    let parquet3 = exec3.as_any().downcast_ref::<ParquetExec>().unwrap();
+
+    assert_eq!(get_list_file_cache_size(&state1), 1);
+    let fg = &parquet3.base_config().file_groups;
+    assert_eq!(fg.len(), 1);
+    assert_eq!(fg.get(0).unwrap().len(), 1);
+    // List same file no increase
+    assert_eq!(get_list_file_cache_size(&state1), 1);
+}
+
+async fn get_listing_table(
     table_path: &ListingTableUrl,
-    cache1: Arc<DefaultFileStatisticsCache>,
-    state1: &SessionState,
+    static_cache: Option<Arc<DefaultFileStatisticsCache>>,
     opt: &ListingOptions,
 ) -> ListingTable {
-    let schema = opt.infer_schema(state1, table_path).await.unwrap();
+    let schema = opt
+        .infer_schema(
+            &SessionState::new_with_config_rt(
+                SessionConfig::default(),
+                Arc::new(RuntimeEnv::default()),
+            ),
+            table_path,
+        )
+        .await
+        .unwrap();
     let config1 = ListingTableConfig::new(table_path.clone())
         .with_listing_options(opt.clone())
         .with_schema(schema);
-    ListingTable::try_new(config1)
-        .unwrap()
-        .with_cache(Some(cache1))
+    let table = ListingTable::try_new(config1).unwrap();
+    if let Some(c) = static_cache {
+        table.with_cache(Some(c))
+    } else {
+        table
+    }
 }
 
-fn get_cache_runtime_state() -> (Arc<DefaultFileStatisticsCache>, SessionState) {
+fn get_cache_runtime_state() -> (
+    Arc<DefaultFileStatisticsCache>,
+    Arc<DefaultListFilesCache>,
+    SessionState,
+) {
     let cache_config = CacheManagerConfig::default();
-    let cache1 = Arc::new(cache_unit::DefaultFileStatisticsCache::default());
-    let cache_config = cache_config.with_files_statistics_cache(Some(cache1.clone()));
+    let file_static_cache = Arc::new(cache_unit::DefaultFileStatisticsCache::default());
+    let list_file_cache = Arc::new(cache_unit::DefaultListFilesCache::default());
+
+    let cache_config = cache_config
+        .with_files_statistics_cache(Some(file_static_cache.clone()))
+        .with_list_files_cache(Some(list_file_cache.clone()));
+
     let rt = Arc::new(
         RuntimeEnv::new(RuntimeConfig::new().with_cache_manager(cache_config)).unwrap(),
     );
     let state = SessionContext::new_with_config_rt(SessionConfig::default(), rt).state();
 
-    (cache1, state)
+    (file_static_cache, list_file_cache, state)
 }
 
-fn get_cache_size(state1: &SessionState) -> usize {
+fn get_static_cache_size(state1: &SessionState) -> usize {
     state1
         .runtime_env()
         .cache_manager
@@ -114,3 +203,12 @@ fn get_cache_size(state1: &SessionState) -> usize {
         .unwrap()
         .len()
 }
+
+fn get_list_file_cache_size(state1: &SessionState) -> usize {
+    state1
+        .runtime_env()
+        .cache_manager
+        .get_list_files_cache()
+        .unwrap()
+        .len()
+}
diff --git a/datafusion/execution/src/cache/cache_manager.rs b/datafusion/execution/src/cache/cache_manager.rs
index 987b47bbb8..9752926368 100644
--- a/datafusion/execution/src/cache/cache_manager.rs
+++ b/datafusion/execution/src/cache/cache_manager.rs
@@ -29,15 +29,25 @@ use std::sync::Arc;
 pub type FileStatisticsCache =
     Arc<dyn CacheAccessor<Path, Arc<Statistics>, Extra = ObjectMeta>>;
 
+pub type ListFilesCache =
+    Arc<dyn CacheAccessor<Path, Arc<Vec<ObjectMeta>>, Extra = ObjectMeta>>;
+
 impl Debug for dyn CacheAccessor<Path, Arc<Statistics>, Extra = ObjectMeta> {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
         write!(f, "Cache name: {} with length: {}", self.name(), self.len())
     }
 }
 
+impl Debug for dyn CacheAccessor<Path, Arc<Vec<ObjectMeta>>, Extra = ObjectMeta> {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(f, "Cache name: {} with length: {}", self.name(), self.len())
+    }
+}
+
 #[derive(Default, Debug)]
 pub struct CacheManager {
     file_statistic_cache: Option<FileStatisticsCache>,
+    list_files_cache: Option<ListFilesCache>,
 }
 
 impl CacheManager {
@@ -46,6 +56,9 @@ impl CacheManager {
         if let Some(cc) = &config.table_files_statistics_cache {
             manager.file_statistic_cache = Some(cc.clone())
         }
+        if let Some(lc) = &config.list_files_cache {
+            manager.list_files_cache = Some(lc.clone())
+        }
         Ok(Arc::new(manager))
     }
 
@@ -53,6 +66,11 @@ impl CacheManager {
     pub fn get_file_statistic_cache(&self) -> Option<FileStatisticsCache> {
         self.file_statistic_cache.clone()
     }
+
+    /// Get the cache of objectMeta under same path.
+    pub fn get_list_files_cache(&self) -> Option<ListFilesCache> {
+        self.list_files_cache.clone()
+    }
 }
 
 #[derive(Clone, Default)]
@@ -61,6 +79,13 @@ pub struct CacheManagerConfig {
     /// Avoid get same file statistics repeatedly in same datafusion session.
     /// Default is disable. Fow now only supports Parquet files.
     pub table_files_statistics_cache: Option<FileStatisticsCache>,
+    /// Enable cache of file metadata when listing files.
+    /// This setting avoids listing file meta of the same path repeatedly
+    /// in same session, which may be expensive in certain situations (e.g. remote object storage).
+    /// Note that if this option is enabled, DataFusion will not see any updates to the underlying
+    /// location.  
+    /// Default is disable.
+    pub list_files_cache: Option<ListFilesCache>,
 }
 
 impl CacheManagerConfig {
@@ -71,4 +96,9 @@ impl CacheManagerConfig {
         self.table_files_statistics_cache = cache;
         self
     }
+
+    pub fn with_list_files_cache(mut self, cache: Option<ListFilesCache>) -> Self {
+        self.list_files_cache = cache;
+        self
+    }
 }
diff --git a/datafusion/execution/src/cache/cache_unit.rs b/datafusion/execution/src/cache/cache_unit.rs
index 3ef699ac23..c432a67587 100644
--- a/datafusion/execution/src/cache/cache_unit.rs
+++ b/datafusion/execution/src/cache/cache_unit.rs
@@ -94,9 +94,69 @@ impl CacheAccessor<Path, Arc<Statistics>> for DefaultFileStatisticsCache {
     }
 }
 
+/// Collected files metadata for listing files.
+/// Cache will not invalided until user call remove or clear.
+#[derive(Default)]
+pub struct DefaultListFilesCache {
+    statistics: DashMap<Path, Arc<Vec<ObjectMeta>>>,
+}
+
+impl CacheAccessor<Path, Arc<Vec<ObjectMeta>>> for DefaultListFilesCache {
+    type Extra = ObjectMeta;
+
+    fn get(&self, k: &Path) -> Option<Arc<Vec<ObjectMeta>>> {
+        self.statistics.get(k).map(|x| x.value().clone())
+    }
+
+    fn get_with_extra(
+        &self,
+        _k: &Path,
+        _e: &Self::Extra,
+    ) -> Option<Arc<Vec<ObjectMeta>>> {
+        panic!("Not supported DefaultListFilesCache get_with_extra")
+    }
+
+    fn put(
+        &self,
+        key: &Path,
+        value: Arc<Vec<ObjectMeta>>,
+    ) -> Option<Arc<Vec<ObjectMeta>>> {
+        self.statistics.insert(key.clone(), value)
+    }
+
+    fn put_with_extra(
+        &self,
+        _key: &Path,
+        _value: Arc<Vec<ObjectMeta>>,
+        _e: &Self::Extra,
+    ) -> Option<Arc<Vec<ObjectMeta>>> {
+        panic!("Not supported DefaultListFilesCache put_with_extra")
+    }
+
+    fn remove(&mut self, k: &Path) -> Option<Arc<Vec<ObjectMeta>>> {
+        self.statistics.remove(k).map(|x| x.1)
+    }
+
+    fn contains_key(&self, k: &Path) -> bool {
+        self.statistics.contains_key(k)
+    }
+
+    fn len(&self) -> usize {
+        self.statistics.len()
+    }
+
+    fn clear(&self) {
+        self.statistics.clear()
+    }
+
+    fn name(&self) -> String {
+        "DefaultListFilesCache".to_string()
+    }
+}
+
 #[cfg(test)]
 mod tests {
-    use crate::cache::cache_unit::DefaultFileStatisticsCache;
+    use crate::cache::cache_unit::{DefaultFileStatisticsCache, DefaultListFilesCache};
     use crate::cache::CacheAccessor;
     use chrono::DateTime;
     use datafusion_common::Statistics;
@@ -137,4 +197,25 @@ mod tests {
         meta2.location = Path::from("test2");
         assert!(cache.get_with_extra(&meta2.location, &meta2).is_none());
     }
+
+    #[test]
+    fn test_list_file_cache() {
+        let meta = ObjectMeta {
+            location: Path::from("test"),
+            last_modified: DateTime::parse_from_rfc3339("2022-09-27T22:36:00+02:00")
+                .unwrap()
+                .into(),
+            size: 1024,
+            e_tag: None,
+        };
+
+        let cache = DefaultListFilesCache::default();
+        assert!(cache.get(&meta.location).is_none());
+
+        cache.put(&meta.location, vec![meta.clone()].into());
+        assert_eq!(
+            cache.get(&meta.location).unwrap().get(0).unwrap().clone(),
+            meta.clone()
+        );
+    }
 }

Reply via email to