This is an automated email from the ASF dual-hosted git repository.

liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 42191e97 refactor(writer): Refactor writers for the future 
partitioning writers (#1657)
42191e97 is described below

commit 42191e97c4b661b5098233bf6c4015a51a31a3b6
Author: Shawn Chang <[email protected]>
AuthorDate: Thu Oct 9 03:35:34 2025 -0700

    refactor(writer): Refactor writers for the future partitioning writers 
(#1657)
    
    ## Which issue does this PR close?
    
    
    - Closes #1650
    
    ## What changes are included in this PR?
    Refactored the writer layers; from a bird’s-eye view, the structure now
    looks like this:
    ```mermaid
    flowchart TD
        subgraph PartitioningWriter
            PW[PartitioningWriter]

            subgraph DataFileWriter
                DFW[DataFileWriter]

                subgraph RollingWriter
                    RW[RollingWriter]

                    subgraph FileWriter
                        FW[FileWriter]
                    end

                    RW --> FW
                end

                DFW --> RW
            end

            PW --> DFW
        end
    ```
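
    To make the nesting concrete, the wrapper types introduced by this PR look
    roughly like this (a simplified view copied from the diff below;
    `PartitioningWriter` is not part of this change):

    ```rust
    // DataFileWriter owns a RollingFileWriter, which in turn lazily builds
    // and replaces the inner FileWriter (B::R) each time a file rolls over.
    pub struct DataFileWriter<B: FileWriterBuilder, L: LocationGenerator, F: FileNameGenerator> {
        inner: Option<RollingFileWriter<B, L, F>>,
        partition_key: Option<PartitionKey>,
    }

    pub struct RollingFileWriter<B: FileWriterBuilder, L: LocationGenerator, F: FileNameGenerator> {
        inner: Option<B::R>, // current FileWriter, rebuilt on rollover
        inner_builder: B,
        target_file_size: usize,
        data_file_builders: Vec<DataFileBuilder>,
        file_io: FileIO,
        location_generator: L,
        file_name_generator: F,
    }
    ```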
    
    
    ### Key Changes
    - Modified `RollingFileWriter` to handle location generator, file name
    generator, and partition keys directly
    - Simplified `ParquetWriterBuilder` interface to accept output files
    during build
    - Restructured `DataFileWriterBuilder` to use `RollingFileWriter` with
    partition keys
    - Updated DataFusion integration to work with the new writer
    architecture
    - NOTE: Technically DataFusion (or any engine) should go through
    `TaskWriter` -> `PartitioningWriter` -> `RollingWriter` -> ..., but
    `TaskWriter` and `PartitioningWriter` are not included in this PR yet; a
    usage sketch of the new layering follows this list
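
    For orientation, here is a minimal sketch of how the layers compose after
    this refactor, based on the tests updated in this PR (not part of the
    diff); `schema: SchemaRef`, `file_io: FileIO`, `partition_key:
    Option<PartitionKey>`, and `batch: RecordBatch` are assumed to be in scope
    of an async function:

    ```rust
    use iceberg::spec::DataFileFormat;
    use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
    use iceberg::writer::file_writer::ParquetWriterBuilder;
    use iceberg::writer::file_writer::location_generator::{
        DefaultFileNameGenerator, DefaultLocationGenerator,
    };
    use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
    use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
    use parquet::file::properties::WriterProperties;

    // FileWriter layer: no longer takes FileIO/generators/partition key;
    // the output file is supplied later, at `build(output_file)` time.
    let parquet_writer_builder =
        ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone());

    // RollingWriter layer: now owns FileIO and the location/file-name
    // generators, creating a new output file whenever it rolls over.
    let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
        parquet_writer_builder,
        file_io.clone(),
        DefaultLocationGenerator::with_data_location("/tmp/data".to_string()),
        DefaultFileNameGenerator::new("part".to_string(), None, DataFileFormat::Parquet),
    );

    // DataFileWriter layer: takes the rolling writer builder plus an optional
    // PartitionKey, and stamps partition info onto the resulting DataFiles.
    let mut writer = DataFileWriterBuilder::new(rolling_writer_builder, partition_key)
        .build()
        .await?;
    writer.write(batch).await?;
    let data_files = writer.close().await?;
    ```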
    
    
    
    ## Are these changes tested?
    Not yet, but updating the existing tests accordingly should be sufficient.
---
 crates/iceberg/src/spec/manifest/data_file.rs      |   4 +-
 crates/iceberg/src/spec/partition.rs               |  24 ++
 .../src/writer/base_writer/data_file_writer.rs     | 153 ++++++++-----
 .../writer/base_writer/equality_delete_writer.rs   | 148 ++++++++-----
 crates/iceberg/src/writer/file_writer/mod.rs       |   4 +-
 .../src/writer/file_writer/parquet_writer.rs       | 245 ++++++++-------------
 .../src/writer/file_writer/rolling_writer.rs       | 212 ++++++++++++++----
 crates/iceberg/src/writer/mod.rs                   |  31 ++-
 .../tests/shared_tests/append_data_file_test.rs    |   7 +-
 .../append_partition_data_file_test.rs             |  76 +++----
 .../tests/shared_tests/conflict_commit_test.rs     |   7 +-
 .../tests/shared_tests/scan_all_type.rs            |   7 +-
 .../datafusion/src/physical_plan/write.rs          |  27 ++-
 13 files changed, 570 insertions(+), 375 deletions(-)

diff --git a/crates/iceberg/src/spec/manifest/data_file.rs 
b/crates/iceberg/src/spec/manifest/data_file.rs
index 931f9441..d7455b56 100644
--- a/crates/iceberg/src/spec/manifest/data_file.rs
+++ b/crates/iceberg/src/spec/manifest/data_file.rs
@@ -26,7 +26,7 @@ use serde_with::{DeserializeFromStr, SerializeDisplay};
 use super::_serde::DataFileSerde;
 use super::{Datum, FormatVersion, Schema, data_file_schema_v1, 
data_file_schema_v2};
 use crate::error::Result;
-use crate::spec::{Struct, StructType};
+use crate::spec::{DEFAULT_PARTITION_SPEC_ID, Struct, StructType};
 use crate::{Error, ErrorKind};
 
 /// Data file carries data file path, partition tuple, metrics, …
@@ -49,6 +49,7 @@ pub struct DataFile {
     ///
     /// Partition data tuple, schema based on the partition spec output using
     /// partition field ids for the struct field ids
+    #[builder(default = "Struct::empty()")]
     pub(crate) partition: Struct,
     /// field id: 103
     ///
@@ -156,6 +157,7 @@ pub struct DataFile {
     pub(crate) first_row_id: Option<i64>,
     /// This field is not included in spec. It is only stored in the in-memory
     /// representation used in process.
+    #[builder(default = "DEFAULT_PARTITION_SPEC_ID")]
     pub(crate) partition_spec_id: i32,
     /// field id: 143
     ///
diff --git a/crates/iceberg/src/spec/partition.rs 
b/crates/iceberg/src/spec/partition.rs
index fff9b62d..128db338 100644
--- a/crates/iceberg/src/spec/partition.rs
+++ b/crates/iceberg/src/spec/partition.rs
@@ -194,6 +194,15 @@ impl PartitionKey {
         Self { spec, schema, data }
     }
 
+    /// Creates a new partition key from another partition key, with a new 
data field.
+    pub fn copy_with_data(&self, data: Struct) -> Self {
+        Self {
+            spec: self.spec.clone(),
+            schema: self.schema.clone(),
+            data,
+        }
+    }
+
     /// Generates a partition path based on the partition values.
     pub fn to_path(&self) -> String {
         self.spec.partition_to_path(&self.data, self.schema.clone())
@@ -207,6 +216,21 @@ impl PartitionKey {
             Some(pk) => pk.spec.is_unpartitioned(),
         }
     }
+
+    /// Returns the associated [`PartitionSpec`].
+    pub fn spec(&self) -> &PartitionSpec {
+        &self.spec
+    }
+
+    /// Returns the associated [`SchemaRef`].
+    pub fn schema(&self) -> &SchemaRef {
+        &self.schema
+    }
+
+    /// Returns the associated [`Struct`].
+    pub fn data(&self) -> &Struct {
+        &self.data
+    }
 }
 
 /// Reference to [`UnboundPartitionSpec`].
diff --git a/crates/iceberg/src/writer/base_writer/data_file_writer.rs 
b/crates/iceberg/src/writer/base_writer/data_file_writer.rs
index f215673d..a950547d 100644
--- a/crates/iceberg/src/writer/base_writer/data_file_writer.rs
+++ b/crates/iceberg/src/writer/base_writer/data_file_writer.rs
@@ -18,86 +18,126 @@
 //! This module provides `DataFileWriter`.
 
 use arrow_array::RecordBatch;
-use itertools::Itertools;
 
-use crate::Result;
-use crate::spec::{DataContentType, DataFile, Struct};
-use crate::writer::file_writer::{FileWriter, FileWriterBuilder};
+use crate::spec::{DataContentType, DataFile, PartitionKey};
+use crate::writer::file_writer::FileWriterBuilder;
+use crate::writer::file_writer::location_generator::{FileNameGenerator, 
LocationGenerator};
+use crate::writer::file_writer::rolling_writer::{RollingFileWriter, 
RollingFileWriterBuilder};
 use crate::writer::{CurrentFileStatus, IcebergWriter, IcebergWriterBuilder};
+use crate::{Error, ErrorKind, Result};
 
 /// Builder for `DataFileWriter`.
 #[derive(Clone, Debug)]
-pub struct DataFileWriterBuilder<B: FileWriterBuilder> {
-    inner: B,
-    partition_value: Option<Struct>,
-    partition_spec_id: i32,
+pub struct DataFileWriterBuilder<B: FileWriterBuilder, L: LocationGenerator, 
F: FileNameGenerator> {
+    inner: RollingFileWriterBuilder<B, L, F>,
+    partition_key: Option<PartitionKey>,
 }
 
-impl<B: FileWriterBuilder> DataFileWriterBuilder<B> {
-    /// Create a new `DataFileWriterBuilder` using a `FileWriterBuilder`.
-    pub fn new(inner: B, partition_value: Option<Struct>, partition_spec_id: 
i32) -> Self {
+impl<B, L, F> DataFileWriterBuilder<B, L, F>
+where
+    B: FileWriterBuilder,
+    L: LocationGenerator,
+    F: FileNameGenerator,
+{
+    /// Create a new `DataFileWriterBuilder` using a 
`RollingFileWriterBuilder`.
+    pub fn new(
+        inner_builder: RollingFileWriterBuilder<B, L, F>,
+        partition_key: Option<PartitionKey>,
+    ) -> Self {
         Self {
-            inner,
-            partition_value,
-            partition_spec_id,
+            inner: inner_builder,
+            partition_key,
         }
     }
 }
 
 #[async_trait::async_trait]
-impl<B: FileWriterBuilder> IcebergWriterBuilder for DataFileWriterBuilder<B> {
-    type R = DataFileWriter<B>;
+impl<B, L, F> IcebergWriterBuilder for DataFileWriterBuilder<B, L, F>
+where
+    B: FileWriterBuilder,
+    L: LocationGenerator,
+    F: FileNameGenerator,
+{
+    type R = DataFileWriter<B, L, F>;
 
     async fn build(self) -> Result<Self::R> {
         Ok(DataFileWriter {
-            inner_writer: Some(self.inner.clone().build().await?),
-            partition_value: self.partition_value.unwrap_or(Struct::empty()),
-            partition_spec_id: self.partition_spec_id,
+            inner: Some(self.inner.clone().build()),
+            partition_key: self.partition_key,
         })
     }
 }
 
 /// A writer that writes data within one spec/partition.
 #[derive(Debug)]
-pub struct DataFileWriter<B: FileWriterBuilder> {
-    inner_writer: Option<B::R>,
-    partition_value: Struct,
-    partition_spec_id: i32,
+pub struct DataFileWriter<B: FileWriterBuilder, L: LocationGenerator, F: 
FileNameGenerator> {
+    inner: Option<RollingFileWriter<B, L, F>>,
+    partition_key: Option<PartitionKey>,
 }
 
 #[async_trait::async_trait]
-impl<B: FileWriterBuilder> IcebergWriter for DataFileWriter<B> {
+impl<B, L, F> IcebergWriter for DataFileWriter<B, L, F>
+where
+    B: FileWriterBuilder,
+    L: LocationGenerator,
+    F: FileNameGenerator,
+{
     async fn write(&mut self, batch: RecordBatch) -> Result<()> {
-        self.inner_writer.as_mut().unwrap().write(&batch).await
+        if let Some(writer) = self.inner.as_mut() {
+            writer.write(&self.partition_key, &batch).await
+        } else {
+            Err(Error::new(
+                ErrorKind::Unexpected,
+                "Writer is not initialized!",
+            ))
+        }
     }
 
     async fn close(&mut self) -> Result<Vec<DataFile>> {
-        let writer = self.inner_writer.take().unwrap();
-        Ok(writer
-            .close()
-            .await?
-            .into_iter()
-            .map(|mut res| {
-                res.content(DataContentType::Data);
-                res.partition(self.partition_value.clone());
-                res.partition_spec_id(self.partition_spec_id);
-                res.build().expect("Guaranteed to be valid")
-            })
-            .collect_vec())
+        if let Some(writer) = self.inner.take() {
+            writer
+                .close()
+                .await?
+                .into_iter()
+                .map(|mut res| {
+                    res.content(DataContentType::Data);
+                    if let Some(pk) = self.partition_key.as_ref() {
+                        res.partition(pk.data().clone());
+                        res.partition_spec_id(pk.spec().spec_id());
+                    }
+                    res.build().map_err(|e| {
+                        Error::new(
+                            ErrorKind::DataInvalid,
+                            format!("Failed to build data file: {}", e),
+                        )
+                    })
+                })
+                .collect()
+        } else {
+            Err(Error::new(
+                ErrorKind::Unexpected,
+                "Data file writer has been closed.",
+            ))
+        }
     }
 }
 
-impl<B: FileWriterBuilder> CurrentFileStatus for DataFileWriter<B> {
+impl<B, L, F> CurrentFileStatus for DataFileWriter<B, L, F>
+where
+    B: FileWriterBuilder,
+    L: LocationGenerator,
+    F: FileNameGenerator,
+{
     fn current_file_path(&self) -> String {
-        self.inner_writer.as_ref().unwrap().current_file_path()
+        self.inner.as_ref().unwrap().current_file_path()
     }
 
     fn current_row_num(&self) -> usize {
-        self.inner_writer.as_ref().unwrap().current_row_num()
+        self.inner.as_ref().unwrap().current_row_num()
     }
 
     fn current_written_size(&self) -> usize {
-        self.inner_writer.as_ref().unwrap().current_written_size()
+        self.inner.as_ref().unwrap().current_written_size()
     }
 }
 
@@ -116,13 +156,15 @@ mod test {
     use crate::Result;
     use crate::io::FileIOBuilder;
     use crate::spec::{
-        DataContentType, DataFileFormat, Literal, NestedField, PrimitiveType, 
Schema, Struct, Type,
+        DataContentType, DataFileFormat, Literal, NestedField, PartitionKey, 
PartitionSpec,
+        PrimitiveType, Schema, Struct, Type,
     };
     use crate::writer::base_writer::data_file_writer::DataFileWriterBuilder;
     use crate::writer::file_writer::ParquetWriterBuilder;
     use crate::writer::file_writer::location_generator::{
         DefaultFileNameGenerator, DefaultLocationGenerator,
     };
+    use crate::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
     use crate::writer::{IcebergWriter, IcebergWriterBuilder, RecordBatch};
 
     #[tokio::test]
@@ -143,16 +185,16 @@ mod test {
             ])
             .build()?;
 
-        let pw = ParquetWriterBuilder::new(
-            WriterProperties::builder().build(),
-            Arc::new(schema),
-            None,
+        let pw = 
ParquetWriterBuilder::new(WriterProperties::builder().build(), 
Arc::new(schema));
+
+        let rolling_file_writer_builder = 
RollingFileWriterBuilder::new_with_default_file_size(
+            pw,
             file_io.clone(),
             location_gen,
             file_name_gen,
         );
 
-        let mut data_file_writer = DataFileWriterBuilder::new(pw, None, 0)
+        let mut data_file_writer = 
DataFileWriterBuilder::new(rolling_file_writer_builder, None)
             .build()
             .await
             .unwrap();
@@ -219,20 +261,27 @@ mod test {
                 NestedField::required(6, "name", 
Type::Primitive(PrimitiveType::String)).into(),
             ])
             .build()?;
+        let schema_ref = Arc::new(schema);
 
         let partition_value = Struct::from_iter([Some(Literal::int(1))]);
+        let partition_key = PartitionKey::new(
+            PartitionSpec::builder(schema_ref.clone()).build()?,
+            schema_ref.clone(),
+            partition_value.clone(),
+        );
 
-        let parquet_writer_builder = ParquetWriterBuilder::new(
-            WriterProperties::builder().build(),
-            Arc::new(schema.clone()),
-            None,
+        let parquet_writer_builder =
+            ParquetWriterBuilder::new(WriterProperties::builder().build(), 
schema_ref.clone());
+
+        let rolling_file_writer_builder = 
RollingFileWriterBuilder::new_with_default_file_size(
+            parquet_writer_builder,
             file_io.clone(),
             location_gen,
             file_name_gen,
         );
 
         let mut data_file_writer =
-            DataFileWriterBuilder::new(parquet_writer_builder, 
Some(partition_value.clone()), 0)
+            DataFileWriterBuilder::new(rolling_file_writer_builder, 
Some(partition_key))
                 .build()
                 .await?;
 
diff --git a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs 
b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs
index 765ff1ca..6740ed43 100644
--- a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs
+++ b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs
@@ -26,21 +26,35 @@ use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
 
 use crate::arrow::record_batch_projector::RecordBatchProjector;
 use crate::arrow::schema_to_arrow_schema;
-use crate::spec::{DataFile, SchemaRef, Struct};
-use crate::writer::file_writer::{FileWriter, FileWriterBuilder};
+use crate::spec::{DataFile, PartitionKey, SchemaRef};
+use crate::writer::file_writer::FileWriterBuilder;
+use crate::writer::file_writer::location_generator::{FileNameGenerator, 
LocationGenerator};
+use crate::writer::file_writer::rolling_writer::{RollingFileWriter, 
RollingFileWriterBuilder};
 use crate::writer::{IcebergWriter, IcebergWriterBuilder};
 use crate::{Error, ErrorKind, Result};
 
 /// Builder for `EqualityDeleteWriter`.
 #[derive(Clone, Debug)]
-pub struct EqualityDeleteFileWriterBuilder<B: FileWriterBuilder> {
-    inner: B,
+pub struct EqualityDeleteFileWriterBuilder<
+    B: FileWriterBuilder,
+    L: LocationGenerator,
+    F: FileNameGenerator,
+> {
+    inner: RollingFileWriterBuilder<B, L, F>,
     config: EqualityDeleteWriterConfig,
 }
 
-impl<B: FileWriterBuilder> EqualityDeleteFileWriterBuilder<B> {
-    /// Create a new `EqualityDeleteFileWriterBuilder` using a 
`FileWriterBuilder`.
-    pub fn new(inner: B, config: EqualityDeleteWriterConfig) -> Self {
+impl<B, L, F> EqualityDeleteFileWriterBuilder<B, L, F>
+where
+    B: FileWriterBuilder,
+    L: LocationGenerator,
+    F: FileNameGenerator,
+{
+    /// Create a new `EqualityDeleteFileWriterBuilder` using a 
`RollingFileWriterBuilder`.
+    pub fn new(
+        inner: RollingFileWriterBuilder<B, L, F>,
+        config: EqualityDeleteWriterConfig,
+    ) -> Self {
         Self { inner, config }
     }
 }
@@ -52,8 +66,7 @@ pub struct EqualityDeleteWriterConfig {
     equality_ids: Vec<i32>,
     // Projector used to project the data chunk into specific fields.
     projector: RecordBatchProjector,
-    partition_value: Struct,
-    partition_spec_id: i32,
+    partition_key: Option<PartitionKey>,
 }
 
 impl EqualityDeleteWriterConfig {
@@ -61,8 +74,7 @@ impl EqualityDeleteWriterConfig {
     pub fn new(
         equality_ids: Vec<i32>,
         original_schema: SchemaRef,
-        partition_value: Option<Struct>,
-        partition_spec_id: i32,
+        partition_key: Option<PartitionKey>,
     ) -> Result<Self> {
         let original_arrow_schema = 
Arc::new(schema_to_arrow_schema(&original_schema)?);
         let projector = RecordBatchProjector::new(
@@ -98,8 +110,7 @@ impl EqualityDeleteWriterConfig {
         Ok(Self {
             equality_ids,
             projector,
-            partition_value: partition_value.unwrap_or(Struct::empty()),
-            partition_spec_id,
+            partition_key,
         })
     }
 
@@ -110,36 +121,48 @@ impl EqualityDeleteWriterConfig {
 }
 
 #[async_trait::async_trait]
-impl<B: FileWriterBuilder> IcebergWriterBuilder for 
EqualityDeleteFileWriterBuilder<B> {
-    type R = EqualityDeleteFileWriter<B>;
+impl<B, L, F> IcebergWriterBuilder for EqualityDeleteFileWriterBuilder<B, L, F>
+where
+    B: FileWriterBuilder,
+    L: LocationGenerator,
+    F: FileNameGenerator,
+{
+    type R = EqualityDeleteFileWriter<B, L, F>;
 
     async fn build(self) -> Result<Self::R> {
         Ok(EqualityDeleteFileWriter {
-            inner_writer: Some(self.inner.clone().build().await?),
+            inner: Some(self.inner.clone().build()), // todo revisit this, 
probably still need a builder for rolling writer
             projector: self.config.projector,
             equality_ids: self.config.equality_ids,
-            partition_value: self.config.partition_value,
-            partition_spec_id: self.config.partition_spec_id,
+            partition_key: self.config.partition_key,
         })
     }
 }
 
 /// Writer used to write equality delete files.
 #[derive(Debug)]
-pub struct EqualityDeleteFileWriter<B: FileWriterBuilder> {
-    inner_writer: Option<B::R>,
+pub struct EqualityDeleteFileWriter<
+    B: FileWriterBuilder,
+    L: LocationGenerator,
+    F: FileNameGenerator,
+> {
+    inner: Option<RollingFileWriter<B, L, F>>,
     projector: RecordBatchProjector,
     equality_ids: Vec<i32>,
-    partition_value: Struct,
-    partition_spec_id: i32,
+    partition_key: Option<PartitionKey>,
 }
 
 #[async_trait::async_trait]
-impl<B: FileWriterBuilder> IcebergWriter for EqualityDeleteFileWriter<B> {
+impl<B, L, F> IcebergWriter for EqualityDeleteFileWriter<B, L, F>
+where
+    B: FileWriterBuilder,
+    L: LocationGenerator,
+    F: FileNameGenerator,
+{
     async fn write(&mut self, batch: RecordBatch) -> Result<()> {
         let batch = self.projector.project_batch(batch)?;
-        if let Some(writer) = self.inner_writer.as_mut() {
-            writer.write(&batch).await
+        if let Some(writer) = self.inner.as_mut() {
+            writer.write(&self.partition_key, &batch).await
         } else {
             Err(Error::new(
                 ErrorKind::Unexpected,
@@ -149,19 +172,26 @@ impl<B: FileWriterBuilder> IcebergWriter for 
EqualityDeleteFileWriter<B> {
     }
 
     async fn close(&mut self) -> Result<Vec<DataFile>> {
-        if let Some(writer) = self.inner_writer.take() {
-            Ok(writer
+        if let Some(writer) = self.inner.take() {
+            writer
                 .close()
                 .await?
                 .into_iter()
                 .map(|mut res| {
                     res.content(crate::spec::DataContentType::EqualityDeletes);
                     
res.equality_ids(Some(self.equality_ids.iter().copied().collect_vec()));
-                    res.partition(self.partition_value.clone());
-                    res.partition_spec_id(self.partition_spec_id);
-                    res.build().expect("msg")
+                    if let Some(pk) = self.partition_key.as_ref() {
+                        res.partition(pk.data().clone());
+                        res.partition_spec_id(pk.spec().spec_id());
+                    }
+                    res.build().map_err(|e| {
+                        Error::new(
+                            ErrorKind::DataInvalid,
+                            format!("Failed to build data file: {}", e),
+                        )
+                    })
                 })
-                .collect_vec())
+                .collect()
         } else {
             Err(Error::new(
                 ErrorKind::Unexpected,
@@ -201,6 +231,7 @@ mod test {
     use crate::writer::file_writer::location_generator::{
         DefaultFileNameGenerator, DefaultLocationGenerator,
     };
+    use crate::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
     use crate::writer::{IcebergWriter, IcebergWriterBuilder};
 
     async fn check_parquet_data_file_with_equality_delete_write(
@@ -397,23 +428,24 @@ mod test {
 
         let equality_ids = vec![0_i32, 8];
         let equality_config =
-            EqualityDeleteWriterConfig::new(equality_ids, Arc::new(schema), 
None, 0).unwrap();
+            EqualityDeleteWriterConfig::new(equality_ids, Arc::new(schema), 
None).unwrap();
         let delete_schema =
             
arrow_schema_to_schema(equality_config.projected_arrow_schema_ref()).unwrap();
         let projector = equality_config.projector.clone();
 
         // prepare writer
-        let pb = ParquetWriterBuilder::new(
-            WriterProperties::builder().build(),
-            Arc::new(delete_schema),
-            None,
+        let pb =
+            ParquetWriterBuilder::new(WriterProperties::builder().build(), 
Arc::new(delete_schema));
+        let rolling_writer_builder = 
RollingFileWriterBuilder::new_with_default_file_size(
+            pb,
             file_io.clone(),
             location_gen,
             file_name_gen,
         );
-        let mut equality_delete_writer = 
EqualityDeleteFileWriterBuilder::new(pb, equality_config)
-            .build()
-            .await?;
+        let mut equality_delete_writer =
+            EqualityDeleteFileWriterBuilder::new(rolling_writer_builder, 
equality_config)
+                .build()
+                .await?;
 
         // write
         equality_delete_writer.write(to_write.clone()).await?;
@@ -499,19 +531,19 @@ mod test {
                 .unwrap(),
         );
         // Float and Double are not allowed to be used for equality delete
-        assert!(EqualityDeleteWriterConfig::new(vec![0], schema.clone(), None, 
0).is_err());
-        assert!(EqualityDeleteWriterConfig::new(vec![1], schema.clone(), None, 
0).is_err());
+        assert!(EqualityDeleteWriterConfig::new(vec![0], schema.clone(), 
None).is_err());
+        assert!(EqualityDeleteWriterConfig::new(vec![1], schema.clone(), 
None).is_err());
         // Struct is not allowed to be used for equality delete
-        assert!(EqualityDeleteWriterConfig::new(vec![3], schema.clone(), None, 
0).is_err());
+        assert!(EqualityDeleteWriterConfig::new(vec![3], schema.clone(), 
None).is_err());
         // Nested field of struct is allowed to be used for equality delete
-        assert!(EqualityDeleteWriterConfig::new(vec![4], schema.clone(), None, 
0).is_ok());
+        assert!(EqualityDeleteWriterConfig::new(vec![4], schema.clone(), 
None).is_ok());
         // Nested field of map is not allowed to be used for equality delete
-        assert!(EqualityDeleteWriterConfig::new(vec![7], schema.clone(), None, 
0).is_err());
-        assert!(EqualityDeleteWriterConfig::new(vec![8], schema.clone(), None, 
0).is_err());
-        assert!(EqualityDeleteWriterConfig::new(vec![9], schema.clone(), None, 
0).is_err());
+        assert!(EqualityDeleteWriterConfig::new(vec![7], schema.clone(), 
None).is_err());
+        assert!(EqualityDeleteWriterConfig::new(vec![8], schema.clone(), 
None).is_err());
+        assert!(EqualityDeleteWriterConfig::new(vec![9], schema.clone(), 
None).is_err());
         // Nested field of list is not allowed to be used for equality delete
-        assert!(EqualityDeleteWriterConfig::new(vec![10], schema.clone(), 
None, 0).is_err());
-        assert!(EqualityDeleteWriterConfig::new(vec![11], schema.clone(), 
None, 0).is_err());
+        assert!(EqualityDeleteWriterConfig::new(vec![10], schema.clone(), 
None).is_err());
+        assert!(EqualityDeleteWriterConfig::new(vec![11], schema.clone(), 
None).is_err());
 
         Ok(())
     }
@@ -565,22 +597,22 @@ mod test {
                 .unwrap(),
         );
         let equality_ids = vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13];
-        let config =
-            EqualityDeleteWriterConfig::new(equality_ids, schema.clone(), 
None, 0).unwrap();
+        let config = EqualityDeleteWriterConfig::new(equality_ids, 
schema.clone(), None).unwrap();
         let delete_arrow_schema = config.projected_arrow_schema_ref().clone();
         let delete_schema = 
arrow_schema_to_schema(&delete_arrow_schema).unwrap();
 
-        let pb = ParquetWriterBuilder::new(
-            WriterProperties::builder().build(),
-            Arc::new(delete_schema),
-            None,
+        let pb =
+            ParquetWriterBuilder::new(WriterProperties::builder().build(), 
Arc::new(delete_schema));
+        let rolling_writer_builder = 
RollingFileWriterBuilder::new_with_default_file_size(
+            pb,
             file_io.clone(),
             location_gen,
             file_name_gen,
         );
-        let mut equality_delete_writer = 
EqualityDeleteFileWriterBuilder::new(pb, config)
-            .build()
-            .await?;
+        let mut equality_delete_writer =
+            EqualityDeleteFileWriterBuilder::new(rolling_writer_builder, 
config)
+                .build()
+                .await?;
 
         // prepare data
         let col0 = Arc::new(BooleanArray::from(vec![
@@ -763,7 +795,7 @@ mod test {
         let to_write = RecordBatch::try_new(arrow_schema.clone(), 
columns).unwrap();
         let equality_ids = vec![0_i32, 2, 5];
         let equality_config =
-            EqualityDeleteWriterConfig::new(equality_ids, Arc::new(schema), 
None, 0).unwrap();
+            EqualityDeleteWriterConfig::new(equality_ids, Arc::new(schema), 
None).unwrap();
         let projector = equality_config.projector.clone();
 
         // check
diff --git a/crates/iceberg/src/writer/file_writer/mod.rs 
b/crates/iceberg/src/writer/file_writer/mod.rs
index 2a5a7355..2ed6414c 100644
--- a/crates/iceberg/src/writer/file_writer/mod.rs
+++ b/crates/iceberg/src/writer/file_writer/mod.rs
@@ -27,6 +27,8 @@ use crate::spec::DataFileBuilder;
 mod parquet_writer;
 pub use parquet_writer::{ParquetWriter, ParquetWriterBuilder};
 
+use crate::io::OutputFile;
+
 pub mod location_generator;
 /// Module providing writers that can automatically roll over to new files 
based on size thresholds.
 pub mod rolling_writer;
@@ -38,7 +40,7 @@ pub trait FileWriterBuilder<O = DefaultOutput>: Send + Clone 
+ 'static {
     /// The associated file writer type.
     type R: FileWriter<O>;
     /// Build file writer.
-    fn build(self) -> impl Future<Output = Result<Self::R>> + Send;
+    fn build(self, output_file: OutputFile) -> impl Future<Output = 
Result<Self::R>> + Send;
 }
 
 /// File writer focuses on writing record batches to different physical file formats (such as Parquet, ORC).
diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs 
b/crates/iceberg/src/writer/file_writer/parquet_writer.rs
index 0a8a095e..620f27df 100644
--- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs
+++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs
@@ -34,7 +34,6 @@ use parquet::format::FileMetaData;
 use parquet::thrift::{TCompactOutputProtocol, TSerializable};
 use thrift::protocol::TOutputProtocol;
 
-use super::location_generator::{FileNameGenerator, LocationGenerator};
 use super::{FileWriter, FileWriterBuilder};
 use crate::arrow::{
     ArrowFileReader, DEFAULT_MAP_FIELD_NAME, FieldMatchMode, 
NanValueCountVisitor,
@@ -43,8 +42,8 @@ use crate::arrow::{
 use crate::io::{FileIO, FileWrite, OutputFile};
 use crate::spec::{
     DataContentType, DataFileBuilder, DataFileFormat, Datum, ListType, 
Literal, MapType,
-    NestedFieldRef, PartitionKey, PartitionSpec, PrimitiveType, Schema, 
SchemaRef, SchemaVisitor,
-    Struct, StructType, TableMetadata, Type, visit_schema,
+    NestedFieldRef, PartitionSpec, PrimitiveType, Schema, SchemaRef, 
SchemaVisitor, Struct,
+    StructType, TableMetadata, Type, visit_schema,
 };
 use crate::transform::create_transform_function;
 use crate::writer::{CurrentFileStatus, DataFile};
@@ -52,78 +51,43 @@ use crate::{Error, ErrorKind, Result};
 
 /// ParquetWriterBuilder is used to build a [`ParquetWriter`]
 #[derive(Clone, Debug)]
-pub struct ParquetWriterBuilder<T: LocationGenerator, F: FileNameGenerator> {
+pub struct ParquetWriterBuilder {
     props: WriterProperties,
     schema: SchemaRef,
-    partition_key: Option<PartitionKey>,
     match_mode: FieldMatchMode,
-
-    file_io: FileIO,
-    location_generator: T,
-    file_name_generator: F,
 }
 
-impl<T: LocationGenerator, F: FileNameGenerator> ParquetWriterBuilder<T, F> {
+impl ParquetWriterBuilder {
     /// Create a new `ParquetWriterBuilder`
     /// To construct the write result, the schema should contain the 
`PARQUET_FIELD_ID_META_KEY` metadata for each field.
-    pub fn new(
-        props: WriterProperties,
-        schema: SchemaRef,
-        partition_key: Option<PartitionKey>,
-        file_io: FileIO,
-        location_generator: T,
-        file_name_generator: F,
-    ) -> Self {
-        Self::new_with_match_mode(
-            props,
-            schema,
-            partition_key,
-            FieldMatchMode::Id,
-            file_io,
-            location_generator,
-            file_name_generator,
-        )
+    pub fn new(props: WriterProperties, schema: SchemaRef) -> Self {
+        Self::new_with_match_mode(props, schema, FieldMatchMode::Id)
     }
 
     /// Create a new `ParquetWriterBuilder` with custom match mode
     pub fn new_with_match_mode(
         props: WriterProperties,
         schema: SchemaRef,
-        partition_key: Option<PartitionKey>,
         match_mode: FieldMatchMode,
-        file_io: FileIO,
-        location_generator: T,
-        file_name_generator: F,
     ) -> Self {
         Self {
             props,
             schema,
-            partition_key,
             match_mode,
-            file_io,
-            location_generator,
-            file_name_generator,
         }
     }
 }
 
-impl<T: LocationGenerator, F: FileNameGenerator> FileWriterBuilder for 
ParquetWriterBuilder<T, F> {
+impl FileWriterBuilder for ParquetWriterBuilder {
     type R = ParquetWriter;
 
-    async fn build(self) -> Result<Self::R> {
-        let out_file = self
-            .file_io
-            .new_output(self.location_generator.generate_location(
-                self.partition_key.as_ref(),
-                &self.file_name_generator.generate_file_name(),
-            ))?;
-
+    async fn build(self, output_file: OutputFile) -> Result<Self::R> {
         Ok(ParquetWriter {
             schema: self.schema.clone(),
             inner_writer: None,
             writer_properties: self.props,
             current_row_num: 0,
-            out_file,
+            output_file,
             nan_value_count_visitor: 
NanValueCountVisitor::new_with_match_mode(self.match_mode),
         })
     }
@@ -250,7 +214,7 @@ impl SchemaVisitor for IndexByParquetPathName {
 /// `ParquetWriter` is used to write Arrow data into a Parquet file on storage.
 pub struct ParquetWriter {
     schema: SchemaRef,
-    out_file: OutputFile,
+    output_file: OutputFile,
     inner_writer: Option<AsyncArrowWriter<AsyncFileWriter<Box<dyn 
FileWrite>>>>,
     writer_properties: WriterProperties,
     current_row_num: usize,
@@ -555,7 +519,7 @@ impl FileWriter for ParquetWriter {
             writer
         } else {
             let arrow_schema: ArrowSchemaRef = 
Arc::new(self.schema.as_ref().try_into()?);
-            let inner_writer = self.out_file.writer().await?;
+            let inner_writer = self.output_file.writer().await?;
             let async_writer = AsyncFileWriter::new(inner_writer);
             let writer = AsyncArrowWriter::try_new(
                 async_writer,
@@ -594,7 +558,7 @@ impl FileWriter for ParquetWriter {
         let written_size = writer.bytes_written();
 
         if self.current_row_num == 0 {
-            self.out_file.delete().await.map_err(|err| {
+            self.output_file.delete().await.map_err(|err| {
                 Error::new(
                     ErrorKind::Unexpected,
                     "Failed to delete empty parquet file.",
@@ -616,7 +580,7 @@ impl FileWriter for ParquetWriter {
                 self.schema,
                 parquet_metadata,
                 written_size,
-                self.out_file.location().to_string(),
+                self.output_file.location().to_string(),
                 self.nan_value_count_visitor.nan_value_counts,
             )?])
         }
@@ -625,7 +589,7 @@ impl FileWriter for ParquetWriter {
 
 impl CurrentFileStatus for ParquetWriter {
     fn current_file_path(&self) -> String {
-        self.out_file.location().to_string()
+        self.output_file.location().to_string()
     }
 
     fn current_row_num(&self) -> usize {
@@ -703,7 +667,7 @@ mod tests {
     use crate::io::FileIOBuilder;
     use crate::spec::{PrimitiveLiteral, Struct, *};
     use crate::writer::file_writer::location_generator::{
-        DefaultFileNameGenerator, DefaultLocationGenerator,
+        DefaultFileNameGenerator, DefaultLocationGenerator, FileNameGenerator, 
LocationGenerator,
     };
     use crate::writer::tests::check_parquet_data_file;
 
@@ -871,11 +835,13 @@ mod tests {
 
         // prepare data
         let schema = {
-            let fields = vec![
-                arrow_schema::Field::new("col", arrow_schema::DataType::Int64, 
true).with_metadata(
-                    HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), 
"0".to_string())]),
-                ),
-            ];
+            let fields =
+                vec![
+                    Field::new("col", DataType::Int64, 
true).with_metadata(HashMap::from([(
+                        PARQUET_FIELD_ID_META_KEY.to_string(),
+                        "0".to_string(),
+                    )])),
+                ];
             Arc::new(arrow_schema::Schema::new(fields))
         };
         let col = Arc::new(Int64Array::from_iter_values(0..1024)) as ArrayRef;
@@ -883,18 +849,18 @@ mod tests {
         let to_write = RecordBatch::try_new(schema.clone(), 
vec![col]).unwrap();
         let to_write_null = RecordBatch::try_new(schema.clone(), 
vec![null_col]).unwrap();
 
+        let output_file = file_io.new_output(
+            location_gen.generate_location(None, 
&file_name_gen.generate_file_name()),
+        )?;
+
         // write data
         let mut pw = ParquetWriterBuilder::new(
             WriterProperties::builder()
                 .set_max_row_group_size(128)
                 .build(),
             Arc::new(to_write.schema().as_ref().try_into().unwrap()),
-            None,
-            file_io.clone(),
-            location_gen,
-            file_name_gen,
         )
-        .build()
+        .build(output_file)
         .await?;
         pw.write(&to_write).await?;
         pw.write(&to_write_null).await?;
@@ -905,7 +871,7 @@ mod tests {
             .next()
             .unwrap()
             // Put dummy fields so the build succeeds.
-            .content(crate::spec::DataContentType::Data)
+            .content(DataContentType::Data)
             .partition(Struct::empty())
             .partition_spec_id(0)
             .build()
@@ -1006,7 +972,7 @@ mod tests {
             None,
         ));
         let col5 = Arc::new({
-            let mut map_array_builder = arrow_array::builder::MapBuilder::new(
+            let mut map_array_builder = MapBuilder::new(
                 None,
                 arrow_array::builder::StringBuilder::new(),
                 
arrow_array::builder::ListBuilder::new(arrow_array::builder::PrimitiveBuilder::<
@@ -1083,18 +1049,15 @@ mod tests {
             col0, col1, col2, col3, col4, col5,
         ])
         .unwrap();
+        let output_file = file_io.new_output(
+            location_gen.generate_location(None, 
&file_name_gen.generate_file_name()),
+        )?;
 
         // write data
-        let mut pw = ParquetWriterBuilder::new(
-            WriterProperties::builder().build(),
-            Arc::new(schema),
-            None,
-            file_io.clone(),
-            location_gen,
-            file_name_gen,
-        )
-        .build()
-        .await?;
+        let mut pw =
+            ParquetWriterBuilder::new(WriterProperties::builder().build(), 
Arc::new(schema))
+                .build(output_file)
+                .await?;
         pw.write(&to_write).await?;
         let res = pw.close().await?;
         assert_eq!(res.len(), 1);
@@ -1161,7 +1124,7 @@ mod tests {
     async fn test_all_type_for_write() -> Result<()> {
         let temp_dir = TempDir::new().unwrap();
         let file_io = FileIOBuilder::new_fs_io().build().unwrap();
-        let loccation_gen = DefaultLocationGenerator::with_data_location(
+        let location_gen = DefaultLocationGenerator::with_data_location(
             temp_dir.path().to_str().unwrap().to_string(),
         );
         let file_name_gen =
@@ -1276,18 +1239,15 @@ mod tests {
             col14, col15, col16,
         ])
         .unwrap();
+        let output_file = file_io.new_output(
+            location_gen.generate_location(None, 
&file_name_gen.generate_file_name()),
+        )?;
 
         // write data
-        let mut pw = ParquetWriterBuilder::new(
-            WriterProperties::builder().build(),
-            Arc::new(schema),
-            None,
-            file_io.clone(),
-            loccation_gen,
-            file_name_gen,
-        )
-        .build()
-        .await?;
+        let mut pw =
+            ParquetWriterBuilder::new(WriterProperties::builder().build(), 
Arc::new(schema))
+                .build(output_file)
+                .await?;
         pw.write(&to_write).await?;
         let res = pw.close().await?;
         assert_eq!(res.len(), 1);
@@ -1405,7 +1365,7 @@ mod tests {
     async fn test_decimal_bound() -> Result<()> {
         let temp_dir = TempDir::new().unwrap();
         let file_io = FileIOBuilder::new_fs_io().build().unwrap();
-        let loccation_gen = DefaultLocationGenerator::with_data_location(
+        let location_gen = DefaultLocationGenerator::with_data_location(
             temp_dir.path().to_str().unwrap().to_string(),
         );
         let file_name_gen =
@@ -1429,16 +1389,12 @@ mod tests {
                 .unwrap(),
         );
         let arrow_schema: ArrowSchemaRef = 
Arc::new(schema_to_arrow_schema(&schema).unwrap());
-        let mut pw = ParquetWriterBuilder::new(
-            WriterProperties::builder().build(),
-            schema.clone(),
-            None,
-            file_io.clone(),
-            loccation_gen.clone(),
-            file_name_gen.clone(),
-        )
-        .build()
-        .await?;
+        let output_file = file_io.new_output(
+            location_gen.generate_location(None, 
&file_name_gen.generate_file_name()),
+        )?;
+        let mut pw = 
ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone())
+            .build(output_file)
+            .await?;
         let col0 = Arc::new(
             Decimal128Array::from(vec![Some(22000000000), Some(11000000000)])
                 .with_data_type(DataType::Decimal128(28, 10)),
@@ -1485,16 +1441,12 @@ mod tests {
                 .unwrap(),
         );
         let arrow_schema: ArrowSchemaRef = 
Arc::new(schema_to_arrow_schema(&schema).unwrap());
-        let mut pw = ParquetWriterBuilder::new(
-            WriterProperties::builder().build(),
-            schema.clone(),
-            None,
-            file_io.clone(),
-            loccation_gen.clone(),
-            file_name_gen.clone(),
-        )
-        .build()
-        .await?;
+        let output_file = file_io.new_output(
+            location_gen.generate_location(None, 
&file_name_gen.generate_file_name()),
+        )?;
+        let mut pw = 
ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone())
+            .build(output_file)
+            .await?;
         let col0 = Arc::new(
             Decimal128Array::from(vec![Some(-22000000000), Some(-11000000000)])
                 .with_data_type(DataType::Decimal128(28, 10)),
@@ -1544,16 +1496,12 @@ mod tests {
                 .unwrap(),
         );
         let arrow_schema: ArrowSchemaRef = 
Arc::new(schema_to_arrow_schema(&schema).unwrap());
-        let mut pw = ParquetWriterBuilder::new(
-            WriterProperties::builder().build(),
-            schema,
-            None,
-            file_io.clone(),
-            loccation_gen,
-            file_name_gen,
-        )
-        .build()
-        .await?;
+        let output_file = file_io.new_output(
+            location_gen.generate_location(None, 
&file_name_gen.generate_file_name()),
+        )?;
+        let mut pw = 
ParquetWriterBuilder::new(WriterProperties::builder().build(), schema)
+            .build(output_file)
+            .await?;
         let col0 = Arc::new(
             Decimal128Array::from(vec![
                 Some(decimal_max.mantissa()),
@@ -1676,37 +1624,31 @@ mod tests {
         };
         let col = Arc::new(Int64Array::from_iter_values(0..1024)) as ArrayRef;
         let to_write = RecordBatch::try_new(schema.clone(), 
vec![col]).unwrap();
+        let file_path = location_gen.generate_location(None, 
&file_name_gen.generate_file_name());
+        let output_file = file_io.new_output(&file_path)?;
         let mut pw = ParquetWriterBuilder::new(
             WriterProperties::builder().build(),
             Arc::new(to_write.schema().as_ref().try_into().unwrap()),
-            None,
-            file_io.clone(),
-            location_gen.clone(),
-            file_name_gen,
         )
-        .build()
+        .build(output_file)
         .await?;
         pw.write(&to_write).await?;
-        let file_path = pw.out_file.location().to_string();
         pw.close().await.unwrap();
-        assert!(file_io.exists(file_path).await.unwrap());
+        assert!(file_io.exists(&file_path).await.unwrap());
 
         // Test that the file will not be created if there is no data to write
         let file_name_gen =
             DefaultFileNameGenerator::new("test_empty".to_string(), None, 
DataFileFormat::Parquet);
+        let file_path = location_gen.generate_location(None, 
&file_name_gen.generate_file_name());
+        let output_file = file_io.new_output(&file_path)?;
         let pw = ParquetWriterBuilder::new(
             WriterProperties::builder().build(),
             Arc::new(to_write.schema().as_ref().try_into().unwrap()),
-            None,
-            file_io.clone(),
-            location_gen,
-            file_name_gen,
         )
-        .build()
+        .build(output_file)
         .await?;
-        let file_path = pw.out_file.location().to_string();
         pw.close().await.unwrap();
-        assert!(!file_io.exists(file_path).await.unwrap());
+        assert!(!file_io.exists(&file_path).await.unwrap());
 
         Ok(())
     }
@@ -1746,17 +1688,16 @@ mod tests {
 
         let to_write =
             RecordBatch::try_new(arrow_schema.clone(), vec![float_32_col, 
float_64_col]).unwrap();
+        let output_file = file_io.new_output(
+            location_gen.generate_location(None, 
&file_name_gen.generate_file_name()),
+        )?;
 
         // write data
         let mut pw = ParquetWriterBuilder::new(
             WriterProperties::builder().build(),
             Arc::new(to_write.schema().as_ref().try_into().unwrap()),
-            None,
-            file_io.clone(),
-            location_gen,
-            file_name_gen,
         )
-        .build()
+        .build(output_file)
         .await?;
 
         pw.write(&to_write).await?;
@@ -1887,17 +1828,16 @@ mod tests {
             struct_nested_float_field_col,
         ])
         .unwrap();
+        let output_file = file_io.new_output(
+            location_gen.generate_location(None, 
&file_name_gen.generate_file_name()),
+        )?;
 
         // write data
         let mut pw = ParquetWriterBuilder::new(
             WriterProperties::builder().build(),
             Arc::new(to_write.schema().as_ref().try_into().unwrap()),
-            None,
-            file_io.clone(),
-            location_gen,
-            file_name_gen,
         )
-        .build()
+        .build(output_file)
         .await?;
 
         pw.write(&to_write).await?;
@@ -2047,6 +1987,9 @@ mod tests {
             // large_list_float_field_col,
         ])
         .expect("Could not form record batch");
+        let output_file = file_io.new_output(
+            location_gen.generate_location(None, 
&file_name_gen.generate_file_name()),
+        )?;
 
         // write data
         let mut pw = ParquetWriterBuilder::new(
@@ -2058,12 +2001,8 @@ mod tests {
                     .try_into()
                     .expect("Could not convert iceberg schema"),
             ),
-            None,
-            file_io.clone(),
-            location_gen,
-            file_name_gen,
         )
-        .build()
+        .build(output_file)
         .await?;
 
         pw.write(&to_write).await?;
@@ -2229,6 +2168,9 @@ mod tests {
             struct_list_float_field_col,
         ])
         .expect("Could not form record batch");
+        let output_file = file_io.new_output(
+            location_gen.generate_location(None, 
&file_name_gen.generate_file_name()),
+        )?;
 
         // write data
         let mut pw = ParquetWriterBuilder::new(
@@ -2240,12 +2182,8 @@ mod tests {
                     .try_into()
                     .expect("Could not convert iceberg schema"),
             ),
-            None,
-            file_io.clone(),
-            location_gen,
-            file_name_gen,
         )
-        .build()
+        .build(output_file)
         .await?;
 
         pw.write(&to_write).await?;
@@ -2310,6 +2248,9 @@ mod tests {
         );
         let file_name_gen =
             DefaultFileNameGenerator::new("test".to_string(), None, 
DataFileFormat::Parquet);
+        let output_file = file_io
+            .new_output(location_gen.generate_location(None, 
&file_name_gen.generate_file_name()))
+            .unwrap();
 
         // write data
         let pw = ParquetWriterBuilder::new(
@@ -2325,12 +2266,8 @@ mod tests {
                     .build()
                     .expect("Failed to create schema"),
             ),
-            None,
-            file_io.clone(),
-            location_gen,
-            file_name_gen,
         )
-        .build()
+        .build(output_file)
         .await
         .unwrap();
 
diff --git a/crates/iceberg/src/writer/file_writer/rolling_writer.rs 
b/crates/iceberg/src/writer/file_writer/rolling_writer.rs
index 181205c3..0b9b105c 100644
--- a/crates/iceberg/src/writer/file_writer/rolling_writer.rs
+++ b/crates/iceberg/src/writer/file_writer/rolling_writer.rs
@@ -15,50 +15,104 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::fmt::{Debug, Formatter};
+
 use arrow_array::RecordBatch;
 
-use crate::spec::DataFileBuilder;
+use crate::io::{FileIO, OutputFile};
+use crate::spec::{DataFileBuilder, 
PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT, PartitionKey};
 use crate::writer::CurrentFileStatus;
+use crate::writer::file_writer::location_generator::{FileNameGenerator, 
LocationGenerator};
 use crate::writer::file_writer::{FileWriter, FileWriterBuilder};
 use crate::{Error, ErrorKind, Result};
 
-/// Builder for creating a `RollingFileWriter` that rolls over to a new file
-/// when the data size exceeds a target threshold.
-#[derive(Clone)]
-pub struct RollingFileWriterBuilder<B: FileWriterBuilder> {
+/// Builder for [`RollingFileWriter`].
+#[derive(Clone, Debug)]
+pub struct RollingFileWriterBuilder<
+    B: FileWriterBuilder,
+    L: LocationGenerator,
+    F: FileNameGenerator,
+> {
     inner_builder: B,
     target_file_size: usize,
+    file_io: FileIO,
+    location_generator: L,
+    file_name_generator: F,
 }
 
-impl<B: FileWriterBuilder> RollingFileWriterBuilder<B> {
-    /// Creates a new `RollingFileWriterBuilder` with the specified inner 
builder and target size.
+impl<B, L, F> RollingFileWriterBuilder<B, L, F>
+where
+    B: FileWriterBuilder,
+    L: LocationGenerator,
+    F: FileNameGenerator,
+{
+    /// Creates a new `RollingFileWriterBuilder` with the specified target 
file size.
     ///
-    /// # Arguments
+    /// # Parameters
     ///
     /// * `inner_builder` - The builder for the underlying file writer
-    /// * `target_file_size` - The target size in bytes before rolling over to 
a new file
+    /// * `target_file_size` - The target file size in bytes that triggers 
rollover
+    /// * `file_io` - The file IO interface for creating output files
+    /// * `location_generator` - Generator for file locations
+    /// * `file_name_generator` - Generator for file names
+    ///
+    /// # Returns
     ///
-    /// NOTE: The `target_file_size` does not exactly reflect the final size 
on physical storage.
-    /// This is because the input size is based on the Arrow in-memory format 
and cannot precisely control rollover behavior.
-    /// The actual file size on disk is expected to be slightly larger than 
`target_file_size`.
-    pub fn new(inner_builder: B, target_file_size: usize) -> Self {
+    /// A new `RollingFileWriterBuilder` instance
+    pub fn new(
+        inner_builder: B,
+        target_file_size: usize,
+        file_io: FileIO,
+        location_generator: L,
+        file_name_generator: F,
+    ) -> Self {
         Self {
             inner_builder,
             target_file_size,
+            file_io,
+            location_generator,
+            file_name_generator,
         }
     }
-}
 
-impl<B: FileWriterBuilder> FileWriterBuilder for RollingFileWriterBuilder<B> {
-    type R = RollingFileWriter<B>;
+    /// Creates a new `RollingFileWriterBuilder` with the default target file 
size.
+    ///
+    /// # Parameters
+    ///
+    /// * `inner_builder` - The builder for the underlying file writer
+    /// * `file_io` - The file IO interface for creating output files
+    /// * `location_generator` - Generator for file locations
+    /// * `file_name_generator` - Generator for file names
+    ///
+    /// # Returns
+    ///
+    /// A new `RollingFileWriterBuilder` instance with default target file size
+    pub fn new_with_default_file_size(
+        inner_builder: B,
+        file_io: FileIO,
+        location_generator: L,
+        file_name_generator: F,
+    ) -> Self {
+        Self {
+            inner_builder,
+            target_file_size: PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
+            file_io,
+            location_generator,
+            file_name_generator,
+        }
+    }
 
-    async fn build(self) -> Result<Self::R> {
-        Ok(RollingFileWriter {
+    /// Build a new [`RollingFileWriter`].
+    pub fn build(self) -> RollingFileWriter<B, L, F> {
+        RollingFileWriter {
             inner: None,
             inner_builder: self.inner_builder,
             target_file_size: self.target_file_size,
             data_file_builders: vec![],
-        })
+            file_io: self.file_io,
+            location_generator: self.location_generator,
+            file_name_generator: self.file_name_generator,
+        }
     }
 }
 
@@ -68,14 +122,36 @@ impl<B: FileWriterBuilder> FileWriterBuilder for 
RollingFileWriterBuilder<B> {
 /// This writer wraps another file writer that tracks the amount of data 
written.
 /// When the data size exceeds the target size, it closes the current file and
 /// starts writing to a new one.
-pub struct RollingFileWriter<B: FileWriterBuilder> {
+pub struct RollingFileWriter<B: FileWriterBuilder, L: LocationGenerator, F: 
FileNameGenerator> {
     inner: Option<B::R>,
     inner_builder: B,
     target_file_size: usize,
     data_file_builders: Vec<DataFileBuilder>,
+    file_io: FileIO,
+    location_generator: L,
+    file_name_generator: F,
+}
+
+impl<B, L, F> Debug for RollingFileWriter<B, L, F>
+where
+    B: FileWriterBuilder,
+    L: LocationGenerator,
+    F: FileNameGenerator,
+{
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("RollingFileWriter")
+            .field("target_file_size", &self.target_file_size)
+            .field("file_io", &self.file_io)
+            .finish()
+    }
 }
 
-impl<B: FileWriterBuilder> RollingFileWriter<B> {
+impl<B, L, F> RollingFileWriter<B, L, F>
+where
+    B: FileWriterBuilder,
+    L: LocationGenerator,
+    F: FileNameGenerator,
+{
     /// Determines if the writer should roll over to a new file.
     ///
     /// # Returns
@@ -84,13 +160,42 @@ impl<B: FileWriterBuilder> RollingFileWriter<B> {
     fn should_roll(&self) -> bool {
         self.current_written_size() > self.target_file_size
     }
-}
 
-impl<B: FileWriterBuilder> FileWriter for RollingFileWriter<B> {
-    async fn write(&mut self, input: &RecordBatch) -> Result<()> {
+    fn new_output_file(&self, partition_key: &Option<PartitionKey>) -> 
Result<OutputFile> {
+        self.file_io
+            .new_output(self.location_generator.generate_location(
+                partition_key.as_ref(),
+                &self.file_name_generator.generate_file_name(),
+            ))
+    }
+
+    /// Writes a record batch to the current file, rolling over to a new file 
if necessary.
+    ///
+    /// # Parameters
+    ///
+    /// * `partition_key` - Optional partition key for the data
+    /// * `input` - The record batch to write
+    ///
+    /// # Returns
+    ///
+    /// A `Result` indicating success or failure
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the writer is not initialized or if writing fails
+    pub async fn write(
+        &mut self,
+        partition_key: &Option<PartitionKey>,
+        input: &RecordBatch,
+    ) -> Result<()> {
         if self.inner.is_none() {
             // initialize inner writer
-            self.inner = Some(self.inner_builder.clone().build().await?);
+            self.inner = Some(
+                self.inner_builder
+                    .clone()
+                    .build(self.new_output_file(partition_key)?)
+                    .await?,
+            );
         }
 
         if self.should_roll() {
@@ -99,7 +204,12 @@ impl<B: FileWriterBuilder> FileWriter for 
RollingFileWriter<B> {
                 self.data_file_builders.extend(inner.close().await?);
 
                 // start a new writer
-                self.inner = Some(self.inner_builder.clone().build().await?);
+                self.inner = Some(
+                    self.inner_builder
+                        .clone()
+                        .build(self.new_output_file(partition_key)?)
+                        .await?,
+                );
             }
         }
 
@@ -114,7 +224,13 @@ impl<B: FileWriterBuilder> FileWriter for 
RollingFileWriter<B> {
         }
     }
 
-    async fn close(mut self) -> Result<Vec<DataFileBuilder>> {
+    /// Closes the writer and returns all data file builders.
+    ///
+    /// # Returns
+    ///
+    /// A `Result` containing a vector of `DataFileBuilder` instances 
representing
+    /// all files that were written, including any that were created due to 
rollover
+    pub async fn close(mut self) -> Result<Vec<DataFileBuilder>> {
         // close the current writer and merge the output
         if let Some(current_writer) = self.inner {
             self.data_file_builders
@@ -125,7 +241,9 @@ impl<B: FileWriterBuilder> FileWriter for RollingFileWriter<B> {
     }
 }
 
-impl<B: FileWriterBuilder> CurrentFileStatus for RollingFileWriter<B> {
+impl<B: FileWriterBuilder, L: LocationGenerator, F: FileNameGenerator> CurrentFileStatus
+    for RollingFileWriter<B, L, F>
+{
     fn current_file_path(&self) -> String {
         self.inner.as_ref().unwrap().current_file_path()
     }
@@ -199,22 +317,20 @@ mod tests {
         let schema = make_test_schema()?;
 
         // Create writer builders
-        let parquet_writer_builder = ParquetWriterBuilder::new(
-            WriterProperties::builder().build(),
-            Arc::new(schema),
-            None,
-            file_io.clone(),
-            location_gen,
-            file_name_gen,
-        );
+        let parquet_writer_builder =
+            ParquetWriterBuilder::new(WriterProperties::builder().build(), Arc::new(schema));
 
         // Set a large target size so no rolling occurs
-        let rolling_writer_builder = RollingFileWriterBuilder::new(
+        let rolling_file_writer_builder = RollingFileWriterBuilder::new(
             parquet_writer_builder,
-            1024 * 1024, // 1MB, large enough to not trigger rolling
+            1024 * 1024, // 1MB, large enough to not trigger rolling
+            file_io.clone(),
+            location_gen,
+            file_name_gen,
         );
 
-        let data_file_writer_builder = DataFileWriterBuilder::new(rolling_writer_builder, None, 0);
+        let data_file_writer_builder =
+            DataFileWriterBuilder::new(rolling_file_writer_builder, None);
 
         // Create writer
         let mut writer = data_file_writer_builder.build().await?;
@@ -260,19 +376,19 @@ mod tests {
         let schema = make_test_schema()?;
 
         // Create writer builders
-        let parquet_writer_builder = ParquetWriterBuilder::new(
-            WriterProperties::builder().build(),
-            Arc::new(schema),
-            None,
-            file_io.clone(),
+        let parquet_writer_builder =
+            ParquetWriterBuilder::new(WriterProperties::builder().build(), Arc::new(schema));
+
+        // Set a very small target size to trigger rolling
+        let rolling_writer_builder = RollingFileWriterBuilder::new(
+            parquet_writer_builder,
+            1024,
+            file_io,
             location_gen,
             file_name_gen,
         );
 
-        // Set a very small target size to trigger rolling
-        let rolling_writer_builder = RollingFileWriterBuilder::new(parquet_writer_builder, 1024);
-
-        let data_file_writer_builder = DataFileWriterBuilder::new(rolling_writer_builder, None, 0);
+        let data_file_writer_builder = DataFileWriterBuilder::new(rolling_writer_builder, None);
 
         // Create writer
         let mut writer = data_file_writer_builder.build().await?;
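
The hunks above make `RollingFileWriter` responsible for minting its own `OutputFile` per roll: the first `write` opens a file lazily, `should_roll` compares the running size against `target_file_size`, and every rollover closes the inner writer, collects its `DataFileBuilder`s, and opens a fresh output at a newly generated location. A minimal caller-side sketch of the new surface (hedged: the `build()` call on the rolling builder is an assumption, since this diff only shows the builder being consumed by `DataFileWriterBuilder`):

```rust
// Sketch only; builder construction as in the tests above, setup elided.
// Assumption: RollingFileWriterBuilder exposes a build()-style constructor.
let mut rolling_writer = rolling_file_writer_builder.build();

// write() now takes the partition key per call; None means unpartitioned data.
rolling_writer.write(&None, &batch_a).await?; // lazily opens the first file
rolling_writer.write(&None, &batch_b).await?; // rolls once target_file_size is exceeded

// close() yields one DataFileBuilder per physical file written, rollovers included.
let data_file_builders = rolling_writer.close().await?;
```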
diff --git a/crates/iceberg/src/writer/mod.rs b/crates/iceberg/src/writer/mod.rs
index 8f17d50e..d5a8a668 100644
--- a/crates/iceberg/src/writer/mod.rs
+++ b/crates/iceberg/src/writer/mod.rs
@@ -60,6 +60,9 @@
 //! async fn main() -> Result<()> {
 //!     // Connect to a catalog.
 //!     use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
+//!     use iceberg::writer::file_writer::rolling_writer::{
+//!         RollingFileWriter, RollingFileWriterBuilder,
+//!     };
 //!     let catalog = MemoryCatalogBuilder::default()
 //!         .load(
 //!             "memory",
@@ -86,15 +89,21 @@
 //!     let parquet_writer_builder = ParquetWriterBuilder::new(
 //!         WriterProperties::default(),
 //!         table.metadata().current_schema().clone(),
-//!         None,
+//!     );
+//!
+//!     // Create a rolling file writer using parquet file writer builder.
+//!     let rolling_file_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
+//!         parquet_writer_builder,
 //!         table.file_io().clone(),
 //!         location_generator.clone(),
 //!         file_name_generator.clone(),
 //!     );
+//!
 //!     // Create a data file writer using parquet file writer builder.
-//!     let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder, None, 0);
+//!     let data_file_writer_builder =
+//!         DataFileWriterBuilder::new(rolling_file_writer_builder, None);
 //!     // Build the data file writer
-//!     let mut data_file_writer = data_file_writer_builder.build().await.unwrap();
+//!     let mut data_file_writer = data_file_writer_builder.build().await?;
 //!
 //!     // Write the data using data_file_writer...
 //!
@@ -174,6 +183,9 @@
 //!     // Connect to a catalog.
 //!     use iceberg::memory::MEMORY_CATALOG_WAREHOUSE;
 //!     use iceberg::spec::{Literal, PartitionKey, Struct};
+//!     use iceberg::writer::file_writer::rolling_writer::{
+//!         RollingFileWriter, RollingFileWriterBuilder,
+//!     };
 //!
 //!     let catalog = MemoryCatalogBuilder::default()
 //!         .load(
@@ -207,13 +219,20 @@
 //!     let parquet_writer_builder = ParquetWriterBuilder::new(
 //!         WriterProperties::default(),
 //!         table.metadata().current_schema().clone(),
-//!         Some(partition_key),
+//!     );
+//!
+//!     // Create a rolling file writer
+//!     let rolling_file_writer_builder = RollingFileWriterBuilder::new(
+//!         parquet_writer_builder,
+//!         512 * 1024 * 1024,
 //!         table.file_io().clone(),
 //!         location_generator.clone(),
 //!         file_name_generator.clone(),
 //!     );
-//!     // Create a data file writer builder using parquet file writer builder.
-//!     let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder, None, 0);
+//!
+//!     // Create a data file writer builder using rolling file writer.
+//!     let data_file_writer_builder =
+//!         DataFileWriterBuilder::new(rolling_file_writer_builder, Some(partition_key));
 //!     // Create latency record writer using data file writer builder.
 //!     let latency_record_builder = LatencyRecordWriterBuilder::new(data_file_writer_builder);
 //!     // Build the final writer
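
Both `RollingFileWriterBuilder` constructors appear in these doc examples, and they differ only in whether the rollover threshold is explicit. A side-by-side sketch using the same names as above (the crate's default threshold constant is not shown in this diff):

```rust
// Explicit threshold, as in the partitioned example: roll files at ~512 MiB.
let explicit = RollingFileWriterBuilder::new(
    parquet_writer_builder.clone(),
    512 * 1024 * 1024,
    table.file_io().clone(),
    location_generator.clone(),
    file_name_generator.clone(),
);

// Library-chosen default threshold, as in the unpartitioned example.
let defaulted = RollingFileWriterBuilder::new_with_default_file_size(
    parquet_writer_builder,
    table.file_io().clone(),
    location_generator,
    file_name_generator,
);
```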
diff --git a/crates/integration_tests/tests/shared_tests/append_data_file_test.rs b/crates/integration_tests/tests/shared_tests/append_data_file_test.rs
index 9a3d6213..f4cba959 100644
--- a/crates/integration_tests/tests/shared_tests/append_data_file_test.rs
+++ b/crates/integration_tests/tests/shared_tests/append_data_file_test.rs
@@ -27,6 +27,7 @@ use iceberg::writer::file_writer::ParquetWriterBuilder;
 use iceberg::writer::file_writer::location_generator::{
     DefaultFileNameGenerator, DefaultLocationGenerator,
 };
+use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
 use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
 use iceberg::{Catalog, CatalogBuilder, TableCreation};
 use iceberg_catalog_rest::RestCatalogBuilder;
@@ -74,12 +75,14 @@ async fn test_append_data_file() {
     let parquet_writer_builder = ParquetWriterBuilder::new(
         WriterProperties::default(),
         table.metadata().current_schema().clone(),
-        None,
+    );
+    let rolling_file_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
+        parquet_writer_builder,
         table.file_io().clone(),
         location_generator.clone(),
         file_name_generator.clone(),
     );
-    let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder, None, 0);
+    let data_file_writer_builder = DataFileWriterBuilder::new(rolling_file_writer_builder, None);
     let mut data_file_writer = data_file_writer_builder.build().await.unwrap();
    let col1 = StringArray::from(vec![Some("foo"), Some("bar"), None, Some("baz")]);
     let col2 = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4)]);
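
The test updates in this file and the ones below are the same mechanical migration; a before/after sketch of the pattern, with names taken from this diff:

```rust
// Before: ParquetWriterBuilder owned the IO and the generators, and
// DataFileWriterBuilder took a raw partition value plus a spec id:
//
//   let parquet = ParquetWriterBuilder::new(props, schema, None, file_io, loc_gen, name_gen);
//   let writer = DataFileWriterBuilder::new(parquet, None, 0).build().await?;

// After: the IO and generators move to the rolling layer, and an optional
// PartitionKey replaces the partition value / spec id pair.
let parquet = ParquetWriterBuilder::new(props, schema);
let rolling = RollingFileWriterBuilder::new_with_default_file_size(
    parquet, file_io, loc_gen, name_gen,
);
let writer = DataFileWriterBuilder::new(rolling, None).build().await?;
```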
diff --git a/crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs b/crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs
index c737357e..0da88f1a 100644
--- a/crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs
+++ b/crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs
@@ -31,6 +31,7 @@ use iceberg::writer::file_writer::ParquetWriterBuilder;
 use iceberg::writer::file_writer::location_generator::{
     DefaultFileNameGenerator, DefaultLocationGenerator,
 };
+use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
 use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
 use iceberg::{Catalog, CatalogBuilder, TableCreation};
 use iceberg_catalog_rest::RestCatalogBuilder;
@@ -96,18 +97,18 @@ async fn test_append_partition_data_file() {
     let parquet_writer_builder = ParquetWriterBuilder::new(
         WriterProperties::default(),
         table.metadata().current_schema().clone(),
-        Some(partition_key),
+    );
+
+    let rolling_file_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
+        parquet_writer_builder.clone(),
         table.file_io().clone(),
         location_generator.clone(),
         file_name_generator.clone(),
     );
 
     let mut data_file_writer_valid = DataFileWriterBuilder::new(
-        parquet_writer_builder.clone(),
-        Some(Struct::from_iter([Some(Literal::Primitive(
-            PrimitiveLiteral::Int(first_partition_id_value),
-        ))])),
-        0,
+        rolling_file_writer_builder.clone(),
+        Some(partition_key.clone()),
     )
     .build()
     .await
@@ -148,44 +149,53 @@ async fn test_append_partition_data_file() {
     assert_eq!(batches.len(), 1);
     assert_eq!(batches[0], batch);
 
+    let partition_key = partition_key.copy_with_data(Struct::from_iter([Some(
+        Literal::Primitive(PrimitiveLiteral::Boolean(true)),
+    )]));
     test_schema_incompatible_partition_type(
-        parquet_writer_builder.clone(),
+        rolling_file_writer_builder.clone(),
         batch.clone(),
+        partition_key.clone(),
         table.clone(),
         &rest_catalog,
     )
     .await;
 
+    let partition_key = partition_key.copy_with_data(Struct::from_iter([
+        Some(Literal::Primitive(PrimitiveLiteral::Int(
+            first_partition_id_value,
+        ))),
+        Some(Literal::Primitive(PrimitiveLiteral::Int(
+            first_partition_id_value,
+        ))),
+    ]));
     test_schema_incompatible_partition_fields(
-        parquet_writer_builder,
+        rolling_file_writer_builder.clone(),
         batch,
+        partition_key,
         table,
         &rest_catalog,
-        first_partition_id_value,
     )
     .await;
 }
 
 async fn test_schema_incompatible_partition_type(
-    parquet_writer_builder: ParquetWriterBuilder<
+    rolling_file_writer_builder: RollingFileWriterBuilder<
+        ParquetWriterBuilder,
         DefaultLocationGenerator,
         DefaultFileNameGenerator,
     >,
     batch: RecordBatch,
+    partition_key: PartitionKey,
     table: Table,
     catalog: &dyn Catalog,
 ) {
     // test writing different "type" of partition than mentioned in schema
-    let mut data_file_writer_invalid = DataFileWriterBuilder::new(
-        parquet_writer_builder.clone(),
-        Some(Struct::from_iter([Some(Literal::Primitive(
-            PrimitiveLiteral::Boolean(true),
-        ))])),
-        0,
-    )
-    .build()
-    .await
-    .unwrap();
+    let mut data_file_writer_invalid =
+        DataFileWriterBuilder::new(rolling_file_writer_builder, Some(partition_key))
+            .build()
+            .await
+            .unwrap();
 
     data_file_writer_invalid.write(batch.clone()).await.unwrap();
     let data_file_invalid = data_file_writer_invalid.close().await.unwrap();
@@ -200,32 +210,22 @@ async fn test_schema_incompatible_partition_type(
 }
 
 async fn test_schema_incompatible_partition_fields(
-    parquet_writer_builder: ParquetWriterBuilder<
+    rolling_file_writer_builder: RollingFileWriterBuilder<
+        ParquetWriterBuilder,
         DefaultLocationGenerator,
         DefaultFileNameGenerator,
     >,
     batch: RecordBatch,
+    partition_key: PartitionKey,
     table: Table,
     catalog: &dyn Catalog,
-    first_partition_id_value: i32,
 ) {
    // test writing different number of partition fields than mentioned in schema
-
-    let mut data_file_writer_invalid = DataFileWriterBuilder::new(
-        parquet_writer_builder,
-        Some(Struct::from_iter([
-            Some(Literal::Primitive(PrimitiveLiteral::Int(
-                first_partition_id_value,
-            ))),
-            Some(Literal::Primitive(PrimitiveLiteral::Int(
-                first_partition_id_value,
-            ))),
-        ])),
-        0,
-    )
-    .build()
-    .await
-    .unwrap();
+    let mut data_file_writer_invalid =
+        DataFileWriterBuilder::new(rolling_file_writer_builder, Some(partition_key))
+            .build()
+            .await
+            .unwrap();
 
     data_file_writer_invalid.write(batch.clone()).await.unwrap();
     let data_file_invalid = data_file_writer_invalid.close().await.unwrap();
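
Worth noting in the hunks above: the invalid-partition cases now derive their keys from one valid `PartitionKey` via `copy_with_data` instead of hand-building a `Struct` plus spec id at each call site. A short sketch of that pattern, assuming `partition_key` was built for the table's int partition spec earlier in the test:

```rust
// Same spec and schema, wrong value type: a boolean where an int is expected.
let wrong_type_key = partition_key.copy_with_data(Struct::from_iter([Some(
    Literal::Primitive(PrimitiveLiteral::Boolean(true)),
)]));

// Same spec and schema, wrong arity: two fields where the spec declares one.
let wrong_arity_key = partition_key.copy_with_data(Struct::from_iter([
    Some(Literal::Primitive(PrimitiveLiteral::Int(first_partition_id_value))),
    Some(Literal::Primitive(PrimitiveLiteral::Int(first_partition_id_value))),
]));
```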
diff --git a/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs b/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs
index 4b85612f..a248fa70 100644
--- a/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs
+++ b/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs
@@ -27,6 +27,7 @@ use iceberg::writer::file_writer::ParquetWriterBuilder;
 use iceberg::writer::file_writer::location_generator::{
     DefaultFileNameGenerator, DefaultLocationGenerator,
 };
+use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
 use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
 use iceberg::{Catalog, CatalogBuilder, TableCreation};
 use iceberg_catalog_rest::RestCatalogBuilder;
@@ -73,12 +74,14 @@ async fn test_append_data_file_conflict() {
     let parquet_writer_builder = ParquetWriterBuilder::new(
         WriterProperties::default(),
         table.metadata().current_schema().clone(),
-        None,
+    );
+    let rolling_file_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
+        parquet_writer_builder,
         table.file_io().clone(),
         location_generator.clone(),
         file_name_generator.clone(),
     );
-    let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder, None, 0);
+    let data_file_writer_builder = DataFileWriterBuilder::new(rolling_file_writer_builder, None);
     let mut data_file_writer = data_file_writer_builder.build().await.unwrap();
    let col1 = StringArray::from(vec![Some("foo"), Some("bar"), None, Some("baz")]);
     let col2 = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4)]);
diff --git a/crates/integration_tests/tests/shared_tests/scan_all_type.rs b/crates/integration_tests/tests/shared_tests/scan_all_type.rs
index b8e7c594..1125de11 100644
--- a/crates/integration_tests/tests/shared_tests/scan_all_type.rs
+++ b/crates/integration_tests/tests/shared_tests/scan_all_type.rs
@@ -39,6 +39,7 @@ use iceberg::writer::file_writer::ParquetWriterBuilder;
 use iceberg::writer::file_writer::location_generator::{
     DefaultFileNameGenerator, DefaultLocationGenerator,
 };
+use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
 use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
 use iceberg::{Catalog, CatalogBuilder, TableCreation};
 use iceberg_catalog_rest::RestCatalogBuilder;
@@ -155,12 +156,14 @@ async fn test_scan_all_type() {
     let parquet_writer_builder = ParquetWriterBuilder::new(
         WriterProperties::default(),
         table.metadata().current_schema().clone(),
-        None,
+    );
+    let rolling_file_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
+        parquet_writer_builder,
         table.file_io().clone(),
         location_generator.clone(),
         file_name_generator.clone(),
     );
-    let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder, None, 0);
+    let data_file_writer_builder = DataFileWriterBuilder::new(rolling_file_writer_builder, None);
     let mut data_file_writer = data_file_writer_builder.build().await.unwrap();
 
     // Prepare data
diff --git a/crates/integrations/datafusion/src/physical_plan/write.rs b/crates/integrations/datafusion/src/physical_plan/write.rs
index 625405c9..712da92b 100644
--- a/crates/integrations/datafusion/src/physical_plan/write.rs
+++ b/crates/integrations/datafusion/src/physical_plan/write.rs
@@ -208,7 +208,6 @@ impl ExecutionPlan for IcebergWriteExec {
             ));
         }
 
-        let spec_id = self.table.metadata().default_partition_spec_id();
        let partition_type = self.table.metadata().default_partition_type().clone();
         let format_version = self.table.metadata().format_version();
 
@@ -235,13 +234,7 @@ impl ExecutionPlan for IcebergWriteExec {
        let parquet_file_writer_builder = ParquetWriterBuilder::new_with_match_mode(
             WriterProperties::default(),
             self.table.metadata().current_schema().clone(),
-            None,
             FieldMatchMode::Name,
-            self.table.file_io().clone(),
-            DefaultLocationGenerator::new(self.table.metadata().clone())
-                .map_err(to_datafusion_error)?,
-            // todo filename prefix/suffix should be configurable
-            DefaultFileNameGenerator::new(Uuid::now_v7().to_string(), None, file_format),
         );
         let target_file_size = match self
             .table
@@ -261,10 +254,22 @@ impl ExecutionPlan for IcebergWriteExec {
                 .map_err(to_datafusion_error)?,
             None => PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
         };
-        let rolling_writer_builder =
-            RollingFileWriterBuilder::new(parquet_file_writer_builder, target_file_size);
-        let data_file_writer_builder =
-            DataFileWriterBuilder::new(rolling_writer_builder, None, spec_id);
+
+        let file_io = self.table.file_io().clone();
+        let location_generator = DefaultLocationGenerator::new(self.table.metadata().clone())
+            .map_err(to_datafusion_error)?;
+        // todo filename prefix/suffix should be configurable
+        let file_name_generator =
+            DefaultFileNameGenerator::new(Uuid::now_v7().to_string(), None, file_format);
+        let rolling_writer_builder = RollingFileWriterBuilder::new(
+            parquet_file_writer_builder,
+            target_file_size,
+            file_io,
+            location_generator,
+            file_name_generator,
+        );
+        // todo specify partition key when partitioning writer is supported
+        let data_file_writer_builder = DataFileWriterBuilder::new(rolling_writer_builder, None);
 
         // Get input data
         let data = execute_input_stream(
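
End to end, the DataFusion write path now assembles the same stack as the tests; until `TaskWriter` and `PartitioningWriter` land, the partition key is pinned to `None`. A condensed sketch of the wiring in this hunk (error mapping and the table-property lookup for `target_file_size` elided):

```rust
let file_io = table.file_io().clone();
let location_generator = DefaultLocationGenerator::new(table.metadata().clone())?;
// todo (from the diff): filename prefix/suffix should be configurable
let file_name_generator =
    DefaultFileNameGenerator::new(Uuid::now_v7().to_string(), None, file_format);

let rolling_writer_builder = RollingFileWriterBuilder::new(
    parquet_file_writer_builder,
    target_file_size, // resolved from table properties, falling back to the crate default
    file_io,
    location_generator,
    file_name_generator,
);
// Partition key stays None until a partitioning writer can route rows to keys.
let data_file_writer_builder = DataFileWriterBuilder::new(rolling_writer_builder, None);
```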
