This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new b6a6f6d2 refine: refine the interface of SnapshotProducer (#1490)
b6a6f6d2 is described below
commit b6a6f6d2642c22b392956b996be4aa04fe447b21
Author: ZENOTME <[email protected]>
AuthorDate: Mon Jul 14 18:23:26 2025 +0800
refine: refine the interface of SnapshotProducer (#1490)
## Which issue does this PR close?
## What changes are included in this PR?
This PR refines the interface of SnapshotProducer:
1. store the table in SnapshotProducer rather than passing it as a function
parameter
2. add SnapshotProducer as a parameter for ManifestProcess and
SnapshotProduceOperation
I found this convenient while working on merge append: ManifestProcess and
SnapshotProduceOperation can be considered custom extensions of
SnapshotProducer, and they reuse SnapshotProducer for common functionality,
so it should be included in their function parameters. At the same time,
storing the Table in SnapshotProducer makes things easier, because these
extensions can then use SnapshotProducer directly.
## Are these changes tested?
---------
Co-authored-by: ZENOTME <[email protected]>
Co-authored-by: Renjie Liu <[email protected]>
---
crates/iceberg/src/transaction/append.rs | 34 +++++----
crates/iceberg/src/transaction/snapshot.rs | 119 +++++++++++++++--------------
2 files changed, 84 insertions(+), 69 deletions(-)
diff --git a/crates/iceberg/src/transaction/append.rs
b/crates/iceberg/src/transaction/append.rs
index 9ecbb54d..6e719163 100644
--- a/crates/iceberg/src/transaction/append.rs
+++ b/crates/iceberg/src/transaction/append.rs
@@ -84,14 +84,6 @@ impl FastAppendAction {
#[async_trait]
impl TransactionAction for FastAppendAction {
async fn commit(self: Arc<Self>, table: &Table) -> Result<ActionCommit> {
- // validate added files
- SnapshotProducer::validate_added_data_files(table,
&self.added_data_files)?;
-
- // Checks duplicate files
- if self.check_duplicate {
- SnapshotProducer::validate_duplicate_files(table,
&self.added_data_files).await?;
- }
-
let snapshot_producer = SnapshotProducer::new(
table,
self.commit_uuid.unwrap_or_else(Uuid::now_v7),
@@ -100,8 +92,18 @@ impl TransactionAction for FastAppendAction {
self.added_data_files.clone(),
);
+ // validate added files
+ snapshot_producer.validate_added_data_files(&self.added_data_files)?;
+
+ // Checks duplicate files
+ if self.check_duplicate {
+ snapshot_producer
+ .validate_duplicate_files(&self.added_data_files)
+ .await?;
+ }
+
snapshot_producer
- .commit(table, FastAppendOperation, DefaultManifestProcess)
+ .commit(FastAppendOperation, DefaultManifestProcess)
.await
}
}
@@ -115,18 +117,24 @@ impl SnapshotProduceOperation for FastAppendOperation {
async fn delete_entries(
&self,
- _snapshot_produce: &SnapshotProducer,
+ _snapshot_produce: &SnapshotProducer<'_>,
) -> Result<Vec<ManifestEntry>> {
Ok(vec![])
}
- async fn existing_manifest(&self, table: &Table) ->
Result<Vec<ManifestFile>> {
- let Some(snapshot) = table.metadata().current_snapshot() else {
+ async fn existing_manifest(
+ &self,
+ snapshot_produce: &SnapshotProducer<'_>,
+ ) -> Result<Vec<ManifestFile>> {
+ let Some(snapshot) =
snapshot_produce.table.metadata().current_snapshot() else {
return Ok(vec![]);
};
let manifest_list = snapshot
- .load_manifest_list(table.file_io(), &table.metadata_ref())
+ .load_manifest_list(
+ snapshot_produce.table.file_io(),
+ &snapshot_produce.table.metadata_ref(),
+ )
.await?;
Ok(manifest_list
diff --git a/crates/iceberg/src/transaction/snapshot.rs
b/crates/iceberg/src/transaction/snapshot.rs
index 113e6a76..092f9fb2 100644
--- a/crates/iceberg/src/transaction/snapshot.rs
+++ b/crates/iceberg/src/transaction/snapshot.rs
@@ -44,23 +44,32 @@ pub(crate) trait SnapshotProduceOperation: Send + Sync {
) -> impl Future<Output = Result<Vec<ManifestEntry>>> + Send;
fn existing_manifest(
&self,
- table: &Table,
+ snapshot_produce: &SnapshotProducer<'_>,
) -> impl Future<Output = Result<Vec<ManifestFile>>> + Send;
}
pub(crate) struct DefaultManifestProcess;
impl ManifestProcess for DefaultManifestProcess {
- fn process_manifests(&self, manifests: Vec<ManifestFile>) ->
Vec<ManifestFile> {
+ fn process_manifests(
+ &self,
+ _snapshot_produce: &SnapshotProducer<'_>,
+ manifests: Vec<ManifestFile>,
+ ) -> Vec<ManifestFile> {
manifests
}
}
pub(crate) trait ManifestProcess: Send + Sync {
- fn process_manifests(&self, manifests: Vec<ManifestFile>) ->
Vec<ManifestFile>;
+ fn process_manifests(
+ &self,
+ snapshot_produce: &SnapshotProducer<'_>,
+ manifests: Vec<ManifestFile>,
+ ) -> Vec<ManifestFile>;
}
-pub(crate) struct SnapshotProducer {
+pub(crate) struct SnapshotProducer<'a> {
+ pub(crate) table: &'a Table,
snapshot_id: i64,
commit_uuid: Uuid,
key_metadata: Option<Vec<u8>>,
@@ -72,15 +81,16 @@ pub(crate) struct SnapshotProducer {
manifest_counter: RangeFrom<u64>,
}
-impl SnapshotProducer {
+impl<'a> SnapshotProducer<'a> {
pub(crate) fn new(
- table: &Table,
+ table: &'a Table,
commit_uuid: Uuid,
key_metadata: Option<Vec<u8>>,
snapshot_properties: HashMap<String, String>,
added_data_files: Vec<DataFile>,
) -> Self {
Self {
+ table,
snapshot_id: Self::generate_unique_snapshot_id(table),
commit_uuid,
key_metadata,
@@ -90,10 +100,7 @@ impl SnapshotProducer {
}
}
- pub(crate) fn validate_added_data_files(
- table: &Table,
- added_data_files: &[DataFile],
- ) -> Result<()> {
+ pub(crate) fn validate_added_data_files(&self, added_data_files:
&[DataFile]) -> Result<()> {
for data_file in added_data_files {
if data_file.content_type() != crate::spec::DataContentType::Data {
return Err(Error::new(
@@ -102,7 +109,7 @@ impl SnapshotProducer {
));
}
// Check if the data file partition spec id matches the table
default partition spec id.
- if table.metadata().default_partition_spec_id() !=
data_file.partition_spec_id {
+ if self.table.metadata().default_partition_spec_id() !=
data_file.partition_spec_id {
return Err(Error::new(
ErrorKind::DataInvalid,
"Data file partition spec id does not match table default
partition spec id",
@@ -110,7 +117,7 @@ impl SnapshotProducer {
}
Self::validate_partition_value(
data_file.partition(),
- table.metadata().default_partition_type(),
+ self.table.metadata().default_partition_type(),
)?;
}
@@ -118,7 +125,7 @@ impl SnapshotProducer {
}
pub(crate) async fn validate_duplicate_files(
- table: &Table,
+ &self,
added_data_files: &[DataFile],
) -> Result<()> {
let new_files: HashSet<&str> = added_data_files
@@ -127,12 +134,14 @@ impl SnapshotProducer {
.collect();
let mut referenced_files = Vec::new();
- if let Some(current_snapshot) = table.metadata().current_snapshot() {
+ if let Some(current_snapshot) =
self.table.metadata().current_snapshot() {
let manifest_list = current_snapshot
- .load_manifest_list(table.file_io(), &table.metadata_ref())
+ .load_manifest_list(self.table.file_io(),
&self.table.metadata_ref())
.await?;
for manifest_list_entry in manifest_list.entries() {
- let manifest =
manifest_list_entry.load_manifest(table.file_io()).await?;
+ let manifest = manifest_list_entry
+ .load_manifest(self.table.file_io())
+ .await?;
for entry in manifest.entries() {
let file_path = entry.file_path();
if new_files.contains(file_path) && entry.is_alive() {
@@ -177,28 +186,28 @@ impl SnapshotProducer {
snapshot_id
}
- fn new_manifest_writer(
- &mut self,
- content: ManifestContentType,
- table: &Table,
- ) -> Result<ManifestWriter> {
+ fn new_manifest_writer(&mut self, content: ManifestContentType) ->
Result<ManifestWriter> {
let new_manifest_path = format!(
"{}/{}/{}-m{}.{}",
- table.metadata().location(),
+ self.table.metadata().location(),
META_ROOT_PATH,
self.commit_uuid,
self.manifest_counter.next().unwrap(),
DataFileFormat::Avro
);
- let output_file = table.file_io().new_output(new_manifest_path)?;
+ let output_file = self.table.file_io().new_output(new_manifest_path)?;
let builder = ManifestWriterBuilder::new(
output_file,
Some(self.snapshot_id),
self.key_metadata.clone(),
- table.metadata().current_schema().clone(),
- table.metadata().default_partition_spec().as_ref().clone(),
+ self.table.metadata().current_schema().clone(),
+ self.table
+ .metadata()
+ .default_partition_spec()
+ .as_ref()
+ .clone(),
);
- if table.metadata().format_version() == FormatVersion::V1 {
+ if self.table.metadata().format_version() == FormatVersion::V1 {
Ok(builder.build_v1())
} else {
match content {
@@ -240,7 +249,7 @@ impl SnapshotProducer {
}
// Write manifest file for added data files and return the ManifestFile
for ManifestList.
- async fn write_added_manifest(&mut self, table: &Table) ->
Result<ManifestFile> {
+ async fn write_added_manifest(&mut self) -> Result<ManifestFile> {
let added_data_files = std::mem::take(&mut self.added_data_files);
if added_data_files.is_empty() {
return Err(Error::new(
@@ -250,7 +259,7 @@ impl SnapshotProducer {
}
let snapshot_id = self.snapshot_id;
- let format_version = table.metadata().format_version();
+ let format_version = self.table.metadata().format_version();
let manifest_entries = added_data_files.into_iter().map(|data_file| {
let builder = ManifestEntry::builder()
.status(crate::spec::ManifestStatus::Added)
@@ -263,7 +272,7 @@ impl SnapshotProducer {
builder.build()
}
});
- let mut writer = self.new_manifest_writer(ManifestContentType::Data,
table)?;
+ let mut writer = self.new_manifest_writer(ManifestContentType::Data)?;
for entry in manifest_entries {
writer.add_entry(entry)?;
}
@@ -272,29 +281,27 @@ impl SnapshotProducer {
async fn manifest_file<OP: SnapshotProduceOperation, MP: ManifestProcess>(
&mut self,
- table: &Table,
snapshot_produce_operation: &OP,
manifest_process: &MP,
) -> Result<Vec<ManifestFile>> {
- let added_manifest = self.write_added_manifest(table).await?;
- let existing_manifests =
snapshot_produce_operation.existing_manifest(table).await?;
+ let added_manifest = self.write_added_manifest().await?;
+ let existing_manifests =
snapshot_produce_operation.existing_manifest(self).await?;
// # TODO
// Support process delete entries.
let mut manifest_files = vec![added_manifest];
manifest_files.extend(existing_manifests);
- let manifest_files =
manifest_process.process_manifests(manifest_files);
+ let manifest_files = manifest_process.process_manifests(self,
manifest_files);
Ok(manifest_files)
}
// Returns a `Summary` of the current snapshot
fn summary<OP: SnapshotProduceOperation>(
&self,
- table: &Table,
snapshot_produce_operation: &OP,
) -> Result<Summary> {
let mut summary_collector = SnapshotSummaryCollector::default();
- let table_metadata = table.metadata_ref();
+ let table_metadata = self.table.metadata_ref();
let partition_summary_limit = if let Some(limit) = table_metadata
.properties()
@@ -339,10 +346,10 @@ impl SnapshotProducer {
)
}
- fn generate_manifest_list_file_path(&self, table: &Table, attempt: i64) ->
String {
+ fn generate_manifest_list_file_path(&self, attempt: i64) -> String {
format!(
"{}/{}/snap-{}-{}-{}.{}",
- table.metadata().location(),
+ self.table.metadata().location(),
META_ROOT_PATH,
self.snapshot_id,
attempt,
@@ -354,34 +361,34 @@ impl SnapshotProducer {
/// Finished building the action and return the [`ActionCommit`] to the
transaction.
pub(crate) async fn commit<OP: SnapshotProduceOperation, MP:
ManifestProcess>(
mut self,
- table: &Table,
snapshot_produce_operation: OP,
process: MP,
) -> Result<ActionCommit> {
let new_manifests = self
- .manifest_file(table, &snapshot_produce_operation, &process)
+ .manifest_file(&snapshot_produce_operation, &process)
.await?;
- let next_seq_num = table.metadata().next_sequence_number();
+ let next_seq_num = self.table.metadata().next_sequence_number();
- let summary = self
- .summary(table, &snapshot_produce_operation)
- .map_err(|err| {
- Error::new(ErrorKind::Unexpected, "Failed to create snapshot
summary.")
- .with_source(err)
- })?;
+ let summary = self.summary(&snapshot_produce_operation).map_err(|err| {
+ Error::new(ErrorKind::Unexpected, "Failed to create snapshot
summary.").with_source(err)
+ })?;
- let manifest_list_path = self.generate_manifest_list_file_path(table,
0);
+ let manifest_list_path = self.generate_manifest_list_file_path(0);
- let mut manifest_list_writer = match table.metadata().format_version()
{
+ let mut manifest_list_writer = match
self.table.metadata().format_version() {
FormatVersion::V1 => ManifestListWriter::v1(
- table.file_io().new_output(manifest_list_path.clone())?,
+ self.table
+ .file_io()
+ .new_output(manifest_list_path.clone())?,
self.snapshot_id,
- table.metadata().current_snapshot_id(),
+ self.table.metadata().current_snapshot_id(),
),
FormatVersion::V2 => ManifestListWriter::v2(
- table.file_io().new_output(manifest_list_path.clone())?,
+ self.table
+ .file_io()
+ .new_output(manifest_list_path.clone())?,
self.snapshot_id,
- table.metadata().current_snapshot_id(),
+ self.table.metadata().current_snapshot_id(),
next_seq_num,
),
};
@@ -392,10 +399,10 @@ impl SnapshotProducer {
let new_snapshot = Snapshot::builder()
.with_manifest_list(manifest_list_path)
.with_snapshot_id(self.snapshot_id)
- .with_parent_snapshot_id(table.metadata().current_snapshot_id())
+
.with_parent_snapshot_id(self.table.metadata().current_snapshot_id())
.with_sequence_number(next_seq_num)
.with_summary(summary)
- .with_schema_id(table.metadata().current_schema_id())
+ .with_schema_id(self.table.metadata().current_schema_id())
.with_timestamp_ms(commit_ts)
.build();
@@ -414,11 +421,11 @@ impl SnapshotProducer {
let requirements = vec![
TableRequirement::UuidMatch {
- uuid: table.metadata().uuid(),
+ uuid: self.table.metadata().uuid(),
},
TableRequirement::RefSnapshotIdMatch {
r#ref: MAIN_BRANCH.to_string(),
- snapshot_id: table.metadata().current_snapshot_id(),
+ snapshot_id: self.table.metadata().current_snapshot_id(),
},
];