This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 42191e97 refactor(writer): Refactor writers for the future
partitioning writers (#1657)
42191e97 is described below
commit 42191e97c4b661b5098233bf6c4015a51a31a3b6
Author: Shawn Chang <[email protected]>
AuthorDate: Thu Oct 9 03:35:34 2025 -0700
refactor(writer): Refactor writers for the future partitioning writers
(#1657)
## Which issue does this PR close?
- Closes #1650
## What changes are included in this PR?
Refactored the writer layers; from a bird’s-eye view, the structure now
looks like this:
```mermaid
flowchart TD
subgraph PartitioningWriter
PW[PartitioningWriter]
subgraph DataFileWriter
DFW[DataFileWriter]
subgraph RollingWriter
RW[RollingWriter]
subgraph FileWriter
FW[FileWriter]
end
RW --> FW
end
DFW --> RW
end
PW --> DFW
end
```
### Key Changes
- Modified `RollingFileWriter` to handle location generator, file name
generator, and partition keys directly
- Simplified `ParquetWriterBuilder` interface to accept output files
during build
- Restructured `DataFileWriterBuilder` to use `RollingFileWriter` with
partition keys
- Updated DataFusion integration to work with the new writer
architecture
- NOTE: Technically DataFusion or any engine should use `TaskWriter` ->
`PartitioningWriter` -> `RollingWriter` -> ..., but `TaskWriter` and
`PartitioningWriter` are not included in this draft so far
## Are these changes tested?
Not yet, but changing the existing tests accordingly should be enough
---
crates/iceberg/src/spec/manifest/data_file.rs | 4 +-
crates/iceberg/src/spec/partition.rs | 24 ++
.../src/writer/base_writer/data_file_writer.rs | 153 ++++++++-----
.../writer/base_writer/equality_delete_writer.rs | 148 ++++++++-----
crates/iceberg/src/writer/file_writer/mod.rs | 4 +-
.../src/writer/file_writer/parquet_writer.rs | 245 ++++++++-------------
.../src/writer/file_writer/rolling_writer.rs | 212 ++++++++++++++----
crates/iceberg/src/writer/mod.rs | 31 ++-
.../tests/shared_tests/append_data_file_test.rs | 7 +-
.../append_partition_data_file_test.rs | 76 +++----
.../tests/shared_tests/conflict_commit_test.rs | 7 +-
.../tests/shared_tests/scan_all_type.rs | 7 +-
.../datafusion/src/physical_plan/write.rs | 27 ++-
13 files changed, 570 insertions(+), 375 deletions(-)
diff --git a/crates/iceberg/src/spec/manifest/data_file.rs
b/crates/iceberg/src/spec/manifest/data_file.rs
index 931f9441..d7455b56 100644
--- a/crates/iceberg/src/spec/manifest/data_file.rs
+++ b/crates/iceberg/src/spec/manifest/data_file.rs
@@ -26,7 +26,7 @@ use serde_with::{DeserializeFromStr, SerializeDisplay};
use super::_serde::DataFileSerde;
use super::{Datum, FormatVersion, Schema, data_file_schema_v1,
data_file_schema_v2};
use crate::error::Result;
-use crate::spec::{Struct, StructType};
+use crate::spec::{DEFAULT_PARTITION_SPEC_ID, Struct, StructType};
use crate::{Error, ErrorKind};
/// Data file carries data file path, partition tuple, metrics, …
@@ -49,6 +49,7 @@ pub struct DataFile {
///
/// Partition data tuple, schema based on the partition spec output using
/// partition field ids for the struct field ids
+ #[builder(default = "Struct::empty()")]
pub(crate) partition: Struct,
/// field id: 103
///
@@ -156,6 +157,7 @@ pub struct DataFile {
pub(crate) first_row_id: Option<i64>,
/// This field is not included in spec. It is just store in memory
representation used
/// in process.
+ #[builder(default = "DEFAULT_PARTITION_SPEC_ID")]
pub(crate) partition_spec_id: i32,
/// field id: 143
///
diff --git a/crates/iceberg/src/spec/partition.rs
b/crates/iceberg/src/spec/partition.rs
index fff9b62d..128db338 100644
--- a/crates/iceberg/src/spec/partition.rs
+++ b/crates/iceberg/src/spec/partition.rs
@@ -194,6 +194,15 @@ impl PartitionKey {
Self { spec, schema, data }
}
+ /// Creates a new partition key from another partition key, with a new
data field.
+ pub fn copy_with_data(&self, data: Struct) -> Self {
+ Self {
+ spec: self.spec.clone(),
+ schema: self.schema.clone(),
+ data,
+ }
+ }
+
/// Generates a partition path based on the partition values.
pub fn to_path(&self) -> String {
self.spec.partition_to_path(&self.data, self.schema.clone())
@@ -207,6 +216,21 @@ impl PartitionKey {
Some(pk) => pk.spec.is_unpartitioned(),
}
}
+
+ /// Returns the associated [`PartitionSpec`].
+ pub fn spec(&self) -> &PartitionSpec {
+ &self.spec
+ }
+
+ /// Returns the associated [`SchemaRef`].
+ pub fn schema(&self) -> &SchemaRef {
+ &self.schema
+ }
+
+ /// Returns the associated [`Struct`].
+ pub fn data(&self) -> &Struct {
+ &self.data
+ }
}
/// Reference to [`UnboundPartitionSpec`].
diff --git a/crates/iceberg/src/writer/base_writer/data_file_writer.rs
b/crates/iceberg/src/writer/base_writer/data_file_writer.rs
index f215673d..a950547d 100644
--- a/crates/iceberg/src/writer/base_writer/data_file_writer.rs
+++ b/crates/iceberg/src/writer/base_writer/data_file_writer.rs
@@ -18,86 +18,126 @@
//! This module provide `DataFileWriter`.
use arrow_array::RecordBatch;
-use itertools::Itertools;
-use crate::Result;
-use crate::spec::{DataContentType, DataFile, Struct};
-use crate::writer::file_writer::{FileWriter, FileWriterBuilder};
+use crate::spec::{DataContentType, DataFile, PartitionKey};
+use crate::writer::file_writer::FileWriterBuilder;
+use crate::writer::file_writer::location_generator::{FileNameGenerator,
LocationGenerator};
+use crate::writer::file_writer::rolling_writer::{RollingFileWriter,
RollingFileWriterBuilder};
use crate::writer::{CurrentFileStatus, IcebergWriter, IcebergWriterBuilder};
+use crate::{Error, ErrorKind, Result};
/// Builder for `DataFileWriter`.
#[derive(Clone, Debug)]
-pub struct DataFileWriterBuilder<B: FileWriterBuilder> {
- inner: B,
- partition_value: Option<Struct>,
- partition_spec_id: i32,
+pub struct DataFileWriterBuilder<B: FileWriterBuilder, L: LocationGenerator,
F: FileNameGenerator> {
+ inner: RollingFileWriterBuilder<B, L, F>,
+ partition_key: Option<PartitionKey>,
}
-impl<B: FileWriterBuilder> DataFileWriterBuilder<B> {
- /// Create a new `DataFileWriterBuilder` using a `FileWriterBuilder`.
- pub fn new(inner: B, partition_value: Option<Struct>, partition_spec_id:
i32) -> Self {
+impl<B, L, F> DataFileWriterBuilder<B, L, F>
+where
+ B: FileWriterBuilder,
+ L: LocationGenerator,
+ F: FileNameGenerator,
+{
+ /// Create a new `DataFileWriterBuilder` using a
`RollingFileWriterBuilder`.
+ pub fn new(
+ inner_builder: RollingFileWriterBuilder<B, L, F>,
+ partition_key: Option<PartitionKey>,
+ ) -> Self {
Self {
- inner,
- partition_value,
- partition_spec_id,
+ inner: inner_builder,
+ partition_key,
}
}
}
#[async_trait::async_trait]
-impl<B: FileWriterBuilder> IcebergWriterBuilder for DataFileWriterBuilder<B> {
- type R = DataFileWriter<B>;
+impl<B, L, F> IcebergWriterBuilder for DataFileWriterBuilder<B, L, F>
+where
+ B: FileWriterBuilder,
+ L: LocationGenerator,
+ F: FileNameGenerator,
+{
+ type R = DataFileWriter<B, L, F>;
async fn build(self) -> Result<Self::R> {
Ok(DataFileWriter {
- inner_writer: Some(self.inner.clone().build().await?),
- partition_value: self.partition_value.unwrap_or(Struct::empty()),
- partition_spec_id: self.partition_spec_id,
+ inner: Some(self.inner.clone().build()),
+ partition_key: self.partition_key,
})
}
}
/// A writer write data is within one spec/partition.
#[derive(Debug)]
-pub struct DataFileWriter<B: FileWriterBuilder> {
- inner_writer: Option<B::R>,
- partition_value: Struct,
- partition_spec_id: i32,
+pub struct DataFileWriter<B: FileWriterBuilder, L: LocationGenerator, F:
FileNameGenerator> {
+ inner: Option<RollingFileWriter<B, L, F>>,
+ partition_key: Option<PartitionKey>,
}
#[async_trait::async_trait]
-impl<B: FileWriterBuilder> IcebergWriter for DataFileWriter<B> {
+impl<B, L, F> IcebergWriter for DataFileWriter<B, L, F>
+where
+ B: FileWriterBuilder,
+ L: LocationGenerator,
+ F: FileNameGenerator,
+{
async fn write(&mut self, batch: RecordBatch) -> Result<()> {
- self.inner_writer.as_mut().unwrap().write(&batch).await
+ if let Some(writer) = self.inner.as_mut() {
+ writer.write(&self.partition_key, &batch).await
+ } else {
+ Err(Error::new(
+ ErrorKind::Unexpected,
+ "Writer is not initialized!",
+ ))
+ }
}
async fn close(&mut self) -> Result<Vec<DataFile>> {
- let writer = self.inner_writer.take().unwrap();
- Ok(writer
- .close()
- .await?
- .into_iter()
- .map(|mut res| {
- res.content(DataContentType::Data);
- res.partition(self.partition_value.clone());
- res.partition_spec_id(self.partition_spec_id);
- res.build().expect("Guaranteed to be valid")
- })
- .collect_vec())
+ if let Some(writer) = self.inner.take() {
+ writer
+ .close()
+ .await?
+ .into_iter()
+ .map(|mut res| {
+ res.content(DataContentType::Data);
+ if let Some(pk) = self.partition_key.as_ref() {
+ res.partition(pk.data().clone());
+ res.partition_spec_id(pk.spec().spec_id());
+ }
+ res.build().map_err(|e| {
+ Error::new(
+ ErrorKind::DataInvalid,
+ format!("Failed to build data file: {}", e),
+ )
+ })
+ })
+ .collect()
+ } else {
+ Err(Error::new(
+ ErrorKind::Unexpected,
+ "Data file writer has been closed.",
+ ))
+ }
}
}
-impl<B: FileWriterBuilder> CurrentFileStatus for DataFileWriter<B> {
+impl<B, L, F> CurrentFileStatus for DataFileWriter<B, L, F>
+where
+ B: FileWriterBuilder,
+ L: LocationGenerator,
+ F: FileNameGenerator,
+{
fn current_file_path(&self) -> String {
- self.inner_writer.as_ref().unwrap().current_file_path()
+ self.inner.as_ref().unwrap().current_file_path()
}
fn current_row_num(&self) -> usize {
- self.inner_writer.as_ref().unwrap().current_row_num()
+ self.inner.as_ref().unwrap().current_row_num()
}
fn current_written_size(&self) -> usize {
- self.inner_writer.as_ref().unwrap().current_written_size()
+ self.inner.as_ref().unwrap().current_written_size()
}
}
@@ -116,13 +156,15 @@ mod test {
use crate::Result;
use crate::io::FileIOBuilder;
use crate::spec::{
- DataContentType, DataFileFormat, Literal, NestedField, PrimitiveType,
Schema, Struct, Type,
+ DataContentType, DataFileFormat, Literal, NestedField, PartitionKey,
PartitionSpec,
+ PrimitiveType, Schema, Struct, Type,
};
use crate::writer::base_writer::data_file_writer::DataFileWriterBuilder;
use crate::writer::file_writer::ParquetWriterBuilder;
use crate::writer::file_writer::location_generator::{
DefaultFileNameGenerator, DefaultLocationGenerator,
};
+ use crate::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
use crate::writer::{IcebergWriter, IcebergWriterBuilder, RecordBatch};
#[tokio::test]
@@ -143,16 +185,16 @@ mod test {
])
.build()?;
- let pw = ParquetWriterBuilder::new(
- WriterProperties::builder().build(),
- Arc::new(schema),
- None,
+ let pw =
ParquetWriterBuilder::new(WriterProperties::builder().build(),
Arc::new(schema));
+
+ let rolling_file_writer_builder =
RollingFileWriterBuilder::new_with_default_file_size(
+ pw,
file_io.clone(),
location_gen,
file_name_gen,
);
- let mut data_file_writer = DataFileWriterBuilder::new(pw, None, 0)
+ let mut data_file_writer =
DataFileWriterBuilder::new(rolling_file_writer_builder, None)
.build()
.await
.unwrap();
@@ -219,20 +261,27 @@ mod test {
NestedField::required(6, "name",
Type::Primitive(PrimitiveType::String)).into(),
])
.build()?;
+ let schema_ref = Arc::new(schema);
let partition_value = Struct::from_iter([Some(Literal::int(1))]);
+ let partition_key = PartitionKey::new(
+ PartitionSpec::builder(schema_ref.clone()).build()?,
+ schema_ref.clone(),
+ partition_value.clone(),
+ );
- let parquet_writer_builder = ParquetWriterBuilder::new(
- WriterProperties::builder().build(),
- Arc::new(schema.clone()),
- None,
+ let parquet_writer_builder =
+ ParquetWriterBuilder::new(WriterProperties::builder().build(),
schema_ref.clone());
+
+ let rolling_file_writer_builder =
RollingFileWriterBuilder::new_with_default_file_size(
+ parquet_writer_builder,
file_io.clone(),
location_gen,
file_name_gen,
);
let mut data_file_writer =
- DataFileWriterBuilder::new(parquet_writer_builder,
Some(partition_value.clone()), 0)
+ DataFileWriterBuilder::new(rolling_file_writer_builder,
Some(partition_key))
.build()
.await?;
diff --git a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs
b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs
index 765ff1ca..6740ed43 100644
--- a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs
+++ b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs
@@ -26,21 +26,35 @@ use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
use crate::arrow::record_batch_projector::RecordBatchProjector;
use crate::arrow::schema_to_arrow_schema;
-use crate::spec::{DataFile, SchemaRef, Struct};
-use crate::writer::file_writer::{FileWriter, FileWriterBuilder};
+use crate::spec::{DataFile, PartitionKey, SchemaRef};
+use crate::writer::file_writer::FileWriterBuilder;
+use crate::writer::file_writer::location_generator::{FileNameGenerator,
LocationGenerator};
+use crate::writer::file_writer::rolling_writer::{RollingFileWriter,
RollingFileWriterBuilder};
use crate::writer::{IcebergWriter, IcebergWriterBuilder};
use crate::{Error, ErrorKind, Result};
/// Builder for `EqualityDeleteWriter`.
#[derive(Clone, Debug)]
-pub struct EqualityDeleteFileWriterBuilder<B: FileWriterBuilder> {
- inner: B,
+pub struct EqualityDeleteFileWriterBuilder<
+ B: FileWriterBuilder,
+ L: LocationGenerator,
+ F: FileNameGenerator,
+> {
+ inner: RollingFileWriterBuilder<B, L, F>,
config: EqualityDeleteWriterConfig,
}
-impl<B: FileWriterBuilder> EqualityDeleteFileWriterBuilder<B> {
- /// Create a new `EqualityDeleteFileWriterBuilder` using a
`FileWriterBuilder`.
- pub fn new(inner: B, config: EqualityDeleteWriterConfig) -> Self {
+impl<B, L, F> EqualityDeleteFileWriterBuilder<B, L, F>
+where
+ B: FileWriterBuilder,
+ L: LocationGenerator,
+ F: FileNameGenerator,
+{
+ /// Create a new `EqualityDeleteFileWriterBuilder` using a
`RollingFileWriterBuilder`.
+ pub fn new(
+ inner: RollingFileWriterBuilder<B, L, F>,
+ config: EqualityDeleteWriterConfig,
+ ) -> Self {
Self { inner, config }
}
}
@@ -52,8 +66,7 @@ pub struct EqualityDeleteWriterConfig {
equality_ids: Vec<i32>,
// Projector used to project the data chunk into specific fields.
projector: RecordBatchProjector,
- partition_value: Struct,
- partition_spec_id: i32,
+ partition_key: Option<PartitionKey>,
}
impl EqualityDeleteWriterConfig {
@@ -61,8 +74,7 @@ impl EqualityDeleteWriterConfig {
pub fn new(
equality_ids: Vec<i32>,
original_schema: SchemaRef,
- partition_value: Option<Struct>,
- partition_spec_id: i32,
+ partition_key: Option<PartitionKey>,
) -> Result<Self> {
let original_arrow_schema =
Arc::new(schema_to_arrow_schema(&original_schema)?);
let projector = RecordBatchProjector::new(
@@ -98,8 +110,7 @@ impl EqualityDeleteWriterConfig {
Ok(Self {
equality_ids,
projector,
- partition_value: partition_value.unwrap_or(Struct::empty()),
- partition_spec_id,
+ partition_key,
})
}
@@ -110,36 +121,48 @@ impl EqualityDeleteWriterConfig {
}
#[async_trait::async_trait]
-impl<B: FileWriterBuilder> IcebergWriterBuilder for
EqualityDeleteFileWriterBuilder<B> {
- type R = EqualityDeleteFileWriter<B>;
+impl<B, L, F> IcebergWriterBuilder for EqualityDeleteFileWriterBuilder<B, L, F>
+where
+ B: FileWriterBuilder,
+ L: LocationGenerator,
+ F: FileNameGenerator,
+{
+ type R = EqualityDeleteFileWriter<B, L, F>;
async fn build(self) -> Result<Self::R> {
Ok(EqualityDeleteFileWriter {
- inner_writer: Some(self.inner.clone().build().await?),
+ inner: Some(self.inner.clone().build()), // todo revisit this,
probably still need a builder for rolling writer
projector: self.config.projector,
equality_ids: self.config.equality_ids,
- partition_value: self.config.partition_value,
- partition_spec_id: self.config.partition_spec_id,
+ partition_key: self.config.partition_key,
})
}
}
/// Writer used to write equality delete files.
#[derive(Debug)]
-pub struct EqualityDeleteFileWriter<B: FileWriterBuilder> {
- inner_writer: Option<B::R>,
+pub struct EqualityDeleteFileWriter<
+ B: FileWriterBuilder,
+ L: LocationGenerator,
+ F: FileNameGenerator,
+> {
+ inner: Option<RollingFileWriter<B, L, F>>,
projector: RecordBatchProjector,
equality_ids: Vec<i32>,
- partition_value: Struct,
- partition_spec_id: i32,
+ partition_key: Option<PartitionKey>,
}
#[async_trait::async_trait]
-impl<B: FileWriterBuilder> IcebergWriter for EqualityDeleteFileWriter<B> {
+impl<B, L, F> IcebergWriter for EqualityDeleteFileWriter<B, L, F>
+where
+ B: FileWriterBuilder,
+ L: LocationGenerator,
+ F: FileNameGenerator,
+{
async fn write(&mut self, batch: RecordBatch) -> Result<()> {
let batch = self.projector.project_batch(batch)?;
- if let Some(writer) = self.inner_writer.as_mut() {
- writer.write(&batch).await
+ if let Some(writer) = self.inner.as_mut() {
+ writer.write(&self.partition_key, &batch).await
} else {
Err(Error::new(
ErrorKind::Unexpected,
@@ -149,19 +172,26 @@ impl<B: FileWriterBuilder> IcebergWriter for
EqualityDeleteFileWriter<B> {
}
async fn close(&mut self) -> Result<Vec<DataFile>> {
- if let Some(writer) = self.inner_writer.take() {
- Ok(writer
+ if let Some(writer) = self.inner.take() {
+ writer
.close()
.await?
.into_iter()
.map(|mut res| {
res.content(crate::spec::DataContentType::EqualityDeletes);
res.equality_ids(Some(self.equality_ids.iter().copied().collect_vec()));
- res.partition(self.partition_value.clone());
- res.partition_spec_id(self.partition_spec_id);
- res.build().expect("msg")
+ if let Some(pk) = self.partition_key.as_ref() {
+ res.partition(pk.data().clone());
+ res.partition_spec_id(pk.spec().spec_id());
+ }
+ res.build().map_err(|e| {
+ Error::new(
+ ErrorKind::DataInvalid,
+ format!("Failed to build data file: {}", e),
+ )
+ })
})
- .collect_vec())
+ .collect()
} else {
Err(Error::new(
ErrorKind::Unexpected,
@@ -201,6 +231,7 @@ mod test {
use crate::writer::file_writer::location_generator::{
DefaultFileNameGenerator, DefaultLocationGenerator,
};
+ use crate::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
use crate::writer::{IcebergWriter, IcebergWriterBuilder};
async fn check_parquet_data_file_with_equality_delete_write(
@@ -397,23 +428,24 @@ mod test {
let equality_ids = vec![0_i32, 8];
let equality_config =
- EqualityDeleteWriterConfig::new(equality_ids, Arc::new(schema),
None, 0).unwrap();
+ EqualityDeleteWriterConfig::new(equality_ids, Arc::new(schema),
None).unwrap();
let delete_schema =
arrow_schema_to_schema(equality_config.projected_arrow_schema_ref()).unwrap();
let projector = equality_config.projector.clone();
// prepare writer
- let pb = ParquetWriterBuilder::new(
- WriterProperties::builder().build(),
- Arc::new(delete_schema),
- None,
+ let pb =
+ ParquetWriterBuilder::new(WriterProperties::builder().build(),
Arc::new(delete_schema));
+ let rolling_writer_builder =
RollingFileWriterBuilder::new_with_default_file_size(
+ pb,
file_io.clone(),
location_gen,
file_name_gen,
);
- let mut equality_delete_writer =
EqualityDeleteFileWriterBuilder::new(pb, equality_config)
- .build()
- .await?;
+ let mut equality_delete_writer =
+ EqualityDeleteFileWriterBuilder::new(rolling_writer_builder,
equality_config)
+ .build()
+ .await?;
// write
equality_delete_writer.write(to_write.clone()).await?;
@@ -499,19 +531,19 @@ mod test {
.unwrap(),
);
// Float and Double are not allowed to be used for equality delete
- assert!(EqualityDeleteWriterConfig::new(vec![0], schema.clone(), None,
0).is_err());
- assert!(EqualityDeleteWriterConfig::new(vec![1], schema.clone(), None,
0).is_err());
+ assert!(EqualityDeleteWriterConfig::new(vec![0], schema.clone(),
None).is_err());
+ assert!(EqualityDeleteWriterConfig::new(vec![1], schema.clone(),
None).is_err());
// Struct is not allowed to be used for equality delete
- assert!(EqualityDeleteWriterConfig::new(vec![3], schema.clone(), None,
0).is_err());
+ assert!(EqualityDeleteWriterConfig::new(vec![3], schema.clone(),
None).is_err());
// Nested field of struct is allowed to be used for equality delete
- assert!(EqualityDeleteWriterConfig::new(vec![4], schema.clone(), None,
0).is_ok());
+ assert!(EqualityDeleteWriterConfig::new(vec![4], schema.clone(),
None).is_ok());
// Nested field of map is not allowed to be used for equality delete
- assert!(EqualityDeleteWriterConfig::new(vec![7], schema.clone(), None,
0).is_err());
- assert!(EqualityDeleteWriterConfig::new(vec![8], schema.clone(), None,
0).is_err());
- assert!(EqualityDeleteWriterConfig::new(vec![9], schema.clone(), None,
0).is_err());
+ assert!(EqualityDeleteWriterConfig::new(vec![7], schema.clone(),
None).is_err());
+ assert!(EqualityDeleteWriterConfig::new(vec![8], schema.clone(),
None).is_err());
+ assert!(EqualityDeleteWriterConfig::new(vec![9], schema.clone(),
None).is_err());
// Nested field of list is not allowed to be used for equality delete
- assert!(EqualityDeleteWriterConfig::new(vec![10], schema.clone(),
None, 0).is_err());
- assert!(EqualityDeleteWriterConfig::new(vec![11], schema.clone(),
None, 0).is_err());
+ assert!(EqualityDeleteWriterConfig::new(vec![10], schema.clone(),
None).is_err());
+ assert!(EqualityDeleteWriterConfig::new(vec![11], schema.clone(),
None).is_err());
Ok(())
}
@@ -565,22 +597,22 @@ mod test {
.unwrap(),
);
let equality_ids = vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13];
- let config =
- EqualityDeleteWriterConfig::new(equality_ids, schema.clone(),
None, 0).unwrap();
+ let config = EqualityDeleteWriterConfig::new(equality_ids,
schema.clone(), None).unwrap();
let delete_arrow_schema = config.projected_arrow_schema_ref().clone();
let delete_schema =
arrow_schema_to_schema(&delete_arrow_schema).unwrap();
- let pb = ParquetWriterBuilder::new(
- WriterProperties::builder().build(),
- Arc::new(delete_schema),
- None,
+ let pb =
+ ParquetWriterBuilder::new(WriterProperties::builder().build(),
Arc::new(delete_schema));
+ let rolling_writer_builder =
RollingFileWriterBuilder::new_with_default_file_size(
+ pb,
file_io.clone(),
location_gen,
file_name_gen,
);
- let mut equality_delete_writer =
EqualityDeleteFileWriterBuilder::new(pb, config)
- .build()
- .await?;
+ let mut equality_delete_writer =
+ EqualityDeleteFileWriterBuilder::new(rolling_writer_builder,
config)
+ .build()
+ .await?;
// prepare data
let col0 = Arc::new(BooleanArray::from(vec![
@@ -763,7 +795,7 @@ mod test {
let to_write = RecordBatch::try_new(arrow_schema.clone(),
columns).unwrap();
let equality_ids = vec![0_i32, 2, 5];
let equality_config =
- EqualityDeleteWriterConfig::new(equality_ids, Arc::new(schema),
None, 0).unwrap();
+ EqualityDeleteWriterConfig::new(equality_ids, Arc::new(schema),
None).unwrap();
let projector = equality_config.projector.clone();
// check
diff --git a/crates/iceberg/src/writer/file_writer/mod.rs
b/crates/iceberg/src/writer/file_writer/mod.rs
index 2a5a7355..2ed6414c 100644
--- a/crates/iceberg/src/writer/file_writer/mod.rs
+++ b/crates/iceberg/src/writer/file_writer/mod.rs
@@ -27,6 +27,8 @@ use crate::spec::DataFileBuilder;
mod parquet_writer;
pub use parquet_writer::{ParquetWriter, ParquetWriterBuilder};
+use crate::io::OutputFile;
+
pub mod location_generator;
/// Module providing writers that can automatically roll over to new files
based on size thresholds.
pub mod rolling_writer;
@@ -38,7 +40,7 @@ pub trait FileWriterBuilder<O = DefaultOutput>: Send + Clone
+ 'static {
/// The associated file writer type.
type R: FileWriter<O>;
/// Build file writer.
- fn build(self) -> impl Future<Output = Result<Self::R>> + Send;
+ fn build(self, output_file: OutputFile) -> impl Future<Output =
Result<Self::R>> + Send;
}
/// File writer focus on writing record batch to different physical file
format.(Such as parquet. orc)
diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs
b/crates/iceberg/src/writer/file_writer/parquet_writer.rs
index 0a8a095e..620f27df 100644
--- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs
+++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs
@@ -34,7 +34,6 @@ use parquet::format::FileMetaData;
use parquet::thrift::{TCompactOutputProtocol, TSerializable};
use thrift::protocol::TOutputProtocol;
-use super::location_generator::{FileNameGenerator, LocationGenerator};
use super::{FileWriter, FileWriterBuilder};
use crate::arrow::{
ArrowFileReader, DEFAULT_MAP_FIELD_NAME, FieldMatchMode,
NanValueCountVisitor,
@@ -43,8 +42,8 @@ use crate::arrow::{
use crate::io::{FileIO, FileWrite, OutputFile};
use crate::spec::{
DataContentType, DataFileBuilder, DataFileFormat, Datum, ListType,
Literal, MapType,
- NestedFieldRef, PartitionKey, PartitionSpec, PrimitiveType, Schema,
SchemaRef, SchemaVisitor,
- Struct, StructType, TableMetadata, Type, visit_schema,
+ NestedFieldRef, PartitionSpec, PrimitiveType, Schema, SchemaRef,
SchemaVisitor, Struct,
+ StructType, TableMetadata, Type, visit_schema,
};
use crate::transform::create_transform_function;
use crate::writer::{CurrentFileStatus, DataFile};
@@ -52,78 +51,43 @@ use crate::{Error, ErrorKind, Result};
/// ParquetWriterBuilder is used to builder a [`ParquetWriter`]
#[derive(Clone, Debug)]
-pub struct ParquetWriterBuilder<T: LocationGenerator, F: FileNameGenerator> {
+pub struct ParquetWriterBuilder {
props: WriterProperties,
schema: SchemaRef,
- partition_key: Option<PartitionKey>,
match_mode: FieldMatchMode,
-
- file_io: FileIO,
- location_generator: T,
- file_name_generator: F,
}
-impl<T: LocationGenerator, F: FileNameGenerator> ParquetWriterBuilder<T, F> {
+impl ParquetWriterBuilder {
/// Create a new `ParquetWriterBuilder`
/// To construct the write result, the schema should contain the
`PARQUET_FIELD_ID_META_KEY` metadata for each field.
- pub fn new(
- props: WriterProperties,
- schema: SchemaRef,
- partition_key: Option<PartitionKey>,
- file_io: FileIO,
- location_generator: T,
- file_name_generator: F,
- ) -> Self {
- Self::new_with_match_mode(
- props,
- schema,
- partition_key,
- FieldMatchMode::Id,
- file_io,
- location_generator,
- file_name_generator,
- )
+ pub fn new(props: WriterProperties, schema: SchemaRef) -> Self {
+ Self::new_with_match_mode(props, schema, FieldMatchMode::Id)
}
/// Create a new `ParquetWriterBuilder` with custom match mode
pub fn new_with_match_mode(
props: WriterProperties,
schema: SchemaRef,
- partition_key: Option<PartitionKey>,
match_mode: FieldMatchMode,
- file_io: FileIO,
- location_generator: T,
- file_name_generator: F,
) -> Self {
Self {
props,
schema,
- partition_key,
match_mode,
- file_io,
- location_generator,
- file_name_generator,
}
}
}
-impl<T: LocationGenerator, F: FileNameGenerator> FileWriterBuilder for
ParquetWriterBuilder<T, F> {
+impl FileWriterBuilder for ParquetWriterBuilder {
type R = ParquetWriter;
- async fn build(self) -> Result<Self::R> {
- let out_file = self
- .file_io
- .new_output(self.location_generator.generate_location(
- self.partition_key.as_ref(),
- &self.file_name_generator.generate_file_name(),
- ))?;
-
+ async fn build(self, output_file: OutputFile) -> Result<Self::R> {
Ok(ParquetWriter {
schema: self.schema.clone(),
inner_writer: None,
writer_properties: self.props,
current_row_num: 0,
- out_file,
+ output_file,
nan_value_count_visitor:
NanValueCountVisitor::new_with_match_mode(self.match_mode),
})
}
@@ -250,7 +214,7 @@ impl SchemaVisitor for IndexByParquetPathName {
/// `ParquetWriter`` is used to write arrow data into parquet file on storage.
pub struct ParquetWriter {
schema: SchemaRef,
- out_file: OutputFile,
+ output_file: OutputFile,
inner_writer: Option<AsyncArrowWriter<AsyncFileWriter<Box<dyn
FileWrite>>>>,
writer_properties: WriterProperties,
current_row_num: usize,
@@ -555,7 +519,7 @@ impl FileWriter for ParquetWriter {
writer
} else {
let arrow_schema: ArrowSchemaRef =
Arc::new(self.schema.as_ref().try_into()?);
- let inner_writer = self.out_file.writer().await?;
+ let inner_writer = self.output_file.writer().await?;
let async_writer = AsyncFileWriter::new(inner_writer);
let writer = AsyncArrowWriter::try_new(
async_writer,
@@ -594,7 +558,7 @@ impl FileWriter for ParquetWriter {
let written_size = writer.bytes_written();
if self.current_row_num == 0 {
- self.out_file.delete().await.map_err(|err| {
+ self.output_file.delete().await.map_err(|err| {
Error::new(
ErrorKind::Unexpected,
"Failed to delete empty parquet file.",
@@ -616,7 +580,7 @@ impl FileWriter for ParquetWriter {
self.schema,
parquet_metadata,
written_size,
- self.out_file.location().to_string(),
+ self.output_file.location().to_string(),
self.nan_value_count_visitor.nan_value_counts,
)?])
}
@@ -625,7 +589,7 @@ impl FileWriter for ParquetWriter {
impl CurrentFileStatus for ParquetWriter {
fn current_file_path(&self) -> String {
- self.out_file.location().to_string()
+ self.output_file.location().to_string()
}
fn current_row_num(&self) -> usize {
@@ -703,7 +667,7 @@ mod tests {
use crate::io::FileIOBuilder;
use crate::spec::{PrimitiveLiteral, Struct, *};
use crate::writer::file_writer::location_generator::{
- DefaultFileNameGenerator, DefaultLocationGenerator,
+ DefaultFileNameGenerator, DefaultLocationGenerator, FileNameGenerator,
LocationGenerator,
};
use crate::writer::tests::check_parquet_data_file;
@@ -871,11 +835,13 @@ mod tests {
// prepare data
let schema = {
- let fields = vec![
- arrow_schema::Field::new("col", arrow_schema::DataType::Int64,
true).with_metadata(
- HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(),
"0".to_string())]),
- ),
- ];
+ let fields =
+ vec![
+ Field::new("col", DataType::Int64,
true).with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "0".to_string(),
+ )])),
+ ];
Arc::new(arrow_schema::Schema::new(fields))
};
let col = Arc::new(Int64Array::from_iter_values(0..1024)) as ArrayRef;
@@ -883,18 +849,18 @@ mod tests {
let to_write = RecordBatch::try_new(schema.clone(),
vec![col]).unwrap();
let to_write_null = RecordBatch::try_new(schema.clone(),
vec![null_col]).unwrap();
+ let output_file = file_io.new_output(
+ location_gen.generate_location(None,
&file_name_gen.generate_file_name()),
+ )?;
+
// write data
let mut pw = ParquetWriterBuilder::new(
WriterProperties::builder()
.set_max_row_group_size(128)
.build(),
Arc::new(to_write.schema().as_ref().try_into().unwrap()),
- None,
- file_io.clone(),
- location_gen,
- file_name_gen,
)
- .build()
+ .build(output_file)
.await?;
pw.write(&to_write).await?;
pw.write(&to_write_null).await?;
@@ -905,7 +871,7 @@ mod tests {
.next()
.unwrap()
// Put dummy field for build successfully.
- .content(crate::spec::DataContentType::Data)
+ .content(DataContentType::Data)
.partition(Struct::empty())
.partition_spec_id(0)
.build()
@@ -1006,7 +972,7 @@ mod tests {
None,
));
let col5 = Arc::new({
- let mut map_array_builder = arrow_array::builder::MapBuilder::new(
+ let mut map_array_builder = MapBuilder::new(
None,
arrow_array::builder::StringBuilder::new(),
arrow_array::builder::ListBuilder::new(arrow_array::builder::PrimitiveBuilder::<
@@ -1083,18 +1049,15 @@ mod tests {
col0, col1, col2, col3, col4, col5,
])
.unwrap();
+ let output_file = file_io.new_output(
+ location_gen.generate_location(None,
&file_name_gen.generate_file_name()),
+ )?;
// write data
- let mut pw = ParquetWriterBuilder::new(
- WriterProperties::builder().build(),
- Arc::new(schema),
- None,
- file_io.clone(),
- location_gen,
- file_name_gen,
- )
- .build()
- .await?;
+ let mut pw =
+ ParquetWriterBuilder::new(WriterProperties::builder().build(),
Arc::new(schema))
+ .build(output_file)
+ .await?;
pw.write(&to_write).await?;
let res = pw.close().await?;
assert_eq!(res.len(), 1);
@@ -1161,7 +1124,7 @@ mod tests {
async fn test_all_type_for_write() -> Result<()> {
let temp_dir = TempDir::new().unwrap();
let file_io = FileIOBuilder::new_fs_io().build().unwrap();
- let loccation_gen = DefaultLocationGenerator::with_data_location(
+ let location_gen = DefaultLocationGenerator::with_data_location(
temp_dir.path().to_str().unwrap().to_string(),
);
let file_name_gen =
@@ -1276,18 +1239,15 @@ mod tests {
col14, col15, col16,
])
.unwrap();
+ let output_file = file_io.new_output(
+ location_gen.generate_location(None,
&file_name_gen.generate_file_name()),
+ )?;
// write data
- let mut pw = ParquetWriterBuilder::new(
- WriterProperties::builder().build(),
- Arc::new(schema),
- None,
- file_io.clone(),
- loccation_gen,
- file_name_gen,
- )
- .build()
- .await?;
+ let mut pw =
+ ParquetWriterBuilder::new(WriterProperties::builder().build(), Arc::new(schema))
+ .build(output_file)
+ .await?;
pw.write(&to_write).await?;
let res = pw.close().await?;
assert_eq!(res.len(), 1);
@@ -1405,7 +1365,7 @@ mod tests {
async fn test_decimal_bound() -> Result<()> {
let temp_dir = TempDir::new().unwrap();
let file_io = FileIOBuilder::new_fs_io().build().unwrap();
- let loccation_gen = DefaultLocationGenerator::with_data_location(
+ let location_gen = DefaultLocationGenerator::with_data_location(
temp_dir.path().to_str().unwrap().to_string(),
);
let file_name_gen =
@@ -1429,16 +1389,12 @@ mod tests {
.unwrap(),
);
let arrow_schema: ArrowSchemaRef =
Arc::new(schema_to_arrow_schema(&schema).unwrap());
- let mut pw = ParquetWriterBuilder::new(
- WriterProperties::builder().build(),
- schema.clone(),
- None,
- file_io.clone(),
- loccation_gen.clone(),
- file_name_gen.clone(),
- )
- .build()
- .await?;
+ let output_file = file_io.new_output(
+ location_gen.generate_location(None, &file_name_gen.generate_file_name()),
+ )?;
+ let mut pw =
+ ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone())
+ .build(output_file)
+ .await?;
let col0 = Arc::new(
Decimal128Array::from(vec![Some(22000000000), Some(11000000000)])
.with_data_type(DataType::Decimal128(28, 10)),
@@ -1485,16 +1441,12 @@ mod tests {
.unwrap(),
);
let arrow_schema: ArrowSchemaRef =
Arc::new(schema_to_arrow_schema(&schema).unwrap());
- let mut pw = ParquetWriterBuilder::new(
- WriterProperties::builder().build(),
- schema.clone(),
- None,
- file_io.clone(),
- loccation_gen.clone(),
- file_name_gen.clone(),
- )
- .build()
- .await?;
+ let output_file = file_io.new_output(
+ location_gen.generate_location(None, &file_name_gen.generate_file_name()),
+ )?;
+ let mut pw =
+ ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone())
+ .build(output_file)
+ .await?;
let col0 = Arc::new(
Decimal128Array::from(vec![Some(-22000000000), Some(-11000000000)])
.with_data_type(DataType::Decimal128(28, 10)),
@@ -1544,16 +1496,12 @@ mod tests {
.unwrap(),
);
let arrow_schema: ArrowSchemaRef =
Arc::new(schema_to_arrow_schema(&schema).unwrap());
- let mut pw = ParquetWriterBuilder::new(
- WriterProperties::builder().build(),
- schema,
- None,
- file_io.clone(),
- loccation_gen,
- file_name_gen,
- )
- .build()
- .await?;
+ let output_file = file_io.new_output(
+ location_gen.generate_location(None, &file_name_gen.generate_file_name()),
+ )?;
+ let mut pw =
+ ParquetWriterBuilder::new(WriterProperties::builder().build(), schema)
+ .build(output_file)
+ .await?;
let col0 = Arc::new(
Decimal128Array::from(vec![
Some(decimal_max.mantissa()),
@@ -1676,37 +1624,31 @@ mod tests {
};
let col = Arc::new(Int64Array::from_iter_values(0..1024)) as ArrayRef;
let to_write = RecordBatch::try_new(schema.clone(),
vec![col]).unwrap();
+ let file_path = location_gen.generate_location(None, &file_name_gen.generate_file_name());
+ let output_file = file_io.new_output(&file_path)?;
let mut pw = ParquetWriterBuilder::new(
WriterProperties::builder().build(),
Arc::new(to_write.schema().as_ref().try_into().unwrap()),
- None,
- file_io.clone(),
- location_gen.clone(),
- file_name_gen,
)
- .build()
+ .build(output_file)
.await?;
pw.write(&to_write).await?;
- let file_path = pw.out_file.location().to_string();
pw.close().await.unwrap();
- assert!(file_io.exists(file_path).await.unwrap());
+ assert!(file_io.exists(&file_path).await.unwrap());
// Test that file will not create if no data to write
let file_name_gen =
DefaultFileNameGenerator::new("test_empty".to_string(), None,
DataFileFormat::Parquet);
+ let file_path = location_gen.generate_location(None, &file_name_gen.generate_file_name());
+ let output_file = file_io.new_output(&file_path)?;
let pw = ParquetWriterBuilder::new(
WriterProperties::builder().build(),
Arc::new(to_write.schema().as_ref().try_into().unwrap()),
- None,
- file_io.clone(),
- location_gen,
- file_name_gen,
)
- .build()
+ .build(output_file)
.await?;
- let file_path = pw.out_file.location().to_string();
pw.close().await.unwrap();
- assert!(!file_io.exists(file_path).await.unwrap());
+ assert!(!file_io.exists(&file_path).await.unwrap());
Ok(())
}
@@ -1746,17 +1688,16 @@ mod tests {
let to_write =
RecordBatch::try_new(arrow_schema.clone(), vec![float_32_col,
float_64_col]).unwrap();
+ let output_file = file_io.new_output(
+ location_gen.generate_location(None, &file_name_gen.generate_file_name()),
+ )?;
// write data
let mut pw = ParquetWriterBuilder::new(
WriterProperties::builder().build(),
Arc::new(to_write.schema().as_ref().try_into().unwrap()),
- None,
- file_io.clone(),
- location_gen,
- file_name_gen,
)
- .build()
+ .build(output_file)
.await?;
pw.write(&to_write).await?;
@@ -1887,17 +1828,16 @@ mod tests {
struct_nested_float_field_col,
])
.unwrap();
+ let output_file = file_io.new_output(
+ location_gen.generate_location(None, &file_name_gen.generate_file_name()),
+ )?;
// write data
let mut pw = ParquetWriterBuilder::new(
WriterProperties::builder().build(),
Arc::new(to_write.schema().as_ref().try_into().unwrap()),
- None,
- file_io.clone(),
- location_gen,
- file_name_gen,
)
- .build()
+ .build(output_file)
.await?;
pw.write(&to_write).await?;
@@ -2047,6 +1987,9 @@ mod tests {
// large_list_float_field_col,
])
.expect("Could not form record batch");
+ let output_file = file_io.new_output(
+ location_gen.generate_location(None, &file_name_gen.generate_file_name()),
+ )?;
// write data
let mut pw = ParquetWriterBuilder::new(
@@ -2058,12 +2001,8 @@ mod tests {
.try_into()
.expect("Could not convert iceberg schema"),
),
- None,
- file_io.clone(),
- location_gen,
- file_name_gen,
)
- .build()
+ .build(output_file)
.await?;
pw.write(&to_write).await?;
@@ -2229,6 +2168,9 @@ mod tests {
struct_list_float_field_col,
])
.expect("Could not form record batch");
+ let output_file = file_io.new_output(
+ location_gen.generate_location(None, &file_name_gen.generate_file_name()),
+ )?;
// write data
let mut pw = ParquetWriterBuilder::new(
@@ -2240,12 +2182,8 @@ mod tests {
.try_into()
.expect("Could not convert iceberg schema"),
),
- None,
- file_io.clone(),
- location_gen,
- file_name_gen,
)
- .build()
+ .build(output_file)
.await?;
pw.write(&to_write).await?;
@@ -2310,6 +2248,9 @@ mod tests {
);
let file_name_gen =
DefaultFileNameGenerator::new("test".to_string(), None,
DataFileFormat::Parquet);
+ let output_file = file_io
+ .new_output(location_gen.generate_location(None, &file_name_gen.generate_file_name()))
+ .unwrap();
// write data
let pw = ParquetWriterBuilder::new(
@@ -2325,12 +2266,8 @@ mod tests {
.build()
.expect("Failed to create schema"),
),
- None,
- file_io.clone(),
- location_gen,
- file_name_gen,
)
- .build()
+ .build(output_file)
.await
.unwrap();
diff --git a/crates/iceberg/src/writer/file_writer/rolling_writer.rs
b/crates/iceberg/src/writer/file_writer/rolling_writer.rs
index 181205c3..0b9b105c 100644
--- a/crates/iceberg/src/writer/file_writer/rolling_writer.rs
+++ b/crates/iceberg/src/writer/file_writer/rolling_writer.rs
@@ -15,50 +15,104 @@
// specific language governing permissions and limitations
// under the License.
+use std::fmt::{Debug, Formatter};
+
use arrow_array::RecordBatch;
-use crate::spec::DataFileBuilder;
+use crate::io::{FileIO, OutputFile};
+use crate::spec::{DataFileBuilder, PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT, PartitionKey};
use crate::writer::CurrentFileStatus;
+use crate::writer::file_writer::location_generator::{FileNameGenerator, LocationGenerator};
use crate::writer::file_writer::{FileWriter, FileWriterBuilder};
use crate::{Error, ErrorKind, Result};
-/// Builder for creating a `RollingFileWriter` that rolls over to a new file
-/// when the data size exceeds a target threshold.
-#[derive(Clone)]
-pub struct RollingFileWriterBuilder<B: FileWriterBuilder> {
+/// Builder for [`RollingFileWriter`].
+#[derive(Clone, Debug)]
+pub struct RollingFileWriterBuilder<
+ B: FileWriterBuilder,
+ L: LocationGenerator,
+ F: FileNameGenerator,
+> {
inner_builder: B,
target_file_size: usize,
+ file_io: FileIO,
+ location_generator: L,
+ file_name_generator: F,
}
-impl<B: FileWriterBuilder> RollingFileWriterBuilder<B> {
- /// Creates a new `RollingFileWriterBuilder` with the specified inner builder and target size.
+impl<B, L, F> RollingFileWriterBuilder<B, L, F>
+where
+ B: FileWriterBuilder,
+ L: LocationGenerator,
+ F: FileNameGenerator,
+{
+ /// Creates a new `RollingFileWriterBuilder` with the specified target file size.
///
- /// # Arguments
+ /// # Parameters
///
/// * `inner_builder` - The builder for the underlying file writer
- /// * `target_file_size` - The target size in bytes before rolling over to a new file
+ /// * `target_file_size` - The target file size in bytes that triggers rollover
+ /// * `file_io` - The file IO interface for creating output files
+ /// * `location_generator` - Generator for file locations
+ /// * `file_name_generator` - Generator for file names
+ ///
+ /// # Returns
///
- /// NOTE: The `target_file_size` does not exactly reflect the final size on physical storage.
- /// This is because the input size is based on the Arrow in-memory format and cannot precisely control rollover behavior.
- /// The actual file size on disk is expected to be slightly larger than `target_file_size`.
- pub fn new(inner_builder: B, target_file_size: usize) -> Self {
+ /// A new `RollingFileWriterBuilder` instance
+ pub fn new(
+ inner_builder: B,
+ target_file_size: usize,
+ file_io: FileIO,
+ location_generator: L,
+ file_name_generator: F,
+ ) -> Self {
Self {
inner_builder,
target_file_size,
+ file_io,
+ location_generator,
+ file_name_generator,
}
}
-}
-impl<B: FileWriterBuilder> FileWriterBuilder for RollingFileWriterBuilder<B> {
- type R = RollingFileWriter<B>;
+ /// Creates a new `RollingFileWriterBuilder` with the default target file size.
+ ///
+ /// # Parameters
+ ///
+ /// * `inner_builder` - The builder for the underlying file writer
+ /// * `file_io` - The file IO interface for creating output files
+ /// * `location_generator` - Generator for file locations
+ /// * `file_name_generator` - Generator for file names
+ ///
+ /// # Returns
+ ///
+ /// A new `RollingFileWriterBuilder` instance with default target file size
+ pub fn new_with_default_file_size(
+ inner_builder: B,
+ file_io: FileIO,
+ location_generator: L,
+ file_name_generator: F,
+ ) -> Self {
+ Self {
+ inner_builder,
+ target_file_size: PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
+ file_io,
+ location_generator,
+ file_name_generator,
+ }
+ }
- async fn build(self) -> Result<Self::R> {
- Ok(RollingFileWriter {
+ /// Build a new [`RollingFileWriter`].
+ pub fn build(self) -> RollingFileWriter<B, L, F> {
+ RollingFileWriter {
inner: None,
inner_builder: self.inner_builder,
target_file_size: self.target_file_size,
data_file_builders: vec![],
- })
+ file_io: self.file_io,
+ location_generator: self.location_generator,
+ file_name_generator: self.file_name_generator,
+ }
}
}
@@ -68,14 +122,36 @@ impl<B: FileWriterBuilder> FileWriterBuilder for RollingFileWriterBuilder<B> {
/// This writer wraps another file writer that tracks the amount of data
written.
/// When the data size exceeds the target size, it closes the current file and
/// starts writing to a new one.
-pub struct RollingFileWriter<B: FileWriterBuilder> {
+pub struct RollingFileWriter<B: FileWriterBuilder, L: LocationGenerator, F: FileNameGenerator> {
inner: Option<B::R>,
inner_builder: B,
target_file_size: usize,
data_file_builders: Vec<DataFileBuilder>,
+ file_io: FileIO,
+ location_generator: L,
+ file_name_generator: F,
+}
+
+impl<B, L, F> Debug for RollingFileWriter<B, L, F>
+where
+ B: FileWriterBuilder,
+ L: LocationGenerator,
+ F: FileNameGenerator,
+{
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("RollingFileWriter")
+ .field("target_file_size", &self.target_file_size)
+ .field("file_io", &self.file_io)
+ .finish()
+ }
}
-impl<B: FileWriterBuilder> RollingFileWriter<B> {
+impl<B, L, F> RollingFileWriter<B, L, F>
+where
+ B: FileWriterBuilder,
+ L: LocationGenerator,
+ F: FileNameGenerator,
+{
/// Determines if the writer should roll over to a new file.
///
/// # Returns
@@ -84,13 +160,42 @@ impl<B: FileWriterBuilder> RollingFileWriter<B> {
fn should_roll(&self) -> bool {
self.current_written_size() > self.target_file_size
}
-}
-impl<B: FileWriterBuilder> FileWriter for RollingFileWriter<B> {
- async fn write(&mut self, input: &RecordBatch) -> Result<()> {
+ fn new_output_file(&self, partition_key: &Option<PartitionKey>) -> Result<OutputFile> {
+ self.file_io
+ .new_output(self.location_generator.generate_location(
+ partition_key.as_ref(),
+ &self.file_name_generator.generate_file_name(),
+ ))
+ }
+
+ /// Writes a record batch to the current file, rolling over to a new file if necessary.
+ ///
+ /// # Parameters
+ ///
+ /// * `partition_key` - Optional partition key for the data
+ /// * `input` - The record batch to write
+ ///
+ /// # Returns
+ ///
+ /// A `Result` indicating success or failure
+ ///
+ /// # Errors
+ ///
+ /// Returns an error if the writer is not initialized or if writing fails
+ pub async fn write(
+ &mut self,
+ partition_key: &Option<PartitionKey>,
+ input: &RecordBatch,
+ ) -> Result<()> {
if self.inner.is_none() {
// initialize inner writer
- self.inner = Some(self.inner_builder.clone().build().await?);
+ self.inner = Some(
+ self.inner_builder
+ .clone()
+ .build(self.new_output_file(partition_key)?)
+ .await?,
+ );
}
if self.should_roll() {
@@ -99,7 +204,12 @@ impl<B: FileWriterBuilder> FileWriter for RollingFileWriter<B> {
self.data_file_builders.extend(inner.close().await?);
// start a new writer
- self.inner = Some(self.inner_builder.clone().build().await?);
+ self.inner = Some(
+ self.inner_builder
+ .clone()
+ .build(self.new_output_file(partition_key)?)
+ .await?,
+ );
}
}
@@ -114,7 +224,13 @@ impl<B: FileWriterBuilder> FileWriter for RollingFileWriter<B> {
}
}
- async fn close(mut self) -> Result<Vec<DataFileBuilder>> {
+ /// Closes the writer and returns all data file builders.
+ ///
+ /// # Returns
+ ///
+ /// A `Result` containing a vector of `DataFileBuilder` instances representing
+ /// all files that were written, including any that were created due to rollover
+ pub async fn close(mut self) -> Result<Vec<DataFileBuilder>> {
// close the current writer and merge the output
if let Some(current_writer) = self.inner {
self.data_file_builders
@@ -125,7 +241,9 @@ impl<B: FileWriterBuilder> FileWriter for RollingFileWriter<B> {
}
}
-impl<B: FileWriterBuilder> CurrentFileStatus for RollingFileWriter<B> {
+impl<B: FileWriterBuilder, L: LocationGenerator, F: FileNameGenerator> CurrentFileStatus
+ for RollingFileWriter<B, L, F>
+{
fn current_file_path(&self) -> String {
self.inner.as_ref().unwrap().current_file_path()
}
@@ -199,22 +317,20 @@ mod tests {
let schema = make_test_schema()?;
// Create writer builders
- let parquet_writer_builder = ParquetWriterBuilder::new(
- WriterProperties::builder().build(),
- Arc::new(schema),
- None,
- file_io.clone(),
- location_gen,
- file_name_gen,
- );
+ let parquet_writer_builder =
+ ParquetWriterBuilder::new(WriterProperties::builder().build(), Arc::new(schema));
// Set a large target size so no rolling occurs
- let rolling_writer_builder = RollingFileWriterBuilder::new(
+ let rolling_file_writer_builder = RollingFileWriterBuilder::new(
parquet_writer_builder,
- 1024 * 1024, // 1MB, large enough to not trigger rolling
+ 1024 * 1024,
+ file_io.clone(),
+ location_gen,
+ file_name_gen,
);
- let data_file_writer_builder = DataFileWriterBuilder::new(rolling_writer_builder, None, 0);
+ let data_file_writer_builder =
+ DataFileWriterBuilder::new(rolling_file_writer_builder, None);
// Create writer
let mut writer = data_file_writer_builder.build().await?;
@@ -260,19 +376,19 @@ mod tests {
let schema = make_test_schema()?;
// Create writer builders
- let parquet_writer_builder = ParquetWriterBuilder::new(
- WriterProperties::builder().build(),
- Arc::new(schema),
- None,
- file_io.clone(),
+ let parquet_writer_builder =
+ ParquetWriterBuilder::new(WriterProperties::builder().build(), Arc::new(schema));
+
+ // Set a very small target size to trigger rolling
+ let rolling_writer_builder = RollingFileWriterBuilder::new(
+ parquet_writer_builder,
+ 1024,
+ file_io,
location_gen,
file_name_gen,
);
- // Set a very small target size to trigger rolling
- let rolling_writer_builder = RollingFileWriterBuilder::new(parquet_writer_builder, 1024);
-
- let data_file_writer_builder = DataFileWriterBuilder::new(rolling_writer_builder, None, 0);
+ let data_file_writer_builder = DataFileWriterBuilder::new(rolling_writer_builder, None);
// Create writer
let mut writer = data_file_writer_builder.build().await?;
diff --git a/crates/iceberg/src/writer/mod.rs b/crates/iceberg/src/writer/mod.rs
index 8f17d50e..d5a8a668 100644
--- a/crates/iceberg/src/writer/mod.rs
+++ b/crates/iceberg/src/writer/mod.rs
@@ -60,6 +60,9 @@
//! async fn main() -> Result<()> {
//! // Connect to a catalog.
//! use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
+//! use iceberg::writer::file_writer::rolling_writer::{
+//! RollingFileWriter, RollingFileWriterBuilder,
+//! };
//! let catalog = MemoryCatalogBuilder::default()
//! .load(
//! "memory",
@@ -86,15 +89,21 @@
//! let parquet_writer_builder = ParquetWriterBuilder::new(
//! WriterProperties::default(),
//! table.metadata().current_schema().clone(),
-//! None,
+//! );
+//!
+//! // Create a rolling file writer using parquet file writer builder.
+//! let rolling_file_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
+//! parquet_writer_builder,
//! table.file_io().clone(),
//! location_generator.clone(),
//! file_name_generator.clone(),
//! );
+//!
//! // Create a data file writer using parquet file writer builder.
-//! let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder, None, 0);
+//! let data_file_writer_builder =
+//! DataFileWriterBuilder::new(rolling_file_writer_builder, None);
//! // Build the data file writer
-//! let mut data_file_writer = data_file_writer_builder.build().await.unwrap();
+//! let mut data_file_writer = data_file_writer_builder.build().await?;
//!
//! // Write the data using data_file_writer...
//!
@@ -174,6 +183,9 @@
//! // Connect to a catalog.
//! use iceberg::memory::MEMORY_CATALOG_WAREHOUSE;
//! use iceberg::spec::{Literal, PartitionKey, Struct};
+//! use iceberg::writer::file_writer::rolling_writer::{
+//! RollingFileWriter, RollingFileWriterBuilder,
+//! };
//!
//! let catalog = MemoryCatalogBuilder::default()
//! .load(
@@ -207,13 +219,20 @@
//! let parquet_writer_builder = ParquetWriterBuilder::new(
//! WriterProperties::default(),
//! table.metadata().current_schema().clone(),
-//! Some(partition_key),
+//! );
+//!
+//! // Create a rolling file writer
+//! let rolling_file_writer_builder = RollingFileWriterBuilder::new(
+//! parquet_writer_builder,
+//! 512 * 1024 * 1024,
//! table.file_io().clone(),
//! location_generator.clone(),
//! file_name_generator.clone(),
//! );
-//! // Create a data file writer builder using parquet file writer builder.
-//! let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder, None, 0);
+//!
+//! // Create a data file writer builder using rolling file writer.
+//! let data_file_writer_builder =
+//! DataFileWriterBuilder::new(rolling_file_writer_builder, Some(partition_key));
//! // Create latency record writer using data file writer builder.
//! let latency_record_builder =
LatencyRecordWriterBuilder::new(data_file_writer_builder);
//! // Build the final writer
diff --git
a/crates/integration_tests/tests/shared_tests/append_data_file_test.rs
b/crates/integration_tests/tests/shared_tests/append_data_file_test.rs
index 9a3d6213..f4cba959 100644
--- a/crates/integration_tests/tests/shared_tests/append_data_file_test.rs
+++ b/crates/integration_tests/tests/shared_tests/append_data_file_test.rs
@@ -27,6 +27,7 @@ use iceberg::writer::file_writer::ParquetWriterBuilder;
use iceberg::writer::file_writer::location_generator::{
DefaultFileNameGenerator, DefaultLocationGenerator,
};
+use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
use iceberg::{Catalog, CatalogBuilder, TableCreation};
use iceberg_catalog_rest::RestCatalogBuilder;
@@ -74,12 +75,14 @@ async fn test_append_data_file() {
let parquet_writer_builder = ParquetWriterBuilder::new(
WriterProperties::default(),
table.metadata().current_schema().clone(),
- None,
+ );
+ let rolling_file_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
+ parquet_writer_builder,
table.file_io().clone(),
location_generator.clone(),
file_name_generator.clone(),
);
- let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder, None, 0);
+ let data_file_writer_builder = DataFileWriterBuilder::new(rolling_file_writer_builder, None);
let mut data_file_writer = data_file_writer_builder.build().await.unwrap();
let col1 = StringArray::from(vec![Some("foo"), Some("bar"), None,
Some("baz")]);
let col2 = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4)]);
diff --git
a/crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs
b/crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs
index c737357e..0da88f1a 100644
---
a/crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs
+++
b/crates/integration_tests/tests/shared_tests/append_partition_data_file_test.rs
@@ -31,6 +31,7 @@ use iceberg::writer::file_writer::ParquetWriterBuilder;
use iceberg::writer::file_writer::location_generator::{
DefaultFileNameGenerator, DefaultLocationGenerator,
};
+use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
use iceberg::{Catalog, CatalogBuilder, TableCreation};
use iceberg_catalog_rest::RestCatalogBuilder;
@@ -96,18 +97,18 @@ async fn test_append_partition_data_file() {
let parquet_writer_builder = ParquetWriterBuilder::new(
WriterProperties::default(),
table.metadata().current_schema().clone(),
- Some(partition_key),
+ );
+
+ let rolling_file_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
+ parquet_writer_builder.clone(),
table.file_io().clone(),
location_generator.clone(),
file_name_generator.clone(),
);
let mut data_file_writer_valid = DataFileWriterBuilder::new(
- parquet_writer_builder.clone(),
- Some(Struct::from_iter([Some(Literal::Primitive(
- PrimitiveLiteral::Int(first_partition_id_value),
- ))])),
- 0,
+ rolling_file_writer_builder.clone(),
+ Some(partition_key.clone()),
)
.build()
.await
@@ -148,44 +149,53 @@ async fn test_append_partition_data_file() {
assert_eq!(batches.len(), 1);
assert_eq!(batches[0], batch);
+ let partition_key = partition_key.copy_with_data(Struct::from_iter([Some(
+ Literal::Primitive(PrimitiveLiteral::Boolean(true)),
+ )]));
test_schema_incompatible_partition_type(
- parquet_writer_builder.clone(),
+ rolling_file_writer_builder.clone(),
batch.clone(),
+ partition_key.clone(),
table.clone(),
&rest_catalog,
)
.await;
+ let partition_key = partition_key.copy_with_data(Struct::from_iter([
+ Some(Literal::Primitive(PrimitiveLiteral::Int(
+ first_partition_id_value,
+ ))),
+ Some(Literal::Primitive(PrimitiveLiteral::Int(
+ first_partition_id_value,
+ ))),
+ ]));
test_schema_incompatible_partition_fields(
- parquet_writer_builder,
+ rolling_file_writer_builder.clone(),
batch,
+ partition_key,
table,
&rest_catalog,
- first_partition_id_value,
)
.await;
}
async fn test_schema_incompatible_partition_type(
- parquet_writer_builder: ParquetWriterBuilder<
+ rolling_file_writer_builder: RollingFileWriterBuilder<
+ ParquetWriterBuilder,
DefaultLocationGenerator,
DefaultFileNameGenerator,
>,
batch: RecordBatch,
+ partition_key: PartitionKey,
table: Table,
catalog: &dyn Catalog,
) {
// test writing different "type" of partition than mentioned in schema
- let mut data_file_writer_invalid = DataFileWriterBuilder::new(
- parquet_writer_builder.clone(),
- Some(Struct::from_iter([Some(Literal::Primitive(
- PrimitiveLiteral::Boolean(true),
- ))])),
- 0,
- )
- .build()
- .await
- .unwrap();
+ let mut data_file_writer_invalid =
+ DataFileWriterBuilder::new(rolling_file_writer_builder, Some(partition_key))
+ .build()
+ .await
+ .unwrap();
data_file_writer_invalid.write(batch.clone()).await.unwrap();
let data_file_invalid = data_file_writer_invalid.close().await.unwrap();
@@ -200,32 +210,22 @@ async fn test_schema_incompatible_partition_type(
}
async fn test_schema_incompatible_partition_fields(
- parquet_writer_builder: ParquetWriterBuilder<
+ rolling_file_writer_builder: RollingFileWriterBuilder<
+ ParquetWriterBuilder,
DefaultLocationGenerator,
DefaultFileNameGenerator,
>,
batch: RecordBatch,
+ partition_key: PartitionKey,
table: Table,
catalog: &dyn Catalog,
- first_partition_id_value: i32,
) {
// test writing different number of partition fields than mentioned in
schema
-
- let mut data_file_writer_invalid = DataFileWriterBuilder::new(
- parquet_writer_builder,
- Some(Struct::from_iter([
- Some(Literal::Primitive(PrimitiveLiteral::Int(
- first_partition_id_value,
- ))),
- Some(Literal::Primitive(PrimitiveLiteral::Int(
- first_partition_id_value,
- ))),
- ])),
- 0,
- )
- .build()
- .await
- .unwrap();
+ let mut data_file_writer_invalid =
+ DataFileWriterBuilder::new(rolling_file_writer_builder, Some(partition_key))
+ .build()
+ .await
+ .unwrap();
data_file_writer_invalid.write(batch.clone()).await.unwrap();
let data_file_invalid = data_file_writer_invalid.close().await.unwrap();
diff --git
a/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs
b/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs
index 4b85612f..a248fa70 100644
--- a/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs
+++ b/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs
@@ -27,6 +27,7 @@ use iceberg::writer::file_writer::ParquetWriterBuilder;
use iceberg::writer::file_writer::location_generator::{
DefaultFileNameGenerator, DefaultLocationGenerator,
};
+use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
use iceberg::{Catalog, CatalogBuilder, TableCreation};
use iceberg_catalog_rest::RestCatalogBuilder;
@@ -73,12 +74,14 @@ async fn test_append_data_file_conflict() {
let parquet_writer_builder = ParquetWriterBuilder::new(
WriterProperties::default(),
table.metadata().current_schema().clone(),
- None,
+ );
+ let rolling_file_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
+ parquet_writer_builder,
table.file_io().clone(),
location_generator.clone(),
file_name_generator.clone(),
);
- let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder, None, 0);
+ let data_file_writer_builder = DataFileWriterBuilder::new(rolling_file_writer_builder, None);
let mut data_file_writer = data_file_writer_builder.build().await.unwrap();
let col1 = StringArray::from(vec![Some("foo"), Some("bar"), None,
Some("baz")]);
let col2 = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4)]);
diff --git a/crates/integration_tests/tests/shared_tests/scan_all_type.rs
b/crates/integration_tests/tests/shared_tests/scan_all_type.rs
index b8e7c594..1125de11 100644
--- a/crates/integration_tests/tests/shared_tests/scan_all_type.rs
+++ b/crates/integration_tests/tests/shared_tests/scan_all_type.rs
@@ -39,6 +39,7 @@ use iceberg::writer::file_writer::ParquetWriterBuilder;
use iceberg::writer::file_writer::location_generator::{
DefaultFileNameGenerator, DefaultLocationGenerator,
};
+use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
use iceberg::{Catalog, CatalogBuilder, TableCreation};
use iceberg_catalog_rest::RestCatalogBuilder;
@@ -155,12 +156,14 @@ async fn test_scan_all_type() {
let parquet_writer_builder = ParquetWriterBuilder::new(
WriterProperties::default(),
table.metadata().current_schema().clone(),
- None,
+ );
+ let rolling_file_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
+ parquet_writer_builder,
table.file_io().clone(),
location_generator.clone(),
file_name_generator.clone(),
);
- let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder, None, 0);
+ let data_file_writer_builder = DataFileWriterBuilder::new(rolling_file_writer_builder, None);
let mut data_file_writer = data_file_writer_builder.build().await.unwrap();
// Prepare data
diff --git a/crates/integrations/datafusion/src/physical_plan/write.rs
b/crates/integrations/datafusion/src/physical_plan/write.rs
index 625405c9..712da92b 100644
--- a/crates/integrations/datafusion/src/physical_plan/write.rs
+++ b/crates/integrations/datafusion/src/physical_plan/write.rs
@@ -208,7 +208,6 @@ impl ExecutionPlan for IcebergWriteExec {
));
}
- let spec_id = self.table.metadata().default_partition_spec_id();
let partition_type =
self.table.metadata().default_partition_type().clone();
let format_version = self.table.metadata().format_version();
@@ -235,13 +234,7 @@ impl ExecutionPlan for IcebergWriteExec {
let parquet_file_writer_builder =
ParquetWriterBuilder::new_with_match_mode(
WriterProperties::default(),
self.table.metadata().current_schema().clone(),
- None,
FieldMatchMode::Name,
- self.table.file_io().clone(),
- DefaultLocationGenerator::new(self.table.metadata().clone())
- .map_err(to_datafusion_error)?,
- // todo filename prefix/suffix should be configurable
- DefaultFileNameGenerator::new(Uuid::now_v7().to_string(), None, file_format),
);
let target_file_size = match self
.table
@@ -261,10 +254,22 @@ impl ExecutionPlan for IcebergWriteExec {
.map_err(to_datafusion_error)?,
None => PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
};
- let rolling_writer_builder =
- RollingFileWriterBuilder::new(parquet_file_writer_builder, target_file_size);
- let data_file_writer_builder =
- DataFileWriterBuilder::new(rolling_writer_builder, None, spec_id);
+
+ let file_io = self.table.file_io().clone();
+ let location_generator = DefaultLocationGenerator::new(self.table.metadata().clone())
+ .map_err(to_datafusion_error)?;
+ // todo filename prefix/suffix should be configurable
+ let file_name_generator =
+ DefaultFileNameGenerator::new(Uuid::now_v7().to_string(), None, file_format);
+ let rolling_writer_builder = RollingFileWriterBuilder::new(
+ parquet_file_writer_builder,
+ target_file_size,
+ file_io,
+ location_generator,
+ file_name_generator,
+ );
+ // todo specify partition key when partitioning writer is supported
+ let data_file_writer_builder = DataFileWriterBuilder::new(rolling_writer_builder, None);
// Get input data
let data = execute_input_stream(