This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 5724fc556 feat(datafusion): Add `sort_by_partition` to sort the input
partitioned data (#1618)
5724fc556 is described below
commit 5724fc556ed8699dfdba5fb657ea5dd9a733cbf1
Author: Shawn Chang <[email protected]>
AuthorDate: Mon Dec 1 03:42:10 2025 -0800
feat(datafusion): Add `sort_by_partition` to sort the input partitioned
data (#1618)
---
.../datafusion/src/physical_plan/mod.rs | 1 +
.../datafusion/src/physical_plan/sort.rs | 244 +++++++++++++++++++++
2 files changed, 245 insertions(+)
diff --git a/crates/integrations/datafusion/src/physical_plan/mod.rs
b/crates/integrations/datafusion/src/physical_plan/mod.rs
index eb58082fe..5a9845cde 100644
--- a/crates/integrations/datafusion/src/physical_plan/mod.rs
+++ b/crates/integrations/datafusion/src/physical_plan/mod.rs
@@ -21,6 +21,7 @@ pub(crate) mod metadata_scan;
pub(crate) mod project;
pub(crate) mod repartition;
pub(crate) mod scan;
+pub(crate) mod sort;
pub(crate) mod write;
pub(crate) const DATA_FILES_COL_NAME: &str = "data_files";
diff --git a/crates/integrations/datafusion/src/physical_plan/sort.rs
b/crates/integrations/datafusion/src/physical_plan/sort.rs
new file mode 100644
index 000000000..2a57e16e4
--- /dev/null
+++ b/crates/integrations/datafusion/src/physical_plan/sort.rs
@@ -0,0 +1,244 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Partition-based sorting for Iceberg tables.
+
+use std::sync::Arc;
+
+use datafusion::arrow::compute::SortOptions;
+use datafusion::common::Result as DFResult;
+use datafusion::error::DataFusionError;
+use datafusion::physical_expr::{LexOrdering, PhysicalSortExpr};
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion::physical_plan::expressions::Column;
+use datafusion::physical_plan::sorts::sort::SortExec;
+use iceberg::arrow::PROJECTED_PARTITION_VALUE_COLUMN;
+
+/// Sorts an ExecutionPlan by partition values for Iceberg tables.
+///
+/// This function takes an input ExecutionPlan that has been extended with
partition values
+/// (via `project_with_partition`) and returns a SortExec that sorts by the
partition column.
+/// The partition values are expected to be in a struct column named
`PROJECTED_PARTITION_VALUE_COLUMN`.
+///
+/// The sort uses `SortOptions::default()` (ascending, nulls first) and keeps
+/// the input partitioning, so rows are ordered within each input partition only.
+///
+/// For unpartitioned tables or plans without the partition column, returns an
error.
+///
+/// # Arguments
+/// * `input` - The input ExecutionPlan with projected partition values
+///
+/// # Returns
+/// * `Ok(Arc<dyn ExecutionPlan>)` - A SortExec that sorts by partition values
+/// * `Err` - If the partition column is not found
+///
+/// TODO remove dead_code mark when integrating with insert_into
+#[allow(dead_code)]
+pub(crate) fn sort_by_partition(input: Arc<dyn ExecutionPlan>) ->
DFResult<Arc<dyn ExecutionPlan>> {
+ let schema = input.schema();
+
+ // Find the partition column in the schema
+ let (partition_column_index, _partition_field) = schema
+ .column_with_name(PROJECTED_PARTITION_VALUE_COLUMN)
+ .ok_or_else(|| {
+ DataFusionError::Plan(format!(
+ "Partition column '{}' not found in schema. Ensure the plan
has been extended with partition values using project_with_partition.",
+ PROJECTED_PARTITION_VALUE_COLUMN
+ ))
+ })?;
+
+ // Create a single sort expression for the partition column
+ let column_expr = Arc::new(Column::new(
+ PROJECTED_PARTITION_VALUE_COLUMN,
+ partition_column_index,
+ ));
+
+ let sort_expr = PhysicalSortExpr {
+ expr: column_expr,
+ options: SortOptions::default(), // Ascending, nulls first (arrow's default)
+ };
+
+ // Create a SortExec with preserve_partitioning=true to ensure the output
partitioning
+ // is the same as the input partitioning, and the data is sorted within
each partition
+ let lex_ordering = LexOrdering::new(vec![sort_expr]).ok_or_else(|| {
+ DataFusionError::Plan("Failed to create LexOrdering from sort
expression".to_string())
+ })?;
+
+ let sort_exec = SortExec::new(lex_ordering,
input).with_preserve_partitioning(true);
+
+ Ok(Arc::new(sort_exec))
+}
+
+#[cfg(test)]
+mod tests {
+ use datafusion::arrow::array::{Int32Array, RecordBatch, StringArray,
StructArray};
+ use datafusion::arrow::datatypes::{DataType, Field, Fields, Schema as
ArrowSchema};
+ use datafusion::datasource::{MemTable, TableProvider};
+ use datafusion::prelude::SessionContext;
+
+ use super::*;
+
+ #[tokio::test]
+ async fn test_sort_by_partition_basic() {
+ // Create a schema whose last field is the struct-typed partition column
+ let partition_fields =
+ Fields::from(vec![Field::new("id_partition", DataType::Int32,
false)]);
+
+ let schema = Arc::new(ArrowSchema::new(vec![
+ Field::new("id", DataType::Int32, false),
+ Field::new("name", DataType::Utf8, false),
+ Field::new(
+ PROJECTED_PARTITION_VALUE_COLUMN,
+ DataType::Struct(partition_fields.clone()),
+ false,
+ ),
+ ]));
+
+ // Create one unsorted batch; the partition value mirrors `id` (3, 1, 2)
+ let id_array = Arc::new(Int32Array::from(vec![3, 1, 2]));
+ let name_array = Arc::new(StringArray::from(vec!["c", "a", "b"]));
+ let partition_array = Arc::new(StructArray::from(vec![(
+ Arc::new(Field::new("id_partition", DataType::Int32, false)),
+ Arc::new(Int32Array::from(vec![3, 1, 2])) as _,
+ )]));
+
+ let batch =
+ RecordBatch::try_new(schema.clone(), vec![id_array, name_array,
partition_array])
+ .unwrap();
+
+ let ctx = SessionContext::new();
+ let mem_table = MemTable::try_new(schema.clone(),
vec![vec![batch]]).unwrap();
+ let input = mem_table.scan(&ctx.state(), None, &[],
None).await.unwrap();
+
+ // Apply sort
+ let sorted_plan = sort_by_partition(input).unwrap();
+
+ // Execute and verify
+ let result = datafusion::physical_plan::collect(sorted_plan,
ctx.task_ctx())
+ .await
+ .unwrap();
+
+ assert_eq!(result.len(), 1);
+ let result_batch = &result[0];
+
+ let id_col = result_batch
+ .column(0)
+ .as_any()
+ .downcast_ref::<Int32Array>()
+ .unwrap();
+
+ // Rows must come back in ascending partition order, i.e. ids 1, 2, 3
+ assert_eq!(id_col.value(0), 1);
+ assert_eq!(id_col.value(1), 2);
+ assert_eq!(id_col.value(2), 3);
+ }
+
+ #[tokio::test]
+ async fn test_sort_by_partition_missing_column() {
+ // Schema intentionally lacks the projected partition column
+ let schema = Arc::new(ArrowSchema::new(vec![
+ Field::new("id", DataType::Int32, false),
+ Field::new("name", DataType::Utf8, false),
+ ]));
+
+ let batch = RecordBatch::try_new(schema.clone(), vec![
+ Arc::new(Int32Array::from(vec![1, 2, 3])),
+ Arc::new(StringArray::from(vec!["a", "b", "c"])),
+ ])
+ .unwrap();
+
+ let ctx = SessionContext::new();
+ let mem_table = MemTable::try_new(schema.clone(),
vec![vec![batch]]).unwrap();
+ let input = mem_table.scan(&ctx.state(), None, &[],
None).await.unwrap();
+
+ // Building the plan must fail; nothing is executed in this test
+ let result = sort_by_partition(input);
+ assert!(result.is_err());
+ // NOTE(review): the expected text hard-codes '_partition', presumably the
+ // value of PROJECTED_PARTITION_VALUE_COLUMN - confirm against its definition
+ assert!(
+ result
+ .unwrap_err()
+ .to_string()
+ .contains("Partition column '_partition' not found")
+ );
+ }
+
+ #[tokio::test]
+ async fn test_sort_by_partition_multi_field() {
+ // Test with multiple partition fields in the struct
+ let partition_fields = Fields::from(vec![
+ Field::new("year", DataType::Int32, false),
+ Field::new("month", DataType::Int32, false),
+ ]);
+
+ let schema = Arc::new(ArrowSchema::new(vec![
+ Field::new("id", DataType::Int32, false),
+ Field::new("data", DataType::Utf8, false),
+ Field::new(
+ PROJECTED_PARTITION_VALUE_COLUMN,
+ DataType::Struct(partition_fields.clone()),
+ false,
+ ),
+ ]));
+
+ // Create test data with partition values (year, month)
+ let id_array = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
+ let data_array = Arc::new(StringArray::from(vec!["a", "b", "c", "d"]));
+
+ // Partition values: (2024, 2), (2024, 1), (2023, 12), (2024, 1)
+ let year_array = Arc::new(Int32Array::from(vec![2024, 2024, 2023,
2024]));
+ let month_array = Arc::new(Int32Array::from(vec![2, 1, 12, 1]));
+
+ let partition_array = Arc::new(StructArray::from(vec![
+ (
+ Arc::new(Field::new("year", DataType::Int32, false)),
+ year_array as _,
+ ),
+ (
+ Arc::new(Field::new("month", DataType::Int32, false)),
+ month_array as _,
+ ),
+ ]));
+
+ let batch =
+ RecordBatch::try_new(schema.clone(), vec![id_array, data_array,
partition_array])
+ .unwrap();
+
+ let ctx = SessionContext::new();
+ let mem_table = MemTable::try_new(schema.clone(),
vec![vec![batch]]).unwrap();
+ let input = mem_table.scan(&ctx.state(), None, &[],
None).await.unwrap();
+
+ // Apply sort
+ let sorted_plan = sort_by_partition(input).unwrap();
+
+ // Execute and verify
+ let result = datafusion::physical_plan::collect(sorted_plan,
ctx.task_ctx())
+ .await
+ .unwrap();
+
+ assert_eq!(result.len(), 1);
+ let result_batch = &result[0];
+
+ let id_col = result_batch
+ .column(0)
+ .as_any()
+ .downcast_ref::<Int32Array>()
+ .unwrap();
+
+ // Verify data is sorted by partition value (struct comparison)
+ // Expected order: (2023, 12), (2024, 1), (2024, 1), (2024, 2)
+ // Which corresponds to ids: 3, 2, 4, 1
+ // NOTE(review): ids 2 and 4 share partition (2024, 1); expecting 2 before 4
+ // assumes tied rows keep their input order - confirm SortExec guarantees this
+ assert_eq!(id_col.value(0), 3);
+ assert_eq!(id_col.value(1), 2);
+ assert_eq!(id_col.value(2), 4);
+ assert_eq!(id_col.value(3), 1);
+ }
+}