This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 1fcad936 feat(datafusion): Support `INSERT INTO` partitioned tables
(#1827)
1fcad936 is described below
commit 1fcad93641603917829cdd6d60f4570062394855
Author: Shawn Chang <[email protected]>
AuthorDate: Wed Nov 12 02:49:54 2025 -0800
feat(datafusion): Support `INSERT INTO` partitioned tables (#1827)
## Which issue does this PR close?
- Closes #1828
- Related to #1540
## What changes are included in this PR?
- Use project to calculate partition values for record batches
- Repartition inputs for table_provider::insert_into
- Initialize partition_splitter in TaskWriter's constructor
- Use TaskWriter in `IcebergWriteExec` to support partitioned data
## Are these changes tested?
Added a unit test
---
.../src/arrow/record_batch_partition_splitter.rs | 20 +--
.../datafusion/src/physical_plan/repartition.rs | 1 -
.../datafusion/src/physical_plan/write.rs | 48 ++++---
crates/integrations/datafusion/src/table/mod.rs | 43 ++++--
crates/integrations/datafusion/src/task_writer.rs | 71 ++++------
.../tests/integration_datafusion_test.rs | 152 ++++++++++++++++++++-
6 files changed, 239 insertions(+), 96 deletions(-)
diff --git a/crates/iceberg/src/arrow/record_batch_partition_splitter.rs
b/crates/iceberg/src/arrow/record_batch_partition_splitter.rs
index dcdd9c68..7b83621f 100644
--- a/crates/iceberg/src/arrow/record_batch_partition_splitter.rs
+++ b/crates/iceberg/src/arrow/record_batch_partition_splitter.rs
@@ -59,7 +59,7 @@ impl RecordBatchPartitionSplitter {
/// # Returns
///
/// Returns a new `RecordBatchPartitionSplitter` instance or an error if
initialization fails.
- pub fn new(
+ pub fn try_new(
iceberg_schema: SchemaRef,
partition_spec: PartitionSpecRef,
calculator: Option<PartitionValueCalculator>,
@@ -87,12 +87,12 @@ impl RecordBatchPartitionSplitter {
/// # Returns
///
/// Returns a new `RecordBatchPartitionSplitter` instance or an error if
initialization fails.
- pub fn new_with_computed_values(
+ pub fn try_new_with_computed_values(
iceberg_schema: SchemaRef,
partition_spec: PartitionSpecRef,
) -> Result<Self> {
let calculator = PartitionValueCalculator::try_new(&partition_spec,
&iceberg_schema)?;
- Self::new(iceberg_schema, partition_spec, Some(calculator))
+ Self::try_new(iceberg_schema, partition_spec, Some(calculator))
}
/// Create a new RecordBatchPartitionSplitter expecting pre-computed
partition values.
@@ -108,11 +108,11 @@ impl RecordBatchPartitionSplitter {
/// # Returns
///
/// Returns a new `RecordBatchPartitionSplitter` instance or an error if
initialization fails.
- pub fn new_with_precomputed_values(
+ pub fn try_new_with_precomputed_values(
iceberg_schema: SchemaRef,
partition_spec: PartitionSpecRef,
) -> Result<Self> {
- Self::new(iceberg_schema, partition_spec, None)
+ Self::try_new(iceberg_schema, partition_spec, None)
}
/// Split the record batch into multiple record batches based on the
partition spec.
@@ -261,9 +261,11 @@ mod tests {
.build()
.unwrap(),
);
- let partition_splitter =
-
RecordBatchPartitionSplitter::new_with_computed_values(schema.clone(),
partition_spec)
- .expect("Failed to create splitter");
+ let partition_splitter =
RecordBatchPartitionSplitter::try_new_with_computed_values(
+ schema.clone(),
+ partition_spec,
+ )
+ .expect("Failed to create splitter");
let arrow_schema = Arc::new(schema_to_arrow_schema(&schema).unwrap());
let id_array = Int32Array::from(vec![1, 2, 1, 3, 2, 3, 1]);
@@ -392,7 +394,7 @@ mod tests {
]));
// Create splitter expecting pre-computed partition column
- let partition_splitter =
RecordBatchPartitionSplitter::new_with_precomputed_values(
+ let partition_splitter =
RecordBatchPartitionSplitter::try_new_with_precomputed_values(
schema.clone(),
partition_spec,
)
diff --git a/crates/integrations/datafusion/src/physical_plan/repartition.rs
b/crates/integrations/datafusion/src/physical_plan/repartition.rs
index 95cdc847..8ad87fd1 100644
--- a/crates/integrations/datafusion/src/physical_plan/repartition.rs
+++ b/crates/integrations/datafusion/src/physical_plan/repartition.rs
@@ -86,7 +86,6 @@ use iceberg::spec::{TableMetadata, TableMetadataRef,
Transform};
/// NonZeroUsize::new(4).unwrap(),
/// )?;
/// ```
-#[allow(dead_code)]
pub(crate) fn repartition(
input: Arc<dyn ExecutionPlan>,
table_metadata: TableMetadataRef,
diff --git a/crates/integrations/datafusion/src/physical_plan/write.rs
b/crates/integrations/datafusion/src/physical_plan/write.rs
index b9d1f02d..9eb53c23 100644
--- a/crates/integrations/datafusion/src/physical_plan/write.rs
+++ b/crates/integrations/datafusion/src/physical_plan/write.rs
@@ -35,7 +35,7 @@ use datafusion::physical_plan::{
execute_input_stream,
};
use futures::StreamExt;
-use iceberg::arrow::{FieldMatchMode, schema_to_arrow_schema};
+use iceberg::arrow::FieldMatchMode;
use iceberg::spec::{DataFileFormat, TableProperties,
serialize_data_file_to_json};
use iceberg::table::Table;
use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
@@ -44,12 +44,12 @@ use iceberg::writer::file_writer::location_generator::{
DefaultFileNameGenerator, DefaultLocationGenerator,
};
use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
-use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
use iceberg::{Error, ErrorKind};
use parquet::file::properties::WriterProperties;
use uuid::Uuid;
use crate::physical_plan::DATA_FILES_COL_NAME;
+use crate::task_writer::TaskWriter;
use crate::to_datafusion_error;
/// An execution plan node that writes data to an Iceberg table.
@@ -205,18 +205,6 @@ impl ExecutionPlan for IcebergWriteExec {
partition: usize,
context: Arc<TaskContext>,
) -> DFResult<SendableRecordBatchStream> {
- if !self
- .table
- .metadata()
- .default_partition_spec()
- .is_unpartitioned()
- {
- // TODO add support for partitioned tables
- return Err(DataFusionError::NotImplemented(
- "IcebergWriteExec does not support partitioned tables
yet".to_string(),
- ));
- }
-
let partition_type =
self.table.metadata().default_partition_type().clone();
let format_version = self.table.metadata().format_version();
@@ -277,31 +265,41 @@ impl ExecutionPlan for IcebergWriteExec {
);
let data_file_writer_builder =
DataFileWriterBuilder::new(rolling_writer_builder);
+ // Create TaskWriter
+ // TODO: Make fanout_enabled configurable via table properties
+ let fanout_enabled = true;
+ let schema = self.table.metadata().current_schema().clone();
+ let partition_spec =
self.table.metadata().default_partition_spec().clone();
+ let task_writer = TaskWriter::try_new(
+ data_file_writer_builder,
+ fanout_enabled,
+ schema.clone(),
+ partition_spec,
+ )
+ .map_err(to_datafusion_error)?;
+
// Get input data
let data = execute_input_stream(
Arc::clone(&self.input),
- Arc::new(
- schema_to_arrow_schema(self.table.metadata().current_schema())
- .map_err(to_datafusion_error)?,
- ),
+ self.input.schema(), // input schema may have projected column
`_partition`
partition,
Arc::clone(&context),
)?;
// Create write stream
let stream = futures::stream::once(async move {
- let mut writer = data_file_writer_builder
- // todo specify partition key when partitioning writer is
supported
- .build(None)
- .await
- .map_err(to_datafusion_error)?;
+ let mut task_writer = task_writer;
let mut input_stream = data;
while let Some(batch) = input_stream.next().await {
- writer.write(batch?).await.map_err(to_datafusion_error)?;
+ let batch = batch?;
+ task_writer
+ .write(batch)
+ .await
+ .map_err(to_datafusion_error)?;
}
- let data_files =
writer.close().await.map_err(to_datafusion_error)?;
+ let data_files =
task_writer.close().await.map_err(to_datafusion_error)?;
// Convert builders to data files and then to JSON strings
let data_files_strs: Vec<String> = data_files
diff --git a/crates/integrations/datafusion/src/table/mod.rs
b/crates/integrations/datafusion/src/table/mod.rs
index a8c49837..42a3baad 100644
--- a/crates/integrations/datafusion/src/table/mod.rs
+++ b/crates/integrations/datafusion/src/table/mod.rs
@@ -19,6 +19,7 @@ pub mod metadata_table;
pub mod table_provider_factory;
use std::any::Any;
+use std::num::NonZeroUsize;
use std::sync::Arc;
use async_trait::async_trait;
@@ -38,6 +39,8 @@ use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent,
Result, TableIdent};
use metadata_table::IcebergMetadataTableProvider;
use crate::physical_plan::commit::IcebergCommitExec;
+use crate::physical_plan::project::project_with_partition;
+use crate::physical_plan::repartition::repartition;
use crate::physical_plan::scan::IcebergTableScan;
use crate::physical_plan::write::IcebergWriteExec;
@@ -170,32 +173,42 @@ impl TableProvider for IcebergTableProvider {
async fn insert_into(
&self,
- _state: &dyn Session,
+ state: &dyn Session,
input: Arc<dyn ExecutionPlan>,
_insert_op: InsertOp,
) -> DFResult<Arc<dyn ExecutionPlan>> {
- if !self
- .table
- .metadata()
- .default_partition_spec()
- .is_unpartitioned()
- {
- // TODO add insert into support for partitioned tables
- return Err(DataFusionError::NotImplemented(
- "IcebergTableProvider::insert_into does not support
partitioned tables yet"
- .to_string(),
- ));
- }
-
let Some(catalog) = self.catalog.clone() else {
return Err(DataFusionError::Execution(
"Catalog cannot be none for insert_into".to_string(),
));
};
+ let partition_spec = self.table.metadata().default_partition_spec();
+
+ // Step 1: Project partition values for partitioned tables
+ let plan_with_partition = if !partition_spec.is_unpartitioned() {
+ project_with_partition(input, &self.table)?
+ } else {
+ input
+ };
+
+ // Step 2: Repartition for parallel processing
+ let target_partitions =
+
NonZeroUsize::new(state.config().target_partitions()).ok_or_else(|| {
+ DataFusionError::Configuration(
+ "target_partitions must be greater than 0".to_string(),
+ )
+ })?;
+
+ let repartitioned_plan = repartition(
+ plan_with_partition,
+ self.table.metadata_ref(),
+ target_partitions,
+ )?;
+
let write_plan = Arc::new(IcebergWriteExec::new(
self.table.clone(),
- input,
+ repartitioned_plan,
self.schema.clone(),
));
diff --git a/crates/integrations/datafusion/src/task_writer.rs
b/crates/integrations/datafusion/src/task_writer.rs
index d27b2e6f..5329f264 100644
--- a/crates/integrations/datafusion/src/task_writer.rs
+++ b/crates/integrations/datafusion/src/task_writer.rs
@@ -34,7 +34,7 @@ use
iceberg::writer::partitioning::unpartitioned_writer::UnpartitionedWriter;
///
/// TaskWriter coordinates writing data to Iceberg tables by:
/// - Selecting the appropriate partitioning strategy (unpartitioned, fanout,
or clustered)
-/// - Lazily initializing the partition splitter on first write
+/// - Initializing the partition splitter in the constructor for partitioned
tables
/// - Routing data to the underlying writer
/// - Collecting all written data files
///
@@ -63,23 +63,17 @@ use
iceberg::writer::partitioning::unpartitioned_writer::UnpartitionedWriter;
/// // Close and get data files
/// let data_files = task_writer.close().await?;
/// ```
-#[allow(dead_code)]
pub(crate) struct TaskWriter<B: IcebergWriterBuilder> {
/// The underlying writer (UnpartitionedWriter, FanoutWriter, or
ClusteredWriter)
writer: SupportedWriter<B>,
- /// Lazily initialized partition splitter for partitioned tables
+ /// Partition splitter for partitioned tables (initialized in constructor)
partition_splitter: Option<RecordBatchPartitionSplitter>,
- /// Iceberg schema reference
- schema: SchemaRef,
- /// Partition specification reference
- partition_spec: PartitionSpecRef,
}
/// Internal enum to hold the different writer types.
///
/// This enum allows TaskWriter to work with different partitioning strategies
/// while maintaining a unified interface.
-#[allow(dead_code)]
enum SupportedWriter<B: IcebergWriterBuilder> {
/// Writer for unpartitioned tables
Unpartitioned(UnpartitionedWriter<B>),
@@ -89,7 +83,6 @@ enum SupportedWriter<B: IcebergWriterBuilder> {
Clustered(ClusteredWriter<B>),
}
-#[allow(dead_code)]
impl<B: IcebergWriterBuilder> TaskWriter<B> {
/// Create a new TaskWriter.
///
@@ -125,12 +118,12 @@ impl<B: IcebergWriterBuilder> TaskWriter<B> {
/// partition_spec,
/// );
/// ```
- pub fn new(
+ pub fn try_new(
writer_builder: B,
fanout_enabled: bool,
schema: SchemaRef,
partition_spec: PartitionSpecRef,
- ) -> Self {
+ ) -> Result<Self> {
let writer = if partition_spec.is_unpartitioned() {
SupportedWriter::Unpartitioned(UnpartitionedWriter::new(writer_builder))
} else if fanout_enabled {
@@ -139,17 +132,28 @@ impl<B: IcebergWriterBuilder> TaskWriter<B> {
SupportedWriter::Clustered(ClusteredWriter::new(writer_builder))
};
- Self {
+ // Initialize partition splitter in constructor for partitioned tables
+ let partition_splitter = if !partition_spec.is_unpartitioned() {
+ Some(
+ RecordBatchPartitionSplitter::try_new_with_precomputed_values(
+ schema.clone(),
+ partition_spec.clone(),
+ )?,
+ )
+ } else {
+ None
+ };
+
+ Ok(Self {
writer,
- partition_splitter: None,
- schema,
- partition_spec,
- }
+ partition_splitter,
+ })
}
/// Write a RecordBatch to the TaskWriter.
///
- /// For the first write to a partitioned table, this method initializes
the partition splitter.
+ /// For partitioned tables, uses the partition splitter to split
+ /// the batch by partition key and route each partition to the underlying
writer.
/// For unpartitioned tables, data is written directly without splitting.
///
/// # Parameters
@@ -163,7 +167,6 @@ impl<B: IcebergWriterBuilder> TaskWriter<B> {
/// # Errors
///
/// This method will return an error if:
- /// - Partition splitter initialization fails
/// - Splitting the batch by partition fails
/// - Writing to the underlying writer fails
///
@@ -183,29 +186,9 @@ impl<B: IcebergWriterBuilder> TaskWriter<B> {
writer.write(batch).await
}
SupportedWriter::Fanout(writer) => {
- // Initialize splitter on first write if needed
- if self.partition_splitter.is_none() {
- self.partition_splitter =
-
Some(RecordBatchPartitionSplitter::new_with_precomputed_values(
- self.schema.clone(),
- self.partition_spec.clone(),
- )?);
- }
-
- // Split and write partitioned data
Self::write_partitioned_batches(writer,
&self.partition_splitter, &batch).await
}
SupportedWriter::Clustered(writer) => {
- // Initialize splitter on first write if needed
- if self.partition_splitter.is_none() {
- self.partition_splitter =
-
Some(RecordBatchPartitionSplitter::new_with_precomputed_values(
- self.schema.clone(),
- self.partition_spec.clone(),
- )?);
- }
-
- // Split and write partitioned data
Self::write_partitioned_batches(writer,
&self.partition_splitter, &batch).await
}
}
@@ -214,13 +197,13 @@ impl<B: IcebergWriterBuilder> TaskWriter<B> {
/// Helper method to split and write partitioned data.
///
/// This method handles the common logic for both FanoutWriter and
ClusteredWriter:
- /// - Splits the batch by partition key using the provided splitter
- /// - Writes each partition to the underlying writer
+ /// - Splits the batch by partition key using the partition splitter
+ /// - Writes each partition to the underlying writer with its
corresponding partition key
///
/// # Parameters
///
/// * `writer` - The underlying PartitioningWriter (FanoutWriter or
ClusteredWriter)
- /// * `partition_splitter` - The partition splitter (must be initialized)
+ /// * `partition_splitter` - The partition splitter
/// * `batch` - The RecordBatch to write
///
/// # Returns
@@ -393,7 +376,7 @@ mod tests {
let partition_spec =
Arc::new(PartitionSpec::builder(schema.clone()).build()?);
let writer_builder = create_writer_builder(&temp_dir, schema.clone())?;
- let mut task_writer = TaskWriter::new(writer_builder, false, schema,
partition_spec);
+ let mut task_writer = TaskWriter::try_new(writer_builder, false,
schema, partition_spec)?;
// Write data
let batch = RecordBatch::try_new(arrow_schema, vec![
@@ -459,7 +442,7 @@ mod tests {
);
let writer_builder = create_writer_builder(&temp_dir, schema.clone())?;
- let mut task_writer = TaskWriter::new(writer_builder, true, schema,
partition_spec);
+ let mut task_writer = TaskWriter::try_new(writer_builder, true,
schema, partition_spec)?;
// Create partition column
let partition_field = Field::new("region", DataType::Utf8,
false).with_metadata(
@@ -502,7 +485,7 @@ mod tests {
);
let writer_builder = create_writer_builder(&temp_dir, schema.clone())?;
- let mut task_writer = TaskWriter::new(writer_builder, false, schema,
partition_spec);
+ let mut task_writer = TaskWriter::try_new(writer_builder, false,
schema, partition_spec)?;
// Create partition column
let partition_field = Field::new("region", DataType::Utf8,
false).with_metadata(
diff --git
a/crates/integrations/datafusion/tests/integration_datafusion_test.rs
b/crates/integrations/datafusion/tests/integration_datafusion_test.rs
index cb4987a9..fdf5b17d 100644
--- a/crates/integrations/datafusion/tests/integration_datafusion_test.rs
+++ b/crates/integrations/datafusion/tests/integration_datafusion_test.rs
@@ -27,9 +27,13 @@ use datafusion::execution::context::SessionContext;
use datafusion::parquet::arrow::PARQUET_FIELD_ID_META_KEY;
use expect_test::expect;
use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
-use iceberg::spec::{NestedField, PrimitiveType, Schema, StructType, Type};
+use iceberg::spec::{
+ NestedField, PrimitiveType, Schema, StructType, Transform, Type,
UnboundPartitionSpec,
+};
use iceberg::test_utils::check_record_batches;
-use iceberg::{Catalog, CatalogBuilder, MemoryCatalog, NamespaceIdent, Result,
TableCreation};
+use iceberg::{
+ Catalog, CatalogBuilder, MemoryCatalog, NamespaceIdent, Result,
TableCreation, TableIdent,
+};
use iceberg_datafusion::IcebergCatalogProvider;
use tempfile::TempDir;
@@ -810,3 +814,147 @@ async fn test_insert_into_nested() -> Result<()> {
Ok(())
}
+
+#[tokio::test]
+async fn test_insert_into_partitioned() -> Result<()> {
+ let iceberg_catalog = get_iceberg_catalog().await;
+ let namespace = NamespaceIdent::new("test_partitioned_write".to_string());
+ set_test_namespace(&iceberg_catalog, &namespace).await?;
+
+ // Create a schema with a partition column
+ let schema = Schema::builder()
+ .with_schema_id(0)
+ .with_fields(vec![
+ NestedField::required(1, "id",
Type::Primitive(PrimitiveType::Int)).into(),
+ NestedField::required(2, "category",
Type::Primitive(PrimitiveType::String)).into(),
+ NestedField::required(3, "value",
Type::Primitive(PrimitiveType::String)).into(),
+ ])
+ .build()?;
+
+ // Create partition spec with identity transform on category
+ let partition_spec = UnboundPartitionSpec::builder()
+ .with_spec_id(0)
+ .add_partition_field(2, "category", Transform::Identity)?
+ .build();
+
+ // Create the partitioned table
+ let creation = TableCreation::builder()
+ .name("partitioned_table".to_string())
+ .location(temp_path())
+ .schema(schema)
+ .partition_spec(partition_spec)
+ .properties(HashMap::new())
+ .build();
+
+ iceberg_catalog.create_table(&namespace, creation).await?;
+
+ let client = Arc::new(iceberg_catalog);
+ let catalog =
Arc::new(IcebergCatalogProvider::try_new(client.clone()).await?);
+
+ let ctx = SessionContext::new();
+ ctx.register_catalog("catalog", catalog);
+
+ // Insert data with multiple partition values in a single batch
+ let df = ctx
+ .sql(
+ r#"
+ INSERT INTO catalog.test_partitioned_write.partitioned_table
+ VALUES
+ (1, 'electronics', 'laptop'),
+ (2, 'electronics', 'phone'),
+ (3, 'books', 'novel'),
+ (4, 'books', 'textbook'),
+ (5, 'clothing', 'shirt')
+ "#,
+ )
+ .await
+ .unwrap();
+
+ let batches = df.collect().await.unwrap();
+ assert_eq!(batches.len(), 1);
+ let batch = &batches[0];
+ let rows_inserted = batch
+ .column(0)
+ .as_any()
+ .downcast_ref::<UInt64Array>()
+ .unwrap();
+ assert_eq!(rows_inserted.value(0), 5);
+
+ // Refresh catalog to get updated table
+ let catalog =
Arc::new(IcebergCatalogProvider::try_new(client.clone()).await?);
+ ctx.register_catalog("catalog", catalog);
+
+ // Query the table to verify data
+ let df = ctx
+ .sql("SELECT * FROM catalog.test_partitioned_write.partitioned_table
ORDER BY id")
+ .await
+ .unwrap();
+
+ let batches = df.collect().await.unwrap();
+
+ // Verify the data - note that _partition column should NOT be present
+ check_record_batches(
+ batches,
+ expect![[r#"
+ Field { name: "id", data_type: Int32, nullable: false, dict_id: 0,
dict_is_ordered: false, metadata: {"PARQUET:field_id": "1"} },
+ Field { name: "category", data_type: Utf8, nullable: false,
dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "2"} },
+ Field { name: "value", data_type: Utf8, nullable: false, dict_id:
0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "3"} }"#]],
+ expect![[r#"
+ id: PrimitiveArray<Int32>
+ [
+ 1,
+ 2,
+ 3,
+ 4,
+ 5,
+ ],
+ category: StringArray
+ [
+ "electronics",
+ "electronics",
+ "books",
+ "books",
+ "clothing",
+ ],
+ value: StringArray
+ [
+ "laptop",
+ "phone",
+ "novel",
+ "textbook",
+ "shirt",
+ ]"#]],
+ &[],
+ Some("id"),
+ );
+
+ // Verify that data files exist under correct partition paths
+ let table_ident = TableIdent::new(namespace.clone(),
"partitioned_table".to_string());
+ let table = client.load_table(&table_ident).await?;
+ let table_location = table.metadata().location();
+ let file_io = table.file_io();
+
+ // List files under each expected partition path
+ let electronics_path = format!("{}/data/category=electronics",
table_location);
+ let books_path = format!("{}/data/category=books", table_location);
+ let clothing_path = format!("{}/data/category=clothing", table_location);
+
+ // Verify partition directories exist and contain data files
+ assert!(
+ file_io.exists(&electronics_path).await?,
+ "Expected partition directory: {}",
+ electronics_path
+ );
+ assert!(
+ file_io.exists(&books_path).await?,
+ "Expected partition directory: {}",
+ books_path
+ );
+ assert!(
+ file_io.exists(&clothing_path).await?,
+ "Expected partition directory: {}",
+ clothing_path
+ );
+
+ Ok(())
+}