nuno-faria commented on code in PR #16971:
URL: https://github.com/apache/datafusion/pull/16971#discussion_r2240645876


##########
datafusion/execution/src/cache/cache_manager.rs:
##########
@@ -59,6 +75,13 @@ impl CacheManager {
         if let Some(lc) = &config.list_files_cache {
             manager.list_files_cache = Some(Arc::clone(lc))
         }
+        if let Some(mc) = &config.file_metadata_cache {

Review Comment:
   Here `file_metadata_cache` falls back to a `DefaultFilesMetadataCache` when
   none is provided. This makes it easier to enable metadata caching using just
   `ParquetReadOptions` or `set datafusion.execution.parquet.cache_metadata = true`.
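
   A minimal sketch of the fallback being described, assuming `FileMetadataCache`
   is an `Arc` alias (as the `Arc::clone` calls elsewhere in this PR suggest)
   and that `DefaultFilesMetadataCache` implements `Default`:

       if let Some(mc) = &config.file_metadata_cache {
           manager.file_metadata_cache = Arc::clone(mc);
       } else {
           // Assumed fallback: mirrors the list_files_cache handling above, so
           // `ParquetReadOptions` / `cache_metadata = true` work without any
           // explicit wiring. The field shape (Option vs. always-set) is assumed.
           manager.file_metadata_cache = Arc::new(DefaultFilesMetadataCache::default());
       }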



##########
docs/source/user-guide/configs.md:
##########
@@ -60,6 +60,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus
 | datafusion.execution.parquet.binary_as_string                           | false                     | (reading) If true, parquet reader will read columns of `Binary/LargeBinary` with `Utf8`, and `BinaryView` with `Utf8View`. Parquet files generated by some legacy writers do not correctly set the UTF8 flag for strings, causing string columns to be loaded as BLOB instead. |
 | datafusion.execution.parquet.coerce_int96                               | NULL                      | (reading) If true, parquet reader will read columns of physical type int96 as originating from a different resolution than nanosecond. This is useful for reading data from systems like Spark which stores microsecond resolution timestamps in an int96 allowing it to write values with a larger date range than 64-bit timestamps with nanosecond resolution. |
 | datafusion.execution.parquet.bloom_filter_on_read                       | true                      | (reading) Use any available bloom filters when reading parquet files |
+| datafusion.execution.parquet.cache_metadata                             | false                     | (reading) Whether or not to enable the caching of embedded metadata of Parquet files (footer and page metadata). Enabling it can offer substantial performance improvements for repeated queries over large files. By default, the cache is automatically invalidated when the underlying file is modified. |

Review Comment:
   The metadata cache is automatically invalidated when the file changes, but
   entries are never removed, so the cache can grow without bound as new files
   are read. Is there a problem with this?
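
   For reference, a sketch of the staleness check the new docs describe; the
   argument names and entry layout are assumptions, not the PR's actual API:

       // Assumed check: an entry is treated as stale (and re-read) when the
       // file's size or last_modified differs from the ObjectMeta recorded
       // when the entry was inserted.
       fn is_stale(cached: &ObjectMeta, current: &ObjectMeta) -> bool {
           cached.size != current.size || cached.last_modified != current.last_modified
       }

   Stale entries are replaced on the next read, but entries for files that are
   deleted or never queried again would linger until something evicts them.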



##########
datafusion/datasource-parquet/src/reader.rs:
##########
@@ -150,3 +152,130 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
         }))
     }
 }
+
+/// Implementation of [`ParquetFileReaderFactory`] supporting the caching of footer and page
+/// metadata. Reads and updates the [`FileMetadataCache`] with the [`ParquetMetaData`] data.
+/// This reader always loads the entire metadata (including page index, unless the file is
+/// encrypted), even if not required by the current query, to ensure it is always available for
+/// those that need it.
+#[derive(Debug)]
+pub struct CachedParquetFileReaderFactory {
+    store: Arc<dyn ObjectStore>,
+    metadata_cache: FileMetadataCache,
+}
+
+impl CachedParquetFileReaderFactory {
+    pub fn new(store: Arc<dyn ObjectStore>, metadata_cache: FileMetadataCache) -> Self {
+        Self {
+            store,
+            metadata_cache,
+        }
+    }
+}
+
+impl ParquetFileReaderFactory for CachedParquetFileReaderFactory {
+    fn create_reader(
+        &self,
+        partition_index: usize,
+        file_meta: FileMeta,
+        metadata_size_hint: Option<usize>,
+        metrics: &ExecutionPlanMetricsSet,
+    ) -> datafusion_common::Result<Box<dyn AsyncFileReader + Send>> {
+        let file_metrics = ParquetFileMetrics::new(
+            partition_index,
+            file_meta.location().as_ref(),
+            metrics,
+        );
+        let store = Arc::clone(&self.store);
+
+        let mut inner =
+            ParquetObjectReader::new(store, file_meta.object_meta.location.clone())
+                .with_file_size(file_meta.object_meta.size);
+
+        if let Some(hint) = metadata_size_hint {
+            inner = inner.with_footer_size_hint(hint)
+        };
+
+        Ok(Box::new(CachedParquetFileReader {
+            inner,
+            file_metrics,
+            file_meta,
+            metadata_cache: Arc::clone(&self.metadata_cache),
+        }))
+    }
+}
+
+/// Implements [`AsyncFileReader`] for a Parquet file in object storage. Reads the file metadata
+/// from the [`FileMetadataCache`], if available, otherwise reads it directly from the file and then
+/// updates the cache.
+pub(crate) struct CachedParquetFileReader {
+    pub file_metrics: ParquetFileMetrics,
+    pub inner: ParquetObjectReader,
+    file_meta: FileMeta,
+    metadata_cache: FileMetadataCache,
+}
+
+impl AsyncFileReader for CachedParquetFileReader {
+    fn get_bytes(
+        &mut self,
+        range: Range<u64>,
+    ) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
+        let bytes_scanned = range.end - range.start;
+        self.file_metrics.bytes_scanned.add(bytes_scanned as usize);
+        self.inner.get_bytes(range)
+    }
+
+    fn get_byte_ranges(
+        &mut self,
+        ranges: Vec<Range<u64>>,
+    ) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>>
+    where
+        Self: Send,
+    {
+        let total: u64 = ranges.iter().map(|r| r.end - r.start).sum();
+        self.file_metrics.bytes_scanned.add(total as usize);
+        self.inner.get_byte_ranges(ranges)
+    }
+
+    fn get_metadata<'a>(
+        &'a mut self,
+        options: Option<&'a ArrowReaderOptions>,
+    ) -> BoxFuture<'a, parquet::errors::Result<Arc<ParquetMetaData>>> {
+        let file_meta = self.file_meta.clone();
+        let metadata_cache = Arc::clone(&self.metadata_cache);
+
+        async move {
+            let object_meta = &file_meta.object_meta;
+
+            // lookup if the metadata is already cached

Review Comment:
   If two workers call this at the same time for the same file, both will
   independently read the metadata and then update the cache. The race is
   benign (both cache identical metadata), but pointing it out just in case.
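
   The lookup-then-populate pattern in question, sketched with assumed accessor
   names (`get`/`put` are illustrative, not necessarily the PR's API):

       // Benign race: if two workers miss at the same time, both read the
       // footer and the second `put` overwrites the first with identical
       // metadata, so the only cost is one redundant read.
       if let Some(cached) = metadata_cache.get(&object_meta.location) {
           return Ok(cached);
       }
       let metadata = inner.get_metadata(options).await?; // read from the file
       metadata_cache.put(object_meta.location.clone(), Arc::clone(&metadata));
       Ok(metadata)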


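   For context, a hedged sketch of how the factory above might be wired up; the
   `get_file_metadata_cache` accessor on `CacheManager` is an assumption:

       use std::sync::Arc;
       use object_store::{memory::InMemory, ObjectStore};
       use datafusion::prelude::SessionContext;

       let ctx = SessionContext::new();
       let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
       // Assumed accessor; the cache_manager field on RuntimeEnv is public.
       let metadata_cache = ctx.runtime_env().cache_manager.get_file_metadata_cache();
       let factory = CachedParquetFileReaderFactory::new(Arc::clone(&store), metadata_cache);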
