alamb commented on code in PR #16971:
URL: https://github.com/apache/datafusion/pull/16971#discussion_r2243780405


##########
datafusion/execution/src/cache/cache_unit.rs:
##########
@@ -157,9 +158,79 @@ impl CacheAccessor<Path, Arc<Vec<ObjectMeta>>> for 
DefaultListFilesCache {
     }
 }
 
+/// Collected file embedded metadata cache.
+/// The metadata for some file is invalided when the file size or last 
modification time have been
+/// changed.
+#[derive(Default)]
+pub struct DefaultFilesMetadataCache {
+    metadata: DashMap<Path, (ObjectMeta, Arc<FileMetadata>)>,
+}
+
+impl CacheAccessor<Path, Arc<FileMetadata>> for DefaultFilesMetadataCache {
+    type Extra = ObjectMeta;
+
+    fn get(&self, _k: &Path) -> Option<Arc<FileMetadata>> {
+        panic!("get in DefaultFilesMetadataCache is not supported, please use 
get_with_extra")
+    }
+
+    fn get_with_extra(&self, k: &Path, e: &Self::Extra) -> 
Option<Arc<FileMetadata>> {
+        self.metadata
+            .get(k)
+            .map(|s| {
+                let (extra, metadata) = s.value();
+                if extra.size != e.size || extra.last_modified != 
e.last_modified {
+                    None
+                } else {
+                    Some(Arc::clone(metadata))
+                }
+            })
+            .unwrap_or(None)
+    }
+
+    fn put(&self, _key: &Path, _value: Arc<FileMetadata>) -> 
Option<Arc<FileMetadata>> {
+        panic!("put in DefaultFilesMetadataCache is not supported, please use 
put_with_extra")

Review Comment:
   🤔  A panic like that is unfortunate -- maybe we should change the API so 
this function can return an error (in a follow-on PR).



##########
datafusion/datasource-parquet/src/reader.rs:
##########
@@ -150,3 +152,130 @@ impl ParquetFileReaderFactory for 
DefaultParquetFileReaderFactory {
         }))
     }
 }
+
+/// Implementation of [`ParquetFileReaderFactory`] supporting the caching of 
footer and page
+/// metadata. Reads and updates the [`FileMetadataCache`] with the 
[`ParquetMetaData`] data.
+/// This reader always loads the entire metadata (including page index, unless 
the file is

Review Comment:
   👍 



##########
datafusion/sqllogictest/test_files/parquet.slt:
##########
@@ -750,3 +750,122 @@ drop table int96_from_spark;
 
 statement ok
 set datafusion.execution.parquet.coerce_int96 = ns;
+
+
+### Tests for metadata caching
+
+# Create temporary data
+query I
+COPY (
+  SELECT 'k-' || i as k, i as v
+  FROM generate_series(1, 20000) t(i)
+  ORDER BY k
+)
+TO 'test_files/scratch/parquet/cache_metadata.parquet'
+OPTIONS (MAX_ROW_GROUP_SIZE 4096, DATA_PAGE_ROW_COUNT_LIMIT 2048);
+----
+20000
+
+# Enable the cache
+statement ok
+set datafusion.execution.parquet.cache_metadata = true;
+
+statement ok
+CREATE EXTERNAL TABLE t
+STORED AS PARQUET
+LOCATION 'test_files/scratch/parquet/cache_metadata.parquet';
+
+query TI
+select * from t where k = 'k-1000' or k = 'k-9999' order by k
+----
+k-1000 1000
+k-9999 9999
+
+query IT
+select v, k from t where (v between 1 and 2) or (v between 9999 and 10000) 
order by v
+----
+1 k-1
+2 k-2
+9999 k-9999
+10000 k-10000
+
+# Updating the file should invalidate the cache. Otherwise, the following 
queries would fail 
+# (e.g., with "Arrow: Parquet argument error: External: incomplete frame").
+query I

Review Comment:
   amazing



##########
datafusion/execution/src/cache/cache_unit.rs:
##########
@@ -232,4 +303,52 @@ mod tests {
             meta.clone()
         );
     }
