This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 61a7de51 feat: add schema update to table metadata builder (#437)
61a7de51 is described below
commit 61a7de51c060a29bc35e00b8153d6a25d30035f0
Author: Guotao Yu <[email protected]>
AuthorDate: Fri Dec 26 14:59:03 2025 +0800
feat: add schema update to table metadata builder (#437)
---
src/iceberg/partition_spec.cc | 39 +++
src/iceberg/partition_spec.h | 6 +-
src/iceberg/schema.cc | 44 +++
src/iceberg/schema.h | 21 ++
src/iceberg/table_metadata.cc | 208 ++++++++++++-
src/iceberg/table_metadata.h | 11 +-
src/iceberg/table_update.cc | 6 +-
src/iceberg/table_update.h | 6 +-
src/iceberg/test/metadata_io_test.cc | 4 +
src/iceberg/test/partition_spec_test.cc | 12 +-
src/iceberg/test/residual_evaluator_test.cc | 4 +-
src/iceberg/test/table_metadata_builder_test.cc | 372 +++++++++++++++++++++++-
src/iceberg/test/table_requirements_test.cc | 15 +-
src/iceberg/test/table_update_test.cc | 6 +-
src/iceberg/test/update_partition_spec_test.cc | 2 +-
15 files changed, 711 insertions(+), 45 deletions(-)
diff --git a/src/iceberg/partition_spec.cc b/src/iceberg/partition_spec.cc
index 7d0dab40..af77d1fe 100644
--- a/src/iceberg/partition_spec.cc
+++ b/src/iceberg/partition_spec.cc
@@ -131,6 +131,8 @@ bool PartitionSpec::Equals(const PartitionSpec& other)
const {
}
Status PartitionSpec::Validate(const Schema& schema, bool
allow_missing_fields) const {
+ ICEBERG_RETURN_UNEXPECTED(ValidatePartitionName(schema, *this));
+
std::unordered_map<int32_t, int32_t> parents = IndexParents(schema);
for (const auto& partition_field : fields_) {
ICEBERG_ASSIGN_OR_RAISE(auto source_field,
@@ -177,6 +179,43 @@ Status PartitionSpec::Validate(const Schema& schema, bool
allow_missing_fields)
return {};
}
+/// \brief Checks the partition field names of `spec` against each other and
+/// against the field names of `schema`.
+///
+/// For every partition field:
+///  - the name must be non-empty and unique within the spec;
+///  - identity and void transforms may reuse a schema field name only when that
+///    schema field is the partition field's own source;
+///  - any other transform may not reuse a schema field name at all.
+Status PartitionSpec::ValidatePartitionName(const Schema& schema,
+                                            const PartitionSpec& spec) {
+  // Names already claimed by earlier fields of this spec.
+  std::unordered_set<std::string> partition_names;
+  for (const auto& partition_field : spec.fields()) {
+    auto name = std::string(partition_field.name());
+    ICEBERG_PRECHECK(!name.empty(), "Cannot use empty partition name: {}", name);
+
+    // insert() reports whether the name was new, so a single hash lookup both
+    // detects and records duplicates.
+    if (!partition_names.insert(name).second) {
+      return InvalidArgument("Cannot use partition name more than once: {}", name);
+    }
+
+    ICEBERG_ASSIGN_OR_RAISE(auto schema_field, schema.FindFieldByName(name));
+    auto transform_type = partition_field.transform()->transform_type();
+    if (transform_type == TransformType::kIdentity ||
+        transform_type == TransformType::kVoid) {
+      // For identity/void transforms a partition name may match a schema field
+      // name, as long as that schema field is the partition's own source.
+      if (schema_field.has_value() &&
+          schema_field.value().get().field_id() != partition_field.source_id()) {
+        return InvalidArgument(
+            "Cannot create identity partition sourced from different field in schema: {}",
+            name);
+      }
+    } else if (schema_field.has_value()) {
+      // For all other transforms the partition name may not collide with any
+      // schema field name.
+      return InvalidArgument(
+          "Cannot create partition from name that exists in schema: {}", name);
+    }
+  }
+
+  return {};
+}
+
Result<std::vector<std::reference_wrapper<const PartitionField>>>
PartitionSpec::GetFieldsBySourceId(int32_t source_id) const {
ICEBERG_ASSIGN_OR_RAISE(auto source_id_to_fields,
source_id_to_fields_.Get(*this));
diff --git a/src/iceberg/partition_spec.h b/src/iceberg/partition_spec.h
index 5fab5952..9aa1954a 100644
--- a/src/iceberg/partition_spec.h
+++ b/src/iceberg/partition_spec.h
@@ -79,11 +79,15 @@ class ICEBERG_EXPORT PartitionSpec : public
util::Formattable {
/// \brief Validates the partition spec against a schema.
/// \param schema The schema to validate against.
- /// \param allowMissingFields Whether to skip validation for partition
fields whose
+ /// \param allow_missing_fields Whether to skip validation for partition
fields whose
/// source columns have been dropped from the schema.
/// \return Error status if the partition spec is invalid.
Status Validate(const Schema& schema, bool allow_missing_fields) const;
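+ /// \brief Validates that partition field names are unique within the partition
+ /// spec and do not conflict with schema field names (identity and void
+ /// partitions may reuse the name of their own source field).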
+ static Status ValidatePartitionName(const Schema& schema, const
PartitionSpec& spec);
+
/// \brief Get the partition fields by source ID.
/// \param source_id The id of the source field.
/// \return The partition fields by source ID, or NotFound if the source
field is not
diff --git a/src/iceberg/schema.cc b/src/iceberg/schema.cc
index 4b4b6c26..0b7336b6 100644
--- a/src/iceberg/schema.cc
+++ b/src/iceberg/schema.cc
@@ -25,6 +25,7 @@
#include "iceberg/result.h"
#include "iceberg/row/struct_like.h"
#include "iceberg/schema_internal.h"
+#include "iceberg/table_metadata.h"
#include "iceberg/type.h"
#include "iceberg/util/formatter.h" // IWYU pragma: keep
#include "iceberg/util/macros.h"
@@ -147,6 +148,20 @@ Result<std::unordered_map<int32_t, std::vector<size_t>>>
Schema::InitIdToPositio
return visitor.Finish();
}
+/// Initializer for the lazily computed `highest_field_id_` member: scans the
+/// id -> field index (nested fields included) and returns the largest id, or
+/// kInitialColumnId when the schema has no fields.
+Result<int32_t> Schema::InitHighestFieldId(const Schema& self) {
+  ICEBERG_ASSIGN_OR_RAISE(auto id_to_field, self.id_to_field_.Get(self));
+  const auto& fields_by_id = id_to_field.get();
+
+  std::optional<int32_t> highest;
+  for (const auto& entry : fields_by_id) {
+    if (!highest.has_value() || entry.first > *highest) {
+      highest = entry.first;
+    }
+  }
+
+  // An empty schema has no field ids at all.
+  return highest.value_or(kInitialColumnId);
+}
+
Result<std::unique_ptr<StructLikeAccessor>> Schema::GetAccessorById(
int32_t field_id) const {
ICEBERG_ASSIGN_OR_RAISE(auto id_to_position_path,
id_to_position_path_.Get(*this));
@@ -227,4 +242,33 @@ Result<std::vector<std::string>>
Schema::IdentifierFieldNames() const {
return names;
}
+/// Returns the highest field id in the schema. The value comes from the
+/// `highest_field_id_` member, a Lazy<InitHighestFieldId>.
+Result<int32_t> Schema::HighestFieldId() const {
+  return highest_field_id_.Get(*this);
+}
+
+/// Structural equality: compares fields and identifier field ids, deliberately
+/// ignoring the schema id.
+bool Schema::SameSchema(const Schema& other) const {
+  if (identifier_field_ids_ != other.identifier_field_ids_) {
+    return false;
+  }
+  return fields_ == other.fields_;
+}
+
+/// Rejects the schema if any field (nested fields included) uses a type whose
+/// minimum format version, per TableMetadata::kMinFormatVersions, is newer than
+/// `format_version`.
+Status Schema::Validate(int32_t format_version) const {
+  ICEBERG_ASSIGN_OR_RAISE(auto id_to_field, id_to_field_.Get(*this));
+  const auto& min_versions = TableMetadata::kMinFormatVersions;
+
+  for (const auto& entry : id_to_field.get()) {
+    const auto& field = entry.second.get();
+
+    // Types absent from the table carry no version restriction.
+    const auto required = min_versions.find(field.type()->type_id());
+    if (required == min_versions.end()) {
+      continue;
+    }
+
+    const int32_t min_format_version = required->second;
+    if (format_version < min_format_version) {
+      return InvalidSchema("Invalid type for {}: {} is not supported until v{}",
+                           field.name(), *field.type(), min_format_version);
+    }
+
+    // TODO(GuoTao.yu): Check default values when they are supported
+  }
+
+  return {};
+}
+
} // namespace iceberg
diff --git a/src/iceberg/schema.h b/src/iceberg/schema.h
index 410fa59e..deda7e84 100644
--- a/src/iceberg/schema.h
+++ b/src/iceberg/schema.h
@@ -46,6 +46,7 @@ namespace iceberg {
class ICEBERG_EXPORT Schema : public StructType {
public:
static constexpr int32_t kInitialSchemaId = 0;
+ static constexpr int32_t kInitialColumnId = 0;
static constexpr int32_t kInvalidColumnId = -1;
/// \brief Special value to select all columns from manifest files.
@@ -130,6 +131,23 @@ class ICEBERG_EXPORT Schema : public StructType {
/// \brief Return the canonical field names of the identifier fields.
Result<std::vector<std::string>> IdentifierFieldNames() const;
+ /// \brief Get the highest field ID in the schema.
+ /// \return The highest field ID.
+ Result<int32_t> HighestFieldId() const;
+
+ /// \brief Checks whether this schema is equivalent to another schema while
ignoring the
+ /// schema id.
+ bool SameSchema(const Schema& other) const;
+
+ /// \brief Validate the schema for a given format version.
+ ///
+ /// This validates that the schema does not contain types that were released
in later
+ /// format versions.
+ ///
+ /// \param format_version The format version to validate against.
+ /// \return Error status if the schema is invalid.
+ Status Validate(int32_t format_version) const;
+
friend bool operator==(const Schema& lhs, const Schema& rhs) { return
lhs.Equals(rhs); }
private:
@@ -158,6 +176,7 @@ class ICEBERG_EXPORT Schema : public StructType {
InitLowerCaseNameToIdMap(const Schema&);
static Result<std::unordered_map<int32_t, std::vector<size_t>>>
InitIdToPositionPath(
const Schema&);
+ static Result<int32_t> InitHighestFieldId(const Schema&);
const std::optional<int32_t> schema_id_;
/// Field IDs that uniquely identify rows in the table.
@@ -170,6 +189,8 @@ class ICEBERG_EXPORT Schema : public StructType {
Lazy<InitLowerCaseNameToIdMap> lowercase_name_to_id_;
/// Mapping from field id to (nested) position path to access the field.
Lazy<InitIdToPositionPath> id_to_position_path_;
+ /// Highest field ID in the schema.
+ Lazy<InitHighestFieldId> highest_field_id_;
};
} // namespace iceberg
diff --git a/src/iceberg/table_metadata.cc b/src/iceberg/table_metadata.cc
index 46b7f92d..b6cb5f63 100644
--- a/src/iceberg/table_metadata.cc
+++ b/src/iceberg/table_metadata.cc
@@ -432,7 +432,11 @@ class TableMetadataBuilder::Impl {
Status RemoveProperties(const std::unordered_set<std::string>& removed);
Status SetDefaultPartitionSpec(int32_t spec_id);
Result<int32_t> AddPartitionSpec(const PartitionSpec& spec);
- std::unique_ptr<TableMetadata> Build();
+ Status SetCurrentSchema(int32_t schema_id);
+ Status RemoveSchemas(const std::unordered_set<int32_t>& schema_ids);
+ Result<int32_t> AddSchema(const Schema& schema, int32_t new_last_column_id);
+
+ Result<std::unique_ptr<TableMetadata>> Build();
private:
/// \brief Internal method to check for existing sort order and reuse its ID
or create a
@@ -447,6 +451,12 @@ class TableMetadataBuilder::Impl {
/// \return The ID to use for this partition spec (reused if exists, new
otherwise)
int32_t ReuseOrCreateNewPartitionSpecId(const PartitionSpec& new_spec);
+ /// \brief Internal method to check for existing schema and reuse its ID or
create a new
+ /// one
+ /// \param new_schema The schema to check
+ /// \return The ID to use for this schema (reused if it exists, new otherwise)
+ int32_t ReuseOrCreateNewSchemaId(const Schema& new_schema) const;
+
private:
// Base metadata (nullptr for new tables)
const TableMetadata* base_;
@@ -518,7 +528,7 @@ Status
TableMetadataBuilder::Impl::UpgradeFormatVersion(int8_t new_format_versio
}
Status TableMetadataBuilder::Impl::SetDefaultSortOrder(int32_t order_id) {
- if (order_id == -1) {
+ if (order_id == kLastAdded) {
if (!last_added_order_id_.has_value()) {
return InvalidArgument(
"Cannot set last added sort order: no sort order has been added");
@@ -583,7 +593,7 @@ Result<int32_t>
TableMetadataBuilder::Impl::AddSortOrder(const SortOrder& order)
}
Status TableMetadataBuilder::Impl::SetDefaultPartitionSpec(int32_t spec_id) {
- if (spec_id == -1) {
+ if (spec_id == kLastAdded) {
if (!last_added_spec_id_.has_value()) {
return ValidationFailed(
"Cannot set last added partition spec: no partition spec has been
added");
@@ -632,8 +642,7 @@ Result<int32_t>
TableMetadataBuilder::Impl::AddPartitionSpec(const PartitionSpec
ICEBERG_ASSIGN_OR_RAISE(
std::shared_ptr<PartitionSpec> new_spec,
- PartitionSpec::Make(new_spec_id,
std::vector<PartitionField>(spec.fields().begin(),
-
spec.fields().end())));
+ PartitionSpec::Make(new_spec_id, spec.fields() |
std::ranges::to<std::vector>()));
metadata_.last_partition_id =
std::max(metadata_.last_partition_id,
new_spec->last_assigned_field_id());
metadata_.partition_specs.push_back(new_spec);
@@ -681,7 +690,136 @@ Status TableMetadataBuilder::Impl::RemoveProperties(
return {};
}
-std::unique_ptr<TableMetadata> TableMetadataBuilder::Impl::Build() {
+/// Makes `schema_id` the current schema, rebuilding every partition spec and
+/// sort order for it. `kLastAdded` selects the schema most recently added by this
+/// builder. Setting the already-current schema is a no-op and records no change.
+Status TableMetadataBuilder::Impl::SetCurrentSchema(int32_t schema_id) {
+  if (schema_id == kLastAdded) {
+    if (!last_added_schema_id_.has_value()) {
+      return ValidationFailed("Cannot set last added schema: no schema has been added");
+    }
+    return SetCurrentSchema(last_added_schema_id_.value());
+  }
+
+  if (metadata_.current_schema_id == schema_id) {
+    return {};
+  }
+
+  auto it = schemas_by_id_.find(schema_id);
+  ICEBERG_PRECHECK(it != schemas_by_id_.end(),
+                   "Cannot set current schema to unknown schema: {}", schema_id);
+  const auto& schema = it->second;
+
+  // Rebuild specs and sort orders into locals first and commit them only once
+  // every rebuild has succeeded, so a failure leaves the builder untouched.
+  std::vector<std::shared_ptr<PartitionSpec>> updated_specs;
+  updated_specs.reserve(metadata_.partition_specs.size());
+  for (const auto& partition_spec : metadata_.partition_specs) {
+    ICEBERG_ASSIGN_OR_RAISE(
+        auto updated_spec,
+        PartitionSpec::Make(partition_spec->spec_id(),
+                            partition_spec->fields() | std::ranges::to<std::vector>()));
+    // Partition names must not clash with field names of the new schema.
+    ICEBERG_RETURN_UNEXPECTED(
+        PartitionSpec::ValidatePartitionName(*schema, *updated_spec));
+    updated_specs.push_back(std::move(updated_spec));
+  }
+
+  std::vector<std::shared_ptr<SortOrder>> updated_orders;
+  updated_orders.reserve(metadata_.sort_orders.size());
+  for (const auto& sort_order : metadata_.sort_orders) {
+    ICEBERG_ASSIGN_OR_RAISE(
+        auto updated_order,
+        SortOrder::Make(sort_order->order_id(),
+                        sort_order->fields() | std::ranges::to<std::vector>()));
+    updated_orders.push_back(std::move(updated_order));
+  }
+
+  // Commit: replace specs and sort orders, then rebuild their id indexes.
+  metadata_.partition_specs = std::move(updated_specs);
+  specs_by_id_.clear();
+  for (const auto& spec : metadata_.partition_specs) {
+    specs_by_id_.emplace(spec->spec_id(), spec);
+  }
+
+  metadata_.sort_orders = std::move(updated_orders);
+  sort_orders_by_id_.clear();
+  for (const auto& order : metadata_.sort_orders) {
+    sort_orders_by_id_.emplace(order->order_id(), order);
+  }
+
+  metadata_.current_schema_id = schema_id;
+
+  // Record the change as kLastAdded when the target was added in this same batch.
+  if (last_added_schema_id_.has_value() && last_added_schema_id_.value() == schema_id) {
+    changes_.push_back(std::make_unique<table::SetCurrentSchema>(kLastAdded));
+  } else {
+    changes_.push_back(std::make_unique<table::SetCurrentSchema>(schema_id));
+  }
+
+  return {};
+}
+
+/// Removes the schemas whose ids are listed in `schema_ids`. Removing the
+/// current schema is rejected. An empty set is a no-op and records no change.
+Status TableMetadataBuilder::Impl::RemoveSchemas(
+    const std::unordered_set<int32_t>& schema_ids) {
+  auto current_schema_id = metadata_.current_schema_id.value_or(Schema::kInitialSchemaId);
+  ICEBERG_PRECHECK(!schema_ids.contains(current_schema_id),
+                   "Cannot remove current schema: {}", current_schema_id);
+
+  if (schema_ids.empty()) {
+    return {};
+  }
+
+  // Drop only schemas whose id is explicitly listed. A schema without an id can
+  // never match the set and is kept instead of being silently discarded.
+  std::erase_if(metadata_.schemas, [&schema_ids](const auto& schema) {
+    const auto& id = schema->schema_id();
+    return id.has_value() && schema_ids.contains(id.value());
+  });
+
+  // Keep the id index in sync with `metadata_.schemas`. Otherwise a removed
+  // schema could still be made current, and re-adding an equal schema would be
+  // treated as already present and never re-inserted.
+  for (int32_t schema_id : schema_ids) {
+    schemas_by_id_.erase(schema_id);
+  }
+
+  changes_.push_back(std::make_unique<table::RemoveSchemas>(schema_ids));
+  return {};
+}
+
+/// Adds `schema` with `new_last_column_id`, reusing the id of an identical
+/// existing schema when there is one, and returns the schema id used.
+/// If the schema already exists and the last column id does not change, no
+/// change is recorded. In that case the schema stays "last added" only if it
+/// was added earlier in this same batch.
+Result<int32_t> TableMetadataBuilder::Impl::AddSchema(const Schema& schema,
+                                                      int32_t new_last_column_id) {
+  // Column ids may only grow.
+  ICEBERG_PRECHECK(new_last_column_id >= metadata_.last_column_id,
+                   "Invalid last column ID: {} < {} (previous last column ID)",
+                   new_last_column_id, metadata_.last_column_id);
+  ICEBERG_RETURN_UNEXPECTED(schema.Validate(metadata_.format_version));
+
+  const auto new_schema_id = ReuseOrCreateNewSchemaId(schema);
+  const bool schema_found = schemas_by_id_.contains(new_schema_id);
+
+  if (!schema_found || new_last_column_id != metadata_.last_column_id) {
+    metadata_.last_column_id = new_last_column_id;
+
+    auto new_schema =
+        std::make_shared<Schema>(schema.fields() | std::ranges::to<std::vector>(),
+                                 new_schema_id, schema.IdentifierFieldIds());
+    if (!schema_found) {
+      metadata_.schemas.push_back(new_schema);
+      schemas_by_id_.emplace(new_schema_id, new_schema);
+    }
+
+    changes_.push_back(std::make_unique<table::AddSchema>(new_schema, new_last_column_id));
+    last_added_schema_id_ = new_schema_id;
+    return new_schema_id;
+  }
+
+  // The schema already exists and the column id is unchanged: nothing to record.
+  const auto adds_this_schema = [new_schema_id](const auto& change) {
+    if (change->kind() != TableUpdate::Kind::kAddSchema) {
+      return false;
+    }
+    auto* add_schema = internal::checked_cast<table::AddSchema*>(change.get());
+    return add_schema->schema()->schema_id() == std::make_optional(new_schema_id);
+  };
+  if (last_added_schema_id_.has_value() && std::ranges::any_of(changes_, adds_this_schema)) {
+    last_added_schema_id_ = new_schema_id;
+  } else {
+    last_added_schema_id_.reset();
+  }
+  return new_schema_id;
+}
+
+Result<std::unique_ptr<TableMetadata>> TableMetadataBuilder::Impl::Build() {
// 1. Validate metadata consistency through TableMetadata#Validate
// 2. Update last_updated_ms if there are changes
@@ -691,6 +829,27 @@ std::unique_ptr<TableMetadata>
TableMetadataBuilder::Impl::Build() {
std::chrono::system_clock::now().time_since_epoch())};
}
+ auto current_schema_id =
metadata_.current_schema_id.value_or(Schema::kInitialSchemaId);
+ auto schema_it = schemas_by_id_.find(current_schema_id);
+ ICEBERG_PRECHECK(schema_it != schemas_by_id_.end(),
+ "Current schema ID {} not found in schemas",
current_schema_id);
+ {
+ const auto& current_schema = schema_it->second;
+
+ auto spec_it = specs_by_id_.find(metadata_.default_spec_id);
+ ICEBERG_PRECHECK(spec_it != specs_by_id_.end(),
+ "Default spec ID {} not found in partition specs",
+ metadata_.default_spec_id);
+ ICEBERG_RETURN_UNEXPECTED(
+ spec_it->second->Validate(*current_schema,
/*allow_missing_fields=*/false));
+
+ auto sort_order_it =
sort_orders_by_id_.find(metadata_.default_sort_order_id);
+ ICEBERG_PRECHECK(sort_order_it != sort_orders_by_id_.end(),
+ "Default sort order ID {} not found in sort orders",
+ metadata_.default_sort_order_id);
+
ICEBERG_RETURN_UNEXPECTED(sort_order_it->second->Validate(*current_schema));
+ }
+
// 3. Buildup metadata_log from base metadata
int32_t max_metadata_log_size =
metadata_.properties.Get(TableProperties::kMetadataPreviousVersionsMax);
@@ -740,6 +899,21 @@ int32_t
TableMetadataBuilder::Impl::ReuseOrCreateNewPartitionSpecId(
return new_spec_id;
}
+/// Returns the id of an existing schema that is SameSchema() as `new_schema`.
+/// Otherwise returns a fresh id: one past the highest existing schema id, and
+/// never lower than the current schema id.
+int32_t TableMetadataBuilder::Impl::ReuseOrCreateNewSchemaId(
+    const Schema& new_schema) const {
+  int32_t candidate_id = metadata_.current_schema_id.value_or(Schema::kInitialSchemaId);
+  for (const auto& existing : metadata_.schemas) {
+    const int32_t existing_id = existing->schema_id().value_or(Schema::kInitialSchemaId);
+    if (existing->SameSchema(new_schema)) {
+      return existing_id;
+    }
+    candidate_id = std::max(candidate_id, existing_id + 1);
+  }
+  return candidate_id;
+}
+
TableMetadataBuilder::TableMetadataBuilder(int8_t format_version)
: impl_(std::make_unique<Impl>(format_version)) {}
@@ -796,16 +970,23 @@ TableMetadataBuilder&
TableMetadataBuilder::UpgradeFormatVersion(
}
+/// Adds `schema` through Impl::AddSchema, which may reuse the id of an identical
+/// existing schema, and then makes the resulting schema id current.
TableMetadataBuilder& TableMetadataBuilder::SetCurrentSchema(
- std::shared_ptr<Schema> schema, int32_t new_last_column_id) {
- throw IcebergError(std::format("{} not implemented", __FUNCTION__));
+ std::shared_ptr<Schema> const& schema, int32_t new_last_column_id) {
+ // NOTE(review): `schema` is dereferenced without a null check.
+ ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto schema_id,
+ impl_->AddSchema(*schema,
new_last_column_id));
+ // The id-based overload records the SetCurrentSchema change.
+ return SetCurrentSchema(schema_id);
}
+/// Makes `schema_id` current. Impl::SetCurrentSchema resolves kLastAdded to the
+/// most recently added schema.
TableMetadataBuilder& TableMetadataBuilder::SetCurrentSchema(int32_t
schema_id) {
- throw IcebergError(std::format("{} not implemented", __FUNCTION__));
+ ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->SetCurrentSchema(schema_id));
+ return *this;
}
-TableMetadataBuilder& TableMetadataBuilder::AddSchema(std::shared_ptr<Schema>
schema) {
- throw IcebergError(std::format("{} not implemented", __FUNCTION__));
+/// Adds `schema` without making it current. The last column id passed on is the
+/// larger of the table's current last column id and the schema's highest field id.
+TableMetadataBuilder& TableMetadataBuilder::AddSchema(
+ std::shared_ptr<Schema> const& schema) {
+ ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto highest_field_id,
schema->HighestFieldId());
+ auto new_last_column_id = std::max(impl_->metadata().last_column_id,
highest_field_id);
+ ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->AddSchema(*schema,
new_last_column_id));
+ return *this;
}
TableMetadataBuilder& TableMetadataBuilder::SetDefaultPartitionSpec(
@@ -831,8 +1012,9 @@ TableMetadataBuilder&
TableMetadataBuilder::RemovePartitionSpecs(
}
+/// Removes the schemas with the given ids. Impl::RemoveSchemas rejects removing
+/// the current schema.
TableMetadataBuilder& TableMetadataBuilder::RemoveSchemas(
- const std::vector<int32_t>& schema_ids) {
- throw IcebergError(std::format("{} not implemented", __FUNCTION__));
+ const std::unordered_set<int32_t>& schema_ids) {
+ ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->RemoveSchemas(schema_ids));
+ return *this;
}
TableMetadataBuilder& TableMetadataBuilder::SetDefaultSortOrder(
diff --git a/src/iceberg/table_metadata.h b/src/iceberg/table_metadata.h
index daaada6e..3885a458 100644
--- a/src/iceberg/table_metadata.h
+++ b/src/iceberg/table_metadata.h
@@ -73,10 +73,13 @@ struct ICEBERG_EXPORT TableMetadata {
static constexpr int8_t kDefaultTableFormatVersion = 2;
static constexpr int8_t kSupportedTableFormatVersion = 3;
static constexpr int8_t kMinFormatVersionRowLineage = 3;
+ static constexpr int8_t kMinFormatVersionDefaultValues = 3;
static constexpr int64_t kInitialSequenceNumber = 0;
static constexpr int64_t kInvalidSequenceNumber = -1;
static constexpr int64_t kInitialRowId = 0;
+ static inline const std::unordered_map<TypeId, int8_t> kMinFormatVersions =
{};
+
/// An integer version number for the format
int8_t format_version;
/// A UUID that identifies the table
@@ -187,7 +190,7 @@ ICEBERG_EXPORT std::string ToString(const MetadataLogEntry&
entry);
/// This builder provides a fluent interface for creating and modifying table
metadata.
/// It supports both creating new tables and building from existing metadata.
///
-/// Each modification method generates a corresponding MetadataUpdate that is
tracked
+/// Each modification method generates a corresponding TableUpdate that is
tracked
/// in a changes list. This allows the builder to maintain a complete history
of all
/// modifications made to the table metadata, which is important for tracking
table
/// evolution and for serialization purposes.
@@ -246,7 +249,7 @@ class ICEBERG_EXPORT TableMetadataBuilder : public
ErrorCollector {
/// \param schema The schema to set as current
/// \param new_last_column_id The highest column ID in the schema
/// \return Reference to this builder for method chaining
- TableMetadataBuilder& SetCurrentSchema(std::shared_ptr<Schema> schema,
+ TableMetadataBuilder& SetCurrentSchema(const std::shared_ptr<Schema>& schema,
int32_t new_last_column_id);
/// \brief Set the current schema by schema ID
@@ -259,7 +262,7 @@ class ICEBERG_EXPORT TableMetadataBuilder : public
ErrorCollector {
///
/// \param schema The schema to add
/// \return Reference to this builder for method chaining
- TableMetadataBuilder& AddSchema(std::shared_ptr<Schema> schema);
+ TableMetadataBuilder& AddSchema(const std::shared_ptr<Schema>& schema);
/// \brief Set the default partition spec for the table
///
@@ -289,7 +292,7 @@ class ICEBERG_EXPORT TableMetadataBuilder : public
ErrorCollector {
///
/// \param schema_ids The IDs of schemas to remove
/// \return Reference to this builder for method chaining
- TableMetadataBuilder& RemoveSchemas(const std::vector<int32_t>& schema_ids);
+ TableMetadataBuilder& RemoveSchemas(const std::unordered_set<int32_t>&
schema_ids);
/// \brief Set the default sort order for the table
///
diff --git a/src/iceberg/table_update.cc b/src/iceberg/table_update.cc
index 0fc27532..9d92a16e 100644
--- a/src/iceberg/table_update.cc
+++ b/src/iceberg/table_update.cc
@@ -52,7 +52,7 @@ void
UpgradeFormatVersion::GenerateRequirements(TableUpdateContext& context) con
// AddSchema
+/// Replays this update on `builder` through TableMetadataBuilder::AddSchema.
void AddSchema::ApplyTo(TableMetadataBuilder& builder) const {
- throw IcebergError(std::format("{} not implemented", __FUNCTION__));
+ // Only the schema is forwarded; the builder derives the last column id from
+ // the schema's highest field id and the table's current last column id.
+ builder.AddSchema(schema_);
}
void AddSchema::GenerateRequirements(TableUpdateContext& context) const {
@@ -62,7 +62,7 @@ void AddSchema::GenerateRequirements(TableUpdateContext&
context) const {
// SetCurrentSchema
+/// Replays this update on `builder`. `schema_id_` may be kLastAdded, which the
+/// builder resolves to the most recently added schema.
void SetCurrentSchema::ApplyTo(TableMetadataBuilder& builder) const {
- throw IcebergError(std::format("{} not implemented", __FUNCTION__));
+ builder.SetCurrentSchema(schema_id_);
}
void SetCurrentSchema::GenerateRequirements(TableUpdateContext& context) const
{
@@ -103,7 +103,7 @@ void
RemovePartitionSpecs::GenerateRequirements(TableUpdateContext& context) con
// RemoveSchemas
+/// Replays this update on `builder`. The builder rejects removal of the current
+/// schema.
void RemoveSchemas::ApplyTo(TableMetadataBuilder& builder) const {
- throw IcebergError(std::format("{} not implemented", __FUNCTION__));
+ builder.RemoveSchemas(schema_ids_);
}
void RemoveSchemas::GenerateRequirements(TableUpdateContext& context) const {
diff --git a/src/iceberg/table_update.h b/src/iceberg/table_update.h
index 71db517b..a6bdb9e5 100644
--- a/src/iceberg/table_update.h
+++ b/src/iceberg/table_update.h
@@ -216,10 +216,10 @@ class ICEBERG_EXPORT RemovePartitionSpecs : public
TableUpdate {
/// \brief Represents removing schemas from the table
class ICEBERG_EXPORT RemoveSchemas : public TableUpdate {
public:
- explicit RemoveSchemas(std::vector<int32_t> schema_ids)
+ explicit RemoveSchemas(std::unordered_set<int32_t> schema_ids)
: schema_ids_(std::move(schema_ids)) {}
- const std::vector<int32_t>& schema_ids() const { return schema_ids_; }
+ const std::unordered_set<int32_t>& schema_ids() const { return schema_ids_; }
void ApplyTo(TableMetadataBuilder& builder) const override;
@@ -228,7 +228,7 @@ class ICEBERG_EXPORT RemoveSchemas : public TableUpdate {
Kind kind() const override { return Kind::kRemoveSchemas; }
private:
- std::vector<int32_t> schema_ids_;
+ std::unordered_set<int32_t> schema_ids_;
};
/// \brief Represents adding a new sort order to the table
diff --git a/src/iceberg/test/metadata_io_test.cc
b/src/iceberg/test/metadata_io_test.cc
index ea4555d3..7b11cf47 100644
--- a/src/iceberg/test/metadata_io_test.cc
+++ b/src/iceberg/test/metadata_io_test.cc
@@ -30,9 +30,11 @@
#include "iceberg/arrow/arrow_fs_file_io_internal.h"
#include "iceberg/file_io.h"
#include "iceberg/json_internal.h"
+#include "iceberg/partition_spec.h"
#include "iceberg/result.h"
#include "iceberg/schema.h"
#include "iceberg/snapshot.h"
+#include "iceberg/sort_order.h"
#include "iceberg/table_metadata.h"
#include "iceberg/table_properties.h"
#include "iceberg/test/matchers.h"
@@ -64,6 +66,7 @@ class MetadataIOTest : public TempFileTestBase {
.last_sequence_number = 0,
.schemas = {schema},
.current_schema_id = 1,
+ .partition_specs = {PartitionSpec::Unpartitioned()},
.default_spec_id = 0,
.last_partition_id = 0,
.properties = TableProperties::FromMap({{"key",
"value"}}),
@@ -75,6 +78,7 @@ class MetadataIOTest : public TempFileTestBase {
.manifest_list = "s3://a/b/1.avro",
.summary = {{"operation", "append"}},
})},
+ .sort_orders = {SortOrder::Unsorted()},
.default_sort_order_id = 0,
.next_row_id = 0};
}
diff --git a/src/iceberg/test/partition_spec_test.cc
b/src/iceberg/test/partition_spec_test.cc
index b7a9fedd..f9953a30 100644
--- a/src/iceberg/test/partition_spec_test.cc
+++ b/src/iceberg/test/partition_spec_test.cc
@@ -109,17 +109,17 @@ TEST(PartitionSpecTest, PartitionTypeTest) {
"fields": [ {
"source-id": 4,
"field-id": 1000,
- "name": "ts_day",
+ "name": "__ts_day",
"transform": "day"
}, {
"source-id": 1,
"field-id": 1001,
- "name": "id_bucket",
+ "name": "__id_bucket",
"transform": "bucket[16]"
}, {
"source-id": 2,
"field-id": 1002,
- "name": "id_truncate",
+ "name": "__id_truncate",
"transform": "truncate[4]"
} ]
})"_json;
@@ -137,9 +137,9 @@ TEST(PartitionSpecTest, PartitionTypeTest) {
ICEBERG_UNWRAP_OR_FAIL(auto parsed_spec, PartitionSpecFromJson(schema, json,
1));
ICEBERG_UNWRAP_OR_FAIL(auto partition_type,
parsed_spec->PartitionType(*schema));
- SchemaField pt_field1(1000, "ts_day", date(), true);
- SchemaField pt_field2(1001, "id_bucket", int32(), true);
- SchemaField pt_field3(1002, "id_truncate", string(), true);
+ SchemaField pt_field1(1000, "__ts_day", date(), true);
+ SchemaField pt_field2(1001, "__id_bucket", int32(), true);
+ SchemaField pt_field3(1002, "__id_truncate", string(), true);
ASSERT_EQ(3, partition_type->fields().size());
diff --git a/src/iceberg/test/residual_evaluator_test.cc
b/src/iceberg/test/residual_evaluator_test.cc
index bef17d2b..04c82e25 100644
--- a/src/iceberg/test/residual_evaluator_test.cc
+++ b/src/iceberg/test/residual_evaluator_test.cc
@@ -406,7 +406,7 @@ TEST_F(ResidualEvaluatorTest,
IntegerTruncateTransformResiduals) {
// Valid partitions would be 0, 10, 20...90, 100 etc.
auto truncate_transform = Transform::Truncate(10);
- PartitionField pt_field(50, 1000, "value", truncate_transform);
+ PartitionField pt_field(50, 1000, "__value", truncate_transform);
ICEBERG_UNWRAP_OR_FAIL(auto spec_unique,
PartitionSpec::Make(*schema, 0, {pt_field}, false));
auto spec = std::shared_ptr<PartitionSpec>(spec_unique.release());
@@ -523,7 +523,7 @@ TEST_F(ResidualEvaluatorTest,
StringTruncateTransformResiduals) {
// Valid partitions would be two letter strings for eg: ab, bc etc
auto truncate_transform = Transform::Truncate(2);
- PartitionField pt_field(50, 1000, "value", truncate_transform);
+ PartitionField pt_field(50, 1000, "__value", truncate_transform);
ICEBERG_UNWRAP_OR_FAIL(auto spec_unique,
PartitionSpec::Make(*schema, 0, {pt_field}, false));
auto spec = std::shared_ptr<PartitionSpec>(spec_unique.release());
diff --git a/src/iceberg/test/table_metadata_builder_test.cc
b/src/iceberg/test/table_metadata_builder_test.cc
index bb9ce8c0..a912a08f 100644
--- a/src/iceberg/test/table_metadata_builder_test.cc
+++ b/src/iceberg/test/table_metadata_builder_test.cc
@@ -60,10 +60,11 @@ std::unique_ptr<TableMetadata> CreateBaseMetadata() {
metadata->last_column_id = 3;
metadata->current_schema_id = 0;
metadata->schemas.push_back(CreateTestSchema());
+ metadata->partition_specs.push_back(PartitionSpec::Unpartitioned());
metadata->default_spec_id = PartitionSpec::kInitialSpecId;
metadata->last_partition_id = 0;
metadata->current_snapshot_id = Snapshot::kInvalidSnapshotId;
- metadata->default_sort_order_id = SortOrder::kInitialSortOrderId;
+ metadata->default_sort_order_id = SortOrder::kUnsortedOrderId;
metadata->sort_orders.push_back(SortOrder::Unsorted());
metadata->next_row_id = TableMetadata::kInitialRowId;
metadata->properties = TableProperties::default_properties();
@@ -77,6 +78,10 @@ TEST(TableMetadataBuilderTest, BuildFromEmpty) {
auto builder = TableMetadataBuilder::BuildFromEmpty(2);
ASSERT_NE(builder, nullptr);
+ auto schema = CreateTestSchema();
+ builder->SetCurrentSchema(schema, schema->HighestFieldId().value());
+ builder->SetDefaultSortOrder(SortOrder::Unsorted());
+ builder->SetDefaultPartitionSpec(PartitionSpec::Unpartitioned());
builder->AssignUUID("new-uuid-5678");
ICEBERG_UNWRAP_OR_FAIL(auto metadata, builder->Build());
@@ -85,7 +90,7 @@ TEST(TableMetadataBuilderTest, BuildFromEmpty) {
EXPECT_EQ(metadata->format_version, 2);
EXPECT_EQ(metadata->last_sequence_number,
TableMetadata::kInitialSequenceNumber);
EXPECT_EQ(metadata->default_spec_id, PartitionSpec::kInitialSpecId);
- EXPECT_EQ(metadata->default_sort_order_id, SortOrder::kInitialSortOrderId);
+ EXPECT_EQ(metadata->default_sort_order_id, SortOrder::kUnsortedOrderId);
EXPECT_EQ(metadata->current_snapshot_id, Snapshot::kInvalidSnapshotId);
EXPECT_TRUE(metadata->metadata_log.empty());
}
@@ -153,6 +158,10 @@ TEST(TableMetadataBuilderTest, BuildupMetadataLog) {
TEST(TableMetadataBuilderTest, AssignUUID) {
// Assign UUID for new table
auto builder = TableMetadataBuilder::BuildFromEmpty(2);
+ auto schema = CreateTestSchema();
+ builder->SetCurrentSchema(schema, schema->HighestFieldId().value());
+ builder->SetDefaultSortOrder(SortOrder::Unsorted());
+ builder->SetDefaultPartitionSpec(PartitionSpec::Unpartitioned());
builder->AssignUUID("new-uuid-5678");
ICEBERG_UNWRAP_OR_FAIL(auto metadata, builder->Build());
EXPECT_EQ(metadata->table_uuid, "new-uuid-5678");
@@ -178,6 +187,9 @@ TEST(TableMetadataBuilderTest, AssignUUID) {
// Auto-generate UUID
builder = TableMetadataBuilder::BuildFromEmpty(2);
+ builder->SetCurrentSchema(schema, schema->HighestFieldId().value());
+ builder->SetDefaultSortOrder(SortOrder::Unsorted());
+ builder->SetDefaultPartitionSpec(PartitionSpec::Unpartitioned());
builder->AssignUUID();
ICEBERG_UNWRAP_OR_FAIL(metadata, builder->Build());
EXPECT_FALSE(metadata->table_uuid.empty());
@@ -192,7 +204,8 @@ TEST(TableMetadataBuilderTest, AssignUUID) {
}
TEST(TableMetadataBuilderTest, SetProperties) {
- auto builder = TableMetadataBuilder::BuildFromEmpty(2);
+ auto base = CreateBaseMetadata();
+ auto builder = TableMetadataBuilder::BuildFrom(base.get());
builder->SetProperties({{"key1", "value1"}, {"key2", "value2"}});
ICEBERG_UNWRAP_OR_FAIL(auto metadata, builder->Build());
@@ -201,7 +214,7 @@ TEST(TableMetadataBuilderTest, SetProperties) {
EXPECT_EQ(metadata->properties.configs().at("key2"), "value2");
// Update existing property and add new one
- builder = TableMetadataBuilder::BuildFromEmpty(2);
+ builder = TableMetadataBuilder::BuildFrom(base.get());
builder->SetProperties({{"key1", "value1"}});
builder->SetProperties({{"key1", "new_value1"}, {"key3", "value3"}});
@@ -212,7 +225,8 @@ TEST(TableMetadataBuilderTest, SetProperties) {
}
TEST(TableMetadataBuilderTest, RemoveProperties) {
- auto builder = TableMetadataBuilder::BuildFromEmpty(2);
+ auto base = CreateBaseMetadata();
+ auto builder = TableMetadataBuilder::BuildFrom(base.get());
builder->SetProperties({{"key1", "value1"}, {"key2", "value2"}, {"key3",
"value3"}});
builder->RemoveProperties({"key2", "key4"}); // key4 does not exist
@@ -224,6 +238,10 @@ TEST(TableMetadataBuilderTest, RemoveProperties) {
TEST(TableMetadataBuilderTest, UpgradeFormatVersion) {
auto builder = TableMetadataBuilder::BuildFromEmpty(1);
+ auto schema = CreateTestSchema();
+ builder->SetCurrentSchema(schema, schema->HighestFieldId().value());
+ builder->SetDefaultSortOrder(SortOrder::Unsorted());
+ builder->SetDefaultPartitionSpec(PartitionSpec::Unpartitioned());
builder->UpgradeFormatVersion(2);
ICEBERG_UNWRAP_OR_FAIL(auto metadata, builder->Build());
@@ -386,4 +404,348 @@ TEST(TableMetadataBuilderTest,
SetDefaultSortOrderInvalid) {
ASSERT_THAT(builder->Build(), HasErrorMessage("no sort order has been
added"));
}
+// Test AddSchema
+TEST(TableMetadataBuilderTest, AddSchemaBasic) {
+ auto base = CreateBaseMetadata();
+ auto builder = TableMetadataBuilder::BuildFrom(base.get());
+
+ // 1. Add a new schema
+ auto field1 = SchemaField::MakeRequired(4, "new_field1", int64());
+ auto field2 = SchemaField::MakeRequired(5, "new_field2", float64());
+ auto new_schema = std::make_shared<Schema>(std::vector<SchemaField>{field1,
field2}, 1);
+ builder->AddSchema(new_schema);
+ ICEBERG_UNWRAP_OR_FAIL(auto metadata, builder->Build());
+ ASSERT_EQ(metadata->schemas.size(), 2);
+ EXPECT_EQ(metadata->schemas[1]->schema_id().value(), 1);
+ EXPECT_EQ(metadata->last_column_id, 5);
+
+ // 2. Add duplicate schema - should be idempotent
+ builder = TableMetadataBuilder::BuildFrom(base.get());
+ auto schema1 = std::make_shared<Schema>(std::vector<SchemaField>{field1,
field2}, 1);
+ auto schema2 = std::make_shared<Schema>(std::vector<SchemaField>{field1,
field2}, 2);
+ builder->AddSchema(schema1);
+ builder->AddSchema(schema2); // Same fields, should reuse ID
+ ICEBERG_UNWRAP_OR_FAIL(metadata, builder->Build());
+ ASSERT_EQ(metadata->schemas.size(), 2); // Only one new schema added
+
+ // 3. Add multiple different schemas
+ builder = TableMetadataBuilder::BuildFrom(base.get());
+ auto field3 = SchemaField::MakeRequired(6, "field3", string());
+ auto schema3 = std::make_shared<Schema>(std::vector<SchemaField>{field1,
field2}, 1);
+ auto schema4 = std::make_shared<Schema>(std::vector<SchemaField>{field1,
field3}, 2);
+ builder->AddSchema(schema3);
+ builder->AddSchema(schema4);
+ ICEBERG_UNWRAP_OR_FAIL(metadata, builder->Build());
+ ASSERT_EQ(metadata->schemas.size(), 3);
+ EXPECT_EQ(metadata->schemas[1]->schema_id().value(), 1);
+ EXPECT_EQ(metadata->schemas[2]->schema_id().value(), 2);
+ EXPECT_EQ(metadata->last_column_id, 6);
+}
+
+TEST(TableMetadataBuilderTest, AddSchemaInvalidColumnIds) {
+ auto base = CreateBaseMetadata();
+ auto builder = TableMetadataBuilder::BuildFrom(base.get());
+
+ // Try to add schema with column ID lower than existing last_column_id
+ auto field1 =
+ SchemaField::MakeRequired(2, "duplicate_id", int64()); // ID 2 already
exists
+ auto invalid_schema =
std::make_shared<Schema>(std::vector<SchemaField>{field1}, 1);
+ builder->AddSchema(invalid_schema);
+ ICEBERG_UNWRAP_OR_FAIL(auto metadata, builder->Build());
+ // Should still work - AddSchema automatically uses max(existing,
highest_in_schema)
+ ASSERT_EQ(metadata->schemas.size(), 2);
+ EXPECT_EQ(metadata->last_column_id, 3); // Should remain 3 (from base
metadata)
+}
+
+TEST(TableMetadataBuilderTest, AddSchemaWithHigherColumnIds) {
+ auto base = CreateBaseMetadata();
+ auto builder = TableMetadataBuilder::BuildFrom(base.get());
+
+ // Add schema with higher column IDs
+ auto field1 = SchemaField::MakeRequired(10, "high_id1", int64());
+ auto field2 = SchemaField::MakeRequired(15, "high_id2", string());
+ auto new_schema = std::make_shared<Schema>(std::vector<SchemaField>{field1,
field2}, 1);
+ builder->AddSchema(new_schema);
+ ICEBERG_UNWRAP_OR_FAIL(auto metadata, builder->Build());
+ ASSERT_EQ(metadata->schemas.size(), 2);
+ EXPECT_EQ(metadata->last_column_id, 15); // Should be updated to highest
field ID
+}
+
+TEST(TableMetadataBuilderTest, AddSchemaEmptyFields) {
+ auto base = CreateBaseMetadata();
+ auto builder = TableMetadataBuilder::BuildFrom(base.get());
+
+ // Add schema with no fields
+ auto empty_schema = std::make_shared<Schema>(std::vector<SchemaField>{}, 1);
+ builder->AddSchema(empty_schema);
+ ICEBERG_UNWRAP_OR_FAIL(auto metadata, builder->Build());
+ ASSERT_EQ(metadata->schemas.size(), 2);
+ EXPECT_EQ(metadata->last_column_id, 3); // Should remain unchanged
+}
+
+// Test SetCurrentSchema
+TEST(TableMetadataBuilderTest, SetCurrentSchemaBasic) {
+ auto base = CreateBaseMetadata();
+ auto builder = TableMetadataBuilder::BuildFrom(base.get());
+
+ // 1. Set current schema by Schema object with explicit last_column_id
+ auto field1 = SchemaField::MakeRequired(4, "new_field", int64());
+ auto new_schema = std::make_shared<Schema>(std::vector<SchemaField>{field1},
1);
+ builder->SetCurrentSchema(new_schema, 4);
+ ICEBERG_UNWRAP_OR_FAIL(auto metadata, builder->Build());
+ ASSERT_EQ(metadata->schemas.size(), 2);
+ EXPECT_EQ(metadata->current_schema_id.value(), 1);
+ EXPECT_EQ(metadata->schemas[1]->schema_id().value(), 1);
+ EXPECT_EQ(metadata->last_column_id, 4);
+
+ // 2. Set current schema by schema ID
+ builder = TableMetadataBuilder::BuildFrom(base.get());
+ auto schema1 = std::make_shared<Schema>(std::vector<SchemaField>{field1}, 1);
+ builder->AddSchema(schema1);
+ builder->SetCurrentSchema(1);
+ ICEBERG_UNWRAP_OR_FAIL(metadata, builder->Build());
+ EXPECT_EQ(metadata->current_schema_id.value(), 1);
+
+ // 3. Set current schema using -1 (last added)
+ builder = TableMetadataBuilder::BuildFrom(base.get());
+ auto field2 = SchemaField::MakeRequired(5, "another_field", float64());
+ auto schema2 = std::make_shared<Schema>(std::vector<SchemaField>{field2}, 2);
+ builder->AddSchema(schema2);
+ builder->SetCurrentSchema(-1); // Use last added
+ ICEBERG_UNWRAP_OR_FAIL(metadata, builder->Build());
+ EXPECT_EQ(metadata->current_schema_id.value(), 1);
+
+ // 4. Setting same schema is no-op
+ builder = TableMetadataBuilder::BuildFrom(base.get());
+ builder->SetCurrentSchema(0);
+ ICEBERG_UNWRAP_OR_FAIL(metadata, builder->Build());
+ EXPECT_EQ(metadata->current_schema_id.value(), 0);
+}
+
+TEST(TableMetadataBuilderTest, SetCurrentSchemaWithInvalidLastColumnId) {
+ auto base = CreateBaseMetadata();
+ auto builder = TableMetadataBuilder::BuildFrom(base.get());
+
+ // Try to set current schema with last_column_id lower than existing
+ auto field1 = SchemaField::MakeRequired(4, "new_field", int64());
+ auto new_schema = std::make_shared<Schema>(std::vector<SchemaField>{field1},
1);
+ builder->SetCurrentSchema(new_schema, 2); // 2 < 3 (existing last_column_id)
+ ASSERT_THAT(builder->Build(), IsError(ErrorKind::kValidationFailed));
+ ASSERT_THAT(builder->Build(), HasErrorMessage("Invalid last column ID"));
+}
+
+TEST(TableMetadataBuilderTest, SetCurrentSchemaUpdatesLastColumnId) {
+ auto base = CreateBaseMetadata();
+ auto builder = TableMetadataBuilder::BuildFrom(base.get());
+
+ // Set current schema with higher last_column_id
+ auto field1 = SchemaField::MakeRequired(4, "new_field1", int64());
+ auto field2 = SchemaField::MakeRequired(8, "new_field2", string());
+ auto new_schema = std::make_shared<Schema>(std::vector<SchemaField>{field1,
field2}, 1);
+ builder->SetCurrentSchema(new_schema, 10); // Higher than field IDs
+ ICEBERG_UNWRAP_OR_FAIL(auto metadata, builder->Build());
+ EXPECT_EQ(metadata->current_schema_id.value(), 1);
+ EXPECT_EQ(metadata->last_column_id, 10);
+}
+
+TEST(TableMetadataBuilderTest, SetCurrentSchemaInvalid) {
+ auto base = CreateBaseMetadata();
+
+ // 1. Try to use -1 (last added) when no schema has been added
+ auto builder = TableMetadataBuilder::BuildFrom(base.get());
+ builder->SetCurrentSchema(-1);
+ ASSERT_THAT(builder->Build(), IsError(ErrorKind::kValidationFailed));
+ ASSERT_THAT(builder->Build(), HasErrorMessage("no schema has been added"));
+
+ // 2. Try to set non-existent schema ID
+ builder = TableMetadataBuilder::BuildFrom(base.get());
+ builder->SetCurrentSchema(999);
+ ASSERT_THAT(builder->Build(), IsError(ErrorKind::kValidationFailed));
+ ASSERT_THAT(builder->Build(), HasErrorMessage("unknown schema: 999"));
+}
+
+// Test schema evolution: SetCurrentSchema should rebuild partition specs and
sort orders
+TEST(TableMetadataBuilderTest, SetCurrentSchemaRebuildsSpecsAndOrders) {
+ auto base = CreateBaseMetadata();
+
+ // Add a partition spec to the base metadata
+ auto schema = CreateTestSchema();
+ ICEBERG_UNWRAP_OR_FAIL(
+ auto spec,
+ PartitionSpec::Make(PartitionSpec::kInitialSpecId,
+ {PartitionField(1, 1000, "id_bucket",
Transform::Bucket(16))},
+ 1000));
+ base->partition_specs.push_back(std::move(spec));
+
+ // Add a sort order to the base metadata
+ SortField sort_field(1, Transform::Identity(), SortDirection::kAscending,
+ NullOrder::kFirst);
+ ICEBERG_UNWRAP_OR_FAIL(auto order,
+ SortOrder::Make(*schema, 1,
std::vector<SortField>{sort_field}));
+ base->sort_orders.push_back(std::move(order));
+ base->default_sort_order_id = 1;
+
+ auto builder = TableMetadataBuilder::BuildFrom(base.get());
+
+ // Add and set a new schema
+ std::vector<SchemaField> new_fields{schema->fields().begin(),
schema->fields().end()};
+ new_fields.push_back(SchemaField::MakeRequired(4, "new_id", int64()));
+ new_fields.push_back(SchemaField::MakeRequired(5, "new_data", string()));
+ auto new_schema = std::make_shared<Schema>(std::move(new_fields), 1);
+ builder->SetCurrentSchema(new_schema, 5);
+
+ ICEBERG_UNWRAP_OR_FAIL(auto metadata, builder->Build());
+
+ // Verify schema was set
+ EXPECT_EQ(metadata->current_schema_id.value(), 1);
+
+ // Verify partition specs were rebuilt (they should still exist)
+ ASSERT_EQ(metadata->partition_specs.size(), 2);
+
+ // Verify sort orders were rebuilt (they should still exist)
+ ASSERT_EQ(metadata->sort_orders.size(), 2);
+}
+
+// Test RemoveSchemas
+TEST(TableMetadataBuilderTest, RemoveSchemasBasic) {
+ auto base = CreateBaseMetadata();
+ auto builder = TableMetadataBuilder::BuildFrom(base.get());
+
+ // Add multiple schemas
+ auto field1 = SchemaField::MakeRequired(4, "field1", int64());
+ auto schema1 = std::make_shared<Schema>(std::vector<SchemaField>{field1}, 1);
+ auto field2 = SchemaField::MakeRequired(5, "field2", float64());
+ auto schema2 = std::make_shared<Schema>(std::vector<SchemaField>{field2}, 2);
+ auto field3 = SchemaField::MakeRequired(6, "field3", string());
+ auto schema3 = std::make_shared<Schema>(std::vector<SchemaField>{field3}, 3);
+
+ builder->AddSchema(schema1);
+ builder->AddSchema(schema2);
+ builder->AddSchema(schema3);
+
+ ICEBERG_UNWRAP_OR_FAIL(auto metadata, builder->Build());
+ ASSERT_EQ(metadata->schemas.size(), 4); // Original + 3 new
+
+ // Remove one schema
+ builder = TableMetadataBuilder::BuildFrom(metadata.get());
+ builder->RemoveSchemas({1});
+ ICEBERG_UNWRAP_OR_FAIL(metadata, builder->Build());
+ ASSERT_EQ(metadata->schemas.size(), 3);
+ EXPECT_EQ(metadata->schemas[0]->schema_id().value(), 0);
+ EXPECT_EQ(metadata->schemas[1]->schema_id().value(), 2);
+ EXPECT_EQ(metadata->schemas[2]->schema_id().value(), 3);
+
+ // Remove multiple schemas
+ builder = TableMetadataBuilder::BuildFrom(metadata.get());
+ builder->RemoveSchemas({2, 3});
+ ICEBERG_UNWRAP_OR_FAIL(metadata, builder->Build());
+ ASSERT_EQ(metadata->schemas.size(), 1);
+ EXPECT_EQ(metadata->schemas[0]->schema_id().value(),
Schema::kInitialSchemaId);
+}
+
+TEST(TableMetadataBuilderTest, RemoveSchemasCannotRemoveCurrent) {
+ auto base = CreateBaseMetadata();
+ auto builder = TableMetadataBuilder::BuildFrom(base.get());
+
+ // Add a new schema
+ auto field1 = SchemaField::MakeRequired(4, "field1", int64());
+ auto schema1 = std::make_shared<Schema>(std::vector<SchemaField>{field1}, 1);
+ builder->AddSchema(schema1);
+
+ ICEBERG_UNWRAP_OR_FAIL(auto metadata, builder->Build());
+ ASSERT_EQ(metadata->schemas.size(), 2);
+ EXPECT_EQ(metadata->current_schema_id.value(), 0);
+
+ // Try to remove current schema (ID 0)
+ builder = TableMetadataBuilder::BuildFrom(metadata.get());
+ builder->RemoveSchemas({0});
+ ASSERT_THAT(builder->Build(), IsError(ErrorKind::kValidationFailed));
+ ASSERT_THAT(builder->Build(), HasErrorMessage("Cannot remove current schema:
0"));
+
+ // Try to remove current schema along with others
+ builder = TableMetadataBuilder::BuildFrom(metadata.get());
+ builder->RemoveSchemas({0, 1});
+ ASSERT_THAT(builder->Build(), IsError(ErrorKind::kValidationFailed));
+ ASSERT_THAT(builder->Build(), HasErrorMessage("Cannot remove current schema:
0"));
+}
+
+TEST(TableMetadataBuilderTest, RemoveSchemasNonExistent) {
+ auto base = CreateBaseMetadata();
+ auto builder = TableMetadataBuilder::BuildFrom(base.get());
+
+ // Add one schema
+ auto field1 = SchemaField::MakeRequired(4, "field1", int64());
+ auto schema1 = std::make_shared<Schema>(std::vector<SchemaField>{field1}, 1);
+ builder->AddSchema(schema1);
+
+ ICEBERG_UNWRAP_OR_FAIL(auto metadata, builder->Build());
+ ASSERT_EQ(metadata->schemas.size(), 2);
+
+ // Try to remove non-existent schema - should be no-op
+ builder = TableMetadataBuilder::BuildFrom(metadata.get());
+ builder->RemoveSchemas({999});
+ ICEBERG_UNWRAP_OR_FAIL(metadata, builder->Build());
+ ASSERT_EQ(metadata->schemas.size(), 2);
+
+ // Remove mix of existent and non-existent
+ builder = TableMetadataBuilder::BuildFrom(metadata.get());
+ builder->RemoveSchemas({1, 999, 888});
+ ICEBERG_UNWRAP_OR_FAIL(metadata, builder->Build());
+ ASSERT_EQ(metadata->schemas.size(), 1);
+ EXPECT_EQ(metadata->schemas[0]->schema_id().value(),
Schema::kInitialSchemaId);
+}
+
+TEST(TableMetadataBuilderTest, RemoveSchemasEmptySet) {
+ auto base = CreateBaseMetadata();
+ auto builder = TableMetadataBuilder::BuildFrom(base.get());
+
+ // Add a schema
+ auto field1 = SchemaField::MakeRequired(4, "field1", int64());
+ auto schema1 = std::make_shared<Schema>(std::vector<SchemaField>{field1}, 1);
+ builder->AddSchema(schema1);
+
+ ICEBERG_UNWRAP_OR_FAIL(auto metadata, builder->Build());
+ ASSERT_EQ(metadata->schemas.size(), 2);
+
+ // Remove empty set - should be no-op
+ builder = TableMetadataBuilder::BuildFrom(metadata.get());
+ builder->RemoveSchemas({});
+ ICEBERG_UNWRAP_OR_FAIL(metadata, builder->Build());
+ ASSERT_EQ(metadata->schemas.size(), 2);
+}
+
+TEST(TableMetadataBuilderTest, RemoveSchemasAfterSchemaChange) {
+ auto base = CreateBaseMetadata();
+ auto builder = TableMetadataBuilder::BuildFrom(base.get());
+
+ // Add multiple schemas
+ auto field1 = SchemaField::MakeRequired(4, "field1", int64());
+ auto schema1 = std::make_shared<Schema>(std::vector<SchemaField>{field1}, 1);
+ auto field2 = SchemaField::MakeRequired(5, "field2", float64());
+ auto schema2 = std::make_shared<Schema>(std::vector<SchemaField>{field2}, 2);
+
+ builder->AddSchema(schema1);
+ builder->AddSchema(schema2);
+ builder->SetCurrentSchema(1); // Set schema1 as current
+
+ ICEBERG_UNWRAP_OR_FAIL(auto metadata, builder->Build());
+ ASSERT_EQ(metadata->schemas.size(), 3);
+ EXPECT_EQ(metadata->current_schema_id.value(), 1);
+
+ // Now remove the old current schema (ID 0)
+ builder = TableMetadataBuilder::BuildFrom(metadata.get());
+ builder->RemoveSchemas({0});
+ ICEBERG_UNWRAP_OR_FAIL(metadata, builder->Build());
+ ASSERT_EQ(metadata->schemas.size(), 2);
+ EXPECT_EQ(metadata->current_schema_id.value(), 1);
+ EXPECT_EQ(metadata->schemas[0]->schema_id().value(), 1);
+ EXPECT_EQ(metadata->schemas[1]->schema_id().value(), 2);
+
+ // Cannot remove the new current schema
+ builder = TableMetadataBuilder::BuildFrom(metadata.get());
+ builder->RemoveSchemas({1});
+ ASSERT_THAT(builder->Build(), IsError(ErrorKind::kValidationFailed));
+ ASSERT_THAT(builder->Build(), HasErrorMessage("Cannot remove current schema:
1"));
+}
+
} // namespace iceberg
diff --git a/src/iceberg/test/table_requirements_test.cc
b/src/iceberg/test/table_requirements_test.cc
index 3278eb88..bbbcc681 100644
--- a/src/iceberg/test/table_requirements_test.cc
+++ b/src/iceberg/test/table_requirements_test.cc
@@ -623,7 +623,8 @@ TEST(TableRequirementsTest, RemoveSchemas) {
metadata->current_schema_id = 3;
std::vector<std::unique_ptr<TableUpdate>> updates;
-
updates.push_back(std::make_unique<table::RemoveSchemas>(std::vector<int32_t>{1,
2}));
+ updates.push_back(
+ std::make_unique<table::RemoveSchemas>(std::unordered_set<int32_t>{1,
2}));
auto result = TableRequirements::ForUpdateTable(*metadata, updates);
ASSERT_THAT(result, IsOk());
@@ -652,7 +653,8 @@ TEST(TableRequirementsTest, RemoveSchemasWithBranch) {
AddBranch(*metadata, "branch", 42);
std::vector<std::unique_ptr<TableUpdate>> updates;
-
updates.push_back(std::make_unique<table::RemoveSchemas>(std::vector<int32_t>{1,
2}));
+ updates.push_back(
+ std::make_unique<table::RemoveSchemas>(std::unordered_set<int32_t>{1,
2}));
auto result = TableRequirements::ForUpdateTable(*metadata, updates);
ASSERT_THAT(result, IsOk());
@@ -675,7 +677,8 @@ TEST(TableRequirementsTest,
RemoveSchemasWithSchemaChangedFailure) {
metadata->current_schema_id = 3;
std::vector<std::unique_ptr<TableUpdate>> updates;
-
updates.push_back(std::make_unique<table::RemoveSchemas>(std::vector<int32_t>{1,
2}));
+ updates.push_back(
+ std::make_unique<table::RemoveSchemas>(std::unordered_set<int32_t>{1,
2}));
auto result = TableRequirements::ForUpdateTable(*metadata, updates);
ASSERT_THAT(result, IsOk());
@@ -703,7 +706,8 @@ TEST(TableRequirementsTest,
RemoveSchemasWithBranchChangedFailure) {
AddBranch(*metadata, "test", 42);
std::vector<std::unique_ptr<TableUpdate>> updates;
-
updates.push_back(std::make_unique<table::RemoveSchemas>(std::vector<int32_t>{1,
2}));
+ updates.push_back(
+ std::make_unique<table::RemoveSchemas>(std::unordered_set<int32_t>{1,
2}));
auto result = TableRequirements::ForUpdateTable(*metadata, updates);
ASSERT_THAT(result, IsOk());
@@ -1074,7 +1078,8 @@ TEST(TableRequirementsTest,
ReplaceTableDoesNotAddBranchRequirements) {
AddBranch(*metadata, "branch", 42);
std::vector<std::unique_ptr<TableUpdate>> updates;
-
updates.push_back(std::make_unique<table::RemoveSchemas>(std::vector<int32_t>{1,
2}));
+ updates.push_back(
+ std::make_unique<table::RemoveSchemas>(std::unordered_set<int32_t>{1,
2}));
auto result = TableRequirements::ForReplaceTable(*metadata, updates);
ASSERT_THAT(result, IsOk());
diff --git a/src/iceberg/test/table_update_test.cc
b/src/iceberg/test/table_update_test.cc
index 041cfcd2..44a37f98 100644
--- a/src/iceberg/test/table_update_test.cc
+++ b/src/iceberg/test/table_update_test.cc
@@ -71,11 +71,12 @@ std::unique_ptr<TableMetadata> CreateBaseMetadata() {
metadata->last_column_id = 3;
metadata->current_schema_id = 0;
metadata->schemas.push_back(CreateTestSchema());
+ metadata->partition_specs.push_back(PartitionSpec::Unpartitioned());
metadata->default_spec_id = PartitionSpec::kInitialSpecId;
metadata->last_partition_id = 0;
metadata->current_snapshot_id = Snapshot::kInvalidSnapshotId;
- metadata->default_sort_order_id = SortOrder::kInitialSortOrderId;
metadata->sort_orders.push_back(SortOrder::Unsorted());
+ metadata->default_sort_order_id = SortOrder::kUnsortedOrderId;
metadata->next_row_id = TableMetadata::kInitialRowId;
return metadata;
}
@@ -286,7 +287,8 @@ INSTANTIATE_TEST_SUITE_P(
.test_name = "RemoveSchemas",
.update_factory =
[] {
- return
std::make_unique<table::RemoveSchemas>(std::vector<int>{1});
+ return std::make_unique<table::RemoveSchemas>(
+ std::unordered_set<int>{1});
},
.expected_existing_table_count = 1,
.validator =
diff --git a/src/iceberg/test/update_partition_spec_test.cc
b/src/iceberg/test/update_partition_spec_test.cc
index 85d9cc52..31dbbae6 100644
--- a/src/iceberg/test/update_partition_spec_test.cc
+++ b/src/iceberg/test/update_partition_spec_test.cc
@@ -131,7 +131,7 @@ class UpdatePartitionSpecTest : public
::testing::TestWithParam<int8_t> {
metadata->default_spec_id = spec->spec_id();
metadata->last_partition_id = spec->last_assigned_field_id();
metadata->current_snapshot_id = Snapshot::kInvalidSnapshotId;
- metadata->default_sort_order_id = SortOrder::kInitialSortOrderId;
+ metadata->default_sort_order_id = SortOrder::kUnsortedOrderId;
metadata->sort_orders.push_back(SortOrder::Unsorted());
metadata->next_row_id = TableMetadata::kInitialRowId;
metadata->properties = TableProperties::default_properties();