This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 1138364c TableMetadataBuilder (#587)
1138364c is described below
commit 1138364c630c70d998eaadd61bbf767ff5cdf5c8
Author: Christian <[email protected]>
AuthorDate: Tue Nov 26 09:17:10 2024 +0000
TableMetadataBuilder (#587)
* Squash builder
* Address comments
* Address comments
* Match on FormatVersion to fail for V3
* Fix examples
* Fix tests
* Address comments
* Address comments
* Update crates/iceberg/src/spec/table_metadata_builder.rs
Co-authored-by: Renjie Liu <[email protected]>
* Remove ReferenceType
* Fix import
* Remove current_schema and last_updated_ms accessors
* Ensure main branch is not removed
* Address comments
* Fix tests
* Do not ensure ensure_main_branch_not_removed
* set_branch_snapshot create branch if not exists
---------
Co-authored-by: Renjie Liu <[email protected]>
---
crates/catalog/glue/src/catalog.rs | 4 +-
crates/catalog/glue/src/schema.rs | 4 +-
crates/catalog/glue/src/utils.rs | 4 +-
crates/catalog/hms/src/catalog.rs | 4 +-
crates/catalog/memory/src/catalog.rs | 4 +-
crates/catalog/sql/src/catalog.rs | 4 +-
crates/iceberg/src/catalog/mod.rs | 88 +-
crates/iceberg/src/spec/mod.rs | 1 +
crates/iceberg/src/spec/partition.rs | 6 +-
crates/iceberg/src/spec/schema.rs | 19 +-
crates/iceberg/src/spec/snapshot.rs | 7 +
crates/iceberg/src/spec/table_metadata.rs | 117 +-
crates/iceberg/src/spec/table_metadata_builder.rs | 2160 +++++++++++++++++++++
13 files changed, 2305 insertions(+), 117 deletions(-)
diff --git a/crates/catalog/glue/src/catalog.rs
b/crates/catalog/glue/src/catalog.rs
index 18e30f3d..ad216a3a 100644
--- a/crates/catalog/glue/src/catalog.rs
+++ b/crates/catalog/glue/src/catalog.rs
@@ -355,7 +355,9 @@ impl Catalog for GlueCatalog {
}
};
- let metadata =
TableMetadataBuilder::from_table_creation(creation)?.build()?;
+ let metadata = TableMetadataBuilder::from_table_creation(creation)?
+ .build()?
+ .metadata;
let metadata_location = create_metadata_location(&location, 0)?;
self.file_io
diff --git a/crates/catalog/glue/src/schema.rs
b/crates/catalog/glue/src/schema.rs
index bb676e36..1b490d13 100644
--- a/crates/catalog/glue/src/schema.rs
+++ b/crates/catalog/glue/src/schema.rs
@@ -198,7 +198,9 @@ mod tests {
.location("my_location".to_string())
.schema(schema)
.build();
- let metadata =
TableMetadataBuilder::from_table_creation(table_creation)?.build()?;
+ let metadata =
TableMetadataBuilder::from_table_creation(table_creation)?
+ .build()?
+ .metadata;
Ok(metadata)
}
diff --git a/crates/catalog/glue/src/utils.rs b/crates/catalog/glue/src/utils.rs
index a99fb19c..c43af500 100644
--- a/crates/catalog/glue/src/utils.rs
+++ b/crates/catalog/glue/src/utils.rs
@@ -299,7 +299,9 @@ mod tests {
.location("my_location".to_string())
.schema(schema)
.build();
- let metadata =
TableMetadataBuilder::from_table_creation(table_creation)?.build()?;
+ let metadata =
TableMetadataBuilder::from_table_creation(table_creation)?
+ .build()?
+ .metadata;
Ok(metadata)
}
diff --git a/crates/catalog/hms/src/catalog.rs
b/crates/catalog/hms/src/catalog.rs
index 6e5db196..e19cf4ce 100644
--- a/crates/catalog/hms/src/catalog.rs
+++ b/crates/catalog/hms/src/catalog.rs
@@ -346,7 +346,9 @@ impl Catalog for HmsCatalog {
}
};
- let metadata =
TableMetadataBuilder::from_table_creation(creation)?.build()?;
+ let metadata = TableMetadataBuilder::from_table_creation(creation)?
+ .build()?
+ .metadata;
let metadata_location = create_metadata_location(&location, 0)?;
self.file_io
diff --git a/crates/catalog/memory/src/catalog.rs
b/crates/catalog/memory/src/catalog.rs
index e4192aae..3a4a4e3b 100644
--- a/crates/catalog/memory/src/catalog.rs
+++ b/crates/catalog/memory/src/catalog.rs
@@ -194,7 +194,9 @@ impl Catalog for MemoryCatalog {
}
};
- let metadata =
TableMetadataBuilder::from_table_creation(table_creation)?.build()?;
+ let metadata =
TableMetadataBuilder::from_table_creation(table_creation)?
+ .build()?
+ .metadata;
let metadata_location = format!(
"{}/metadata/{}-{}.metadata.json",
&location,
diff --git a/crates/catalog/sql/src/catalog.rs
b/crates/catalog/sql/src/catalog.rs
index abf22ffd..1556614d 100644
--- a/crates/catalog/sql/src/catalog.rs
+++ b/crates/catalog/sql/src/catalog.rs
@@ -699,7 +699,9 @@ impl Catalog for SqlCatalog {
}
};
- let tbl_metadata =
TableMetadataBuilder::from_table_creation(tbl_creation)?.build()?;
+ let tbl_metadata =
TableMetadataBuilder::from_table_creation(tbl_creation)?
+ .build()?
+ .metadata;
let tbl_metadata_location = format!(
"{}/metadata/0-{}.metadata.json",
location.clone(),
diff --git a/crates/iceberg/src/catalog/mod.rs
b/crates/iceberg/src/catalog/mod.rs
index 77935d2c..ceb29822 100644
--- a/crates/iceberg/src/catalog/mod.rs
+++ b/crates/iceberg/src/catalog/mod.rs
@@ -452,8 +452,46 @@ impl TableUpdate {
/// Applies the update to the table metadata builder.
pub fn apply(self, builder: TableMetadataBuilder) ->
Result<TableMetadataBuilder> {
match self {
- TableUpdate::AssignUuid { uuid } => builder.assign_uuid(uuid),
- _ => unimplemented!(),
+ TableUpdate::AssignUuid { uuid } => Ok(builder.assign_uuid(uuid)),
+ TableUpdate::AddSchema {
+ schema,
+ last_column_id,
+ } => {
+ if let Some(last_column_id) = last_column_id {
+ if builder.last_column_id() > last_column_id {
+ return Err(Error::new(
+ ErrorKind::DataInvalid,
+ format!(
+ "Invalid last column ID: {last_column_id} < {}
(previous last column ID)",
+ builder.last_column_id()
+ ),
+ ));
+ }
+ };
+ Ok(builder.add_schema(schema))
+ }
+ TableUpdate::SetCurrentSchema { schema_id } =>
builder.set_current_schema(schema_id),
+ TableUpdate::AddSpec { spec } => builder.add_partition_spec(spec),
+ TableUpdate::SetDefaultSpec { spec_id } =>
builder.set_default_partition_spec(spec_id),
+ TableUpdate::AddSortOrder { sort_order } =>
builder.add_sort_order(sort_order),
+ TableUpdate::SetDefaultSortOrder { sort_order_id } => {
+ builder.set_default_sort_order(sort_order_id)
+ }
+ TableUpdate::AddSnapshot { snapshot } =>
builder.add_snapshot(snapshot),
+ TableUpdate::SetSnapshotRef {
+ ref_name,
+ reference,
+ } => builder.set_ref(&ref_name, reference),
+ TableUpdate::RemoveSnapshots { snapshot_ids } => {
+ Ok(builder.remove_snapshots(&snapshot_ids))
+ }
+ TableUpdate::RemoveSnapshotRef { ref_name } =>
Ok(builder.remove_ref(&ref_name)),
+ TableUpdate::SetLocation { location } =>
Ok(builder.set_location(location)),
+ TableUpdate::SetProperties { updates } =>
builder.set_properties(updates),
+ TableUpdate::RemoveProperties { removals } =>
builder.remove_properties(&removals),
+ TableUpdate::UpgradeFormatVersion { format_version } => {
+ builder.upgrade_format_version(format_version)
+ }
}
}
}
@@ -749,7 +787,7 @@ mod tests {
SnapshotReference, SnapshotRetention, SortDirection, SortField,
SortOrder,
SqlViewRepresentation, Summary, TableMetadata, TableMetadataBuilder,
Transform, Type,
UnboundPartitionSpec, ViewFormatVersion, ViewRepresentation,
ViewRepresentations,
- ViewVersion,
+ ViewVersion, MAIN_BRANCH,
};
use crate::{NamespaceIdent, TableCreation, TableIdent, TableRequirement,
TableUpdate};
@@ -803,9 +841,9 @@ mod tests {
TableMetadataBuilder::from_table_creation(tbl_creation)
.unwrap()
.assign_uuid(uuid::Uuid::nil())
- .unwrap()
.build()
.unwrap()
+ .metadata
}
#[test]
@@ -855,7 +893,7 @@ mod tests {
{
"snapshot-id": 3051729675574597004,
"sequence-number": 10,
- "timestamp-ms": 1515100955770,
+ "timestamp-ms": 9992191116217,
"summary": {
"operation": "append"
},
@@ -865,8 +903,28 @@ mod tests {
"#;
let snapshot = serde_json::from_str::<Snapshot>(record).unwrap();
- let mut metadata = metadata;
- metadata.append_snapshot(snapshot);
+ let builder = metadata.into_builder(None);
+ let builder = TableUpdate::AddSnapshot {
+ snapshot: snapshot.clone(),
+ }
+ .apply(builder)
+ .unwrap();
+ let metadata = TableUpdate::SetSnapshotRef {
+ ref_name: MAIN_BRANCH.to_string(),
+ reference: SnapshotReference {
+ snapshot_id: snapshot.snapshot_id(),
+ retention: SnapshotRetention::Branch {
+ min_snapshots_to_keep: Some(10),
+ max_snapshot_age_ms: None,
+ max_ref_age_ms: None,
+ },
+ },
+ }
+ .apply(builder)
+ .unwrap()
+ .build()
+ .unwrap()
+ .metadata;
// Ref exists and should match
let requirement = TableRequirement::RefSnapshotIdMatch {
@@ -916,14 +974,13 @@ mod tests {
#[test]
fn test_check_last_assigned_partition_id() {
let metadata = metadata();
-
let requirement = TableRequirement::LastAssignedPartitionIdMatch {
- last_assigned_partition_id: 1,
+ last_assigned_partition_id: 0,
};
assert!(requirement.check(Some(&metadata)).is_err());
let requirement = TableRequirement::LastAssignedPartitionIdMatch {
- last_assigned_partition_id: 0,
+ last_assigned_partition_id: 999,
};
assert!(requirement.check(Some(&metadata)).is_ok());
}
@@ -1582,8 +1639,12 @@ mod tests {
let table_metadata =
TableMetadataBuilder::from_table_creation(table_creation)
.unwrap()
.build()
- .unwrap();
- let table_metadata_builder = TableMetadataBuilder::new(table_metadata);
+ .unwrap()
+ .metadata;
+ let table_metadata_builder = TableMetadataBuilder::new_from_metadata(
+ table_metadata,
+ Some("s3://db/table/metadata/metadata1.gz.json".to_string()),
+ );
let uuid = uuid::Uuid::new_v4();
let update = TableUpdate::AssignUuid { uuid };
@@ -1591,7 +1652,8 @@ mod tests {
.apply(table_metadata_builder)
.unwrap()
.build()
- .unwrap();
+ .unwrap()
+ .metadata;
assert_eq!(updated_metadata.uuid(), uuid);
}
diff --git a/crates/iceberg/src/spec/mod.rs b/crates/iceberg/src/spec/mod.rs
index 793f00d3..9b91d544 100644
--- a/crates/iceberg/src/spec/mod.rs
+++ b/crates/iceberg/src/spec/mod.rs
@@ -25,6 +25,7 @@ mod schema;
mod snapshot;
mod sort;
mod table_metadata;
+mod table_metadata_builder;
mod transform;
mod values;
mod view_metadata;
diff --git a/crates/iceberg/src/spec/partition.rs
b/crates/iceberg/src/spec/partition.rs
index 75e5d924..7110ca52 100644
--- a/crates/iceberg/src/spec/partition.rs
+++ b/crates/iceberg/src/spec/partition.rs
@@ -147,7 +147,6 @@ impl BoundPartitionSpec {
}
/// Get the highest field id in the partition spec.
- /// If the partition spec is unpartitioned, it returns the last
unpartitioned last assigned id (999).
pub fn highest_field_id(&self) -> Option<i32> {
self.fields.iter().map(|f| f.field_id).max()
}
@@ -182,6 +181,11 @@ impl BoundPartitionSpec {
true
}
+
+ /// Change the spec id of the partition spec
+ pub fn with_spec_id(self, spec_id: i32) -> Self {
+ Self { spec_id, ..self }
+ }
}
impl SchemalessPartitionSpec {
diff --git a/crates/iceberg/src/spec/schema.rs
b/crates/iceberg/src/spec/schema.rs
index eaf1fcc8..649b6b2c 100644
--- a/crates/iceberg/src/spec/schema.rs
+++ b/crates/iceberg/src/spec/schema.rs
@@ -91,7 +91,6 @@ impl SchemaBuilder {
/// Reassignment starts from the field-id specified in `start_from`
(inclusive).
///
/// All specified aliases and identifier fields will be updated to the new
field-ids.
- #[allow(dead_code)] // Will be needed in TableMetadataBuilder
pub(crate) fn with_reassigned_field_ids(mut self, start_from: u32) -> Self
{
self.reassign_field_ids_from =
Some(start_from.try_into().unwrap_or(i32::MAX));
self
@@ -376,6 +375,24 @@ impl Schema {
pub fn accessor_by_field_id(&self, field_id: i32) ->
Option<Arc<StructAccessor>> {
self.field_id_to_accessor.get(&field_id).cloned()
}
+
+ /// Check if this schema is identical to another schema semantically -
excluding schema id.
+ pub(crate) fn is_same_schema(&self, other: &SchemaRef) -> bool {
+ self.as_struct().eq(other.as_struct())
+ && self.identifier_field_ids().eq(other.identifier_field_ids())
+ }
+
+ /// Change the schema id of this schema.
+ // This is redundant with the `with_schema_id` method on the builder, but
useful
+ // as it is infallible in contrast to the builder `build()` method.
+ pub(crate) fn with_schema_id(self, schema_id: SchemaId) -> Self {
+ Self { schema_id, ..self }
+ }
+
+ /// Returns a HashMap mapping field ids to field names.
+ pub(crate) fn field_id_to_name_map(&self) -> &HashMap<i32, String> {
+ &self.id_to_name
+ }
}
impl Display for Schema {
diff --git a/crates/iceberg/src/spec/snapshot.rs
b/crates/iceberg/src/spec/snapshot.rs
index f42e736e..b1ae1dfd 100644
--- a/crates/iceberg/src/spec/snapshot.rs
+++ b/crates/iceberg/src/spec/snapshot.rs
@@ -324,6 +324,13 @@ pub struct SnapshotReference {
pub retention: SnapshotRetention,
}
+impl SnapshotReference {
+ /// Returns true if the snapshot reference is a branch.
+ pub fn is_branch(&self) -> bool {
+ matches!(self.retention, SnapshotRetention::Branch { .. })
+ }
+}
+
impl SnapshotReference {
/// Create new snapshot reference
pub fn new(snapshot_id: i64, retention: SnapshotRetention) -> Self {
diff --git a/crates/iceberg/src/spec/table_metadata.rs
b/crates/iceberg/src/spec/table_metadata.rs
index daed758c..9609d982 100644
--- a/crates/iceberg/src/spec/table_metadata.rs
+++ b/crates/iceberg/src/spec/table_metadata.rs
@@ -30,12 +30,13 @@ use serde_repr::{Deserialize_repr, Serialize_repr};
use uuid::Uuid;
use super::snapshot::SnapshotReference;
+pub use super::table_metadata_builder::{TableMetadataBuildResult,
TableMetadataBuilder};
use super::{
- BoundPartitionSpec, BoundPartitionSpecRef, SchemaId, SchemaRef,
SchemalessPartitionSpecRef,
- Snapshot, SnapshotRef, SnapshotRetention, SortOrder, SortOrderRef,
DEFAULT_PARTITION_SPEC_ID,
+ BoundPartitionSpecRef, SchemaId, SchemaRef, SchemalessPartitionSpecRef,
Snapshot, SnapshotRef,
+ SnapshotRetention, SortOrder, SortOrderRef, DEFAULT_PARTITION_SPEC_ID,
};
use crate::error::{timestamp_ms_to_utc, Result};
-use crate::{Error, ErrorKind, TableCreation};
+use crate::{Error, ErrorKind};
static MAIN_BRANCH: &str = "main";
pub(crate) static ONE_MINUTE_MS: i64 = 60_000;
@@ -165,6 +166,17 @@ pub struct TableMetadata {
}
impl TableMetadata {
+ /// Convert this Table Metadata into a builder for modification.
+ ///
+ /// `current_file_location` is the location where the current version
+ /// of the metadata file is stored. This is used to update the metadata
log.
+ /// If `current_file_location` is `None`, the metadata log will not be
updated.
+ /// This should only be used to stage-create tables.
+ #[must_use]
+ pub fn into_builder(self, current_file_location: Option<String>) ->
TableMetadataBuilder {
+ TableMetadataBuilder::new_from_metadata(self, current_file_location)
+ }
+
/// Returns format version of this metadata.
#[inline]
pub fn format_version(&self) -> FormatVersion {
@@ -334,6 +346,10 @@ impl TableMetadata {
}
/// Append snapshot to table
+ #[deprecated(
+ since = "0.4.0",
+ note = "please use `TableMetadataBuilder.set_branch_snapshot` instead"
+ )]
pub fn append_snapshot(&mut self, snapshot: Snapshot) {
self.last_updated_ms = snapshot.timestamp_ms();
self.last_sequence_number = snapshot.sequence_number();
@@ -559,98 +575,6 @@ impl TableMetadata {
}
}
-/// Manipulating table metadata.
-pub struct TableMetadataBuilder(TableMetadata);
-
-impl TableMetadataBuilder {
- /// Creates a new table metadata builder from the given table metadata.
- pub fn new(origin: TableMetadata) -> Self {
- Self(origin)
- }
-
- /// Creates a new table metadata builder from the given table creation.
- pub fn from_table_creation(table_creation: TableCreation) -> Result<Self> {
- let TableCreation {
- name: _,
- location,
- schema,
- partition_spec,
- sort_order,
- properties,
- } = table_creation;
-
- let schema: Arc<super::Schema> = Arc::new(schema);
- let unpartition_spec =
BoundPartitionSpec::unpartition_spec(schema.clone());
- let partition_specs = match partition_spec {
- Some(_) => {
- return Err(Error::new(
- ErrorKind::FeatureUnsupported,
- "Can't create table with partition spec now",
- ))
- }
- None => HashMap::from([(
- unpartition_spec.spec_id(),
- Arc::new(unpartition_spec.clone().into_schemaless()),
- )]),
- };
-
- let sort_orders = match sort_order {
- Some(_) => {
- return Err(Error::new(
- ErrorKind::FeatureUnsupported,
- "Can't create table with sort order now",
- ))
- }
- None => HashMap::from([(
- SortOrder::UNSORTED_ORDER_ID,
- Arc::new(SortOrder::unsorted_order()),
- )]),
- };
-
- let mut table_metadata = TableMetadata {
- format_version: FormatVersion::V2,
- table_uuid: Uuid::now_v7(),
- location: location.ok_or_else(|| {
- Error::new(
- ErrorKind::DataInvalid,
- "Can't create table without location",
- )
- })?,
- last_sequence_number: 0,
- last_updated_ms: Utc::now().timestamp_millis(),
- last_column_id: schema.highest_field_id(),
- current_schema_id: schema.schema_id(),
- schemas: HashMap::from([(schema.schema_id(), schema)]),
- partition_specs,
- default_spec: BoundPartitionSpecRef::new(unpartition_spec),
- last_partition_id: 0,
- properties,
- current_snapshot_id: None,
- snapshots: Default::default(),
- snapshot_log: vec![],
- sort_orders,
- metadata_log: vec![],
- default_sort_order_id: SortOrder::UNSORTED_ORDER_ID,
- refs: Default::default(),
- };
-
- table_metadata.try_normalize()?;
-
- Ok(Self(table_metadata))
- }
-
- /// Changes uuid of table metadata.
- pub fn assign_uuid(mut self, uuid: Uuid) -> Result<Self> {
- self.0.table_uuid = uuid;
- Ok(self)
- }
-
- /// Returns the new table metadata after changes.
- pub fn build(self) -> Result<TableMetadata> {
- Ok(self.0)
- }
-}
-
pub(super) mod _serde {
use std::borrow::BorrowMut;
/// This is a helper module that defines types to help with
serialization/deserialization.
@@ -2328,7 +2252,8 @@ mod tests {
let table_metadata =
TableMetadataBuilder::from_table_creation(table_creation)
.unwrap()
.build()
- .unwrap();
+ .unwrap()
+ .metadata;
assert_eq!(table_metadata.location, "s3://db/table");
assert_eq!(table_metadata.schemas.len(), 1);
assert_eq!(
diff --git a/crates/iceberg/src/spec/table_metadata_builder.rs
b/crates/iceberg/src/spec/table_metadata_builder.rs
new file mode 100644
index 00000000..5d4aa77c
--- /dev/null
+++ b/crates/iceberg/src/spec/table_metadata_builder.rs
@@ -0,0 +1,2160 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::{HashMap, HashSet};
+use std::sync::Arc;
+
+use uuid::Uuid;
+
+use super::{
+ BoundPartitionSpec, FormatVersion, MetadataLog, PartitionSpecBuilder,
Schema, SchemaRef,
+ Snapshot, SnapshotLog, SnapshotReference, SnapshotRetention, SortOrder,
SortOrderRef,
+ TableMetadata, UnboundPartitionSpec, DEFAULT_PARTITION_SPEC_ID,
DEFAULT_SCHEMA_ID, MAIN_BRANCH,
+ ONE_MINUTE_MS, PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX,
+ PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT, RESERVED_PROPERTIES,
+ UNPARTITIONED_LAST_ASSIGNED_ID,
+};
+use crate::error::{Error, ErrorKind, Result};
+use crate::{TableCreation, TableUpdate};
+
+const FIRST_FIELD_ID: u32 = 1;
+
+/// Manipulating table metadata.
+///
+/// For this builder the order of called functions matters. Functions are
applied in-order.
+/// All operations applied to the `TableMetadata` are tracked in `changes` as
a chronologically
+/// ordered vec of `TableUpdate`.
+/// If an operation does not lead to a change of the `TableMetadata`, the
corresponding update
+/// is omitted from `changes`.
+///
+/// Unlike a typical builder pattern, the order of function calls matters.
+/// Some basic rules:
+/// - `add_schema` must be called before `set_current_schema`.
+/// - If a new partition spec and schema are added, the schema should be added
first.
+#[derive(Debug, Clone)]
+pub struct TableMetadataBuilder {
+ metadata: TableMetadata,
+ changes: Vec<TableUpdate>,
+ last_added_schema_id: Option<i32>,
+ last_added_spec_id: Option<i32>,
+ last_added_order_id: Option<i64>,
+ // None if this is a new table, i.e. `new_from_metadata` was not used
+ previous_history_entry: Option<MetadataLog>,
+}
+
+#[derive(Debug, Clone, PartialEq)]
+/// Result of modifying or creating a `TableMetadata`.
+pub struct TableMetadataBuildResult {
+ /// The new `TableMetadata`.
+ pub metadata: TableMetadata,
+ /// The changes that were applied to the metadata.
+ pub changes: Vec<TableUpdate>,
+ /// Expired metadata logs
+ pub expired_metadata_logs: Vec<MetadataLog>,
+}
+
+impl TableMetadataBuilder {
+ /// Proxy id for "last added" items, including schema, partition spec,
sort order.
+ pub const LAST_ADDED: i32 = -1;
+
+ /// Create a `TableMetadata` object from scratch.
+ ///
+ /// This method re-assign ids of fields in the schema, schema.id,
sort_order.id and
+ /// spec.id. It should only be used to create new table metadata from
scratch.
+ pub fn new(
+ schema: Schema,
+ spec: impl Into<UnboundPartitionSpec>,
+ sort_order: SortOrder,
+ location: String,
+ format_version: FormatVersion,
+ properties: HashMap<String, String>,
+ ) -> Result<Self> {
+ // Re-assign field_ids, schema.id, sort_order.id and spec.id for a new
table.
+ let (fresh_schema, fresh_spec, fresh_sort_order) =
+ Self::reassign_ids(schema, spec.into(), sort_order)?;
+ let schema_id = fresh_schema.schema_id();
+
+ let builder = Self {
+ metadata: TableMetadata {
+ format_version,
+ table_uuid: Uuid::now_v7(),
+ location: "".to_string(), // Overwritten immediately by
set_location
+ last_sequence_number: 0,
+ last_updated_ms: 0, // Overwritten by build() if not set
before
+ last_column_id: -1, // Overwritten immediately by
add_current_schema
+ current_schema_id: -1, // Overwritten immediately by
add_current_schema
+ schemas: HashMap::new(),
+ partition_specs: HashMap::new(),
+ default_spec: Arc::new(
+ // The spec id (-1) is just a proxy value and can be any
negative number.
+ // 0 would lead to wrong changes in the builder if the
provided spec by the user is
+ // also unpartitioned.
+ // The `default_spec` value is always replaced at the end
of this method by the `add_default_partition_spec`
+ // method.
+
BoundPartitionSpec::unpartition_spec(fresh_schema.clone()).with_spec_id(-1),
+ ), // Overwritten immediately by add_default_partition_spec
+ last_partition_id: UNPARTITIONED_LAST_ASSIGNED_ID,
+ properties: HashMap::new(),
+ current_snapshot_id: None,
+ snapshots: HashMap::new(),
+ snapshot_log: vec![],
+ sort_orders: HashMap::new(),
+ metadata_log: vec![],
+ default_sort_order_id: -1, // Overwritten immediately by
add_default_sort_order
+ refs: HashMap::default(),
+ },
+ changes: vec![],
+ last_added_schema_id: Some(schema_id),
+ last_added_spec_id: None,
+ last_added_order_id: None,
+ previous_history_entry: None,
+ };
+
+ builder
+ .set_location(location)
+ .add_current_schema(fresh_schema)?
+ .add_default_partition_spec(fresh_spec.into_unbound())?
+ .add_default_sort_order(fresh_sort_order)?
+ .set_properties(properties)
+ }
+
+ /// Creates a new table metadata builder from the given metadata to modify
it.
+
+ /// `current_file_location` is the location where the current version
+ /// of the metadata file is stored. This is used to update the metadata
log.
+ /// If `current_file_location` is `None`, the metadata log will not be
updated.
+ /// This should only be used to stage-create tables.
+ #[must_use]
+ pub fn new_from_metadata(
+ previous: TableMetadata,
+ previous_file_location: Option<String>,
+ ) -> Self {
+ Self {
+ previous_history_entry: previous_file_location.map(|l| MetadataLog
{
+ metadata_file: l,
+ timestamp_ms: previous.last_updated_ms,
+ }),
+ metadata: previous,
+ changes: Vec::default(),
+ last_added_schema_id: None,
+ last_added_spec_id: None,
+ last_added_order_id: None,
+ }
+ }
+
+ /// Creates a new table metadata builder from the given table creation.
+ pub fn from_table_creation(table_creation: TableCreation) -> Result<Self> {
+ let TableCreation {
+ name: _,
+ location,
+ schema,
+ partition_spec,
+ sort_order,
+ properties,
+ } = table_creation;
+
+ let location = location.ok_or_else(|| {
+ Error::new(
+ ErrorKind::DataInvalid,
+ "Can't create table without location",
+ )
+ })?;
+ let partition_spec = partition_spec.unwrap_or(UnboundPartitionSpec {
+ spec_id: None,
+ fields: vec![],
+ });
+
+ Self::new(
+ schema,
+ partition_spec,
+ sort_order.unwrap_or(SortOrder::unsorted_order()),
+ location,
+ FormatVersion::V2,
+ properties,
+ )
+ }
+
+ /// Get the current last column id
+ #[inline]
+ pub(crate) fn last_column_id(&self) -> i32 {
+ self.metadata.last_column_id
+ }
+
+ /// Changes uuid of table metadata.
+ pub fn assign_uuid(mut self, uuid: Uuid) -> Self {
+ if self.metadata.table_uuid != uuid {
+ self.metadata.table_uuid = uuid;
+ self.changes.push(TableUpdate::AssignUuid { uuid });
+ }
+
+ self
+ }
+
+ /// Upgrade `FormatVersion`. Downgrades are not allowed.
+ ///
+ /// # Errors
+ /// - Cannot downgrade to older format versions.
+ pub fn upgrade_format_version(mut self, format_version: FormatVersion) ->
Result<Self> {
+ if format_version < self.metadata.format_version {
+ return Err(Error::new(
+ ErrorKind::DataInvalid,
+ format!(
+ "Cannot downgrade FormatVersion from {} to {}",
+ self.metadata.format_version, format_version
+ ),
+ ));
+ }
+
+ if format_version != self.metadata.format_version {
+ match format_version {
+ FormatVersion::V1 => {
+ // No changes needed for V1
+ }
+ FormatVersion::V2 => {
+ self.metadata.format_version = format_version;
+ self.changes
+ .push(TableUpdate::UpgradeFormatVersion {
format_version });
+ }
+ }
+ }
+
+ Ok(self)
+ }
+
+ /// Set properties. If a property already exists, it will be overwritten.
+ ///
+ /// If a reserved property is set, the corresponding action is performed
and the property is not persisted.
+ /// Currently the following reserved properties are supported:
+ /// * format-version: Set the format version of the table.
+ ///
+ /// # Errors
+ /// - If properties contains a reserved property
+ pub fn set_properties(mut self, properties: HashMap<String, String>) ->
Result<Self> {
+ // List of specified properties that are RESERVED and should not be
persisted.
+ let reserved_properties = properties
+ .keys()
+ .filter(|key| RESERVED_PROPERTIES.contains(&key.as_str()))
+ .map(ToString::to_string)
+ .collect::<Vec<_>>();
+
+ if !reserved_properties.is_empty() {
+ return Err(Error::new(
+ ErrorKind::DataInvalid,
+ format!(
+ "Table properties should not contain reserved properties,
but got: [{}]",
+ reserved_properties.join(", ")
+ ),
+ ));
+ }
+
+ if properties.is_empty() {
+ return Ok(self);
+ }
+
+ self.metadata.properties.extend(properties.clone());
+ self.changes.push(TableUpdate::SetProperties {
+ updates: properties,
+ });
+
+ Ok(self)
+ }
+
+ /// Remove properties from the table metadata.
+ /// Does nothing if the key is not present.
+ ///
+ /// # Errors
+ /// - If properties to remove contains a reserved property
+ pub fn remove_properties(mut self, properties: &[String]) -> Result<Self> {
+ // remove duplicates
+ let properties = properties.iter().cloned().collect::<HashSet<_>>();
+
+ // disallow removal of reserved properties
+ let reserved_properties = properties
+ .iter()
+ .filter(|key| RESERVED_PROPERTIES.contains(&key.as_str()))
+ .map(ToString::to_string)
+ .collect::<Vec<_>>();
+
+ if !reserved_properties.is_empty() {
+ return Err(Error::new(
+ ErrorKind::DataInvalid,
+ format!(
+ "Table properties to remove contain reserved properties:
[{}]",
+ reserved_properties.join(", ")
+ ),
+ ));
+ }
+
+ for property in &properties {
+ self.metadata.properties.remove(property);
+ }
+
+ if !properties.is_empty() {
+ self.changes.push(TableUpdate::RemoveProperties {
+ removals: properties.into_iter().collect(),
+ });
+ }
+
+ Ok(self)
+ }
+
+ /// Set the location of the table metadata, stripping any trailing slashes.
+ pub fn set_location(mut self, location: String) -> Self {
+ let location = location.trim_end_matches('/').to_string();
+ if self.metadata.location != location {
+ self.changes.push(TableUpdate::SetLocation {
+ location: location.clone(),
+ });
+ self.metadata.location = location;
+ }
+
+ self
+ }
+
+ /// Add a snapshot to the table metadata.
+ ///
+ /// # Errors
+ /// - Snapshot id already exists.
+ /// - For format version > 1: the sequence number of the snapshot is lower
than the highest sequence number specified so far.
+ pub fn add_snapshot(mut self, snapshot: Snapshot) -> Result<Self> {
+ if self
+ .metadata
+ .snapshots
+ .contains_key(&snapshot.snapshot_id())
+ {
+ return Err(Error::new(
+ ErrorKind::DataInvalid,
+ format!("Snapshot already exists for: '{}'",
snapshot.snapshot_id()),
+ ));
+ }
+
+ if self.metadata.format_version != FormatVersion::V1
+ && snapshot.sequence_number() <= self.metadata.last_sequence_number
+ && snapshot.parent_snapshot_id().is_some()
+ {
+ return Err(Error::new(
+ ErrorKind::DataInvalid,
+ format!(
+ "Cannot add snapshot with sequence number {} older than
last sequence number {}",
+ snapshot.sequence_number(),
+ self.metadata.last_sequence_number
+ )
+ ));
+ }
+
+ if let Some(last) = self.metadata.snapshot_log.last() {
+ // commits can happen concurrently from different machines.
+ // A tolerance helps us avoid failure for small clock skew
+ if snapshot.timestamp_ms() - last.timestamp_ms < -ONE_MINUTE_MS {
+ return Err(Error::new(
+ ErrorKind::DataInvalid,
+ format!(
+ "Invalid snapshot timestamp {}: before last snapshot
timestamp {}",
+ snapshot.timestamp_ms(),
+ last.timestamp_ms
+ ),
+ ));
+ }
+ }
+
+ if snapshot.timestamp_ms() - self.metadata.last_updated_ms <
-ONE_MINUTE_MS {
+ return Err(Error::new(
+ ErrorKind::DataInvalid,
+ format!(
+ "Invalid snapshot timestamp {}: before last updated
timestamp {}",
+ snapshot.timestamp_ms(),
+ self.metadata.last_updated_ms
+ ),
+ ));
+ }
+
+ // Mutation happens in next line - must be infallible from here
+ self.changes.push(TableUpdate::AddSnapshot {
+ snapshot: snapshot.clone(),
+ });
+
+ self.metadata.last_updated_ms = snapshot.timestamp_ms();
+ self.metadata.last_sequence_number = snapshot.sequence_number();
+ self.metadata
+ .snapshots
+ .insert(snapshot.snapshot_id(), snapshot.into());
+
+ Ok(self)
+ }
+
+ /// Append a snapshot to the specified branch.
+ /// Retention settings from the `branch` are re-used.
+ ///
+ /// # Errors
+ /// - Any of the preconditions of `self.add_snapshot` are not met.
+ pub fn set_branch_snapshot(self, snapshot: Snapshot, branch: &str) ->
Result<Self> {
+ let reference = self.metadata.refs.get(branch).cloned();
+
+ let reference = if let Some(mut reference) = reference {
+ if !reference.is_branch() {
+ return Err(Error::new(
+ ErrorKind::DataInvalid,
+ format!("Cannot append snapshot to non-branch reference
'{branch}'",),
+ ));
+ }
+
+ reference.snapshot_id = snapshot.snapshot_id();
+ reference
+ } else {
+ SnapshotReference {
+ snapshot_id: snapshot.snapshot_id(),
+ retention: SnapshotRetention::Branch {
+ min_snapshots_to_keep: None,
+ max_snapshot_age_ms: None,
+ max_ref_age_ms: None,
+ },
+ }
+ };
+
+ self.add_snapshot(snapshot)?.set_ref(branch, reference)
+ }
+
+ /// Remove snapshots by its ids from the table metadata.
+ /// Does nothing if a snapshot id is not present.
+ /// Keeps as changes only the snapshots that were actually removed.
+ pub fn remove_snapshots(mut self, snapshot_ids: &[i64]) -> Self {
+ let mut removed_snapshots = Vec::with_capacity(snapshot_ids.len());
+
+ self.metadata.snapshots.retain(|k, _| {
+ if snapshot_ids.contains(k) {
+ removed_snapshots.push(*k);
+ false
+ } else {
+ true
+ }
+ });
+
+ if !removed_snapshots.is_empty() {
+ self.changes.push(TableUpdate::RemoveSnapshots {
+ snapshot_ids: removed_snapshots,
+ });
+ }
+
+ // Remove refs that are no longer valid
+ self.metadata
+ .refs
+ .retain(|_, v|
self.metadata.snapshots.contains_key(&v.snapshot_id));
+
+ self
+ }
+
    /// Set a reference to a snapshot.
    ///
    /// For the `main` branch this also updates `current_snapshot_id` and
    /// appends to the snapshot log.
    ///
    /// # Errors
    /// - The snapshot id is unknown.
    pub fn set_ref(mut self, ref_name: &str, reference: SnapshotReference) -> Result<Self> {
        // No-op if the reference is already set to the exact same value.
        if self
            .metadata
            .refs
            .get(ref_name)
            .is_some_and(|snap_ref| snap_ref.eq(&reference))
        {
            return Ok(self);
        }

        let Some(snapshot) = self.metadata.snapshots.get(&reference.snapshot_id) else {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                format!(
                    "Cannot set '{ref_name}' to unknown snapshot: '{}'",
                    reference.snapshot_id
                ),
            ));
        };

        // Update last_updated_ms to the exact timestamp of the snapshot if it was added in this commit
        let is_added_snapshot = self.changes.iter().any(|update| {
            matches!(update, TableUpdate::AddSnapshot { snapshot: snap } if snap.snapshot_id() == snapshot.snapshot_id())
        });
        if is_added_snapshot {
            self.metadata.last_updated_ms = snapshot.timestamp_ms();
        }

        // Current snapshot id is set only for the main branch
        if ref_name == MAIN_BRANCH {
            self.metadata.current_snapshot_id = Some(snapshot.snapshot_id());
            // 0 (the i64 default) is used as a "never set" sentinel here.
            if self.metadata.last_updated_ms == i64::default() {
                self.metadata.last_updated_ms = chrono::Utc::now().timestamp_millis();
            };

            self.metadata.snapshot_log.push(SnapshotLog {
                snapshot_id: snapshot.snapshot_id(),
                timestamp_ms: self.metadata.last_updated_ms,
            });
        }

        self.changes.push(TableUpdate::SetSnapshotRef {
            ref_name: ref_name.to_string(),
            reference: reference.clone(),
        });
        self.metadata.refs.insert(ref_name.to_string(), reference);

        Ok(self)
    }
+
    /// Remove a reference
    ///
    /// If `ref_name='main'` the current snapshot id is cleared (set to `None`)
    /// and the snapshot log is emptied.
    pub fn remove_ref(mut self, ref_name: &str) -> Self {
        if ref_name == MAIN_BRANCH {
            self.metadata.current_snapshot_id = None;
            self.metadata.snapshot_log.clear();
        }

        // Record the change if the ref existed; for `main` the removal is
        // always recorded so the cleared current snapshot is visible to
        // consumers of the change log.
        if self.metadata.refs.remove(ref_name).is_some() || ref_name == MAIN_BRANCH {
            self.changes.push(TableUpdate::RemoveSnapshotRef {
                ref_name: ref_name.to_string(),
            });
        }

        self
    }
+
    /// Add a schema to the table metadata.
    ///
    /// The provided `schema.schema_id` may not be used.
    ///
    /// Important: Use this method with caution. The builder does not check
    /// if the added schema is compatible with the current schema.
    pub fn add_schema(mut self, schema: Schema) -> Self {
        // Re-use the id of an identical existing schema if there is one.
        let new_schema_id = self.reuse_or_create_new_schema_id(&schema);
        let schema_found = self.metadata.schemas.contains_key(&new_schema_id);

        if schema_found {
            // Schema already present: only record the change (once), so that
            // `set_current_schema(LAST_ADDED)` can still resolve it.
            if self.last_added_schema_id != Some(new_schema_id) {
                self.changes.push(TableUpdate::AddSchema {
                    last_column_id: Some(self.metadata.last_column_id),
                    schema: schema.clone(),
                });
                self.last_added_schema_id = Some(new_schema_id);
            }

            return self;
        }

        // New schemas might contain only old columns. In this case last_column_id should not be
        // reduced.
        self.metadata.last_column_id =
            std::cmp::max(self.metadata.last_column_id, schema.highest_field_id());

        // Set schema-id
        let schema = match new_schema_id == schema.schema_id() {
            true => schema,
            false => schema.with_schema_id(new_schema_id),
        };

        self.metadata
            .schemas
            .insert(new_schema_id, schema.clone().into());

        self.changes.push(TableUpdate::AddSchema {
            schema,
            last_column_id: Some(self.metadata.last_column_id),
        });

        self.last_added_schema_id = Some(new_schema_id);

        self
    }
+
    /// Set the current schema id.
    ///
    /// If `schema_id` is -1, the last added schema is set as the current schema.
    ///
    /// Errors:
    /// - provided `schema_id` is -1 but no schema has been added via `add_schema`.
    /// - No schema with the provided `schema_id` exists.
    pub fn set_current_schema(mut self, mut schema_id: i32) -> Result<Self> {
        if schema_id == Self::LAST_ADDED {
            schema_id = self.last_added_schema_id.ok_or_else(|| {
                Error::new(
                    ErrorKind::DataInvalid,
                    "Cannot set current schema to last added schema: no schema has been added.",
                )
            })?;
        };
        let schema_id = schema_id; // Make immutable

        // No-op if this is already the current schema.
        if schema_id == self.metadata.current_schema_id {
            return Ok(self);
        }

        // Validate the schema exists; the value itself is not needed here.
        let _schema = self.metadata.schemas.get(&schema_id).ok_or_else(|| {
            Error::new(
                ErrorKind::DataInvalid,
                format!(
                    "Cannot set current schema to unknown schema with id: '{}'",
                    schema_id
                ),
            )
        })?;

        // Old partition specs and sort-orders should be preserved even if they are not compatible with the new schema,
        // so that older metadata can still be interpreted.
        // Default partition spec and sort order are checked in the build() method
        // which allows other default partition specs and sort orders to be set before the build.

        self.metadata.current_schema_id = schema_id;

        // Record `-1` when the schema was added in this round of changes, so
        // consumers replaying the change log resolve it the same way.
        if self.last_added_schema_id == Some(schema_id) {
            self.changes.push(TableUpdate::SetCurrentSchema {
                schema_id: Self::LAST_ADDED,
            });
        } else {
            self.changes
                .push(TableUpdate::SetCurrentSchema { schema_id });
        }

        Ok(self)
    }
+
    /// Add a schema and set it as the current schema.
    ///
    /// Convenience wrapper for [`Self::add_schema`] followed by
    /// [`Self::set_current_schema`] with the last added schema.
    pub fn add_current_schema(self, schema: Schema) -> Result<Self> {
        self.add_schema(schema).set_current_schema(Self::LAST_ADDED)
    }
+
    /// Add a partition spec to the table metadata.
    ///
    /// The spec is bound eagerly to the current schema.
    /// If a schema is added in the same set of changes, the schema should be added first.
    ///
    /// Even if `unbound_spec.spec_id` is provided as `Some`, it may not be used.
    ///
    /// # Errors
    /// - The partition spec cannot be bound to the current schema.
    /// - The partition spec has non-sequential field ids and the table format version is 1.
    pub fn add_partition_spec(mut self, unbound_spec: UnboundPartitionSpec) -> Result<Self> {
        // Bind eagerly to the current schema so incompatibilities surface now.
        let schema = self.get_current_schema()?.clone();
        let spec = PartitionSpecBuilder::new_from_unbound(unbound_spec.clone(), schema)?
            .with_last_assigned_field_id(self.metadata.last_partition_id)
            .build()?;

        // Re-use the id of a compatible existing spec if there is one.
        let new_spec_id = self.reuse_or_create_new_spec_id(&spec);
        let spec_found = self.metadata.partition_specs.contains_key(&new_spec_id);
        let spec = spec.with_spec_id(new_spec_id);
        let unbound_spec = unbound_spec.with_spec_id(new_spec_id);

        if spec_found {
            // Spec already present: only record the change (once).
            if self.last_added_spec_id != Some(new_spec_id) {
                self.changes
                    .push(TableUpdate::AddSpec { spec: unbound_spec });
                self.last_added_spec_id = Some(new_spec_id);
            }

            return Ok(self);
        }

        // Format version 1 requires sequential partition field ids.
        if self.metadata.format_version <= FormatVersion::V1 && !spec.has_sequential_ids() {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                "Cannot add partition spec with non-sequential field ids to format version 1 table",
            ));
        }

        let highest_field_id = spec
            .highest_field_id()
            .unwrap_or(UNPARTITIONED_LAST_ASSIGNED_ID);
        self.metadata
            .partition_specs
            .insert(new_spec_id, Arc::new(spec.into()));
        self.changes
            .push(TableUpdate::AddSpec { spec: unbound_spec });

        self.last_added_spec_id = Some(new_spec_id);
        // last_partition_id never decreases, even if the new spec reuses only
        // previously-assigned field ids.
        self.metadata.last_partition_id =
            std::cmp::max(self.metadata.last_partition_id, highest_field_id);

        Ok(self)
    }
+
+ /// Set the default partition spec.
+ ///
+ /// # Errors
+ /// - spec_id is -1 but no spec has been added via `add_partition_spec`.
+ /// - No partition spec with the provided `spec_id` exists.
+ pub fn set_default_partition_spec(mut self, mut spec_id: i32) ->
Result<Self> {
+ if spec_id == Self::LAST_ADDED {
+ spec_id = self.last_added_spec_id.ok_or_else(|| {
+ Error::new(
+ ErrorKind::DataInvalid,
+ "Cannot set default partition spec to last added spec: no
spec has been added.",
+ )
+ })?;
+ }
+
+ if self.metadata.default_spec.spec_id() == spec_id {
+ return Ok(self);
+ }
+
+ if !self.metadata.partition_specs.contains_key(&spec_id) {
+ return Err(Error::new(
+ ErrorKind::DataInvalid,
+ format!("Cannot set default partition spec to unknown spec
with id: '{spec_id}'",),
+ ));
+ }
+
+ let schemaless_spec =
+ self.metadata
+ .partition_specs
+ .get(&spec_id)
+ .ok_or_else(|| {
+ Error::new(
+ ErrorKind::DataInvalid,
+ format!("Cannot set default partition spec to unknown spec
with id: '{spec_id}'",),
+ )
+ })?
+ .clone();
+ let spec =
+
Arc::unwrap_or_clone(schemaless_spec).bind(self.get_current_schema()?.clone())?;
+ self.metadata.default_spec = Arc::new(spec);
+
+ if self.last_added_spec_id == Some(spec_id) {
+ self.changes.push(TableUpdate::SetDefaultSpec {
+ spec_id: Self::LAST_ADDED,
+ });
+ } else {
+ self.changes.push(TableUpdate::SetDefaultSpec { spec_id });
+ }
+
+ Ok(self)
+ }
+
    /// Add a partition spec and set it as the default
    ///
    /// Convenience wrapper for [`Self::add_partition_spec`] followed by
    /// [`Self::set_default_partition_spec`] with the last added spec.
    pub fn add_default_partition_spec(self, unbound_spec: UnboundPartitionSpec) -> Result<Self> {
        self.add_partition_spec(unbound_spec)?
            .set_default_partition_spec(Self::LAST_ADDED)
    }
+
+ /// Add a sort order to the table metadata.
+ ///
+ /// The spec is bound eagerly to the current schema and must be valid for
it.
+ /// If a schema is added in the same set of changes, the schema should be
added first.
+ ///
+ /// Even if `sort_order.order_id` is provided, it may not be used.
+ ///
+ /// # Errors
+ /// - Sort order id to add already exists.
+ /// - Sort order is incompatible with the current schema.
+ pub fn add_sort_order(mut self, sort_order: SortOrder) -> Result<Self> {
+ let new_order_id = self.reuse_or_create_new_sort_id(&sort_order);
+ let sort_order_found =
self.metadata.sort_orders.contains_key(&new_order_id);
+
+ if sort_order_found {
+ if self.last_added_order_id != Some(new_order_id) {
+ self.changes.push(TableUpdate::AddSortOrder {
+ sort_order: sort_order.clone(),
+ });
+ self.last_added_order_id = Some(new_order_id);
+ }
+
+ return Ok(self);
+ }
+
+ let schema = self.get_current_schema()?.clone().as_ref().clone();
+ let sort_order = SortOrder::builder()
+ .with_order_id(new_order_id)
+ .with_fields(sort_order.fields)
+ .build(&schema)
+ .map_err(|e| {
+ Error::new(
+ ErrorKind::DataInvalid,
+ format!("Sort order to add is incompatible with current
schema: {e}"),
+ )
+ .with_source(e)
+ })?;
+
+ self.last_added_order_id = Some(new_order_id);
+ self.metadata
+ .sort_orders
+ .insert(new_order_id, sort_order.clone().into());
+ self.changes.push(TableUpdate::AddSortOrder { sort_order });
+
+ Ok(self)
+ }
+
    /// Set the default sort order. If `sort_order_id` is -1, the last added sort order is set as default.
    ///
    /// # Errors
    /// - sort_order_id is -1 but no sort order has been added via `add_sort_order`.
    /// - No sort order with the provided `sort_order_id` exists.
    pub fn set_default_sort_order(mut self, mut sort_order_id: i64) -> Result<Self> {
        if sort_order_id == Self::LAST_ADDED as i64 {
            sort_order_id = self.last_added_order_id.ok_or_else(|| {
                Error::new(
                    ErrorKind::DataInvalid,
                    "Cannot set default sort order to last added order: no order has been added.",
                )
            })?;
        }

        // No-op if this is already the default.
        if self.metadata.default_sort_order_id == sort_order_id {
            return Ok(self);
        }

        if !self.metadata.sort_orders.contains_key(&sort_order_id) {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                format!(
                    "Cannot set default sort order to unknown order with id: '{sort_order_id}'"
                ),
            ));
        }

        self.metadata.default_sort_order_id = sort_order_id;

        // Record `-1` when the order was added in this round of changes.
        if self.last_added_order_id == Some(sort_order_id) {
            self.changes.push(TableUpdate::SetDefaultSortOrder {
                sort_order_id: Self::LAST_ADDED as i64,
            });
        } else {
            self.changes
                .push(TableUpdate::SetDefaultSortOrder { sort_order_id });
        }

        Ok(self)
    }
+
+ /// Add a sort order and set it as the default
+ fn add_default_sort_order(self, sort_order: SortOrder) -> Result<Self> {
+ self.add_sort_order(sort_order)?
+ .set_default_sort_order(Self::LAST_ADDED as i64)
+ }
+
    /// Build the table metadata.
    ///
    /// Validates that the current schema is compatible with the default
    /// partition spec and sort order, normalizes the metadata, and returns it
    /// together with the accumulated changes and any expired metadata-log
    /// entries.
    pub fn build(mut self) -> Result<TableMetadataBuildResult> {
        // 0 (the i64 default) means "never set"; stamp the build time.
        if self.metadata.last_updated_ms == i64::default() {
            self.metadata.last_updated_ms = chrono::Utc::now().timestamp_millis();
        }

        // Check compatibility of the current schema to the default partition spec and sort order.
        // We use the `get_xxx` methods from the builder to avoid using the panicking
        // `TableMetadata.default_partition_spec` etc. methods.
        let schema = self.get_current_schema()?.clone();
        let sort_order = Arc::unwrap_or_clone(self.get_default_sort_order()?);

        // Re-bind the default spec to the (possibly changed) current schema.
        self.metadata.default_spec = Arc::new(
            Arc::unwrap_or_clone(self.metadata.default_spec)
                .into_unbound()
                .bind(schema.clone())?,
        );
        // Building the sort order against the schema validates compatibility;
        // the result itself is not needed.
        SortOrder::builder()
            .with_fields(sort_order.fields)
            .build(&schema)?;

        self.update_snapshot_log()?;
        self.metadata.try_normalize()?;

        // Link the previous metadata file into the metadata log.
        if let Some(hist_entry) = self.previous_history_entry.take() {
            self.metadata.metadata_log.push(hist_entry);
        }
        let expired_metadata_logs = self.expire_metadata_log();

        Ok(TableMetadataBuildResult {
            metadata: self.metadata,
            changes: self.changes,
            expired_metadata_logs,
        })
    }
+
+ fn expire_metadata_log(&mut self) -> Vec<MetadataLog> {
+ let max_size = self
+ .metadata
+ .properties
+ .get(PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX)
+ .and_then(|v| v.parse::<usize>().ok())
+ .unwrap_or(PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT)
+ .max(1);
+
+ if self.metadata.metadata_log.len() > max_size {
+ self.metadata
+ .metadata_log
+ .drain(0..self.metadata.metadata_log.len() - max_size)
+ .collect()
+ } else {
+ Vec::new()
+ }
+ }
+
    /// Rebuild the snapshot log, dropping intermediate snapshots and entries
    /// invalidated by snapshot removal.
    ///
    /// # Errors
    /// - The rebuilt log's latest entry is not the current snapshot.
    fn update_snapshot_log(&mut self) -> Result<()> {
        let intermediate_snapshots = self.get_intermediate_snapshots();
        let has_removed_snapshots = self
            .changes
            .iter()
            .any(|update| matches!(update, TableUpdate::RemoveSnapshots { .. }));

        // Fast path: nothing to rewrite.
        if intermediate_snapshots.is_empty() && !has_removed_snapshots {
            return Ok(());
        }

        let mut new_snapshot_log = Vec::new();
        for log_entry in &self.metadata.snapshot_log {
            let snapshot_id = log_entry.snapshot_id;
            if self.metadata.snapshots.contains_key(&snapshot_id) {
                // Keep only entries for snapshots that still exist and were
                // not intermediate within this commit.
                if !intermediate_snapshots.contains(&snapshot_id) {
                    new_snapshot_log.push(log_entry.clone());
                }
            } else if has_removed_snapshots {
                // any invalid entry causes the history before it to be removed. otherwise, there could be
                // history gaps that cause time-travel queries to produce incorrect results. for example,
                // if history is [(t1, s1), (t2, s2), (t3, s3)] and s2 is removed, the history cannot be
                // [(t1, s1), (t3, s3)] because it appears that s3 was current during the time between t2
                // and t3 when in fact s2 was the current snapshot.
                new_snapshot_log.clear();
            }
        }

        // The rebuilt log must end with the current snapshot, if one is set.
        if let Some(current_snapshot_id) = self.metadata.current_snapshot_id {
            let last_id = new_snapshot_log.last().map(|entry| entry.snapshot_id);
            if last_id != Some(current_snapshot_id) {
                return Err(Error::new(
                    ErrorKind::DataInvalid,
                    "Cannot set invalid snapshot log: latest entry is not the current snapshot",
                ));
            }
        };

        self.metadata.snapshot_log = new_snapshot_log;
        Ok(())
    }
+
    /// Finds intermediate snapshots that have not been committed as the current snapshot.
    ///
    /// Transactions can create snapshots that are never the current snapshot because several
    /// changes are combined by the transaction into one table metadata update. when each
    /// intermediate snapshot is added to table metadata, it is added to the snapshot log, assuming
    /// that it will be the current snapshot. when there are multiple snapshot updates, the log must
    /// be corrected by suppressing the intermediate snapshot entries.
    ///
    /// A snapshot is an intermediate snapshot if it was added but is not the current snapshot.
    fn get_intermediate_snapshots(&self) -> HashSet<i64> {
        // Ids of all snapshots added within this round of changes.
        let added_snapshot_ids = self
            .changes
            .iter()
            .filter_map(|update| match update {
                TableUpdate::AddSnapshot { snapshot } => Some(snapshot.snapshot_id()),
                _ => None,
            })
            .collect::<HashSet<_>>();

        // Of those, keep the ones that were set as `main` head but did not
        // end up as the final current snapshot.
        self.changes
            .iter()
            .filter_map(|update| match update {
                TableUpdate::SetSnapshotRef {
                    ref_name,
                    reference,
                } => {
                    if added_snapshot_ids.contains(&reference.snapshot_id)
                        && ref_name == MAIN_BRANCH
                        && reference.snapshot_id
                            != self
                                .metadata
                                .current_snapshot_id
                                .unwrap_or(i64::from(Self::LAST_ADDED))
                    {
                        Some(reference.snapshot_id)
                    } else {
                        None
                    }
                }
                _ => None,
            })
            .collect()
    }
+
    /// Re-assign field ids, schema id, spec id and order id for a brand-new
    /// table, rebuilding the spec and sort order against the fresh schema.
    ///
    /// # Errors
    /// - A partition or sort field references a column missing from the schema.
    fn reassign_ids(
        schema: Schema,
        spec: UnboundPartitionSpec,
        sort_order: SortOrder,
    ) -> Result<(Schema, BoundPartitionSpec, SortOrder)> {
        // Re-assign field ids and schema ids for a new table.
        // Keep the old id -> name mapping so spec/order fields can be
        // re-resolved by name after ids change.
        let previous_id_to_name = schema.field_id_to_name_map().clone();
        let fresh_schema = schema
            .into_builder()
            .with_schema_id(DEFAULT_SCHEMA_ID)
            .with_reassigned_field_ids(FIRST_FIELD_ID)
            .build()?;

        // Re-build partition spec with new ids
        let mut fresh_spec = PartitionSpecBuilder::new(fresh_schema.clone());
        for field in spec.fields() {
            let source_field_name = previous_id_to_name.get(&field.source_id).ok_or_else(|| {
                Error::new(
                    ErrorKind::DataInvalid,
                    format!(
                        "Cannot find source column with id {} for partition column {} in schema.",
                        field.source_id, field.name
                    ),
                )
            })?;
            fresh_spec =
                fresh_spec.add_partition_field(source_field_name, &field.name, field.transform)?;
        }
        let fresh_spec = fresh_spec.build()?;

        // Re-build sort order with new ids
        let mut fresh_order = SortOrder::builder();
        for mut field in sort_order.fields {
            let source_field_name = previous_id_to_name.get(&field.source_id).ok_or_else(|| {
                Error::new(
                    ErrorKind::DataInvalid,
                    format!(
                        "Cannot find source column with id {} for sort column in schema.",
                        field.source_id
                    ),
                )
            })?;
            let new_field_id = fresh_schema
                .field_by_name(source_field_name)
                .ok_or_else(|| {
                    Error::new(
                        ErrorKind::Unexpected,
                        format!(
                            "Cannot find source column with name {} for sort column in re-assigned schema.",
                            source_field_name
                        ),
                    )
                })?.id;
            field.source_id = new_field_id;
            fresh_order.with_sort_field(field);
        }
        let fresh_sort_order = fresh_order.build(&fresh_schema)?;

        Ok((fresh_schema, fresh_spec, fresh_sort_order))
    }
+
+ fn reuse_or_create_new_schema_id(&self, new_schema: &Schema) -> i32 {
+ self.metadata
+ .schemas
+ .iter()
+ .find_map(|(id, schema)|
new_schema.is_same_schema(schema).then_some(*id))
+ .unwrap_or_else(|| self.get_highest_schema_id() + 1)
+ }
+
+ fn get_highest_schema_id(&self) -> i32 {
+ *self
+ .metadata
+ .schemas
+ .keys()
+ .max()
+ .unwrap_or(&self.metadata.current_schema_id)
+ }
+
+ fn get_current_schema(&self) -> Result<&SchemaRef> {
+ self.metadata
+ .schemas
+ .get(&self.metadata.current_schema_id)
+ .ok_or_else(|| {
+ Error::new(
+ ErrorKind::DataInvalid,
+ format!(
+ "Current schema with id '{}' not found in table
metadata.",
+ self.metadata.current_schema_id
+ ),
+ )
+ })
+ }
+
+ fn get_default_sort_order(&self) -> Result<SortOrderRef> {
+ self.metadata
+ .sort_orders
+ .get(&self.metadata.default_sort_order_id)
+ .cloned()
+ .ok_or_else(|| {
+ Error::new(
+ ErrorKind::DataInvalid,
+ format!(
+ "Default sort order with id '{}' not found in table
metadata.",
+ self.metadata.default_sort_order_id
+ ),
+ )
+ })
+ }
+
+ /// If a compatible spec already exists, use the same ID. Otherwise, use 1
more than the highest ID.
+ fn reuse_or_create_new_spec_id(&self, new_spec: &BoundPartitionSpec) ->
i32 {
+ self.metadata
+ .partition_specs
+ .iter()
+ .find_map(|(id, old_spec)| {
+ new_spec
+ .is_compatible_with_schemaless(old_spec)
+ .then_some(*id)
+ })
+ .unwrap_or_else(|| {
+ self.get_highest_spec_id()
+ .map(|id| id + 1)
+ .unwrap_or(DEFAULT_PARTITION_SPEC_ID)
+ })
+ }
+
+ fn get_highest_spec_id(&self) -> Option<i32> {
+ self.metadata.partition_specs.keys().max().copied()
+ }
+
    /// If a compatible sort-order already exists, use the same ID. Otherwise, use 1 more than the highest ID.
    fn reuse_or_create_new_sort_id(&self, new_sort_order: &SortOrder) -> i64 {
        // Unsorted orders always map to the reserved "unsorted" order id.
        if new_sort_order.is_unsorted() {
            return SortOrder::unsorted_order().order_id;
        }

        // Orders are considered equal when their fields match, regardless of
        // the order id they were supplied with.
        self.metadata
            .sort_orders
            .iter()
            .find_map(|(id, sort_order)| {
                sort_order.fields.eq(&new_sort_order.fields).then_some(*id)
            })
            .unwrap_or_else(|| {
                self.highest_sort_order_id()
                    .unwrap_or(SortOrder::unsorted_order().order_id)
                    + 1
            })
    }
+
+ fn highest_sort_order_id(&self) -> Option<i64> {
+ self.metadata.sort_orders.keys().max().copied()
+ }
+}
+
+impl From<TableMetadataBuildResult> for TableMetadata {
+ fn from(result: TableMetadataBuildResult) -> Self {
+ result.metadata
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::spec::{
+ NestedField, NullOrder, Operation, PrimitiveType, Schema,
SchemalessPartitionSpec,
+ SnapshotRetention, SortDirection, SortField, StructType, Summary,
Transform, Type,
+ UnboundPartitionField,
+ };
+
+ const TEST_LOCATION: &str = "s3://bucket/test/location";
+ const LAST_ASSIGNED_COLUMN_ID: i32 = 3;
+
    /// Standard test schema: three required long columns `x`, `y`, `z`.
    fn schema() -> Schema {
        Schema::builder()
            .with_fields(vec![
                NestedField::required(1, "x", Type::Primitive(PrimitiveType::Long)).into(),
                NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long)).into(),
                NestedField::required(3, "z", Type::Primitive(PrimitiveType::Long)).into(),
            ])
            .build()
            .unwrap()
    }
+
    /// Standard test sort order (id 1): bucket(4) on `z`, descending, nulls first.
    fn sort_order() -> SortOrder {
        let schema = schema();
        SortOrder::builder()
            .with_order_id(1)
            .with_sort_field(SortField {
                source_id: 3,
                transform: Transform::Bucket(4),
                direction: SortDirection::Descending,
                null_order: NullOrder::First,
            })
            .build(&schema)
            .unwrap()
    }
+
    /// Standard test spec (id 0): identity partition on `y`, no field id set.
    fn partition_spec() -> UnboundPartitionSpec {
        UnboundPartitionSpec::builder()
            .with_spec_id(0)
            .add_partition_field(2, "y", Transform::Identity)
            .unwrap()
            .build()
    }
+
    /// Build metadata from the standard test schema/spec/order, then reopen
    /// it as a builder so tests start from an empty change log.
    fn builder_without_changes(format_version: FormatVersion) -> TableMetadataBuilder {
        TableMetadataBuilder::new(
            schema(),
            partition_spec(),
            sort_order(),
            TEST_LOCATION.to_string(),
            format_version,
            HashMap::new(),
        )
        .unwrap()
        .build()
        .unwrap()
        .metadata
        .into_builder(Some(
            "s3://bucket/test/location/metadata/metadata1.json".to_string(),
        ))
    }
+
    // Minimal inputs produce sensible V1 defaults, and the result serializes
    // for both V1 and (after upgrade) V2.
    #[test]
    fn test_minimal_build() {
        let metadata = TableMetadataBuilder::new(
            schema(),
            partition_spec(),
            sort_order(),
            TEST_LOCATION.to_string(),
            FormatVersion::V1,
            HashMap::new(),
        )
        .unwrap()
        .build()
        .unwrap()
        .metadata;

        assert_eq!(metadata.format_version, FormatVersion::V1);
        assert_eq!(metadata.location, TEST_LOCATION);
        assert_eq!(metadata.current_schema_id, 0);
        assert_eq!(metadata.default_spec.spec_id(), 0);
        assert_eq!(metadata.default_sort_order_id, 1);
        assert_eq!(metadata.last_partition_id, 1000);
        assert_eq!(metadata.last_column_id, 3);
        assert_eq!(metadata.snapshots.len(), 0);
        assert_eq!(metadata.refs.len(), 0);
        assert_eq!(metadata.properties.len(), 0);
        assert_eq!(metadata.metadata_log.len(), 0);
        assert_eq!(metadata.last_sequence_number, 0);
        assert_eq!(metadata.last_column_id, LAST_ASSIGNED_COLUMN_ID);

        // Test can serialize v1
        let _ = serde_json::to_string(&metadata).unwrap();

        // Test can serialize v2
        let metadata = metadata
            .into_builder(Some(
                "s3://bucket/test/location/metadata/metadata1.json".to_string(),
            ))
            .upgrade_format_version(FormatVersion::V2)
            .unwrap()
            .build()
            .unwrap()
            .metadata;

        assert_eq!(metadata.format_version, FormatVersion::V2);
        let _ = serde_json::to_string(&metadata).unwrap();
    }
+
    // An empty schema with unpartitioned spec and unsorted order builds with
    // all-zero ids/counters.
    #[test]
    fn test_build_unpartitioned_unsorted() {
        let schema = Schema::builder().build().unwrap();
        let metadata = TableMetadataBuilder::new(
            schema.clone(),
            SchemalessPartitionSpec::unpartition_spec(),
            SortOrder::unsorted_order(),
            TEST_LOCATION.to_string(),
            FormatVersion::V2,
            HashMap::new(),
        )
        .unwrap()
        .build()
        .unwrap()
        .metadata;

        assert_eq!(metadata.format_version, FormatVersion::V2);
        assert_eq!(metadata.location, TEST_LOCATION);
        assert_eq!(metadata.current_schema_id, 0);
        assert_eq!(metadata.default_spec.spec_id(), 0);
        assert_eq!(metadata.default_sort_order_id, 0);
        assert_eq!(metadata.last_partition_id, UNPARTITIONED_LAST_ASSIGNED_ID);
        assert_eq!(metadata.last_column_id, 0);
        assert_eq!(metadata.snapshots.len(), 0);
        assert_eq!(metadata.refs.len(), 0);
        assert_eq!(metadata.properties.len(), 0);
        assert_eq!(metadata.metadata_log.len(), 0);
        assert_eq!(metadata.last_sequence_number, 0);
    }
+
    // `reassign_ids` renumbers schema fields depth-first from 1 and resets
    // spec/order ids to their defaults, re-resolving partition and sort
    // sources by name.
    #[test]
    fn test_reassigns_ids() {
        let schema = Schema::builder()
            .with_schema_id(10)
            .with_fields(vec![
                NestedField::required(11, "a", Type::Primitive(PrimitiveType::Long)).into(),
                NestedField::required(12, "b", Type::Primitive(PrimitiveType::Long)).into(),
                NestedField::required(
                    13,
                    "struct",
                    Type::Struct(StructType::new(vec![NestedField::required(
                        14,
                        "nested",
                        Type::Primitive(PrimitiveType::Long),
                    )
                    .into()])),
                )
                .into(),
                NestedField::required(15, "c", Type::Primitive(PrimitiveType::Long)).into(),
            ])
            .build()
            .unwrap();
        let spec = BoundPartitionSpec::builder(schema.clone())
            .with_spec_id(20)
            .add_partition_field("a", "a", Transform::Identity)
            .unwrap()
            .add_partition_field("struct.nested", "nested_partition", Transform::Identity)
            .unwrap()
            .build()
            .unwrap();
        let sort_order = SortOrder::builder()
            .with_fields(vec![SortField {
                source_id: 11,
                transform: Transform::Identity,
                direction: SortDirection::Ascending,
                null_order: NullOrder::First,
            }])
            .with_order_id(10)
            .build(&schema)
            .unwrap();

        let (fresh_schema, fresh_spec, fresh_sort_order) =
            TableMetadataBuilder::reassign_ids(schema, spec.into_unbound(), sort_order).unwrap();

        // Top-level fields take 1..4; the nested field gets the next id (5).
        let expected_schema = Schema::builder()
            .with_fields(vec![
                NestedField::required(1, "a", Type::Primitive(PrimitiveType::Long)).into(),
                NestedField::required(2, "b", Type::Primitive(PrimitiveType::Long)).into(),
                NestedField::required(
                    3,
                    "struct",
                    Type::Struct(StructType::new(vec![NestedField::required(
                        5,
                        "nested",
                        Type::Primitive(PrimitiveType::Long),
                    )
                    .into()])),
                )
                .into(),
                NestedField::required(4, "c", Type::Primitive(PrimitiveType::Long)).into(),
            ])
            .build()
            .unwrap();

        let expected_spec = BoundPartitionSpec::builder(expected_schema.clone())
            .with_spec_id(0)
            .add_partition_field("a", "a", Transform::Identity)
            .unwrap()
            .add_partition_field("struct.nested", "nested_partition", Transform::Identity)
            .unwrap()
            .build()
            .unwrap();

        let expected_sort_order = SortOrder::builder()
            .with_fields(vec![SortField {
                source_id: 1,
                transform: Transform::Identity,
                direction: SortDirection::Ascending,
                null_order: NullOrder::First,
            }])
            .with_order_id(1)
            .build(&expected_schema)
            .unwrap();

        assert_eq!(fresh_schema, expected_schema);
        assert_eq!(fresh_spec, expected_spec);
        assert_eq!(fresh_sort_order, expected_sort_order);
    }
+
    // A non-zero input schema id is reset to 0 for a brand-new table.
    #[test]
    fn test_ids_are_reassigned_for_new_metadata() {
        let schema = schema().into_builder().with_schema_id(10).build().unwrap();

        let metadata = TableMetadataBuilder::new(
            schema,
            partition_spec(),
            sort_order(),
            TEST_LOCATION.to_string(),
            FormatVersion::V1,
            HashMap::new(),
        )
        .unwrap()
        .build()
        .unwrap()
        .metadata;

        assert_eq!(metadata.current_schema_id, 0);
        assert_eq!(metadata.current_schema().schema_id(), 0);
    }
+
    // Creating new metadata records the full expected change sequence:
    // location, schema, current schema, spec, default spec, sort order,
    // default sort order, and properties.
    #[test]
    fn test_new_metadata_changes() {
        let changes = TableMetadataBuilder::new(
            schema(),
            partition_spec(),
            sort_order(),
            TEST_LOCATION.to_string(),
            FormatVersion::V1,
            HashMap::from_iter(vec![("property 1".to_string(), "value 1".to_string())]),
        )
        .unwrap()
        .build()
        .unwrap()
        .changes;

        pretty_assertions::assert_eq!(changes, vec![
            TableUpdate::SetLocation {
                location: TEST_LOCATION.to_string()
            },
            TableUpdate::AddSchema {
                last_column_id: Some(LAST_ASSIGNED_COLUMN_ID),
                schema: schema(),
            },
            TableUpdate::SetCurrentSchema { schema_id: -1 },
            TableUpdate::AddSpec {
                // Because this is a new tables, field-ids are assigned
                // partition_spec() has None set for field-id
                spec: BoundPartitionSpec::builder(schema())
                    .with_spec_id(0)
                    .add_unbound_field(UnboundPartitionField {
                        name: "y".to_string(),
                        transform: Transform::Identity,
                        source_id: 2,
                        field_id: Some(1000)
                    })
                    .unwrap()
                    .build()
                    .unwrap()
                    .into_unbound(),
            },
            TableUpdate::SetDefaultSpec { spec_id: -1 },
            TableUpdate::AddSortOrder {
                sort_order: sort_order(),
            },
            TableUpdate::SetDefaultSortOrder { sort_order_id: -1 },
            TableUpdate::SetProperties {
                updates: HashMap::from_iter(vec![(
                    "property 1".to_string(),
                    "value 1".to_string()
                )]),
            }
        ]);
    }
+
    // Same as test_new_metadata_changes, but for an empty schema with the
    // unpartitioned spec and unsorted order (and no properties change).
    #[test]
    fn test_new_metadata_changes_unpartitioned_unsorted() {
        let schema = Schema::builder().build().unwrap();
        let changes = TableMetadataBuilder::new(
            schema.clone(),
            SchemalessPartitionSpec::unpartition_spec().into_unbound(),
            SortOrder::unsorted_order(),
            TEST_LOCATION.to_string(),
            FormatVersion::V1,
            HashMap::new(),
        )
        .unwrap()
        .build()
        .unwrap()
        .changes;

        pretty_assertions::assert_eq!(changes, vec![
            TableUpdate::SetLocation {
                location: TEST_LOCATION.to_string()
            },
            TableUpdate::AddSchema {
                last_column_id: Some(0),
                schema: Schema::builder().build().unwrap(),
            },
            TableUpdate::SetCurrentSchema { schema_id: -1 },
            TableUpdate::AddSpec {
                // Because this is a new tables, field-ids are assigned
                // partition_spec() has None set for field-id
                spec: BoundPartitionSpec::builder(schema)
                    .with_spec_id(0)
                    .build()
                    .unwrap()
                    .into_unbound(),
            },
            TableUpdate::SetDefaultSpec { spec_id: -1 },
            TableUpdate::AddSortOrder {
                sort_order: SortOrder::unsorted_order(),
            },
            TableUpdate::SetDefaultSortOrder { sort_order_id: -1 },
        ]);
    }
+
    // Adding a spec re-assigns the spec id, keeps existing field ids, assigns
    // the next id to new fields, and bumps last_partition_id — without
    // changing the default spec.
    #[test]
    fn test_add_partition_spec() {
        let builder = builder_without_changes(FormatVersion::V2);

        let added_spec = UnboundPartitionSpec::builder()
            .with_spec_id(10)
            .add_partition_fields(vec![
                UnboundPartitionField {
                    // The previous field - has field_id set
                    name: "y".to_string(),
                    transform: Transform::Identity,
                    source_id: 2,
                    field_id: Some(1000),
                },
                UnboundPartitionField {
                    // A new field without field id - should still be without field id in changes
                    name: "z".to_string(),
                    transform: Transform::Identity,
                    source_id: 3,
                    field_id: None,
                },
            ])
            .unwrap()
            .build();

        let build_result = builder
            .add_partition_spec(added_spec.clone())
            .unwrap()
            .build()
            .unwrap();

        // Spec id should be re-assigned
        let expected_change = added_spec.with_spec_id(1);
        let expected_spec = BoundPartitionSpec::builder(schema())
            .with_spec_id(1)
            .add_unbound_field(UnboundPartitionField {
                name: "y".to_string(),
                transform: Transform::Identity,
                source_id: 2,
                field_id: Some(1000),
            })
            .unwrap()
            .add_unbound_field(UnboundPartitionField {
                name: "z".to_string(),
                transform: Transform::Identity,
                source_id: 3,
                field_id: Some(1001),
            })
            .unwrap()
            .build()
            .unwrap();

        assert_eq!(build_result.changes.len(), 1);
        assert_eq!(
            build_result.metadata.partition_spec_by_id(1),
            Some(&Arc::new(expected_spec.into_schemaless()))
        );
        assert_eq!(build_result.metadata.default_spec.spec_id(), 0);
        assert_eq!(build_result.metadata.last_partition_id, 1001);
        pretty_assertions::assert_eq!(build_result.changes[0], TableUpdate::AddSpec {
            spec: expected_change
        });
    }
+
    // set_default_partition_spec(-1) resolves to the spec added in the same
    // round and records `SetDefaultSpec { spec_id: -1 }`.
    #[test]
    fn test_set_default_partition_spec() {
        let builder = builder_without_changes(FormatVersion::V2);
        let schema = builder.get_current_schema().unwrap().clone();
        let added_spec = UnboundPartitionSpec::builder()
            .with_spec_id(10)
            .add_partition_field(1, "y_bucket[2]", Transform::Bucket(2))
            .unwrap()
            .build();

        let build_result = builder
            .add_partition_spec(added_spec.clone())
            .unwrap()
            .set_default_partition_spec(-1)
            .unwrap()
            .build()
            .unwrap();

        let expected_spec = BoundPartitionSpec::builder(schema)
            .with_spec_id(1)
            .add_unbound_field(UnboundPartitionField {
                name: "y_bucket[2]".to_string(),
                transform: Transform::Bucket(2),
                source_id: 1,
                field_id: Some(1001),
            })
            .unwrap()
            .build()
            .unwrap();

        assert_eq!(build_result.changes.len(), 2);
        assert_eq!(build_result.metadata.default_spec, Arc::new(expected_spec));
        assert_eq!(build_result.changes, vec![
            TableUpdate::AddSpec {
                // Should contain the actual ID that was used
                spec: added_spec.with_spec_id(1)
            },
            TableUpdate::SetDefaultSpec { spec_id: -1 }
        ]);
    }
+
#[test]
fn test_set_existing_default_partition_spec() {
    // First commit: register an unbound spec and make it the default via -1.
    let meta_builder = builder_without_changes(FormatVersion::V2);
    let unbound_spec = UnboundPartitionSpec::builder().with_spec_id(1).build();

    let result = meta_builder
        .add_partition_spec(unbound_spec.clone())
        .unwrap()
        .set_default_partition_spec(-1)
        .unwrap()
        .build()
        .unwrap();

    assert_eq!(result.changes, vec![
        TableUpdate::AddSpec {
            spec: unbound_spec.clone()
        },
        TableUpdate::SetDefaultSpec { spec_id: -1 }
    ]);
    assert_eq!(
        result.metadata.default_spec,
        Arc::new(
            unbound_spec
                .bind(result.metadata.current_schema().clone())
                .unwrap()
        )
    );

    // Second commit: switch the default back to the pre-existing spec id 0.
    let result = result
        .metadata
        .into_builder(Some(
            "s3://bucket/test/location/metadata/metadata1.json".to_string(),
        ))
        .set_default_partition_spec(0)
        .unwrap()
        .build()
        .unwrap();

    assert_eq!(result.changes.len(), 1);
    assert_eq!(result.changes[0], TableUpdate::SetDefaultSpec {
        spec_id: 0
    });
    assert_eq!(
        result.metadata.default_spec,
        Arc::new(
            partition_spec()
                .bind(result.metadata.current_schema().clone())
                .unwrap()
        )
    );
}
+
#[test]
fn test_add_sort_order() {
    let meta_builder = builder_without_changes(FormatVersion::V2);

    let sort_field = SortField {
        source_id: 1,
        transform: Transform::Identity,
        direction: SortDirection::Ascending,
        null_order: NullOrder::First,
    };
    let new_order = SortOrder::builder()
        .with_order_id(10)
        .with_fields(vec![sort_field])
        .build(&schema())
        .unwrap();

    let result = meta_builder
        .add_sort_order(new_order.clone())
        .unwrap()
        .build()
        .unwrap();

    // The requested order id 10 is ignored; the next free id (2) is used.
    let expected_order = new_order.with_order_id(2);

    assert_eq!(result.changes.len(), 1);
    assert_eq!(result.metadata.sort_orders.keys().max(), Some(&2));
    pretty_assertions::assert_eq!(
        result.metadata.sort_order_by_id(2),
        Some(&Arc::new(expected_order.clone()))
    );
    pretty_assertions::assert_eq!(result.changes[0], TableUpdate::AddSortOrder {
        sort_order: expected_order
    });
}
+
#[test]
fn test_add_compatible_schema() {
    let meta_builder = builder_without_changes(FormatVersion::V2);

    // Superset of the current schema: same three columns plus a new "a".
    let new_schema = Schema::builder()
        .with_schema_id(1)
        .with_fields(vec![
            NestedField::required(1, "x", Type::Primitive(PrimitiveType::Long)).into(),
            NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long)).into(),
            NestedField::required(3, "z", Type::Primitive(PrimitiveType::Long)).into(),
            NestedField::required(4, "a", Type::Primitive(PrimitiveType::Long)).into(),
        ])
        .build()
        .unwrap();

    let result = meta_builder
        .add_current_schema(new_schema.clone())
        .unwrap()
        .build()
        .unwrap();

    assert_eq!(result.changes.len(), 2);
    assert_eq!(result.metadata.schemas.keys().max(), Some(&1));
    pretty_assertions::assert_eq!(
        result.metadata.schema_by_id(1),
        Some(&Arc::new(new_schema.clone()))
    );
    pretty_assertions::assert_eq!(result.changes[0], TableUpdate::AddSchema {
        last_column_id: Some(4),
        schema: new_schema
    });
    // -1 marks "the schema added in this same transaction".
    assert_eq!(result.changes[1], TableUpdate::SetCurrentSchema {
        schema_id: -1
    });
}
+
#[test]
fn test_set_current_schema_change_is_minus_one_if_schema_was_added_in_this_change() {
    let meta_builder = builder_without_changes(FormatVersion::V2);

    let new_schema = Schema::builder()
        .with_schema_id(1)
        .with_fields(vec![
            NestedField::required(1, "x", Type::Primitive(PrimitiveType::Long)).into(),
            NestedField::required(2, "y", Type::Primitive(PrimitiveType::Long)).into(),
            NestedField::required(3, "z", Type::Primitive(PrimitiveType::Long)).into(),
            NestedField::required(4, "a", Type::Primitive(PrimitiveType::Long)).into(),
        ])
        .build()
        .unwrap();

    // Explicitly setting the schema added within the same transaction ...
    let result = meta_builder
        .add_schema(new_schema)
        .set_current_schema(1)
        .unwrap()
        .build()
        .unwrap();

    // ... is still recorded as schema_id -1 ("last added") in the change log.
    assert_eq!(result.changes.len(), 2);
    assert_eq!(result.changes[1], TableUpdate::SetCurrentSchema {
        schema_id: -1
    });
}
+
#[test]
fn test_no_metadata_log_for_create_table() {
    // A freshly created table has no previous metadata file, so its
    // metadata log must start out empty.
    let result = TableMetadataBuilder::new(
        schema(),
        partition_spec(),
        sort_order(),
        TEST_LOCATION.to_string(),
        FormatVersion::V2,
        HashMap::new(),
    )
    .unwrap()
    .build()
    .unwrap();

    assert!(result.metadata.metadata_log.is_empty());
}
+
#[test]
fn test_no_metadata_log_entry_for_no_previous_location() {
    // Simulates the first commit after staged table creation: the builder
    // is re-opened with no previous metadata location.
    let metadata = builder_without_changes(FormatVersion::V2)
        .build()
        .unwrap()
        .metadata;
    assert_eq!(metadata.metadata_log.len(), 1);

    let result = metadata
        .into_builder(None)
        .set_properties(HashMap::from_iter(vec![(
            "foo".to_string(),
            "bar".to_string(),
        )]))
        .unwrap()
        .build()
        .unwrap();

    // `None` means "no previous file to record", so the log must not grow.
    assert_eq!(result.metadata.metadata_log.len(), 1);
}
+
#[test]
fn test_from_metadata_generates_metadata_log() {
    let previous_path = "s3://bucket/test/location/metadata/metadata1.json";

    // Create a table, then re-open its metadata with a known previous path.
    let reopened = TableMetadataBuilder::new(
        schema(),
        partition_spec(),
        sort_order(),
        TEST_LOCATION.to_string(),
        FormatVersion::V2,
        HashMap::new(),
    )
    .unwrap()
    .build()
    .unwrap()
    .metadata
    .into_builder(Some(previous_path.to_string()));

    // Any change suffices to produce a commit.
    let result = reopened
        .add_default_sort_order(SortOrder::unsorted_order())
        .unwrap()
        .build()
        .unwrap();

    // The previous metadata file must now appear as the single log entry.
    assert_eq!(result.metadata.metadata_log.len(), 1);
    assert_eq!(result.metadata.metadata_log[0].metadata_file, previous_path);
}
+
#[test]
fn test_set_ref() {
    let meta_builder = builder_without_changes(FormatVersion::V2);

    let snapshot = Snapshot::builder()
        .with_snapshot_id(1)
        .with_timestamp_ms(meta_builder.metadata.last_updated_ms + 1)
        .with_sequence_number(0)
        .with_schema_id(0)
        .with_manifest_list("/snap-1.avro")
        .with_summary(Summary {
            operation: Operation::Append,
            other: HashMap::from_iter(vec![
                (
                    "spark.app.id".to_string(),
                    "local-1662532784305".to_string(),
                ),
                ("added-data-files".to_string(), "4".to_string()),
                ("added-records".to_string(), "4".to_string()),
                ("added-files-size".to_string(), "6001".to_string()),
            ]),
        })
        .build();

    let meta_builder = meta_builder.add_snapshot(snapshot.clone()).unwrap();

    let branch_retention = SnapshotRetention::Branch {
        min_snapshots_to_keep: Some(10),
        max_snapshot_age_ms: None,
        max_ref_age_ms: None,
    };

    // Pointing a ref at a snapshot id that was never added must fail.
    let err = meta_builder
        .clone()
        .set_ref(MAIN_BRANCH, SnapshotReference {
            snapshot_id: 10,
            retention: branch_retention.clone(),
        })
        .unwrap_err();
    assert!(err
        .to_string()
        .contains("Cannot set 'main' to unknown snapshot: '10'"));

    // Pointing it at the freshly added snapshot succeeds and logs it.
    let result = meta_builder
        .set_ref(MAIN_BRANCH, SnapshotReference {
            snapshot_id: 1,
            retention: branch_retention,
        })
        .unwrap()
        .build()
        .unwrap();
    assert_eq!(result.metadata.snapshots.len(), 1);
    assert_eq!(
        result.metadata.snapshot_by_id(1),
        Some(&Arc::new(snapshot.clone()))
    );
    assert_eq!(result.metadata.snapshot_log, vec![SnapshotLog {
        snapshot_id: 1,
        timestamp_ms: snapshot.timestamp_ms()
    }])
}
+
#[test]
fn test_snapshot_log_skips_intermediates() {
    let meta_builder = builder_without_changes(FormatVersion::V2);
    let ts = meta_builder.metadata.last_updated_ms + 1;

    // The two snapshots differ only in their id.
    let make_snapshot = |id: i64| {
        Snapshot::builder()
            .with_snapshot_id(id)
            .with_timestamp_ms(ts)
            .with_sequence_number(0)
            .with_schema_id(0)
            .with_manifest_list("/snap-1.avro")
            .with_summary(Summary {
                operation: Operation::Append,
                other: HashMap::from_iter(vec![
                    (
                        "spark.app.id".to_string(),
                        "local-1662532784305".to_string(),
                    ),
                    ("added-data-files".to_string(), "4".to_string()),
                    ("added-records".to_string(), "4".to_string()),
                    ("added-files-size".to_string(), "6001".to_string()),
                ]),
            })
            .build()
    };
    let snapshot_1 = make_snapshot(1);
    let snapshot_2 = make_snapshot(2);

    // Snapshot 1 becomes the branch head, then is replaced by snapshot 2
    // within the same transaction.
    let result = meta_builder
        .add_snapshot(snapshot_1)
        .unwrap()
        .set_ref(MAIN_BRANCH, SnapshotReference {
            snapshot_id: 1,
            retention: SnapshotRetention::Branch {
                min_snapshots_to_keep: Some(10),
                max_snapshot_age_ms: None,
                max_ref_age_ms: None,
            },
        })
        .unwrap()
        .set_branch_snapshot(snapshot_2.clone(), MAIN_BRANCH)
        .unwrap()
        .build()
        .unwrap();

    // Only the final branch head appears in the snapshot log; the
    // intermediate head (snapshot 1) is skipped.
    assert_eq!(result.metadata.snapshot_log, vec![SnapshotLog {
        snapshot_id: 2,
        timestamp_ms: snapshot_2.timestamp_ms()
    }]);
    assert_eq!(result.metadata.current_snapshot().unwrap().snapshot_id(), 2);
}
+
#[test]
fn test_set_branch_snapshot_creates_branch_if_not_exists() {
    let meta_builder = builder_without_changes(FormatVersion::V2);

    let snapshot = Snapshot::builder()
        .with_snapshot_id(2)
        .with_timestamp_ms(meta_builder.metadata.last_updated_ms + 1)
        .with_sequence_number(0)
        .with_schema_id(0)
        .with_manifest_list("/snap-1.avro")
        .with_summary(Summary {
            operation: Operation::Append,
            other: HashMap::new(),
        })
        .build();

    // Targeting a branch that does not exist yet must create it implicitly.
    let result = meta_builder
        .set_branch_snapshot(snapshot.clone(), "new_branch")
        .unwrap()
        .build()
        .unwrap();

    // The implicit branch uses default (unset) retention settings.
    let expected_ref = SnapshotReference {
        snapshot_id: 2,
        retention: SnapshotRetention::Branch {
            min_snapshots_to_keep: None,
            max_snapshot_age_ms: None,
            max_ref_age_ms: None,
        },
    };

    assert_eq!(result.metadata.refs.len(), 1);
    assert_eq!(result.metadata.refs.get("new_branch"), Some(&expected_ref));
    assert_eq!(result.changes, vec![
        TableUpdate::AddSnapshot { snapshot },
        TableUpdate::SetSnapshotRef {
            ref_name: "new_branch".to_string(),
            reference: expected_ref
        }
    ]);
}
+
#[test]
fn test_cannot_add_duplicate_snapshot_id() {
    let meta_builder = builder_without_changes(FormatVersion::V2);

    let snapshot = Snapshot::builder()
        .with_snapshot_id(2)
        .with_timestamp_ms(meta_builder.metadata.last_updated_ms + 1)
        .with_sequence_number(0)
        .with_schema_id(0)
        .with_manifest_list("/snap-1.avro")
        .with_summary(Summary {
            operation: Operation::Append,
            other: HashMap::from_iter(vec![
                (
                    "spark.app.id".to_string(),
                    "local-1662532784305".to_string(),
                ),
                ("added-data-files".to_string(), "4".to_string()),
                ("added-records".to_string(), "4".to_string()),
                ("added-files-size".to_string(), "6001".to_string()),
            ]),
        })
        .build();

    // The first add succeeds; re-adding the same snapshot id must error.
    let meta_builder = meta_builder.add_snapshot(snapshot.clone()).unwrap();
    meta_builder.add_snapshot(snapshot).unwrap_err();
}
+
#[test]
fn test_add_incompatible_current_schema_fails() {
    let meta_builder = builder_without_changes(FormatVersion::V2);

    // An empty schema drops the columns the default partition spec sources,
    // so binding must fail at build time.
    let empty_schema = Schema::builder()
        .with_schema_id(1)
        .with_fields(vec![])
        .build()
        .unwrap();

    let err = meta_builder
        .add_current_schema(empty_schema)
        .unwrap()
        .build()
        .unwrap_err();

    assert!(err
        .to_string()
        .contains("Cannot find partition source field"));
}
+
#[test]
fn test_add_partition_spec_for_v1_requires_sequential_ids() {
    let meta_builder = builder_without_changes(FormatVersion::V1);

    // Field ids 1000 and 1002 leave a gap, which format V1 rejects.
    let gapped_spec = UnboundPartitionSpec::builder()
        .with_spec_id(10)
        .add_partition_fields(vec![
            UnboundPartitionField {
                name: "y".to_string(),
                transform: Transform::Identity,
                source_id: 2,
                field_id: Some(1000),
            },
            UnboundPartitionField {
                name: "z".to_string(),
                transform: Transform::Identity,
                source_id: 3,
                field_id: Some(1002),
            },
        ])
        .unwrap()
        .build();

    let err = meta_builder.add_partition_spec(gapped_spec).unwrap_err();
    assert!(err.to_string().contains(
        "Cannot add partition spec with non-sequential field ids to format version 1 table"
    ));
}
+
#[test]
fn test_expire_metadata_log() {
    // Cap the metadata log at two previous versions.
    let result = builder_without_changes(FormatVersion::V2)
        .set_properties(HashMap::from_iter(vec![(
            PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX.to_string(),
            "2".to_string(),
        )]))
        .unwrap()
        .build()
        .unwrap();
    assert_eq!(result.metadata.metadata_log.len(), 1);
    assert_eq!(result.expired_metadata_logs.len(), 0);

    // Second commit: the log grows to two entries, nothing expired yet.
    let result = result
        .metadata
        .into_builder(Some("path2".to_string()))
        .set_properties(HashMap::from_iter(vec![(
            "change_nr".to_string(),
            "1".to_string(),
        )]))
        .unwrap()
        .build()
        .unwrap();
    assert_eq!(result.metadata.metadata_log.len(), 2);
    assert_eq!(result.expired_metadata_logs.len(), 0);

    // Third commit: the cap is hit and the oldest entry is expired.
    let result = result
        .metadata
        .into_builder(Some("path2".to_string()))
        .set_properties(HashMap::from_iter(vec![(
            "change_nr".to_string(),
            "2".to_string(),
        )]))
        .unwrap()
        .build()
        .unwrap();
    assert_eq!(result.metadata.metadata_log.len(), 2);
    assert_eq!(result.expired_metadata_logs.len(), 1);
}
+
#[test]
fn test_v2_sequence_number_cannot_decrease() {
    let meta_builder = builder_without_changes(FormatVersion::V2);

    // Commit a snapshot with sequence number 1 as the main branch head.
    let first = Snapshot::builder()
        .with_snapshot_id(1)
        .with_timestamp_ms(meta_builder.metadata.last_updated_ms + 1)
        .with_sequence_number(1)
        .with_schema_id(0)
        .with_manifest_list("/snap-1")
        .with_summary(Summary {
            operation: Operation::Append,
            other: HashMap::new(),
        })
        .build();

    let meta_builder = meta_builder
        .add_snapshot(first)
        .unwrap()
        .set_ref(MAIN_BRANCH, SnapshotReference {
            snapshot_id: 1,
            retention: SnapshotRetention::Branch {
                min_snapshots_to_keep: Some(10),
                max_snapshot_age_ms: None,
                max_ref_age_ms: None,
            },
        })
        .unwrap();

    // A child snapshot whose sequence number goes backwards (0 < 1) must be
    // rejected for V2 tables.
    let second = Snapshot::builder()
        .with_snapshot_id(2)
        .with_timestamp_ms(meta_builder.metadata.last_updated_ms + 1)
        .with_sequence_number(0)
        .with_schema_id(0)
        .with_manifest_list("/snap-0")
        .with_parent_snapshot_id(Some(1))
        .with_summary(Summary {
            operation: Operation::Append,
            other: HashMap::new(),
        })
        .build();

    let err = meta_builder
        .set_branch_snapshot(second, MAIN_BRANCH)
        .unwrap_err();
    assert!(err
        .to_string()
        .contains("Cannot add snapshot with sequence number"));
}
+}