+
+    #[test]

Review Comment:
   😍 



##########
datafusion/execution/src/cache/cache_manager.rs:
##########
@@ -86,6 +114,10 @@ pub struct CacheManagerConfig {
     /// location.  
     /// Default is disable.
     pub list_files_cache: Option<ListFilesCache>,
+    /// Cache of file-embedded metadata, used to avoid reading it multiple 
times when processing a
+    /// data file (e.g., Parquet footer and page metadata).
+    /// If not provided, the [`CacheManager`] will create a 
[`DefaultFilesMetadataCache`].
+    pub file_metadata_cache: Option<FileMetadataCache>,

Review Comment:
   If it is already an `Option`, why do we need a `DefaultFilesMetadataCache`? 🤔 
   
   Couldn't we just leave it as `None`?



##########
datafusion/common/src/config.rs:
##########
@@ -549,6 +549,12 @@ config_namespace! {
         /// (reading) Use any available bloom filters when reading parquet 
files
         pub bloom_filter_on_read: bool, default = true
 
+        /// (reading) Whether or not to enable the caching of embedded 
metadata of Parquet files
+        /// (footer and page metadata). Enabling it can offer substantial 
performance improvements
+        /// for repeated queries over large files. By default, the cache is 
automatically
+        /// invalidated when the underlying file is modified.
+        pub cache_metadata: bool, default = false

Review Comment:
   Eventually, I think it would be better to make this a size setting 
(`metadata_cache_size`), since that can represent both disabled (`0` size) and a 
memory cap. 
   
   We can do this in a follow-on PR.



##########
datafusion/datasource-parquet/src/reader.rs:
##########
@@ -150,3 +152,130 @@ impl ParquetFileReaderFactory for 
DefaultParquetFileReaderFactory {
         }))
     }
 }
