This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 899a762230 Populate partition column statistics for PartitionedFile 
(#19284)
899a762230 is described below

commit 899a762230d0abc705482d8898d3793763b6afb4
Author: Adrian Garcia Badaracco <[email protected]>
AuthorDate: Mon Dec 15 07:10:55 2025 -0600

    Populate partition column statistics for PartitionedFile (#19284)
    
    Superseeds https://github.com/apache/datafusion/pull/15865
    Part of https://github.com/apache/datafusion/issues/16800
    
    The idea here was to remove usage of `SchemaAdapter` and at the same
    time actually populate the partition column statistics.
    
    ---------
    
    Co-authored-by: Martin Grigorov <[email protected]>
---
 datafusion/catalog-listing/src/table.rs            |  43 +---
 datafusion/core/src/datasource/listing/table.rs    | 230 +--------------------
 .../physical_optimizer/partition_statistics.rs     | 169 +++++++++++----
 datafusion/core/tests/sql/path_partition.rs        |  30 ++-
 datafusion/datasource/src/file_groups.rs           |  18 +-
 datafusion/datasource/src/file_scan_config.rs      |   1 +
 datafusion/datasource/src/mod.rs                   | 116 ++++++++++-
 7 files changed, 296 insertions(+), 311 deletions(-)

diff --git a/datafusion/catalog-listing/src/table.rs 
b/datafusion/catalog-listing/src/table.rs
index 3f28609471..1dbfaf381a 100644
--- a/datafusion/catalog-listing/src/table.rs
+++ b/datafusion/catalog-listing/src/table.rs
@@ -23,16 +23,13 @@ use async_trait::async_trait;
 use datafusion_catalog::{ScanArgs, ScanResult, Session, TableProvider};
 use datafusion_common::stats::Precision;
 use datafusion_common::{
-    Constraints, DataFusionError, SchemaExt, Statistics, 
internal_datafusion_err,
-    plan_err, project_schema,
+    Constraints, SchemaExt, Statistics, internal_datafusion_err, plan_err, 
project_schema,
 };
 use datafusion_datasource::file::FileSource;
 use datafusion_datasource::file_groups::FileGroup;
 use datafusion_datasource::file_scan_config::{FileScanConfig, 
FileScanConfigBuilder};
 use datafusion_datasource::file_sink_config::FileSinkConfig;
-use datafusion_datasource::schema_adapter::{
-    DefaultSchemaAdapterFactory, SchemaAdapter, SchemaAdapterFactory,
-};
+use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
 use datafusion_datasource::{
     ListingTableUrl, PartitionedFile, TableSchema, 
compute_all_files_statistics,
 };
@@ -331,20 +328,6 @@ impl ListingTable {
         self.schema_adapter_factory.as_ref()
     }
 
-    /// Creates a schema adapter for mapping between file and table schemas
-    ///
-    /// Uses the configured schema adapter factory if available, otherwise 
falls back
-    /// to the default implementation.
-    fn create_schema_adapter(&self) -> Box<dyn SchemaAdapter> {
-        let table_schema = self.schema();
-        match &self.schema_adapter_factory {
-            Some(factory) => {
-                factory.create_with_projected_schema(Arc::clone(&table_schema))
-            }
-            None => 
DefaultSchemaAdapterFactory::from_schema(Arc::clone(&table_schema)),
-        }
-    }
-
     /// Creates a file source and applies schema adapter factory if available
     fn create_file_source_with_schema_adapter(
         &self,
@@ -359,10 +342,8 @@ impl ListingTable {
         );
 
         let mut source = self.options.format.file_source(table_schema);
-        // Apply schema adapter to source if available
-        //
+        // Apply schema adapter to source if available.
         // The source will use this SchemaAdapter to adapt data batches as 
they flow up the plan.
-        // Note: ListingTable also creates a SchemaAdapter in `scan()` but 
that is only used to adapt collected statistics.
         if let Some(factory) = &self.schema_adapter_factory {
             source = source.with_schema_adapter_factory(Arc::clone(factory))?;
         }
@@ -709,25 +690,17 @@ impl ListingTable {
             )
         };
 
-        let (mut file_groups, mut stats) = compute_all_files_statistics(
+        let (file_groups, stats) = compute_all_files_statistics(
             file_groups,
             self.schema(),
             self.options.collect_stat,
             inexact_stats,
         )?;
 
-        let schema_adapter = self.create_schema_adapter();
-        let (schema_mapper, _) = 
schema_adapter.map_schema(self.file_schema.as_ref())?;
-
-        stats.column_statistics =
-            schema_mapper.map_column_statistics(&stats.column_statistics)?;
-        file_groups.iter_mut().try_for_each(|file_group| {
-            if let Some(stat) = file_group.statistics_mut() {
-                stat.column_statistics =
-                    
schema_mapper.map_column_statistics(&stat.column_statistics)?;
-            }
-            Ok::<_, DataFusionError>(())
-        })?;
+        // Note: Statistics already include both file columns and partition 
columns.
+        // PartitionedFile::with_statistics automatically appends exact 
partition column
+        // statistics (min=max=partition_value, null_count=0, 
distinct_count=1) computed
+        // from partition_values.
         Ok(ListFilesResult {
             file_groups,
             statistics: stats,
diff --git a/datafusion/core/src/datasource/listing/table.rs 
b/datafusion/core/src/datasource/listing/table.rs
index 020bdfd5d8..7db79485d1 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -129,16 +129,13 @@ mod tests {
         ListingOptions, ListingTable, ListingTableConfig, SchemaSource,
     };
     use datafusion_common::{
-        assert_contains, plan_err,
+        assert_contains,
         stats::Precision,
         test_util::{batches_to_string, datafusion_test_data},
-        ColumnStatistics, DataFusionError, Result, ScalarValue,
+        DataFusionError, Result, ScalarValue,
     };
     use datafusion_datasource::file_compression_type::FileCompressionType;
     use datafusion_datasource::file_format::FileFormat;
-    use datafusion_datasource::schema_adapter::{
-        SchemaAdapter, SchemaAdapterFactory, SchemaMapper,
-    };
     use datafusion_datasource::ListingTableUrl;
     use datafusion_expr::dml::InsertOp;
     use datafusion_expr::{BinaryExpr, LogicalPlanBuilder, Operator};
@@ -147,15 +144,12 @@ mod tests {
     use datafusion_physical_expr_common::sort_expr::LexOrdering;
     use datafusion_physical_plan::empty::EmptyExec;
     use datafusion_physical_plan::{collect, ExecutionPlanProperties};
-    use rstest::rstest;
     use std::collections::HashMap;
     use std::io::Write;
     use std::sync::Arc;
     use tempfile::TempDir;
     use url::Url;
 
-    const DUMMY_NULL_COUNT: Precision<usize> = Precision::Exact(42);
-
     /// Creates a test schema with standard field types used in tests
     fn create_test_schema() -> SchemaRef {
         Arc::new(Schema::new(vec![
@@ -1448,31 +1442,6 @@ mod tests {
         Ok(())
     }
 
-    #[tokio::test]
-    async fn test_statistics_mapping_with_custom_factory() -> Result<()> {
-        let ctx = SessionContext::new();
-        let table = create_test_listing_table_with_json_and_adapter(
-            &ctx,
-            false,
-            // NullStatsAdapterFactory sets column_statistics null_count to 
DUMMY_NULL_COUNT
-            Arc::new(NullStatsAdapterFactory {}),
-        )?;
-
-        let result = table.list_files_for_scan(&ctx.state(), &[], None).await?;
-
-        assert_eq!(
-            result.statistics.column_statistics[0].null_count,
-            DUMMY_NULL_COUNT
-        );
-        for g in result.file_groups {
-            if let Some(s) = g.file_statistics(None) {
-                assert_eq!(s.column_statistics[0].null_count, 
DUMMY_NULL_COUNT);
-            }
-        }
-
-        Ok(())
-    }
-
     #[tokio::test]
     async fn test_statistics_mapping_with_default_factory() -> Result<()> {
         let ctx = SessionContext::new();
@@ -1513,199 +1482,4 @@ mod tests {
 
         Ok(())
     }
-
-    #[rstest]
-    #[case(MapSchemaError::TypeIncompatible, "Cannot map incompatible types")]
-    #[case(MapSchemaError::GeneralFailure, "Schema adapter mapping failed")]
-    #[case(
-        MapSchemaError::InvalidProjection,
-        "Invalid projection in schema mapping"
-    )]
-    #[tokio::test]
-    async fn test_schema_adapter_map_schema_errors(
-        #[case] error_type: MapSchemaError,
-        #[case] expected_error_msg: &str,
-    ) -> Result<()> {
-        let ctx = SessionContext::new();
-        let table = create_test_listing_table_with_json_and_adapter(
-            &ctx,
-            false,
-            Arc::new(FailingMapSchemaAdapterFactory { error_type }),
-        )?;
-
-        // The error should bubble up from the scan operation when schema 
mapping fails
-        let scan_result = table.scan(&ctx.state(), None, &[], None).await;
-
-        assert!(scan_result.is_err());
-        let error_msg = scan_result.unwrap_err().to_string();
-        assert!(
-            error_msg.contains(expected_error_msg),
-            "Expected error containing '{expected_error_msg}', got: 
{error_msg}"
-        );
-
-        Ok(())
-    }
-
-    // Test that errors during file listing also bubble up correctly
-    #[tokio::test]
-    async fn test_schema_adapter_error_during_file_listing() -> Result<()> {
-        let ctx = SessionContext::new();
-        let table = create_test_listing_table_with_json_and_adapter(
-            &ctx,
-            true,
-            Arc::new(FailingMapSchemaAdapterFactory {
-                error_type: MapSchemaError::TypeIncompatible,
-            }),
-        )?;
-
-        // The error should bubble up from list_files_for_scan when collecting 
statistics
-        let list_result = table.list_files_for_scan(&ctx.state(), &[], 
None).await;
-
-        assert!(list_result.is_err());
-        let error_msg = list_result.unwrap_err().to_string();
-        assert!(
-            error_msg.contains("Cannot map incompatible types"),
-            "Expected type incompatibility error during file listing, got: 
{error_msg}"
-        );
-
-        Ok(())
-    }
-
-    #[derive(Debug, Copy, Clone)]
-    enum MapSchemaError {
-        TypeIncompatible,
-        GeneralFailure,
-        InvalidProjection,
-    }
-
-    #[derive(Debug)]
-    struct FailingMapSchemaAdapterFactory {
-        error_type: MapSchemaError,
-    }
-
-    impl SchemaAdapterFactory for FailingMapSchemaAdapterFactory {
-        fn create(
-            &self,
-            projected_table_schema: SchemaRef,
-            _table_schema: SchemaRef,
-        ) -> Box<dyn SchemaAdapter> {
-            Box::new(FailingMapSchemaAdapter {
-                schema: projected_table_schema,
-                error_type: self.error_type,
-            })
-        }
-    }
-
-    #[derive(Debug)]
-    struct FailingMapSchemaAdapter {
-        schema: SchemaRef,
-        error_type: MapSchemaError,
-    }
-
-    impl SchemaAdapter for FailingMapSchemaAdapter {
-        fn map_column_index(&self, index: usize, file_schema: &Schema) -> 
Option<usize> {
-            let field = self.schema.field(index);
-            file_schema.fields.find(field.name()).map(|(i, _)| i)
-        }
-
-        fn map_schema(
-            &self,
-            _file_schema: &Schema,
-        ) -> Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
-            // Always fail with different error types based on the configured 
error_type
-            match self.error_type {
-                MapSchemaError::TypeIncompatible => {
-                    plan_err!(
-                        "Cannot map incompatible types: Boolean cannot be cast 
to Utf8"
-                    )
-                }
-                MapSchemaError::GeneralFailure => {
-                    plan_err!("Schema adapter mapping failed due to internal 
error")
-                }
-                MapSchemaError::InvalidProjection => {
-                    plan_err!("Invalid projection in schema mapping: column 
index out of bounds")
-                }
-            }
-        }
-    }
-
-    #[derive(Debug)]
-    struct NullStatsAdapterFactory;
-
-    impl SchemaAdapterFactory for NullStatsAdapterFactory {
-        fn create(
-            &self,
-            projected_table_schema: SchemaRef,
-            _table_schema: SchemaRef,
-        ) -> Box<dyn SchemaAdapter> {
-            Box::new(NullStatsAdapter {
-                schema: projected_table_schema,
-            })
-        }
-    }
-
-    #[derive(Debug)]
-    struct NullStatsAdapter {
-        schema: SchemaRef,
-    }
-
-    impl SchemaAdapter for NullStatsAdapter {
-        fn map_column_index(&self, index: usize, file_schema: &Schema) -> 
Option<usize> {
-            let field = self.schema.field(index);
-            file_schema.fields.find(field.name()).map(|(i, _)| i)
-        }
-
-        fn map_schema(
-            &self,
-            file_schema: &Schema,
-        ) -> Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
-            let projection = (0..file_schema.fields().len()).collect();
-            Ok((Arc::new(NullStatsMapper {}), projection))
-        }
-    }
-
-    #[derive(Debug)]
-    struct NullStatsMapper;
-
-    impl SchemaMapper for NullStatsMapper {
-        fn map_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
-            Ok(batch)
-        }
-
-        fn map_column_statistics(
-            &self,
-            stats: &[ColumnStatistics],
-        ) -> Result<Vec<ColumnStatistics>> {
-            Ok(stats
-                .iter()
-                .map(|s| {
-                    let mut s = s.clone();
-                    s.null_count = DUMMY_NULL_COUNT;
-                    s
-                })
-                .collect())
-        }
-    }
-
-    /// Helper function to create a test ListingTable with JSON format and 
custom schema adapter factory
-    fn create_test_listing_table_with_json_and_adapter(
-        ctx: &SessionContext,
-        collect_stat: bool,
-        schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
-    ) -> Result<ListingTable> {
-        let path = "table/file.json";
-        register_test_store(ctx, &[(path, 10)]);
-
-        let format = JsonFormat::default();
-        let opt = 
ListingOptions::new(Arc::new(format)).with_collect_stat(collect_stat);
-        let schema = Schema::new(vec![Field::new("a", DataType::Boolean, 
false)]);
-        let table_path = ListingTableUrl::parse("test:///table/")?;
-
-        let config = ListingTableConfig::new(table_path)
-            .with_listing_options(opt)
-            .with_schema(Arc::new(schema))
-            .with_schema_adapter_factory(schema_adapter_factory);
-
-        ListingTable::try_new(config)
-    }
 }
diff --git a/datafusion/core/tests/physical_optimizer/partition_statistics.rs 
b/datafusion/core/tests/physical_optimizer/partition_statistics.rs
index cbfcb71883..1846473e10 100644
--- a/datafusion/core/tests/physical_optimizer/partition_statistics.rs
+++ b/datafusion/core/tests/physical_optimizer/partition_statistics.rs
@@ -112,13 +112,26 @@ mod test {
             .unwrap()
     }
 
+    // Date32 values for test data (days since 1970-01-01):
+    // 2025-03-01 = 20148
+    // 2025-03-02 = 20149
+    // 2025-03-03 = 20150
+    // 2025-03-04 = 20151
+    const DATE_2025_03_01: i32 = 20148;
+    const DATE_2025_03_02: i32 = 20149;
+    const DATE_2025_03_03: i32 = 20150;
+    const DATE_2025_03_04: i32 = 20151;
+
     /// Helper function to create expected statistics for a partition with 
Int32 column
+    ///
+    /// If `date_range` is provided, includes exact statistics for the 
partition date column.
+    /// Partition column statistics are exact because all rows in a partition 
share the same value.
     fn create_partition_statistics(
         num_rows: usize,
         total_byte_size: usize,
         min_value: i32,
         max_value: i32,
-        include_date_column: bool,
+        date_range: Option<(i32, i32)>,
     ) -> Statistics {
         // Int32 is 4 bytes per row
         let int32_byte_size = num_rows * 4;
@@ -131,16 +144,19 @@ mod test {
             byte_size: Precision::Exact(int32_byte_size),
         }];
 
-        if include_date_column {
-            // The date column is a partition column (from the directory path),
-            // not stored in the parquet file, so byte_size is Absent
+        if let Some((min_date, max_date)) = date_range {
+            // Partition column stats are computed from partition values:
+            // - null_count = 0 (partition values from paths are never null)
+            // - min/max are the merged partition values across files in the 
group
+            // - byte_size = num_rows * 4 (Date32 is 4 bytes per row)
+            let date32_byte_size = num_rows * 4;
             column_stats.push(ColumnStatistics {
-                null_count: Precision::Absent,
-                max_value: Precision::Absent,
-                min_value: Precision::Absent,
+                null_count: Precision::Exact(0),
+                max_value: 
Precision::Exact(ScalarValue::Date32(Some(max_date))),
+                min_value: 
Precision::Exact(ScalarValue::Date32(Some(min_date))),
                 sum_value: Precision::Absent,
                 distinct_count: Precision::Absent,
-                byte_size: Precision::Absent,
+                byte_size: Precision::Exact(date32_byte_size),
             });
         }
 
@@ -220,10 +236,22 @@ mod test {
         let statistics = (0..scan.output_partitioning().partition_count())
             .map(|idx| scan.partition_statistics(Some(idx)))
             .collect::<Result<Vec<_>>>()?;
-        let expected_statistic_partition_1 =
-            create_partition_statistics(2, 16, 3, 4, true);
-        let expected_statistic_partition_2 =
-            create_partition_statistics(2, 16, 1, 2, true);
+        // Partition 1: ids [3,4], dates [2025-03-01, 2025-03-02]
+        let expected_statistic_partition_1 = create_partition_statistics(
+            2,
+            16,
+            3,
+            4,
+            Some((DATE_2025_03_01, DATE_2025_03_02)),
+        );
+        // Partition 2: ids [1,2], dates [2025-03-03, 2025-03-04]
+        let expected_statistic_partition_2 = create_partition_statistics(
+            2,
+            16,
+            1,
+            2,
+            Some((DATE_2025_03_03, DATE_2025_03_04)),
+        );
         // Check the statistics of each partition
         assert_eq!(statistics.len(), 2);
         assert_eq!(statistics[0], expected_statistic_partition_1);
@@ -252,10 +280,11 @@ mod test {
         let statistics = 
(0..projection.output_partitioning().partition_count())
             .map(|idx| projection.partition_statistics(Some(idx)))
             .collect::<Result<Vec<_>>>()?;
+        // Projection only includes id column, not the date partition column
         let expected_statistic_partition_1 =
-            create_partition_statistics(2, 8, 3, 4, false);
+            create_partition_statistics(2, 8, 3, 4, None);
         let expected_statistic_partition_2 =
-            create_partition_statistics(2, 8, 1, 2, false);
+            create_partition_statistics(2, 8, 1, 2, None);
         // Check the statistics of each partition
         assert_eq!(statistics.len(), 2);
         assert_eq!(statistics[0], expected_statistic_partition_1);
@@ -283,7 +312,14 @@ mod test {
         let statistics = (0..sort_exec.output_partitioning().partition_count())
             .map(|idx| sort_exec.partition_statistics(Some(idx)))
             .collect::<Result<Vec<_>>>()?;
-        let expected_statistic_partition = create_partition_statistics(4, 32, 
1, 4, true);
+        // All 4 files merged: ids [1-4], dates [2025-03-01, 2025-03-04]
+        let expected_statistic_partition = create_partition_statistics(
+            4,
+            32,
+            1,
+            4,
+            Some((DATE_2025_03_01, DATE_2025_03_04)),
+        );
         assert_eq!(statistics.len(), 1);
         assert_eq!(statistics[0], expected_statistic_partition);
         // Check the statistics_by_partition with real results
@@ -296,10 +332,22 @@ mod test {
         let sort_exec: Arc<dyn ExecutionPlan> = Arc::new(
             SortExec::new(ordering.into(), 
scan_2).with_preserve_partitioning(true),
         );
-        let expected_statistic_partition_1 =
-            create_partition_statistics(2, 16, 3, 4, true);
-        let expected_statistic_partition_2 =
-            create_partition_statistics(2, 16, 1, 2, true);
+        // Partition 1: ids [3,4], dates [2025-03-01, 2025-03-02]
+        let expected_statistic_partition_1 = create_partition_statistics(
+            2,
+            16,
+            3,
+            4,
+            Some((DATE_2025_03_01, DATE_2025_03_02)),
+        );
+        // Partition 2: ids [1,2], dates [2025-03-03, 2025-03-04]
+        let expected_statistic_partition_2 = create_partition_statistics(
+            2,
+            16,
+            1,
+            2,
+            Some((DATE_2025_03_03, DATE_2025_03_04)),
+        );
         let statistics = (0..sort_exec.output_partitioning().partition_count())
             .map(|idx| sort_exec.partition_statistics(Some(idx)))
             .collect::<Result<Vec<_>>>()?;
@@ -349,7 +397,7 @@ mod test {
                     min_value: Precision::Exact(ScalarValue::Null),
                     sum_value: Precision::Exact(ScalarValue::Null),
                     distinct_count: Precision::Exact(0),
-                    byte_size: Precision::Absent,
+                    byte_size: Precision::Exact(16), // 4 rows * 4 bytes 
(Date32)
                 },
             ],
         };
@@ -378,7 +426,7 @@ mod test {
                     min_value: Precision::Exact(ScalarValue::Null),
                     sum_value: Precision::Exact(ScalarValue::Null),
                     distinct_count: Precision::Exact(0),
-                    byte_size: Precision::Absent,
+                    byte_size: Precision::Exact(8), // 2 rows * 4 bytes 
(Date32)
                 },
             ],
         };
@@ -397,10 +445,22 @@ mod test {
             .collect::<Result<Vec<_>>>()?;
         // Check that we have 4 partitions (2 from each scan)
         assert_eq!(statistics.len(), 4);
-        let expected_statistic_partition_1 =
-            create_partition_statistics(2, 16, 3, 4, true);
-        let expected_statistic_partition_2 =
-            create_partition_statistics(2, 16, 1, 2, true);
+        // Partition 1: ids [3,4], dates [2025-03-01, 2025-03-02]
+        let expected_statistic_partition_1 = create_partition_statistics(
+            2,
+            16,
+            3,
+            4,
+            Some((DATE_2025_03_01, DATE_2025_03_02)),
+        );
+        // Partition 2: ids [1,2], dates [2025-03-03, 2025-03-04]
+        let expected_statistic_partition_2 = create_partition_statistics(
+            2,
+            16,
+            1,
+            2,
+            Some((DATE_2025_03_03, DATE_2025_03_04)),
+        );
         // Verify first partition (from first scan)
         assert_eq!(statistics[0], expected_statistic_partition_1);
         // Verify second partition (from first scan)
@@ -494,11 +554,13 @@ mod test {
             .collect::<Result<Vec<_>>>()?;
         // Check that we have 2 partitions
         assert_eq!(statistics.len(), 2);
+        // Cross join output schema: [left.id, left.date, right.id]
         // Cross join doesn't propagate Column's byte_size
         let expected_statistic_partition_1 = Statistics {
             num_rows: Precision::Exact(8),
             total_byte_size: Precision::Exact(512),
             column_statistics: vec![
+                // column 0: left.id (Int32, file column from t1)
                 ColumnStatistics {
                     null_count: Precision::Exact(0),
                     max_value: Precision::Exact(ScalarValue::Int32(Some(4))),
@@ -507,14 +569,17 @@ mod test {
                     distinct_count: Precision::Absent,
                     byte_size: Precision::Absent,
                 },
+                // column 1: left.date (Date32, partition column from t1)
+                // Partition column statistics are exact because all rows in a 
partition share the same value.
                 ColumnStatistics {
-                    null_count: Precision::Absent,
-                    max_value: Precision::Absent,
-                    min_value: Precision::Absent,
+                    null_count: Precision::Exact(0),
+                    max_value: 
Precision::Exact(ScalarValue::Date32(Some(20151))),
+                    min_value: 
Precision::Exact(ScalarValue::Date32(Some(20148))),
                     sum_value: Precision::Absent,
                     distinct_count: Precision::Absent,
                     byte_size: Precision::Absent,
                 },
+                // column 2: right.id (Int32, file column from t2) - right 
partition 0: ids [3,4]
                 ColumnStatistics {
                     null_count: Precision::Exact(0),
                     max_value: Precision::Exact(ScalarValue::Int32(Some(4))),
@@ -529,6 +594,7 @@ mod test {
             num_rows: Precision::Exact(8),
             total_byte_size: Precision::Exact(512),
             column_statistics: vec![
+                // column 0: left.id (Int32, file column from t1)
                 ColumnStatistics {
                     null_count: Precision::Exact(0),
                     max_value: Precision::Exact(ScalarValue::Int32(Some(4))),
@@ -537,14 +603,17 @@ mod test {
                     distinct_count: Precision::Absent,
                     byte_size: Precision::Absent,
                 },
+                // column 1: left.date (Date32, partition column from t1)
+                // Partition column statistics are exact because all rows in a 
partition share the same value.
                 ColumnStatistics {
-                    null_count: Precision::Absent,
-                    max_value: Precision::Absent,
-                    min_value: Precision::Absent,
+                    null_count: Precision::Exact(0),
+                    max_value: 
Precision::Exact(ScalarValue::Date32(Some(20151))),
+                    min_value: 
Precision::Exact(ScalarValue::Date32(Some(20148))),
                     sum_value: Precision::Absent,
                     distinct_count: Precision::Absent,
                     byte_size: Precision::Absent,
                 },
+                // column 2: right.id (Int32, file column from t2) - right 
partition 1: ids [1,2]
                 ColumnStatistics {
                     null_count: Precision::Exact(0),
                     max_value: Precision::Exact(ScalarValue::Int32(Some(2))),
@@ -572,10 +641,22 @@ mod test {
         let scan = create_scan_exec_with_statistics(None, Some(2)).await;
         let coalesce_batches: Arc<dyn ExecutionPlan> =
             Arc::new(CoalesceBatchesExec::new(scan, 2));
-        let expected_statistic_partition_1 =
-            create_partition_statistics(2, 16, 3, 4, true);
-        let expected_statistic_partition_2 =
-            create_partition_statistics(2, 16, 1, 2, true);
+        // Partition 1: ids [3,4], dates [2025-03-01, 2025-03-02]
+        let expected_statistic_partition_1 = create_partition_statistics(
+            2,
+            16,
+            3,
+            4,
+            Some((DATE_2025_03_01, DATE_2025_03_02)),
+        );
+        // Partition 2: ids [1,2], dates [2025-03-03, 2025-03-04]
+        let expected_statistic_partition_2 = create_partition_statistics(
+            2,
+            16,
+            1,
+            2,
+            Some((DATE_2025_03_03, DATE_2025_03_04)),
+        );
         let statistics = 
(0..coalesce_batches.output_partitioning().partition_count())
             .map(|idx| coalesce_batches.partition_statistics(Some(idx)))
             .collect::<Result<Vec<_>>>()?;
@@ -597,7 +678,14 @@ mod test {
         let scan = create_scan_exec_with_statistics(None, Some(2)).await;
         let coalesce_partitions: Arc<dyn ExecutionPlan> =
             Arc::new(CoalescePartitionsExec::new(scan));
-        let expected_statistic_partition = create_partition_statistics(4, 32, 
1, 4, true);
+        // All files merged: ids [1-4], dates [2025-03-01, 2025-03-04]
+        let expected_statistic_partition = create_partition_statistics(
+            4,
+            32,
+            1,
+            4,
+            Some((DATE_2025_03_01, DATE_2025_03_04)),
+        );
         let statistics = 
(0..coalesce_partitions.output_partitioning().partition_count())
             .map(|idx| coalesce_partitions.partition_statistics(Some(idx)))
             .collect::<Result<Vec<_>>>()?;
@@ -646,7 +734,14 @@ mod test {
             .map(|idx| global_limit.partition_statistics(Some(idx)))
             .collect::<Result<Vec<_>>>()?;
         assert_eq!(statistics.len(), 1);
-        let expected_statistic_partition = create_partition_statistics(2, 16, 
3, 4, true);
+        // GlobalLimit takes from first partition: ids [3,4], dates 
[2025-03-01, 2025-03-02]
+        let expected_statistic_partition = create_partition_statistics(
+            2,
+            16,
+            3,
+            4,
+            Some((DATE_2025_03_01, DATE_2025_03_02)),
+        );
         assert_eq!(statistics[0], expected_statistic_partition);
         Ok(())
     }
diff --git a/datafusion/core/tests/sql/path_partition.rs 
b/datafusion/core/tests/sql/path_partition.rs
index 05cc723ef0..3ee4e37589 100644
--- a/datafusion/core/tests/sql/path_partition.rs
+++ b/datafusion/core/tests/sql/path_partition.rs
@@ -31,7 +31,6 @@ use datafusion::{
         listing::{ListingOptions, ListingTable, ListingTableConfig},
     },
     error::Result,
-    physical_plan::ColumnStatistics,
     prelude::SessionContext,
     test_util::{self, arrow_test_data, parquet_test_data},
 };
@@ -464,10 +463,19 @@ async fn parquet_statistics() -> Result<()> {
     assert_eq!(stat_cols.len(), 4);
     // stats for the first col are read from the parquet file
     assert_eq!(stat_cols[0].null_count, Precision::Exact(3));
-    // TODO assert partition column (1,2,3) stats once implemented (#1186)
-    assert_eq!(stat_cols[1], ColumnStatistics::new_unknown(),);
-    assert_eq!(stat_cols[2], ColumnStatistics::new_unknown(),);
-    assert_eq!(stat_cols[3], ColumnStatistics::new_unknown(),);
+    // Partition column statistics (year=2021 for all 3 rows)
+    assert_eq!(stat_cols[1].null_count, Precision::Exact(0));
+    assert_eq!(
+        stat_cols[1].min_value,
+        Precision::Exact(ScalarValue::Int32(Some(2021)))
+    );
+    assert_eq!(
+        stat_cols[1].max_value,
+        Precision::Exact(ScalarValue::Int32(Some(2021)))
+    );
+    // month and day are Utf8 partition columns with statistics
+    assert_eq!(stat_cols[2].null_count, Precision::Exact(0));
+    assert_eq!(stat_cols[3].null_count, Precision::Exact(0));
 
     //// WITH PROJECTION ////
     let dataframe = ctx.sql("SELECT mycol, day FROM t WHERE day='28'").await?;
@@ -479,8 +487,16 @@ async fn parquet_statistics() -> Result<()> {
     assert_eq!(stat_cols.len(), 2);
     // stats for the first col are read from the parquet file
     assert_eq!(stat_cols[0].null_count, Precision::Exact(1));
-    // TODO assert partition column stats once implemented (#1186)
-    assert_eq!(stat_cols[1], ColumnStatistics::new_unknown());
+    // Partition column statistics for day='28' (1 row)
+    assert_eq!(stat_cols[1].null_count, Precision::Exact(0));
+    assert_eq!(
+        stat_cols[1].min_value,
+        Precision::Exact(ScalarValue::Utf8(Some("28".to_string())))
+    );
+    assert_eq!(
+        stat_cols[1].max_value,
+        Precision::Exact(ScalarValue::Utf8(Some("28".to_string())))
+    );
 
     Ok(())
 }
diff --git a/datafusion/datasource/src/file_groups.rs 
b/datafusion/datasource/src/file_groups.rs
index d5fde15ff9..28a403ab92 100644
--- a/datafusion/datasource/src/file_groups.rs
+++ b/datafusion/datasource/src/file_groups.rs
@@ -364,11 +364,27 @@ impl FileGroupPartitioner {
 
 /// Represents a group of partitioned files that'll be processed by a single 
thread.
 /// Maintains optional statistics across all files in the group.
+///
+/// # Statistics
+///
+/// The group-level [`FileGroup::file_statistics`] field contains merged 
statistics from all files
+/// in the group for the **full table schema** (file columns + partition 
columns).
+///
+/// Partition column statistics are derived from the individual file partition 
values:
+/// - `min` = minimum partition value across all files in the group
+/// - `max` = maximum partition value across all files in the group
+/// - `null_count` = 0 (partition values are never null)
+///
+/// This allows query optimizers to prune entire file groups based on 
partition bounds.
 #[derive(Debug, Clone)]
 pub struct FileGroup {
     /// The files in this group
     files: Vec<PartitionedFile>,
-    /// Optional statistics for the data across all files in the group
+    /// Optional statistics for the data across all files in the group.
+    ///
+    /// These statistics cover the full table schema: file columns plus 
partition columns.
+    /// Partition column statistics are merged from individual 
[`PartitionedFile::statistics`],
+    /// which compute exact values from [`PartitionedFile::partition_values`].
     statistics: Option<Arc<Statistics>>,
 }
 
diff --git a/datafusion/datasource/src/file_scan_config.rs 
b/datafusion/datasource/src/file_scan_config.rs
index c2415b3d59..ad89406014 100644
--- a/datafusion/datasource/src/file_scan_config.rs
+++ b/datafusion/datasource/src/file_scan_config.rs
@@ -730,6 +730,7 @@ impl DataSource for FileScanConfig {
     fn partition_statistics(&self, partition: Option<usize>) -> 
Result<Statistics> {
         if let Some(partition) = partition {
             // Get statistics for a specific partition
+            // Note: FileGroup statistics include partition columns (computed 
from partition_values)
             if let Some(file_group) = self.file_groups.get(partition)
                 && let Some(stat) = file_group.file_statistics(None)
             {
diff --git a/datafusion/datasource/src/mod.rs b/datafusion/datasource/src/mod.rs
index 1e71825b99..347e783c27 100644
--- a/datafusion/datasource/src/mod.rs
+++ b/datafusion/datasource/src/mod.rs
@@ -95,6 +95,19 @@ impl FileRange {
 #[derive(Debug, Clone)]
 /// A single file or part of a file that should be read, along with its 
schema, statistics
 /// and partition column values that need to be appended to each row.
+///
+/// # Statistics
+///
+/// The [`Self::statistics`] field contains statistics for the **full table 
schema**,
+/// which includes both file columns and partition columns. When statistics 
are set via
+/// [`Self::with_statistics`], exact statistics for partition columns are 
automatically
+/// computed from [`Self::partition_values`]:
+///
+/// - `min = max = partition_value` (all rows in a file share the same 
partition value)
+/// - `null_count = 0` (partition values extracted from paths are never null)
+/// - `distinct_count = 1` (single distinct value per file for each partition 
column)
+///
+/// This enables query optimizers to use partition column bounds for pruning 
and planning.
 pub struct PartitionedFile {
     /// Path for the file (e.g. URL, filesystem path, etc)
     pub object_meta: ObjectMeta,
@@ -115,6 +128,10 @@ pub struct PartitionedFile {
     ///
     /// DataFusion relies on these statistics for planning (in particular to 
sort file groups),
     /// so if they are incorrect, incorrect answers may result.
+    ///
+    /// These statistics cover the full table schema: file columns plus 
partition columns.
+    /// When set via [`Self::with_statistics`], partition column statistics 
are automatically
+    /// computed from [`Self::partition_values`] with exact 
min/max/null_count/distinct_count.
     pub statistics: Option<Arc<Statistics>>,
     /// An optional field for user defined per object metadata
     pub extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
@@ -214,9 +231,38 @@ impl PartitionedFile {
         self
     }
 
-    // Update the statistics for this file.
-    pub fn with_statistics(mut self, statistics: Arc<Statistics>) -> Self {
-        self.statistics = Some(statistics);
+    /// Update the statistics for this file.
+    ///
+    /// The provided `statistics` should cover only the file schema columns.
+    /// This method will automatically append exact statistics for partition 
columns
+    /// based on `partition_values`:
+    /// - `min = max = partition_value` (all rows have the same value)
+    /// - `null_count = 0` (partition values from paths are never null)
+    /// - `distinct_count = 1` (all rows have the same partition value)
+    pub fn with_statistics(mut self, file_statistics: Arc<Statistics>) -> Self 
{
+        if self.partition_values.is_empty() {
+            // No partition columns, use stats as-is
+            self.statistics = Some(file_statistics);
+        } else {
+            // Extend stats with exact partition column statistics
+            let mut stats = Arc::unwrap_or_clone(file_statistics);
+            for partition_value in &self.partition_values {
+                let col_stats = ColumnStatistics {
+                    null_count: Precision::Exact(0),
+                    max_value: Precision::Exact(partition_value.clone()),
+                    min_value: Precision::Exact(partition_value.clone()),
+                    distinct_count: Precision::Exact(1),
+                    sum_value: Precision::Absent,
+                    byte_size: partition_value
+                        .data_type()
+                        .primitive_width()
+                        .map(|w| stats.num_rows.multiply(&Precision::Exact(w)))
+                        .unwrap_or_else(|| Precision::Absent),
+                };
+                stats.column_statistics.push(col_stats);
+            }
+            self.statistics = Some(Arc::new(stats));
+        }
         self
     }
 
@@ -561,6 +607,70 @@ mod tests {
         sut.get_store(url.as_ref()).unwrap();
     }
 
+    #[test]
+    fn test_with_statistics_appends_partition_column_stats() {
+        use crate::PartitionedFile;
+        use datafusion_common::stats::Precision;
+        use datafusion_common::{ColumnStatistics, ScalarValue, Statistics};
+
+        // Create a PartitionedFile with partition values
+        let mut pf = PartitionedFile::new(
+            "test.parquet",
+            100, // file size
+        );
+        pf.partition_values = vec![
+            ScalarValue::Date32(Some(20148)), // 2025-03-01
+        ];
+
+        // Create file-only statistics (1 column for 'id')
+        let file_stats = Arc::new(Statistics {
+            num_rows: Precision::Exact(2),
+            total_byte_size: Precision::Exact(16),
+            column_statistics: vec![ColumnStatistics {
+                null_count: Precision::Exact(0),
+                max_value: Precision::Exact(ScalarValue::Int32(Some(4))),
+                min_value: Precision::Exact(ScalarValue::Int32(Some(3))),
+                sum_value: Precision::Absent,
+                distinct_count: Precision::Absent,
+                byte_size: Precision::Absent,
+            }],
+        });
+
+        // Call with_statistics - should append partition column stats
+        let pf = pf.with_statistics(file_stats);
+
+        // Verify the statistics now have 2 columns
+        let stats = pf.statistics.unwrap();
+        assert_eq!(
+            stats.column_statistics.len(),
+            2,
+            "Expected 2 columns (id + date partition)"
+        );
+
+        // Verify partition column statistics
+        let partition_col_stats = &stats.column_statistics[1];
+        assert_eq!(
+            partition_col_stats.null_count,
+            Precision::Exact(0),
+            "Partition column null_count should be Exact(0)"
+        );
+        assert_eq!(
+            partition_col_stats.min_value,
+            Precision::Exact(ScalarValue::Date32(Some(20148))),
+            "Partition column min should match partition value"
+        );
+        assert_eq!(
+            partition_col_stats.max_value,
+            Precision::Exact(ScalarValue::Date32(Some(20148))),
+            "Partition column max should match partition value"
+        );
+        assert_eq!(
+            partition_col_stats.distinct_count,
+            Precision::Exact(1),
+            "Partition column distinct_count should be Exact(1)"
+        );
+    }
+
     #[test]
     fn test_url_contains() {
         let url = ListingTableUrl::parse("file:///var/data/mytable/").unwrap();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to