This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new ccedcb878d [minor] extract collect file statistics method and add doc (#9490)
ccedcb878d is described below

commit ccedcb878db62dbca5a4d6accd93888ec814a479
Author: Yang Jiang <[email protected]>
AuthorDate: Sun Mar 10 00:28:52 2024 +0800

    [minor] extract collect file statistics method and add doc (#9490)
    
    * [minor] extract collect file statistics method and add doc
    
    * fix clippy
    
    * fix doc
---
 datafusion/core/src/datasource/listing/table.rs | 76 +++++++++++++++----------
 1 file changed, 45 insertions(+), 31 deletions(-)

diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 00821a1cdd..88476ffb09 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -62,6 +62,7 @@ use datafusion_physical_expr::{
 
 use async_trait::async_trait;
 use futures::{future, stream, StreamExt, TryStreamExt};
+use object_store::ObjectStore;
 
 /// Configuration for creating a [`ListingTable`]
 #[derive(Debug, Clone)]
@@ -450,7 +451,7 @@ impl ListingOptions {
 }
 
 /// Reads data from one or more files via an
-/// [`ObjectStore`](object_store::ObjectStore). For example, from
+/// [`ObjectStore`]. For example, from
 /// local files or objects from AWS S3. Implements [`TableProvider`],
 /// a DataFusion data source.
 ///
@@ -844,38 +845,14 @@ impl ListingTable {
         let files = file_list
             .map(|part_file| async {
                 let part_file = part_file?;
-                let mut statistics_result = 
Statistics::new_unknown(&self.file_schema);
                 if self.options.collect_stat {
-                    let statistics_cache = self.collected_statistics.clone();
-                    match statistics_cache.get_with_extra(
-                        &part_file.object_meta.location,
-                        &part_file.object_meta,
-                    ) {
-                        Some(statistics) => {
-                            statistics_result = statistics.as_ref().clone()
-                        }
-                        None => {
-                            let statistics = self
-                                .options
-                                .format
-                                .infer_stats(
-                                    ctx,
-                                    &store,
-                                    self.file_schema.clone(),
-                                    &part_file.object_meta,
-                                )
-                                .await?;
-                            statistics_cache.put_with_extra(
-                                &part_file.object_meta.location,
-                                statistics.clone().into(),
-                                &part_file.object_meta,
-                            );
-                            statistics_result = statistics;
-                        }
-                    }
+                    let statistics =
+                        self.do_collect_statistics(ctx, &store, 
&part_file).await?;
+                    Ok((part_file, statistics)) as Result<(PartitionedFile, 
Statistics)>
+                } else {
+                    Ok((part_file, Statistics::new_unknown(&self.file_schema)))
+                        as Result<(PartitionedFile, Statistics)>
                 }
-                Ok((part_file, statistics_result))
-                    as Result<(PartitionedFile, Statistics)>
             })
             .boxed()
             .buffered(ctx.config_options().execution.meta_fetch_concurrency);
@@ -893,6 +870,43 @@ impl ListingTable {
             statistics,
         ))
     }
+
+    /// Collects statistics for a given partitioned file.
+    ///
+    /// This method first checks if the statistics for the given file are already cached.
+    /// If they are, it returns the cached statistics.
+    /// If they are not, it infers the statistics from the file and stores them in the cache.
+    async fn do_collect_statistics<'a>(
+        &'a self,
+        ctx: &SessionState,
+        store: &Arc<dyn ObjectStore>,
+        part_file: &PartitionedFile,
+    ) -> Result<Statistics> {
+        let statistics_cache = self.collected_statistics.clone();
+        return match statistics_cache
+            .get_with_extra(&part_file.object_meta.location, 
&part_file.object_meta)
+        {
+            Some(statistics) => Ok(statistics.as_ref().clone()),
+            None => {
+                let statistics = self
+                    .options
+                    .format
+                    .infer_stats(
+                        ctx,
+                        store,
+                        self.file_schema.clone(),
+                        &part_file.object_meta,
+                    )
+                    .await?;
+                statistics_cache.put_with_extra(
+                    &part_file.object_meta.location,
+                    statistics.clone().into(),
+                    &part_file.object_meta,
+                );
+                Ok(statistics)
+            }
+        };
+    }
 }
 
 #[cfg(test)]

Reply via email to