This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 4557a6bb feat: add support to update partition spec (#401)
4557a6bb is described below

commit 4557a6bbe39d3e5234d33b33b94f29d82ffa0732
Author: Junwang Zhao <[email protected]>
AuthorDate: Thu Dec 25 17:02:06 2025 +0800

    feat: add support to update partition spec (#401)
---
 src/iceberg/CMakeLists.txt                     |   3 +-
 src/iceberg/meson.build                        |   1 +
 src/iceberg/partition_spec.cc                  |  31 +
 src/iceberg/partition_spec.h                   |   7 +
 src/iceberg/table.cc                           |  10 +
 src/iceberg/table.h                            |   4 +
 src/iceberg/table_metadata.cc                  | 104 +++-
 src/iceberg/table_update.cc                    |   4 +-
 src/iceberg/test/CMakeLists.txt                |   1 +
 src/iceberg/test/update_partition_spec_test.cc | 822 +++++++++++++++++++++++++
 src/iceberg/transaction.cc                     |  17 +
 src/iceberg/transaction.h                      |   4 +
 src/iceberg/transform.cc                       |  21 +
 src/iceberg/transform.h                        |   5 +
 src/iceberg/transform_function.h               |   3 +
 src/iceberg/type_fwd.h                         |   4 +
 src/iceberg/update/meson.build                 |   7 +-
 src/iceberg/update/pending_update.h            |   1 +
 src/iceberg/update/update_partition_spec.cc    | 455 ++++++++++++++
 src/iceberg/update/update_partition_spec.h     | 203 ++++++
 src/iceberg/util/macros.h                      |  10 +-
 src/iceberg/util/string_util.h                 |  10 +
 22 files changed, 1711 insertions(+), 16 deletions(-)

diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt
index e48afbfd..519757d2 100644
--- a/src/iceberg/CMakeLists.txt
+++ b/src/iceberg/CMakeLists.txt
@@ -76,8 +76,9 @@ set(ICEBERG_SOURCES
     transform_function.cc
     type.cc
     update/pending_update.cc
-    update/update_sort_order.cc
+    update/update_partition_spec.cc
     update/update_properties.cc
+    update/update_sort_order.cc
     util/bucket_util.cc
     util/conversions.cc
     util/decimal.cc
diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build
index 7c1011fc..8327ca2e 100644
--- a/src/iceberg/meson.build
+++ b/src/iceberg/meson.build
@@ -98,6 +98,7 @@ iceberg_sources = files(
     'transform_function.cc',
     'type.cc',
     'update/pending_update.cc',
+    'update/update_partition_spec.cc',
     'update/update_properties.cc',
     'update/update_sort_order.cc',
     'util/bucket_util.cc',
diff --git a/src/iceberg/partition_spec.cc b/src/iceberg/partition_spec.cc
index b0f1144c..7d0dab40 100644
--- a/src/iceberg/partition_spec.cc
+++ b/src/iceberg/partition_spec.cc
@@ -20,6 +20,7 @@
 #include "iceberg/partition_spec.h"
 
 #include <algorithm>
+#include <cstddef>
 #include <cstdint>
 #include <format>
 #include <memory>
@@ -95,6 +96,27 @@ Result<std::unique_ptr<StructType>> 
PartitionSpec::PartitionType(
   return std::make_unique<StructType>(std::move(partition_fields));
 }
 
+bool PartitionSpec::CompatibleWith(const PartitionSpec& other) const {
+  if (Equals(other)) {
+    return true;
+  }
+
+  if (fields_.size() != other.fields_.size()) {
+    return false;
+  }
+
+  for (const auto& [lhs, rhs] :
+       std::ranges::zip_view<std::span<const PartitionField>,
+                             std::span<const PartitionField>>{fields_, 
other.fields_}) {
+    if (lhs.source_id() != rhs.source_id() || *lhs.transform() != 
*rhs.transform() ||
+        lhs.name() != rhs.name()) {
+      return false;
+    }
+  }
+
+  return true;
+}
+
 std::string PartitionSpec::ToString() const {
   std::string repr = std::format("partition_spec[spec_id<{}>,\n", spec_id_);
   for (const auto& field : fields_) {
@@ -191,4 +213,13 @@ Result<std::unique_ptr<PartitionSpec>> PartitionSpec::Make(
       new PartitionSpec(spec_id, std::move(fields), last_assigned_field_id));
 }
 
+bool PartitionSpec::HasSequentialFieldIds(const PartitionSpec& spec) {
+  for (size_t i = 0; i < spec.fields().size(); i += 1) {
+    if (spec.fields()[i].field_id() != 
PartitionSpec::kLegacyPartitionDataIdStart + i) {
+      return false;
+    }
+  }
+  return true;
+}
+
 }  // namespace iceberg
diff --git a/src/iceberg/partition_spec.h b/src/iceberg/partition_spec.h
index 0d1a78f1..5fab5952 100644
--- a/src/iceberg/partition_spec.h
+++ b/src/iceberg/partition_spec.h
@@ -64,6 +64,11 @@ class ICEBERG_EXPORT PartitionSpec : public 
util::Formattable {
   /// \brief Get the partition type binding to the input schema.
   Result<std::unique_ptr<StructType>> PartitionType(const Schema& schema) 
const;
 
+  /// \brief Returns true if this spec is equivalent to the other, with 
partition field
+  /// ids ignored. That is, if both specs have the same number of fields, 
field order,
+  /// field name, source columns, and transforms.
+  bool CompatibleWith(const PartitionSpec& other) const;
+
   std::string ToString() const override;
 
   int32_t last_assigned_field_id() const { return last_assigned_field_id_; }
@@ -111,6 +116,8 @@ class ICEBERG_EXPORT PartitionSpec : public 
util::Formattable {
       int32_t spec_id, std::vector<PartitionField> fields,
       std::optional<int32_t> last_assigned_field_id = std::nullopt);
 
+  static bool HasSequentialFieldIds(const PartitionSpec& spec);
+
  private:
   /// \brief Create a new partition spec.
   ///
diff --git a/src/iceberg/table.cc b/src/iceberg/table.cc
index 38afdedd..6b4d317b 100644
--- a/src/iceberg/table.cc
+++ b/src/iceberg/table.cc
@@ -19,6 +19,8 @@
 
 #include "iceberg/table.h"
 
+#include <memory>
+
 #include "iceberg/catalog.h"
 #include "iceberg/partition_spec.h"
 #include "iceberg/result.h"
@@ -28,6 +30,7 @@
 #include "iceberg/table_properties.h"
 #include "iceberg/table_scan.h"
 #include "iceberg/transaction.h"
+#include "iceberg/update/update_partition_spec.h"
 #include "iceberg/update/update_properties.h"
 #include "iceberg/util/macros.h"
 
@@ -147,6 +150,13 @@ Result<std::shared_ptr<Transaction>> 
Table::NewTransaction() {
                            /*auto_commit=*/false);
 }
 
+Result<std::shared_ptr<UpdatePartitionSpec>> Table::NewUpdatePartitionSpec() {
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto transaction, Transaction::Make(shared_from_this(), 
Transaction::Kind::kUpdate,
+                                          /*auto_commit=*/true));
+  return transaction->NewUpdatePartitionSpec();
+}
+
 Result<std::shared_ptr<UpdateProperties>> Table::NewUpdateProperties() {
   ICEBERG_ASSIGN_OR_RAISE(
       auto transaction, Transaction::Make(shared_from_this(), 
Transaction::Kind::kUpdate,
diff --git a/src/iceberg/table.h b/src/iceberg/table.h
index 317aea01..30ad14c1 100644
--- a/src/iceberg/table.h
+++ b/src/iceberg/table.h
@@ -128,6 +128,10 @@ class ICEBERG_EXPORT Table : public 
std::enable_shared_from_this<Table> {
   /// \brief Create a new Transaction to commit multiple table operations at 
once.
   virtual Result<std::shared_ptr<Transaction>> NewTransaction();
 
+  /// \brief Create a new UpdatePartitionSpec to update the partition spec of 
this table
+  /// and commit the changes.
+  virtual Result<std::shared_ptr<UpdatePartitionSpec>> 
NewUpdatePartitionSpec();
+
   /// \brief Create a new UpdateProperties to update table properties and 
commit the
   /// changes.
   virtual Result<std::shared_ptr<UpdateProperties>> NewUpdateProperties();
diff --git a/src/iceberg/table_metadata.cc b/src/iceberg/table_metadata.cc
index 2d77aef4..ba5b8f32 100644
--- a/src/iceberg/table_metadata.cc
+++ b/src/iceberg/table_metadata.cc
@@ -37,6 +37,7 @@
 #include "iceberg/exception.h"
 #include "iceberg/file_io.h"
 #include "iceberg/json_internal.h"
+#include "iceberg/partition_field.h"
 #include "iceberg/partition_spec.h"
 #include "iceberg/result.h"
 #include "iceberg/schema.h"
@@ -44,6 +45,7 @@
 #include "iceberg/sort_order.h"
 #include "iceberg/table_properties.h"
 #include "iceberg/table_update.h"
+#include "iceberg/util/checked_cast.h"
 #include "iceberg/util/error_collector.h"
 #include "iceberg/util/gzip_internal.h"
 #include "iceberg/util/location_util.h"
@@ -428,7 +430,8 @@ class TableMetadataBuilder::Impl {
   Result<int32_t> AddSortOrder(const SortOrder& order);
   Status SetProperties(const std::unordered_map<std::string, std::string>& 
updated);
   Status RemoveProperties(const std::unordered_set<std::string>& removed);
-
+  Status SetDefaultPartitionSpec(int32_t spec_id);
+  Result<int32_t> AddPartitionSpec(const PartitionSpec& spec);
   std::unique_ptr<TableMetadata> Build();
 
  private:
@@ -438,6 +441,12 @@ class TableMetadataBuilder::Impl {
   /// \return The ID to use for this sort order (reused if exists, new 
otherwise)
   int32_t ReuseOrCreateNewSortOrderId(const SortOrder& new_order);
 
+  /// \brief Internal method to check for existing partition spec and reuse 
its ID or
+  /// create a new one
+  /// \param new_spec The partition spec to check
+  /// \return The ID to use for this partition spec (reused if exists, new 
otherwise)
+  int32_t ReuseOrCreateNewPartitionSpecId(const PartitionSpec& new_spec);
+
  private:
   // Base metadata (nullptr for new tables)
   const TableMetadata* base_;
@@ -540,9 +549,10 @@ Result<int32_t> 
TableMetadataBuilder::Impl::AddSortOrder(const SortOrder& order)
     bool is_new_order =
         last_added_order_id_.has_value() &&
         std::ranges::find_if(changes_, [new_order_id](const auto& change) {
-          auto* add_sort_order = 
dynamic_cast<table::AddSortOrder*>(change.get());
-          return add_sort_order &&
-                 add_sort_order->sort_order()->order_id() == new_order_id;
+          return change->kind() == TableUpdate::Kind::kAddSortOrder &&
+                 internal::checked_cast<const table::AddSortOrder&>(*change)
+                         .sort_order()
+                         ->order_id() == new_order_id;
         }) != changes_.cend();
     last_added_order_id_ = is_new_order ? std::make_optional(new_order_id) : 
std::nullopt;
     return new_order_id;
@@ -572,6 +582,69 @@ Result<int32_t> 
TableMetadataBuilder::Impl::AddSortOrder(const SortOrder& order)
   return new_order_id;
 }
 
+Status TableMetadataBuilder::Impl::SetDefaultPartitionSpec(int32_t spec_id) {
+  if (spec_id == -1) {
+    if (!last_added_spec_id_.has_value()) {
+      return ValidationFailed(
+          "Cannot set last added partition spec: no partition spec has been 
added");
+    }
+    return SetDefaultPartitionSpec(last_added_spec_id_.value());
+  }
+
+  if (spec_id == metadata_.default_spec_id) {
+    // the new spec is already current and no change is needed
+    return {};
+  }
+
+  metadata_.default_spec_id = spec_id;
+  if (last_added_spec_id_ == std::make_optional(spec_id)) {
+    
changes_.push_back(std::make_unique<table::SetDefaultPartitionSpec>(kLastAdded));
+  } else {
+    
changes_.push_back(std::make_unique<table::SetDefaultPartitionSpec>(spec_id));
+  }
+  return {};
+}
+
+Result<int32_t> TableMetadataBuilder::Impl::AddPartitionSpec(const 
PartitionSpec& spec) {
+  int32_t new_spec_id = ReuseOrCreateNewPartitionSpecId(spec);
+
+  if (specs_by_id_.contains(new_spec_id)) {
+    // update last_added_spec_id if the spec was added in this set of changes 
(since it
+    // is now the last)
+    bool is_new_spec =
+        last_added_spec_id_.has_value() &&
+        std::ranges::find_if(changes_, [new_spec_id](const auto& change) {
+          return change->kind() == TableUpdate::Kind::kAddPartitionSpec &&
+                 internal::checked_cast<const 
table::AddPartitionSpec&>(*change)
+                         .spec()
+                         ->spec_id() == new_spec_id;
+        }) != changes_.cend();
+    last_added_spec_id_ = is_new_spec ? std::make_optional(new_spec_id) : 
std::nullopt;
+    return new_spec_id;
+  }
+
+  // Get current schema and validate the partition spec against it
+  ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata_.Schema());
+  ICEBERG_RETURN_UNEXPECTED(spec.Validate(*schema, 
/*allow_missing_fields=*/false));
+  ICEBERG_CHECK(
+      metadata_.format_version > 1 || 
PartitionSpec::HasSequentialFieldIds(spec),
+      "Spec does not use sequential IDs that are required in v1: {}", 
spec.ToString());
+
+  ICEBERG_ASSIGN_OR_RAISE(
+      std::shared_ptr<PartitionSpec> new_spec,
+      PartitionSpec::Make(new_spec_id, 
std::vector<PartitionField>(spec.fields().begin(),
+                                                                   
spec.fields().end())));
+  metadata_.last_partition_id =
+      std::max(metadata_.last_partition_id, 
new_spec->last_assigned_field_id());
+  metadata_.partition_specs.push_back(new_spec);
+  specs_by_id_.emplace(new_spec_id, new_spec);
+
+  changes_.push_back(std::make_unique<table::AddPartitionSpec>(new_spec));
+  last_added_spec_id_ = new_spec_id;
+
+  return new_spec_id;
+}
+
 Status TableMetadataBuilder::Impl::SetProperties(
     const std::unordered_map<std::string, std::string>& updated) {
   // If updated is empty, return early (no-op)
@@ -653,6 +726,20 @@ int32_t 
TableMetadataBuilder::Impl::ReuseOrCreateNewSortOrderId(
   return new_order_id;
 }
 
+int32_t TableMetadataBuilder::Impl::ReuseOrCreateNewPartitionSpecId(
+    const PartitionSpec& new_spec) {
+  // if the spec already exists, use the same ID. otherwise, use the highest 
ID + 1.
+  int32_t new_spec_id = PartitionSpec::kInitialSpecId;
+  for (const auto& spec : metadata_.partition_specs) {
+    if (new_spec.CompatibleWith(*spec)) {
+      return spec->spec_id();
+    } else if (new_spec_id <= spec->spec_id()) {
+      new_spec_id = spec->spec_id() + 1;
+    }
+  }
+  return new_spec_id;
+}
+
 TableMetadataBuilder::TableMetadataBuilder(int8_t format_version)
     : impl_(std::make_unique<Impl>(format_version)) {}
 
@@ -723,16 +810,19 @@ TableMetadataBuilder& 
TableMetadataBuilder::AddSchema(std::shared_ptr<Schema> sc
 
 TableMetadataBuilder& TableMetadataBuilder::SetDefaultPartitionSpec(
     std::shared_ptr<PartitionSpec> spec) {
-  throw IcebergError(std::format("{} not implemented", __FUNCTION__));
+  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto spec_id, 
impl_->AddPartitionSpec(*spec));
+  return SetDefaultPartitionSpec(spec_id);
 }
 
 TableMetadataBuilder& TableMetadataBuilder::SetDefaultPartitionSpec(int32_t 
spec_id) {
-  throw IcebergError(std::format("{} not implemented", __FUNCTION__));
+  ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->SetDefaultPartitionSpec(spec_id));
+  return *this;
 }
 
 TableMetadataBuilder& TableMetadataBuilder::AddPartitionSpec(
     std::shared_ptr<PartitionSpec> spec) {
-  throw IcebergError(std::format("{} not implemented", __FUNCTION__));
+  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto spec_id, 
impl_->AddPartitionSpec(*spec));
+  return *this;
 }
 
 TableMetadataBuilder& TableMetadataBuilder::RemovePartitionSpecs(
diff --git a/src/iceberg/table_update.cc b/src/iceberg/table_update.cc
index 87fcd134..0fc27532 100644
--- a/src/iceberg/table_update.cc
+++ b/src/iceberg/table_update.cc
@@ -72,7 +72,7 @@ void 
SetCurrentSchema::GenerateRequirements(TableUpdateContext& context) const {
 // AddPartitionSpec
 
 void AddPartitionSpec::ApplyTo(TableMetadataBuilder& builder) const {
-  throw IcebergError(std::format("{} not implemented", __FUNCTION__));
+  builder.AddPartitionSpec(spec_);
 }
 
 void AddPartitionSpec::GenerateRequirements(TableUpdateContext& context) const 
{
@@ -82,7 +82,7 @@ void 
AddPartitionSpec::GenerateRequirements(TableUpdateContext& context) const {
 // SetDefaultPartitionSpec
 
 void SetDefaultPartitionSpec::ApplyTo(TableMetadataBuilder& builder) const {
-  throw IcebergError(std::format("{} not implemented", __FUNCTION__));
+  builder.SetDefaultPartitionSpec(spec_id_);
 }
 
 void SetDefaultPartitionSpec::GenerateRequirements(TableUpdateContext& 
context) const {
diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt
index 7e71310d..4b2c0f47 100644
--- a/src/iceberg/test/CMakeLists.txt
+++ b/src/iceberg/test/CMakeLists.txt
@@ -156,6 +156,7 @@ if(ICEBERG_BUILD_BUNDLE)
                    USE_BUNDLE
                    SOURCES
                    transaction_test.cc
+                   update_partition_spec_test.cc
                    update_properties_test.cc
                    update_sort_order_test.cc)
 
diff --git a/src/iceberg/test/update_partition_spec_test.cc 
b/src/iceberg/test/update_partition_spec_test.cc
new file mode 100644
index 00000000..85d9cc52
--- /dev/null
+++ b/src/iceberg/test/update_partition_spec_test.cc
@@ -0,0 +1,822 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/update/update_partition_spec.h"
+
+#include <format>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include <arrow/filesystem/mockfs.h>
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include "iceberg/arrow/arrow_fs_file_io_internal.h"
+#include "iceberg/catalog/memory/in_memory_catalog.h"
+#include "iceberg/expression/expressions.h"
+#include "iceberg/partition_spec.h"
+#include "iceberg/result.h"
+#include "iceberg/schema.h"
+#include "iceberg/snapshot.h"
+#include "iceberg/sort_order.h"
+#include "iceberg/table.h"
+#include "iceberg/table_identifier.h"
+#include "iceberg/table_metadata.h"
+#include "iceberg/test/matchers.h"
+#include "iceberg/transaction.h"
+#include "iceberg/transform.h"
+#include "iceberg/type.h"
+#include "iceberg/util/uuid.h"
+
+namespace iceberg {
+
+class UpdatePartitionSpecTest : public ::testing::TestWithParam<int8_t> {
+ protected:
+  void SetUp() override {
+    file_io_ = arrow::ArrowFileSystemFileIO::MakeMockFileIO();
+    catalog_ = InMemoryCatalog::Make("test_catalog", file_io_, "/warehouse/", 
{});
+    format_version_ = GetParam();
+    test_schema_ = std::make_shared<Schema>(
+        std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int64()),
+                                 SchemaField::MakeRequired(2, "ts", 
timestamp_tz()),
+                                 SchemaField::MakeRequired(3, "category", 
string()),
+                                 SchemaField::MakeOptional(4, "data", 
string())},
+        0);
+
+    // Create unpartitioned and partitioned specs matching Java test
+    ICEBERG_UNWRAP_OR_FAIL(
+        auto unpartitioned_spec,
+        PartitionSpec::Make(PartitionSpec::kInitialSpecId, 
std::vector<PartitionField>{},
+                            PartitionSpec::kLegacyPartitionDataIdStart - 1));
+    ICEBERG_UNWRAP_OR_FAIL(
+        partitioned_spec_,
+        PartitionSpec::Make(
+            PartitionSpec::kInitialSpecId,
+            std::vector<PartitionField>{
+                PartitionField(3, 1000, "category", Transform::Identity()),
+                PartitionField(2, 1001, "ts_day", Transform::Day()),
+                PartitionField(1, 1002, "shard", Transform::Bucket(16))},
+            1002));
+
+    auto partitioned_metadata =
+        CreateBaseMetadata(format_version_, test_schema_, partitioned_spec_);
+    auto unpartitioned_metadata =
+        CreateBaseMetadata(format_version_, test_schema_, 
std::move(unpartitioned_spec));
+
+    // Write metadata files
+    partitioned_metadata->location = partitioned_table_location_;
+    unpartitioned_metadata->location = unpartitioned_table_location_;
+
+    // Arrow MockFS cannot automatically create directories.
+    auto arrow_fs = 
std::dynamic_pointer_cast<::arrow::fs::internal::MockFileSystem>(
+        static_cast<arrow::ArrowFileSystemFileIO&>(*file_io_).fs());
+    ASSERT_TRUE(arrow_fs != nullptr);
+    ASSERT_TRUE(arrow_fs->CreateDir(partitioned_table_location_ + 
"/metadata").ok());
+    ASSERT_TRUE(arrow_fs->CreateDir(unpartitioned_table_location_ + 
"/metadata").ok());
+
+    // Write table metadata to the table location.
+    std::string partitioned_metadata_location =
+        std::format("{}/metadata/00001-{}.metadata.json", 
partitioned_table_location_,
+                    Uuid::GenerateV7().ToString());
+    std::string unpartitioned_metadata_location =
+        std::format("{}/metadata/00001-{}.metadata.json", 
unpartitioned_table_location_,
+                    Uuid::GenerateV7().ToString());
+
+    ASSERT_THAT(TableMetadataUtil::Write(*file_io_, 
partitioned_metadata_location,
+                                         *partitioned_metadata),
+                IsOk());
+    ASSERT_THAT(TableMetadataUtil::Write(*file_io_, 
unpartitioned_metadata_location,
+                                         *unpartitioned_metadata),
+                IsOk());
+
+    // Register the tables in the catalog.
+    ICEBERG_UNWRAP_OR_FAIL(
+        partitioned_table_,
+        catalog_->RegisterTable(partitioned_table_ident_, 
partitioned_metadata_location));
+    ICEBERG_UNWRAP_OR_FAIL(unpartitioned_table_,
+                           catalog_->RegisterTable(unpartitioned_table_ident_,
+                                                   
unpartitioned_metadata_location));
+  }
+
+  // Helper to create base metadata with a specific partition spec
+  std::unique_ptr<TableMetadata> CreateBaseMetadata(int8_t format_version,
+                                                    std::shared_ptr<Schema> 
schema,
+                                                    
std::shared_ptr<PartitionSpec> spec) {
+    auto metadata = std::make_unique<TableMetadata>();
+    metadata->format_version = format_version;
+    metadata->table_uuid = "test-uuid-1234";
+    metadata->location = "/warehouse/test_table";
+    metadata->last_sequence_number = 0;
+    metadata->last_updated_ms = TimePointMs{std::chrono::milliseconds(1000)};
+    metadata->last_column_id = 4;
+    metadata->current_schema_id = 0;
+    metadata->schemas.push_back(std::move(schema));
+    metadata->default_spec_id = spec->spec_id();
+    metadata->last_partition_id = spec->last_assigned_field_id();
+    metadata->current_snapshot_id = Snapshot::kInvalidSnapshotId;
+    metadata->default_sort_order_id = SortOrder::kInitialSortOrderId;
+    metadata->sort_orders.push_back(SortOrder::Unsorted());
+    metadata->next_row_id = TableMetadata::kInitialRowId;
+    metadata->properties = TableProperties::default_properties();
+    metadata->partition_specs.push_back(std::move(spec));
+    return metadata;
+  }
+
+  // Helper to create UpdatePartitionSpec with a specific partition spec
+  std::shared_ptr<UpdatePartitionSpec> CreateUpdatePartitionSpec(bool 
partitioned) {
+    if (partitioned) {
+      auto update_result = partitioned_table_->NewUpdatePartitionSpec();
+      if (!update_result.has_value()) {
+        ADD_FAILURE() << "Failed to create update: " << 
update_result.error().message;
+        return nullptr;
+      }
+      return update_result.value();
+    } else {
+      auto update_result = unpartitioned_table_->NewUpdatePartitionSpec();
+      if (!update_result.has_value()) {
+        ADD_FAILURE() << "Failed to create update: " << 
update_result.error().message;
+        return nullptr;
+      }
+      return update_result.value();
+    }
+  }
+
+  // Helper to create an expected partition spec
+  std::shared_ptr<PartitionSpec> MakeExpectedSpec(
+      const std::vector<PartitionField>& fields, int32_t 
last_assigned_field_id) {
+    auto spec_result = PartitionSpec::Make(PartitionSpec::kInitialSpecId, 
fields,
+                                           last_assigned_field_id);
+    if (!spec_result.has_value()) {
+      ADD_FAILURE() << "Failed to create expected spec: " << 
spec_result.error().message;
+      return nullptr;
+    }
+    return std::shared_ptr<PartitionSpec>(spec_result.value().release());
+  }
+
+  // Helper to apply update and get the resulting spec
+  std::shared_ptr<PartitionSpec> ApplyUpdateAndGetSpec(
+      std::shared_ptr<UpdatePartitionSpec> update) {
+    auto result = update->Apply();
+    if (!result.has_value()) {
+      ADD_FAILURE() << "Failed to apply update: " << result.error().message;
+      return nullptr;
+    }
+    return result.value().spec;
+  }
+
+  // Helper to apply update and assert spec equality
+  void ApplyUpdateAndAssertSpec(std::shared_ptr<UpdatePartitionSpec> update,
+                                const std::vector<PartitionField>& 
expected_fields,
+                                int32_t last_assigned_field_id) {
+    auto updated_spec = ApplyUpdateAndGetSpec(update);
+    auto expected_spec = MakeExpectedSpec(expected_fields, 
last_assigned_field_id);
+    AssertPartitionSpecEquals(*expected_spec, *updated_spec);
+  }
+
+  // Helper to assert partition spec equality
+  void AssertPartitionSpecEquals(const PartitionSpec& expected,
+                                 const PartitionSpec& actual) {
+    ASSERT_EQ(expected.fields().size(), actual.fields().size());
+    for (size_t i = 0; i < expected.fields().size(); ++i) {
+      const auto& expected_field = expected.fields()[i];
+      const auto& actual_field = actual.fields()[i];
+      EXPECT_EQ(expected_field.source_id(), actual_field.source_id());
+      EXPECT_EQ(expected_field.field_id(), actual_field.field_id());
+      EXPECT_EQ(expected_field.name(), actual_field.name());
+      EXPECT_EQ(*expected_field.transform(), *actual_field.transform());
+    }
+  }
+
+  // Helper to expect an error with a specific message
+  void ExpectError(std::shared_ptr<UpdatePartitionSpec> update, ErrorKind 
expected_kind,
+                   const std::string& expected_message) {
+    auto result = update->Apply();
+    ASSERT_THAT(result, IsError(expected_kind));
+    ASSERT_THAT(result, HasErrorMessage(expected_message));
+  }
+
+  // Helper to create a table with a custom partition spec
+  std::shared_ptr<Table> CreateTableWithSpec(std::shared_ptr<PartitionSpec> 
spec,
+                                             const std::string& table_name) {
+    auto metadata = CreateBaseMetadata(format_version_, test_schema_, spec);
+    TableIdentifier identifier{.ns = Namespace{.levels = {"test"}}, .name = 
table_name};
+    std::string metadata_location =
+        std::format("/warehouse/{}/metadata/00000-{}.metadata.json", 
table_name,
+                    Uuid::GenerateV7().ToString());
+    auto table_result =
+        Table::Make(identifier, 
std::shared_ptr<TableMetadata>(metadata.release()),
+                    metadata_location, file_io_, catalog_);
+    if (!table_result.has_value()) {
+      ADD_FAILURE() << "Failed to create table: " << 
table_result.error().message;
+      return nullptr;
+    }
+    return table_result.value();
+  }
+
+  // Helper to create UpdatePartitionSpec from a table
+  std::shared_ptr<UpdatePartitionSpec> CreateUpdateFromTable(
+      std::shared_ptr<Table> table) {
+    auto transaction_result =
+        Transaction::Make(table, Transaction::Kind::kUpdate, 
/*auto_commit=*/false);
+    if (!transaction_result.has_value()) {
+      ADD_FAILURE() << "Failed to create transaction: "
+                    << transaction_result.error().message;
+      return nullptr;
+    }
+    auto update_result = UpdatePartitionSpec::Make(transaction_result.value());
+    if (!update_result.has_value()) {
+      ADD_FAILURE() << "Failed to create UpdatePartitionSpec: "
+                    << update_result.error().message;
+      return nullptr;
+    }
+    return update_result.value();
+  }
+
+  const TableIdentifier partitioned_table_ident_{.name = "partitioned_table"};
+  const TableIdentifier unpartitioned_table_ident_{.name = 
"unpartitioned_table"};
+  const std::string 
partitioned_table_location_{"/warehouse/partitioned_table"};
+  const std::string 
unpartitioned_table_location_{"/warehouse/unpartitioned_table"};
+  std::shared_ptr<FileIO> file_io_;
+  std::shared_ptr<InMemoryCatalog> catalog_;
+  std::shared_ptr<Schema> test_schema_;
+  std::shared_ptr<PartitionSpec> partitioned_spec_;
+  std::shared_ptr<Table> partitioned_table_;
+  std::shared_ptr<Table> unpartitioned_table_;
+  int8_t format_version_;
+};
+
+INSTANTIATE_TEST_SUITE_P(FormatVersions, UpdatePartitionSpecTest, 
::testing::Values(1, 2),
+                         [](const ::testing::TestParamInfo<int8_t>& info) {
+                           return std::format("V{}", info.param);
+                         });
+
+TEST_P(UpdatePartitionSpecTest, TestAddIdentityByName) {
+  ICEBERG_UNWRAP_OR_FAIL(auto update, 
unpartitioned_table_->NewUpdatePartitionSpec());
+  update->AddField("category");
+  ApplyUpdateAndAssertSpec(
+      update, {PartitionField(3, 1000, "category", Transform::Identity())}, 
1000);
+}
+
+TEST_P(UpdatePartitionSpecTest, TestAddIdentityByTerm) {
+  ICEBERG_UNWRAP_OR_FAIL(auto update, 
unpartitioned_table_->NewUpdatePartitionSpec());
+  update->AddField(Expressions::Ref("category"));
+  ApplyUpdateAndAssertSpec(
+      update, {PartitionField(3, 1000, "category", Transform::Identity())}, 
1000);
+}
+
+TEST_P(UpdatePartitionSpecTest, TestAddYear) {
+  ICEBERG_UNWRAP_OR_FAIL(auto update, 
unpartitioned_table_->NewUpdatePartitionSpec());
+  update->AddField(Expressions::Year("ts"));
+  ApplyUpdateAndAssertSpec(update,
+                           {PartitionField(2, 1000, "ts_year", 
Transform::Year())}, 1000);
+}
+
+TEST_P(UpdatePartitionSpecTest, TestAddMonth) {
+  ICEBERG_UNWRAP_OR_FAIL(auto update, 
unpartitioned_table_->NewUpdatePartitionSpec());
+  update->AddField(Expressions::Month("ts"));
+  ApplyUpdateAndAssertSpec(
+      update, {PartitionField(2, 1000, "ts_month", Transform::Month())}, 1000);
+}
+
+TEST_P(UpdatePartitionSpecTest, TestAddDay) {
+  ICEBERG_UNWRAP_OR_FAIL(auto update, 
unpartitioned_table_->NewUpdatePartitionSpec());
+  update->AddField(Expressions::Day("ts"));
+  ApplyUpdateAndAssertSpec(update, {PartitionField(2, 1000, "ts_day", 
Transform::Day())},
+                           1000);
+}
+
+TEST_P(UpdatePartitionSpecTest, TestAddHour) {
+  ICEBERG_UNWRAP_OR_FAIL(auto update, 
unpartitioned_table_->NewUpdatePartitionSpec());
+  update->AddField(Expressions::Hour("ts"));
+  ApplyUpdateAndAssertSpec(update,
+                           {PartitionField(2, 1000, "ts_hour", 
Transform::Hour())}, 1000);
+}
+
+TEST_P(UpdatePartitionSpecTest, TestAddBucket) {
+  ICEBERG_UNWRAP_OR_FAIL(auto update, 
unpartitioned_table_->NewUpdatePartitionSpec());
+  update->AddField(Expressions::Bucket("id", 16));
+  ApplyUpdateAndAssertSpec(
+      update, {PartitionField(1, 1000, "id_bucket_16", 
Transform::Bucket(16))}, 1000);
+}
+
+TEST_P(UpdatePartitionSpecTest, TestAddTruncate) {
+  ICEBERG_UNWRAP_OR_FAIL(auto update, 
unpartitioned_table_->NewUpdatePartitionSpec());
+  update->AddField(Expressions::Truncate("data", 4));
+  ApplyUpdateAndAssertSpec(
+      update, {PartitionField(4, 1000, "data_trunc_4", 
Transform::Truncate(4))}, 1000);
+}
+
+TEST_P(UpdatePartitionSpecTest, TestAddNamedPartition) {
+  ICEBERG_UNWRAP_OR_FAIL(auto update, 
unpartitioned_table_->NewUpdatePartitionSpec());
+  update->AddField(Expressions::Bucket("id", 16), "shard");
+  ApplyUpdateAndAssertSpec(
+      update, {PartitionField(1, 1000, "shard", Transform::Bucket(16))}, 1000);
+}
+
+TEST_P(UpdatePartitionSpecTest, TestAddToExisting) {
+  ICEBERG_UNWRAP_OR_FAIL(auto update, 
partitioned_table_->NewUpdatePartitionSpec());
+  update->AddField(Expressions::Truncate("data", 4));
+  ApplyUpdateAndAssertSpec(
+      update,
+      {PartitionField(3, 1000, "category", Transform::Identity()),
+       PartitionField(2, 1001, "ts_day", Transform::Day()),
+       PartitionField(1, 1002, "shard", Transform::Bucket(16)),
+       PartitionField(4, 1003, "data_trunc_4", Transform::Truncate(4))},
+      1003);
+}
+
+TEST_P(UpdatePartitionSpecTest, TestMultipleAdds) {
+  ICEBERG_UNWRAP_OR_FAIL(auto update, 
unpartitioned_table_->NewUpdatePartitionSpec());
+  update->AddField("category")
+      .AddField(Expressions::Day("ts"))
+      .AddField(Expressions::Bucket("id", 16), "shard")
+      .AddField(Expressions::Truncate("data", 4), "prefix");
+  ApplyUpdateAndAssertSpec(update,
+                           {PartitionField(3, 1000, "category", 
Transform::Identity()),
+                            PartitionField(2, 1001, "ts_day", 
Transform::Day()),
+                            PartitionField(1, 1002, "shard", 
Transform::Bucket(16)),
+                            PartitionField(4, 1003, "prefix", 
Transform::Truncate(4))},
+                           1003);
+}
+
+TEST_P(UpdatePartitionSpecTest, TestAddHourToDay) {
+  // First add day partition
+  ICEBERG_UNWRAP_OR_FAIL(auto update1, 
unpartitioned_table_->NewUpdatePartitionSpec());
+  update1->AddField(Expressions::Day("ts"));
+  auto by_day_spec = ApplyUpdateAndGetSpec(update1);
+
+  // Then add hour partition
+  auto table = CreateTableWithSpec(by_day_spec, "test_table");
+  auto update2 = CreateUpdateFromTable(table);
+  update2->AddField(Expressions::Hour("ts"));
+  auto by_hour_spec = ApplyUpdateAndGetSpec(update2);
+
+  ASSERT_EQ(by_hour_spec->fields().size(), 2);
+  EXPECT_EQ(by_hour_spec->fields()[0].source_id(), 2);
+  EXPECT_EQ(by_hour_spec->fields()[0].name(), "ts_day");
+  EXPECT_EQ(*by_hour_spec->fields()[0].transform(), *Transform::Day());
+  EXPECT_EQ(by_hour_spec->fields()[1].source_id(), 2);
+  EXPECT_EQ(by_hour_spec->fields()[1].name(), "ts_hour");
+  EXPECT_EQ(*by_hour_spec->fields()[1].transform(), *Transform::Hour());
+}
+
+TEST_P(UpdatePartitionSpecTest, TestAddMultipleBuckets) {
+  // First add bucket 16
+  ICEBERG_UNWRAP_OR_FAIL(auto update1, 
unpartitioned_table_->NewUpdatePartitionSpec());
+  update1->AddField(Expressions::Bucket("id", 16));
+  auto bucket16_spec = ApplyUpdateAndGetSpec(update1);
+
+  // Then add bucket 8
+  auto table = CreateTableWithSpec(bucket16_spec, "test_table");
+  auto update2 = CreateUpdateFromTable(table);
+  update2->AddField(Expressions::Bucket("id", 8));
+  ApplyUpdateAndAssertSpec(
+      update2,
+      {PartitionField(1, 1000, "id_bucket_16", Transform::Bucket(16)),
+       PartitionField(1, 1001, "id_bucket_8", Transform::Bucket(8))},
+      1001);
+}
+
+TEST_P(UpdatePartitionSpecTest, TestRemoveIdentityByName) {
+  ICEBERG_UNWRAP_OR_FAIL(auto update, 
partitioned_table_->NewUpdatePartitionSpec());
+  update->RemoveField("category");
+  auto updated_spec = ApplyUpdateAndGetSpec(update);
+  if (format_version_ == 1) {
+    auto expected =
+        MakeExpectedSpec({PartitionField(3, 1000, "category", 
Transform::Void()),
+                          PartitionField(2, 1001, "ts_day", Transform::Day()),
+                          PartitionField(1, 1002, "shard", 
Transform::Bucket(16))},
+                         1002);
+    AssertPartitionSpecEquals(*expected, *updated_spec);
+  } else {
+    auto expected =
+        MakeExpectedSpec({PartitionField(2, 1001, "ts_day", Transform::Day()),
+                          PartitionField(1, 1002, "shard", 
Transform::Bucket(16))},
+                         1002);
+    AssertPartitionSpecEquals(*expected, *updated_spec);
+  }
+}
+
+TEST_P(UpdatePartitionSpecTest, TestRemoveBucketByName) {
+  ICEBERG_UNWRAP_OR_FAIL(auto update, 
partitioned_table_->NewUpdatePartitionSpec());
+  update->RemoveField("shard");
+  auto updated_spec = ApplyUpdateAndGetSpec(update);
+  if (format_version_ == 1) {
+    auto expected =
+        MakeExpectedSpec({PartitionField(3, 1000, "category", 
Transform::Identity()),
+                          PartitionField(2, 1001, "ts_day", Transform::Day()),
+                          PartitionField(1, 1002, "shard", Transform::Void())},
+                         1002);
+    AssertPartitionSpecEquals(*expected, *updated_spec);
+  } else {
+    auto expected =
+        MakeExpectedSpec({PartitionField(3, 1000, "category", 
Transform::Identity()),
+                          PartitionField(2, 1001, "ts_day", Transform::Day())},
+                         1001);
+    AssertPartitionSpecEquals(*expected, *updated_spec);
+  }
+}
+
+TEST_P(UpdatePartitionSpecTest, TestRemoveIdentityByEquivalent) {
+  ICEBERG_UNWRAP_OR_FAIL(auto update, 
partitioned_table_->NewUpdatePartitionSpec());
+  update->RemoveField(Expressions::Ref("category"));
+  auto updated_spec = ApplyUpdateAndGetSpec(update);
+  if (format_version_ == 1) {
+    auto expected =
+        MakeExpectedSpec({PartitionField(3, 1000, "category", 
Transform::Void()),
+                          PartitionField(2, 1001, "ts_day", Transform::Day()),
+                          PartitionField(1, 1002, "shard", 
Transform::Bucket(16))},
+                         1002);
+    AssertPartitionSpecEquals(*expected, *updated_spec);
+  } else {
+    auto expected =
+        MakeExpectedSpec({PartitionField(2, 1001, "ts_day", Transform::Day()),
+                          PartitionField(1, 1002, "shard", 
Transform::Bucket(16))},
+                         1002);
+    AssertPartitionSpecEquals(*expected, *updated_spec);
+  }
+}
+
+TEST_P(UpdatePartitionSpecTest, TestRemoveDayByEquivalent) {
+  ICEBERG_UNWRAP_OR_FAIL(auto update, 
partitioned_table_->NewUpdatePartitionSpec());
+  update->RemoveField(Expressions::Day("ts"));
+  auto updated_spec = ApplyUpdateAndGetSpec(update);
+  if (format_version_ == 1) {
+    auto expected =
+        MakeExpectedSpec({PartitionField(3, 1000, "category", 
Transform::Identity()),
+                          PartitionField(2, 1001, "ts_day", Transform::Void()),
+                          PartitionField(1, 1002, "shard", 
Transform::Bucket(16))},
+                         1002);
+    AssertPartitionSpecEquals(*expected, *updated_spec);
+  } else {
+    auto expected =
+        MakeExpectedSpec({PartitionField(3, 1000, "category", 
Transform::Identity()),
+                          PartitionField(1, 1002, "shard", 
Transform::Bucket(16))},
+                         1002);
+    AssertPartitionSpecEquals(*expected, *updated_spec);
+  }
+}
+
+TEST_P(UpdatePartitionSpecTest, TestRemoveBucketByEquivalent) {
+  ICEBERG_UNWRAP_OR_FAIL(auto update, 
partitioned_table_->NewUpdatePartitionSpec());
+  update->RemoveField(Expressions::Bucket("id", 16));
+  auto updated_spec = ApplyUpdateAndGetSpec(update);
+  if (format_version_ == 1) {
+    auto expected =
+        MakeExpectedSpec({PartitionField(3, 1000, "category", 
Transform::Identity()),
+                          PartitionField(2, 1001, "ts_day", Transform::Day()),
+                          PartitionField(1, 1002, "shard", Transform::Void())},
+                         1002);
+    AssertPartitionSpecEquals(*expected, *updated_spec);
+  } else {
+    auto expected =
+        MakeExpectedSpec({PartitionField(3, 1000, "category", 
Transform::Identity()),
+                          PartitionField(2, 1001, "ts_day", Transform::Day())},
+                         1001);
+    AssertPartitionSpecEquals(*expected, *updated_spec);
+  }
+}
+
+TEST_P(UpdatePartitionSpecTest, TestRename) {
+  ICEBERG_UNWRAP_OR_FAIL(auto update, 
partitioned_table_->NewUpdatePartitionSpec());
+  update->RenameField("shard", "id_bucket");
+  ApplyUpdateAndAssertSpec(update,
+                           {PartitionField(3, 1000, "category", 
Transform::Identity()),
+                            PartitionField(2, 1001, "ts_day", 
Transform::Day()),
+                            PartitionField(1, 1002, "id_bucket", 
Transform::Bucket(16))},
+                           1002);
+}
+
+TEST_P(UpdatePartitionSpecTest, TestMultipleChanges) {
+  ICEBERG_UNWRAP_OR_FAIL(auto update, 
partitioned_table_->NewUpdatePartitionSpec());
+  update->RenameField("shard", "id_bucket")
+      .RemoveField(Expressions::Day("ts"))
+      .AddField(Expressions::Truncate("data", 4), "prefix");
+  auto updated_spec = ApplyUpdateAndGetSpec(update);
+  if (format_version_ == 1) {
+    auto expected =
+        MakeExpectedSpec({PartitionField(3, 1000, "category", 
Transform::Identity()),
+                          PartitionField(2, 1001, "ts_day", Transform::Void()),
+                          PartitionField(1, 1002, "id_bucket", 
Transform::Bucket(16)),
+                          PartitionField(4, 1003, "prefix", 
Transform::Truncate(4))},
+                         1003);
+    AssertPartitionSpecEquals(*expected, *updated_spec);
+  } else {
+    auto expected =
+        MakeExpectedSpec({PartitionField(3, 1000, "category", 
Transform::Identity()),
+                          PartitionField(1, 1002, "id_bucket", 
Transform::Bucket(16)),
+                          PartitionField(4, 1003, "prefix", 
Transform::Truncate(4))},
+                         1003);
+    AssertPartitionSpecEquals(*expected, *updated_spec);
+  }
+}
+
+TEST_P(UpdatePartitionSpecTest, TestAddDeletedName) {
+  ICEBERG_UNWRAP_OR_FAIL(auto update, 
partitioned_table_->NewUpdatePartitionSpec());
+  update->RemoveField(Expressions::Bucket("id", 16));
+  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+  auto updated_spec = result.spec;
+
+  if (format_version_ == 1) {
+    ICEBERG_UNWRAP_OR_FAIL(
+        auto expected_spec,
+        PartitionSpec::Make(
+            PartitionSpec::kInitialSpecId,
+            std::vector<PartitionField>{
+                PartitionField(3, 1000, "category", Transform::Identity()),
+                PartitionField(2, 1001, "ts_day", Transform::Day()),
+                PartitionField(1, 1002, "shard", Transform::Void())},
+            1002));
+    auto expected = std::shared_ptr<PartitionSpec>(expected_spec.release());
+    AssertPartitionSpecEquals(*expected, *updated_spec);
+  } else {
+    ICEBERG_UNWRAP_OR_FAIL(
+        auto expected_spec,
+        PartitionSpec::Make(
+            PartitionSpec::kInitialSpecId,
+            std::vector<PartitionField>{
+                PartitionField(3, 1000, "category", Transform::Identity()),
+                PartitionField(2, 1001, "ts_day", Transform::Day())},
+            1001));
+    auto expected = std::shared_ptr<PartitionSpec>(expected_spec.release());
+    AssertPartitionSpecEquals(*expected, *updated_spec);
+  }
+}
+
+TEST_P(UpdatePartitionSpecTest, TestRemoveNewlyAddedFieldByName) {
+  ICEBERG_UNWRAP_OR_FAIL(auto update, 
partitioned_table_->NewUpdatePartitionSpec());
+  update->AddField(Expressions::Truncate("data", 4), "prefix");
+  update->RemoveField("prefix");
+  ExpectError(update, ErrorKind::kValidationFailed, "Cannot delete newly added 
field");
+}
+
+TEST_P(UpdatePartitionSpecTest, TestRemoveNewlyAddedFieldByTransform) {
+  ICEBERG_UNWRAP_OR_FAIL(auto update, 
partitioned_table_->NewUpdatePartitionSpec());
+  update->AddField(Expressions::Truncate("data", 4), "prefix");
+  update->RemoveField(Expressions::Truncate("data", 4));
+  ExpectError(update, ErrorKind::kValidationFailed, "Cannot delete newly added 
field");
+}
+
+TEST_P(UpdatePartitionSpecTest, TestAddAlreadyAddedFieldByTransform) {
+  ICEBERG_UNWRAP_OR_FAIL(auto update, 
partitioned_table_->NewUpdatePartitionSpec());
+  update->AddField(Expressions::Truncate("data", 4), "prefix");
+  update->AddField(Expressions::Truncate("data", 4));
+  ExpectError(update, ErrorKind::kValidationFailed,
+              "Cannot add duplicate partition field");
+}
+
+TEST_P(UpdatePartitionSpecTest, TestAddAlreadyAddedFieldByName) {
+  ICEBERG_UNWRAP_OR_FAIL(auto update, 
partitioned_table_->NewUpdatePartitionSpec());
+  update->AddField(Expressions::Truncate("data", 4), "prefix");
+  update->AddField(Expressions::Truncate("data", 6), "prefix");
+  ExpectError(update, ErrorKind::kValidationFailed,
+              "Cannot add duplicate partition field");
+}
+
+TEST_P(UpdatePartitionSpecTest, TestAddRedundantTimePartition) {
+  // Test day + hour conflict
+  ICEBERG_UNWRAP_OR_FAIL(auto update1, 
unpartitioned_table_->NewUpdatePartitionSpec());
+  update1->AddField(Expressions::Day("ts"));
+  update1->AddField(Expressions::Hour("ts"));
+  ExpectError(update1, ErrorKind::kValidationFailed,
+              "Cannot add redundant partition field");
+
+  // Test hour + month conflict after adding hour to existing day
+  ICEBERG_UNWRAP_OR_FAIL(auto update2, 
partitioned_table_->NewUpdatePartitionSpec());
+  update2->AddField(Expressions::Hour("ts"));   // day already exists, so hour 
is OK
+  update2->AddField(Expressions::Month("ts"));  // conflicts with hour
+  ExpectError(update2, ErrorKind::kValidationFailed,
+              "Cannot add redundant partition field");
+}
+
+TEST_P(UpdatePartitionSpecTest, TestNoEffectAddDeletedSameFieldWithSameName) {
+  ICEBERG_UNWRAP_OR_FAIL(auto update1, 
partitioned_table_->NewUpdatePartitionSpec());
+  update1->RemoveField("shard");
+  update1->AddField(Expressions::Bucket("id", 16), "shard");
+  ICEBERG_UNWRAP_OR_FAIL(auto result1, update1->Apply());
+  auto spec1 = result1.spec;
+  AssertPartitionSpecEquals(*partitioned_spec_, *spec1);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto update2, 
partitioned_table_->NewUpdatePartitionSpec());
+  update2->RemoveField("shard");
+  update2->AddField(Expressions::Bucket("id", 16));
+  ICEBERG_UNWRAP_OR_FAIL(auto result2, update2->Apply());
+  auto spec2 = result2.spec;
+  AssertPartitionSpecEquals(*partitioned_spec_, *spec2);
+}
+
+TEST_P(UpdatePartitionSpecTest, 
TestGenerateNewSpecAddDeletedSameFieldWithDifferentName) {
+  ICEBERG_UNWRAP_OR_FAIL(auto update, 
partitioned_table_->NewUpdatePartitionSpec());
+  update->RemoveField("shard");
+  update->AddField(Expressions::Bucket("id", 16), "new_shard");
+  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+  auto updated_spec = result.spec;
+
+  ASSERT_EQ(updated_spec->fields().size(), 3);
+  EXPECT_EQ(updated_spec->fields()[0].name(), "category");
+  EXPECT_EQ(updated_spec->fields()[1].name(), "ts_day");
+  EXPECT_EQ(updated_spec->fields()[2].name(), "new_shard");
+  EXPECT_EQ(*updated_spec->fields()[0].transform(), *Transform::Identity());
+  EXPECT_EQ(*updated_spec->fields()[1].transform(), *Transform::Day());
+  EXPECT_EQ(*updated_spec->fields()[2].transform(), *Transform::Bucket(16));
+}
+
+TEST_P(UpdatePartitionSpecTest, TestAddDuplicateByName) {
+  ICEBERG_UNWRAP_OR_FAIL(auto update, 
partitioned_table_->NewUpdatePartitionSpec());
+  update->AddField("category");
+  ExpectError(update, ErrorKind::kValidationFailed,
+              "Cannot add duplicate partition field");
+}
+
+TEST_P(UpdatePartitionSpecTest, TestAddDuplicateByRef) {
+  ICEBERG_UNWRAP_OR_FAIL(auto update, 
partitioned_table_->NewUpdatePartitionSpec());
+  update->AddField(Expressions::Ref("category"));
+  ExpectError(update, ErrorKind::kValidationFailed,
+              "Cannot add duplicate partition field");
+}
+
+TEST_P(UpdatePartitionSpecTest, TestAddDuplicateTransform) {
+  ICEBERG_UNWRAP_OR_FAIL(auto update, 
partitioned_table_->NewUpdatePartitionSpec());
+  update->AddField(Expressions::Bucket("id", 16));
+  ExpectError(update, ErrorKind::kValidationFailed,
+              "Cannot add duplicate partition field");
+}
+
+TEST_P(UpdatePartitionSpecTest, TestAddNamedDuplicate) {
+  ICEBERG_UNWRAP_OR_FAIL(auto update, 
partitioned_table_->NewUpdatePartitionSpec());
+  update->AddField(Expressions::Bucket("id", 16), "b16");
+  ExpectError(update, ErrorKind::kValidationFailed,
+              "Cannot add duplicate partition field");
+}
+
+TEST_P(UpdatePartitionSpecTest, TestRemoveUnknownFieldByName) {
+  ICEBERG_UNWRAP_OR_FAIL(auto update, 
partitioned_table_->NewUpdatePartitionSpec());
+  update->RemoveField("moon");
+  ExpectError(update, ErrorKind::kValidationFailed,
+              "Cannot find partition field to remove");
+}
+
+TEST_P(UpdatePartitionSpecTest, TestRemoveUnknownFieldByEquivalent) {
+  ICEBERG_UNWRAP_OR_FAIL(auto update, 
partitioned_table_->NewUpdatePartitionSpec());
+  update->RemoveField(Expressions::Hour("ts"));  // day(ts) exists, not hour
+  ExpectError(update, ErrorKind::kValidationFailed,
+              "Cannot find partition field to remove");
+}
+
+TEST_P(UpdatePartitionSpecTest, TestRenameUnknownField) {
+  ICEBERG_UNWRAP_OR_FAIL(auto update, 
partitioned_table_->NewUpdatePartitionSpec());
+  update->RenameField("shake", "seal");
+  ExpectError(update, ErrorKind::kValidationFailed,
+              "Cannot find partition field to rename: shake");
+}
+
+TEST_P(UpdatePartitionSpecTest, TestRenameAfterAdd) {
+  ICEBERG_UNWRAP_OR_FAIL(auto update, 
partitioned_table_->NewUpdatePartitionSpec());
+  update->AddField(Expressions::Truncate("data", 4), "data_trunc");
+  update->RenameField("data_trunc", "prefix");
+  ExpectError(update, ErrorKind::kValidationFailed,
+              "Cannot rename newly added partition field: data_trunc");
+}
+
+TEST_P(UpdatePartitionSpecTest, TestRenameAndDelete) {
+  ICEBERG_UNWRAP_OR_FAIL(auto update, 
partitioned_table_->NewUpdatePartitionSpec());
+  update->RenameField("shard", "id_bucket");
+  update->RemoveField(Expressions::Bucket("id", 16));
+  ExpectError(update, ErrorKind::kValidationFailed,
+              "Cannot rename and delete partition field: shard");
+}
+
+TEST_P(UpdatePartitionSpecTest, TestDeleteAndRename) {
+  ICEBERG_UNWRAP_OR_FAIL(auto update, 
partitioned_table_->NewUpdatePartitionSpec());
+  update->RemoveField(Expressions::Bucket("id", 16));
+  update->RenameField("shard", "id_bucket");
+  ExpectError(update, ErrorKind::kValidationFailed,
+              "Cannot delete and rename partition field: shard");
+}
+
+TEST_P(UpdatePartitionSpecTest, TestRemoveAndAddMultiTimes) {
+  // Add first time
+  ICEBERG_UNWRAP_OR_FAIL(auto update1, 
unpartitioned_table_->NewUpdatePartitionSpec());
+  update1->AddField(Expressions::Day("ts"), "ts_date");
+  auto add_first_time_spec = ApplyUpdateAndGetSpec(update1);
+
+  // Remove first time
+  auto table1 = CreateTableWithSpec(add_first_time_spec, "test_table");
+  auto update2 = CreateUpdateFromTable(table1);
+  update2->RemoveField(Expressions::Day("ts"));
+  auto remove_first_time_spec = ApplyUpdateAndGetSpec(update2);
+
+  // Add second time
+  auto table2 = CreateTableWithSpec(remove_first_time_spec, "test_table2");
+  auto update3 = CreateUpdateFromTable(table2);
+  update3->AddField(Expressions::Day("ts"), "ts_date");
+  auto add_second_time_spec = ApplyUpdateAndGetSpec(update3);
+
+  // Remove second time
+  auto table3 = CreateTableWithSpec(add_second_time_spec, "test_table3");
+  auto update4 = CreateUpdateFromTable(table3);
+  update4->RemoveField(Expressions::Day("ts"));
+  auto remove_second_time_spec = ApplyUpdateAndGetSpec(update4);
+
+  // Add third time with month
+  auto table4 = CreateTableWithSpec(remove_second_time_spec, "test_table4");
+  auto update5 = CreateUpdateFromTable(table4);
+  update5->AddField(Expressions::Month("ts"));
+  auto add_third_time_spec = ApplyUpdateAndGetSpec(update5);
+
+  // Rename ts_month to ts_date
+  auto table5 = CreateTableWithSpec(add_third_time_spec, "test_table5");
+  auto update6 = CreateUpdateFromTable(table5);
+  update6->RenameField("ts_month", "ts_date");
+  auto updated_spec = ApplyUpdateAndGetSpec(update6);
+
+  if (format_version_ == 1) {
+    ASSERT_EQ(updated_spec->fields().size(), 3);
+    // In V1, we expect void transforms for deleted fields
+    EXPECT_TRUE(updated_spec->fields()[0].name().find("ts_date") == 0);
+    EXPECT_TRUE(updated_spec->fields()[1].name().find("ts_date") == 0);
+    EXPECT_EQ(updated_spec->fields()[2].name(), "ts_date");
+    EXPECT_EQ(*updated_spec->fields()[0].transform(), *Transform::Void());
+    EXPECT_EQ(*updated_spec->fields()[1].transform(), *Transform::Void());
+    EXPECT_EQ(*updated_spec->fields()[2].transform(), *Transform::Month());
+  } else {
+    ICEBERG_UNWRAP_OR_FAIL(
+        auto expected_spec,
+        PartitionSpec::Make(PartitionSpec::kInitialSpecId,
+                            std::vector<PartitionField>{
+                                PartitionField(2, 1000, "ts_date", 
Transform::Month())},
+                            1000));
+    auto expected = std::shared_ptr<PartitionSpec>(expected_spec.release());
+    AssertPartitionSpecEquals(*expected, *updated_spec);
+  }
+}
+
+TEST_P(UpdatePartitionSpecTest, 
TestRemoveAndUpdateWithDifferentTransformation) {
+  auto initial_spec = MakeExpectedSpec(
+      {PartitionField(2, 1000, "ts_transformed", Transform::Month())}, 1000);
+  auto table = CreateTableWithSpec(initial_spec, "test_table");
+  auto update = CreateUpdateFromTable(table);
+  update->RemoveField("ts_transformed");
+  update->AddField(Expressions::Day("ts"), "ts_transformed");
+  auto updated_spec = ApplyUpdateAndGetSpec(update);
+
+  if (format_version_ == 1) {
+    ASSERT_EQ(updated_spec->fields().size(), 2);
+    EXPECT_TRUE(updated_spec->fields()[0].name().find("ts_transformed") == 0);
+    EXPECT_EQ(updated_spec->fields()[1].name(), "ts_transformed");
+    EXPECT_EQ(*updated_spec->fields()[0].transform(), *Transform::Void());
+    EXPECT_EQ(*updated_spec->fields()[1].transform(), *Transform::Day());
+  } else {
+    ASSERT_EQ(updated_spec->fields().size(), 1);
+    EXPECT_EQ(updated_spec->fields()[0].name(), "ts_transformed");
+    EXPECT_EQ(*updated_spec->fields()[0].transform(), *Transform::Day());
+  }
+}
+
+TEST_P(UpdatePartitionSpecTest, CommitSuccess) {
+  // Test empty commit
+  ICEBERG_UNWRAP_OR_FAIL(auto empty_update, 
partitioned_table_->NewUpdatePartitionSpec());
+  EXPECT_THAT(empty_update->Commit(), IsOk());
+
+  // Reload table after first commit
+  ICEBERG_UNWRAP_OR_FAIL(auto reloaded, 
catalog_->LoadTable(partitioned_table_ident_));
+
+  // Test commit with partition spec changes
+  ICEBERG_UNWRAP_OR_FAIL(auto update, reloaded->NewUpdatePartitionSpec());
+  update->AddField(Expressions::Truncate("data", 4), "prefix");
+
+  EXPECT_THAT(update->Commit(), IsOk());
+
+  // Verify the partition spec was committed
+  ICEBERG_UNWRAP_OR_FAIL(auto final_table, 
catalog_->LoadTable(partitioned_table_ident_));
+  ICEBERG_UNWRAP_OR_FAIL(auto spec, final_table->spec());
+
+  ASSERT_EQ(spec->fields().size(), 4);
+  EXPECT_EQ(spec->fields()[0].name(), "category");
+  EXPECT_EQ(spec->fields()[1].name(), "ts_day");
+  EXPECT_EQ(spec->fields()[2].name(), "shard");
+  EXPECT_EQ(spec->fields()[3].name(), "prefix");
+  EXPECT_EQ(*spec->fields()[0].transform(), *Transform::Identity());
+  EXPECT_EQ(*spec->fields()[1].transform(), *Transform::Day());
+  EXPECT_EQ(*spec->fields()[2].transform(), *Transform::Bucket(16));
+  EXPECT_EQ(*spec->fields()[3].transform(), *Transform::Truncate(4));
+}
+
+}  // namespace iceberg
diff --git a/src/iceberg/transaction.cc b/src/iceberg/transaction.cc
index 8bc2a539..9404fe2e 100644
--- a/src/iceberg/transaction.cc
+++ b/src/iceberg/transaction.cc
@@ -28,6 +28,7 @@
 #include "iceberg/table_requirements.h"
 #include "iceberg/table_update.h"
 #include "iceberg/update/pending_update.h"
+#include "iceberg/update/update_partition_spec.h"
 #include "iceberg/update/update_properties.h"
 #include "iceberg/update/update_sort_order.h"
 #include "iceberg/util/checked_cast.h"
@@ -85,6 +86,15 @@ Status Transaction::Apply(PendingUpdate& update) {
       ICEBERG_ASSIGN_OR_RAISE(auto sort_order, update_sort_order.Apply());
       metadata_builder_->SetDefaultSortOrder(std::move(sort_order));
     } break;
+    case PendingUpdate::Kind::kUpdatePartitionSpec: {
+      auto& update_partition_spec = 
internal::checked_cast<UpdatePartitionSpec&>(update);
+      ICEBERG_ASSIGN_OR_RAISE(auto result, update_partition_spec.Apply());
+      if (result.set_as_default) {
+        metadata_builder_->SetDefaultPartitionSpec(std::move(result.spec));
+      } else {
+        metadata_builder_->AddPartitionSpec(std::move(result.spec));
+      }
+    } break;
     default:
       return NotSupported("Unsupported pending update: {}",
                           static_cast<int>(update.kind()));
@@ -137,6 +147,13 @@ Result<std::shared_ptr<Table>> Transaction::Commit() {
   return table_;
 }
 
+Result<std::shared_ptr<UpdatePartitionSpec>> 
Transaction::NewUpdatePartitionSpec() {
+  ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<UpdatePartitionSpec> update_spec,
+                          UpdatePartitionSpec::Make(shared_from_this()));
+  ICEBERG_RETURN_UNEXPECTED(AddUpdate(update_spec));
+  return update_spec;
+}
+
 Result<std::shared_ptr<UpdateProperties>> Transaction::NewUpdateProperties() {
   ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<UpdateProperties> update_properties,
                           UpdateProperties::Make(shared_from_this()));
diff --git a/src/iceberg/transaction.h b/src/iceberg/transaction.h
index 05fdea32..18143b8a 100644
--- a/src/iceberg/transaction.h
+++ b/src/iceberg/transaction.h
@@ -56,6 +56,10 @@ class ICEBERG_EXPORT Transaction : public 
std::enable_shared_from_this<Transacti
   /// - CommitFailed: if the updates cannot be committed due to conflicts.
   Result<std::shared_ptr<Table>> Commit();
 
+  /// \brief Create a new UpdatePartitionSpec to update the partition spec of 
this table
+  /// and commit the changes.
+  Result<std::shared_ptr<UpdatePartitionSpec>> NewUpdatePartitionSpec();
+
   /// \brief Create a new UpdateProperties to update table properties and 
commit the
   /// changes.
   Result<std::shared_ptr<UpdateProperties>> NewUpdateProperties();
diff --git a/src/iceberg/transform.cc b/src/iceberg/transform.cc
index 61448971..4c39d01e 100644
--- a/src/iceberg/transform.cc
+++ b/src/iceberg/transform.cc
@@ -388,6 +388,27 @@ std::string Transform::ToString() const {
   std::unreachable();
 }
 
+Result<std::string> Transform::GeneratePartitionName(std::string_view 
source_name) const {
+  switch (transform_type_) {
+    case TransformType::kIdentity:
+      return std::string(source_name);
+    case TransformType::kBucket:
+      return std::format("{}_bucket_{}", source_name, 
std::get<int32_t>(param_));
+    case TransformType::kTruncate:
+      return std::format("{}_trunc_{}", source_name, 
std::get<int32_t>(param_));
+    case TransformType::kYear:
+    case TransformType::kMonth:
+    case TransformType::kDay:
+    case TransformType::kHour:
+      return std::format("{}_{}", source_name, 
TransformTypeToString(transform_type_));
+    case TransformType::kVoid:
+      return std::format("{}_null", source_name);
+    case TransformType::kUnknown:
+      return Invalid("Cannot generate partition name for unknown transform");
+  }
+  std::unreachable();
+}
+
 TransformFunction::TransformFunction(TransformType transform_type,
                                      std::shared_ptr<Type> source_type)
     : transform_type_(transform_type), source_type_(std::move(source_type)) {}
diff --git a/src/iceberg/transform.h b/src/iceberg/transform.h
index 53993b4e..47cf3ee9 100644
--- a/src/iceberg/transform.h
+++ b/src/iceberg/transform.h
@@ -197,6 +197,11 @@ class ICEBERG_EXPORT Transform : public util::Formattable {
   /// \brief Returns a string representation of this transform (e.g., 
"bucket[16]").
   std::string ToString() const override;
 
+  /// \brief Generates a partition name for the transform.
+  /// \param source_name The name of the source column.
+  /// \return A string representation of the partition name.
+  Result<std::string> GeneratePartitionName(std::string_view source_name) 
const;
+
   /// \brief Equality comparison.
   friend bool operator==(const Transform& lhs, const Transform& rhs) {
     return lhs.Equals(rhs);
diff --git a/src/iceberg/transform_function.h b/src/iceberg/transform_function.h
index b3cfa5a2..c8670824 100644
--- a/src/iceberg/transform_function.h
+++ b/src/iceberg/transform_function.h
@@ -59,6 +59,9 @@ class ICEBERG_EXPORT BucketTransform : public 
TransformFunction {
   /// \brief Returns INT32 as the output type.
   std::shared_ptr<Type> ResultType() const override;
 
+  /// \brief Returns the number of buckets.
+  int32_t num_buckets() const { return num_buckets_; }
+
   /// \brief Create a BucketTransform.
   /// \param source_type Type of the input data.
   /// \param num_buckets Number of buckets to hash into.
diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h
index 8ca444cc..6c7e6b77 100644
--- a/src/iceberg/type_fwd.h
+++ b/src/iceberg/type_fwd.h
@@ -123,8 +123,11 @@ struct TableMetadata;
 
 /// \brief Expression.
 class BoundPredicate;
+class BoundReference;
+class BoundTransform;
 class Expression;
 class Literal;
+class Term;
 class UnboundPredicate;
 
 /// \brief Scan.
@@ -177,6 +180,7 @@ class Transaction;
 
 /// \brief Update family.
 class PendingUpdate;
+class UpdatePartitionSpec;
 class UpdateProperties;
 class UpdateSortOrder;
 
diff --git a/src/iceberg/update/meson.build b/src/iceberg/update/meson.build
index b0ec6706..3fdfda98 100644
--- a/src/iceberg/update/meson.build
+++ b/src/iceberg/update/meson.build
@@ -16,6 +16,11 @@
 # under the License.
 
 install_headers(
-    ['pending_update.h', 'update_sort_order.h', 'update_properties.h'],
+    [
+        'pending_update.h',
+        'update_partition_spec.h',
+        'update_sort_order.h',
+        'update_properties.h',
+    ],
     subdir: 'iceberg/update',
 )
diff --git a/src/iceberg/update/pending_update.h 
b/src/iceberg/update/pending_update.h
index c298ba80..95580f40 100644
--- a/src/iceberg/update/pending_update.h
+++ b/src/iceberg/update/pending_update.h
@@ -42,6 +42,7 @@ namespace iceberg {
 class ICEBERG_EXPORT PendingUpdate : public ErrorCollector {
  public:
   enum class Kind : uint8_t {
+    kUpdatePartitionSpec,
     kUpdateProperties,
     kUpdateSortOrder,
   };
diff --git a/src/iceberg/update/update_partition_spec.cc 
b/src/iceberg/update/update_partition_spec.cc
new file mode 100644
index 00000000..ffea1e09
--- /dev/null
+++ b/src/iceberg/update/update_partition_spec.cc
@@ -0,0 +1,455 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/update/update_partition_spec.h"
+
+#include <algorithm>
+#include <format>
+#include <memory>
+#include <string_view>
+#include <utility>
+
+#include "iceberg/expression/term.h"
+#include "iceberg/partition_field.h"
+#include "iceberg/partition_spec.h"
+#include "iceberg/schema.h"
+#include "iceberg/table_metadata.h"
+#include "iceberg/transaction.h"
+#include "iceberg/transform.h"
+#include "iceberg/util/checked_cast.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+Result<std::shared_ptr<UpdatePartitionSpec>> UpdatePartitionSpec::Make(
+    std::shared_ptr<Transaction> transaction) {
+  ICEBERG_PRECHECK(transaction != nullptr,
+                   "Cannot create UpdatePartitionSpec without transaction");
+  return std::shared_ptr<UpdatePartitionSpec>(
+      new UpdatePartitionSpec(std::move(transaction)));
+}
+
+UpdatePartitionSpec::UpdatePartitionSpec(std::shared_ptr<Transaction> 
transaction)
+    : PendingUpdate(std::move(transaction)) {
+  const TableMetadata& base_metadata = transaction_->current();
+  format_version_ = base_metadata.format_version;
+
+  // Get the current/default partition spec
+  auto spec_result = base_metadata.PartitionSpec();
+  if (!spec_result.has_value()) {
+    AddError(spec_result.error());
+    return;
+  }
+  spec_ = std::move(spec_result.value());
+
+  // Get the current schema
+  auto schema_result = base_metadata.Schema();
+  if (!schema_result.has_value()) {
+    AddError(schema_result.error());
+    return;
+  }
+  schema_ = std::move(schema_result.value());
+
+  last_assigned_partition_id_ = std::max(base_metadata.last_partition_id,
+                                         
PartitionSpec::kLegacyPartitionDataIdStart - 1);
+  name_to_field_ = IndexSpecByName(*spec_);
+  transform_to_field_ = IndexSpecByTransform(*spec_);
+
+  // Check for unknown transforms
+  for (const auto& field : spec_->fields()) {
+    if (field.transform()->transform_type() == TransformType::kUnknown) {
+      AddError(ErrorKind::kInvalidArgument,
+               "Cannot update partition spec with unknown transform: {}",
+               field.ToString());
+      return;
+    }
+  }
+
+  // Build index of historical partition fields for efficient recycling (V2+)
+  if (format_version_ >= 2) {
+    BuildHistoricalFieldsIndex();
+  }
+}
+
+UpdatePartitionSpec::~UpdatePartitionSpec() = default;
+
+UpdatePartitionSpec& UpdatePartitionSpec::CaseSensitive(bool 
is_case_sensitive) {
+  case_sensitive_ = is_case_sensitive;
+  return *this;
+}
+
+UpdatePartitionSpec& UpdatePartitionSpec::AddNonDefaultSpec() {
+  set_as_default_ = false;
+  return *this;
+}
+
+UpdatePartitionSpec& UpdatePartitionSpec::AddField(std::string_view 
source_name) {
+  // Find the source field in the schema
+  ICEBERG_BUILDER_ASSIGN_OR_RETURN(
+      auto field_opt, schema_->FindFieldByName(source_name, case_sensitive_));
+
+  ICEBERG_BUILDER_CHECK(field_opt.has_value(), "Cannot find source field: {}",
+                        source_name);
+  int32_t source_id = field_opt->get().field_id();
+  return AddFieldInternal(source_name, source_id, Transform::Identity());
+}
+
+UpdatePartitionSpec& UpdatePartitionSpec::AddField(const 
std::shared_ptr<Term>& term,
+                                                   std::string_view part_name) 
{
+  ICEBERG_BUILDER_CHECK(term->is_unbound(), "Cannot add bound term to 
partition spec");
+  // Bind the term to get the source id
+  if (term->kind() == Term::Kind::kReference) {
+    const auto& ref = dynamic_cast<const NamedReference&>(*term);
+    ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto bound_ref, ref.Bind(*schema_, 
case_sensitive_));
+    int32_t source_id = bound_ref->field().field_id();
+    return AddFieldInternal(part_name, source_id, Transform::Identity());
+  } else if (term->kind() == Term::Kind::kTransform) {
+    const auto& unbound_transform = dynamic_cast<const 
UnboundTransform&>(*term);
+    ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto bound_transform,
+                                     unbound_transform.Bind(*schema_, 
case_sensitive_));
+    int32_t source_id = bound_transform->reference()->field().field_id();
+    return AddFieldInternal(part_name, source_id, 
bound_transform->transform());
+  }
+
+  return AddError(
+      InvalidArgument("Cannot add {} term to partition spec", 
term->ToString()));
+}
+
+UpdatePartitionSpec& UpdatePartitionSpec::AddFieldInternal(
+    std::string_view name, int32_t source_id,
+    const std::shared_ptr<Transform>& transform) {
+  // Check for duplicate name in added fields
+  ICEBERG_BUILDER_CHECK(name.empty() || !added_field_names_.contains(name),
+                        "Cannot add duplicate partition field: {}", name);
+
+  // Cache transform string to avoid repeated ToString() calls
+  const std::string transform_str = transform->ToString();
+  TransformKey validation_key{source_id, transform_str};
+
+  // Check if this field already exists in the current spec
+  auto existing_it = transform_to_field_.find(validation_key);
+  if (existing_it != transform_to_field_.end()) {
+    const auto& existing = existing_it->second;
+    const bool is_deleted = deletes_.contains(existing->field_id());
+    if (is_deleted && *existing->transform() == *transform) {
+      // If the field was deleted and we're re-adding the same one, just undo 
the delete
+      return RewriteDeleteAndAddField(*existing, name);
+    }
+
+    ICEBERG_BUILDER_CHECK(
+        is_deleted,
+        "Cannot add duplicate partition field '{}' for source {} with 
transform {}, "
+        "conflicts with {}",
+        name, source_id, transform_str, existing->ToString());
+  }
+
+  // Check if already being added
+  auto added_it = transform_to_added_field_.find(validation_key);
+  ICEBERG_BUILDER_CHECK(
+      added_it == transform_to_added_field_.end(),
+      "Cannot add duplicate partition field '{}' for source {} with transform 
{}, "
+      "already added: {}",
+      name, source_id, transform_str, added_it->second);
+
+  // Create or recycle the partition field
+  PartitionField new_field = RecycleOrCreatePartitionField(source_id, 
transform, name);
+
+  // Generate name if not provided
+  std::string field_name;
+  if (!name.empty()) {
+    field_name = name;
+  } else {
+    ICEBERG_BUILDER_ASSIGN_OR_RETURN(field_name,
+                                     GeneratePartitionName(source_id, 
transform));
+  }
+
+  // Create the final field with the name
+  new_field = PartitionField(new_field.source_id(), new_field.field_id(), 
field_name,
+                             new_field.transform());
+
+  // Check for redundant time-based partitions
+  CheckForRedundantAddedPartitions(new_field);
+  transform_to_added_field_.emplace(validation_key, field_name);
+
+  // Handle name conflicts with existing fields
+  auto existing_name_it = name_to_field_.find(field_name);
+  if (existing_name_it != name_to_field_.end()) {
+    const auto& existing_field = existing_name_it->second;
+    const bool existing_is_deleted = 
deletes_.contains(existing_field->field_id());
+    std::string renamed =
+        std::format("{}_{}", existing_field->name(), 
existing_field->field_id());
+    if (!existing_is_deleted) {
+      if (IsVoidTransform(*existing_field)) {
+        // Rename the old deleted field
+        RenameField(existing_field->name(), std::move(renamed));
+      } else {
+        return AddError(
+            InvalidArgument("Cannot add duplicate partition field name: {}", 
field_name));
+      }
+    } else {
+      // Field is being deleted, rename it to avoid conflict
+      renames_.emplace(existing_field->name(), std::move(renamed));
+    }
+  }
+
+  adds_.push_back(std::move(new_field));
+  added_field_names_.emplace(field_name);
+
+  return *this;
+}
+
+UpdatePartitionSpec& UpdatePartitionSpec::RewriteDeleteAndAddField(
+    const PartitionField& existing, std::string_view name) {
+  deletes_.erase(existing.field_id());
+  if (name.empty() || std::string(existing.name()) == name) {
+    return *this;
+  }
+  return RenameField(std::string(existing.name()), std::string{name});
+}
+
+UpdatePartitionSpec& UpdatePartitionSpec::RemoveField(std::string_view name) {
+  // Cannot delete newly added fields
+  ICEBERG_BUILDER_CHECK(!added_field_names_.contains(name),
+                        "Cannot delete newly added field: {}", name);
+
+  // Cannot rename and delete
+  ICEBERG_BUILDER_CHECK(!renames_.contains(name),
+                        "Cannot rename and delete partition field: {}", name);
+
+  auto field_it = name_to_field_.find(name);
+  ICEBERG_BUILDER_CHECK(field_it != name_to_field_.end(),
+                        "Cannot find partition field to remove: {}", name);
+
+  deletes_.insert(field_it->second->field_id());
+  return *this;
+}
+
+UpdatePartitionSpec& UpdatePartitionSpec::RemoveField(const 
std::shared_ptr<Term>& term) {
+  ICEBERG_BUILDER_CHECK(term->is_unbound(),
+                        "Cannot remove bound term from partition spec");
+  // Bind the term to get the source id
+  if (term->kind() == Term::Kind::kReference) {
+    const auto& ref = dynamic_cast<const NamedReference&>(*term);
+    ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto bound_ref, ref.Bind(*schema_, 
case_sensitive_));
+    int32_t source_id = bound_ref->field().field_id();
+    // Reference terms use identity transform
+    TransformKey key{source_id, Transform::Identity()->ToString()};
+    return RemoveFieldByTransform(key, term->ToString());
+  } else if (term->kind() == Term::Kind::kTransform) {
+    const auto& unbound_transform = dynamic_cast<const 
UnboundTransform&>(*term);
+    ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto bound_transform,
+                                     unbound_transform.Bind(*schema_, 
case_sensitive_));
+    int32_t source_id = bound_transform->reference()->field().field_id();
+    auto transform = bound_transform->transform();
+    TransformKey key{source_id, transform->ToString()};
+    return RemoveFieldByTransform(key, term->ToString());
+  }
+
+  return AddError(
+      InvalidArgument("Cannot remove {} term from partition spec", 
term->ToString()));
+}
+
+UpdatePartitionSpec& UpdatePartitionSpec::RemoveFieldByTransform(
+    const TransformKey& key, std::string_view term_str) {
+  // Cannot delete newly added fields
+  ICEBERG_BUILDER_CHECK(!transform_to_added_field_.contains(key),
+                        "Cannot delete newly added field: {}", term_str);
+
+  auto field_it = transform_to_field_.find(key);
+  ICEBERG_BUILDER_CHECK(field_it != transform_to_field_.end(),
+                        "Cannot find partition field to remove: {}", term_str);
+
+  const auto& field = field_it->second;
+  // Cannot rename and delete
+  ICEBERG_BUILDER_CHECK(!renames_.contains(std::string(field->name())),
+                        "Cannot rename and delete partition field: {}", 
field->name());
+
+  deletes_.insert(field->field_id());
+  return *this;
+}
+
+UpdatePartitionSpec& UpdatePartitionSpec::RenameField(std::string_view name,
+                                                      std::string new_name) {
+  // Handle existing void field with the new name
+  auto existing_it = name_to_field_.find(new_name);
+  if (existing_it != name_to_field_.end() && 
IsVoidTransform(*existing_it->second)) {
+    std::string renamed = std::format("{}_{}", existing_it->second->name(),
+                                      existing_it->second->field_id());
+    RenameField(existing_it->second->name(), std::move(renamed));
+  }
+
+  // Cannot rename newly added fields
+  ICEBERG_BUILDER_CHECK(!added_field_names_.contains(name),
+                        "Cannot rename newly added partition field: {}", name);
+
+  auto field_it = name_to_field_.find(name);
+  ICEBERG_BUILDER_CHECK(field_it != name_to_field_.end(),
+                        "Cannot find partition field to rename: {}", name);
+
+  // Cannot delete and rename
+  ICEBERG_BUILDER_CHECK(!deletes_.contains(field_it->second->field_id()),
+                        "Cannot delete and rename partition field: {}", name);
+
+  renames_.emplace(name, std::move(new_name));
+  return *this;
+}
+
+Result<UpdatePartitionSpec::ApplyResult> UpdatePartitionSpec::Apply() {
+  ICEBERG_RETURN_UNEXPECTED(CheckErrors());
+
+  std::vector<PartitionField> new_fields;
+  new_fields.reserve(spec_->fields().size() + adds_.size());
+
+  // Process existing fields
+  for (const auto& field : spec_->fields()) {
+    if (!deletes_.contains(field.field_id())) {
+      // Field is kept, check for rename
+      auto rename_it = renames_.find(field.name());
+      if (rename_it != renames_.end()) {
+        new_fields.emplace_back(field.source_id(), field.field_id(), 
rename_it->second,
+                                field.transform());
+      } else {
+        new_fields.push_back(field);
+      }
+    } else if (format_version_ < 2) {
+      // field IDs were not required for v1 and were assigned sequentially in 
each
+      // partition spec starting at 1,000.
+      // To maintain consistent field ids across partition specs in v1 tables, 
any
+      // partition field that is removed must be replaced with a null 
transform. null
+      // values are always allowed in partition data.
+      auto rename_it = renames_.find(field.name());
+      std::string field_name =
+          rename_it != renames_.end() ? rename_it->second : 
std::string(field.name());
+      new_fields.emplace_back(field.source_id(), field.field_id(), 
std::move(field_name),
+                              Transform::Void());
+    }
+    // In V2, deleted fields are simply removed
+  }
+
+  // Add new fields
+  new_fields.insert(new_fields.end(), adds_.begin(), adds_.end());
+
+  // Use -1 as a placeholder for the spec id, the actual spec id will be 
assigned by
+  // TableMetadataBuilder when the AddPartitionSpec update is applied.
+  ICEBERG_ASSIGN_OR_RAISE(auto new_spec,
+                          PartitionSpec::Make(/*spec_id=*/-1, 
std::move(new_fields)));
+  ICEBERG_RETURN_UNEXPECTED(new_spec->Validate(*schema_, 
/*allow_missing_fields=*/false));
+
+  return ApplyResult{.spec = std::move(new_spec), .set_as_default = 
set_as_default_};
+}
+
+int32_t UpdatePartitionSpec::AssignFieldId() { return 
++last_assigned_partition_id_; }
+
+PartitionField UpdatePartitionSpec::RecycleOrCreatePartitionField(
+    int32_t source_id, std::shared_ptr<Transform> transform, std::string_view 
name) {
+  // In V2+, use pre-built index for O(1) lookup instead of O(n*m) iteration
+  if (format_version_ >= 2 && !historical_fields_.empty()) {
+    auto it = historical_fields_.find(TransformKey{source_id, 
transform->ToString()});
+    if (it != historical_fields_.end()) {
+      const auto& field = it->second;
+      // If target name is specified then consider it too, otherwise not
+      if (name.empty() || field.name() == name) {
+        return field;
+      }
+    }
+  }
+  // No matching field found, create a new one
+  return {source_id, AssignFieldId(), std::string{name}, std::move(transform)};
+}
+
+Result<std::string> UpdatePartitionSpec::GeneratePartitionName(
+    int32_t source_id, const std::shared_ptr<Transform>& transform) const {
+  // Find the source field name
+  ICEBERG_ASSIGN_OR_RAISE(auto field_opt, schema_->FindFieldById(source_id));
+  if (!field_opt.has_value()) {
+    return Invalid("Cannot find source field for partition field: {}", 
source_id);
+  }
+  return transform->GeneratePartitionName(field_opt.value().get().name());
+}
+
+bool UpdatePartitionSpec::IsTimeTransform(const std::shared_ptr<Transform>& 
transform) {
+  switch (transform->transform_type()) {
+    case TransformType::kYear:
+    case TransformType::kMonth:
+    case TransformType::kDay:
+    case TransformType::kHour:
+      return true;
+    default:
+      return false;
+  }
+}
+
+bool UpdatePartitionSpec::IsVoidTransform(const PartitionField& field) {
+  return field.transform()->transform_type() == TransformType::kVoid;
+}
+
+void UpdatePartitionSpec::CheckForRedundantAddedPartitions(const 
PartitionField& field) {
+  if (IsTimeTransform(field.transform())) {
+    if (added_time_fields_.contains(field.source_id())) {
+      AddError(ErrorKind::kInvalidArgument,
+               "Cannot add redundant partition field: {} conflicts with {}",
+               field.ToString(), added_time_fields_.at(field.source_id()));
+      return;
+    }
+    added_time_fields_.emplace(field.source_id(), field.ToString());
+  }
+}
+
+std::unordered_map<std::string, const PartitionField*, StringHash, StringEqual>
+UpdatePartitionSpec::IndexSpecByName(const PartitionSpec& spec) {
+  std::unordered_map<std::string, const PartitionField*, StringHash, 
StringEqual> index;
+  for (const auto& field : spec.fields()) {
+    index.emplace(field.name(), &field);
+  }
+  return index;
+}
+
+std::unordered_map<UpdatePartitionSpec::TransformKey, const PartitionField*,
+                   UpdatePartitionSpec::TransformKeyHash>
+UpdatePartitionSpec::IndexSpecByTransform(const PartitionSpec& spec) {
+  std::unordered_map<TransformKey, const PartitionField*, TransformKeyHash> 
index;
+  index.reserve(spec.fields().size());
+  for (const auto& field : spec.fields()) {
+    TransformKey key{field.source_id(), field.transform()->ToString()};
+    index.emplace(key, &field);
+  }
+  return index;
+}
+
+void UpdatePartitionSpec::BuildHistoricalFieldsIndex() {
+  const TableMetadata& base_metadata = transaction_->current();
+
+  // Count total fields across all specs to reserve capacity
+  size_t total_fields = 0;
+  for (const auto& partition_spec : base_metadata.partition_specs) {
+    total_fields += partition_spec->fields().size();
+  }
+  historical_fields_.reserve(total_fields);
+
+  // Index all fields from all historical partition specs
+  // Later specs override earlier ones for the same (source_id, transform) key
+  for (const auto& partition_spec : base_metadata.partition_specs) {
+    for (const auto& field : partition_spec->fields()) {
+      TransformKey key{field.source_id(), field.transform()->ToString()};
+      historical_fields_.emplace(key, field);
+    }
+  }
+}
+
+}  // namespace iceberg
diff --git a/src/iceberg/update/update_partition_spec.h 
b/src/iceberg/update/update_partition_spec.h
new file mode 100644
index 00000000..1eab425d
--- /dev/null
+++ b/src/iceberg/update/update_partition_spec.h
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+/// \file iceberg/update/update_partition_spec.h
+/// API for partition spec evolution.
+
+#include <functional>
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include <unordered_set>
+#include <vector>
+
+#include "iceberg/iceberg_export.h"
+#include "iceberg/result.h"
+#include "iceberg/type_fwd.h"
+#include "iceberg/update/pending_update.h"
+#include "iceberg/util/string_util.h"
+
+namespace iceberg {
+
+/// \brief API for partition spec evolution.
+///
+/// When committing, these changes will be applied to the current table 
metadata.
+/// Commit conflicts will not be resolved and will result in a CommitFailed 
error.
+class ICEBERG_EXPORT UpdatePartitionSpec : public PendingUpdate {
+ public:
+  static Result<std::shared_ptr<UpdatePartitionSpec>> Make(
+      std::shared_ptr<Transaction> transaction);
+
+  ~UpdatePartitionSpec() override;
+
+  /// \brief Set whether column resolution in the source schema should be case 
sensitive.
+  UpdatePartitionSpec& CaseSensitive(bool is_case_sensitive);
+
+  /// \brief Add a new partition field from a source column.
+  ///
+  /// The partition field will use identity transform on the source column,
+  /// and use source column name as the partition field name.
+  ///
+  /// \param source_name Source column name in the table schema.
+  /// \return Reference to this for method chaining.
+  UpdatePartitionSpec& AddField(std::string_view source_name);
+
+  /// \brief Add a new partition field with a custom name.
+  ///
+  /// \param term The term representing the source column, should be unbound.
+  /// \param part_name Name for the partition field.
+  /// \return Reference to this for method chaining.
+  UpdatePartitionSpec& AddField(const std::shared_ptr<Term>& term,
+                                std::string_view part_name = "");
+
+  /// \brief Remove a partition field by name.
+  ///
+  /// \param name Name of the partition field to remove.
+  /// \return Reference to this for method chaining.
+  UpdatePartitionSpec& RemoveField(std::string_view name);
+
+  /// \brief Remove a partition field by its source term.
+  ///
+  /// The partition field with the same transform and source reference will be 
removed.
+  /// If the term is a reference and does not have a transform, identity 
transform
+  /// is used.
+  ///
+  /// \param term The term representing the source column, should be unbound.
+  /// \return Reference to this for method chaining.
+  UpdatePartitionSpec& RemoveField(const std::shared_ptr<Term>& term);
+
+  /// \brief Rename a field in the partition spec.
+  ///
+  /// \param name Name of the partition field to rename.
+  /// \param new_name Replacement name for the partition field.
+  /// \return Reference to this for method chaining.
+  UpdatePartitionSpec& RenameField(std::string_view name, std::string 
new_name);
+
+  /// \brief Sets that the new partition spec will NOT be set as the default.
+  ///
+  /// The default behavior is to set the new spec as the default partition 
spec.
+  ///
+  /// \return Reference to this for method chaining.
+  UpdatePartitionSpec& AddNonDefaultSpec();
+
+  Kind kind() const final { return Kind::kUpdatePartitionSpec; }
+
+  struct ApplyResult {
+    std::shared_ptr<PartitionSpec> spec;
+    bool set_as_default;
+  };
+  Result<ApplyResult> Apply();
+
+ private:
+  explicit UpdatePartitionSpec(std::shared_ptr<Transaction> transaction);
+
+  /// \brief Pair of source ID and transform string for indexing.
+  using TransformKey = std::pair<int32_t, std::string>;
+
+  /// \brief Hash function for TransformKey.
+  struct TransformKeyHash {
+    size_t operator()(const TransformKey& key) const {
+      return 31 * std::hash<int32_t>{}(key.first) + 
std::hash<std::string>{}(key.second);
+    }
+  };
+
+  /// \brief Assign a new partition field ID.
+  int32_t AssignFieldId();
+
+  /// \brief Recycle or create a partition field.
+  ///
+  /// In V2 it searches for a similar partition field in historical partition 
specs. Tries
+  /// to match on source field ID, transform type and target name (optional). 
If not found
+  /// or in V1 cases it creates a new PartitionField.
+  ///
+  /// \param source_id The source field ID.
+  /// \param transform The transform function.
+  /// \param name The target partition field name, if specified.
+  /// \return The recycled or newly created partition field.
+  PartitionField RecycleOrCreatePartitionField(int32_t source_id,
+                                               std::shared_ptr<Transform> 
transform,
+                                               std::string_view name);
+
+  /// \brief Internal implementation of AddField with resolved source ID and 
transform.
+  UpdatePartitionSpec& AddFieldInternal(std::string_view name, int32_t 
source_id,
+                                        const std::shared_ptr<Transform>& 
transform);
+
+  /// \brief Generate a partition field name from the source and transform.
+  Result<std::string> GeneratePartitionName(
+      int32_t source_id, const std::shared_ptr<Transform>& transform) const;
+
+  /// \brief Check if a transform is a time-based transform.
+  static bool IsTimeTransform(const std::shared_ptr<Transform>& transform);
+
+  /// \brief Check if a partition field uses void transform.
+  static bool IsVoidTransform(const PartitionField& field);
+
+  /// \brief Check for redundant time-based partition fields.
+  void CheckForRedundantAddedPartitions(const PartitionField& field);
+
+  /// \brief Handle rewriting a delete-and-add operation for the same field.
+  UpdatePartitionSpec& RewriteDeleteAndAddField(const PartitionField& existing,
+                                                std::string_view name);
+
+  /// \brief Internal helper to remove a field by transform key.
+  UpdatePartitionSpec& RemoveFieldByTransform(const TransformKey& key,
+                                              std::string_view term_str);
+
+  /// \brief Index the spec fields by name.
+  static std::unordered_map<std::string, const PartitionField*, StringHash, 
StringEqual>
+  IndexSpecByName(const PartitionSpec& spec);
+
+  /// \brief Index the spec fields by (source_id, transform) pair.
+  static std::unordered_map<TransformKey, const PartitionField*, 
TransformKeyHash>
+  IndexSpecByTransform(const PartitionSpec& spec);
+
+  /// \brief Build index of historical partition fields for efficient 
recycling (V2+).
+  void BuildHistoricalFieldsIndex();
+
+  // Configuration
+  int32_t format_version_;
+  std::shared_ptr<PartitionSpec> spec_;
+  std::shared_ptr<Schema> schema_;
+  bool case_sensitive_{true};
+  bool set_as_default_{true};
+  int32_t last_assigned_partition_id_;
+
+  // Indexes for existing fields
+  std::unordered_map<std::string, const PartitionField*, StringHash, 
StringEqual>
+      name_to_field_;
+  std::unordered_map<TransformKey, const PartitionField*, TransformKeyHash>
+      transform_to_field_;
+
+  // Index for historical partition fields (V2+ only) for efficient recycling.
+  // Maps (source_id, transform_string) -> PartitionField from all historical 
specs.
+  std::unordered_map<TransformKey, PartitionField, TransformKeyHash> 
historical_fields_;
+
+  // Pending changes
+  std::vector<PartitionField> adds_;
+  std::unordered_set<std::string, StringHash, StringEqual> added_field_names_;
+  std::unordered_map<int32_t, std::string> added_time_fields_;
+  std::unordered_map<TransformKey, std::string, TransformKeyHash>
+      transform_to_added_field_;
+  std::unordered_set<int32_t> deletes_;
+  std::unordered_map<std::string, std::string, StringHash, StringEqual> 
renames_;
+};
+
+}  // namespace iceberg
diff --git a/src/iceberg/util/macros.h b/src/iceberg/util/macros.h
index c6919ab7..9c80e585 100644
--- a/src/iceberg/util/macros.h
+++ b/src/iceberg/util/macros.h
@@ -54,11 +54,11 @@
   } while (0)
 
 // Macro for state checks, usually used for unexpected states
-#define ICEBERG_CHECK(expr, ...)   \
-  do {                             \
-    if (!(expr)) [[unlikely]] {    \
-      return Invalid(__VA_ARGS__); \
-    }                              \
+#define ICEBERG_CHECK(expr, ...)            \
+  do {                                      \
+    if (!(expr)) [[unlikely]] {             \
+      return ValidationFailed(__VA_ARGS__); \
+    }                                       \
   } while (0)
 
 #define ERROR_TO_EXCEPTION(error)                             \
diff --git a/src/iceberg/util/string_util.h b/src/iceberg/util/string_util.h
index de778036..8aa209c9 100644
--- a/src/iceberg/util/string_util.h
+++ b/src/iceberg/util/string_util.h
@@ -69,4 +69,14 @@ struct ICEBERG_EXPORT StringHash {
   std::size_t operator()(const std::string& str) const { return 
hash_type{}(str); }
 };
 
+/// \brief Transparent equality function that supports std::string_view as 
lookup key
+struct ICEBERG_EXPORT StringEqual {
+  using is_transparent = void;
+
+  bool operator()(std::string_view lhs, std::string_view rhs) const { return 
lhs == rhs; }
+  bool operator()(const std::string& lhs, const std::string& rhs) const {
+    return lhs == rhs;
+  }
+};
+
 }  // namespace iceberg


Reply via email to