alamb commented on code in PR #16971:
URL: https://github.com/apache/datafusion/pull/16971#discussion_r2243780405
##########
datafusion/execution/src/cache/cache_unit.rs:
##########
@@ -157,9 +158,79 @@ impl CacheAccessor<Path, Arc<Vec<ObjectMeta>>> for DefaultListFilesCache {
     }
 }
 
+/// Collected file embedded metadata cache.
+/// The metadata for a file is invalidated when the file size or last modification time has
+/// changed.
+#[derive(Default)]
+pub struct DefaultFilesMetadataCache {
+    metadata: DashMap<Path, (ObjectMeta, Arc<FileMetadata>)>,
+}
+
+impl CacheAccessor<Path, Arc<FileMetadata>> for DefaultFilesMetadataCache {
+    type Extra = ObjectMeta;
+
+    fn get(&self, _k: &Path) -> Option<Arc<FileMetadata>> {
+        panic!("get in DefaultFilesMetadataCache is not supported, please use get_with_extra")
+    }
+
+    fn get_with_extra(&self, k: &Path, e: &Self::Extra) -> Option<Arc<FileMetadata>> {
+        self.metadata
+            .get(k)
+            .map(|s| {
+                let (extra, metadata) = s.value();
+                if extra.size != e.size || extra.last_modified != e.last_modified {
+                    None
+                } else {
+                    Some(Arc::clone(metadata))
+                }
+            })
+            .unwrap_or(None)
+    }
+
+    fn put(&self, _key: &Path, _value: Arc<FileMetadata>) -> Option<Arc<FileMetadata>> {
+        panic!("put in DefaultFilesMetadataCache is not supported, please use put_with_extra")

Review Comment:
🤔 A panic like that is unfortunate -- maybe we should change the API so this function can return an error (in a follow-on PR)
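One possible shape for that follow-on change, sketched here only for illustration (the trait name and these exact signatures are hypothetical and not part of this PR): have the lookups return a `Result`, so a cache such as `DefaultFilesMetadataCache`, which only works through the `*_with_extra` variants, can surface an error instead of panicking.

```rust
// Sketch only: a fallible variant of the accessor API. The trait name and
// signatures below are hypothetical; today's CacheAccessor returns Option and
// DefaultFilesMetadataCache panics in get/put.
use datafusion_common::Result;

pub trait FallibleCacheAccessor<K, V> {
    type Extra;

    /// Returns the cached value, or an error if this cache can only be
    /// queried through `get_with_extra`.
    fn get(&self, k: &K) -> Result<Option<V>>;
    fn get_with_extra(&self, k: &K, e: &Self::Extra) -> Result<Option<V>>;

    /// Stores a value, or returns an error if this cache can only be
    /// populated through `put_with_extra`.
    fn put(&self, key: &K, value: V) -> Result<Option<V>>;
    fn put_with_extra(&self, key: &K, value: V, e: &Self::Extra) -> Result<Option<V>>;
}
```

With that shape, `DefaultFilesMetadataCache::get` could return something like `Err(DataFusionError::Execution("use get_with_extra".to_string()))` rather than panicking.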
##########
datafusion/datasource-parquet/src/reader.rs:
##########
@@ -150,3 +152,130 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
         }))
     }
 }
+
+/// Implementation of [`ParquetFileReaderFactory`] supporting the caching of footer and page
+/// metadata. Reads and updates the [`FileMetadataCache`] with the [`ParquetMetaData`] data.
+/// This reader always loads the entire metadata (including page index, unless the file is

Review Comment:
👍

##########
datafusion/sqllogictest/test_files/parquet.slt:
##########
@@ -750,3 +750,122 @@ drop table int96_from_spark;
 
 statement ok
 set datafusion.execution.parquet.coerce_int96 = ns;
+
+
+### Tests for metadata caching
+
+# Create temporary data
+query I
+COPY (
+  SELECT 'k-' || i as k, i as v
+  FROM generate_series(1, 20000) t(i)
+  ORDER BY k
+)
+TO 'test_files/scratch/parquet/cache_metadata.parquet'
+OPTIONS (MAX_ROW_GROUP_SIZE 4096, DATA_PAGE_ROW_COUNT_LIMIT 2048);
+----
+20000
+
+# Enable the cache
+statement ok
+set datafusion.execution.parquet.cache_metadata = true;
+
+statement ok
+CREATE EXTERNAL TABLE t
+STORED AS PARQUET
+LOCATION 'test_files/scratch/parquet/cache_metadata.parquet';
+
+query TI
+select * from t where k = 'k-1000' or k = 'k-9999' order by k
+----
+k-1000 1000
+k-9999 9999
+
+query IT
+select v, k from t where (v between 1 and 2) or (v between 9999 and 10000) order by v
+----
+1 k-1
+2 k-2
+9999 k-9999
+10000 k-10000
+
+# Updating the file should invalidate the cache. Otherwise, the following queries would fail
+# (e.g., with "Arrow: Parquet argument error: External: incomplete frame").
+query I

Review Comment:
amazing

##########
datafusion/execution/src/cache/cache_unit.rs:
##########
@@ -232,4 +303,52 @@ mod tests {
             meta.clone()
         );
     }
+
+    #[test]

Review Comment:
😍

##########
datafusion/execution/src/cache/cache_manager.rs:
##########
@@ -86,6 +114,10 @@ pub struct CacheManagerConfig {
     /// location.
     /// Default is disable.
    pub list_files_cache: Option<ListFilesCache>,
+    /// Cache of file-embedded metadata, used to avoid reading it multiple times when processing a
+    /// data file (e.g., Parquet footer and page metadata).
+    /// If not provided, the [`CacheManager`] will create a [`DefaultFilesMetadataCache`].
+    pub file_metadata_cache: Option<FileMetadataCache>,

Review Comment:
If it is already an option, why do we need a `DefaultFilesMetadataCache`? 🤔 Couldn't we just leave it as `None`?

##########
datafusion/common/src/config.rs:
##########
@@ -549,6 +549,12 @@ config_namespace! {
         /// (reading) Use any available bloom filters when reading parquet files
         pub bloom_filter_on_read: bool, default = true
 
+        /// (reading) Whether or not to enable the caching of embedded metadata of Parquet files
+        /// (footer and page metadata). Enabling it can offer substantial performance improvements
+        /// for repeated queries over large files. By default, the cache is automatically
+        /// invalidated when the underlying file is modified.
+        pub cache_metadata: bool, default = false

Review Comment:
Eventually, I think it would be better to have this be a size setting `metadata_cache_size`, since that can represent both disabled (`0` size) and a memory cap. We can do this in a follow-on PR
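Sketching that follow-on idea only (neither the option name, its type, nor the default below is part of this PR), the entry could replace or sit next to `cache_metadata` in the same `config_namespace!` block:

```rust
// Hypothetical follow-on entry inside the existing config_namespace! block
// (sketch only; name, type, and default are a proposal, not part of this PR):

/// (reading) Maximum memory used to cache embedded metadata of Parquet files
/// (footer and page metadata). A value of 0 disables the cache; any other
/// value acts as a memory cap for the cached entries.
pub metadata_cache_size: usize, default = 0
```

A `0` default would preserve today's opt-in behavior while leaving room for a real memory cap later.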
##########
datafusion/datasource-parquet/src/reader.rs:
##########
@@ -150,3 +152,130 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
         }))
     }
 }
+
+/// Implementation of [`ParquetFileReaderFactory`] supporting the caching of footer and page
+/// metadata. Reads and updates the [`FileMetadataCache`] with the [`ParquetMetaData`] data.
+/// This reader always loads the entire metadata (including page index, unless the file is
+/// encrypted), even if not required by the current query, to ensure it is always available for
+/// those that need it.
+#[derive(Debug)]
+pub struct CachedParquetFileReaderFactory {
+    store: Arc<dyn ObjectStore>,
+    metadata_cache: FileMetadataCache,
+}
+
+impl CachedParquetFileReaderFactory {
+    pub fn new(store: Arc<dyn ObjectStore>, metadata_cache: FileMetadataCache) -> Self {
+        Self {
+            store,
+            metadata_cache,
+        }
+    }
+}
+
+impl ParquetFileReaderFactory for CachedParquetFileReaderFactory {
+    fn create_reader(
+        &self,
+        partition_index: usize,
+        file_meta: FileMeta,
+        metadata_size_hint: Option<usize>,
+        metrics: &ExecutionPlanMetricsSet,
+    ) -> datafusion_common::Result<Box<dyn AsyncFileReader + Send>> {
+        let file_metrics = ParquetFileMetrics::new(
+            partition_index,
+            file_meta.location().as_ref(),
+            metrics,
+        );
+        let store = Arc::clone(&self.store);
+
+        let mut inner =
+            ParquetObjectReader::new(store, file_meta.object_meta.location.clone())
+                .with_file_size(file_meta.object_meta.size);
+
+        if let Some(hint) = metadata_size_hint {
+            inner = inner.with_footer_size_hint(hint)
+        };
+
+        Ok(Box::new(CachedParquetFileReader {
+            inner,
+            file_metrics,
+            file_meta,
+            metadata_cache: Arc::clone(&self.metadata_cache),
+        }))
+    }
+}
+
+/// Implements [`AsyncFileReader`] for a Parquet file in object storage. Reads the file metadata
+/// from the [`FileMetadataCache`], if available, otherwise reads it directly from the file and then
+/// updates the cache.
+pub(crate) struct CachedParquetFileReader {
+    pub file_metrics: ParquetFileMetrics,
+    pub inner: ParquetObjectReader,
+    file_meta: FileMeta,
+    metadata_cache: FileMetadataCache,
+}
+
+impl AsyncFileReader for CachedParquetFileReader {
+    fn get_bytes(
+        &mut self,
+        range: Range<u64>,
+    ) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
+        let bytes_scanned = range.end - range.start;
+        self.file_metrics.bytes_scanned.add(bytes_scanned as usize);
+        self.inner.get_bytes(range)
+    }
+
+    fn get_byte_ranges(
+        &mut self,
+        ranges: Vec<Range<u64>>,
+    ) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>>
+    where
+        Self: Send,
+    {
+        let total: u64 = ranges.iter().map(|r| r.end - r.start).sum();
+        self.file_metrics.bytes_scanned.add(total as usize);
+        self.inner.get_byte_ranges(ranges)
+    }
+
+    fn get_metadata<'a>(
+        &'a mut self,
+        options: Option<&'a ArrowReaderOptions>,
+    ) -> BoxFuture<'a, parquet::errors::Result<Arc<ParquetMetaData>>> {
+        let file_meta = self.file_meta.clone();
+        let metadata_cache = Arc::clone(&self.metadata_cache);
+
+        async move {

Review Comment:
It is impressive that you worked out this API dance -- it is something I really don't like about the current API of the parquet reader. BTW, I am working on improving it (no changes needed or suggested here, I am just self-promoting):
- https://github.com/apache/arrow-rs/issues/7983

##########
datafusion/execution/src/cache/cache_manager.rs:
##########
@@ -32,6 +34,13 @@ pub type FileStatisticsCache =
 pub type ListFilesCache =
     Arc<dyn CacheAccessor<Path, Arc<Vec<ObjectMeta>>, Extra = ObjectMeta>>;
 
+/// Represents generic file-embedded metadata.
+pub type FileMetadata = dyn Any + Send + Sync;

Review Comment:
I realize you are just following along with the pattern that is already in this file, which is good. However, I recommend we make this a trait, partly to improve the documentation and partly because I think we are going to need to add a method like `memory_size()` to report on the memory consumed by the object:

```rust
pub trait FileMetadata: Any + Send + Sync {
    ...
}
```

I would recommend the same for the FileMetadataCache below
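For illustration, a slightly fuller sketch of that proposed trait (the `as_any` helper and the `CachedParquetMetaData` wrapper are hypothetical additions; only `memory_size()` is what the comment above asks for):

```rust
// Sketch of the proposed trait, not code from this PR.
use std::any::Any;

use parquet::file::metadata::ParquetMetaData;

/// Generic file-embedded metadata that can be stored in the cache.
pub trait FileMetadata: Any + Send + Sync {
    /// Returns the metadata as `Any`, so the reader that stored it can
    /// downcast back to the concrete type (e.g. `ParquetMetaData`).
    fn as_any(&self) -> &dyn Any;

    /// Approximate memory consumed by this entry, so a bounded cache can
    /// account for (and eventually evict) what it holds.
    fn memory_size(&self) -> usize;
}

/// Hypothetical wrapper the Parquet reader could cache.
struct CachedParquetMetaData(ParquetMetaData);

impl FileMetadata for CachedParquetMetaData {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn memory_size(&self) -> usize {
        // ParquetMetaData can estimate its own heap footprint
        self.0.memory_size()
    }
}
```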
##########
docs/source/user-guide/configs.md:
##########
@@ -60,6 +60,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus
 | datafusion.execution.parquet.binary_as_string | false | (reading) If true, parquet reader will read columns of `Binary/LargeBinary` with `Utf8`, and `BinaryView` with `Utf8View`. Parquet files generated by some legacy writers do not correctly set the UTF8 flag for strings, causing string columns to be loaded as BLOB instead. |
 | datafusion.execution.parquet.coerce_int96 | NULL | (reading) If true, parquet reader will read columns of physical type int96 as originating from a different resolution than nanosecond. This is useful for reading data from systems like Spark which stores microsecond resolution timestamps in an int96 allowing it to write values with a larger date range than 64-bit timestamps with nanosecond resolution. |
 | datafusion.execution.parquet.bloom_filter_on_read | true | (reading) Use any available bloom filters when reading parquet files |
+| datafusion.execution.parquet.cache_metadata | false | (reading) Whether or not to enable the caching of embedded metadata of Parquet files (footer and page metadata). Enabling it can offer substantial performance improvements for repeated queries over large files. By default, the cache is automatically invalidated when the underlying file is modified. |

Review Comment:
I do think it would be a problem if we wanted to turn this feature on by default, which I do. However, I don't think we need to make any changes in this particular PR
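Until that happens, opting in per session is a one-line change. A minimal sketch, assuming the option lands exactly as spelled in this diff (`datafusion.execution.parquet.cache_metadata`); the file path and table name are placeholders:

```rust
// Minimal sketch of enabling the new option from Rust; assumes the
// `cache_metadata` field added by this PR's diff.
use datafusion::prelude::{SessionConfig, SessionContext};

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let mut config = SessionConfig::new();
    // Equivalent to: SET datafusion.execution.parquet.cache_metadata = true;
    config.options_mut().execution.parquet.cache_metadata = true;

    let ctx = SessionContext::new_with_config(config);
    ctx.register_parquet("t", "data/large.parquet", Default::default())
        .await?;

    // Repeated queries over the same file can now reuse the cached footer and
    // page metadata instead of re-reading and re-parsing it each time.
    ctx.sql("SELECT count(*) FROM t").await?.show().await?;
    Ok(())
}
```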