This is an automated email from the ASF dual-hosted git repository.
xudong963 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 6368daf0b6 Implement `partition_statistics` API for `RepartitionExec` (#17061)
6368daf0b6 is described below
commit 6368daf0b60611d5c6388b21def4d623b4c71fdc
Author: Liam Bao <[email protected]>
AuthorDate: Sun Aug 24 21:51:51 2025 -0400
Implement `partition_statistics` API for `RepartitionExec` (#17061)
* Implement `partition_statistics` API for `RepartitionExec`
* Test execution
* Change the partition number for the test
* Make all column stats absent
* Use `ColumnStatistics::new_unknown()`
* Add test case for 0 partitions
* Return unknown statistics for 0 partitions
---
.../physical_optimizer/partition_statistics.rs | 102 +++++++++++++++++++++
datafusion/physical-plan/src/repartition/mod.rs | 43 ++++++++-
2 files changed, 141 insertions(+), 4 deletions(-)
diff --git a/datafusion/core/tests/physical_optimizer/partition_statistics.rs
b/datafusion/core/tests/physical_optimizer/partition_statistics.rs
index bfc09340cc..df1032e065 100644
--- a/datafusion/core/tests/physical_optimizer/partition_statistics.rs
+++ b/datafusion/core/tests/physical_optimizer/partition_statistics.rs
@@ -33,6 +33,7 @@ mod test {
use datafusion_functions_aggregate::count::count_udaf;
use datafusion_physical_expr::aggregate::AggregateExprBuilder;
use datafusion_physical_expr::expressions::{binary, col, lit, Column};
+ use datafusion_physical_expr::Partitioning;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
use datafusion_physical_plan::aggregates::{
@@ -47,6 +48,7 @@ mod test {
use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
use datafusion_physical_plan::projection::ProjectionExec;
+ use datafusion_physical_plan::repartition::RepartitionExec;
use datafusion_physical_plan::sorts::sort::SortExec;
use datafusion_physical_plan::union::UnionExec;
use datafusion_physical_plan::{
@@ -761,4 +763,104 @@ mod test {
Ok(())
}
+
+ #[tokio::test]
+ async fn test_statistic_by_partition_of_repartition() -> Result<()> {
+ let scan = create_scan_exec_with_statistics(None, Some(2)).await;
+
+ let repartition = Arc::new(RepartitionExec::try_new(
+ scan.clone(),
+ Partitioning::RoundRobinBatch(3),
+ )?);
+
+ let statistics = (0..repartition.partitioning().partition_count())
+ .map(|idx| repartition.partition_statistics(Some(idx)))
+ .collect::<Result<Vec<_>>>()?;
+ assert_eq!(statistics.len(), 3);
+
+ let expected_stats = Statistics {
+ num_rows: Precision::Inexact(1),
+ total_byte_size: Precision::Inexact(73),
+ column_statistics: vec![
+ ColumnStatistics::new_unknown(),
+ ColumnStatistics::new_unknown(),
+ ],
+ };
+
+ // All partitions should have the same statistics
+ for stat in statistics.iter() {
+ assert_eq!(stat, &expected_stats);
+ }
+
+ // Verify that the result has exactly 3 partitions
+ let partitions = execute_stream_partitioned(
+ repartition.clone(),
+ Arc::new(TaskContext::default()),
+ )?;
+ assert_eq!(partitions.len(), 3);
+
+ // Collect row counts from each partition
+ let mut partition_row_counts = Vec::new();
+ for partition_stream in partitions.into_iter() {
+            let results: Vec<RecordBatch> = partition_stream.try_collect().await?;
+            let total_rows: usize = results.iter().map(|batch| batch.num_rows()).sum();
+ partition_row_counts.push(total_rows);
+ }
+ assert_eq!(partition_row_counts.len(), 3);
+ assert_eq!(partition_row_counts[0], 2);
+ assert_eq!(partition_row_counts[1], 2);
+ assert_eq!(partition_row_counts[2], 0);
+
+ Ok(())
+ }
+
+ #[tokio::test]
+    async fn test_statistic_by_partition_of_repartition_invalid_partition() -> Result<()>
+    {
+ let scan = create_scan_exec_with_statistics(None, Some(2)).await;
+
+ let repartition = Arc::new(RepartitionExec::try_new(
+ scan.clone(),
+ Partitioning::RoundRobinBatch(2),
+ )?);
+
+ let result = repartition.partition_statistics(Some(2));
+ assert!(result.is_err());
+ let error = result.unwrap_err();
+ assert!(error
+ .to_string()
+            .contains("RepartitionExec invalid partition 2 (expected less than 2)"));
+
+ let partitions = execute_stream_partitioned(
+ repartition.clone(),
+ Arc::new(TaskContext::default()),
+ )?;
+ assert_eq!(partitions.len(), 2);
+
+ Ok(())
+ }
+
+ #[tokio::test]
+    async fn test_statistic_by_partition_of_repartition_zero_partitions() -> Result<()> {
+ let scan = create_scan_exec_with_statistics(None, Some(2)).await;
+ let scan_schema = scan.schema();
+
+ // Create a repartition with 0 partitions
+ let repartition = Arc::new(RepartitionExec::try_new(
+ Arc::new(EmptyExec::new(scan_schema.clone())),
+ Partitioning::RoundRobinBatch(0),
+ )?);
+
+ let result = repartition.partition_statistics(Some(0))?;
+ assert_eq!(result, Statistics::new_unknown(&scan_schema));
+
+ // Verify that the result has exactly 0 partitions
+ let partitions = execute_stream_partitioned(
+ repartition.clone(),
+ Arc::new(TaskContext::default()),
+ )?;
+ assert_eq!(partitions.len(), 0);
+
+ Ok(())
+ }
}
diff --git a/datafusion/physical-plan/src/repartition/mod.rs
b/datafusion/physical-plan/src/repartition/mod.rs
index 754a208126..3cd6ee6c1a 100644
--- a/datafusion/physical-plan/src/repartition/mod.rs
+++ b/datafusion/physical-plan/src/repartition/mod.rs
@@ -45,8 +45,9 @@ use arrow::array::{PrimitiveArray, RecordBatch, RecordBatchOptions};
use arrow::compute::take_arrays;
use arrow::datatypes::{SchemaRef, UInt32Type};
use datafusion_common::config::ConfigOptions;
+use datafusion_common::stats::Precision;
use datafusion_common::utils::transpose;
-use datafusion_common::{internal_err, HashMap};
+use datafusion_common::{internal_err, ColumnStatistics, HashMap};
use datafusion_common::{not_impl_err, DataFusionError, Result};
use datafusion_common_runtime::SpawnedTask;
use datafusion_execution::memory_pool::MemoryConsumer;
@@ -755,10 +756,44 @@ impl ExecutionPlan for RepartitionExec {
}
fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
- if partition.is_none() {
- self.input.partition_statistics(None)
+ if let Some(partition) = partition {
+ let partition_count = self.partitioning().partition_count();
+ if partition_count == 0 {
+ return Ok(Statistics::new_unknown(&self.schema()));
+ }
+
+ if partition >= partition_count {
+ return internal_err!(
+                    "RepartitionExec invalid partition {} (expected less than {})",
+ partition,
+ self.partitioning().partition_count()
+ );
+ }
+
+ let mut stats = self.input.partition_statistics(None)?;
+
+ // Distribute statistics across partitions
+ stats.num_rows = stats
+ .num_rows
+ .get_value()
+ .map(|rows| Precision::Inexact(rows / partition_count))
+ .unwrap_or(Precision::Absent);
+ stats.total_byte_size = stats
+ .total_byte_size
+ .get_value()
+ .map(|bytes| Precision::Inexact(bytes / partition_count))
+ .unwrap_or(Precision::Absent);
+
+ // Make all column stats unknown
+ stats.column_statistics = stats
+ .column_statistics
+ .iter()
+ .map(|_| ColumnStatistics::new_unknown())
+ .collect();
+
+ Ok(stats)
} else {
- Ok(Statistics::new_unknown(&self.schema()))
+ self.input.partition_statistics(None)
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]