This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 1de33159 fix: ensure CoalescePartitionsExec is enabled for
IcebergCommitExec (#1723)
1de33159 is described below
commit 1de331593ea877e3a8b570e328d26f8362ad2035
Author: Sergei Grebnov <[email protected]>
AuthorDate: Sat Oct 11 08:44:54 2025 +0000
fix: ensure CoalescePartitionsExec is enabled for IcebergCommitExec (#1723)
## Which issue does this PR close?
This PR fixes partial writes similar to those reported
[here](https://github.com/spiceai/spiceai/issues/7407). Despite the
following code enforcing `CoalescePartitionsExec` (single-input
behavior), it can be removed by the DataFusion optimizer. A unit test
was added to demonstrate this behavior.
https://github.com/apache/iceberg-rust/blob/dc349284a4204c1a56af47fb3177ace6f9e899a0/crates/integrations/datafusion/src/table/mod.rs#L196-L210
```rust
let write_plan = Arc::new(IcebergWriteExec::new(
self.table.clone(),
input,
self.schema.clone(),
));
// Merge the outputs of write_plan into one so we can commit all
files together
let coalesce_partitions =
Arc::new(CoalescePartitionsExec::new(write_plan));
Ok(Arc::new(IcebergCommitExec::new(
self.table.clone(),
catalog,
coalesce_partitions,
self.schema.clone(),
)))
```
Example plan (note the absence of `CoalescePartitionsExec`):
```shell
explain format tree insert into task_history_sink select * from
runtime.task_history;
+---------------+-------------------------------+
| plan_type | plan |
+---------------+-------------------------------+
| physical_plan | ┌───────────────────────────┐ |
| | │ IcebergCommitExec │ |
| | │ -------------------- │ |
| | │ IcebergCommitExec: table: │ |
| | │ team_app.task_history │ |
| | └─────────────┬─────────────┘ |
| | ┌─────────────┴─────────────┐ |
| | │ IcebergWriteExec │ |
| | │ -------------------- │ |
| | │ IcebergWriteExec: table: │ |
| | │ team_app.task_history │ |
| | └─────────────┬─────────────┘ |
| | ┌─────────────┴─────────────┐ |
| | │ ProjectionExec │ |
| | │ -------------------- │ |
| | │ captured_output: │ |
| | │ captured_output │ |
| | │ │ |
| | │ end_time: │ |
| | │ CAST(end_time AS Timestamp│ |
| | │ (Microsecond, None)) │ |
| | │ │ |
| | │ error_message: │ |
| | │ error_message │ |
| | │ │ |
| | │ execution_duration_ms: │ |
| | │ execution_duration_ms │ |
| | │ │ |
| | │ input: input │ |
| | │ │ |
| | │ labels: │ |
| | │ CAST(labels AS Map(Field {│ |
| | │ name: "key_value", │ |
| | │ data_type: Struct( │ |
| | │ [Field { name: "key", │ |
| | │ data_type: Utf8, │ |
| | │ nullable: false, │ |
| | │ dict_id: 0, │ |
| | │ dict_is_ordered │ |
| | │ : false, metadata: { │ |
| | │ "PARQUET:field_id": │ |
| | │ "12"} }, Field { name: │ |
| | │ "value", data_type: │ |
| | │ Utf8, nullable: true │ |
| | │ ... │ |
| | └─────────────┬─────────────┘ |
| | ┌─────────────┴─────────────┐ |
| | │ RepartitionExec │ |
| | │ -------------------- │ |
| | │ partition_count(in->out): │ |
| | │ 1 -> 14 │ |
| | │ │ |
| | │ partitioning_scheme: │ |
| | │ RoundRobinBatch(14) │ |
| | └─────────────┬─────────────┘ |
| | ┌─────────────┴─────────────┐ |
| | │ BytesProcessedExec │ |
| | │ -------------------- │ |
| | │ BytesProcessedExec │ |
| | └─────────────┬─────────────┘ |
| | ┌─────────────┴─────────────┐ |
| | │ SchemaCastScanExec │ |
| | │ -------------------- │ |
| | │ SchemaCastScanExec │ |
| | └─────────────┬─────────────┘ |
| | ┌─────────────┴─────────────┐ |
| | │ DataSourceExec │ |
| | │ -------------------- │ |
| | │ bytes: 88176 │ |
| | │ format: memory │ |
| | │ rows: 6 │ |
| | └───────────────────────────┘ |
| | |
+---------------+-------------------------------+
```
## What changes are included in this PR?
This PR adds a `required_input_distribution` setting for
`IcebergCommitExec` to ensure DataFusion coalesces input partitions
automatically before commit, similar to [DataFusion
DataSinkExec](https://github.com/apache/datafusion/blob/a7b113c45509aae34595b6a62469b3173cac91bd/datafusion/datasource/src/sink.rs#L187).
`test_datafusion_execution_partitioned_source` can be used to observe
the behavior before and after.
Before
```rust
Physical plan:
IcebergCommitExec: table=test_namespace.test_table_partitioning
RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=3
IcebergWriteExec: table=test_namespace.test_table_partitioning
DataSourceExec: partitions=3, partition_sizes=[1, 1, 1]
```
After
```rust
IcebergCommitExec: table=test_namespace.test_table
CoalescePartitionsExec
IcebergWriteExec: table=test_namespace.test_table
DataSourceExec: partitions=3, partition_sizes=[1, 1, 1]
```
## Are these changes tested?
Added the `test_datafusion_execution_partitioned_source` unit test and
tested manually.
---
.../datafusion/src/physical_plan/commit.rs | 111 ++++++++++++++++++++-
.../datafusion/src/physical_plan/write.rs | 10 ++
2 files changed, 120 insertions(+), 1 deletion(-)
diff --git a/crates/integrations/datafusion/src/physical_plan/commit.rs
b/crates/integrations/datafusion/src/physical_plan/commit.rs
index 0ee0504f..067049c1 100644
--- a/crates/integrations/datafusion/src/physical_plan/commit.rs
+++ b/crates/integrations/datafusion/src/physical_plan/commit.rs
@@ -136,6 +136,14 @@ impl ExecutionPlan for IcebergCommitExec {
vec![&self.input]
}
+ fn required_input_distribution(&self) ->
Vec<datafusion::physical_plan::Distribution> {
+ vec![datafusion::physical_plan::Distribution::SinglePartition;
self.children().len()]
+ }
+
+ fn benefits_from_input_partitioning(&self) -> Vec<bool> {
+ vec![false]
+ }
+
fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
@@ -262,14 +270,16 @@ mod tests {
use std::fmt;
use std::sync::Arc;
- use datafusion::arrow::array::{ArrayRef, RecordBatch, StringArray};
+ use datafusion::arrow::array::{ArrayRef, Int32Array, RecordBatch,
StringArray, UInt64Array};
use datafusion::arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
+ use datafusion::datasource::MemTable;
use datafusion::execution::context::TaskContext;
use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
use datafusion::physical_plan::common::collect;
use datafusion::physical_plan::execution_plan::Boundedness;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{DisplayAs, DisplayFormatType,
ExecutionPlan, PlanProperties};
+ use datafusion::prelude::*;
use futures::StreamExt;
use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
use iceberg::spec::{
@@ -280,6 +290,7 @@ mod tests {
use super::*;
use crate::physical_plan::DATA_FILES_COL_NAME;
+ use crate::table::IcebergTableProvider;
// A mock execution plan that returns record batches with serialized data
files
#[derive(Debug)]
@@ -510,4 +521,102 @@ mod tests {
Ok(())
}
+
+ #[tokio::test]
+ async fn test_datafusion_execution_partitioned_source() -> Result<(),
Box<dyn std::error::Error>>
+ {
+ let catalog = Arc::new(
+ MemoryCatalogBuilder::default()
+ .load(
+ "memory",
+ HashMap::from([(
+ MEMORY_CATALOG_WAREHOUSE.to_string(),
+ "memory://root".to_string(),
+ )]),
+ )
+ .await?,
+ );
+
+ let namespace = NamespaceIdent::new("test_namespace".to_string());
+ catalog.create_namespace(&namespace, HashMap::new()).await?;
+
+ let schema = Schema::builder()
+ .with_schema_id(1)
+ .with_fields(vec![
+ NestedField::required(1, "id",
Type::Primitive(PrimitiveType::Int)).into(),
+ NestedField::required(2, "name",
Type::Primitive(PrimitiveType::String)).into(),
+ ])
+ .build()?;
+
+ let table_name = "test_table";
+ let table_creation = TableCreation::builder()
+ .name(table_name.to_string())
+ .schema(schema)
+ .location("memory://root/test_table".to_string())
+ .properties(HashMap::new())
+ .build();
+ let _ = catalog.create_table(&namespace, table_creation).await?;
+
+ let arrow_schema = Arc::new(ArrowSchema::new(vec![
+ Field::new("id", DataType::Int32, false),
+ Field::new("name", DataType::Utf8, false),
+ ]));
+
+ let batches: Vec<RecordBatch> = (1..4)
+ .map(|idx| {
+ RecordBatch::try_new(arrow_schema.clone(), vec![
+ Arc::new(Int32Array::from(vec![idx])) as ArrayRef,
+ Arc::new(StringArray::from(vec![format!("Name{idx}")])) as
ArrayRef,
+ ])
+ })
+ .collect::<Result<_, _>>()?;
+
+ // Create DataFusion context with specific partition configuration
+ let mut config = SessionConfig::new();
+ config = config.set_usize("datafusion.execution.target_partitions", 8);
+ let ctx = SessionContext::new_with_config(config);
+
+ // Create multiple partitions - each batch becomes a separate partition
+ let partitions: Vec<Vec<RecordBatch>> =
+ batches.into_iter().map(|batch| vec![batch]).collect();
+ let source_table =
Arc::new(MemTable::try_new(Arc::clone(&arrow_schema), partitions)?);
+ ctx.register_table("source_table", source_table)?;
+
+ let iceberg_table_provider = IcebergTableProvider::try_new(
+ catalog.clone(),
+ namespace.clone(),
+ table_name.to_string(),
+ )
+ .await?;
+ ctx.register_table("iceberg_table", Arc::new(iceberg_table_provider))?;
+
+ let insert_plan = ctx
+ .sql("INSERT INTO iceberg_table SELECT * FROM source_table")
+ .await?;
+
+ let physical_plan = insert_plan.create_physical_plan().await?;
+
+ let actual_plan = format!(
+ "{}",
+
datafusion::physical_plan::displayable(physical_plan.as_ref()).indent(false)
+ );
+
+ println!("Physical plan:\n{actual_plan}");
+
+ let expected_plan = "\
+IcebergCommitExec: table=test_namespace.test_table
+ CoalescePartitionsExec
+ IcebergWriteExec: table=test_namespace.test_table
+ DataSourceExec: partitions=3, partition_sizes=[1, 1, 1]";
+
+ assert_eq!(
+ actual_plan.trim(),
+ expected_plan.trim(),
+ "Physical plan does not match
expected\n\nExpected:\n{}\n\nActual:\n{}",
+ expected_plan.trim(),
+ actual_plan.trim()
+ );
+
+ Ok(())
+ }
}
diff --git a/crates/integrations/datafusion/src/physical_plan/write.rs
b/crates/integrations/datafusion/src/physical_plan/write.rs
index 712da92b..759f1a8d 100644
--- a/crates/integrations/datafusion/src/physical_plan/write.rs
+++ b/crates/integrations/datafusion/src/physical_plan/write.rs
@@ -144,6 +144,16 @@ impl ExecutionPlan for IcebergWriteExec {
self
}
+ /// Prevents the introduction of additional `RepartitionExec` and
processing input in parallel.
+ fn benefits_from_input_partitioning(&self) -> Vec<bool> {
+ vec![false]
+ }
+
+ fn maintains_input_order(&self) -> Vec<bool> {
+ // Maintains ordering in the sense that the written file will reflect
the ordering of the input.
+ vec![true; self.children().len()]
+ }
+
fn properties(&self) -> &PlanProperties {
&self.plan_properties
}