This is an automated email from the ASF dual-hosted git repository.

liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new ec545f2a Improve `IcebergCommitExec` to correctly populate properties/schema (#1721)
ec545f2a is described below

commit ec545f2a9f50dcbe694b2196e04b32de24f7a411
Author: Sergei Grebnov <[email protected]>
AuthorDate: Tue Oct 14 10:11:23 2025 +0000

    Improve `IcebergCommitExec` to correctly populate properties/schema (#1721)
    
    ## Which issue does this PR close?
    
    This PR fixes schema mismatch errors (similar to the example shown below)
    that occur when using `IcebergCommitExec` with DataFusion. The error occurs
    when `IcebergCommitExec` is not the top-level plan but is instead wrapped as
    the input to another plan node, for example a node added by a custom
    optimization rule (such as a cache invalidation step).
    
    >An internal error occurred. Internal error: PhysicalOptimizer rule
    'OutputRequirements' failed. Schema mismatch. Expected original schema:
    Schema { fields: [Field { name: "count", data_type: UInt64, nullable:
    false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {}
    }, got new schema: Schema { fields: [Field { name: "r_regionkey",
    data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false,
    metadata: {"PARQUET:field_id": "1"} }, Field { name: "r_name",
    data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false,
    metadata: {"PARQUET:field_id": "2"} }, Field { name: "r_comment",
    data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false,
    metadata: {"PARQUET:field_id": "3"} }], metadata: {} }.
    This issue was likely caused by a bug in DataFusion's code. Please help
    us to resolve this by filing a bug report in our issue tracker:
    https://github.com/apache/datafusion/issues
    
    ## What changes are included in this PR?
    
    This PR updates the `compute_properties` logic to use the target (output)
    schema instead of the input schema. Below is an example from DataFusion's
    `DataSinkExec` implementation demonstrating that the plan properties must
    be created from the target schema, not the input schema.
    
    
    
    https://github.com/apache/datafusion/blob/4eacb6046773b759dae0b3d801fe8cb1c6b65c0f/datafusion/datasource/src/sink.rs#L101C1-L117C6
    
    ```rust
    impl DataSinkExec {
        /// Create a plan to write to `sink`
        pub fn new(
            input: Arc<dyn ExecutionPlan>,
            sink: Arc<dyn DataSink>,
            sort_order: Option<LexRequirement>,
        ) -> Self {
            let count_schema = make_count_schema();
            let cache = Self::create_schema(&input, count_schema);
            Self {
                input,
                sink,
                count_schema: make_count_schema(),
                sort_order,
                cache,
            }
        }
    ....
    
        fn properties(&self) -> &PlanProperties {
            &self.cache
        }
    ```
    
    ## Are these changes tested?
    
    Tested manually, expanded the existing test to verify the output schema,
    and tested as part of the [Spice Iceberg write automated
    tests](https://github.com/spiceai/spiceai/blob/trunk/crates/runtime/tests/iceberg/write/mod.rs).
---
 crates/integrations/datafusion/src/physical_plan/commit.rs | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/crates/integrations/datafusion/src/physical_plan/commit.rs 
b/crates/integrations/datafusion/src/physical_plan/commit.rs
index 067049c1..48373f11 100644
--- a/crates/integrations/datafusion/src/physical_plan/commit.rs
+++ b/crates/integrations/datafusion/src/physical_plan/commit.rs
@@ -57,14 +57,16 @@ impl IcebergCommitExec {
         input: Arc<dyn ExecutionPlan>,
         schema: ArrowSchemaRef,
     ) -> Self {
-        let plan_properties = Self::compute_properties(schema.clone());
+        let count_schema = Self::make_count_schema();
+
+        let plan_properties = 
Self::compute_properties(Arc::clone(&count_schema));
 
         Self {
             table,
             catalog,
             input,
             schema,
-            count_schema: Self::make_count_schema(),
+            count_schema,
             plan_properties,
         }
     }
@@ -469,6 +471,9 @@ mod tests {
         let commit_exec =
             IcebergCommitExec::new(table.clone(), catalog.clone(), input_exec, 
arrow_schema);
 
+        // Verify Execution Plan schema matches the count schema
+        assert_eq!(commit_exec.schema(), 
IcebergCommitExec::make_count_schema());
+
         // Execute the commit exec
         let task_ctx = Arc::new(TaskContext::default());
         let stream = commit_exec.execute(0, task_ctx)?;

Reply via email to