This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new bf149d3f refactor: Split transaction module (#1080)
bf149d3f is described below
commit bf149d3f7713b50cc7c36e22bafdc4f84de8abd9
Author: Jonathan Chen <[email protected]>
AuthorDate: Thu Mar 13 21:39:16 2025 -0400
refactor: Split transaction module (#1080)
## Which issue does this PR close?
- Closes #980.
## What changes are included in this PR?
Split the `transaction` module: the single `crates/iceberg/src/transaction.rs` becomes a `transaction/` directory with `mod.rs`, `append.rs`, `snapshot.rs`, and `sort_order.rs`. The code is moved, not rewritten.
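For reviewers: a rough sketch of how the split-out pieces fit together from the caller's side. This is illustrative only; it assumes a `Table`, a `Catalog` implementation, and a `DataFile` produced by a writer are already in scope, and the function name `append_one` is invented for the example.

```rust
use iceberg::spec::DataFile;
use iceberg::table::Table;
use iceberg::transaction::Transaction;
use iceberg::{Catalog, Result};

// Exercises each split-out submodule: `Transaction` (mod.rs),
// `FastAppendAction` (append.rs), and the `SnapshotProduceAction`
// machinery behind `apply` (snapshot.rs).
async fn append_one(table: &Table, catalog: &dyn Catalog, file: DataFile) -> Result<Table> {
    // Start a transaction and create a fast-append action
    // (generated commit UUID, empty key metadata).
    let mut action = Transaction::new(table).fast_append(None, vec![])?;
    // Stage the file; its partition value is validated here.
    action.add_data_files(vec![file])?;
    // Write the manifest and manifest list, then fold the new snapshot
    // back into the transaction as updates and requirements.
    let tx = action.apply().await?;
    // Commit through the catalog, as before the split.
    tx.commit(catalog).await
}
```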
---
crates/iceberg/src/transaction.rs | 1046 --------------------------
crates/iceberg/src/transaction/append.rs | 373 +++++++++
crates/iceberg/src/transaction/mod.rs | 326 ++++++++
crates/iceberg/src/transaction/snapshot.rs | 309 ++++++++
crates/iceberg/src/transaction/sort_order.rs | 132 ++++
5 files changed, 1140 insertions(+), 1046 deletions(-)
diff --git a/crates/iceberg/src/transaction.rs b/crates/iceberg/src/transaction.rs
deleted file mode 100644
index 15d5c99a..00000000
--- a/crates/iceberg/src/transaction.rs
+++ /dev/null
@@ -1,1046 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! This module contains the transaction API.
-
-use std::cmp::Ordering;
-use std::collections::{HashMap, HashSet};
-use std::future::Future;
-use std::mem::discriminant;
-use std::ops::RangeFrom;
-
-use arrow_array::StringArray;
-use futures::TryStreamExt;
-use uuid::Uuid;
-
-use crate::error::Result;
-use crate::io::OutputFile;
-use crate::spec::{
- DataFile, DataFileFormat, FormatVersion, ManifestEntry, ManifestFile, ManifestListWriter,
- ManifestWriterBuilder, NullOrder, Operation, Snapshot, SnapshotReference, SnapshotRetention,
- SortDirection, SortField, SortOrder, Struct, StructType, Summary, Transform, MAIN_BRANCH,
-};
-use crate::table::Table;
-use crate::writer::file_writer::ParquetWriter;
-use crate::TableUpdate::UpgradeFormatVersion;
-use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, TableUpdate};
-
-const META_ROOT_PATH: &str = "metadata";
-
-/// Table transaction.
-pub struct Transaction<'a> {
- table: &'a Table,
- updates: Vec<TableUpdate>,
- requirements: Vec<TableRequirement>,
-}
-
-impl<'a> Transaction<'a> {
- /// Creates a new transaction.
- pub fn new(table: &'a Table) -> Self {
- Self {
- table,
- updates: vec![],
- requirements: vec![],
- }
- }
-
- fn append_updates(&mut self, updates: Vec<TableUpdate>) -> Result<()> {
- for update in &updates {
- for up in &self.updates {
- if discriminant(up) == discriminant(update) {
- return Err(Error::new(
- ErrorKind::DataInvalid,
- format!(
- "Cannot apply update with same type at same time:
{:?}",
- update
- ),
- ));
- }
- }
- }
- self.updates.extend(updates);
- Ok(())
- }
-
- fn append_requirements(&mut self, requirements: Vec<TableRequirement>) -> Result<()> {
- self.requirements.extend(requirements);
- Ok(())
- }
-
- /// Upgrades the table to a new format version.
- pub fn upgrade_table_version(mut self, format_version: FormatVersion) -> Result<Self> {
- let current_version = self.table.metadata().format_version();
- match current_version.cmp(&format_version) {
- Ordering::Greater => {
- return Err(Error::new(
- ErrorKind::DataInvalid,
- format!(
- "Cannot downgrade table version from {} to {}",
- current_version, format_version
- ),
- ));
- }
- Ordering::Less => {
- self.append_updates(vec![UpgradeFormatVersion { format_version }])?;
- }
- Ordering::Equal => {
- // Do nothing.
- }
- }
- Ok(self)
- }
-
- /// Updates the table's properties.
- pub fn set_properties(mut self, props: HashMap<String, String>) -> Result<Self> {
- self.append_updates(vec![TableUpdate::SetProperties { updates: props }])?;
- Ok(self)
- }
-
- fn generate_unique_snapshot_id(&self) -> i64 {
- let generate_random_id = || -> i64 {
- let (lhs, rhs) = Uuid::new_v4().as_u64_pair();
- let snapshot_id = (lhs ^ rhs) as i64;
- if snapshot_id < 0 {
- -snapshot_id
- } else {
- snapshot_id
- }
- };
- let mut snapshot_id = generate_random_id();
- while self
- .table
- .metadata()
- .snapshots()
- .any(|s| s.snapshot_id() == snapshot_id)
- {
- snapshot_id = generate_random_id();
- }
- snapshot_id
- }
-
- /// Creates a fast append action.
- pub fn fast_append(
- self,
- commit_uuid: Option<Uuid>,
- key_metadata: Vec<u8>,
- ) -> Result<FastAppendAction<'a>> {
- let snapshot_id = self.generate_unique_snapshot_id();
- FastAppendAction::new(
- self,
- snapshot_id,
- commit_uuid.unwrap_or_else(Uuid::now_v7),
- key_metadata,
- HashMap::new(),
- )
- }
-
- /// Creates replace sort order action.
- pub fn replace_sort_order(self) -> ReplaceSortOrderAction<'a> {
- ReplaceSortOrderAction {
- tx: self,
- sort_fields: vec![],
- }
- }
-
- /// Removes properties from the table.
- pub fn remove_properties(mut self, keys: Vec<String>) -> Result<Self> {
- self.append_updates(vec![TableUpdate::RemoveProperties { removals: keys }])?;
- Ok(self)
- }
-
- /// Commit transaction.
- pub async fn commit(self, catalog: &dyn Catalog) -> Result<Table> {
- let table_commit = TableCommit::builder()
- .ident(self.table.identifier().clone())
- .updates(self.updates)
- .requirements(self.requirements)
- .build();
-
- catalog.update_table(table_commit).await
- }
-}
-
-/// FastAppendAction is a transaction action for fast-appending data files to the table.
-pub struct FastAppendAction<'a> {
- snapshot_produce_action: SnapshotProduceAction<'a>,
- check_duplicate: bool,
-}
-
-impl<'a> FastAppendAction<'a> {
- #[allow(clippy::too_many_arguments)]
- pub(crate) fn new(
- tx: Transaction<'a>,
- snapshot_id: i64,
- commit_uuid: Uuid,
- key_metadata: Vec<u8>,
- snapshot_properties: HashMap<String, String>,
- ) -> Result<Self> {
- Ok(Self {
- snapshot_produce_action: SnapshotProduceAction::new(
- tx,
- snapshot_id,
- key_metadata,
- commit_uuid,
- snapshot_properties,
- )?,
- check_duplicate: true,
- })
- }
-
- /// Set whether to check duplicate files
- pub fn with_check_duplicate(mut self, v: bool) -> Self {
- self.check_duplicate = v;
- self
- }
-
- /// Add data files to the snapshot.
- pub fn add_data_files(
- &mut self,
- data_files: impl IntoIterator<Item = DataFile>,
- ) -> Result<&mut Self> {
- self.snapshot_produce_action.add_data_files(data_files)?;
- Ok(self)
- }
-
- /// Adds existing parquet files
- #[allow(dead_code)]
- async fn add_parquet_files(mut self, file_path: Vec<String>) -> Result<Transaction<'a>> {
- if !self
- .snapshot_produce_action
- .tx
- .table
- .metadata()
- .default_spec
- .is_unpartitioned()
- {
- return Err(Error::new(
- ErrorKind::FeatureUnsupported,
- "Appending to partitioned tables is not supported",
- ));
- }
-
- let table_metadata = self.snapshot_produce_action.tx.table.metadata();
-
- let data_files = ParquetWriter::parquet_files_to_data_files(
- self.snapshot_produce_action.tx.table.file_io(),
- file_path,
- table_metadata,
- )
- .await?;
-
- self.add_data_files(data_files)?;
-
- self.apply().await
- }
-
- /// Finishes building the action and applies it to the transaction.
- pub async fn apply(self) -> Result<Transaction<'a>> {
- // Checks duplicate files
- if self.check_duplicate {
- let new_files: HashSet<&str> = self
- .snapshot_produce_action
- .added_data_files
- .iter()
- .map(|df| df.file_path.as_str())
- .collect();
-
- let mut manifest_stream = self
- .snapshot_produce_action
- .tx
- .table
- .inspect()
- .manifests()
- .scan()
- .await?;
- let mut referenced_files = Vec::new();
-
- while let Some(batch) = manifest_stream.try_next().await? {
- let file_path_array = batch
- .column(1)
- .as_any()
- .downcast_ref::<StringArray>()
- .ok_or_else(|| {
- Error::new(
- ErrorKind::DataInvalid,
- "Failed to downcast file_path column to
StringArray",
- )
- })?;
-
- for i in 0..batch.num_rows() {
- let file_path = file_path_array.value(i);
- if new_files.contains(file_path) {
- referenced_files.push(file_path.to_string());
- }
- }
- }
-
- if !referenced_files.is_empty() {
- return Err(Error::new(
- ErrorKind::DataInvalid,
- format!(
- "Cannot add files that are already referenced by
table, files: {}",
- referenced_files.join(", ")
- ),
- ));
- }
- }
-
- self.snapshot_produce_action
- .apply(FastAppendOperation, DefaultManifestProcess)
- .await
- }
-}
-
-struct FastAppendOperation;
-
-impl SnapshotProduceOperation for FastAppendOperation {
- fn operation(&self) -> Operation {
- Operation::Append
- }
-
- async fn delete_entries(
- &self,
- _snapshot_produce: &SnapshotProduceAction<'_>,
- ) -> Result<Vec<ManifestEntry>> {
- Ok(vec![])
- }
-
- async fn existing_manifest(
- &self,
- snapshot_produce: &SnapshotProduceAction<'_>,
- ) -> Result<Vec<ManifestFile>> {
- let Some(snapshot) = snapshot_produce.tx.table.metadata().current_snapshot() else {
- return Ok(vec![]);
- };
-
- let manifest_list = snapshot
- .load_manifest_list(
- snapshot_produce.tx.table.file_io(),
- &snapshot_produce.tx.table.metadata_ref(),
- )
- .await?;
-
- Ok(manifest_list
- .entries()
- .iter()
- .filter(|entry| entry.has_added_files() || entry.has_existing_files())
- .cloned()
- .collect())
- }
-}
-
-trait SnapshotProduceOperation: Send + Sync {
- fn operation(&self) -> Operation;
- #[allow(unused)]
- fn delete_entries(
- &self,
- snapshot_produce: &SnapshotProduceAction,
- ) -> impl Future<Output = Result<Vec<ManifestEntry>>> + Send;
- fn existing_manifest(
- &self,
- snapshot_produce: &SnapshotProduceAction,
- ) -> impl Future<Output = Result<Vec<ManifestFile>>> + Send;
-}
-
-struct DefaultManifestProcess;
-
-impl ManifestProcess for DefaultManifestProcess {
- fn process_manifeset(&self, manifests: Vec<ManifestFile>) -> Vec<ManifestFile> {
- manifests
- }
-}
-
-trait ManifestProcess: Send + Sync {
- fn process_manifeset(&self, manifests: Vec<ManifestFile>) -> Vec<ManifestFile>;
-}
-
-struct SnapshotProduceAction<'a> {
- tx: Transaction<'a>,
- snapshot_id: i64,
- key_metadata: Vec<u8>,
- commit_uuid: Uuid,
- snapshot_properties: HashMap<String, String>,
- added_data_files: Vec<DataFile>,
- // A counter used to generate unique manifest file names.
- // It starts from 0 and increments for each new manifest file.
- // Note: This counter is limited to the range of (0..u64::MAX).
- manifest_counter: RangeFrom<u64>,
-}
-
-impl<'a> SnapshotProduceAction<'a> {
- pub(crate) fn new(
- tx: Transaction<'a>,
- snapshot_id: i64,
- key_metadata: Vec<u8>,
- commit_uuid: Uuid,
- snapshot_properties: HashMap<String, String>,
- ) -> Result<Self> {
- Ok(Self {
- tx,
- snapshot_id,
- commit_uuid,
- snapshot_properties,
- added_data_files: vec![],
- manifest_counter: (0..),
- key_metadata,
- })
- }
-
- // Check if the partition value is compatible with the partition type.
- fn validate_partition_value(
- partition_value: &Struct,
- partition_type: &StructType,
- ) -> Result<()> {
- if partition_value.fields().len() != partition_type.fields().len() {
- return Err(Error::new(
- ErrorKind::DataInvalid,
- "Partition value is not compatible with partition type",
- ));
- }
-
- for (value, field) in partition_value.fields().iter().zip(partition_type.fields()) {
- if !field
- .field_type
- .as_primitive_type()
- .ok_or_else(|| {
- Error::new(
- ErrorKind::Unexpected,
- "Partition field should only be primitive type.",
- )
- })?
- .compatible(&value.as_primitive_literal().unwrap())
- {
- return Err(Error::new(
- ErrorKind::DataInvalid,
- "Partition value is not compatible partition type",
- ));
- }
- }
- Ok(())
- }
-
- /// Add data files to the snapshot.
- pub fn add_data_files(
- &mut self,
- data_files: impl IntoIterator<Item = DataFile>,
- ) -> Result<&mut Self> {
- let data_files: Vec<DataFile> = data_files.into_iter().collect();
- for data_file in &data_files {
- if data_file.content_type() != crate::spec::DataContentType::Data {
- return Err(Error::new(
- ErrorKind::DataInvalid,
- "Only data content type is allowed for fast append",
- ));
- }
- Self::validate_partition_value(
- data_file.partition(),
- self.tx.table.metadata().default_partition_type(),
- )?;
- }
- self.added_data_files.extend(data_files);
- Ok(self)
- }
-
- fn new_manifest_output(&mut self) -> Result<OutputFile> {
- let new_manifest_path = format!(
- "{}/{}/{}-m{}.{}",
- self.tx.table.metadata().location(),
- META_ROOT_PATH,
- self.commit_uuid,
- self.manifest_counter.next().unwrap(),
- DataFileFormat::Avro
- );
- self.tx.table.file_io().new_output(new_manifest_path)
- }
-
- // Write manifest file for added data files and return the ManifestFile for ManifestList.
- async fn write_added_manifest(&mut self) -> Result<ManifestFile> {
- let added_data_files = std::mem::take(&mut self.added_data_files);
- let snapshot_id = self.snapshot_id;
- let manifest_entries = added_data_files.into_iter().map(|data_file| {
- let builder = ManifestEntry::builder()
- .status(crate::spec::ManifestStatus::Added)
- .data_file(data_file);
- if self.tx.table.metadata().format_version() == FormatVersion::V1 {
- builder.snapshot_id(snapshot_id).build()
- } else {
- // For format version > 1, leave the snapshot id unset so it is inherited at read
- // time; this avoids rewriting the manifest file when a commit fails and is retried.
- builder.build()
- }
- });
- let mut writer = {
- let builder = ManifestWriterBuilder::new(
- self.new_manifest_output()?,
- Some(self.snapshot_id),
- self.key_metadata.clone(),
- self.tx.table.metadata().current_schema().clone(),
- self.tx
- .table
- .metadata()
- .default_partition_spec()
- .as_ref()
- .clone(),
- );
- if self.tx.table.metadata().format_version() == FormatVersion::V1 {
- builder.build_v1()
- } else {
- builder.build_v2_data()
- }
- };
- for entry in manifest_entries {
- writer.add_entry(entry)?;
- }
- writer.write_manifest_file().await
- }
-
- async fn manifest_file<OP: SnapshotProduceOperation, MP: ManifestProcess>(
- &mut self,
- snapshot_produce_operation: &OP,
- manifest_process: &MP,
- ) -> Result<Vec<ManifestFile>> {
- let added_manifest = self.write_added_manifest().await?;
- let existing_manifests = snapshot_produce_operation.existing_manifest(self).await?;
- // # TODO
- // Support processing delete entries.
-
- let mut manifest_files = vec![added_manifest];
- manifest_files.extend(existing_manifests);
- let manifest_files = manifest_process.process_manifeset(manifest_files);
- Ok(manifest_files)
- }
-
- // # TODO
- // Complete this function; the summary currently only carries the operation and snapshot properties.
- fn summary<OP: SnapshotProduceOperation>(&self, snapshot_produce_operation: &OP) -> Summary {
- Summary {
- operation: snapshot_produce_operation.operation(),
- additional_properties: self.snapshot_properties.clone(),
- }
- }
-
- fn generate_manifest_list_file_path(&self, attempt: i64) -> String {
- format!(
- "{}/{}/snap-{}-{}-{}.{}",
- self.tx.table.metadata().location(),
- META_ROOT_PATH,
- self.snapshot_id,
- attempt,
- self.commit_uuid,
- DataFileFormat::Avro
- )
- }
-
- /// Finishes building the action and applies it to the transaction.
- pub async fn apply<OP: SnapshotProduceOperation, MP: ManifestProcess>(
- mut self,
- snapshot_produce_operation: OP,
- process: MP,
- ) -> Result<Transaction<'a>> {
- let new_manifests = self
- .manifest_file(&snapshot_produce_operation, &process)
- .await?;
- let next_seq_num = self.tx.table.metadata().next_sequence_number();
-
- let summary = self.summary(&snapshot_produce_operation);
-
- let manifest_list_path = self.generate_manifest_list_file_path(0);
-
- let mut manifest_list_writer = match self.tx.table.metadata().format_version() {
- FormatVersion::V1 => ManifestListWriter::v1(
- self.tx
- .table
- .file_io()
- .new_output(manifest_list_path.clone())?,
- self.snapshot_id,
- self.tx.table.metadata().current_snapshot_id(),
- ),
- FormatVersion::V2 => ManifestListWriter::v2(
- self.tx
- .table
- .file_io()
- .new_output(manifest_list_path.clone())?,
- self.snapshot_id,
- self.tx.table.metadata().current_snapshot_id(),
- next_seq_num,
- ),
- };
- manifest_list_writer.add_manifests(new_manifests.into_iter())?;
- manifest_list_writer.close().await?;
-
- let commit_ts = chrono::Utc::now().timestamp_millis();
- let new_snapshot = Snapshot::builder()
- .with_manifest_list(manifest_list_path)
- .with_snapshot_id(self.snapshot_id)
- .with_parent_snapshot_id(self.tx.table.metadata().current_snapshot_id())
- .with_sequence_number(next_seq_num)
- .with_summary(summary)
- .with_schema_id(self.tx.table.metadata().current_schema_id())
- .with_timestamp_ms(commit_ts)
- .build();
-
- self.tx.append_updates(vec![
- TableUpdate::AddSnapshot {
- snapshot: new_snapshot,
- },
- TableUpdate::SetSnapshotRef {
- ref_name: MAIN_BRANCH.to_string(),
- reference: SnapshotReference::new(
- self.snapshot_id,
- SnapshotRetention::branch(None, None, None),
- ),
- },
- ])?;
- self.tx.append_requirements(vec![
- TableRequirement::UuidMatch {
- uuid: self.tx.table.metadata().uuid(),
- },
- TableRequirement::RefSnapshotIdMatch {
- r#ref: MAIN_BRANCH.to_string(),
- snapshot_id: self.tx.table.metadata().current_snapshot_id(),
- },
- ])?;
- Ok(self.tx)
- }
-}
-
-/// Transaction action for replacing sort order.
-pub struct ReplaceSortOrderAction<'a> {
- tx: Transaction<'a>,
- sort_fields: Vec<SortField>,
-}
-
-impl<'a> ReplaceSortOrderAction<'a> {
- /// Adds a field for sorting in ascending order.
- pub fn asc(self, name: &str, null_order: NullOrder) -> Result<Self> {
- self.add_sort_field(name, SortDirection::Ascending, null_order)
- }
-
- /// Adds a field for sorting in descending order.
- pub fn desc(self, name: &str, null_order: NullOrder) -> Result<Self> {
- self.add_sort_field(name, SortDirection::Descending, null_order)
- }
-
- /// Finishes building the action and applies it to the transaction.
- pub fn apply(mut self) -> Result<Transaction<'a>> {
- let unbound_sort_order = SortOrder::builder()
- .with_fields(self.sort_fields)
- .build_unbound()?;
-
- let updates = vec![
- TableUpdate::AddSortOrder {
- sort_order: unbound_sort_order,
- },
- TableUpdate::SetDefaultSortOrder { sort_order_id: -1 },
- ];
-
- let requirements = vec![
- TableRequirement::CurrentSchemaIdMatch {
- current_schema_id: self.tx.table.metadata().current_schema().schema_id(),
- },
- TableRequirement::DefaultSortOrderIdMatch {
- default_sort_order_id: self.tx.table.metadata().default_sort_order().order_id,
- },
- ];
-
- self.tx.append_requirements(requirements)?;
- self.tx.append_updates(updates)?;
- Ok(self.tx)
- }
-
- fn add_sort_field(
- mut self,
- name: &str,
- sort_direction: SortDirection,
- null_order: NullOrder,
- ) -> Result<Self> {
- let field_id = self
- .tx
- .table
- .metadata()
- .current_schema()
- .field_id_by_name(name)
- .ok_or_else(|| {
- Error::new(
- ErrorKind::DataInvalid,
- format!("Cannot find field {} in table schema", name),
- )
- })?;
-
- let sort_field = SortField::builder()
- .source_id(field_id)
- .transform(Transform::Identity)
- .direction(sort_direction)
- .null_order(null_order)
- .build();
-
- self.sort_fields.push(sort_field);
- Ok(self)
- }
-}
-
-#[cfg(test)]
-mod tests {
- use std::collections::HashMap;
- use std::fs::File;
- use std::io::BufReader;
-
- use crate::io::FileIOBuilder;
- use crate::scan::tests::TableTestFixture;
- use crate::spec::{
- DataContentType, DataFileBuilder, DataFileFormat, FormatVersion, Literal, Struct,
- TableMetadata,
- };
- use crate::table::Table;
- use crate::transaction::{Transaction, MAIN_BRANCH};
- use crate::{TableIdent, TableRequirement, TableUpdate};
-
- fn make_v1_table() -> Table {
- let file = File::open(format!(
- "{}/testdata/table_metadata/{}",
- env!("CARGO_MANIFEST_DIR"),
- "TableMetadataV1Valid.json"
- ))
- .unwrap();
- let reader = BufReader::new(file);
- let resp = serde_json::from_reader::<_, TableMetadata>(reader).unwrap();
-
- Table::builder()
- .metadata(resp)
- .metadata_location("s3://bucket/test/location/metadata/v1.json".to_string())
- .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
- .file_io(FileIOBuilder::new("memory").build().unwrap())
- .build()
- .unwrap()
- }
-
- fn make_v2_table() -> Table {
- let file = File::open(format!(
- "{}/testdata/table_metadata/{}",
- env!("CARGO_MANIFEST_DIR"),
- "TableMetadataV2Valid.json"
- ))
- .unwrap();
- let reader = BufReader::new(file);
- let resp = serde_json::from_reader::<_, TableMetadata>(reader).unwrap();
-
- Table::builder()
- .metadata(resp)
- .metadata_location("s3://bucket/test/location/metadata/v1.json".to_string())
- .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
- .file_io(FileIOBuilder::new("memory").build().unwrap())
- .build()
- .unwrap()
- }
-
- fn make_v2_minimal_table() -> Table {
- let file = File::open(format!(
- "{}/testdata/table_metadata/{}",
- env!("CARGO_MANIFEST_DIR"),
- "TableMetadataV2ValidMinimal.json"
- ))
- .unwrap();
- let reader = BufReader::new(file);
- let resp = serde_json::from_reader::<_, TableMetadata>(reader).unwrap();
-
- Table::builder()
- .metadata(resp)
- .metadata_location("s3://bucket/test/location/metadata/v1.json".to_string())
- .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
- .file_io(FileIOBuilder::new("memory").build().unwrap())
- .build()
- .unwrap()
- }
-
- #[test]
- fn test_upgrade_table_version_v1_to_v2() {
- let table = make_v1_table();
- let tx = Transaction::new(&table);
- let tx = tx.upgrade_table_version(FormatVersion::V2).unwrap();
-
- assert_eq!(
- vec![TableUpdate::UpgradeFormatVersion {
- format_version: FormatVersion::V2
- }],
- tx.updates
- );
- }
-
- #[test]
- fn test_upgrade_table_version_v2_to_v2() {
- let table = make_v2_table();
- let tx = Transaction::new(&table);
- let tx = tx.upgrade_table_version(FormatVersion::V2).unwrap();
-
- assert!(
- tx.updates.is_empty(),
- "Upgrade table to same version should not generate any updates"
- );
- assert!(
- tx.requirements.is_empty(),
- "Upgrade table to same version should not generate any
requirements"
- );
- }
-
- #[test]
- fn test_downgrade_table_version() {
- let table = make_v2_table();
- let tx = Transaction::new(&table);
- let tx = tx.upgrade_table_version(FormatVersion::V1);
-
- assert!(tx.is_err(), "Downgrade table version should fail!");
- }
-
- #[test]
- fn test_set_table_property() {
- let table = make_v2_table();
- let tx = Transaction::new(&table);
- let tx = tx
- .set_properties(HashMap::from([("a".to_string(), "b".to_string())]))
- .unwrap();
-
- assert_eq!(
- vec![TableUpdate::SetProperties {
- updates: HashMap::from([("a".to_string(), "b".to_string())])
- }],
- tx.updates
- );
- }
-
- #[test]
- fn test_remove_property() {
- let table = make_v2_table();
- let tx = Transaction::new(&table);
- let tx = tx
- .remove_properties(vec!["a".to_string(), "b".to_string()])
- .unwrap();
-
- assert_eq!(
- vec![TableUpdate::RemoveProperties {
- removals: vec!["a".to_string(), "b".to_string()]
- }],
- tx.updates
- );
- }
-
- #[test]
- fn test_replace_sort_order() {
- let table = make_v2_table();
- let tx = Transaction::new(&table);
- let tx = tx.replace_sort_order().apply().unwrap();
-
- assert_eq!(
- vec![
- TableUpdate::AddSortOrder {
- sort_order: Default::default()
- },
- TableUpdate::SetDefaultSortOrder { sort_order_id: -1 }
- ],
- tx.updates
- );
-
- assert_eq!(
- vec![
- TableRequirement::CurrentSchemaIdMatch {
- current_schema_id: 1
- },
- TableRequirement::DefaultSortOrderIdMatch {
- default_sort_order_id: 3
- }
- ],
- tx.requirements
- );
- }
-
- #[tokio::test]
- async fn test_fast_append_action() {
- let table = make_v2_minimal_table();
- let tx = Transaction::new(&table);
- let mut action = tx.fast_append(None, vec![]).unwrap();
-
- // check add data file with incompatible partition value
- let data_file = DataFileBuilder::default()
- .content(DataContentType::Data)
- .file_path("test/3.parquet".to_string())
- .file_format(DataFileFormat::Parquet)
- .file_size_in_bytes(100)
- .record_count(1)
- .partition(Struct::from_iter([Some(Literal::string("test"))]))
- .build()
- .unwrap();
- assert!(action.add_data_files(vec![data_file.clone()]).is_err());
-
- let data_file = DataFileBuilder::default()
- .content(DataContentType::Data)
- .file_path("test/3.parquet".to_string())
- .file_format(DataFileFormat::Parquet)
- .file_size_in_bytes(100)
- .record_count(1)
- .partition(Struct::from_iter([Some(Literal::long(300))]))
- .build()
- .unwrap();
- action.add_data_files(vec![data_file.clone()]).unwrap();
- let tx = action.apply().await.unwrap();
-
- // check updates and requirements
- assert!(
- matches!((&tx.updates[0], &tx.updates[1]), (TableUpdate::AddSnapshot { snapshot }, TableUpdate::SetSnapshotRef { reference, ref_name }) if snapshot.snapshot_id() == reference.snapshot_id && ref_name == MAIN_BRANCH)
- );
- assert_eq!(
- vec![
- TableRequirement::UuidMatch {
- uuid: tx.table.metadata().uuid()
- },
- TableRequirement::RefSnapshotIdMatch {
- r#ref: MAIN_BRANCH.to_string(),
- snapshot_id: tx.table.metadata().current_snapshot_id
- }
- ],
- tx.requirements
- );
-
- // check manifest list
- let new_snapshot = if let TableUpdate::AddSnapshot { snapshot } = &tx.updates[0] {
- snapshot
- } else {
- unreachable!()
- };
- let manifest_list = new_snapshot
- .load_manifest_list(table.file_io(), table.metadata())
- .await
- .unwrap();
- assert_eq!(1, manifest_list.entries().len());
- assert_eq!(
- manifest_list.entries()[0].sequence_number,
- new_snapshot.sequence_number()
- );
-
- // check manifest
- let manifest = manifest_list.entries()[0]
- .load_manifest(table.file_io())
- .await
- .unwrap();
- assert_eq!(1, manifest.entries().len());
- assert_eq!(
- new_snapshot.sequence_number(),
- manifest.entries()[0]
- .sequence_number()
- .expect("Inherit sequence number by load manifest")
- );
-
- assert_eq!(
- new_snapshot.snapshot_id(),
- manifest.entries()[0].snapshot_id().unwrap()
- );
- assert_eq!(data_file, *manifest.entries()[0].data_file());
- }
-
- #[test]
- fn test_do_same_update_in_same_transaction() {
- let table = make_v2_table();
- let tx = Transaction::new(&table);
- let tx = tx
- .remove_properties(vec!["a".to_string(), "b".to_string()])
- .unwrap();
-
- let tx = tx.remove_properties(vec!["c".to_string(), "d".to_string()]);
-
- assert!(
- tx.is_err(),
- "Should not allow to do same kinds update in same transaction"
- );
- }
-
- #[tokio::test]
- async fn test_add_existing_parquet_files_to_unpartitioned_table() {
- let mut fixture = TableTestFixture::new_unpartitioned();
- fixture.setup_unpartitioned_manifest_files().await;
- let tx = crate::transaction::Transaction::new(&fixture.table);
-
- let file_paths = vec![
- format!("{}/1.parquet", &fixture.table_location),
- format!("{}/2.parquet", &fixture.table_location),
- format!("{}/3.parquet", &fixture.table_location),
- ];
-
- let fast_append_action = tx.fast_append(None, vec![]).unwrap();
-
- // Attempt to add the existing Parquet files with fast append.
- let new_tx = fast_append_action
- .add_parquet_files(file_paths.clone())
- .await
- .expect("Adding existing Parquet files should succeed");
-
- let mut found_add_snapshot = false;
- let mut found_set_snapshot_ref = false;
- for update in new_tx.updates.iter() {
- match update {
- TableUpdate::AddSnapshot { .. } => {
- found_add_snapshot = true;
- }
- TableUpdate::SetSnapshotRef {
- ref_name,
- reference,
- } => {
- found_set_snapshot_ref = true;
- assert_eq!(ref_name, crate::transaction::MAIN_BRANCH);
- assert!(reference.snapshot_id > 0);
- }
- _ => {}
- }
- }
- assert!(found_add_snapshot);
- assert!(found_set_snapshot_ref);
-
- let new_snapshot = if let TableUpdate::AddSnapshot { snapshot } = &new_tx.updates[0] {
- snapshot
- } else {
- panic!("Expected the first update to be an AddSnapshot update");
- };
-
- let manifest_list = new_snapshot
- .load_manifest_list(fixture.table.file_io(), fixture.table.metadata())
- .await
- .expect("Failed to load manifest list");
-
- assert_eq!(
- manifest_list.entries().len(),
- 2,
- "Expected 2 manifest list entries, got {}",
- manifest_list.entries().len()
- );
-
- // Load the manifest from the manifest list
- let manifest = manifest_list.entries()[0]
- .load_manifest(fixture.table.file_io())
- .await
- .expect("Failed to load manifest");
-
- // Check that the manifest contains three entries.
- assert_eq!(manifest.entries().len(), 3);
-
- // Verify each file path appears in manifest.
- let manifest_paths: Vec<String> = manifest
- .entries()
- .iter()
- .map(|entry| entry.data_file().file_path.clone())
- .collect();
- for path in file_paths {
- assert!(manifest_paths.contains(&path));
- }
- }
-}
diff --git a/crates/iceberg/src/transaction/append.rs b/crates/iceberg/src/transaction/append.rs
new file mode 100644
index 00000000..361924de
--- /dev/null
+++ b/crates/iceberg/src/transaction/append.rs
@@ -0,0 +1,373 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::{HashMap, HashSet};
+
+use arrow_array::StringArray;
+use futures::TryStreamExt;
+use uuid::Uuid;
+
+use crate::error::Result;
+use crate::spec::{DataFile, ManifestEntry, ManifestFile, Operation};
+use crate::transaction::snapshot::{
+ DefaultManifestProcess, SnapshotProduceAction, SnapshotProduceOperation,
+};
+use crate::transaction::Transaction;
+use crate::writer::file_writer::ParquetWriter;
+use crate::{Error, ErrorKind};
+
+/// FastAppendAction is a transaction action for fast-appending data files to the table.
+pub struct FastAppendAction<'a> {
+ snapshot_produce_action: SnapshotProduceAction<'a>,
+ check_duplicate: bool,
+}
+
+impl<'a> FastAppendAction<'a> {
+ #[allow(clippy::too_many_arguments)]
+ pub(crate) fn new(
+ tx: Transaction<'a>,
+ snapshot_id: i64,
+ commit_uuid: Uuid,
+ key_metadata: Vec<u8>,
+ snapshot_properties: HashMap<String, String>,
+ ) -> Result<Self> {
+ Ok(Self {
+ snapshot_produce_action: SnapshotProduceAction::new(
+ tx,
+ snapshot_id,
+ key_metadata,
+ commit_uuid,
+ snapshot_properties,
+ )?,
+ check_duplicate: true,
+ })
+ }
+
+ /// Set whether to check duplicate files
+ pub fn with_check_duplicate(mut self, v: bool) -> Self {
+ self.check_duplicate = v;
+ self
+ }
+
+ /// Add data files to the snapshot.
+ pub fn add_data_files(
+ &mut self,
+ data_files: impl IntoIterator<Item = DataFile>,
+ ) -> Result<&mut Self> {
+ self.snapshot_produce_action.add_data_files(data_files)?;
+ Ok(self)
+ }
+
+ /// Adds existing parquet files
+ #[allow(dead_code)]
+ async fn add_parquet_files(mut self, file_path: Vec<String>) -> Result<Transaction<'a>> {
+ if !self
+ .snapshot_produce_action
+ .tx
+ .table
+ .metadata()
+ .default_spec
+ .is_unpartitioned()
+ {
+ return Err(Error::new(
+ ErrorKind::FeatureUnsupported,
+ "Appending to partitioned tables is not supported",
+ ));
+ }
+
+ let table_metadata = self.snapshot_produce_action.tx.table.metadata();
+
+ let data_files = ParquetWriter::parquet_files_to_data_files(
+ self.snapshot_produce_action.tx.table.file_io(),
+ file_path,
+ table_metadata,
+ )
+ .await?;
+
+ self.add_data_files(data_files)?;
+
+ self.apply().await
+ }
+
+ /// Finishes building the action and applies it to the transaction.
+ pub async fn apply(self) -> Result<Transaction<'a>> {
+ // Checks duplicate files
+ if self.check_duplicate {
+ let new_files: HashSet<&str> = self
+ .snapshot_produce_action
+ .added_data_files
+ .iter()
+ .map(|df| df.file_path.as_str())
+ .collect();
+
+ let mut manifest_stream = self
+ .snapshot_produce_action
+ .tx
+ .table
+ .inspect()
+ .manifests()
+ .scan()
+ .await?;
+ let mut referenced_files = Vec::new();
+
+ while let Some(batch) = manifest_stream.try_next().await? {
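+ // Column 1 of the manifests metadata table holds file_path (see the downcast below).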
+ let file_path_array = batch
+ .column(1)
+ .as_any()
+ .downcast_ref::<StringArray>()
+ .ok_or_else(|| {
+ Error::new(
+ ErrorKind::DataInvalid,
+ "Failed to downcast file_path column to
StringArray",
+ )
+ })?;
+
+ for i in 0..batch.num_rows() {
+ let file_path = file_path_array.value(i);
+ if new_files.contains(file_path) {
+ referenced_files.push(file_path.to_string());
+ }
+ }
+ }
+
+ if !referenced_files.is_empty() {
+ return Err(Error::new(
+ ErrorKind::DataInvalid,
+ format!(
+ "Cannot add files that are already referenced by
table, files: {}",
+ referenced_files.join(", ")
+ ),
+ ));
+ }
+ }
+
+ self.snapshot_produce_action
+ .apply(FastAppendOperation, DefaultManifestProcess)
+ .await
+ }
+}
+
+struct FastAppendOperation;
+
+impl SnapshotProduceOperation for FastAppendOperation {
+ fn operation(&self) -> Operation {
+ Operation::Append
+ }
+
+ async fn delete_entries(
+ &self,
+ _snapshot_produce: &SnapshotProduceAction<'_>,
+ ) -> Result<Vec<ManifestEntry>> {
+ Ok(vec![])
+ }
+
+ async fn existing_manifest(
+ &self,
+ snapshot_produce: &SnapshotProduceAction<'_>,
+ ) -> Result<Vec<ManifestFile>> {
+ let Some(snapshot) = snapshot_produce.tx.table.metadata().current_snapshot() else {
+ return Ok(vec![]);
+ };
+
+ let manifest_list = snapshot
+ .load_manifest_list(
+ snapshot_produce.tx.table.file_io(),
+ &snapshot_produce.tx.table.metadata_ref(),
+ )
+ .await?;
+
+ Ok(manifest_list
+ .entries()
+ .iter()
+ .filter(|entry| entry.has_added_files() || entry.has_existing_files())
+ .cloned()
+ .collect())
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use crate::scan::tests::TableTestFixture;
+ use crate::spec::{
+ DataContentType, DataFileBuilder, DataFileFormat, Literal, Struct, MAIN_BRANCH,
+ };
+ use crate::transaction::tests::make_v2_minimal_table;
+ use crate::transaction::Transaction;
+ use crate::{TableRequirement, TableUpdate};
+
+ #[tokio::test]
+ async fn test_fast_append_action() {
+ let table = make_v2_minimal_table();
+ let tx = Transaction::new(&table);
+ let mut action = tx.fast_append(None, vec![]).unwrap();
+
+ // check add data file with incompatible partition value
+ let data_file = DataFileBuilder::default()
+ .content(DataContentType::Data)
+ .file_path("test/3.parquet".to_string())
+ .file_format(DataFileFormat::Parquet)
+ .file_size_in_bytes(100)
+ .record_count(1)
+ .partition(Struct::from_iter([Some(Literal::string("test"))]))
+ .build()
+ .unwrap();
+ assert!(action.add_data_files(vec![data_file.clone()]).is_err());
+
+ let data_file = DataFileBuilder::default()
+ .content(DataContentType::Data)
+ .file_path("test/3.parquet".to_string())
+ .file_format(DataFileFormat::Parquet)
+ .file_size_in_bytes(100)
+ .record_count(1)
+ .partition(Struct::from_iter([Some(Literal::long(300))]))
+ .build()
+ .unwrap();
+ action.add_data_files(vec![data_file.clone()]).unwrap();
+ let tx = action.apply().await.unwrap();
+
+ // check updates and requirements
+ assert!(
+ matches!((&tx.updates[0], &tx.updates[1]), (TableUpdate::AddSnapshot { snapshot }, TableUpdate::SetSnapshotRef { reference, ref_name }) if snapshot.snapshot_id() == reference.snapshot_id && ref_name == MAIN_BRANCH)
+ );
+ assert_eq!(
+ vec![
+ TableRequirement::UuidMatch {
+ uuid: tx.table.metadata().uuid()
+ },
+ TableRequirement::RefSnapshotIdMatch {
+ r#ref: MAIN_BRANCH.to_string(),
+ snapshot_id: tx.table.metadata().current_snapshot_id
+ }
+ ],
+ tx.requirements
+ );
+
+ // check manifest list
+ let new_snapshot = if let TableUpdate::AddSnapshot { snapshot } = &tx.updates[0] {
+ snapshot
+ } else {
+ unreachable!()
+ };
+ let manifest_list = new_snapshot
+ .load_manifest_list(table.file_io(), table.metadata())
+ .await
+ .unwrap();
+ assert_eq!(1, manifest_list.entries().len());
+ assert_eq!(
+ manifest_list.entries()[0].sequence_number,
+ new_snapshot.sequence_number()
+ );
+
+ // check manifest
+ let manifest = manifest_list.entries()[0]
+ .load_manifest(table.file_io())
+ .await
+ .unwrap();
+ assert_eq!(1, manifest.entries().len());
+ assert_eq!(
+ new_snapshot.sequence_number(),
+ manifest.entries()[0]
+ .sequence_number()
+ .expect("Inherit sequence number by load manifest")
+ );
+
+ assert_eq!(
+ new_snapshot.snapshot_id(),
+ manifest.entries()[0].snapshot_id().unwrap()
+ );
+ assert_eq!(data_file, *manifest.entries()[0].data_file());
+ }
+
+ #[tokio::test]
+ async fn test_add_existing_parquet_files_to_unpartitioned_table() {
+ let mut fixture = TableTestFixture::new_unpartitioned();
+ fixture.setup_unpartitioned_manifest_files().await;
+ let tx = crate::transaction::Transaction::new(&fixture.table);
+
+ let file_paths = vec![
+ format!("{}/1.parquet", &fixture.table_location),
+ format!("{}/2.parquet", &fixture.table_location),
+ format!("{}/3.parquet", &fixture.table_location),
+ ];
+
+ let fast_append_action = tx.fast_append(None, vec![]).unwrap();
+
+ // Attempt to add the existing Parquet files with fast append.
+ let new_tx = fast_append_action
+ .add_parquet_files(file_paths.clone())
+ .await
+ .expect("Adding existing Parquet files should succeed");
+
+ let mut found_add_snapshot = false;
+ let mut found_set_snapshot_ref = false;
+ for update in new_tx.updates.iter() {
+ match update {
+ TableUpdate::AddSnapshot { .. } => {
+ found_add_snapshot = true;
+ }
+ TableUpdate::SetSnapshotRef {
+ ref_name,
+ reference,
+ } => {
+ found_set_snapshot_ref = true;
+ assert_eq!(ref_name, MAIN_BRANCH);
+ assert!(reference.snapshot_id > 0);
+ }
+ _ => {}
+ }
+ }
+ assert!(found_add_snapshot);
+ assert!(found_set_snapshot_ref);
+
+ let new_snapshot = if let TableUpdate::AddSnapshot { snapshot } = &new_tx.updates[0] {
+ snapshot
+ } else {
+ panic!("Expected the first update to be an AddSnapshot update");
+ };
+
+ let manifest_list = new_snapshot
+ .load_manifest_list(fixture.table.file_io(), fixture.table.metadata())
+ .await
+ .expect("Failed to load manifest list");
+
+ assert_eq!(
+ manifest_list.entries().len(),
+ 2,
+ "Expected 2 manifest list entries, got {}",
+ manifest_list.entries().len()
+ );
+
+ // Load the manifest from the manifest list
+ let manifest = manifest_list.entries()[0]
+ .load_manifest(fixture.table.file_io())
+ .await
+ .expect("Failed to load manifest");
+
+ // Check that the manifest contains three entries.
+ assert_eq!(manifest.entries().len(), 3);
+
+ // Verify each file path appears in manifest.
+ let manifest_paths: Vec<String> = manifest
+ .entries()
+ .iter()
+ .map(|entry| entry.data_file().file_path.clone())
+ .collect();
+ for path in file_paths {
+ assert!(manifest_paths.contains(&path));
+ }
+ }
+}
diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs
new file mode 100644
index 00000000..d3c7bc3f
--- /dev/null
+++ b/crates/iceberg/src/transaction/mod.rs
@@ -0,0 +1,326 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This module contains the transaction API.
+
+mod append;
+mod snapshot;
+mod sort_order;
+
+use std::cmp::Ordering;
+use std::collections::HashMap;
+use std::mem::discriminant;
+
+use uuid::Uuid;
+
+use crate::error::Result;
+use crate::spec::FormatVersion;
+use crate::table::Table;
+use crate::transaction::append::FastAppendAction;
+use crate::transaction::sort_order::ReplaceSortOrderAction;
+use crate::TableUpdate::UpgradeFormatVersion;
+use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, TableUpdate};
+
+/// Table transaction.
+pub struct Transaction<'a> {
+ table: &'a Table,
+ updates: Vec<TableUpdate>,
+ requirements: Vec<TableRequirement>,
+}
+
+impl<'a> Transaction<'a> {
+ /// Creates a new transaction.
+ pub fn new(table: &'a Table) -> Self {
+ Self {
+ table,
+ updates: vec![],
+ requirements: vec![],
+ }
+ }
+
+ fn append_updates(&mut self, updates: Vec<TableUpdate>) -> Result<()> {
+ for update in &updates {
+ for up in &self.updates {
+ if discriminant(up) == discriminant(update) {
+ return Err(Error::new(
+ ErrorKind::DataInvalid,
+ format!(
+ "Cannot apply update with same type at same time:
{:?}",
+ update
+ ),
+ ));
+ }
+ }
+ }
+ self.updates.extend(updates);
+ Ok(())
+ }
+
+ fn append_requirements(&mut self, requirements: Vec<TableRequirement>) -> Result<()> {
+ self.requirements.extend(requirements);
+ Ok(())
+ }
+
+ /// Upgrades the table to a new format version.
+ pub fn upgrade_table_version(mut self, format_version: FormatVersion) -> Result<Self> {
+ let current_version = self.table.metadata().format_version();
+ match current_version.cmp(&format_version) {
+ Ordering::Greater => {
+ return Err(Error::new(
+ ErrorKind::DataInvalid,
+ format!(
+ "Cannot downgrade table version from {} to {}",
+ current_version, format_version
+ ),
+ ));
+ }
+ Ordering::Less => {
+ self.append_updates(vec![UpgradeFormatVersion { format_version }])?;
+ }
+ Ordering::Equal => {
+ // Do nothing.
+ }
+ }
+ Ok(self)
+ }
+
+ /// Updates the table's properties.
+ pub fn set_properties(mut self, props: HashMap<String, String>) -> Result<Self> {
+ self.append_updates(vec![TableUpdate::SetProperties { updates: props }])?;
+ Ok(self)
+ }
+
+ fn generate_unique_snapshot_id(&self) -> i64 {
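+ // XOR the two 64-bit halves of a random v4 UUID and flip the sign if
+ // negative, so candidate ids are non-negative; collisions retry below.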
+ let generate_random_id = || -> i64 {
+ let (lhs, rhs) = Uuid::new_v4().as_u64_pair();
+ let snapshot_id = (lhs ^ rhs) as i64;
+ if snapshot_id < 0 {
+ -snapshot_id
+ } else {
+ snapshot_id
+ }
+ };
+ let mut snapshot_id = generate_random_id();
+ while self
+ .table
+ .metadata()
+ .snapshots()
+ .any(|s| s.snapshot_id() == snapshot_id)
+ {
+ snapshot_id = generate_random_id();
+ }
+ snapshot_id
+ }
+
+ /// Creates a fast append action.
+ pub fn fast_append(
+ self,
+ commit_uuid: Option<Uuid>,
+ key_metadata: Vec<u8>,
+ ) -> Result<FastAppendAction<'a>> {
+ let snapshot_id = self.generate_unique_snapshot_id();
+ FastAppendAction::new(
+ self,
+ snapshot_id,
+ commit_uuid.unwrap_or_else(Uuid::now_v7),
+ key_metadata,
+ HashMap::new(),
+ )
+ }
+
+ /// Creates replace sort order action.
+ pub fn replace_sort_order(self) -> ReplaceSortOrderAction<'a> {
+ ReplaceSortOrderAction {
+ tx: self,
+ sort_fields: vec![],
+ }
+ }
+
+ /// Removes properties from the table.
+ pub fn remove_properties(mut self, keys: Vec<String>) -> Result<Self> {
+ self.append_updates(vec![TableUpdate::RemoveProperties { removals: keys }])?;
+ Ok(self)
+ }
+
+ /// Commit transaction.
+ pub async fn commit(self, catalog: &dyn Catalog) -> Result<Table> {
+ let table_commit = TableCommit::builder()
+ .ident(self.table.identifier().clone())
+ .updates(self.updates)
+ .requirements(self.requirements)
+ .build();
+
+ catalog.update_table(table_commit).await
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use std::collections::HashMap;
+ use std::fs::File;
+ use std::io::BufReader;
+
+ use crate::io::FileIOBuilder;
+ use crate::spec::{FormatVersion, TableMetadata};
+ use crate::table::Table;
+ use crate::transaction::Transaction;
+ use crate::{TableIdent, TableUpdate};
+
+ fn make_v1_table() -> Table {
+ let file = File::open(format!(
+ "{}/testdata/table_metadata/{}",
+ env!("CARGO_MANIFEST_DIR"),
+ "TableMetadataV1Valid.json"
+ ))
+ .unwrap();
+ let reader = BufReader::new(file);
+ let resp = serde_json::from_reader::<_, TableMetadata>(reader).unwrap();
+
+ Table::builder()
+ .metadata(resp)
+ .metadata_location("s3://bucket/test/location/metadata/v1.json".to_string())
+ .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
+ .file_io(FileIOBuilder::new("memory").build().unwrap())
+ .build()
+ .unwrap()
+ }
+
+ pub fn make_v2_table() -> Table {
+ let file = File::open(format!(
+ "{}/testdata/table_metadata/{}",
+ env!("CARGO_MANIFEST_DIR"),
+ "TableMetadataV2Valid.json"
+ ))
+ .unwrap();
+ let reader = BufReader::new(file);
+ let resp = serde_json::from_reader::<_, TableMetadata>(reader).unwrap();
+
+ Table::builder()
+ .metadata(resp)
+ .metadata_location("s3://bucket/test/location/metadata/v1.json".to_string())
+ .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
+ .file_io(FileIOBuilder::new("memory").build().unwrap())
+ .build()
+ .unwrap()
+ }
+
+ pub fn make_v2_minimal_table() -> Table {
+ let file = File::open(format!(
+ "{}/testdata/table_metadata/{}",
+ env!("CARGO_MANIFEST_DIR"),
+ "TableMetadataV2ValidMinimal.json"
+ ))
+ .unwrap();
+ let reader = BufReader::new(file);
+ let resp = serde_json::from_reader::<_, TableMetadata>(reader).unwrap();
+
+ Table::builder()
+ .metadata(resp)
+ .metadata_location("s3://bucket/test/location/metadata/v1.json".to_string())
+ .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
+ .file_io(FileIOBuilder::new("memory").build().unwrap())
+ .build()
+ .unwrap()
+ }
+
+ #[test]
+ fn test_upgrade_table_version_v1_to_v2() {
+ let table = make_v1_table();
+ let tx = Transaction::new(&table);
+ let tx = tx.upgrade_table_version(FormatVersion::V2).unwrap();
+
+ assert_eq!(
+ vec![TableUpdate::UpgradeFormatVersion {
+ format_version: FormatVersion::V2
+ }],
+ tx.updates
+ );
+ }
+
+ #[test]
+ fn test_upgrade_table_version_v2_to_v2() {
+ let table = make_v2_table();
+ let tx = Transaction::new(&table);
+ let tx = tx.upgrade_table_version(FormatVersion::V2).unwrap();
+
+ assert!(
+ tx.updates.is_empty(),
+ "Upgrade table to same version should not generate any updates"
+ );
+ assert!(
+ tx.requirements.is_empty(),
+ "Upgrade table to same version should not generate any
requirements"
+ );
+ }
+
+ #[test]
+ fn test_downgrade_table_version() {
+ let table = make_v2_table();
+ let tx = Transaction::new(&table);
+ let tx = tx.upgrade_table_version(FormatVersion::V1);
+
+ assert!(tx.is_err(), "Downgrade table version should fail!");
+ }
+
+ #[test]
+ fn test_set_table_property() {
+ let table = make_v2_table();
+ let tx = Transaction::new(&table);
+ let tx = tx
+ .set_properties(HashMap::from([("a".to_string(), "b".to_string())]))
+ .unwrap();
+
+ assert_eq!(
+ vec![TableUpdate::SetProperties {
+ updates: HashMap::from([("a".to_string(), "b".to_string())])
+ }],
+ tx.updates
+ );
+ }
+
+ #[test]
+ fn test_remove_property() {
+ let table = make_v2_table();
+ let tx = Transaction::new(&table);
+ let tx = tx
+ .remove_properties(vec!["a".to_string(), "b".to_string()])
+ .unwrap();
+
+ assert_eq!(
+ vec![TableUpdate::RemoveProperties {
+ removals: vec!["a".to_string(), "b".to_string()]
+ }],
+ tx.updates
+ );
+ }
+
+ #[test]
+ fn test_do_same_update_in_same_transaction() {
+ let table = make_v2_table();
+ let tx = Transaction::new(&table);
+ let tx = tx
+ .remove_properties(vec!["a".to_string(), "b".to_string()])
+ .unwrap();
+
+ let tx = tx.remove_properties(vec!["c".to_string(), "d".to_string()]);
+
+ assert!(
+ tx.is_err(),
+ "Should not allow to do same kinds update in same transaction"
+ );
+ }
+}
diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs
new file mode 100644
index 00000000..4a3035ba
--- /dev/null
+++ b/crates/iceberg/src/transaction/snapshot.rs
@@ -0,0 +1,309 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::future::Future;
+use std::ops::RangeFrom;
+
+use uuid::Uuid;
+
+use crate::error::Result;
+use crate::io::OutputFile;
+use crate::spec::{
+ DataFile, DataFileFormat, FormatVersion, ManifestEntry, ManifestFile, ManifestListWriter,
+ ManifestWriterBuilder, Operation, Snapshot, SnapshotReference, SnapshotRetention, Struct,
+ StructType, Summary, MAIN_BRANCH,
+};
+use crate::transaction::Transaction;
+use crate::{Error, ErrorKind, TableRequirement, TableUpdate};
+
+const META_ROOT_PATH: &str = "metadata";
+
+pub(crate) trait SnapshotProduceOperation: Send + Sync {
+ fn operation(&self) -> Operation;
+ #[allow(unused)]
+ fn delete_entries(
+ &self,
+ snapshot_produce: &SnapshotProduceAction,
+ ) -> impl Future<Output = Result<Vec<ManifestEntry>>> + Send;
+ fn existing_manifest(
+ &self,
+ snapshot_produce: &SnapshotProduceAction,
+ ) -> impl Future<Output = Result<Vec<ManifestFile>>> + Send;
+}
+
+pub(crate) struct DefaultManifestProcess;
+
+impl ManifestProcess for DefaultManifestProcess {
+ fn process_manifeset(&self, manifests: Vec<ManifestFile>) -> Vec<ManifestFile> {
+ manifests
+ }
+}
+
+pub(crate) trait ManifestProcess: Send + Sync {
+ fn process_manifeset(&self, manifests: Vec<ManifestFile>) -> Vec<ManifestFile>;
+}
+
+pub(crate) struct SnapshotProduceAction<'a> {
+ pub tx: Transaction<'a>,
+ snapshot_id: i64,
+ key_metadata: Vec<u8>,
+ commit_uuid: Uuid,
+ snapshot_properties: HashMap<String, String>,
+ pub added_data_files: Vec<DataFile>,
+ // A counter used to generate unique manifest file names.
+ // It starts from 0 and increments for each new manifest file.
+ // Note: This counter is limited to the range of (0..u64::MAX).
+ manifest_counter: RangeFrom<u64>,
+}
+
+impl<'a> SnapshotProduceAction<'a> {
+ pub(crate) fn new(
+ tx: Transaction<'a>,
+ snapshot_id: i64,
+ key_metadata: Vec<u8>,
+ commit_uuid: Uuid,
+ snapshot_properties: HashMap<String, String>,
+ ) -> Result<Self> {
+ Ok(Self {
+ tx,
+ snapshot_id,
+ commit_uuid,
+ snapshot_properties,
+ added_data_files: vec![],
+ manifest_counter: (0..),
+ key_metadata,
+ })
+ }
+
+ // Check if the partition value is compatible with the partition type.
+ fn validate_partition_value(
+ partition_value: &Struct,
+ partition_type: &StructType,
+ ) -> Result<()> {
+ if partition_value.fields().len() != partition_type.fields().len() {
+ return Err(Error::new(
+ ErrorKind::DataInvalid,
+ "Partition value is not compatible with partition type",
+ ));
+ }
+
+ for (value, field) in partition_value.fields().iter().zip(partition_type.fields()) {
+ if !field
+ .field_type
+ .as_primitive_type()
+ .ok_or_else(|| {
+ Error::new(
+ ErrorKind::Unexpected,
+ "Partition field should only be primitive type.",
+ )
+ })?
+ .compatible(&value.as_primitive_literal().unwrap())
+ {
+ return Err(Error::new(
+ ErrorKind::DataInvalid,
+ "Partition value is not compatible partition type",
+ ));
+ }
+ }
+ Ok(())
+ }
+
+ /// Add data files to the snapshot.
+ pub fn add_data_files(
+ &mut self,
+ data_files: impl IntoIterator<Item = DataFile>,
+ ) -> Result<&mut Self> {
+ let data_files: Vec<DataFile> = data_files.into_iter().collect();
+ for data_file in &data_files {
+ if data_file.content_type() != crate::spec::DataContentType::Data {
+ return Err(Error::new(
+ ErrorKind::DataInvalid,
+ "Only data content type is allowed for fast append",
+ ));
+ }
+ Self::validate_partition_value(
+ data_file.partition(),
+ self.tx.table.metadata().default_partition_type(),
+ )?;
+ }
+ self.added_data_files.extend(data_files);
+ Ok(self)
+ }
+
+ fn new_manifest_output(&mut self) -> Result<OutputFile> {
+ let new_manifest_path = format!(
+ "{}/{}/{}-m{}.{}",
+ self.tx.table.metadata().location(),
+ META_ROOT_PATH,
+ self.commit_uuid,
+ self.manifest_counter.next().unwrap(),
+ DataFileFormat::Avro
+ );
+ self.tx.table.file_io().new_output(new_manifest_path)
+ }
+
+ // Write manifest file for added data files and return the ManifestFile for ManifestList.
+ async fn write_added_manifest(&mut self) -> Result<ManifestFile> {
+ let added_data_files = std::mem::take(&mut self.added_data_files);
+ let snapshot_id = self.snapshot_id;
+ let manifest_entries = added_data_files.into_iter().map(|data_file| {
+ let builder = ManifestEntry::builder()
+ .status(crate::spec::ManifestStatus::Added)
+ .data_file(data_file);
+ if self.tx.table.metadata().format_version() == FormatVersion::V1 {
+ builder.snapshot_id(snapshot_id).build()
+ } else {
+ // For format version > 1, leave the snapshot id unset so it is inherited at read
+ // time; this avoids rewriting the manifest file when a commit fails and is retried.
+ builder.build()
+ }
+ });
+ let mut writer = {
+ let builder = ManifestWriterBuilder::new(
+ self.new_manifest_output()?,
+ Some(self.snapshot_id),
+ self.key_metadata.clone(),
+ self.tx.table.metadata().current_schema().clone(),
+ self.tx
+ .table
+ .metadata()
+ .default_partition_spec()
+ .as_ref()
+ .clone(),
+ );
+ if self.tx.table.metadata().format_version() == FormatVersion::V1 {
+ builder.build_v1()
+ } else {
+ builder.build_v2_data()
+ }
+ };
+ for entry in manifest_entries {
+ writer.add_entry(entry)?;
+ }
+ writer.write_manifest_file().await
+ }
+
+ async fn manifest_file<OP: SnapshotProduceOperation, MP: ManifestProcess>(
+ &mut self,
+ snapshot_produce_operation: &OP,
+ manifest_process: &MP,
+ ) -> Result<Vec<ManifestFile>> {
+ let added_manifest = self.write_added_manifest().await?;
+ let existing_manifests = snapshot_produce_operation.existing_manifest(self).await?;
+ // # TODO
+ // Support processing delete entries.
+
+ let mut manifest_files = vec![added_manifest];
+ manifest_files.extend(existing_manifests);
+ let manifest_files = manifest_process.process_manifeset(manifest_files);
+ Ok(manifest_files)
+ }
+
+ // # TODO
+ // Complete this function; the summary currently only carries the operation and snapshot properties.
+ fn summary<OP: SnapshotProduceOperation>(&self, snapshot_produce_operation: &OP) -> Summary {
+ Summary {
+ operation: snapshot_produce_operation.operation(),
+ additional_properties: self.snapshot_properties.clone(),
+ }
+ }
+
+ fn generate_manifest_list_file_path(&self, attempt: i64) -> String {
+ format!(
+ "{}/{}/snap-{}-{}-{}.{}",
+ self.tx.table.metadata().location(),
+ META_ROOT_PATH,
+ self.snapshot_id,
+ attempt,
+ self.commit_uuid,
+ DataFileFormat::Avro
+ )
+ }
+
+ /// Finishes building the action and applies it to the transaction.
+ pub async fn apply<OP: SnapshotProduceOperation, MP: ManifestProcess>(
+ mut self,
+ snapshot_produce_operation: OP,
+ process: MP,
+ ) -> Result<Transaction<'a>> {
+ let new_manifests = self
+ .manifest_file(&snapshot_produce_operation, &process)
+ .await?;
+ let next_seq_num = self.tx.table.metadata().next_sequence_number();
+
+ let summary = self.summary(&snapshot_produce_operation);
+
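+ // First attempt is 0; the attempt number only feeds into the manifest list file name.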
+ let manifest_list_path = self.generate_manifest_list_file_path(0);
+
+ let mut manifest_list_writer = match self.tx.table.metadata().format_version() {
+ FormatVersion::V1 => ManifestListWriter::v1(
+ self.tx
+ .table
+ .file_io()
+ .new_output(manifest_list_path.clone())?,
+ self.snapshot_id,
+ self.tx.table.metadata().current_snapshot_id(),
+ ),
+ FormatVersion::V2 => ManifestListWriter::v2(
+ self.tx
+ .table
+ .file_io()
+ .new_output(manifest_list_path.clone())?,
+ self.snapshot_id,
+ self.tx.table.metadata().current_snapshot_id(),
+ next_seq_num,
+ ),
+ };
+ manifest_list_writer.add_manifests(new_manifests.into_iter())?;
+ manifest_list_writer.close().await?;
+
+ let commit_ts = chrono::Utc::now().timestamp_millis();
+ let new_snapshot = Snapshot::builder()
+ .with_manifest_list(manifest_list_path)
+ .with_snapshot_id(self.snapshot_id)
+ .with_parent_snapshot_id(self.tx.table.metadata().current_snapshot_id())
+ .with_sequence_number(next_seq_num)
+ .with_summary(summary)
+ .with_schema_id(self.tx.table.metadata().current_schema_id())
+ .with_timestamp_ms(commit_ts)
+ .build();
+
+ self.tx.append_updates(vec![
+ TableUpdate::AddSnapshot {
+ snapshot: new_snapshot,
+ },
+ TableUpdate::SetSnapshotRef {
+ ref_name: MAIN_BRANCH.to_string(),
+ reference: SnapshotReference::new(
+ self.snapshot_id,
+ SnapshotRetention::branch(None, None, None),
+ ),
+ },
+ ])?;
+ self.tx.append_requirements(vec![
+ TableRequirement::UuidMatch {
+ uuid: self.tx.table.metadata().uuid(),
+ },
+ TableRequirement::RefSnapshotIdMatch {
+ r#ref: MAIN_BRANCH.to_string(),
+ snapshot_id: self.tx.table.metadata().current_snapshot_id(),
+ },
+ ])?;
+ Ok(self.tx)
+ }
+}
diff --git a/crates/iceberg/src/transaction/sort_order.rs b/crates/iceberg/src/transaction/sort_order.rs
new file mode 100644
index 00000000..4f21eef0
--- /dev/null
+++ b/crates/iceberg/src/transaction/sort_order.rs
@@ -0,0 +1,132 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::error::Result;
+use crate::spec::{NullOrder, SortDirection, SortField, SortOrder, Transform};
+use crate::transaction::Transaction;
+use crate::{Error, ErrorKind, TableRequirement, TableUpdate};
+
+/// Transaction action for replacing sort order.
+pub struct ReplaceSortOrderAction<'a> {
+ pub tx: Transaction<'a>,
+ pub sort_fields: Vec<SortField>,
+}
+
+impl<'a> ReplaceSortOrderAction<'a> {
+ /// Adds a field for sorting in ascending order.
+ pub fn asc(self, name: &str, null_order: NullOrder) -> Result<Self> {
+ self.add_sort_field(name, SortDirection::Ascending, null_order)
+ }
+
+ /// Adds a field for sorting in descending order.
+ pub fn desc(self, name: &str, null_order: NullOrder) -> Result<Self> {
+ self.add_sort_field(name, SortDirection::Descending, null_order)
+ }
+
+ /// Finishes building the action and applies it to the transaction.
+ pub fn apply(mut self) -> Result<Transaction<'a>> {
+ let unbound_sort_order = SortOrder::builder()
+ .with_fields(self.sort_fields)
+ .build_unbound()?;
+
+ let updates = vec![
+ TableUpdate::AddSortOrder {
+ sort_order: unbound_sort_order,
+ },
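+ // sort_order_id -1 refers to the sort order added above (the last added one).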
+ TableUpdate::SetDefaultSortOrder { sort_order_id: -1 },
+ ];
+
+ let requirements = vec![
+ TableRequirement::CurrentSchemaIdMatch {
+ current_schema_id: self.tx.table.metadata().current_schema().schema_id(),
+ },
+ TableRequirement::DefaultSortOrderIdMatch {
+ default_sort_order_id: self.tx.table.metadata().default_sort_order().order_id,
+ },
+ ];
+
+ self.tx.append_requirements(requirements)?;
+ self.tx.append_updates(updates)?;
+ Ok(self.tx)
+ }
+
+ fn add_sort_field(
+ mut self,
+ name: &str,
+ sort_direction: SortDirection,
+ null_order: NullOrder,
+ ) -> Result<Self> {
+ let field_id = self
+ .tx
+ .table
+ .metadata()
+ .current_schema()
+ .field_id_by_name(name)
+ .ok_or_else(|| {
+ Error::new(
+ ErrorKind::DataInvalid,
+ format!("Cannot find field {} in table schema", name),
+ )
+ })?;
+
+ let sort_field = SortField::builder()
+ .source_id(field_id)
+ .transform(Transform::Identity)
+ .direction(sort_direction)
+ .null_order(null_order)
+ .build();
+
+ self.sort_fields.push(sort_field);
+ Ok(self)
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use crate::transaction::tests::make_v2_table;
+ use crate::transaction::Transaction;
+ use crate::{TableRequirement, TableUpdate};
+
+ #[test]
+ fn test_replace_sort_order() {
+ let table = make_v2_table();
+ let tx = Transaction::new(&table);
+ let tx = tx.replace_sort_order().apply().unwrap();
+
+ assert_eq!(
+ vec![
+ TableUpdate::AddSortOrder {
+ sort_order: Default::default()
+ },
+ TableUpdate::SetDefaultSortOrder { sort_order_id: -1 }
+ ],
+ tx.updates
+ );
+
+ assert_eq!(
+ vec![
+ TableRequirement::CurrentSchemaIdMatch {
+ current_schema_id: 1
+ },
+ TableRequirement::DefaultSortOrderIdMatch {
+ default_sort_order_id: 3
+ }
+ ],
+ tx.requirements
+ );
+ }
+}