This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 899a762230 Populate partition column statistics for PartitionedFile (#19284)
899a762230 is described below
commit 899a762230d0abc705482d8898d3793763b6afb4
Author: Adrian Garcia Badaracco <[email protected]>
AuthorDate: Mon Dec 15 07:10:55 2025 -0600
Populate partition column statistics for PartitionedFile (#19284)
Supersedes https://github.com/apache/datafusion/pull/15865
Part of https://github.com/apache/datafusion/issues/16800
The idea here was to remove usage of `SchemaAdapter` and at the same
time actually populate the partition column statistics.
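In practice, each `PartitionedFile` now derives exact per-column statistics for its partition columns directly from `partition_values`. A minimal sketch of that derivation, mirroring the `with_statistics` hunk in `datafusion/datasource/src/mod.rs` below (the standalone helper name is illustrative, not part of the patch):

use datafusion_common::stats::Precision;
use datafusion_common::{ColumnStatistics, ScalarValue};

/// Sketch: exact statistics for one partition column of one file. Every row
/// in the file shares the same partition value, so min = max = value,
/// null_count = 0 and distinct_count = 1.
fn partition_column_stats(
    value: &ScalarValue,
    num_rows: &Precision<usize>,
) -> ColumnStatistics {
    ColumnStatistics {
        null_count: Precision::Exact(0),
        min_value: Precision::Exact(value.clone()),
        max_value: Precision::Exact(value.clone()),
        distinct_count: Precision::Exact(1),
        sum_value: Precision::Absent,
        // Byte size is exact only for fixed-width types (e.g. Date32 = 4 bytes/row)
        byte_size: value
            .data_type()
            .primitive_width()
            .map(|w| num_rows.multiply(&Precision::Exact(w)))
            .unwrap_or(Precision::Absent),
    }
}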
---------
Co-authored-by: Martin Grigorov <[email protected]>
---
datafusion/catalog-listing/src/table.rs | 43 +---
datafusion/core/src/datasource/listing/table.rs | 230 +--------------------
.../physical_optimizer/partition_statistics.rs | 169 +++++++++++----
datafusion/core/tests/sql/path_partition.rs | 30 ++-
datafusion/datasource/src/file_groups.rs | 18 +-
datafusion/datasource/src/file_scan_config.rs | 1 +
datafusion/datasource/src/mod.rs | 116 ++++++++++-
7 files changed, 296 insertions(+), 311 deletions(-)
diff --git a/datafusion/catalog-listing/src/table.rs b/datafusion/catalog-listing/src/table.rs
index 3f28609471..1dbfaf381a 100644
--- a/datafusion/catalog-listing/src/table.rs
+++ b/datafusion/catalog-listing/src/table.rs
@@ -23,16 +23,13 @@ use async_trait::async_trait;
use datafusion_catalog::{ScanArgs, ScanResult, Session, TableProvider};
use datafusion_common::stats::Precision;
use datafusion_common::{
- Constraints, DataFusionError, SchemaExt, Statistics, internal_datafusion_err,
- plan_err, project_schema,
+ Constraints, SchemaExt, Statistics, internal_datafusion_err, plan_err, project_schema,
};
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_groups::FileGroup;
use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
use datafusion_datasource::file_sink_config::FileSinkConfig;
-use datafusion_datasource::schema_adapter::{
- DefaultSchemaAdapterFactory, SchemaAdapter, SchemaAdapterFactory,
-};
+use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
use datafusion_datasource::{
ListingTableUrl, PartitionedFile, TableSchema, compute_all_files_statistics,
};
@@ -331,20 +328,6 @@ impl ListingTable {
self.schema_adapter_factory.as_ref()
}
- /// Creates a schema adapter for mapping between file and table schemas
- ///
- /// Uses the configured schema adapter factory if available, otherwise falls back
- /// to the default implementation.
- fn create_schema_adapter(&self) -> Box<dyn SchemaAdapter> {
- let table_schema = self.schema();
- match &self.schema_adapter_factory {
- Some(factory) => {
- factory.create_with_projected_schema(Arc::clone(&table_schema))
- }
- None => DefaultSchemaAdapterFactory::from_schema(Arc::clone(&table_schema)),
- }
- }
-
/// Creates a file source and applies schema adapter factory if available
fn create_file_source_with_schema_adapter(
&self,
@@ -359,10 +342,8 @@ impl ListingTable {
);
let mut source = self.options.format.file_source(table_schema);
- // Apply schema adapter to source if available
- //
+ // Apply schema adapter to source if available.
// The source will use this SchemaAdapter to adapt data batches as they flow up the plan.
- // Note: ListingTable also creates a SchemaAdapter in `scan()` but that is only used to adapt collected statistics.
if let Some(factory) = &self.schema_adapter_factory {
source = source.with_schema_adapter_factory(Arc::clone(factory))?;
}
@@ -709,25 +690,17 @@ impl ListingTable {
)
};
- let (mut file_groups, mut stats) = compute_all_files_statistics(
+ let (file_groups, stats) = compute_all_files_statistics(
file_groups,
self.schema(),
self.options.collect_stat,
inexact_stats,
)?;
- let schema_adapter = self.create_schema_adapter();
- let (schema_mapper, _) = schema_adapter.map_schema(self.file_schema.as_ref())?;
-
- stats.column_statistics =
- schema_mapper.map_column_statistics(&stats.column_statistics)?;
- file_groups.iter_mut().try_for_each(|file_group| {
- if let Some(stat) = file_group.statistics_mut() {
- stat.column_statistics =
- schema_mapper.map_column_statistics(&stat.column_statistics)?;
- }
- Ok::<_, DataFusionError>(())
- })?;
+ // Note: Statistics already include both file columns and partition columns.
+ // PartitionedFile::with_statistics automatically appends exact partition column
+ // statistics (min=max=partition_value, null_count=0, distinct_count=1) computed
+ // from partition_values.
Ok(ListFilesResult {
file_groups,
statistics: stats,
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 020bdfd5d8..7db79485d1 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -129,16 +129,13 @@ mod tests {
ListingOptions, ListingTable, ListingTableConfig, SchemaSource,
};
use datafusion_common::{
- assert_contains, plan_err,
+ assert_contains,
stats::Precision,
test_util::{batches_to_string, datafusion_test_data},
- ColumnStatistics, DataFusionError, Result, ScalarValue,
+ DataFusionError, Result, ScalarValue,
};
use datafusion_datasource::file_compression_type::FileCompressionType;
use datafusion_datasource::file_format::FileFormat;
- use datafusion_datasource::schema_adapter::{
- SchemaAdapter, SchemaAdapterFactory, SchemaMapper,
- };
use datafusion_datasource::ListingTableUrl;
use datafusion_expr::dml::InsertOp;
use datafusion_expr::{BinaryExpr, LogicalPlanBuilder, Operator};
@@ -147,15 +144,12 @@ mod tests {
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use datafusion_physical_plan::empty::EmptyExec;
use datafusion_physical_plan::{collect, ExecutionPlanProperties};
- use rstest::rstest;
use std::collections::HashMap;
use std::io::Write;
use std::sync::Arc;
use tempfile::TempDir;
use url::Url;
- const DUMMY_NULL_COUNT: Precision<usize> = Precision::Exact(42);
-
/// Creates a test schema with standard field types used in tests
fn create_test_schema() -> SchemaRef {
Arc::new(Schema::new(vec![
@@ -1448,31 +1442,6 @@ mod tests {
Ok(())
}
- #[tokio::test]
- async fn test_statistics_mapping_with_custom_factory() -> Result<()> {
- let ctx = SessionContext::new();
- let table = create_test_listing_table_with_json_and_adapter(
- &ctx,
- false,
- // NullStatsAdapterFactory sets column_statistics null_count to DUMMY_NULL_COUNT
- Arc::new(NullStatsAdapterFactory {}),
- )?;
-
- let result = table.list_files_for_scan(&ctx.state(), &[], None).await?;
-
- assert_eq!(
- result.statistics.column_statistics[0].null_count,
- DUMMY_NULL_COUNT
- );
- for g in result.file_groups {
- if let Some(s) = g.file_statistics(None) {
- assert_eq!(s.column_statistics[0].null_count, DUMMY_NULL_COUNT);
- }
- }
-
- Ok(())
- }
-
#[tokio::test]
async fn test_statistics_mapping_with_default_factory() -> Result<()> {
let ctx = SessionContext::new();
@@ -1513,199 +1482,4 @@ mod tests {
Ok(())
}
-
- #[rstest]
- #[case(MapSchemaError::TypeIncompatible, "Cannot map incompatible types")]
- #[case(MapSchemaError::GeneralFailure, "Schema adapter mapping failed")]
- #[case(
- MapSchemaError::InvalidProjection,
- "Invalid projection in schema mapping"
- )]
- #[tokio::test]
- async fn test_schema_adapter_map_schema_errors(
- #[case] error_type: MapSchemaError,
- #[case] expected_error_msg: &str,
- ) -> Result<()> {
- let ctx = SessionContext::new();
- let table = create_test_listing_table_with_json_and_adapter(
- &ctx,
- false,
- Arc::new(FailingMapSchemaAdapterFactory { error_type }),
- )?;
-
- // The error should bubble up from the scan operation when schema mapping fails
- let scan_result = table.scan(&ctx.state(), None, &[], None).await;
-
- assert!(scan_result.is_err());
- let error_msg = scan_result.unwrap_err().to_string();
- assert!(
- error_msg.contains(expected_error_msg),
- "Expected error containing '{expected_error_msg}', got:
{error_msg}"
- );
-
- Ok(())
- }
-
- // Test that errors during file listing also bubble up correctly
- #[tokio::test]
- async fn test_schema_adapter_error_during_file_listing() -> Result<()> {
- let ctx = SessionContext::new();
- let table = create_test_listing_table_with_json_and_adapter(
- &ctx,
- true,
- Arc::new(FailingMapSchemaAdapterFactory {
- error_type: MapSchemaError::TypeIncompatible,
- }),
- )?;
-
- // The error should bubble up from list_files_for_scan when collecting statistics
- let list_result = table.list_files_for_scan(&ctx.state(), &[], None).await;
-
- assert!(list_result.is_err());
- let error_msg = list_result.unwrap_err().to_string();
- assert!(
- error_msg.contains("Cannot map incompatible types"),
- "Expected type incompatibility error during file listing, got:
{error_msg}"
- );
-
- Ok(())
- }
-
- #[derive(Debug, Copy, Clone)]
- enum MapSchemaError {
- TypeIncompatible,
- GeneralFailure,
- InvalidProjection,
- }
-
- #[derive(Debug)]
- struct FailingMapSchemaAdapterFactory {
- error_type: MapSchemaError,
- }
-
- impl SchemaAdapterFactory for FailingMapSchemaAdapterFactory {
- fn create(
- &self,
- projected_table_schema: SchemaRef,
- _table_schema: SchemaRef,
- ) -> Box<dyn SchemaAdapter> {
- Box::new(FailingMapSchemaAdapter {
- schema: projected_table_schema,
- error_type: self.error_type,
- })
- }
- }
-
- #[derive(Debug)]
- struct FailingMapSchemaAdapter {
- schema: SchemaRef,
- error_type: MapSchemaError,
- }
-
- impl SchemaAdapter for FailingMapSchemaAdapter {
- fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
- let field = self.schema.field(index);
- file_schema.fields.find(field.name()).map(|(i, _)| i)
- }
-
- fn map_schema(
- &self,
- _file_schema: &Schema,
- ) -> Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
- // Always fail with different error types based on the configured error_type
- match self.error_type {
- MapSchemaError::TypeIncompatible => {
- plan_err!(
- "Cannot map incompatible types: Boolean cannot be cast
to Utf8"
- )
- }
- MapSchemaError::GeneralFailure => {
- plan_err!("Schema adapter mapping failed due to internal
error")
- }
- MapSchemaError::InvalidProjection => {
- plan_err!("Invalid projection in schema mapping: column
index out of bounds")
- }
- }
- }
- }
-
- #[derive(Debug)]
- struct NullStatsAdapterFactory;
-
- impl SchemaAdapterFactory for NullStatsAdapterFactory {
- fn create(
- &self,
- projected_table_schema: SchemaRef,
- _table_schema: SchemaRef,
- ) -> Box<dyn SchemaAdapter> {
- Box::new(NullStatsAdapter {
- schema: projected_table_schema,
- })
- }
- }
-
- #[derive(Debug)]
- struct NullStatsAdapter {
- schema: SchemaRef,
- }
-
- impl SchemaAdapter for NullStatsAdapter {
- fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
- let field = self.schema.field(index);
- file_schema.fields.find(field.name()).map(|(i, _)| i)
- }
-
- fn map_schema(
- &self,
- file_schema: &Schema,
- ) -> Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
- let projection = (0..file_schema.fields().len()).collect();
- Ok((Arc::new(NullStatsMapper {}), projection))
- }
- }
-
- #[derive(Debug)]
- struct NullStatsMapper;
-
- impl SchemaMapper for NullStatsMapper {
- fn map_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
- Ok(batch)
- }
-
- fn map_column_statistics(
- &self,
- stats: &[ColumnStatistics],
- ) -> Result<Vec<ColumnStatistics>> {
- Ok(stats
- .iter()
- .map(|s| {
- let mut s = s.clone();
- s.null_count = DUMMY_NULL_COUNT;
- s
- })
- .collect())
- }
- }
-
- /// Helper function to create a test ListingTable with JSON format and custom schema adapter factory
- fn create_test_listing_table_with_json_and_adapter(
- ctx: &SessionContext,
- collect_stat: bool,
- schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
- ) -> Result<ListingTable> {
- let path = "table/file.json";
- register_test_store(ctx, &[(path, 10)]);
-
- let format = JsonFormat::default();
- let opt = ListingOptions::new(Arc::new(format)).with_collect_stat(collect_stat);
- let schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]);
- let table_path = ListingTableUrl::parse("test:///table/")?;
-
- let config = ListingTableConfig::new(table_path)
- .with_listing_options(opt)
- .with_schema(Arc::new(schema))
- .with_schema_adapter_factory(schema_adapter_factory);
-
- ListingTable::try_new(config)
- }
}
diff --git a/datafusion/core/tests/physical_optimizer/partition_statistics.rs b/datafusion/core/tests/physical_optimizer/partition_statistics.rs
index cbfcb71883..1846473e10 100644
--- a/datafusion/core/tests/physical_optimizer/partition_statistics.rs
+++ b/datafusion/core/tests/physical_optimizer/partition_statistics.rs
@@ -112,13 +112,26 @@ mod test {
.unwrap()
}
+ // Date32 values for test data (days since 1970-01-01):
+ // 2025-03-01 = 20148
+ // 2025-03-02 = 20149
+ // 2025-03-03 = 20150
+ // 2025-03-04 = 20151
+ const DATE_2025_03_01: i32 = 20148;
+ const DATE_2025_03_02: i32 = 20149;
+ const DATE_2025_03_03: i32 = 20150;
+ const DATE_2025_03_04: i32 = 20151;
+
/// Helper function to create expected statistics for a partition with Int32 column
+ ///
+ /// If `date_range` is provided, includes exact statistics for the partition date column.
+ /// Partition column statistics are exact because all rows in a partition share the same value.
fn create_partition_statistics(
num_rows: usize,
total_byte_size: usize,
min_value: i32,
max_value: i32,
- include_date_column: bool,
+ date_range: Option<(i32, i32)>,
) -> Statistics {
// Int32 is 4 bytes per row
let int32_byte_size = num_rows * 4;
@@ -131,16 +144,19 @@ mod test {
byte_size: Precision::Exact(int32_byte_size),
}];
- if include_date_column {
- // The date column is a partition column (from the directory path),
- // not stored in the parquet file, so byte_size is Absent
+ if let Some((min_date, max_date)) = date_range {
+ // Partition column stats are computed from partition values:
+ // - null_count = 0 (partition values from paths are never null)
+ // - min/max are the merged partition values across files in the group
+ // - byte_size = num_rows * 4 (Date32 is 4 bytes per row)
+ let date32_byte_size = num_rows * 4;
column_stats.push(ColumnStatistics {
- null_count: Precision::Absent,
- max_value: Precision::Absent,
- min_value: Precision::Absent,
+ null_count: Precision::Exact(0),
+ max_value: Precision::Exact(ScalarValue::Date32(Some(max_date))),
+ min_value: Precision::Exact(ScalarValue::Date32(Some(min_date))),
sum_value: Precision::Absent,
distinct_count: Precision::Absent,
- byte_size: Precision::Absent,
+ byte_size: Precision::Exact(date32_byte_size),
});
}
@@ -220,10 +236,22 @@ mod test {
let statistics = (0..scan.output_partitioning().partition_count())
.map(|idx| scan.partition_statistics(Some(idx)))
.collect::<Result<Vec<_>>>()?;
- let expected_statistic_partition_1 =
- create_partition_statistics(2, 16, 3, 4, true);
- let expected_statistic_partition_2 =
- create_partition_statistics(2, 16, 1, 2, true);
+ // Partition 1: ids [3,4], dates [2025-03-01, 2025-03-02]
+ let expected_statistic_partition_1 = create_partition_statistics(
+ 2,
+ 16,
+ 3,
+ 4,
+ Some((DATE_2025_03_01, DATE_2025_03_02)),
+ );
+ // Partition 2: ids [1,2], dates [2025-03-03, 2025-03-04]
+ let expected_statistic_partition_2 = create_partition_statistics(
+ 2,
+ 16,
+ 1,
+ 2,
+ Some((DATE_2025_03_03, DATE_2025_03_04)),
+ );
// Check the statistics of each partition
assert_eq!(statistics.len(), 2);
assert_eq!(statistics[0], expected_statistic_partition_1);
@@ -252,10 +280,11 @@ mod test {
let statistics = (0..projection.output_partitioning().partition_count())
.map(|idx| projection.partition_statistics(Some(idx)))
.collect::<Result<Vec<_>>>()?;
+ // Projection only includes id column, not the date partition column
let expected_statistic_partition_1 =
- create_partition_statistics(2, 8, 3, 4, false);
+ create_partition_statistics(2, 8, 3, 4, None);
let expected_statistic_partition_2 =
- create_partition_statistics(2, 8, 1, 2, false);
+ create_partition_statistics(2, 8, 1, 2, None);
// Check the statistics of each partition
assert_eq!(statistics.len(), 2);
assert_eq!(statistics[0], expected_statistic_partition_1);
@@ -283,7 +312,14 @@ mod test {
let statistics = (0..sort_exec.output_partitioning().partition_count())
.map(|idx| sort_exec.partition_statistics(Some(idx)))
.collect::<Result<Vec<_>>>()?;
- let expected_statistic_partition = create_partition_statistics(4, 32, 1, 4, true);
+ // All 4 files merged: ids [1-4], dates [2025-03-01, 2025-03-04]
+ let expected_statistic_partition = create_partition_statistics(
+ 4,
+ 32,
+ 1,
+ 4,
+ Some((DATE_2025_03_01, DATE_2025_03_04)),
+ );
assert_eq!(statistics.len(), 1);
assert_eq!(statistics[0], expected_statistic_partition);
// Check the statistics_by_partition with real results
@@ -296,10 +332,22 @@ mod test {
let sort_exec: Arc<dyn ExecutionPlan> = Arc::new(
SortExec::new(ordering.into(), scan_2).with_preserve_partitioning(true),
);
- let expected_statistic_partition_1 =
- create_partition_statistics(2, 16, 3, 4, true);
- let expected_statistic_partition_2 =
- create_partition_statistics(2, 16, 1, 2, true);
+ // Partition 1: ids [3,4], dates [2025-03-01, 2025-03-02]
+ let expected_statistic_partition_1 = create_partition_statistics(
+ 2,
+ 16,
+ 3,
+ 4,
+ Some((DATE_2025_03_01, DATE_2025_03_02)),
+ );
+ // Partition 2: ids [1,2], dates [2025-03-03, 2025-03-04]
+ let expected_statistic_partition_2 = create_partition_statistics(
+ 2,
+ 16,
+ 1,
+ 2,
+ Some((DATE_2025_03_03, DATE_2025_03_04)),
+ );
let statistics = (0..sort_exec.output_partitioning().partition_count())
.map(|idx| sort_exec.partition_statistics(Some(idx)))
.collect::<Result<Vec<_>>>()?;
@@ -349,7 +397,7 @@ mod test {
min_value: Precision::Exact(ScalarValue::Null),
sum_value: Precision::Exact(ScalarValue::Null),
distinct_count: Precision::Exact(0),
- byte_size: Precision::Absent,
+ byte_size: Precision::Exact(16), // 4 rows * 4 bytes (Date32)
},
],
};
@@ -378,7 +426,7 @@ mod test {
min_value: Precision::Exact(ScalarValue::Null),
sum_value: Precision::Exact(ScalarValue::Null),
distinct_count: Precision::Exact(0),
- byte_size: Precision::Absent,
+ byte_size: Precision::Exact(8), // 2 rows * 4 bytes (Date32)
},
],
};
@@ -397,10 +445,22 @@ mod test {
.collect::<Result<Vec<_>>>()?;
// Check that we have 4 partitions (2 from each scan)
assert_eq!(statistics.len(), 4);
- let expected_statistic_partition_1 =
- create_partition_statistics(2, 16, 3, 4, true);
- let expected_statistic_partition_2 =
- create_partition_statistics(2, 16, 1, 2, true);
+ // Partition 1: ids [3,4], dates [2025-03-01, 2025-03-02]
+ let expected_statistic_partition_1 = create_partition_statistics(
+ 2,
+ 16,
+ 3,
+ 4,
+ Some((DATE_2025_03_01, DATE_2025_03_02)),
+ );
+ // Partition 2: ids [1,2], dates [2025-03-03, 2025-03-04]
+ let expected_statistic_partition_2 = create_partition_statistics(
+ 2,
+ 16,
+ 1,
+ 2,
+ Some((DATE_2025_03_03, DATE_2025_03_04)),
+ );
// Verify first partition (from first scan)
assert_eq!(statistics[0], expected_statistic_partition_1);
// Verify second partition (from first scan)
@@ -494,11 +554,13 @@ mod test {
.collect::<Result<Vec<_>>>()?;
// Check that we have 2 partitions
assert_eq!(statistics.len(), 2);
+ // Cross join output schema: [left.id, left.date, right.id]
// Cross join doesn't propagate Column's byte_size
let expected_statistic_partition_1 = Statistics {
num_rows: Precision::Exact(8),
total_byte_size: Precision::Exact(512),
column_statistics: vec![
+ // column 0: left.id (Int32, file column from t1)
ColumnStatistics {
null_count: Precision::Exact(0),
max_value: Precision::Exact(ScalarValue::Int32(Some(4))),
@@ -507,14 +569,17 @@ mod test {
distinct_count: Precision::Absent,
byte_size: Precision::Absent,
},
+ // column 1: left.date (Date32, partition column from t1)
+ // Partition column statistics are exact because all rows in a partition share the same value.
ColumnStatistics {
- null_count: Precision::Absent,
- max_value: Precision::Absent,
- min_value: Precision::Absent,
+ null_count: Precision::Exact(0),
+ max_value: Precision::Exact(ScalarValue::Date32(Some(20151))),
+ min_value: Precision::Exact(ScalarValue::Date32(Some(20148))),
sum_value: Precision::Absent,
distinct_count: Precision::Absent,
byte_size: Precision::Absent,
},
+ // column 2: right.id (Int32, file column from t2) - right partition 0: ids [3,4]
ColumnStatistics {
null_count: Precision::Exact(0),
max_value: Precision::Exact(ScalarValue::Int32(Some(4))),
@@ -529,6 +594,7 @@ mod test {
num_rows: Precision::Exact(8),
total_byte_size: Precision::Exact(512),
column_statistics: vec![
+ // column 0: left.id (Int32, file column from t1)
ColumnStatistics {
null_count: Precision::Exact(0),
max_value: Precision::Exact(ScalarValue::Int32(Some(4))),
@@ -537,14 +603,17 @@ mod test {
distinct_count: Precision::Absent,
byte_size: Precision::Absent,
},
+ // column 1: left.date (Date32, partition column from t1)
+ // Partition column statistics are exact because all rows in a partition share the same value.
ColumnStatistics {
- null_count: Precision::Absent,
- max_value: Precision::Absent,
- min_value: Precision::Absent,
+ null_count: Precision::Exact(0),
+ max_value: Precision::Exact(ScalarValue::Date32(Some(20151))),
+ min_value: Precision::Exact(ScalarValue::Date32(Some(20148))),
sum_value: Precision::Absent,
distinct_count: Precision::Absent,
byte_size: Precision::Absent,
},
+ // column 2: right.id (Int32, file column from t2) - right partition 1: ids [1,2]
ColumnStatistics {
null_count: Precision::Exact(0),
max_value: Precision::Exact(ScalarValue::Int32(Some(2))),
@@ -572,10 +641,22 @@ mod test {
let scan = create_scan_exec_with_statistics(None, Some(2)).await;
let coalesce_batches: Arc<dyn ExecutionPlan> = Arc::new(CoalesceBatchesExec::new(scan, 2));
- let expected_statistic_partition_1 =
- create_partition_statistics(2, 16, 3, 4, true);
- let expected_statistic_partition_2 =
- create_partition_statistics(2, 16, 1, 2, true);
+ // Partition 1: ids [3,4], dates [2025-03-01, 2025-03-02]
+ let expected_statistic_partition_1 = create_partition_statistics(
+ 2,
+ 16,
+ 3,
+ 4,
+ Some((DATE_2025_03_01, DATE_2025_03_02)),
+ );
+ // Partition 2: ids [1,2], dates [2025-03-03, 2025-03-04]
+ let expected_statistic_partition_2 = create_partition_statistics(
+ 2,
+ 16,
+ 1,
+ 2,
+ Some((DATE_2025_03_03, DATE_2025_03_04)),
+ );
let statistics = (0..coalesce_batches.output_partitioning().partition_count())
.map(|idx| coalesce_batches.partition_statistics(Some(idx)))
.collect::<Result<Vec<_>>>()?;
@@ -597,7 +678,14 @@ mod test {
let scan = create_scan_exec_with_statistics(None, Some(2)).await;
let coalesce_partitions: Arc<dyn ExecutionPlan> =
Arc::new(CoalescePartitionsExec::new(scan));
- let expected_statistic_partition = create_partition_statistics(4, 32, 1, 4, true);
+ // All files merged: ids [1-4], dates [2025-03-01, 2025-03-04]
+ let expected_statistic_partition = create_partition_statistics(
+ 4,
+ 32,
+ 1,
+ 4,
+ Some((DATE_2025_03_01, DATE_2025_03_04)),
+ );
let statistics = (0..coalesce_partitions.output_partitioning().partition_count())
.map(|idx| coalesce_partitions.partition_statistics(Some(idx)))
.collect::<Result<Vec<_>>>()?;
@@ -646,7 +734,14 @@ mod test {
.map(|idx| global_limit.partition_statistics(Some(idx)))
.collect::<Result<Vec<_>>>()?;
assert_eq!(statistics.len(), 1);
- let expected_statistic_partition = create_partition_statistics(2, 16, 3, 4, true);
+ // GlobalLimit takes from first partition: ids [3,4], dates [2025-03-01, 2025-03-02]
+ let expected_statistic_partition = create_partition_statistics(
+ 2,
+ 16,
+ 3,
+ 4,
+ Some((DATE_2025_03_01, DATE_2025_03_02)),
+ );
assert_eq!(statistics[0], expected_statistic_partition);
Ok(())
}
diff --git a/datafusion/core/tests/sql/path_partition.rs b/datafusion/core/tests/sql/path_partition.rs
index 05cc723ef0..3ee4e37589 100644
--- a/datafusion/core/tests/sql/path_partition.rs
+++ b/datafusion/core/tests/sql/path_partition.rs
@@ -31,7 +31,6 @@ use datafusion::{
listing::{ListingOptions, ListingTable, ListingTableConfig},
},
error::Result,
- physical_plan::ColumnStatistics,
prelude::SessionContext,
test_util::{self, arrow_test_data, parquet_test_data},
};
@@ -464,10 +463,19 @@ async fn parquet_statistics() -> Result<()> {
assert_eq!(stat_cols.len(), 4);
// stats for the first col are read from the parquet file
assert_eq!(stat_cols[0].null_count, Precision::Exact(3));
- // TODO assert partition column (1,2,3) stats once implemented (#1186)
- assert_eq!(stat_cols[1], ColumnStatistics::new_unknown(),);
- assert_eq!(stat_cols[2], ColumnStatistics::new_unknown(),);
- assert_eq!(stat_cols[3], ColumnStatistics::new_unknown(),);
+ // Partition column statistics (year=2021 for all 3 rows)
+ assert_eq!(stat_cols[1].null_count, Precision::Exact(0));
+ assert_eq!(
+ stat_cols[1].min_value,
+ Precision::Exact(ScalarValue::Int32(Some(2021)))
+ );
+ assert_eq!(
+ stat_cols[1].max_value,
+ Precision::Exact(ScalarValue::Int32(Some(2021)))
+ );
+ // month and day are Utf8 partition columns with statistics
+ assert_eq!(stat_cols[2].null_count, Precision::Exact(0));
+ assert_eq!(stat_cols[3].null_count, Precision::Exact(0));
//// WITH PROJECTION ////
let dataframe = ctx.sql("SELECT mycol, day FROM t WHERE day='28'").await?;
@@ -479,8 +487,16 @@ async fn parquet_statistics() -> Result<()> {
assert_eq!(stat_cols.len(), 2);
// stats for the first col are read from the parquet file
assert_eq!(stat_cols[0].null_count, Precision::Exact(1));
- // TODO assert partition column stats once implemented (#1186)
- assert_eq!(stat_cols[1], ColumnStatistics::new_unknown());
+ // Partition column statistics for day='28' (1 row)
+ assert_eq!(stat_cols[1].null_count, Precision::Exact(0));
+ assert_eq!(
+ stat_cols[1].min_value,
+ Precision::Exact(ScalarValue::Utf8(Some("28".to_string())))
+ );
+ assert_eq!(
+ stat_cols[1].max_value,
+ Precision::Exact(ScalarValue::Utf8(Some("28".to_string())))
+ );
Ok(())
}
diff --git a/datafusion/datasource/src/file_groups.rs b/datafusion/datasource/src/file_groups.rs
index d5fde15ff9..28a403ab92 100644
--- a/datafusion/datasource/src/file_groups.rs
+++ b/datafusion/datasource/src/file_groups.rs
@@ -364,11 +364,27 @@ impl FileGroupPartitioner {
/// Represents a group of partitioned files that'll be processed by a single thread.
/// Maintains optional statistics across all files in the group.
+///
+/// # Statistics
+///
+/// The group-level [`FileGroup::file_statistics`] field contains merged statistics from all files
+/// in the group for the **full table schema** (file columns + partition columns).
+///
+/// Partition column statistics are derived from the individual file partition values:
+/// - `min` = minimum partition value across all files in the group
+/// - `max` = maximum partition value across all files in the group
+/// - `null_count` = 0 (partition values are never null)
+///
+/// This allows query optimizers to prune entire file groups based on partition bounds.
#[derive(Debug, Clone)]
pub struct FileGroup {
/// The files in this group
files: Vec<PartitionedFile>,
- /// Optional statistics for the data across all files in the group
+ /// Optional statistics for the data across all files in the group.
+ ///
+ /// These statistics cover the full table schema: file columns plus partition columns.
+ /// Partition column statistics are merged from individual [`PartitionedFile::statistics`],
+ /// which compute exact values from [`PartitionedFile::partition_values`].
statistics: Option<Arc<Statistics>>,
}
diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs
index c2415b3d59..ad89406014 100644
--- a/datafusion/datasource/src/file_scan_config.rs
+++ b/datafusion/datasource/src/file_scan_config.rs
@@ -730,6 +730,7 @@ impl DataSource for FileScanConfig {
fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
if let Some(partition) = partition {
// Get statistics for a specific partition
+ // Note: FileGroup statistics include partition columns (computed from partition_values)
if let Some(file_group) = self.file_groups.get(partition)
&& let Some(stat) = file_group.file_statistics(None)
{
diff --git a/datafusion/datasource/src/mod.rs b/datafusion/datasource/src/mod.rs
index 1e71825b99..347e783c27 100644
--- a/datafusion/datasource/src/mod.rs
+++ b/datafusion/datasource/src/mod.rs
@@ -95,6 +95,19 @@ impl FileRange {
#[derive(Debug, Clone)]
/// A single file or part of a file that should be read, along with its schema, statistics
/// and partition column values that need to be appended to each row.
+///
+/// # Statistics
+///
+/// The [`Self::statistics`] field contains statistics for the **full table schema**,
+/// which includes both file columns and partition columns. When statistics are set via
+/// [`Self::with_statistics`], exact statistics for partition columns are automatically
+/// computed from [`Self::partition_values`]:
+///
+/// - `min = max = partition_value` (all rows in a file share the same partition value)
+/// - `null_count = 0` (partition values extracted from paths are never null)
+/// - `distinct_count = 1` (single distinct value per file for each partition column)
+///
+/// This enables query optimizers to use partition column bounds for pruning and planning.
pub struct PartitionedFile {
/// Path for the file (e.g. URL, filesystem path, etc)
pub object_meta: ObjectMeta,
@@ -115,6 +128,10 @@ pub struct PartitionedFile {
///
/// DataFusion relies on these statistics for planning (in particular to sort file groups),
/// so if they are incorrect, incorrect answers may result.
+ ///
+ /// These statistics cover the full table schema: file columns plus partition columns.
+ /// When set via [`Self::with_statistics`], partition column statistics are automatically
+ /// computed from [`Self::partition_values`] with exact min/max/null_count/distinct_count.
pub statistics: Option<Arc<Statistics>>,
/// An optional field for user defined per object metadata
pub extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
@@ -214,9 +231,38 @@ impl PartitionedFile {
self
}
- // Update the statistics for this file.
- pub fn with_statistics(mut self, statistics: Arc<Statistics>) -> Self {
- self.statistics = Some(statistics);
+ /// Update the statistics for this file.
+ ///
+ /// The provided `file_statistics` should cover only the file schema columns.
+ /// This method will automatically append exact statistics for partition columns
+ /// based on `partition_values`:
+ /// - `min = max = partition_value` (all rows have the same value)
+ /// - `null_count = 0` (partition values from paths are never null)
+ /// - `distinct_count = 1` (all rows have the same partition value)
+ pub fn with_statistics(mut self, file_statistics: Arc<Statistics>) -> Self {
+ if self.partition_values.is_empty() {
+ // No partition columns, use stats as-is
+ self.statistics = Some(file_statistics);
+ } else {
+ // Extend stats with exact partition column statistics
+ let mut stats = Arc::unwrap_or_clone(file_statistics);
+ for partition_value in &self.partition_values {
+ let col_stats = ColumnStatistics {
+ null_count: Precision::Exact(0),
+ max_value: Precision::Exact(partition_value.clone()),
+ min_value: Precision::Exact(partition_value.clone()),
+ distinct_count: Precision::Exact(1),
+ sum_value: Precision::Absent,
+ byte_size: partition_value
+ .data_type()
+ .primitive_width()
+ .map(|w| stats.num_rows.multiply(&Precision::Exact(w)))
+ .unwrap_or_else(|| Precision::Absent),
+ };
+ stats.column_statistics.push(col_stats);
+ }
+ self.statistics = Some(Arc::new(stats));
+ }
self
}
@@ -561,6 +607,70 @@ mod tests {
sut.get_store(url.as_ref()).unwrap();
}
+ #[test]
+ fn test_with_statistics_appends_partition_column_stats() {
+ use crate::PartitionedFile;
+ use datafusion_common::stats::Precision;
+ use datafusion_common::{ColumnStatistics, ScalarValue, Statistics};
+
+ // Create a PartitionedFile with partition values
+ let mut pf = PartitionedFile::new(
+ "test.parquet",
+ 100, // file size
+ );
+ pf.partition_values = vec![
+ ScalarValue::Date32(Some(20148)), // 2025-03-01
+ ];
+
+ // Create file-only statistics (1 column for 'id')
+ let file_stats = Arc::new(Statistics {
+ num_rows: Precision::Exact(2),
+ total_byte_size: Precision::Exact(16),
+ column_statistics: vec![ColumnStatistics {
+ null_count: Precision::Exact(0),
+ max_value: Precision::Exact(ScalarValue::Int32(Some(4))),
+ min_value: Precision::Exact(ScalarValue::Int32(Some(3))),
+ sum_value: Precision::Absent,
+ distinct_count: Precision::Absent,
+ byte_size: Precision::Absent,
+ }],
+ });
+
+ // Call with_statistics - should append partition column stats
+ let pf = pf.with_statistics(file_stats);
+
+ // Verify the statistics now have 2 columns
+ let stats = pf.statistics.unwrap();
+ assert_eq!(
+ stats.column_statistics.len(),
+ 2,
+ "Expected 2 columns (id + date partition)"
+ );
+
+ // Verify partition column statistics
+ let partition_col_stats = &stats.column_statistics[1];
+ assert_eq!(
+ partition_col_stats.null_count,
+ Precision::Exact(0),
+ "Partition column null_count should be Exact(0)"
+ );
+ assert_eq!(
+ partition_col_stats.min_value,
+ Precision::Exact(ScalarValue::Date32(Some(20148))),
+ "Partition column min should match partition value"
+ );
+ assert_eq!(
+ partition_col_stats.max_value,
+ Precision::Exact(ScalarValue::Date32(Some(20148))),
+ "Partition column max should match partition value"
+ );
+ assert_eq!(
+ partition_col_stats.distinct_count,
+ Precision::Exact(1),
+ "Partition column distinct_count should be Exact(1)"
+ );
+ }
+
#[test]
fn test_url_contains() {
let url = ListingTableUrl::parse("file:///var/data/mytable/").unwrap();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]