This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 093b775adc [feat] Support cache ListFiles result cache in session
level (#7620)
093b775adc is described below
commit 093b775adc3593e9e5cb7343e28406ed458551ad
Author: Yang Jiang <[email protected]>
AuthorDate: Sat Oct 7 18:23:49 2023 +0800
[feat] Support cache ListFiles result cache in session level (#7620)
* support cache list files under path in session
* add test
* fix clippy
* fix clippy2
* fix comment
* Update datafusion/execution/src/cache/cache_manager.rs
Co-authored-by: Andrew Lamb <[email protected]>
* fix fmt
* fix api change
* fix api change2
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
datafusion/core/src/datasource/listing/helpers.rs | 16 ++-
datafusion/core/src/datasource/listing/table.rs | 8 +-
datafusion/core/src/datasource/listing/url.rs | 34 +++--
datafusion/core/src/test/object_store.rs | 15 +-
datafusion/core/tests/parquet/file_statistics.rs | 160 +++++++++++++++++-----
datafusion/execution/src/cache/cache_manager.rs | 30 ++++
datafusion/execution/src/cache/cache_unit.rs | 83 ++++++++++-
7 files changed, 296 insertions(+), 50 deletions(-)
diff --git a/datafusion/core/src/datasource/listing/helpers.rs
b/datafusion/core/src/datasource/listing/helpers.rs
index 476c58b698..d6a0add9b2 100644
--- a/datafusion/core/src/datasource/listing/helpers.rs
+++ b/datafusion/core/src/datasource/listing/helpers.rs
@@ -36,6 +36,7 @@ use crate::{error::Result, scalar::ScalarValue};
use super::PartitionedFile;
use crate::datasource::listing::ListingTableUrl;
+use crate::execution::context::SessionState;
use datafusion_common::tree_node::{TreeNode, VisitRecursion};
use datafusion_common::{Column, DFField, DFSchema, DataFusionError};
use datafusion_expr::expr::ScalarUDF;
@@ -315,6 +316,7 @@ async fn prune_partitions(
/// `filters` might contain expressions that can be resolved only at the
/// file level (e.g. Parquet row group pruning).
pub async fn pruned_partition_list<'a>(
+ ctx: &'a SessionState,
store: &'a dyn ObjectStore,
table_path: &'a ListingTableUrl,
filters: &'a [Expr],
@@ -325,7 +327,8 @@ pub async fn pruned_partition_list<'a>(
if partition_cols.is_empty() {
return Ok(Box::pin(
table_path
- .list_all_files(store, file_extension)
+ .list_all_files(ctx, store, file_extension)
+ .await?
.map_ok(|object_meta| object_meta.into()),
));
}
@@ -422,7 +425,7 @@ mod tests {
use futures::StreamExt;
use crate::logical_expr::{case, col, lit};
- use crate::test::object_store::make_test_store;
+ use crate::test::object_store::make_test_store_and_state;
use super::*;
@@ -468,12 +471,13 @@ mod tests {
#[tokio::test]
async fn test_pruned_partition_list_empty() {
- let store = make_test_store(&[
+ let (store, state) = make_test_store_and_state(&[
("tablepath/mypartition=val1/notparquetfile", 100),
("tablepath/file.parquet", 100),
]);
let filter = Expr::eq(col("mypartition"), lit("val1"));
let pruned = pruned_partition_list(
+ &state,
store.as_ref(),
&ListingTableUrl::parse("file:///tablepath/").unwrap(),
&[filter],
@@ -490,13 +494,14 @@ mod tests {
#[tokio::test]
async fn test_pruned_partition_list() {
- let store = make_test_store(&[
+ let (store, state) = make_test_store_and_state(&[
("tablepath/mypartition=val1/file.parquet", 100),
("tablepath/mypartition=val2/file.parquet", 100),
("tablepath/mypartition=val1/other=val3/file.parquet", 100),
]);
let filter = Expr::eq(col("mypartition"), lit("val1"));
let pruned = pruned_partition_list(
+ &state,
store.as_ref(),
&ListingTableUrl::parse("file:///tablepath/").unwrap(),
&[filter],
@@ -532,7 +537,7 @@ mod tests {
#[tokio::test]
async fn test_pruned_partition_list_multi() {
- let store = make_test_store(&[
+ let (store, state) = make_test_store_and_state(&[
("tablepath/part1=p1v1/file.parquet", 100),
("tablepath/part1=p1v2/part2=p2v1/file1.parquet", 100),
("tablepath/part1=p1v2/part2=p2v1/file2.parquet", 100),
@@ -544,6 +549,7 @@ mod tests {
// filter3 cannot be resolved at partition pruning
let filter3 = Expr::eq(col("part2"), col("other"));
let pruned = pruned_partition_list(
+ &state,
store.as_ref(),
&ListingTableUrl::parse("file:///tablepath/").unwrap(),
&[filter1, filter2, filter3],
diff --git a/datafusion/core/src/datasource/listing/table.rs
b/datafusion/core/src/datasource/listing/table.rs
index 5b1710d344..7834de2b19 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -165,7 +165,8 @@ impl ListingTableConfig {
.table_paths
.get(0)
.unwrap()
- .list_all_files(store.as_ref(), "")
+ .list_all_files(state, store.as_ref(), "")
+ .await?
.next()
.await
.ok_or_else(|| DataFusionError::Internal("No files for
table".into()))??;
@@ -507,7 +508,8 @@ impl ListingOptions {
let store = state.runtime_env().object_store(table_path)?;
let files: Vec<_> = table_path
- .list_all_files(store.as_ref(), &self.file_extension)
+ .list_all_files(state, store.as_ref(), &self.file_extension)
+ .await?
.try_collect()
.await?;
@@ -844,6 +846,7 @@ impl TableProvider for ListingTable {
let store = state.runtime_env().object_store(table_path)?;
let file_list_stream = pruned_partition_list(
+ state,
store.as_ref(),
table_path,
&[],
@@ -933,6 +936,7 @@ impl ListingTable {
// list files (with partitions)
let file_list =
future::try_join_all(self.table_paths.iter().map(|table_path| {
pruned_partition_list(
+ ctx,
store.as_ref(),
table_path,
filters,
diff --git a/datafusion/core/src/datasource/listing/url.rs
b/datafusion/core/src/datasource/listing/url.rs
index 96998de17b..4d1ca4853a 100644
--- a/datafusion/core/src/datasource/listing/url.rs
+++ b/datafusion/core/src/datasource/listing/url.rs
@@ -18,14 +18,17 @@
use std::fs;
use crate::datasource::object_store::ObjectStoreUrl;
+use crate::execution::context::SessionState;
use datafusion_common::{DataFusionError, Result};
use futures::stream::BoxStream;
use futures::{StreamExt, TryStreamExt};
use glob::Pattern;
use itertools::Itertools;
+use log::debug;
use object_store::path::Path;
use object_store::{ObjectMeta, ObjectStore};
use percent_encoding;
+use std::sync::Arc;
use url::Url;
/// A parsed URL identifying files for a listing table, see
[`ListingTableUrl::parse`]
@@ -185,28 +188,43 @@ impl ListingTableUrl {
}
/// List all files identified by this [`ListingTableUrl`] for the provided
`file_extension`
- pub(crate) fn list_all_files<'a>(
+ pub(crate) async fn list_all_files<'a>(
&'a self,
+ ctx: &'a SessionState,
store: &'a dyn ObjectStore,
file_extension: &'a str,
- ) -> BoxStream<'a, Result<ObjectMeta>> {
+ ) -> Result<BoxStream<'a, Result<ObjectMeta>>> {
// If the prefix is a file, use a head request, otherwise list
let is_dir = self.url.as_str().ends_with('/');
let list = match is_dir {
- true => futures::stream::once(store.list(Some(&self.prefix)))
- .try_flatten()
- .boxed(),
+ true => match
ctx.runtime_env().cache_manager.get_list_files_cache() {
+ None => futures::stream::once(store.list(Some(&self.prefix)))
+ .try_flatten()
+ .boxed(),
+ Some(cache) => {
+ if let Some(res) = cache.get(&self.prefix) {
+ debug!("Hit list all files cache");
+
futures::stream::iter(res.as_ref().clone().into_iter().map(Ok))
+ .boxed()
+ } else {
+ let list_res = store.list(Some(&self.prefix)).await;
+ let vec =
list_res?.try_collect::<Vec<ObjectMeta>>().await?;
+ cache.put(&self.prefix, Arc::new(vec.clone()));
+ futures::stream::iter(vec.into_iter().map(Ok)).boxed()
+ }
+ }
+ },
false => futures::stream::once(store.head(&self.prefix)).boxed(),
};
-
- list.map_err(Into::into)
+ Ok(list
.try_filter(move |meta| {
let path = &meta.location;
let extension_match = path.as_ref().ends_with(file_extension);
let glob_match = self.contains(path);
futures::future::ready(extension_match && glob_match)
})
- .boxed()
+ .map_err(DataFusionError::ObjectStore)
+ .boxed())
}
/// Returns this [`ListingTableUrl`] as a string
diff --git a/datafusion/core/src/test/object_store.rs
b/datafusion/core/src/test/object_store.rs
index 425d0724ea..08cebb56cc 100644
--- a/datafusion/core/src/test/object_store.rs
+++ b/datafusion/core/src/test/object_store.rs
@@ -15,7 +15,10 @@
// specific language governing permissions and limitations
// under the License.
//! Object store implementation used for testing
+use crate::execution::context::SessionState;
use crate::prelude::SessionContext;
+use datafusion_execution::config::SessionConfig;
+use datafusion_execution::runtime_env::RuntimeEnv;
use futures::FutureExt;
use object_store::{memory::InMemory, path::Path, ObjectMeta, ObjectStore};
use std::sync::Arc;
@@ -25,11 +28,11 @@ use url::Url;
pub fn register_test_store(ctx: &SessionContext, files: &[(&str, u64)]) {
let url = Url::parse("test://").unwrap();
ctx.runtime_env()
- .register_object_store(&url, make_test_store(files));
+ .register_object_store(&url, make_test_store_and_state(files).0);
}
/// Create a test object store with the provided files
-pub fn make_test_store(files: &[(&str, u64)]) -> Arc<dyn ObjectStore> {
+pub fn make_test_store_and_state(files: &[(&str, u64)]) -> (Arc<InMemory>,
SessionState) {
let memory = InMemory::new();
for (name, size) in files {
@@ -40,7 +43,13 @@ pub fn make_test_store(files: &[(&str, u64)]) -> Arc<dyn
ObjectStore> {
.unwrap();
}
- Arc::new(memory)
+ (
+ Arc::new(memory),
+ SessionState::new_with_config_rt(
+ SessionConfig::default(),
+ Arc::new(RuntimeEnv::default()),
+ ),
+ )
}
/// Helper method to fetch the file size and date at given path and create a
`ObjectMeta`
diff --git a/datafusion/core/tests/parquet/file_statistics.rs
b/datafusion/core/tests/parquet/file_statistics.rs
index 58f81cc571..ac4a91720e 100644
--- a/datafusion/core/tests/parquet/file_statistics.rs
+++ b/datafusion/core/tests/parquet/file_statistics.rs
@@ -19,15 +19,20 @@ use
datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::{
ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
+use datafusion::datasource::physical_plan::ParquetExec;
use datafusion::datasource::TableProvider;
use datafusion::execution::context::SessionState;
use datafusion::prelude::SessionContext;
use datafusion_execution::cache::cache_manager::CacheManagerConfig;
use datafusion_execution::cache::cache_unit;
-use datafusion_execution::cache::cache_unit::DefaultFileStatisticsCache;
+use datafusion_execution::cache::cache_unit::{
+ DefaultFileStatisticsCache, DefaultListFilesCache,
+};
use datafusion_execution::config::SessionConfig;
use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};
+use std::fs;
use std::sync::Arc;
+use tempfile::tempdir;
#[tokio::test]
async fn load_table_stats_with_session_level_cache() {
@@ -35,78 +40,162 @@ async fn load_table_stats_with_session_level_cache() {
let filename = format!("{}/{}", testdata, "alltypes_plain.parquet");
let table_path = ListingTableUrl::parse(filename).unwrap();
- let (cache1, state1) = get_cache_runtime_state();
+ let (cache1, _, state1) = get_cache_runtime_state();
// Create a separate DefaultFileStatisticsCache
- let (cache2, state2) = get_cache_runtime_state();
+ let (cache2, _, state2) = get_cache_runtime_state();
let opt = ListingOptions::new(Arc::new(ParquetFormat::default()));
- let table1 = get_listing_with_cache(&table_path, cache1, &state1,
&opt).await;
- let table2 = get_listing_with_cache(&table_path, cache2, &state2,
&opt).await;
+ let table1 = get_listing_table(&table_path, Some(cache1), &opt).await;
+ let table2 = get_listing_table(&table_path, Some(cache2), &opt).await;
//Session 1 first time list files
- assert_eq!(get_cache_size(&state1), 0);
+ assert_eq!(get_static_cache_size(&state1), 0);
let exec1 = table1.scan(&state1, None, &[], None).await.unwrap();
assert_eq!(exec1.statistics().num_rows, Some(8));
assert_eq!(exec1.statistics().total_byte_size, Some(671));
- assert_eq!(get_cache_size(&state1), 1);
+ assert_eq!(get_static_cache_size(&state1), 1);
//Session 2 first time list files
//check session 1 cache result not show in session 2
- assert_eq!(
- state2
- .runtime_env()
- .cache_manager
- .get_file_statistic_cache()
- .unwrap()
- .len(),
- 0
- );
+ assert_eq!(get_static_cache_size(&state2), 0);
let exec2 = table2.scan(&state2, None, &[], None).await.unwrap();
assert_eq!(exec2.statistics().num_rows, Some(8));
assert_eq!(exec2.statistics().total_byte_size, Some(671));
- assert_eq!(get_cache_size(&state2), 1);
+ assert_eq!(get_static_cache_size(&state2), 1);
//Session 1 second time list files
//check session 1 cache result not show in session 2
- assert_eq!(get_cache_size(&state1), 1);
+ assert_eq!(get_static_cache_size(&state1), 1);
let exec3 = table1.scan(&state1, None, &[], None).await.unwrap();
assert_eq!(exec3.statistics().num_rows, Some(8));
assert_eq!(exec3.statistics().total_byte_size, Some(671));
// List same file no increase
- assert_eq!(get_cache_size(&state1), 1);
+ assert_eq!(get_static_cache_size(&state1), 1);
}
-async fn get_listing_with_cache(
+#[tokio::test]
+async fn list_files_with_session_level_cache() {
+ let p_name = "alltypes_plain.parquet";
+ let testdata = datafusion::test_util::parquet_test_data();
+ let filename = format!("{}/{}", testdata, p_name);
+
+ let temp_path1 = tempdir()
+ .unwrap()
+ .into_path()
+ .into_os_string()
+ .into_string()
+ .unwrap();
+ let temp_filename1 = format!("{}/{}", temp_path1, p_name);
+
+ let temp_path2 = tempdir()
+ .unwrap()
+ .into_path()
+ .into_os_string()
+ .into_string()
+ .unwrap();
+ let temp_filename2 = format!("{}/{}", temp_path2, p_name);
+
+ fs::copy(filename.clone(), temp_filename1).expect("panic");
+ fs::copy(filename, temp_filename2).expect("panic");
+
+ let table_path = ListingTableUrl::parse(temp_path1).unwrap();
+
+ let (_, _, state1) = get_cache_runtime_state();
+
+ // Create a second, independent session state with its own caches
+ let (_, _, state2) = get_cache_runtime_state();
+
+ let opt = ListingOptions::new(Arc::new(ParquetFormat::default()));
+
+ let table1 = get_listing_table(&table_path, None, &opt).await;
+ let table2 = get_listing_table(&table_path, None, &opt).await;
+
+ //Session 1 first time list files
+ assert_eq!(get_list_file_cache_size(&state1), 0);
+ let exec1 = table1.scan(&state1, None, &[], None).await.unwrap();
+ let parquet1 = exec1.as_any().downcast_ref::<ParquetExec>().unwrap();
+
+ assert_eq!(get_list_file_cache_size(&state1), 1);
+ let fg = &parquet1.base_config().file_groups;
+ assert_eq!(fg.len(), 1);
+ assert_eq!(fg.get(0).unwrap().len(), 1);
+
+ //Session 2 first time list files
+ //check session 1 cache result not show in session 2
+ assert_eq!(get_list_file_cache_size(&state2), 0);
+ let exec2 = table2.scan(&state2, None, &[], None).await.unwrap();
+ let parquet2 = exec2.as_any().downcast_ref::<ParquetExec>().unwrap();
+
+ assert_eq!(get_list_file_cache_size(&state2), 1);
+ let fg2 = &parquet2.base_config().file_groups;
+ assert_eq!(fg2.len(), 1);
+ assert_eq!(fg2.get(0).unwrap().len(), 1);
+
+ //Session 1 second time list files
+ //check session 1 cache result not show in session 2
+ assert_eq!(get_list_file_cache_size(&state1), 1);
+ let exec3 = table1.scan(&state1, None, &[], None).await.unwrap();
+ let parquet3 = exec3.as_any().downcast_ref::<ParquetExec>().unwrap();
+
+ assert_eq!(get_list_file_cache_size(&state1), 1);
+ let fg = &parquet3.base_config().file_groups;
+ assert_eq!(fg.len(), 1);
+ assert_eq!(fg.get(0).unwrap().len(), 1);
+ // List same file no increase
+ assert_eq!(get_list_file_cache_size(&state1), 1);
+}
+
+async fn get_listing_table(
table_path: &ListingTableUrl,
- cache1: Arc<DefaultFileStatisticsCache>,
- state1: &SessionState,
+ static_cache: Option<Arc<DefaultFileStatisticsCache>>,
opt: &ListingOptions,
) -> ListingTable {
- let schema = opt.infer_schema(state1, table_path).await.unwrap();
+ let schema = opt
+ .infer_schema(
+ &SessionState::new_with_config_rt(
+ SessionConfig::default(),
+ Arc::new(RuntimeEnv::default()),
+ ),
+ table_path,
+ )
+ .await
+ .unwrap();
let config1 = ListingTableConfig::new(table_path.clone())
.with_listing_options(opt.clone())
.with_schema(schema);
- ListingTable::try_new(config1)
- .unwrap()
- .with_cache(Some(cache1))
+ let table = ListingTable::try_new(config1).unwrap();
+ if let Some(c) = static_cache {
+ table.with_cache(Some(c))
+ } else {
+ table
+ }
}
-fn get_cache_runtime_state() -> (Arc<DefaultFileStatisticsCache>,
SessionState) {
+fn get_cache_runtime_state() -> (
+ Arc<DefaultFileStatisticsCache>,
+ Arc<DefaultListFilesCache>,
+ SessionState,
+) {
let cache_config = CacheManagerConfig::default();
- let cache1 = Arc::new(cache_unit::DefaultFileStatisticsCache::default());
- let cache_config =
cache_config.with_files_statistics_cache(Some(cache1.clone()));
+ let file_static_cache =
Arc::new(cache_unit::DefaultFileStatisticsCache::default());
+ let list_file_cache =
Arc::new(cache_unit::DefaultListFilesCache::default());
+
+ let cache_config = cache_config
+ .with_files_statistics_cache(Some(file_static_cache.clone()))
+ .with_list_files_cache(Some(list_file_cache.clone()));
+
let rt = Arc::new(
RuntimeEnv::new(RuntimeConfig::new().with_cache_manager(cache_config)).unwrap(),
);
let state = SessionContext::new_with_config_rt(SessionConfig::default(),
rt).state();
- (cache1, state)
+ (file_static_cache, list_file_cache, state)
}
-fn get_cache_size(state1: &SessionState) -> usize {
+fn get_static_cache_size(state1: &SessionState) -> usize {
state1
.runtime_env()
.cache_manager
@@ -114,3 +203,12 @@ fn get_cache_size(state1: &SessionState) -> usize {
.unwrap()
.len()
}
+
+fn get_list_file_cache_size(state1: &SessionState) -> usize {
+ state1
+ .runtime_env()
+ .cache_manager
+ .get_list_files_cache()
+ .unwrap()
+ .len()
+}
diff --git a/datafusion/execution/src/cache/cache_manager.rs
b/datafusion/execution/src/cache/cache_manager.rs
index 987b47bbb8..9752926368 100644
--- a/datafusion/execution/src/cache/cache_manager.rs
+++ b/datafusion/execution/src/cache/cache_manager.rs
@@ -29,15 +29,25 @@ use std::sync::Arc;
pub type FileStatisticsCache =
Arc<dyn CacheAccessor<Path, Arc<Statistics>, Extra = ObjectMeta>>;
+pub type ListFilesCache =
+ Arc<dyn CacheAccessor<Path, Arc<Vec<ObjectMeta>>, Extra = ObjectMeta>>;
+
impl Debug for dyn CacheAccessor<Path, Arc<Statistics>, Extra = ObjectMeta> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "Cache name: {} with length: {}", self.name(), self.len())
}
}
+impl Debug for dyn CacheAccessor<Path, Arc<Vec<ObjectMeta>>, Extra =
ObjectMeta> {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ write!(f, "Cache name: {} with length: {}", self.name(), self.len())
+ }
+}
+
#[derive(Default, Debug)]
pub struct CacheManager {
file_statistic_cache: Option<FileStatisticsCache>,
+ list_files_cache: Option<ListFilesCache>,
}
impl CacheManager {
@@ -46,6 +56,9 @@ impl CacheManager {
if let Some(cc) = &config.table_files_statistics_cache {
manager.file_statistic_cache = Some(cc.clone())
}
+ if let Some(lc) = &config.list_files_cache {
+ manager.list_files_cache = Some(lc.clone())
+ }
Ok(Arc::new(manager))
}
@@ -53,6 +66,11 @@ impl CacheManager {
pub fn get_file_statistic_cache(&self) -> Option<FileStatisticsCache> {
self.file_statistic_cache.clone()
}
+
+ /// Get the cache of `ObjectMeta` listings, keyed by the listed path.
+ pub fn get_list_files_cache(&self) -> Option<ListFilesCache> {
+ self.list_files_cache.clone()
+ }
}
#[derive(Clone, Default)]
@@ -61,6 +79,13 @@ pub struct CacheManagerConfig {
/// Avoid get same file statistics repeatedly in same datafusion session.
/// Default is disable. Fow now only supports Parquet files.
pub table_files_statistics_cache: Option<FileStatisticsCache>,
+ /// Enable caching of file metadata when listing files.
+ /// This setting avoids listing the file metadata of the same path repeatedly
+ /// in the same session, which may be expensive in certain situations (e.g.
remote object storage).
+ /// Note that if this option is enabled, DataFusion will not see any
updates to the underlying
+ /// location.
+ /// Disabled by default.
+ pub list_files_cache: Option<ListFilesCache>,
}
impl CacheManagerConfig {
@@ -71,4 +96,9 @@ impl CacheManagerConfig {
self.table_files_statistics_cache = cache;
self
}
+
+ pub fn with_list_files_cache(mut self, cache: Option<ListFilesCache>) ->
Self {
+ self.list_files_cache = cache;
+ self
+ }
}
diff --git a/datafusion/execution/src/cache/cache_unit.rs
b/datafusion/execution/src/cache/cache_unit.rs
index 3ef699ac23..c432a67587 100644
--- a/datafusion/execution/src/cache/cache_unit.rs
+++ b/datafusion/execution/src/cache/cache_unit.rs
@@ -94,9 +94,69 @@ impl CacheAccessor<Path, Arc<Statistics>> for
DefaultFileStatisticsCache {
}
}
+/// Collected file metadata for listing files.
+/// The cache is not invalidated until the user calls remove or clear.
+#[derive(Default)]
+pub struct DefaultListFilesCache {
+ statistics: DashMap<Path, Arc<Vec<ObjectMeta>>>,
+}
+
+impl CacheAccessor<Path, Arc<Vec<ObjectMeta>>> for DefaultListFilesCache {
+ type Extra = ObjectMeta;
+
+ fn get(&self, k: &Path) -> Option<Arc<Vec<ObjectMeta>>> {
+ self.statistics.get(k).map(|x| x.value().clone())
+ }
+
+ fn get_with_extra(
+ &self,
+ _k: &Path,
+ _e: &Self::Extra,
+ ) -> Option<Arc<Vec<ObjectMeta>>> {
+ panic!("Not supported DefaultListFilesCache get_with_extra")
+ }
+
+ fn put(
+ &self,
+ key: &Path,
+ value: Arc<Vec<ObjectMeta>>,
+ ) -> Option<Arc<Vec<ObjectMeta>>> {
+ self.statistics.insert(key.clone(), value)
+ }
+
+ fn put_with_extra(
+ &self,
+ _key: &Path,
+ _value: Arc<Vec<ObjectMeta>>,
+ _e: &Self::Extra,
+ ) -> Option<Arc<Vec<ObjectMeta>>> {
+ panic!("Not supported DefaultListFilesCache put_with_extra")
+ }
+
+ fn remove(&mut self, k: &Path) -> Option<Arc<Vec<ObjectMeta>>> {
+ self.statistics.remove(k).map(|x| x.1)
+ }
+
+ fn contains_key(&self, k: &Path) -> bool {
+ self.statistics.contains_key(k)
+ }
+
+ fn len(&self) -> usize {
+ self.statistics.len()
+ }
+
+ fn clear(&self) {
+ self.statistics.clear()
+ }
+
+ fn name(&self) -> String {
+ "DefaultListFilesCache".to_string()
+ }
+}
+
#[cfg(test)]
mod tests {
- use crate::cache::cache_unit::DefaultFileStatisticsCache;
+ use crate::cache::cache_unit::{DefaultFileStatisticsCache,
DefaultListFilesCache};
use crate::cache::CacheAccessor;
use chrono::DateTime;
use datafusion_common::Statistics;
@@ -137,4 +197,25 @@ mod tests {
meta2.location = Path::from("test2");
assert!(cache.get_with_extra(&meta2.location, &meta2).is_none());
}
+
+ #[test]
+ fn test_list_file_cache() {
+ let meta = ObjectMeta {
+ location: Path::from("test"),
+ last_modified:
DateTime::parse_from_rfc3339("2022-09-27T22:36:00+02:00")
+ .unwrap()
+ .into(),
+ size: 1024,
+ e_tag: None,
+ };
+
+ let cache = DefaultListFilesCache::default();
+ assert!(cache.get(&meta.location).is_none());
+
+ cache.put(&meta.location, vec![meta.clone()].into());
+ assert_eq!(
+ cache.get(&meta.location).unwrap().get(0).unwrap().clone(),
+ meta.clone()
+ );
+ }
}