This is an automated email from the ASF dual-hosted git repository.

liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 17e4351a feat(datafusion): Add IcebergCommitExec to commit the written data files (#1588)
17e4351a is described below

commit 17e4351a4a4d52c8d33f7f791384bae467186cd7
Author: Shawn Chang <[email protected]>
AuthorDate: Fri Aug 8 03:10:07 2025 -0700

    feat(datafusion): Add IcebergCommitExec to commit the written data files (#1588)
    
    ## Which issue does this PR close?
    
    - Closes #1546
    - Draft: #1511
    
    ## What changes are included in this PR?
    - Added `IcebergCommitExec` to commit the written data files and return the
      number of rows written (see the usage sketch below)
    
    
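    An illustrative sketch (not part of this commit) of how the operator is meant
    to be wired; the helper name `commit_write` and the upstream `write_plan` (any
    plan that emits the serialized `data_files` column) are assumptions, and since
    the type is crate-private this only applies inside the DataFusion integration
    crate:
    
    ```rust
    use std::sync::Arc;
    
    use datafusion::arrow::array::RecordBatch;
    use datafusion::common::Result as DFResult;
    use datafusion::execution::TaskContext;
    use datafusion::physical_plan::{ExecutionPlan, common::collect};
    use iceberg::Catalog;
    use iceberg::table::Table;
    
    use crate::physical_plan::commit::IcebergCommitExec;
    
    // Hypothetical helper: commit whatever `write_plan` has written to `table`.
    async fn commit_write(
        table: Table,
        catalog: Arc<dyn Catalog>,
        write_plan: Arc<dyn ExecutionPlan>,
    ) -> DFResult<Vec<RecordBatch>> {
        let schema = write_plan.schema();
        let commit_exec = Arc::new(IcebergCommitExec::new(table, catalog, write_plan, schema));
    
        // Partition 0 deserializes the incoming data files, commits them with a
        // fast-append transaction, and returns a one-row batch with a UInt64
        // `count` column holding the number of rows committed.
        let stream = commit_exec.execute(0, Arc::new(TaskContext::default()))?;
        collect(stream).await
    }
    ```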
    ## Are these changes tested?
    Added a unit test for `IcebergCommitExec`.
---
 .../datafusion/src/physical_plan/commit.rs         | 506 +++++++++++++++++++++
 .../datafusion/src/physical_plan/mod.rs            |   4 +
 2 files changed, 510 insertions(+)

diff --git a/crates/integrations/datafusion/src/physical_plan/commit.rs b/crates/integrations/datafusion/src/physical_plan/commit.rs
new file mode 100644
index 00000000..1ce8fa8d
--- /dev/null
+++ b/crates/integrations/datafusion/src/physical_plan/commit.rs
@@ -0,0 +1,506 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::fmt::{Debug, Formatter};
+use std::sync::Arc;
+
+use datafusion::arrow::array::{Array, ArrayRef, RecordBatch, StringArray, UInt64Array};
+use datafusion::arrow::datatypes::{
+    DataType, Field, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef,
+};
+use datafusion::common::{DataFusionError, Result as DFResult};
+use datafusion::execution::{SendableRecordBatchStream, TaskContext};
+use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
+use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
+use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
+use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
+use futures::StreamExt;
+use iceberg::Catalog;
+use iceberg::spec::{DataFile, deserialize_data_file_from_json};
+use iceberg::table::Table;
+use iceberg::transaction::{ApplyTransactionAction, Transaction};
+
+use crate::physical_plan::DATA_FILES_COL_NAME;
+use crate::to_datafusion_error;
+
+/// IcebergCommitExec is responsible for collecting the written data files and
+/// using [`Transaction::fast_append`] to commit them to the table.
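+///
+/// The input plan is expected to produce batches containing a Utf8 column named
+/// [`DATA_FILES_COL_NAME`], where each value is a JSON-serialized [`DataFile`];
+/// the files are deserialized, committed in a single fast-append transaction, and
+/// the total record count is returned as a one-row batch.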
+#[derive(Debug)]
+pub(crate) struct IcebergCommitExec {
+    table: Table,
+    catalog: Arc<dyn Catalog>,
+    input: Arc<dyn ExecutionPlan>,
+    schema: ArrowSchemaRef,
+    count_schema: ArrowSchemaRef,
+    plan_properties: PlanProperties,
+}
+
+impl IcebergCommitExec {
+    pub fn new(
+        table: Table,
+        catalog: Arc<dyn Catalog>,
+        input: Arc<dyn ExecutionPlan>,
+        schema: ArrowSchemaRef,
+    ) -> Self {
+        let plan_properties = Self::compute_properties(schema.clone());
+
+        Self {
+            table,
+            catalog,
+            input,
+            schema,
+            count_schema: Self::make_count_schema(),
+            plan_properties,
+        }
+    }
+
+    // Compute the plan properties for this execution plan
+    fn compute_properties(schema: ArrowSchemaRef) -> PlanProperties {
+        PlanProperties::new(
+            EquivalenceProperties::new(schema),
+            Partitioning::UnknownPartitioning(1),
+            EmissionType::Final,
+            Boundedness::Bounded,
+        )
+    }
+
+    // Create a record batch with just the count of rows written
+    fn make_count_batch(count: u64) -> DFResult<RecordBatch> {
+        let count_array = Arc::new(UInt64Array::from(vec![count])) as ArrayRef;
+
+        RecordBatch::try_from_iter_with_nullable(vec![("count", count_array, false)]).map_err(|e| {
+            DataFusionError::ArrowError(e, Some("Failed to make count batch!".to_string()))
+        })
+    }
+
+    fn make_count_schema() -> ArrowSchemaRef {
+        // Define a schema.
+        Arc::new(ArrowSchema::new(vec![Field::new(
+            "count",
+            DataType::UInt64,
+            false,
+        )]))
+    }
+}
+
+impl DisplayAs for IcebergCommitExec {
+    fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default => {
+                write!(f, "IcebergCommitExec: table={}", self.table.identifier())
+            }
+            DisplayFormatType::Verbose => {
+                write!(
+                    f,
+                    "IcebergCommitExec: table={}, schema={:?}",
+                    self.table.identifier(),
+                    self.schema
+                )
+            }
+            DisplayFormatType::TreeRender => {
+                write!(f, "IcebergCommitExec: table={}", self.table.identifier())
+            }
+        }
+    }
+}
+
+impl ExecutionPlan for IcebergCommitExec {
+    fn name(&self) -> &str {
+        "IcebergCommitExec"
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn properties(&self) -> &PlanProperties {
+        &self.plan_properties
+    }
+
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        vec![&self.input]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> DFResult<Arc<dyn ExecutionPlan>> {
+        if children.len() != 1 {
+            return Err(DataFusionError::Internal(format!(
+                "IcebergCommitExec expects exactly one child, but provided {}",
+                children.len()
+            )));
+        }
+
+        Ok(Arc::new(IcebergCommitExec::new(
+            self.table.clone(),
+            self.catalog.clone(),
+            children[0].clone(),
+            self.schema.clone(),
+        )))
+    }
+
+    fn execute(
+        &self,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) -> DFResult<SendableRecordBatchStream> {
+        // IcebergCommitExec only has one partition (partition 0)
+        if partition != 0 {
+            return Err(DataFusionError::Internal(format!(
+                "IcebergCommitExec only has one partition, but got partition {}",
+                partition
+            )));
+        }
+
+        let table = self.table.clone();
+        let input_plan = self.input.clone();
+        let count_schema = Arc::clone(&self.count_schema);
+
+        // todo revisit this
+        let spec_id = self.table.metadata().default_partition_spec_id();
+        let partition_type = self.table.metadata().default_partition_type().clone();
+        let current_schema = self.table.metadata().current_schema().clone();
+
+        let catalog = Arc::clone(&self.catalog);
+
+        // Process the input streams from all partitions and commit the data files
+        let stream = futures::stream::once(async move {
+            let mut data_files: Vec<DataFile> = Vec::new();
+            let mut total_record_count: u64 = 0;
+
+            // Execute and collect results from the input coalesced plan
+            let mut batch_stream = input_plan.execute(0, context)?;
+
+            while let Some(batch_result) = batch_stream.next().await {
+                let batch = batch_result?;
+
+                let files_array = batch
+                    .column_by_name(DATA_FILES_COL_NAME)
+                    .ok_or_else(|| {
+                        DataFusionError::Internal(
+                            "Expected 'data_files' column in input batch".to_string(),
+                        )
+                    })?
+                    .as_any()
+                    .downcast_ref::<StringArray>()
+                    .ok_or_else(|| {
+                        DataFusionError::Internal(
+                            "Expected 'data_files' column to be StringArray".to_string(),
+                        )
+                    })?;
+
+                // Deserialize all data files from the StringArray
+                let batch_files: Vec<DataFile> = files_array
+                    .into_iter()
+                    .flatten()
+                    .map(|f| -> DFResult<DataFile> {
+                        // Parse JSON to DataFileSerde and convert to DataFile
+                        deserialize_data_file_from_json(
+                            f,
+                            spec_id,
+                            &partition_type,
+                            &current_schema,
+                        )
+                        .map_err(to_datafusion_error)
+                    })
+                    .collect::<datafusion::common::Result<_>>()?;
+
+                // add record_counts from the current batch to total record count
+                total_record_count += batch_files.iter().map(|f| f.record_count()).sum::<u64>();
+
+                // Add all deserialized files to our collection
+                data_files.extend(batch_files);
+            }
+
+            // If no data files were collected, return an empty result
+            if data_files.is_empty() {
+                return Ok(RecordBatch::new_empty(count_schema));
+            }
+
+            // Create a transaction and commit the data files
+            let tx = Transaction::new(&table);
+            let action = tx.fast_append().add_data_files(data_files);
+
+            // Apply the action and commit the transaction
+            let _updated_table = action
+                .apply(tx)
+                .map_err(to_datafusion_error)?
+                .commit(catalog.as_ref())
+                .await
+                .map_err(to_datafusion_error)?;
+
+            Self::make_count_batch(total_record_count)
+        })
+        .boxed();
+
+        Ok(Box::pin(RecordBatchStreamAdapter::new(
+            Arc::clone(&self.count_schema),
+            stream,
+        )))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::collections::HashMap;
+    use std::fmt;
+    use std::sync::Arc;
+
+    use datafusion::arrow::array::{ArrayRef, RecordBatch, StringArray};
+    use datafusion::arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
+    use datafusion::execution::context::TaskContext;
+    use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
+    use datafusion::physical_plan::common::collect;
+    use datafusion::physical_plan::execution_plan::Boundedness;
+    use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
+    use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
+    use futures::StreamExt;
+    use iceberg::io::FileIOBuilder;
+    use iceberg::spec::{
+        DataContentType, DataFileBuilder, DataFileFormat, NestedField, PrimitiveType, Schema,
+        Struct, Type,
+    };
+    use iceberg::{Catalog, MemoryCatalog, NamespaceIdent, TableCreation, TableIdent};
+
+    use super::*;
+    use crate::physical_plan::DATA_FILES_COL_NAME;
+
+    // A mock execution plan that returns record batches with serialized data files
+    #[derive(Debug)]
+    struct MockWriteExec {
+        schema: Arc<ArrowSchema>,
+        data_files_json: Vec<String>,
+        plan_properties: PlanProperties,
+    }
+
+    impl MockWriteExec {
+        fn new(data_files_json: Vec<String>) -> Self {
+            let schema = Arc::new(ArrowSchema::new(vec![Field::new(
+                DATA_FILES_COL_NAME,
+                DataType::Utf8,
+                false,
+            )]));
+
+            let plan_properties = PlanProperties::new(
+                EquivalenceProperties::new(schema.clone()),
+                Partitioning::UnknownPartitioning(1),
+                EmissionType::Final,
+                Boundedness::Bounded,
+            );
+
+            Self {
+                schema,
+                data_files_json,
+                plan_properties,
+            }
+        }
+    }
+
+    impl ExecutionPlan for MockWriteExec {
+        fn name(&self) -> &str {
+            "MockWriteExec"
+        }
+
+        fn as_any(&self) -> &dyn Any {
+            self
+        }
+
+        fn schema(&self) -> Arc<ArrowSchema> {
+            self.schema.clone()
+        }
+
+        fn properties(&self) -> &PlanProperties {
+            &self.plan_properties
+        }
+
+        fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+            vec![]
+        }
+
+        fn with_new_children(
+            self: Arc<Self>,
+            _children: Vec<Arc<dyn ExecutionPlan>>,
+        ) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
+            Ok(self)
+        }
+
+        fn execute(
+            &self,
+            _partition: usize,
+            _context: Arc<TaskContext>,
+        ) -> datafusion::common::Result<SendableRecordBatchStream> {
+            // Create a record batch with the serialized data files
+            let array = Arc::new(StringArray::from(self.data_files_json.clone())) as ArrayRef;
+            let batch = RecordBatch::try_new(self.schema.clone(), vec![array])?;
+
+            // Create a stream that returns this batch
+            let stream = futures::stream::once(async move { Ok(batch) }).boxed();
+            Ok(Box::pin(RecordBatchStreamAdapter::new(
+                self.schema(),
+                stream,
+            )))
+        }
+    }
+
+    // Implement DisplayAs for MockWriteExec
+    impl DisplayAs for MockWriteExec {
+        fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
+            match t {
+                DisplayFormatType::Default
+                | DisplayFormatType::Verbose
+                | DisplayFormatType::TreeRender => {
+                    write!(f, "MockWriteExec: files={}", self.data_files_json.len())
+                }
+            }
+        }
+    }
+
+    #[tokio::test]
+    async fn test_iceberg_commit_exec() -> Result<(), Box<dyn std::error::Error>> {
+        // Create a memory catalog with in-memory file IO
+        let file_io = FileIOBuilder::new("memory").build()?;
+        let catalog = Arc::new(MemoryCatalog::new(
+            file_io,
+            Some("memory://root".to_string()),
+        ));
+
+        // Create a namespace
+        let namespace = NamespaceIdent::new("test_namespace".to_string());
+        catalog.create_namespace(&namespace, HashMap::new()).await?;
+
+        // Create a schema for the table
+        let schema = Schema::builder()
+            .with_schema_id(1)
+            .with_fields(vec![
+                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
+                NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
+            ])
+            .build()?;
+
+        // Create a table
+        let table_creation = TableCreation::builder()
+            .name("test_table".to_string())
+            .schema(schema)
+            .location("memory://root/test_table".to_string())
+            .properties(HashMap::new())
+            .build();
+
+        let table = catalog.create_table(&namespace, table_creation).await?;
+
+        // Create data files
+        let data_file1 = DataFileBuilder::default()
+            .content(DataContentType::Data)
+            .file_path("path/to/file1.parquet".to_string())
+            .file_format(DataFileFormat::Parquet)
+            .file_size_in_bytes(1024)
+            .record_count(100)
+            .partition_spec_id(table.metadata().default_partition_spec_id())
+            .partition(Struct::empty())
+            .build()?;
+
+        let data_file2 = DataFileBuilder::default()
+            .content(DataContentType::Data)
+            .file_path("path/to/file2.parquet".to_string())
+            .file_format(DataFileFormat::Parquet)
+            .file_size_in_bytes(2048)
+            .record_count(200)
+            .partition_spec_id(table.metadata().default_partition_spec_id())
+            .partition(Struct::empty())
+            .build()?;
+
+        // Serialize data files to JSON
+        let partition_type = table.metadata().default_partition_type().clone();
+        let data_file1_json = iceberg::spec::serialize_data_file_to_json(
+            data_file1.clone(),
+            &partition_type,
+            table.metadata().format_version(),
+        )?;
+
+        let data_file2_json = iceberg::spec::serialize_data_file_to_json(
+            data_file2.clone(),
+            &partition_type,
+            table.metadata().format_version(),
+        )?;
+
+        // Create a mock execution plan that returns the serialized data files
+        let input_exec = Arc::new(MockWriteExec::new(vec![data_file1_json, data_file2_json]));
+
+        // Create the IcebergCommitExec
+        let arrow_schema = Arc::new(ArrowSchema::new(vec![Field::new(
+            DATA_FILES_COL_NAME,
+            DataType::Utf8,
+            false,
+        )]));
+
+        let commit_exec =
+            IcebergCommitExec::new(table.clone(), catalog.clone(), input_exec, arrow_schema);
+
+        // Execute the commit exec
+        let task_ctx = Arc::new(TaskContext::default());
+        let stream = commit_exec.execute(0, task_ctx)?;
+        let batches = collect(stream).await?;
+
+        // Verify the results
+        assert_eq!(batches.len(), 1);
+        let batch = &batches[0];
+        assert_eq!(batch.num_columns(), 1);
+        assert_eq!(batch.num_rows(), 1);
+
+        // The output should be a record batch with a single column "count" and a single row
+        // with the total record count (100 + 200 = 300)
+        let count_array = batch.column(0);
+        assert_eq!(count_array.len(), 1);
+        assert_eq!(count_array.data_type(), &DataType::UInt64);
+
+        // Verify that the count is correct
+        let count = count_array.as_any().downcast_ref::<UInt64Array>().unwrap();
+        assert_eq!(count.value(0), 300);
+
+        // Verify that the table has been updated with the new files
+        let updated_table = catalog
+            .load_table(&TableIdent::from_strs(["test_namespace", "test_table"]).unwrap())
+            .await?;
+        let current_snapshot = updated_table.metadata().current_snapshot().unwrap();
+
+        // Load the manifest list to verify the data files were added
+        let manifest_list = current_snapshot
+            .load_manifest_list(updated_table.file_io(), updated_table.metadata())
+            .await?;
+
+        // There should be at least one manifest
+        assert!(!manifest_list.entries().is_empty());
+
+        // Load the first manifest and verify it contains our data files
+        let manifest = manifest_list.entries()[0]
+            .load_manifest(updated_table.file_io())
+            .await?;
+
+        // Verify that the manifest contains our data files
+        let manifest_files: Vec<String> = manifest
+            .entries()
+            .iter()
+            .map(|entry| entry.data_file().file_path().to_string())
+            .collect();
+
+        assert!(manifest_files.contains(&"path/to/file1.parquet".to_string()));
+        assert!(manifest_files.contains(&"path/to/file2.parquet".to_string()));
+
+        Ok(())
+    }
+}
diff --git a/crates/integrations/datafusion/src/physical_plan/mod.rs b/crates/integrations/datafusion/src/physical_plan/mod.rs
index e424b690..b583c2d0 100644
--- a/crates/integrations/datafusion/src/physical_plan/mod.rs
+++ b/crates/integrations/datafusion/src/physical_plan/mod.rs
@@ -15,7 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
+pub(crate) mod commit;
 pub(crate) mod expr_to_predicate;
 pub(crate) mod metadata_scan;
 pub(crate) mod scan;
+
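+// Name of the Utf8 column used to pass JSON-serialized data files from the
+// write plan to the commit plan (`commit::IcebergCommitExec`).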
+pub(crate) const DATA_FILES_COL_NAME: &str = "data_files";
+
 pub use scan::IcebergTableScan;
