This is an automated email from the ASF dual-hosted git repository.

liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 1de33159 fix: ensure CoalescePartitionsExec is enabled for 
IcebergCommitExec (#1723)
1de33159 is described below

commit 1de331593ea877e3a8b570e328d26f8362ad2035
Author: Sergei Grebnov <[email protected]>
AuthorDate: Sat Oct 11 08:44:54 2025 +0000

    fix: ensure CoalescePartitionsExec is enabled for IcebergCommitExec (#1723)
    
    ## Which issue does this PR close?
    
    This PR fixes partial writes similar to those reported
    [here](https://github.com/spiceai/spiceai/issues/7407). Although the
    following code enforces `CoalescePartitionsExec` (single-partition input
    behavior), it can be removed by the DataFusion optimizer. A unit test was
    added to demonstrate this behavior.
    
    
    
https://github.com/apache/iceberg-rust/blob/dc349284a4204c1a56af47fb3177ace6f9e899a0/crates/integrations/datafusion/src/table/mod.rs#L196-L210
    
    ```rust
            let write_plan = Arc::new(IcebergWriteExec::new(
                self.table.clone(),
                input,
                self.schema.clone(),
            ));
    
            // Merge the outputs of write_plan into one so we can commit all 
files together
            let coalesce_partitions = 
Arc::new(CoalescePartitionsExec::new(write_plan));
    
            Ok(Arc::new(IcebergCommitExec::new(
                self.table.clone(),
                catalog,
                coalesce_partitions,
                self.schema.clone(),
            )))
    ```
    
    Example plan (observe no `CoalescePartitionsExec`)
    
    ```shell
    explain format tree insert into task_history_sink select * from 
runtime.task_history;
    +---------------+-------------------------------+
    | plan_type     | plan                          |
    +---------------+-------------------------------+
    | physical_plan | ┌───────────────────────────┐ |
    |               | │     IcebergCommitExec     │ |
    |               | │    --------------------   │ |
    |               | │ IcebergCommitExec: table: │ |
    |               | │   team_app.task_history   │ |
    |               | └─────────────┬─────────────┘ |
    |               | ┌─────────────┴─────────────┐ |
    |               | │      IcebergWriteExec     │ |
    |               | │    --------------------   │ |
    |               | │  IcebergWriteExec: table: │ |
    |               | │   team_app.task_history   │ |
    |               | └─────────────┬─────────────┘ |
    |               | ┌─────────────┴─────────────┐ |
    |               | │       ProjectionExec      │ |
    |               | │    --------------------   │ |
    |               | │      captured_output:     │ |
    |               | │      captured_output      │ |
    |               | │                           │ |
    |               | │         end_time:         │ |
    |               | │ CAST(end_time AS Timestamp│ |
    |               | │    (Microsecond, None))   │ |
    |               | │                           │ |
    |               | │       error_message:      │ |
    |               | │       error_message       │ |
    |               | │                           │ |
    |               | │   execution_duration_ms:  │ |
    |               | │   execution_duration_ms   │ |
    |               | │                           │ |
    |               | │        input: input       │ |
    |               | │                           │ |
    |               | │          labels:          │ |
    |               | │ CAST(labels AS Map(Field {│ |
    |               | │     name: "key_value",    │ |
    |               | │     data_type: Struct(    │ |
    |               | │   [Field { name: "key",   │ |
    |               | │      data_type: Utf8,     │ |
    |               | │      nullable: false,     │ |
    |               | │         dict_id: 0,       │ |
    |               | │       dict_is_ordered     │ |
    |               | │    : false, metadata: {   │ |
    |               | │    "PARQUET:field_id":    │ |
    |               | │  "12"} }, Field { name:   │ |
    |               | │    "value", data_type:    │ |
    |               | │    Utf8, nullable: true   │ |
    |               | │            ...            │ |
    |               | └─────────────┬─────────────┘ |
    |               | ┌─────────────┴─────────────┐ |
    |               | │      RepartitionExec      │ |
    |               | │    --------------------   │ |
    |               | │ partition_count(in->out): │ |
    |               | │          1 -> 14          │ |
    |               | │                           │ |
    |               | │    partitioning_scheme:   │ |
    |               | │    RoundRobinBatch(14)    │ |
    |               | └─────────────┬─────────────┘ |
    |               | ┌─────────────┴─────────────┐ |
    |               | │     BytesProcessedExec    │ |
    |               | │    --------------------   │ |
    |               | │     BytesProcessedExec    │ |
    |               | └─────────────┬─────────────┘ |
    |               | ┌─────────────┴─────────────┐ |
    |               | │     SchemaCastScanExec    │ |
    |               | │    --------------------   │ |
    |               | │     SchemaCastScanExec    │ |
    |               | └─────────────┬─────────────┘ |
    |               | ┌─────────────┴─────────────┐ |
    |               | │       DataSourceExec      │ |
    |               | │    --------------------   │ |
    |               | │        bytes: 88176       │ |
    |               | │       format: memory      │ |
    |               | │          rows: 6          │ |
    |               | └───────────────────────────┘ |
    |               |                               |
    +---------------+-------------------------------+
    ```
    
    
    
    ## What changes are included in this PR?
    
    PR adds `required_input_distribution` setting for `IcebergWriteExec` to
    ensure DataFusion coalesces input partitions automatically before
    commit. Similar to [DataFusion
    
DataSinkExec](https://github.com/apache/datafusion/blob/a7b113c45509aae34595b6a62469b3173cac91bd/datafusion/datasource/src/sink.rs#L187)
    
    `test_datafusion_execution_partitioned_source` can be used to observe
    the behavior before and after this change
    
    Before
    
    ```rust
    Physical plan:
    IcebergCommitExec: table=test_namespace.test_table_partitioning
      RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=3
        IcebergWriteExec: table=test_namespace.test_table_partitioning
          DataSourceExec: partitions=3, partition_sizes=[1, 1, 1]
    ```
    
    After
    
    ```rust
    IcebergCommitExec: table=test_namespace.test_table
      CoalescePartitionsExec
        IcebergWriteExec: table=test_namespace.test_table
          DataSourceExec: partitions=3, partition_sizes=[1, 1, 1]
    ```
    
    ## Are these changes tested?
    
    Added `test_datafusion_execution_partitioned_source` unit test, tested
    manually
---
 .../datafusion/src/physical_plan/commit.rs         | 111 ++++++++++++++++++++-
 .../datafusion/src/physical_plan/write.rs          |  10 ++
 2 files changed, 120 insertions(+), 1 deletion(-)

diff --git a/crates/integrations/datafusion/src/physical_plan/commit.rs 
b/crates/integrations/datafusion/src/physical_plan/commit.rs
index 0ee0504f..067049c1 100644
--- a/crates/integrations/datafusion/src/physical_plan/commit.rs
+++ b/crates/integrations/datafusion/src/physical_plan/commit.rs
@@ -136,6 +136,14 @@ impl ExecutionPlan for IcebergCommitExec {
         vec![&self.input]
     }
 
+    fn required_input_distribution(&self) -> 
Vec<datafusion::physical_plan::Distribution> {
+        vec![datafusion::physical_plan::Distribution::SinglePartition; 
self.children().len()]
+    }
+
+    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
+        vec![false]
+    }
+
     fn with_new_children(
         self: Arc<Self>,
         children: Vec<Arc<dyn ExecutionPlan>>,
@@ -262,14 +270,16 @@ mod tests {
     use std::fmt;
     use std::sync::Arc;
 
-    use datafusion::arrow::array::{ArrayRef, RecordBatch, StringArray};
+    use datafusion::arrow::array::{ArrayRef, Int32Array, RecordBatch, 
StringArray, UInt64Array};
     use datafusion::arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
+    use datafusion::datasource::MemTable;
     use datafusion::execution::context::TaskContext;
     use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
     use datafusion::physical_plan::common::collect;
     use datafusion::physical_plan::execution_plan::Boundedness;
     use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
     use datafusion::physical_plan::{DisplayAs, DisplayFormatType, 
ExecutionPlan, PlanProperties};
+    use datafusion::prelude::*;
     use futures::StreamExt;
     use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
     use iceberg::spec::{
@@ -280,6 +290,7 @@ mod tests {
 
     use super::*;
     use crate::physical_plan::DATA_FILES_COL_NAME;
+    use crate::table::IcebergTableProvider;
 
     // A mock execution plan that returns record batches with serialized data 
files
     #[derive(Debug)]
@@ -510,4 +521,102 @@ mod tests {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn test_datafusion_execution_partitioned_source() -> Result<(), 
Box<dyn std::error::Error>>
+    {
+        let catalog = Arc::new(
+            MemoryCatalogBuilder::default()
+                .load(
+                    "memory",
+                    HashMap::from([(
+                        MEMORY_CATALOG_WAREHOUSE.to_string(),
+                        "memory://root".to_string(),
+                    )]),
+                )
+                .await?,
+        );
+
+        let namespace = NamespaceIdent::new("test_namespace".to_string());
+        catalog.create_namespace(&namespace, HashMap::new()).await?;
+
+        let schema = Schema::builder()
+            .with_schema_id(1)
+            .with_fields(vec![
+                NestedField::required(1, "id", 
Type::Primitive(PrimitiveType::Int)).into(),
+                NestedField::required(2, "name", 
Type::Primitive(PrimitiveType::String)).into(),
+            ])
+            .build()?;
+
+        let table_name = "test_table";
+        let table_creation = TableCreation::builder()
+            .name(table_name.to_string())
+            .schema(schema)
+            .location("memory://root/test_table".to_string())
+            .properties(HashMap::new())
+            .build();
+        let _ = catalog.create_table(&namespace, table_creation).await?;
+
+        let arrow_schema = Arc::new(ArrowSchema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new("name", DataType::Utf8, false),
+        ]));
+
+        let batches: Vec<RecordBatch> = (1..4)
+            .map(|idx| {
+                RecordBatch::try_new(arrow_schema.clone(), vec![
+                    Arc::new(Int32Array::from(vec![idx])) as ArrayRef,
+                    Arc::new(StringArray::from(vec![format!("Name{idx}")])) as 
ArrayRef,
+                ])
+            })
+            .collect::<Result<_, _>>()?;
+
+        // Create DataFusion context with specific partition configuration
+        let mut config = SessionConfig::new();
+        config = config.set_usize("datafusion.execution.target_partitions", 8);
+        let ctx = SessionContext::new_with_config(config);
+
+        // Create multiple partitions - each batch becomes a separate partition
+        let partitions: Vec<Vec<RecordBatch>> =
+            batches.into_iter().map(|batch| vec![batch]).collect();
+        let source_table = 
Arc::new(MemTable::try_new(Arc::clone(&arrow_schema), partitions)?);
+        ctx.register_table("source_table", source_table)?;
+
+        let iceberg_table_provider = IcebergTableProvider::try_new(
+            catalog.clone(),
+            namespace.clone(),
+            table_name.to_string(),
+        )
+        .await?;
+        ctx.register_table("iceberg_table", Arc::new(iceberg_table_provider))?;
+
+        let insert_plan = ctx
+            .sql("INSERT INTO iceberg_table SELECT * FROM source_table")
+            .await?;
+
+        let physical_plan = insert_plan.create_physical_plan().await?;
+
+        let actual_plan = format!(
+            "{}",
+            
datafusion::physical_plan::displayable(physical_plan.as_ref()).indent(false)
+        );
+
+        println!("Physical plan:\n{actual_plan}");
+
+        let expected_plan = "\
+IcebergCommitExec: table=test_namespace.test_table
+  CoalescePartitionsExec
+    IcebergWriteExec: table=test_namespace.test_table
+      DataSourceExec: partitions=3, partition_sizes=[1, 1, 1]";
+
+        assert_eq!(
+            actual_plan.trim(),
+            expected_plan.trim(),
+            "Physical plan does not match 
expected\n\nExpected:\n{}\n\nActual:\n{}",
+            expected_plan.trim(),
+            actual_plan.trim()
+        );
+
+        Ok(())
+    }
 }
diff --git a/crates/integrations/datafusion/src/physical_plan/write.rs 
b/crates/integrations/datafusion/src/physical_plan/write.rs
index 712da92b..759f1a8d 100644
--- a/crates/integrations/datafusion/src/physical_plan/write.rs
+++ b/crates/integrations/datafusion/src/physical_plan/write.rs
@@ -144,6 +144,16 @@ impl ExecutionPlan for IcebergWriteExec {
         self
     }
 
+    /// Prevents the introduction of additional `RepartitionExec` and 
processing input in parallel.
+    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
+        vec![false]
+    }
+
+    fn maintains_input_order(&self) -> Vec<bool> {
+        // Maintains ordering in the sense that the written file will reflect 
the ordering of the input.
+        vec![true; self.children().len()]
+    }
+
     fn properties(&self) -> &PlanProperties {
         &self.plan_properties
     }

Reply via email to