+
+/// Implementation of [`ParquetFileReaderFactory`] supporting the caching of 
footer and page
+/// metadata. Reads and updates the [`FileMetadataCache`] with the 
[`ParquetMetaData`] data.
+/// This reader always loads the entire metadata (including page index, unless 
the file is
+/// encrypted), even if not required by the current query, to ensure it is 
always available for
+/// those that need it.
+#[derive(Debug)]
+pub struct CachedParquetFileReaderFactory {
+    store: Arc<dyn ObjectStore>,
+    metadata_cache: FileMetadataCache,
+}
+
+impl CachedParquetFileReaderFactory {
+    pub fn new(store: Arc<dyn ObjectStore>, metadata_cache: FileMetadataCache) 
-> Self {
+        Self {
+            store,
+            metadata_cache,
+        }
+    }
+}
+
+impl ParquetFileReaderFactory for CachedParquetFileReaderFactory {
+    fn create_reader(
+        &self,
+        partition_index: usize,
+        file_meta: FileMeta,
+        metadata_size_hint: Option<usize>,
+        metrics: &ExecutionPlanMetricsSet,
+    ) -> datafusion_common::Result<Box<dyn AsyncFileReader + Send>> {
+        let file_metrics = ParquetFileMetrics::new(
+            partition_index,
+            file_meta.location().as_ref(),
+            metrics,
+        );
+        let store = Arc::clone(&self.store);
+
+        let mut inner =
+            ParquetObjectReader::new(store, 
file_meta.object_meta.location.clone())
+                .with_file_size(file_meta.object_meta.size);
+
+        if let Some(hint) = metadata_size_hint {
+            inner = inner.with_footer_size_hint(hint)
+        };
+
+        Ok(Box::new(CachedParquetFileReader {
+            inner,
+            file_metrics,
+            file_meta,
+            metadata_cache: Arc::clone(&self.metadata_cache),
+        }))
+    }
+}
+
+/// Implements [`AsyncFileReader`] for a Parquet file in object storage. Reads 
the file metadata
+/// from the [`FileMetadataCache`], if available, otherwise reads it directly 
from the file and then
+/// updates the cache.
+pub(crate) struct CachedParquetFileReader {
+    pub file_metrics: ParquetFileMetrics,
+    pub inner: ParquetObjectReader,
+    file_meta: FileMeta,
+    metadata_cache: FileMetadataCache,
+}
+
+impl AsyncFileReader for CachedParquetFileReader {
+    fn get_bytes(
+        &mut self,
+        range: Range<u64>,
+    ) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
+        let bytes_scanned = range.end - range.start;
+        self.file_metrics.bytes_scanned.add(bytes_scanned as usize);
+        self.inner.get_bytes(range)
+    }
+
+    fn get_byte_ranges(
+        &mut self,
+        ranges: Vec<Range<u64>>,
+    ) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>>
+    where
+        Self: Send,
+    {
+        let total: u64 = ranges.iter().map(|r| r.end - r.start).sum();
+        self.file_metrics.bytes_scanned.add(total as usize);
+        self.inner.get_byte_ranges(ranges)
+    }
+
+    fn get_metadata<'a>(
+        &'a mut self,
+        options: Option<&'a ArrowReaderOptions>,
+    ) -> BoxFuture<'a, parquet::errors::Result<Arc<ParquetMetaData>>> {
+        let file_meta = self.file_meta.clone();
+        let metadata_cache = Arc::clone(&self.metadata_cache);
+
+        async move {

Review Comment:
   it is impressive that you worked out this API dance -- it is something I 
really don't like about the current API of the parquet reader. 
   
   BTW I am working on improving it (no changes needed or suggested here, I am 
just self-promoting): 
   - https://github.com/apache/arrow-rs/issues/7983



##########
datafusion/execution/src/cache/cache_manager.rs:
##########
@@ -32,6 +34,13 @@ pub type FileStatisticsCache =
 pub type ListFilesCache =
     Arc<dyn CacheAccessor<Path, Arc<Vec<ObjectMeta>>, Extra = ObjectMeta>>;
 
+/// Represents generic file-embedded metadata.
+pub type FileMetadata = dyn Any + Send + Sync;

Review Comment:
   I realize you are just following along with the pattern that is already in 
this file, which is good. 
   
   However, I recommend we make this a trait, partly to improve the 
documentation and partly because I think we are going to need to add a method 
like `memory_size()` to report on the memory consumed by the object
   
   ```rust
   pub trait FileMetadata: Any + Send + Sync {
   ...
   }
   ```
   
   I would recommend the same for the FileMetadataCache below



##########
docs/source/user-guide/configs.md:
##########
@@ -60,6 +60,7 @@ Environment variables are read during `SessionConfig` 
initialisation so they mus
 | datafusion.execution.parquet.binary_as_string                           | 
false                     | (reading) If true, parquet reader will read columns 
of `Binary/LargeBinary` with `Utf8`, and `BinaryView` with `Utf8View`. Parquet 
files generated by some legacy writers do not correctly set the UTF8 flag for 
strings, causing string columns to be loaded as BLOB instead.                   
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                       |
 | datafusion.execution.parquet.coerce_int96                               | 
NULL                      | (reading) If true, parquet reader will read columns 
of physical type int96 as originating from a different resolution than 
nanosecond. This is useful for reading data from systems like Spark which 
stores microsecond resolution timestamps in an int96 allowing it to write 
values with a larger date range than 64-bit timestamps with nanosecond 
resolution.                                                                     
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                  |
 | datafusion.execution.parquet.bloom_filter_on_read                       | 
true                      | (reading) Use any available bloom filters when 
reading parquet files                                                           
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                         |
+| datafusion.execution.parquet.cache_metadata                             | 
false                     | (reading) Whether or not to enable the caching of 
embedded metadata of Parquet files (footer and page metadata). Enabling it can 
offer substantial performance improvements for repeated queries over large 
files. By default, the cache is automatically invalidated when the underlying 
file is modified.                                                               
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                                              |

Review Comment:
   I do think it is a problem if we wanted to turn this feature on by default, 
which I do. However, I don't think we need to make any changes in this 
particular PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to