nuno-faria commented on code in PR #16971:
URL: https://github.com/apache/datafusion/pull/16971#discussion_r2240645876
##########
datafusion/execution/src/cache/cache_manager.rs:
##########
@@ -59,6 +75,13 @@ impl CacheManager {
         if let Some(lc) = &config.list_files_cache {
             manager.list_files_cache = Some(Arc::clone(lc))
         }
+        if let Some(mc) = &config.file_metadata_cache {

Review Comment:
   Here, the `file_metadata_cache` is assigned a `DefaultFilesMetadataCache` if none is provided. This makes it easier to enable metadata caching using just `ParquetReadOptions` or `set datafusion.execution.parquet.cache_metadata = true`.
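   For illustration, a minimal end-to-end sketch of enabling the cache this way, assuming the `cache_metadata` option lands as described in the configs.md change below (the file path is hypothetical, and a tokio runtime is assumed):

use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();

    // Turn on Parquet metadata caching for this session. No explicit cache
    // needs to be registered, because the CacheManager falls back to
    // DefaultFilesMetadataCache when none is provided.
    ctx.sql("set datafusion.execution.parquet.cache_metadata = true")
        .await?;

    // Repeated queries over the same large file can now reuse the cached
    // footer and page metadata instead of re-reading the footer each time.
    let df = ctx
        .read_parquet("large.parquet", ParquetReadOptions::default())
        .await?;
    df.show().await?;
    Ok(())
}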
##########
docs/source/user-guide/configs.md:
##########
@@ -60,6 +60,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus
 | datafusion.execution.parquet.binary_as_string | false | (reading) If true, parquet reader will read columns of `Binary/LargeBinary` with `Utf8`, and `BinaryView` with `Utf8View`. Parquet files generated by some legacy writers do not correctly set the UTF8 flag for strings, causing string columns to be loaded as BLOB instead. |
 | datafusion.execution.parquet.coerce_int96 | NULL | (reading) If true, parquet reader will read columns of physical type int96 as originating from a different resolution than nanosecond. This is useful for reading data from systems like Spark which stores microsecond resolution timestamps in an int96 allowing it to write values with a larger date range than 64-bit timestamps with nanosecond resolution. |
 | datafusion.execution.parquet.bloom_filter_on_read | true | (reading) Use any available bloom filters when reading parquet files |
+| datafusion.execution.parquet.cache_metadata | false | (reading) Whether or not to enable the caching of embedded metadata of Parquet files (footer and page metadata). Enabling it can offer substantial performance improvements for repeated queries over large files. By default, the cache is automatically invalidated when the underlying file is modified. |

Review Comment:
   The metadata cache is automatically invalidated when the underlying file changes, but entries are never cleared, so the cache can only grow. Is there a problem with this?

##########
datafusion/datasource-parquet/src/reader.rs:
##########
@@ -150,3 +152,130 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
         }))
     }
 }
+
+/// Implementation of [`ParquetFileReaderFactory`] supporting the caching of footer and page
+/// metadata. Reads and updates the [`FileMetadataCache`] with the [`ParquetMetaData`] data.
+/// This reader always loads the entire metadata (including page index, unless the file is
+/// encrypted), even if not required by the current query, to ensure it is always available for
+/// those that need it.
+#[derive(Debug)]
+pub struct CachedParquetFileReaderFactory {
+    store: Arc<dyn ObjectStore>,
+    metadata_cache: FileMetadataCache,
+}
+
+impl CachedParquetFileReaderFactory {
+    pub fn new(store: Arc<dyn ObjectStore>, metadata_cache: FileMetadataCache) -> Self {
+        Self {
+            store,
+            metadata_cache,
+        }
+    }
+}
+
+impl ParquetFileReaderFactory for CachedParquetFileReaderFactory {
+    fn create_reader(
+        &self,
+        partition_index: usize,
+        file_meta: FileMeta,
+        metadata_size_hint: Option<usize>,
+        metrics: &ExecutionPlanMetricsSet,
+    ) -> datafusion_common::Result<Box<dyn AsyncFileReader + Send>> {
+        let file_metrics = ParquetFileMetrics::new(
+            partition_index,
+            file_meta.location().as_ref(),
+            metrics,
+        );
+        let store = Arc::clone(&self.store);
+
+        let mut inner =
+            ParquetObjectReader::new(store, file_meta.object_meta.location.clone())
+                .with_file_size(file_meta.object_meta.size);
+
+        if let Some(hint) = metadata_size_hint {
+            inner = inner.with_footer_size_hint(hint)
+        };
+
+        Ok(Box::new(CachedParquetFileReader {
+            inner,
+            file_metrics,
+            file_meta,
+            metadata_cache: Arc::clone(&self.metadata_cache),
+        }))
+    }
+}
+
+/// Implements [`AsyncFileReader`] for a Parquet file in object storage. Reads the file metadata
+/// from the [`FileMetadataCache`], if available, otherwise reads it directly from the file and then
+/// updates the cache.
+pub(crate) struct CachedParquetFileReader {
+    pub file_metrics: ParquetFileMetrics,
+    pub inner: ParquetObjectReader,
+    file_meta: FileMeta,
+    metadata_cache: FileMetadataCache,
+}
+
+impl AsyncFileReader for CachedParquetFileReader {
+    fn get_bytes(
+        &mut self,
+        range: Range<u64>,
+    ) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
+        let bytes_scanned = range.end - range.start;
+        self.file_metrics.bytes_scanned.add(bytes_scanned as usize);
+        self.inner.get_bytes(range)
+    }
+
+    fn get_byte_ranges(
+        &mut self,
+        ranges: Vec<Range<u64>>,
+    ) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>>
+    where
+        Self: Send,
+    {
+        let total: u64 = ranges.iter().map(|r| r.end - r.start).sum();
+        self.file_metrics.bytes_scanned.add(total as usize);
+        self.inner.get_byte_ranges(ranges)
+    }
+
+    fn get_metadata<'a>(
+        &'a mut self,
+        options: Option<&'a ArrowReaderOptions>,
+    ) -> BoxFuture<'a, parquet::errors::Result<Arc<ParquetMetaData>>> {
+        let file_meta = self.file_meta.clone();
+        let metadata_cache = Arc::clone(&self.metadata_cache);
+
+        async move {
+            let object_meta = &file_meta.object_meta;
+
+            // lookup if the metadata is already cached

Review Comment:
   If two workers call this at the same time for the same file, they will both independently read the metadata and then update the cache. There should be no problem with this, but pointing it out just in case.
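   To make the benign race concrete, here is a standalone sketch of the lookup-then-populate pattern (a hypothetical map-based cache, not the PR's `FileMetadataCache`): both threads may miss, load, and insert, and the last insert simply overwrites an equivalent entry, so the only cost is one redundant metadata read.

use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical stand-in for the cached value (e.g. Arc<ParquetMetaData>).
type Metadata = Arc<String>;

fn get_or_load(cache: &Mutex<HashMap<String, Metadata>>, key: &str) -> Metadata {
    // Look up under the lock, but release it before the (slow) load so other
    // readers are not blocked; this window is what allows duplicate work.
    if let Some(v) = cache.lock().unwrap().get(key) {
        return Arc::clone(v);
    }
    // Both racing threads can reach this point and read independently.
    let loaded: Metadata = Arc::new(format!("metadata for {key}"));
    cache
        .lock()
        .unwrap()
        .insert(key.to_string(), Arc::clone(&loaded));
    loaded
}

fn main() {
    let cache = Arc::new(Mutex::new(HashMap::new()));
    let handles: Vec<_> = (0..2)
        .map(|_| {
            let cache = Arc::clone(&cache);
            thread::spawn(move || get_or_load(&cache, "large.parquet"))
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    // Whichever insert ran last won; the cache holds exactly one entry with
    // equivalent contents, so correctness is unaffected.
    assert_eq!(cache.lock().unwrap().len(), 1);
}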