This is an automated email from the ASF dual-hosted git repository.

liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 6fa878590 feat(datafusion): Apply SortExec when writing in clustered 
mode (#2005)
6fa878590 is described below

commit 6fa878590a70f4078e6fbefa2751513662d421b4
Author: Shawn Chang <[email protected]>
AuthorDate: Wed Jan 7 17:42:45 2026 -0800

    feat(datafusion): Apply SortExec when writing in clustered mode (#2005)
    
    ## Which issue does this PR close?
    
    - Closes #1540
    
    ## What changes are included in this PR?
    - When writing in clustered mode, use `sort_by_partition` to sort the
    data so the clustered writer can consume it
    
    
    ## Are these changes tested?
    - Added simple unit tests to verify that the `SortExec` is applied correctly
---
 .../datafusion/src/physical_plan/sort.rs           |   3 -
 crates/integrations/datafusion/src/table/mod.rs    | 178 ++++++++++++++++++++-
 2 files changed, 177 insertions(+), 4 deletions(-)

diff --git a/crates/integrations/datafusion/src/physical_plan/sort.rs 
b/crates/integrations/datafusion/src/physical_plan/sort.rs
index ede254753..587ab120c 100644
--- a/crates/integrations/datafusion/src/physical_plan/sort.rs
+++ b/crates/integrations/datafusion/src/physical_plan/sort.rs
@@ -42,9 +42,6 @@ use iceberg::arrow::PROJECTED_PARTITION_VALUE_COLUMN;
 /// # Returns
 /// * `Ok(Arc<dyn ExecutionPlan>)` - A SortExec that sorts by partition values
 /// * `Err` - If the partition column is not found
-///
-/// TODO remove dead_code mark when integrating with insert_into
-#[allow(dead_code)]
 pub(crate) fn sort_by_partition(input: Arc<dyn ExecutionPlan>) -> 
DFResult<Arc<dyn ExecutionPlan>> {
     let schema = input.schema();
 
diff --git a/crates/integrations/datafusion/src/table/mod.rs 
b/crates/integrations/datafusion/src/table/mod.rs
index 86a79611b..ad616542a 100644
--- a/crates/integrations/datafusion/src/table/mod.rs
+++ b/crates/integrations/datafusion/src/table/mod.rs
@@ -44,6 +44,7 @@ use datafusion::physical_plan::ExecutionPlan;
 use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
 use iceberg::arrow::schema_to_arrow_schema;
 use iceberg::inspect::MetadataTableType;
+use iceberg::spec::TableProperties;
 use iceberg::table::Table;
 use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, TableIdent};
 use metadata_table::IcebergMetadataTableProvider;
@@ -53,6 +54,7 @@ use crate::physical_plan::commit::IcebergCommitExec;
 use crate::physical_plan::project::project_with_partition;
 use crate::physical_plan::repartition::repartition;
 use crate::physical_plan::scan::IcebergTableScan;
+use crate::physical_plan::sort::sort_by_partition;
 use crate::physical_plan::write::IcebergWriteExec;
 
 /// Catalog-backed table provider with automatic metadata refresh.
@@ -185,9 +187,38 @@ impl TableProvider for IcebergTableProvider {
         let repartitioned_plan =
             repartition(plan_with_partition, table.metadata_ref(), 
target_partitions)?;
 
+        // Apply sort node when it's not fanout mode
+        let fanout_enabled = table
+            .metadata()
+            .properties()
+            .get(TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED)
+            .map(|value| {
+                value
+                    .parse::<bool>()
+                    .map_err(|e| {
+                        Error::new(
+                            ErrorKind::DataInvalid,
+                            format!(
+                                "Invalid value for {}, expected 'true' or 
'false'",
+                                
TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED
+                            ),
+                        )
+                        .with_source(e)
+                    })
+                    .map_err(to_datafusion_error)
+            })
+            .transpose()?
+            
.unwrap_or(TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED_DEFAULT);
+
+        let write_input = if fanout_enabled {
+            repartitioned_plan
+        } else {
+            sort_by_partition(repartitioned_plan)?
+        };
+
         let write_plan = Arc::new(IcebergWriteExec::new(
             table.clone(),
-            repartitioned_plan,
+            write_input,
             self.schema.clone(),
         ));
 
@@ -321,6 +352,7 @@ mod tests {
     use std::sync::Arc;
 
     use datafusion::common::Column;
+    use datafusion::physical_plan::ExecutionPlan;
     use datafusion::prelude::SessionContext;
     use iceberg::io::FileIO;
     use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
@@ -598,4 +630,148 @@ mod tests {
             assert_eq!(logical_field.data_type(), physical_field.data_type());
         }
     }
+
+    async fn get_partitioned_test_catalog_and_table(
+        fanout_enabled: Option<bool>,
+    ) -> (Arc<dyn Catalog>, NamespaceIdent, String, TempDir) {
+        use iceberg::spec::{Transform, UnboundPartitionSpec};
+
+        let temp_dir = TempDir::new().unwrap();
+        let warehouse_path = temp_dir.path().to_str().unwrap().to_string();
+
+        let catalog = MemoryCatalogBuilder::default()
+            .load(
+                "memory",
+                HashMap::from([(MEMORY_CATALOG_WAREHOUSE.to_string(), 
warehouse_path.clone())]),
+            )
+            .await
+            .unwrap();
+
+        let namespace = NamespaceIdent::new("test_ns".to_string());
+        catalog
+            .create_namespace(&namespace, HashMap::new())
+            .await
+            .unwrap();
+
+        let schema = Schema::builder()
+            .with_schema_id(0)
+            .with_fields(vec![
+                NestedField::required(1, "id", 
Type::Primitive(PrimitiveType::Int)).into(),
+                NestedField::required(2, "category", 
Type::Primitive(PrimitiveType::String)).into(),
+            ])
+            .build()
+            .unwrap();
+
+        let partition_spec = UnboundPartitionSpec::builder()
+            .with_spec_id(0)
+            .add_partition_field(2, "category", Transform::Identity)
+            .unwrap()
+            .build();
+
+        let mut properties = HashMap::new();
+        if let Some(enabled) = fanout_enabled {
+            properties.insert(
+                
iceberg::spec::TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED
+                    .to_string(),
+                enabled.to_string(),
+            );
+        }
+
+        let table_creation = TableCreation::builder()
+            .name("partitioned_table".to_string())
+            .location(format!("{warehouse_path}/partitioned_table"))
+            .schema(schema)
+            .partition_spec(partition_spec)
+            .properties(properties)
+            .build();
+
+        catalog
+            .create_table(&namespace, table_creation)
+            .await
+            .unwrap();
+
+        (
+            Arc::new(catalog),
+            namespace,
+            "partitioned_table".to_string(),
+            temp_dir,
+        )
+    }
+
+    /// Helper to check if a plan contains a SortExec node
+    fn plan_contains_sort(plan: &Arc<dyn ExecutionPlan>) -> bool {
+        if plan.name() == "SortExec" {
+            return true;
+        }
+        for child in plan.children() {
+            if plan_contains_sort(child) {
+                return true;
+            }
+        }
+        false
+    }
+
+    #[tokio::test]
+    async fn test_insert_plan_fanout_enabled_no_sort() {
+        use datafusion::datasource::TableProvider;
+        use datafusion::logical_expr::dml::InsertOp;
+        use datafusion::physical_plan::empty::EmptyExec;
+
+        // When fanout is enabled (default), no sort node should be added
+        let (catalog, namespace, table_name, _temp_dir) =
+            get_partitioned_test_catalog_and_table(Some(true)).await;
+
+        let provider =
+            IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), 
table_name.clone())
+                .await
+                .unwrap();
+
+        let ctx = SessionContext::new();
+        let input_schema = provider.schema();
+        let input = Arc::new(EmptyExec::new(input_schema)) as Arc<dyn 
ExecutionPlan>;
+
+        let state = ctx.state();
+        let insert_plan = provider
+            .insert_into(&state, input, InsertOp::Append)
+            .await
+            .unwrap();
+
+        // With fanout enabled, there should be no SortExec in the plan
+        assert!(
+            !plan_contains_sort(&insert_plan),
+            "Plan should NOT contain SortExec when fanout is enabled"
+        );
+    }
+
+    #[tokio::test]
+    async fn test_insert_plan_fanout_disabled_has_sort() {
+        use datafusion::datasource::TableProvider;
+        use datafusion::logical_expr::dml::InsertOp;
+        use datafusion::physical_plan::empty::EmptyExec;
+
+        // When fanout is disabled, a sort node should be added
+        let (catalog, namespace, table_name, _temp_dir) =
+            get_partitioned_test_catalog_and_table(Some(false)).await;
+
+        let provider =
+            IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), 
table_name.clone())
+                .await
+                .unwrap();
+
+        let ctx = SessionContext::new();
+        let input_schema = provider.schema();
+        let input = Arc::new(EmptyExec::new(input_schema)) as Arc<dyn 
ExecutionPlan>;
+
+        let state = ctx.state();
+        let insert_plan = provider
+            .insert_into(&state, input, InsertOp::Append)
+            .await
+            .unwrap();
+
+        // With fanout disabled, there should be a SortExec in the plan
+        assert!(
+            plan_contains_sort(&insert_plan),
+            "Plan should contain SortExec when fanout is disabled"
+        );
+    }
 }

Reply via email to