This is an automated email from the ASF dual-hosted git repository.

liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new a970a0c0 feat(datafusion): Add TaskWriter for DataFusion (#1769)
a970a0c0 is described below

commit a970a0c02f7d50e88648dd01eb50fec55373cf65
Author: Shawn Chang <[email protected]>
AuthorDate: Wed Nov 5 01:11:48 2025 -0800

    feat(datafusion): Add TaskWriter for DataFusion (#1769)
    
    ## Which issue does this PR close?
    
    - Closes #1770
    
    ## What changes are included in this PR?
    - Add TaskWriter to leverage `RecordBatchPartitionSplitter` and projected partition values
    - Add UnpartitionedWriter to help write unpartitioned data
    
    
    ## Are these changes tested?
    Added unit tests
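    
    For orientation, here is a minimal sketch (not part of this commit) of how the
    new pieces are meant to compose, mirroring the doc example in the diff below;
    the `data_file_writer_builder`, `schema`, `partition_spec`, and `record_batch`
    bindings are assumed to be built elsewhere:
    
    ```rust,ignore
    // Sketch only: TaskWriter picks a strategy from the partition spec.
    let mut task_writer = TaskWriter::new(
        data_file_writer_builder, // any IcebergWriterBuilder
        true,                     // fanout_enabled: unsorted input -> FanoutWriter
        schema,                   // Iceberg SchemaRef
        partition_spec,           // unpartitioned spec -> UnpartitionedWriter
    );
    task_writer.write(record_batch).await?;      // partitioned specs split on `_partition`
    let data_files = task_writer.close().await?; // collect all written DataFiles
    ```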
---
 .../src/arrow/record_batch_partition_splitter.rs   |   6 -
 crates/iceberg/src/writer/partitioning/mod.rs      |   1 +
 .../writer/partitioning/unpartitioned_writer.rs    | 198 ++++++++
 crates/integrations/datafusion/src/lib.rs          |   2 +
 crates/integrations/datafusion/src/task_writer.rs  | 534 +++++++++++++++++++++
 5 files changed, 735 insertions(+), 6 deletions(-)

diff --git a/crates/iceberg/src/arrow/record_batch_partition_splitter.rs b/crates/iceberg/src/arrow/record_batch_partition_splitter.rs
index 2508a003..dcdd9c68 100644
--- a/crates/iceberg/src/arrow/record_batch_partition_splitter.rs
+++ b/crates/iceberg/src/arrow/record_batch_partition_splitter.rs
@@ -38,9 +38,6 @@ pub const PROJECTED_PARTITION_VALUE_COLUMN: &str = "_partition";
 /// The splitter supports two modes for obtaining partition values:
 /// - **Computed mode** (`calculator` is `Some`): Computes partition values from source columns using transforms
 /// - **Pre-computed mode** (`calculator` is `None`): Expects a `_partition` column in the input batch
-// # TODO
-// Remove this after partition writer supported.
-#[allow(dead_code)]
 pub struct RecordBatchPartitionSplitter {
     schema: SchemaRef,
     partition_spec: PartitionSpecRef,
@@ -48,9 +45,6 @@ pub struct RecordBatchPartitionSplitter {
     partition_type: StructType,
 }
 
-// # TODO
-// Remove this after partition writer supported.
-#[allow(dead_code)]
 impl RecordBatchPartitionSplitter {
     /// Create a new RecordBatchPartitionSplitter.
     ///
diff --git a/crates/iceberg/src/writer/partitioning/mod.rs b/crates/iceberg/src/writer/partitioning/mod.rs
index f63a9d0d..c8106041 100644
--- a/crates/iceberg/src/writer/partitioning/mod.rs
+++ b/crates/iceberg/src/writer/partitioning/mod.rs
@@ -23,6 +23,7 @@
 
 pub mod clustered_writer;
 pub mod fanout_writer;
+pub mod unpartitioned_writer;
 
 use crate::Result;
 use crate::spec::PartitionKey;
diff --git a/crates/iceberg/src/writer/partitioning/unpartitioned_writer.rs b/crates/iceberg/src/writer/partitioning/unpartitioned_writer.rs
new file mode 100644
index 00000000..0fb9cba3
--- /dev/null
+++ b/crates/iceberg/src/writer/partitioning/unpartitioned_writer.rs
@@ -0,0 +1,198 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This module provides the `UnpartitionedWriter` implementation.
+
+use std::marker::PhantomData;
+
+use crate::Result;
+use crate::writer::{DefaultInput, DefaultOutput, IcebergWriter, IcebergWriterBuilder};
+
+/// A simple wrapper around `IcebergWriterBuilder` for unpartitioned tables.
+///
+/// This writer lazily creates the underlying writer on the first write operation
+/// and writes all data to a single file (or set of files if rolling).
+///
+/// # Type Parameters
+///
+/// * `B` - The inner writer builder type
+/// * `I` - Input type (defaults to `RecordBatch`)
+/// * `O` - Output collection type (defaults to `Vec<DataFile>`)
+pub struct UnpartitionedWriter<B, I = DefaultInput, O = DefaultOutput>
+where
+    B: IcebergWriterBuilder<I, O>,
+    O: IntoIterator + FromIterator<<O as IntoIterator>::Item>,
+    <O as IntoIterator>::Item: Clone,
+{
+    inner_builder: B,
+    writer: Option<B::R>,
+    output: Vec<<O as IntoIterator>::Item>,
+    _phantom: PhantomData<I>,
+}
+
+impl<B, I, O> UnpartitionedWriter<B, I, O>
+where
+    B: IcebergWriterBuilder<I, O>,
+    I: Send + 'static,
+    O: IntoIterator + FromIterator<<O as IntoIterator>::Item>,
+    <O as IntoIterator>::Item: Send + Clone,
+{
+    /// Create a new `UnpartitionedWriter`.
+    pub fn new(inner_builder: B) -> Self {
+        Self {
+            inner_builder,
+            writer: None,
+            output: Vec::new(),
+            _phantom: PhantomData,
+        }
+    }
+
+    /// Write data to the writer.
+    ///
+    /// The underlying writer is lazily created on the first write operation.
+    ///
+    /// # Parameters
+    ///
+    /// * `input` - The input data to write
+    ///
+    /// # Returns
+    ///
+    /// `Ok(())` on success, or an error if the write operation fails.
+    pub async fn write(&mut self, input: I) -> Result<()> {
+        // Lazily create writer on first write
+        if self.writer.is_none() {
+            self.writer = Some(self.inner_builder.clone().build(None).await?);
+        }
+
+        // Write directly to inner writer
+        self.writer
+            .as_mut()
+            .expect("Writer should be initialized")
+            .write(input)
+            .await
+    }
+
+    /// Close the writer and return all written data files.
+    ///
+    /// This method consumes the writer to prevent further use.
+    ///
+    /// # Returns
+    ///
+    /// The accumulated output from all write operations, or an empty collection
+    /// if no data was written.
+    pub async fn close(mut self) -> Result<O> {
+        if let Some(mut writer) = self.writer.take() {
+            self.output.extend(writer.close().await?);
+        }
+        Ok(O::from_iter(self.output))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::collections::HashMap;
+    use std::sync::Arc;
+
+    use arrow_array::{Int32Array, RecordBatch, StringArray};
+    use arrow_schema::{DataType, Field, Schema};
+    use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
+    use parquet::file::properties::WriterProperties;
+    use tempfile::TempDir;
+
+    use super::*;
+    use crate::Result;
+    use crate::io::FileIOBuilder;
+    use crate::spec::{DataFileFormat, NestedField, PrimitiveType, Struct, Type};
+    use crate::writer::base_writer::data_file_writer::DataFileWriterBuilder;
+    use crate::writer::file_writer::ParquetWriterBuilder;
+    use crate::writer::file_writer::location_generator::{
+        DefaultFileNameGenerator, DefaultLocationGenerator,
+    };
+    use crate::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
+
+    #[tokio::test]
+    async fn test_unpartitioned_writer() -> Result<()> {
+        let temp_dir = TempDir::new()?;
+
+        // Build Iceberg schema
+        let schema = Arc::new(
+            crate::spec::Schema::builder()
+                .with_schema_id(1)
+                .with_fields(vec![
+                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
+                    NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
+                ])
+                .build()?,
+        );
+
+        // Build Arrow schema
+        let arrow_schema = Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int32, 
false).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "1".to_string(),
+            )])),
+            Field::new("name", DataType::Utf8, 
false).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "2".to_string(),
+            )])),
+        ]));
+
+        // Build writer
+        let file_io = FileIOBuilder::new_fs_io().build()?;
+        let location_gen = DefaultLocationGenerator::with_data_location(
+            temp_dir.path().to_str().unwrap().to_string(),
+        );
+        let file_name_gen =
+            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
+        let parquet_writer_builder =
+            ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone());
+        let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
+            parquet_writer_builder,
+            file_io,
+            location_gen,
+            file_name_gen,
+        );
+        let writer_builder = DataFileWriterBuilder::new(rolling_writer_builder);
+
+        let mut writer = UnpartitionedWriter::new(writer_builder);
+
+        // Write two batches
+        let batch1 = RecordBatch::try_new(arrow_schema.clone(), vec![
+            Arc::new(Int32Array::from(vec![1, 2])),
+            Arc::new(StringArray::from(vec!["Alice", "Bob"])),
+        ])?;
+        let batch2 = RecordBatch::try_new(arrow_schema, vec![
+            Arc::new(Int32Array::from(vec![3, 4])),
+            Arc::new(StringArray::from(vec!["Charlie", "Dave"])),
+        ])?;
+
+        writer.write(batch1).await?;
+        writer.write(batch2).await?;
+
+        let data_files = writer.close().await?;
+
+        // Verify files have empty partition and correct format
+        assert!(!data_files.is_empty());
+        for file in &data_files {
+            assert_eq!(file.partition, Struct::empty());
+            assert_eq!(file.file_format, DataFileFormat::Parquet);
+            assert_eq!(file.record_count, 4);
+        }
+
+        Ok(())
+    }
+}
diff --git a/crates/integrations/datafusion/src/lib.rs b/crates/integrations/datafusion/src/lib.rs
index 09d1cac4..4b0ea860 100644
--- a/crates/integrations/datafusion/src/lib.rs
+++ b/crates/integrations/datafusion/src/lib.rs
@@ -26,3 +26,5 @@ mod schema;
 pub mod table;
 pub use table::table_provider_factory::IcebergTableProviderFactory;
 pub use table::*;
+
+pub(crate) mod task_writer;
diff --git a/crates/integrations/datafusion/src/task_writer.rs b/crates/integrations/datafusion/src/task_writer.rs
new file mode 100644
index 00000000..d27b2e6f
--- /dev/null
+++ b/crates/integrations/datafusion/src/task_writer.rs
@@ -0,0 +1,534 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! TaskWriter for DataFusion integration.
+//!
+//! This module provides a high-level writer that handles partitioning and routing
+//! of RecordBatch data to Iceberg tables.
+
+use datafusion::arrow::array::RecordBatch;
+use iceberg::Result;
+use iceberg::arrow::RecordBatchPartitionSplitter;
+use iceberg::spec::{DataFile, PartitionSpecRef, SchemaRef};
+use iceberg::writer::IcebergWriterBuilder;
+use iceberg::writer::partitioning::PartitioningWriter;
+use iceberg::writer::partitioning::clustered_writer::ClusteredWriter;
+use iceberg::writer::partitioning::fanout_writer::FanoutWriter;
+use iceberg::writer::partitioning::unpartitioned_writer::UnpartitionedWriter;
+
+/// High-level writer for DataFusion that handles partitioning and routing of RecordBatch data.
+///
+/// TaskWriter coordinates writing data to Iceberg tables by:
+/// - Selecting the appropriate partitioning strategy (unpartitioned, fanout, or clustered)
+/// - Lazily initializing the partition splitter on first write
+/// - Routing data to the underlying writer
+/// - Collecting all written data files
+///
+/// # Type Parameters
+///
+/// * `B` - The IcebergWriterBuilder type used to create underlying writers
+///
+/// # Example
+///
+/// ```rust,ignore
+/// use iceberg::spec::{PartitionSpec, Schema};
+/// use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
+/// use iceberg_datafusion::writer::task_writer::TaskWriter;
+///
+/// // Create a TaskWriter for an unpartitioned table
+/// let task_writer = TaskWriter::new(
+///     data_file_writer_builder,
+///     false, // fanout_enabled
+///     schema,
+///     partition_spec,
+/// );
+///
+/// // Write data
+/// task_writer.write(record_batch).await?;
+///
+/// // Close and get data files
+/// let data_files = task_writer.close().await?;
+/// ```
+#[allow(dead_code)]
+pub(crate) struct TaskWriter<B: IcebergWriterBuilder> {
+    /// The underlying writer (UnpartitionedWriter, FanoutWriter, or ClusteredWriter)
+    writer: SupportedWriter<B>,
+    /// Lazily initialized partition splitter for partitioned tables
+    partition_splitter: Option<RecordBatchPartitionSplitter>,
+    /// Iceberg schema reference
+    schema: SchemaRef,
+    /// Partition specification reference
+    partition_spec: PartitionSpecRef,
+}
+
+/// Internal enum to hold the different writer types.
+///
+/// This enum allows TaskWriter to work with different partitioning strategies
+/// while maintaining a unified interface.
+#[allow(dead_code)]
+enum SupportedWriter<B: IcebergWriterBuilder> {
+    /// Writer for unpartitioned tables
+    Unpartitioned(UnpartitionedWriter<B>),
+    /// Writer for partitioned tables with unsorted data (maintains multiple active writers)
+    Fanout(FanoutWriter<B>),
+    /// Writer for partitioned tables with sorted data (maintains single active writer)
+    Clustered(ClusteredWriter<B>),
+}
+
+#[allow(dead_code)]
+impl<B: IcebergWriterBuilder> TaskWriter<B> {
+    /// Create a new TaskWriter.
+    ///
+    /// # Parameters
+    ///
+    /// * `writer_builder` - The IcebergWriterBuilder to use for creating underlying writers
+    /// * `fanout_enabled` - If true, use FanoutWriter for partitioned tables; otherwise use ClusteredWriter
+    /// * `schema` - The Iceberg schema reference
+    /// * `partition_spec` - The partition specification reference
+    ///
+    /// # Returns
+    ///
+    /// Returns a new TaskWriter instance.
+    ///
+    /// # Writer Selection Logic
+    ///
+    /// - If partition_spec is unpartitioned: creates UnpartitionedWriter
+    /// - If partition_spec is partitioned AND fanout_enabled is true: creates FanoutWriter
+    /// - If partition_spec is partitioned AND fanout_enabled is false: creates ClusteredWriter
+    ///
+    /// # Example
+    ///
+    /// ```rust,ignore
+    /// use iceberg::spec::{PartitionSpec, Schema};
+    /// use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
+    /// use iceberg_datafusion::writer::task_writer::TaskWriter;
+    ///
+    /// // Create a TaskWriter for an unpartitioned table
+    /// let task_writer = TaskWriter::new(
+    ///     data_file_writer_builder,
+    ///     false, // fanout_enabled
+    ///     schema,
+    ///     partition_spec,
+    /// );
+    /// ```
+    pub fn new(
+        writer_builder: B,
+        fanout_enabled: bool,
+        schema: SchemaRef,
+        partition_spec: PartitionSpecRef,
+    ) -> Self {
+        let writer = if partition_spec.is_unpartitioned() {
+            SupportedWriter::Unpartitioned(UnpartitionedWriter::new(writer_builder))
+        } else if fanout_enabled {
+            SupportedWriter::Fanout(FanoutWriter::new(writer_builder))
+        } else {
+            SupportedWriter::Clustered(ClusteredWriter::new(writer_builder))
+        };
+
+        Self {
+            writer,
+            partition_splitter: None,
+            schema,
+            partition_spec,
+        }
+    }
+
+    /// Write a RecordBatch to the TaskWriter.
+    ///
+    /// For the first write to a partitioned table, this method initializes the partition splitter.
+    /// For unpartitioned tables, data is written directly without splitting.
+    ///
+    /// # Parameters
+    ///
+    /// * `batch` - The RecordBatch to write
+    ///
+    /// # Returns
+    ///
+    /// Returns `Ok(())` on success, or an error if the write fails.
+    ///
+    /// # Errors
+    ///
+    /// This method will return an error if:
+    /// - Partition splitter initialization fails
+    /// - Splitting the batch by partition fails
+    /// - Writing to the underlying writer fails
+    ///
+    /// # Example
+    ///
+    /// ```rust,ignore
+    /// use arrow_array::RecordBatch;
+    /// use iceberg_datafusion::writer::task_writer::TaskWriter;
+    ///
+    /// // Write a RecordBatch
+    /// task_writer.write(record_batch).await?;
+    /// ```
+    pub async fn write(&mut self, batch: RecordBatch) -> Result<()> {
+        match &mut self.writer {
+            SupportedWriter::Unpartitioned(writer) => {
+                // Unpartitioned: write directly without splitting
+                writer.write(batch).await
+            }
+            SupportedWriter::Fanout(writer) => {
+                // Initialize splitter on first write if needed
+                if self.partition_splitter.is_none() {
+                    self.partition_splitter =
+                        Some(RecordBatchPartitionSplitter::new_with_precomputed_values(
+                            self.schema.clone(),
+                            self.partition_spec.clone(),
+                        )?);
+                }
+
+                // Split and write partitioned data
+                Self::write_partitioned_batches(writer, &self.partition_splitter, &batch).await
+            }
+            SupportedWriter::Clustered(writer) => {
+                // Initialize splitter on first write if needed
+                if self.partition_splitter.is_none() {
+                    self.partition_splitter =
+                        Some(RecordBatchPartitionSplitter::new_with_precomputed_values(
+                            self.schema.clone(),
+                            self.partition_spec.clone(),
+                        )?);
+                }
+
+                // Split and write partitioned data
+                Self::write_partitioned_batches(writer, &self.partition_splitter, &batch).await
+            }
+        }
+    }
+
+    /// Helper method to split and write partitioned data.
+    ///
+    /// This method handles the common logic for both FanoutWriter and ClusteredWriter:
+    /// - Splits the batch by partition key using the provided splitter
+    /// - Writes each partition to the underlying writer
+    ///
+    /// # Parameters
+    ///
+    /// * `writer` - The underlying PartitioningWriter (FanoutWriter or ClusteredWriter)
+    /// * `partition_splitter` - The partition splitter (must be initialized)
+    /// * `batch` - The RecordBatch to write
+    ///
+    /// # Returns
+    ///
+    /// Returns `Ok(())` on success, or an error if the operation fails.
+    async fn write_partitioned_batches<W: PartitioningWriter>(
+        writer: &mut W,
+        partition_splitter: &Option<RecordBatchPartitionSplitter>,
+        batch: &RecordBatch,
+    ) -> Result<()> {
+        // Split batch by partition
+        let splitter = partition_splitter
+            .as_ref()
+            .expect("Partition splitter should be initialized");
+        let partitioned_batches = splitter.split(batch)?;
+
+        // Write each partition
+        for (partition_key, partition_batch) in partitioned_batches {
+            writer.write(partition_key, partition_batch).await?;
+        }
+
+        Ok(())
+    }
+
+    /// Close the TaskWriter and return all written data files.
+    ///
+    /// This method consumes the TaskWriter to prevent further use.
+    ///
+    /// # Returns
+    ///
+    /// Returns a `Vec<DataFile>` containing all written files, or an error if closing fails.
+    ///
+    /// # Errors
+    ///
+    /// This method will return an error if:
+    /// - Closing the underlying writer fails
+    /// - Any I/O operation fails during the close process
+    ///
+    /// # Example
+    ///
+    /// ```rust,ignore
+    /// use iceberg_datafusion::writer::task_writer::TaskWriter;
+    ///
+    /// // Close the writer and get all data files
+    /// let data_files = task_writer.close().await?;
+    /// ```
+    pub async fn close(self) -> Result<Vec<DataFile>> {
+        match self.writer {
+            SupportedWriter::Unpartitioned(writer) => writer.close().await,
+            SupportedWriter::Fanout(writer) => writer.close().await,
+            SupportedWriter::Clustered(writer) => writer.close().await,
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::collections::HashMap;
+    use std::sync::Arc;
+
+    use datafusion::arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray, StructArray};
+    use datafusion::arrow::datatypes::{DataType, Field, Schema};
+    use iceberg::arrow::PROJECTED_PARTITION_VALUE_COLUMN;
+    use iceberg::io::FileIOBuilder;
+    use iceberg::spec::{DataFileFormat, NestedField, PartitionSpec, PrimitiveType, Type};
+    use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
+    use iceberg::writer::file_writer::ParquetWriterBuilder;
+    use iceberg::writer::file_writer::location_generator::{
+        DefaultFileNameGenerator, DefaultLocationGenerator,
+    };
+    use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
+    use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
+    use parquet::file::properties::WriterProperties;
+    use tempfile::TempDir;
+
+    use super::*;
+
+    fn create_test_schema() -> Result<Arc<iceberg::spec::Schema>> {
+        Ok(Arc::new(
+            iceberg::spec::Schema::builder()
+                .with_schema_id(1)
+                .with_fields(vec![
+                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
+                    NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
+                    NestedField::required(3, "region", Type::Primitive(PrimitiveType::String))
+                        .into(),
+                ])
+                .build()?,
+        ))
+    }
+
+    fn create_arrow_schema() -> Arc<Schema> {
+        Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int32, 
false).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "1".to_string(),
+            )])),
+            Field::new("name", DataType::Utf8, 
false).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "2".to_string(),
+            )])),
+            Field::new("region", DataType::Utf8, 
false).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "3".to_string(),
+            )])),
+        ]))
+    }
+
+    fn create_arrow_schema_with_partition() -> Arc<Schema> {
+        let partition_field = Field::new("region", DataType::Utf8, 
false).with_metadata(
+            HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), 
"1000".to_string())]),
+        );
+        let partition_struct_field = Field::new(
+            PROJECTED_PARTITION_VALUE_COLUMN,
+            DataType::Struct(vec![partition_field].into()),
+            false,
+        );
+
+        Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int32, 
false).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "1".to_string(),
+            )])),
+            Field::new("name", DataType::Utf8, 
false).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "2".to_string(),
+            )])),
+            Field::new("region", DataType::Utf8, 
false).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "3".to_string(),
+            )])),
+            partition_struct_field,
+        ]))
+    }
+
+    fn create_writer_builder(
+        temp_dir: &TempDir,
+        schema: Arc<iceberg::spec::Schema>,
+    ) -> Result<
+        DataFileWriterBuilder<
+            ParquetWriterBuilder,
+            DefaultLocationGenerator,
+            DefaultFileNameGenerator,
+        >,
+    > {
+        let file_io = FileIOBuilder::new_fs_io().build()?;
+        let location_gen = DefaultLocationGenerator::with_data_location(
+            temp_dir.path().to_str().unwrap().to_string(),
+        );
+        let file_name_gen =
+            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
+        let parquet_writer_builder =
+            ParquetWriterBuilder::new(WriterProperties::builder().build(), schema);
+        let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
+            parquet_writer_builder,
+            file_io,
+            location_gen,
+            file_name_gen,
+        );
+        Ok(DataFileWriterBuilder::new(rolling_writer_builder))
+    }
+
+    #[tokio::test]
+    async fn test_task_writer_unpartitioned() -> Result<()> {
+        let temp_dir = TempDir::new()?;
+        let schema = create_test_schema()?;
+        let arrow_schema = create_arrow_schema();
+
+        // Create unpartitioned spec
+        let partition_spec = Arc::new(PartitionSpec::builder(schema.clone()).build()?);
+
+        let writer_builder = create_writer_builder(&temp_dir, schema.clone())?;
+        let mut task_writer = TaskWriter::new(writer_builder, false, schema, partition_spec);
+
+        // Write data
+        let batch = RecordBatch::try_new(arrow_schema, vec![
+            Arc::new(Int32Array::from(vec![1, 2, 3])),
+            Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie"])),
+            Arc::new(StringArray::from(vec!["US", "EU", "US"])),
+        ])?;
+
+        task_writer.write(batch).await?;
+        let data_files = task_writer.close().await?;
+
+        // Verify results
+        assert!(!data_files.is_empty());
+        assert_eq!(data_files[0].record_count(), 3);
+
+        Ok(())
+    }
+
+    /// Helper to verify partition data files
+    fn verify_partition_files(
+        data_files: &[iceberg::spec::DataFile],
+        expected_total: u64,
+    ) -> HashMap<String, u64> {
+        let total_records: u64 = data_files.iter().map(|f| f.record_count()).sum();
+        assert_eq!(total_records, expected_total, "Total record count mismatch");
+
+        let mut partition_counts = HashMap::new();
+        for data_file in data_files {
+            let partition_value = data_file.partition();
+            let region_literal = partition_value.fields()[0]
+                .as_ref()
+                .expect("Partition value should not be null");
+            let region = match region_literal
+                .as_primitive_literal()
+                .expect("Should be primitive literal")
+            {
+                iceberg::spec::PrimitiveLiteral::String(s) => s.clone(),
+                _ => panic!("Expected string partition value"),
+            };
+
+            *partition_counts.entry(region.clone()).or_insert(0) += data_file.record_count();
+
+            // Verify file path contains partition information
+            assert!(
+                data_file.file_path().contains("region="),
+                "File path should contain partition info"
+            );
+        }
+        partition_counts
+    }
+
+    #[tokio::test]
+    async fn test_task_writer_partitioned_fanout() -> Result<()> {
+        let temp_dir = TempDir::new()?;
+        let schema = create_test_schema()?;
+        let arrow_schema = create_arrow_schema_with_partition();
+
+        let partition_spec = Arc::new(
+            PartitionSpec::builder(schema.clone())
+                .with_spec_id(1)
+                .add_partition_field("region", "region", 
iceberg::spec::Transform::Identity)?
+                .build()?,
+        );
+
+        let writer_builder = create_writer_builder(&temp_dir, schema.clone())?;
+        let mut task_writer = TaskWriter::new(writer_builder, true, schema, partition_spec);
+
+        // Create partition column
+        let partition_field = Field::new("region", DataType::Utf8, 
false).with_metadata(
+            HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), 
"1000".to_string())]),
+        );
+        let partition_values = StringArray::from(vec!["US", "EU", "US", "EU"]);
+        let partition_struct = StructArray::from(vec![(
+            Arc::new(partition_field),
+            Arc::new(partition_values) as ArrayRef,
+        )]);
+
+        let batch = RecordBatch::try_new(arrow_schema, vec![
+            Arc::new(Int32Array::from(vec![1, 2, 3, 4])),
+            Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie", 
"Dave"])),
+            Arc::new(StringArray::from(vec!["US", "EU", "US", "EU"])),
+            Arc::new(partition_struct),
+        ])?;
+
+        task_writer.write(batch).await?;
+        let data_files = task_writer.close().await?;
+
+        let partition_counts = verify_partition_files(&data_files, 4);
+        assert_eq!(partition_counts.get("US"), Some(&2));
+        assert_eq!(partition_counts.get("EU"), Some(&2));
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_task_writer_partitioned_clustered() -> Result<()> {
+        let temp_dir = TempDir::new()?;
+        let schema = create_test_schema()?;
+        let arrow_schema = create_arrow_schema_with_partition();
+
+        let partition_spec = Arc::new(
+            PartitionSpec::builder(schema.clone())
+                .with_spec_id(1)
+                .add_partition_field("region", "region", 
iceberg::spec::Transform::Identity)?
+                .build()?,
+        );
+
+        let writer_builder = create_writer_builder(&temp_dir, schema.clone())?;
+        let mut task_writer = TaskWriter::new(writer_builder, false, schema, partition_spec);
+
+        // Create partition column
+        let partition_field = Field::new("region", DataType::Utf8, 
false).with_metadata(
+            HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), 
"1000".to_string())]),
+        );
+        let partition_values = StringArray::from(vec!["ASIA", "ASIA", "EU", 
"EU"]);
+        let partition_struct = StructArray::from(vec![(
+            Arc::new(partition_field),
+            Arc::new(partition_values) as ArrayRef,
+        )]);
+
+        // ClusteredWriter expects data to be pre-sorted by partition
+        let batch = RecordBatch::try_new(arrow_schema, vec![
+            Arc::new(Int32Array::from(vec![1, 2, 3, 4])),
+            Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie", 
"Dave"])),
+            Arc::new(StringArray::from(vec!["ASIA", "ASIA", "EU", "EU"])),
+            Arc::new(partition_struct),
+        ])?;
+
+        task_writer.write(batch).await?;
+        let data_files = task_writer.close().await?;
+
+        let partition_counts = verify_partition_files(&data_files, 4);
+        assert_eq!(partition_counts.get("ASIA"), Some(&2));
+        assert_eq!(partition_counts.get("EU"), Some(&2));
+
+        Ok(())
+    }
+}
