This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 25ad99dc6e feat: Add the ability to review the contents of the
Metadata Cache (#17126)
25ad99dc6e is described below
commit 25ad99dc6e37c4e320c1f74ee5106b1d4b184507
Author: Nuno Faria <[email protected]>
AuthorDate: Tue Aug 12 21:08:31 2025 +0100
feat: Add the ability to review the contents of the Metadata Cache (#17126)
* feat: Add the ability to review the contents of the Metadata Cache
* Remove e_tag from test_metadata_cache
* Add entry in the user doc about this function
* Change type to UInt64
* Fix prettier
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
datafusion-cli/src/functions.rs | 123 ++++++++-
datafusion-cli/src/main.rs | 105 +++++++-
datafusion/datasource-parquet/src/reader.rs | 7 +
datafusion/execution/src/cache/cache_manager.rs | 24 +-
datafusion/execution/src/cache/cache_unit.rs | 263 ++++++++++++++++++-
datafusion/execution/src/cache/lru_queue.rs | 7 +-
.../user-guide/cli/{usage.md => functions.md} | 282 +++++----------------
docs/source/user-guide/cli/index.rst | 1 +
docs/source/user-guide/cli/usage.md | 63 +----
9 files changed, 588 insertions(+), 287 deletions(-)
diff --git a/datafusion-cli/src/functions.rs b/datafusion-cli/src/functions.rs
index f07dac649d..3ec446c515 100644
--- a/datafusion-cli/src/functions.rs
+++ b/datafusion-cli/src/functions.rs
@@ -22,8 +22,8 @@ use std::fs::File;
use std::str::FromStr;
use std::sync::Arc;
-use arrow::array::{Int64Array, StringArray};
-use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use arrow::array::{Int64Array, StringArray, TimestampMillisecondArray,
UInt64Array};
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
use arrow::record_batch::RecordBatch;
use arrow::util::pretty::pretty_format_batches;
use datafusion::catalog::{Session, TableFunctionImpl};
@@ -31,6 +31,7 @@ use datafusion::common::{plan_err, Column};
use datafusion::datasource::memory::MemorySourceConfig;
use datafusion::datasource::TableProvider;
use datafusion::error::Result;
+use datafusion::execution::cache::cache_manager::CacheManager;
use datafusion::logical_expr::Expr;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::scalar::ScalarValue;
@@ -460,3 +461,121 @@ impl TableFunctionImpl for ParquetMetadataFunc {
Ok(Arc::new(parquet_metadata))
}
}
+
+/// METADATA_CACHE table function
+#[derive(Debug)]
+struct MetadataCacheTable {
+ schema: SchemaRef,
+ batch: RecordBatch,
+}
+
+#[async_trait]
+impl TableProvider for MetadataCacheTable {
+ fn as_any(&self) -> &dyn std::any::Any {
+ self
+ }
+
+ fn schema(&self) -> arrow::datatypes::SchemaRef {
+ self.schema.clone()
+ }
+
+ fn table_type(&self) -> datafusion::logical_expr::TableType {
+ datafusion::logical_expr::TableType::Base
+ }
+
+ async fn scan(
+ &self,
+ _state: &dyn Session,
+ projection: Option<&Vec<usize>>,
+ _filters: &[Expr],
+ _limit: Option<usize>,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ Ok(MemorySourceConfig::try_new_exec(
+ &[vec![self.batch.clone()]],
+ TableProvider::schema(self),
+ projection.cloned(),
+ )?)
+ }
+}
+
+#[derive(Debug)]
+pub struct MetadataCacheFunc {
+ cache_manager: Arc<CacheManager>,
+}
+
+impl MetadataCacheFunc {
+ pub fn new(cache_manager: Arc<CacheManager>) -> Self {
+ Self { cache_manager }
+ }
+}
+
+impl TableFunctionImpl for MetadataCacheFunc {
+ fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
+ if !exprs.is_empty() {
+ return plan_err!("metadata_cache should have no arguments");
+ }
+
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("path", DataType::Utf8, false),
+ Field::new(
+ "file_modified",
+ DataType::Timestamp(TimeUnit::Millisecond, None),
+ false,
+ ),
+ Field::new("file_size_bytes", DataType::UInt64, false),
+ Field::new("e_tag", DataType::Utf8, true),
+ Field::new("version", DataType::Utf8, true),
+ Field::new("metadata_size_bytes", DataType::UInt64, false),
+ Field::new("hits", DataType::UInt64, false),
+ Field::new("extra", DataType::Utf8, true),
+ ]));
+
+ // construct record batch from metadata
+ let mut path_arr = vec![];
+ let mut file_modified_arr = vec![];
+ let mut file_size_bytes_arr = vec![];
+ let mut e_tag_arr = vec![];
+ let mut version_arr = vec![];
+ let mut metadata_size_bytes = vec![];
+ let mut hits_arr = vec![];
+ let mut extra_arr = vec![];
+
+ let cached_entries =
self.cache_manager.get_file_metadata_cache().list_entries();
+
+ for (path, entry) in cached_entries {
+ path_arr.push(path.to_string());
+ file_modified_arr
+
.push(Some(entry.object_meta.last_modified.timestamp_millis()));
+ file_size_bytes_arr.push(entry.object_meta.size);
+ e_tag_arr.push(entry.object_meta.e_tag);
+ version_arr.push(entry.object_meta.version);
+ metadata_size_bytes.push(entry.size_bytes as u64);
+ hits_arr.push(entry.hits as u64);
+
+ let mut extra = entry
+ .extra
+ .iter()
+ .map(|(k, v)| format!("{k}={v}"))
+ .collect::<Vec<_>>();
+ extra.sort();
+ extra_arr.push(extra.join(" "));
+ }
+
+ let batch = RecordBatch::try_new(
+ schema.clone(),
+ vec![
+ Arc::new(StringArray::from(path_arr)),
+ Arc::new(TimestampMillisecondArray::from(file_modified_arr)),
+ Arc::new(UInt64Array::from(file_size_bytes_arr)),
+ Arc::new(StringArray::from(e_tag_arr)),
+ Arc::new(StringArray::from(version_arr)),
+ Arc::new(UInt64Array::from(metadata_size_bytes)),
+ Arc::new(UInt64Array::from(hits_arr)),
+ Arc::new(StringArray::from(extra_arr)),
+ ],
+ )?;
+
+ let metadata_cache = MetadataCacheTable { schema, batch };
+ Ok(Arc::new(metadata_cache))
+ }
+}
diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs
index fdecb185e3..a28e97a9f8 100644
--- a/datafusion-cli/src/main.rs
+++ b/datafusion-cli/src/main.rs
@@ -30,7 +30,7 @@ use datafusion::execution::memory_pool::{
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::prelude::SessionContext;
use datafusion_cli::catalog::DynamicObjectStoreCatalog;
-use datafusion_cli::functions::ParquetMetadataFunc;
+use datafusion_cli::functions::{MetadataCacheFunc, ParquetMetadataFunc};
use datafusion_cli::{
exec,
pool_type::PoolType,
@@ -219,6 +219,14 @@ async fn main_inner() -> Result<()> {
// register `parquet_metadata` table function to get metadata from parquet
files
ctx.register_udtf("parquet_metadata", Arc::new(ParquetMetadataFunc {}));
+ // register `metadata_cache` table function to get the contents of the
file metadata cache
+ ctx.register_udtf(
+ "metadata_cache",
+ Arc::new(MetadataCacheFunc::new(
+ ctx.task_ctx().runtime_env().cache_manager.clone(),
+ )),
+ );
+
let mut print_options = PrintOptions {
format: args.format,
quiet: args.quiet,
@@ -397,7 +405,7 @@ pub fn extract_disk_limit(size: &str) -> Result<usize,
String> {
#[cfg(test)]
mod tests {
use super::*;
- use datafusion::common::test_util::batches_to_string;
+ use datafusion::{common::test_util::batches_to_string,
prelude::ParquetReadOptions};
use insta::assert_snapshot;
fn assert_conversion(input: &str, expected: Result<usize, String>) {
@@ -512,4 +520,97 @@ mod tests {
Ok(())
}
+
+ #[tokio::test]
+ async fn test_metadata_cache() -> Result<(), DataFusionError> {
+ let ctx = SessionContext::new();
+ ctx.register_udtf(
+ "metadata_cache",
+ Arc::new(MetadataCacheFunc::new(
+ ctx.task_ctx().runtime_env().cache_manager.clone(),
+ )),
+ );
+
+ ctx.register_parquet(
+ "alltypes_plain",
+ "../parquet-testing/data/alltypes_plain.parquet",
+ ParquetReadOptions::new(),
+ )
+ .await?;
+
+ ctx.register_parquet(
+ "alltypes_tiny_pages",
+ "../parquet-testing/data/alltypes_tiny_pages.parquet",
+ ParquetReadOptions::new(),
+ )
+ .await?;
+
+ ctx.register_parquet(
+ "lz4_raw_compressed_larger",
+ "../parquet-testing/data/lz4_raw_compressed_larger.parquet",
+ ParquetReadOptions::new(),
+ )
+ .await?;
+
+ ctx.sql("select * from alltypes_plain")
+ .await?
+ .collect()
+ .await?;
+ ctx.sql("select * from alltypes_tiny_pages")
+ .await?
+ .collect()
+ .await?;
+ ctx.sql("select * from lz4_raw_compressed_larger")
+ .await?
+ .collect()
+ .await?;
+
+ // initial state
+ let sql = "SELECT split_part(path, '/', -1) as filename,
file_size_bytes, metadata_size_bytes, hits, extra from metadata_cache() order
by filename";
+ let df = ctx.sql(sql).await?;
+ let rbs = df.collect().await?;
+
+ assert_snapshot!(batches_to_string(&rbs),@r#"
+
+-----------------------------------+-----------------+---------------------+------+------------------+
+ | filename | file_size_bytes |
metadata_size_bytes | hits | extra |
+
+-----------------------------------+-----------------+---------------------+------+------------------+
+ | alltypes_plain.parquet | 1851 | 10181
| 2 | page_index=false |
+ | alltypes_tiny_pages.parquet | 454233 | 881634
| 2 | page_index=true |
+ | lz4_raw_compressed_larger.parquet | 380836 | 2939
| 2 | page_index=false |
+
+-----------------------------------+-----------------+---------------------+------+------------------+
+ "#);
+
+ // increase the number of hits
+ ctx.sql("select * from alltypes_plain")
+ .await?
+ .collect()
+ .await?;
+ ctx.sql("select * from alltypes_plain")
+ .await?
+ .collect()
+ .await?;
+ ctx.sql("select * from alltypes_plain")
+ .await?
+ .collect()
+ .await?;
+ ctx.sql("select * from lz4_raw_compressed_larger")
+ .await?
+ .collect()
+ .await?;
+ let sql = "select split_part(path, '/', -1) as filename,
file_size_bytes, metadata_size_bytes, hits, extra from metadata_cache() order
by filename";
+ let df = ctx.sql(sql).await?;
+ let rbs = df.collect().await?;
+
+ assert_snapshot!(batches_to_string(&rbs),@r#"
+
+-----------------------------------+-----------------+---------------------+------+------------------+
+ | filename | file_size_bytes |
metadata_size_bytes | hits | extra |
+
+-----------------------------------+-----------------+---------------------+------+------------------+
+ | alltypes_plain.parquet | 1851 | 10181
| 5 | page_index=false |
+ | alltypes_tiny_pages.parquet | 454233 | 881634
| 2 | page_index=true |
+ | lz4_raw_compressed_larger.parquet | 380836 | 2939
| 3 | page_index=false |
+
+-----------------------------------+-----------------+---------------------+------+------------------+
+ "#);
+
+ Ok(())
+ }
}
diff --git a/datafusion/datasource-parquet/src/reader.rs
b/datafusion/datasource-parquet/src/reader.rs
index df37581868..eab801e506 100644
--- a/datafusion/datasource-parquet/src/reader.rs
+++ b/datafusion/datasource-parquet/src/reader.rs
@@ -30,6 +30,7 @@ use parquet::arrow::arrow_reader::ArrowReaderOptions;
use parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader};
use parquet::file::metadata::ParquetMetaData;
use std::any::Any;
+use std::collections::HashMap;
use std::fmt::Debug;
use std::ops::Range;
use std::sync::Arc;
@@ -296,4 +297,10 @@ impl FileMetadata for CachedParquetMetaData {
fn memory_size(&self) -> usize {
self.0.memory_size()
}
+
+ fn extra_info(&self) -> HashMap<String, String> {
+ let page_index =
+ self.0.column_index().is_some() && self.0.offset_index().is_some();
+ HashMap::from([("page_index".to_owned(), page_index.to_string())])
+ }
}
diff --git a/datafusion/execution/src/cache/cache_manager.rs
b/datafusion/execution/src/cache/cache_manager.rs
index a91e4f8458..473cc2b2fa 100644
--- a/datafusion/execution/src/cache/cache_manager.rs
+++ b/datafusion/execution/src/cache/cache_manager.rs
@@ -21,6 +21,7 @@ use datafusion_common::{Result, Statistics};
use object_store::path::Path;
use object_store::ObjectMeta;
use std::any::Any;
+use std::collections::HashMap;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;
@@ -42,17 +43,36 @@ pub trait FileMetadata: Any + Send + Sync {
/// Returns the size of the metadata in bytes.
fn memory_size(&self) -> usize;
+
+ /// Returns extra information about this entry (used by
[`FileMetadataCache::list_entries`]).
+ fn extra_info(&self) -> HashMap<String, String>;
}
/// Cache to store file-embedded metadata.
pub trait FileMetadataCache:
CacheAccessor<ObjectMeta, Arc<dyn FileMetadata>, Extra = ObjectMeta>
{
- // Returns the cache's memory limit in bytes.
+ /// Returns the cache's memory limit in bytes.
fn cache_limit(&self) -> usize;
- // Updates the cache with a new memory limit in bytes.
+ /// Updates the cache with a new memory limit in bytes.
fn update_cache_limit(&self, limit: usize);
+
+ /// Retrieves the information about the entries currently cached.
+ fn list_entries(&self) -> HashMap<Path, FileMetadataCacheEntry>;
+}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+/// Represents information about a cached metadata entry.
+/// This is used to expose the metadata cache contents to outside modules.
+pub struct FileMetadataCacheEntry {
+ pub object_meta: ObjectMeta,
+ /// Size of the cached metadata, in bytes.
+ pub size_bytes: usize,
+ /// Number of times this entry was retrieved.
+ pub hits: usize,
+ /// Additional object-specific information.
+ pub extra: HashMap<String, String>,
}
impl Debug for dyn CacheAccessor<Path, Arc<Statistics>, Extra = ObjectMeta> {
diff --git a/datafusion/execution/src/cache/cache_unit.rs
b/datafusion/execution/src/cache/cache_unit.rs
index 576076ca4e..067a61bf35 100644
--- a/datafusion/execution/src/cache/cache_unit.rs
+++ b/datafusion/execution/src/cache/cache_unit.rs
@@ -15,9 +15,12 @@
// specific language governing permissions and limitations
// under the License.
+use std::collections::HashMap;
use std::sync::{Arc, Mutex};
-use crate::cache::cache_manager::{FileMetadata, FileMetadataCache};
+use crate::cache::cache_manager::{
+ FileMetadata, FileMetadataCache, FileMetadataCacheEntry,
+};
use crate::cache::lru_queue::LruQueue;
use crate::cache::CacheAccessor;
@@ -164,6 +167,7 @@ struct DefaultFilesMetadataCacheState {
lru_queue: LruQueue<Path, (ObjectMeta, Arc<dyn FileMetadata>)>,
memory_limit: usize,
memory_used: usize,
+ cache_hits: HashMap<Path, usize>,
}
impl DefaultFilesMetadataCacheState {
@@ -172,6 +176,7 @@ impl DefaultFilesMetadataCacheState {
lru_queue: LruQueue::new(),
memory_limit,
memory_used: 0,
+ cache_hits: HashMap::new(),
}
}
@@ -187,6 +192,7 @@ impl DefaultFilesMetadataCacheState {
{
None
} else {
+ *self.cache_hits.entry(k.location.clone()).or_insert(0) +=
1;
Some(Arc::clone(metadata))
}
})
@@ -220,6 +226,7 @@ impl DefaultFilesMetadataCacheState {
return None;
}
+ self.cache_hits.insert(key.location.clone(), 0);
// if the key is already in the cache, the old value is removed
let old_value = self.lru_queue.put(key.location.clone(), (key, value));
self.memory_used += value_size;
@@ -253,6 +260,7 @@ impl DefaultFilesMetadataCacheState {
fn remove(&mut self, k: &ObjectMeta) -> Option<Arc<dyn FileMetadata>> {
if let Some((_, old_metadata)) = self.lru_queue.remove(&k.location) {
self.memory_used -= old_metadata.memory_size();
+ self.cache_hits.remove(&k.location);
Some(old_metadata)
} else {
None
@@ -268,6 +276,7 @@ impl DefaultFilesMetadataCacheState {
fn clear(&mut self) {
self.lru_queue.clear();
self.memory_used = 0;
+ self.cache_hits.clear();
}
}
@@ -310,6 +319,25 @@ impl FileMetadataCache for DefaultFilesMetadataCache {
state.memory_limit = limit;
state.evict_entries();
}
+
+ fn list_entries(&self) -> HashMap<Path, FileMetadataCacheEntry> {
+ let state = self.state.lock().unwrap();
+ let mut entries = HashMap::<Path, FileMetadataCacheEntry>::new();
+
+ for (path, (object_meta, metadata)) in state.lru_queue.list_entries() {
+ entries.insert(
+ path.clone(),
+ FileMetadataCacheEntry {
+ object_meta: object_meta.clone(),
+ size_bytes: metadata.memory_size(),
+ hits: *state.cache_hits.get(path).expect("entry must
exist"),
+ extra: metadata.extra_info(),
+ },
+ );
+ }
+
+ entries
+ }
}
impl CacheAccessor<ObjectMeta, Arc<dyn FileMetadata>> for
DefaultFilesMetadataCache {
@@ -373,9 +401,12 @@ impl CacheAccessor<ObjectMeta, Arc<dyn FileMetadata>> for
DefaultFilesMetadataCa
#[cfg(test)]
mod tests {
+ use std::collections::HashMap;
use std::sync::Arc;
- use crate::cache::cache_manager::{FileMetadata, FileMetadataCache};
+ use crate::cache::cache_manager::{
+ FileMetadata, FileMetadataCache, FileMetadataCacheEntry,
+ };
use crate::cache::cache_unit::{
DefaultFileStatisticsCache, DefaultFilesMetadataCache,
DefaultListFilesCache,
};
@@ -464,6 +495,10 @@ mod tests {
fn memory_size(&self) -> usize {
self.metadata.len()
}
+
+ fn extra_info(&self) -> HashMap<String, String> {
+ HashMap::from([("extra_info".to_owned(), "abc".to_owned())])
+ }
}
#[test]
@@ -663,4 +698,228 @@ mod tests {
assert!(!cache.contains_key(&object_meta13));
assert!(cache.contains_key(&object_meta14));
}
+
+ #[test]
+ fn test_default_file_metadata_cache_entries_info() {
+ let mut cache = DefaultFilesMetadataCache::new(1000);
+ let (object_meta1, metadata1) = generate_test_metadata_with_size("1",
100);
+ let (object_meta2, metadata2) = generate_test_metadata_with_size("2",
200);
+ let (object_meta3, metadata3) = generate_test_metadata_with_size("3",
300);
+
+ // initial entries, all will have hits = 0
+ cache.put(&object_meta1, metadata1);
+ cache.put(&object_meta2, metadata2);
+ cache.put(&object_meta3, metadata3);
+ assert_eq!(
+ cache.list_entries(),
+ HashMap::from([
+ (
+ Path::from("1"),
+ FileMetadataCacheEntry {
+ object_meta: object_meta1.clone(),
+ size_bytes: 100,
+ hits: 0,
+ extra: HashMap::from([(
+ "extra_info".to_owned(),
+ "abc".to_owned()
+ )]),
+ }
+ ),
+ (
+ Path::from("2"),
+ FileMetadataCacheEntry {
+ object_meta: object_meta2.clone(),
+ size_bytes: 200,
+ hits: 0,
+ extra: HashMap::from([(
+ "extra_info".to_owned(),
+ "abc".to_owned()
+ )]),
+ }
+ ),
+ (
+ Path::from("3"),
+ FileMetadataCacheEntry {
+ object_meta: object_meta3.clone(),
+ size_bytes: 300,
+ hits: 0,
+ extra: HashMap::from([(
+ "extra_info".to_owned(),
+ "abc".to_owned()
+ )]),
+ }
+ )
+ ])
+ );
+
+ // new hit on "1"
+ cache.get(&object_meta1);
+ assert_eq!(
+ cache.list_entries(),
+ HashMap::from([
+ (
+ Path::from("1"),
+ FileMetadataCacheEntry {
+ object_meta: object_meta1.clone(),
+ size_bytes: 100,
+ hits: 1,
+ extra: HashMap::from([(
+ "extra_info".to_owned(),
+ "abc".to_owned()
+ )]),
+ }
+ ),
+ (
+ Path::from("2"),
+ FileMetadataCacheEntry {
+ object_meta: object_meta2.clone(),
+ size_bytes: 200,
+ hits: 0,
+ extra: HashMap::from([(
+ "extra_info".to_owned(),
+ "abc".to_owned()
+ )]),
+ }
+ ),
+ (
+ Path::from("3"),
+ FileMetadataCacheEntry {
+ object_meta: object_meta3.clone(),
+ size_bytes: 300,
+ hits: 0,
+ extra: HashMap::from([(
+ "extra_info".to_owned(),
+ "abc".to_owned()
+ )]),
+ }
+ )
+ ])
+ );
+
+ // new entry, will evict "2"
+ let (object_meta4, metadata4) = generate_test_metadata_with_size("4",
600);
+ cache.put(&object_meta4, metadata4);
+ assert_eq!(
+ cache.list_entries(),
+ HashMap::from([
+ (
+ Path::from("1"),
+ FileMetadataCacheEntry {
+ object_meta: object_meta1.clone(),
+ size_bytes: 100,
+ hits: 1,
+ extra: HashMap::from([(
+ "extra_info".to_owned(),
+ "abc".to_owned()
+ )]),
+ }
+ ),
+ (
+ Path::from("3"),
+ FileMetadataCacheEntry {
+ object_meta: object_meta3.clone(),
+ size_bytes: 300,
+ hits: 0,
+ extra: HashMap::from([(
+ "extra_info".to_owned(),
+ "abc".to_owned()
+ )]),
+ }
+ ),
+ (
+ Path::from("4"),
+ FileMetadataCacheEntry {
+ object_meta: object_meta4.clone(),
+ size_bytes: 600,
+ hits: 0,
+ extra: HashMap::from([(
+ "extra_info".to_owned(),
+ "abc".to_owned()
+ )]),
+ }
+ )
+ ])
+ );
+
+ // replace entry "1"
+ let (object_meta1_new, metadata1_new) =
generate_test_metadata_with_size("1", 50);
+ cache.put(&object_meta1_new, metadata1_new);
+ assert_eq!(
+ cache.list_entries(),
+ HashMap::from([
+ (
+ Path::from("1"),
+ FileMetadataCacheEntry {
+ object_meta: object_meta1_new.clone(),
+ size_bytes: 50,
+ hits: 0,
+ extra: HashMap::from([(
+ "extra_info".to_owned(),
+ "abc".to_owned()
+ )]),
+ }
+ ),
+ (
+ Path::from("3"),
+ FileMetadataCacheEntry {
+ object_meta: object_meta3.clone(),
+ size_bytes: 300,
+ hits: 0,
+ extra: HashMap::from([(
+ "extra_info".to_owned(),
+ "abc".to_owned()
+ )]),
+ }
+ ),
+ (
+ Path::from("4"),
+ FileMetadataCacheEntry {
+ object_meta: object_meta4.clone(),
+ size_bytes: 600,
+ hits: 0,
+ extra: HashMap::from([(
+ "extra_info".to_owned(),
+ "abc".to_owned()
+ )]),
+ }
+ )
+ ])
+ );
+
+ // remove entry "4"
+ cache.remove(&object_meta4);
+ assert_eq!(
+ cache.list_entries(),
+ HashMap::from([
+ (
+ Path::from("1"),
+ FileMetadataCacheEntry {
+ object_meta: object_meta1_new.clone(),
+ size_bytes: 50,
+ hits: 0,
+ extra: HashMap::from([(
+ "extra_info".to_owned(),
+ "abc".to_owned()
+ )]),
+ }
+ ),
+ (
+ Path::from("3"),
+ FileMetadataCacheEntry {
+ object_meta: object_meta3.clone(),
+ size_bytes: 300,
+ hits: 0,
+ extra: HashMap::from([(
+ "extra_info".to_owned(),
+ "abc".to_owned()
+ )]),
+ }
+ )
+ ])
+ );
+
+ // clear
+ cache.clear();
+ assert_eq!(cache.list_entries(), HashMap::from([]));
+ }
}
diff --git a/datafusion/execution/src/cache/lru_queue.rs
b/datafusion/execution/src/cache/lru_queue.rs
index 3dc308dc3f..fb3d158ced 100644
--- a/datafusion/execution/src/cache/lru_queue.rs
+++ b/datafusion/execution/src/cache/lru_queue.rs
@@ -201,12 +201,17 @@ impl<K: Eq + Hash + Clone, V> LruQueue<K, V> {
self.data.is_empty()
}
- // Removes all entries from the queue.
+ /// Removes all entries from the queue.
pub fn clear(&mut self) {
self.queue.head = None;
self.queue.tail = None;
self.data.clear();
}
+
+ /// Returns a reference to the entries currently in the queue.
+ pub fn list_entries(&self) -> HashMap<&K, &V> {
+ self.data.iter().map(|(k, (_, v))| (k, v)).collect()
+ }
}
#[cfg(test)]
diff --git a/docs/source/user-guide/cli/usage.md
b/docs/source/user-guide/cli/functions.md
similarity index 51%
copy from docs/source/user-guide/cli/usage.md
copy to docs/source/user-guide/cli/functions.md
index 13f0e7cff1..305b53c16f 100644
--- a/docs/source/user-guide/cli/usage.md
+++ b/docs/source/user-guide/cli/functions.md
@@ -17,223 +17,12 @@
under the License.
-->
-# Usage
-
-See the current usage using `datafusion-cli --help`:
-
-```bash
-Apache Arrow <[email protected]>
-Command Line Client for DataFusion query engine.
-
-USAGE:
- datafusion-cli [OPTIONS]
-
-OPTIONS:
- -b, --batch-size <BATCH_SIZE>
- The batch size of each query, or use DataFusion default
-
- -c, --command <COMMAND>...
- Execute the given command string(s), then exit
-
- --color
- Enables console syntax highlighting
-
- -f, --file <FILE>...
- Execute commands from file(s), then exit
-
- --format <FORMAT>
- [default: table] [possible values: csv, tsv, table, json, nd-json]
-
- -h, --help
- Print help information
-
- -m, --memory-limit <MEMORY_LIMIT>
- The memory pool limitation (e.g. '10g'), default to None (no limit)
-
- --maxrows <MAXROWS>
- The max number of rows to display for 'Table' format
- [possible values: numbers(0/10/...), inf(no limit)] [default: 40]
-
- --mem-pool-type <MEM_POOL_TYPE>
- Specify the memory pool type 'greedy' or 'fair', default to
'greedy'
-
- --top-memory-consumers <TOP_MEMORY_CONSUMERS>
- The number of top memory consumers to display when query fails due
to memory exhaustion. To disable memory consumer tracking, set this value to 0
[default: 3]
-
- -d, --disk-limit <DISK_LIMIT>
- Available disk space for spilling queries (e.g. '10g'), default to
None (uses DataFusion's default value of '100g')
-
- -p, --data-path <DATA_PATH>
- Path to your data, default to current directory
-
- -q, --quiet
- Reduce printing other than the results and work quietly
-
- -r, --rc <RC>...
- Run the provided files on startup instead of ~/.datafusionrc
-
- -V, --version
- Print version information
-```
-
-## Commands
-
-Available commands inside DataFusion CLI are:
-
-- Quit
-
-```bash
-> \q
-```
-
-- Help
-
-```bash
-> \?
-```
-
-- ListTables
-
-```bash
-> \d
-```
-
-- DescribeTable
-
-```bash
-> \d table_name
-```
-
-- QuietMode
-
-```bash
-> \quiet [true|false]
-```
-
-- list function
-
-```bash
-> \h
-```
-
-- Search and describe function
-
-```bash
-> \h function
-```
-
-## Supported SQL
-
-In addition to the normal [SQL supported in DataFusion], `datafusion-cli` also
-supports additional statements and commands:
-
-[sql supported in datafusion]: ../sql/index.rst
-
-### `SHOW ALL [VERBOSE]`
-
-Show configuration options
-
-```sql
-> show all;
-
-+-------------------------------------------------+---------+
-| name | value |
-+-------------------------------------------------+---------+
-| datafusion.execution.batch_size | 8192 |
-| datafusion.execution.coalesce_batches | true |
-| datafusion.execution.time_zone | UTC |
-| datafusion.explain.logical_plan_only | false |
-| datafusion.explain.physical_plan_only | false |
-| datafusion.optimizer.filter_null_join_keys | false |
-| datafusion.optimizer.skip_failed_rules | true |
-+-------------------------------------------------+---------+
-
-```
-
-### `SHOW <OPTION>>`
-
-Show specific configuration option
-
-```SQL
-> show datafusion.execution.batch_size;
-
-+-------------------------------------------------+---------+
-| name | value |
-+-------------------------------------------------+---------+
-| datafusion.execution.batch_size | 8192 |
-+-------------------------------------------------+---------+
-
-```
-
-### `SET <OPTION> TO <VALUE>`
-
-- Set configuration options
-
-```sql
-> SET datafusion.execution.batch_size to 1024;
-```
-
-## Configuration Options
-
-All available configuration options can be seen using `SHOW ALL` as described
above.
-
-You can change the configuration options using environment
-variables. `datafusion-cli` looks in the corresponding environment
-variable with an upper case name and all `.` converted to `_`.
-
-For example, to set `datafusion.execution.batch_size` to `1024` you
-would set the `DATAFUSION_EXECUTION_BATCH_SIZE` environment variable
-appropriately:
-
-```shell
-$ DATAFUSION_EXECUTION_BATCH_SIZE=1024 datafusion-cli
-DataFusion CLI v12.0.0
-> show all;
-+-------------------------------------------------+---------+
-| name | value |
-+-------------------------------------------------+---------+
-| datafusion.execution.batch_size | 1024 |
-| datafusion.execution.coalesce_batches | true |
-| datafusion.execution.time_zone | UTC |
-| datafusion.explain.logical_plan_only | false |
-| datafusion.explain.physical_plan_only | false |
-| datafusion.optimizer.filter_null_join_keys | false |
-| datafusion.optimizer.skip_failed_rules | true |
-+-------------------------------------------------+---------+
-8 rows in set. Query took 0.002 seconds.
-```
-
-You can change the configuration options using `SET` statement as well
-
-```shell
-$ datafusion-cli
-DataFusion CLI v13.0.0
-> show datafusion.execution.batch_size;
-+---------------------------------+---------+
-| name | value |
-+---------------------------------+---------+
-| datafusion.execution.batch_size | 8192 |
-+---------------------------------+---------+
-1 row in set. Query took 0.011 seconds.
-
-> set datafusion.execution.batch_size to 1024;
-0 rows in set. Query took 0.000 seconds.
-
-> show datafusion.execution.batch_size;
-+---------------------------------+---------+
-| name | value |
-+---------------------------------+---------+
-| datafusion.execution.batch_size | 1024 |
-+---------------------------------+---------+
-1 row in set. Query took 0.005 seconds.
-```
-
-## Functions
+# CLI Specific Functions
`datafusion-cli` comes with built-in functions that are not included in the
-DataFusion SQL engine. These functions are:
+DataFusion SQL engine by default. These functions are:
-### `parquet_metadata`
+## `parquet_metadata`
The `parquet_metadata` table function can be used to inspect detailed metadata
about a parquet file such as statistics, sizes, and other information. This can
@@ -259,7 +48,8 @@ LIMIT 3;
```
The returned table has the following columns for each row for each column chunk
-in the file. Please refer to the [Parquet Documentation] for more information.
+in the file. Please refer to the [Parquet Documentation] for more information
+on the meaning of these fields.
[parquet documentation]: https://parquet.apache.org/
@@ -289,6 +79,64 @@ in the file. Please refer to the [Parquet Documentation]
for more information.
| total_compressed_size | Int64 | Number of bytes the column chunk's
data after encoding and compression (what is stored in the file) |
| total_uncompressed_size | Int64 | Number of bytes the column chunk's
data after encoding |
-+-------------------------+-----------+-------------+
-
[`page index`]:
https://github.com/apache/parquet-format/blob/master/PageIndex.md
+
+## `metadata_cache`
+
+The `metadata_cache` function shows information about the default File
Metadata Cache that is used by the
+[`ListingTable`] implementation in DataFusion. This cache is used to speed up
+reading metadata from files when scanning directories with many files.
+
+For example, after creating a table with the [CREATE EXTERNAL
TABLE](../sql/ddl.md#create-external-table)
+command:
+
+```sql
+> create external table hits
+ stored as parquet
+ location
's3://clickhouse-public-datasets/hits_compatible/athena_partitioned/';
+```
+
+You can inspect the metadata cache by querying the `metadata_cache` function:
+
+```sql
+> select * from metadata_cache();
++----------------------------------------------------+---------------------+-----------------+---------------------------------------+---------+---------------------+------+------------------+
+| path | file_modified |
file_size_bytes | e_tag | version |
metadata_size_bytes | hits | extra |
++----------------------------------------------------+---------------------+-----------------+---------------------------------------+---------+---------------------+------+------------------+
+| hits_compatible/athena_partitioned/hits_61.parquet | 2022-07-03T15:40:34 |
117270944 | "5db11cad1ca0d80d748fc92c914b010a-6" | NULL | 212949
| 0 | page_index=false |
+| hits_compatible/athena_partitioned/hits_32.parquet | 2022-07-03T15:37:17 |
94506004 | "2f7db49a9fe242179590b615b94a39d2-5" | NULL | 278157
| 0 | page_index=false |
+| hits_compatible/athena_partitioned/hits_40.parquet | 2022-07-03T15:38:07 |
142508647 | "9e5852b45a469d5a05bf270a286eab8a-8" | NULL | 212917
| 0 | page_index=false |
+| hits_compatible/athena_partitioned/hits_93.parquet | 2022-07-03T15:44:07 |
127987774 | "751100bf0dac7d489b9836abf3108b99-7" | NULL | 278318
| 0 | page_index=false |
+| .
|
++----------------------------------------------------+---------------------+-----------------+---------------------------------------+---------+---------------------+------+------------------+
+```
+
+Since `metadata_cache` is a normal table function, you can use it in most
places you can use
+a table reference.
+
+For example, to get the total size consumed by the cached entries:
+
+```sql
+> select sum(metadata_size_bytes) from metadata_cache();
++-------------------------------------------+
+| sum(metadata_cache().metadata_size_bytes) |
++-------------------------------------------+
+| 22972345 |
++-------------------------------------------+
+```
+
+The columns of the returned table are:
+
+| column_name | data_type | Description
|
+| ------------------- | --------- |
-----------------------------------------------------------------------------------------
|
+| path | Utf8 | File path relative to the object store /
filesystem root |
+| file_modified | Timestamp | Last modified time of the file
|
+| file_size_bytes | UInt64 | Size of the file in bytes
|
+| e_tag | Utf8 | [Entity Tag] (ETag) of the file if
available |
+| version | Utf8 | Version of the file if available (for
object stores that support versioning) |
+| metadata_size_bytes | UInt64 | Size of the cached metadata in memory (not
its thrift encoded form) |
+| hits | UInt64 | Number of times the cached metadata has
been accessed |
+| extra | Utf8 | Extra information about the cached
metadata (e.g., if page index information is included) |
+
+[`listingtable`]:
https://docs.rs/datafusion/latest/datafusion/datasource/listing/struct.ListingTable.html
+[entity tag]: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/ETag
diff --git a/docs/source/user-guide/cli/index.rst
b/docs/source/user-guide/cli/index.rst
index 874cfc0eae..325b0dce3f 100644
--- a/docs/source/user-guide/cli/index.rst
+++ b/docs/source/user-guide/cli/index.rst
@@ -25,3 +25,4 @@ DataFusion CLI
installation
usage
datasources
+ functions
diff --git a/docs/source/user-guide/cli/usage.md
b/docs/source/user-guide/cli/usage.md
index 13f0e7cff1..263728f5b0 100644
--- a/docs/source/user-guide/cli/usage.md
+++ b/docs/source/user-guide/cli/usage.md
@@ -231,64 +231,5 @@ DataFusion CLI v13.0.0
## Functions
`datafusion-cli` comes with built-in functions that are not included in the
-DataFusion SQL engine. These functions are:
-
-### `parquet_metadata`
-
-The `parquet_metadata` table function can be used to inspect detailed metadata
-about a parquet file such as statistics, sizes, and other information. This can
-be helpful to understand how parquet files are structured.
-
-For example, to see information about the `"WatchID"` column in the
-`hits.parquet` file, you can use:
-
-```sql
-SELECT path_in_schema, row_group_id, row_group_num_rows, stats_min, stats_max,
total_compressed_size
-FROM parquet_metadata('hits.parquet')
-WHERE path_in_schema = '"WatchID"'
-LIMIT 3;
-
-+----------------+--------------+--------------------+---------------------+---------------------+-----------------------+
-| path_in_schema | row_group_id | row_group_num_rows | stats_min |
stats_max | total_compressed_size |
-+----------------+--------------+--------------------+---------------------+---------------------+-----------------------+
-| "WatchID" | 0 | 450560 | 4611687214012840539 |
9223369186199968220 | 3883759 |
-| "WatchID" | 1 | 612174 | 4611689135232456464 |
9223371478009085789 | 5176803 |
-| "WatchID" | 2 | 344064 | 4611692774829951781 |
9223363791697310021 | 3031680 |
-+----------------+--------------+--------------------+---------------------+---------------------+-----------------------+
-3 rows in set. Query took 0.053 seconds.
-```
-
-The returned table has the following columns for each row for each column chunk
-in the file. Please refer to the [Parquet Documentation] for more information.
-
-[parquet documentation]: https://parquet.apache.org/
-
-| column_name | data_type | Description
|
-| ----------------------- | --------- |
---------------------------------------------------------------------------------------------------
|
-| filename | Utf8 | Name of the file
|
-| row_group_id | Int64 | Row group index the column chunk
belongs to |
-| row_group_num_rows | Int64 | Count of rows stored in the row group
|
-| row_group_num_columns | Int64 | Total number of columns in the row
group (same for all row groups) |
-| row_group_bytes | Int64 | Number of bytes used to store the row
group (not including metadata) |
-| column_id | Int64 | ID of the column
|
-| file_offset | Int64 | Offset within the file that this
column chunk's data begins |
-| num_values | Int64 | Total number of values in this column
chunk |
-| path_in_schema | Utf8 | "Path" (column name) of the column
chunk in the schema |
-| type | Utf8 | Parquet data type of the column chunk
|
-| stats_min | Utf8 | The minimum value for this column
chunk, if stored in the statistics, cast to a string |
-| stats_max | Utf8 | The maximum value for this column
chunk, if stored in the statistics, cast to a string |
-| stats_null_count | Int64 | Number of null values in this column
chunk, if stored in the statistics |
-| stats_distinct_count | Int64 | Number of distinct values in this
column chunk, if stored in the statistics |
-| stats_min_value | Utf8 | Same as `stats_min`
|
-| stats_max_value | Utf8 | Same as `stats_max`
|
-| compression | Utf8 | Block level compression (e.g.
`SNAPPY`) used for this column chunk |
-| encodings | Utf8 | All block level encodings (e.g.
`[PLAIN_DICTIONARY, PLAIN, RLE]`) used for this column chunk |
-| index_page_offset | Int64 | Offset in the file of the [`page
index`], if any |
-| dictionary_page_offset | Int64 | Offset in the file of the dictionary
page, if any |
-| data_page_offset | Int64 | Offset in the file of the first data
page, if any |
-| total_compressed_size | Int64 | Number of bytes the column chunk's
data after encoding and compression (what is stored in the file) |
-| total_uncompressed_size | Int64 | Number of bytes the column chunk's
data after encoding |
-
-+-------------------------+-----------+-------------+
-
-[`page index`]:
https://github.com/apache/parquet-format/blob/master/PageIndex.md
+DataFusion SQL engine; see the [DataFusion CLI specific functions](functions.md)
+section for details.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]