This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
     new ccedcb878d [minor] extract collect file statistics method and add doc (#9490)
ccedcb878d is described below
commit ccedcb878db62dbca5a4d6accd93888ec814a479
Author: Yang Jiang <[email protected]>
AuthorDate: Sun Mar 10 00:28:52 2024 +0800
[minor] extract collect file statistics method and add doc (#9490)
* [minor] extract collect file statistics method and add doc
* fix clippy
* fix doc
---
datafusion/core/src/datasource/listing/table.rs | 76 +++++++++++++++----------
1 file changed, 45 insertions(+), 31 deletions(-)
diff --git a/datafusion/core/src/datasource/listing/table.rs
b/datafusion/core/src/datasource/listing/table.rs
index 00821a1cdd..88476ffb09 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -62,6 +62,7 @@ use datafusion_physical_expr::{
use async_trait::async_trait;
use futures::{future, stream, StreamExt, TryStreamExt};
+use object_store::ObjectStore;
/// Configuration for creating a [`ListingTable`]
#[derive(Debug, Clone)]
@@ -450,7 +451,7 @@ impl ListingOptions {
}
/// Reads data from one or more files via an
-/// [`ObjectStore`](object_store::ObjectStore). For example, from
+/// [`ObjectStore`]. For example, from
/// local files or objects from AWS S3. Implements [`TableProvider`],
/// a DataFusion data source.
///
@@ -844,38 +845,14 @@ impl ListingTable {
let files = file_list
.map(|part_file| async {
let part_file = part_file?;
-                let mut statistics_result =
-                    Statistics::new_unknown(&self.file_schema);
if self.options.collect_stat {
- let statistics_cache = self.collected_statistics.clone();
- match statistics_cache.get_with_extra(
- &part_file.object_meta.location,
- &part_file.object_meta,
- ) {
- Some(statistics) => {
- statistics_result = statistics.as_ref().clone()
- }
- None => {
- let statistics = self
- .options
- .format
- .infer_stats(
- ctx,
- &store,
- self.file_schema.clone(),
- &part_file.object_meta,
- )
- .await?;
- statistics_cache.put_with_extra(
- &part_file.object_meta.location,
- statistics.clone().into(),
- &part_file.object_meta,
- );
- statistics_result = statistics;
- }
- }
+                let statistics =
+                    self.do_collect_statistics(ctx, &store, &part_file).await?;
+                Ok((part_file, statistics)) as Result<(PartitionedFile, Statistics)>
+            } else {
+                Ok((part_file, Statistics::new_unknown(&self.file_schema)))
+                    as Result<(PartitionedFile, Statistics)>
}
-                Ok((part_file, statistics_result))
-                    as Result<(PartitionedFile, Statistics)>
})
.boxed()
.buffered(ctx.config_options().execution.meta_fetch_concurrency);
@@ -893,6 +870,43 @@ impl ListingTable {
statistics,
))
}
+
+ /// Collects statistics for a given partitioned file.
+ ///
+    /// This method first checks if the statistics for the given file are already cached.
+    /// If they are, it returns the cached statistics.
+    /// If they are not, it infers the statistics from the file and stores them in the cache.
+ async fn do_collect_statistics<'a>(
+ &'a self,
+ ctx: &SessionState,
+ store: &Arc<dyn ObjectStore>,
+ part_file: &PartitionedFile,
+ ) -> Result<Statistics> {
+ let statistics_cache = self.collected_statistics.clone();
+        return match statistics_cache
+            .get_with_extra(&part_file.object_meta.location, &part_file.object_meta)
+        {
+ Some(statistics) => Ok(statistics.as_ref().clone()),
+ None => {
+ let statistics = self
+ .options
+ .format
+ .infer_stats(
+ ctx,
+ store,
+ self.file_schema.clone(),
+ &part_file.object_meta,
+ )
+ .await?;
+ statistics_cache.put_with_extra(
+ &part_file.object_meta.location,
+ statistics.clone().into(),
+ &part_file.object_meta,
+ );
+ Ok(statistics)
+ }
+ };
+ }
}
#[cfg(test)]