This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 34d5a1dd feat: implement update column (#498)
34d5a1dd is described below

commit 34d5a1ddbedacb0a4d66f330385edea10b9b3360
Author: Guotao Yu <[email protected]>
AuthorDate: Fri Jan 16 20:32:06 2026 +0800

    feat: implement update column (#498)
---
 src/iceberg/test/update_schema_test.cc | 955 +++++++++++++++++++++++++++++++--
 src/iceberg/update/update_schema.cc    | 217 +++++---
 src/iceberg/update/update_schema.h     |  27 +
 src/iceberg/util/type_util.cc          |  41 ++
 src/iceberg/util/type_util.h           |  13 +
 5 files changed, 1117 insertions(+), 136 deletions(-)

diff --git a/src/iceberg/test/update_schema_test.cc 
b/src/iceberg/test/update_schema_test.cc
index ce3c77cc..1c625bf3 100644
--- a/src/iceberg/test/update_schema_test.cc
+++ b/src/iceberg/test/update_schema_test.cc
@@ -41,7 +41,6 @@ TEST_F(UpdateSchemaTest, AddOptionalColumn) {
   ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
   ASSERT_TRUE(result.schema != nullptr);
 
-  // Verify the new column was added
   ICEBERG_UNWRAP_OR_FAIL(auto new_field_opt, 
result.schema->FindFieldByName("new_col"));
   ASSERT_TRUE(new_field_opt.has_value());
 
@@ -71,7 +70,6 @@ TEST_F(UpdateSchemaTest, 
AddRequiredColumnWithAllowIncompatible) {
   ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
   ASSERT_TRUE(result.schema != nullptr);
 
-  // Verify the new column was added
   ICEBERG_UNWRAP_OR_FAIL(auto new_field_opt,
                          result.schema->FindFieldByName("required_col"));
   ASSERT_TRUE(new_field_opt.has_value());
@@ -93,7 +91,6 @@ TEST_F(UpdateSchemaTest, AddMultipleColumns) {
   ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
   ASSERT_TRUE(result.schema != nullptr);
 
-  // Verify all columns were added
   ICEBERG_UNWRAP_OR_FAIL(auto col1_opt, 
result.schema->FindFieldByName("col1"));
   ICEBERG_UNWRAP_OR_FAIL(auto col2_opt, 
result.schema->FindFieldByName("col2"));
   ICEBERG_UNWRAP_OR_FAIL(auto col3_opt, 
result.schema->FindFieldByName("col3"));
@@ -119,7 +116,6 @@ TEST_F(UpdateSchemaTest, AddColumnWithDotInNameFails) {
 
 // Test adding column to nested struct
 TEST_F(UpdateSchemaTest, AddColumnToNestedStruct) {
-  // First, add a struct column to the table
   auto struct_type = std::make_shared<StructType>(std::vector<SchemaField>{
       SchemaField(100, "nested_field", int32(), true, "Nested field")});
 
@@ -127,7 +123,6 @@ TEST_F(UpdateSchemaTest, AddColumnToNestedStruct) {
   setup_update->AddColumn("struct_col", struct_type, "A struct column");
   EXPECT_THAT(setup_update->Commit(), IsOk());
 
-  // Reload table and add column to the nested struct
   ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
   ICEBERG_UNWRAP_OR_FAIL(auto update, reloaded->NewUpdateSchema());
   update->AddColumn("struct_col", "new_nested_field", string(), "New nested 
field");
@@ -135,7 +130,6 @@ TEST_F(UpdateSchemaTest, AddColumnToNestedStruct) {
   ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
   ASSERT_TRUE(result.schema != nullptr);
 
-  // Verify the nested field was added
   ICEBERG_UNWRAP_OR_FAIL(auto struct_field_opt,
                          result.schema->FindFieldByName("struct_col"));
   ASSERT_TRUE(struct_field_opt.has_value());
@@ -165,12 +159,10 @@ TEST_F(UpdateSchemaTest, 
AddColumnToNonExistentParentFails) {
 
 // Test adding column to non-struct parent fails
 TEST_F(UpdateSchemaTest, AddColumnToNonStructParentFails) {
-  // First, add a primitive column
   ICEBERG_UNWRAP_OR_FAIL(auto setup_update, table_->NewUpdateSchema());
   setup_update->AddColumn("primitive_col", int32(), "A primitive column");
   EXPECT_THAT(setup_update->Commit(), IsOk());
 
-  // Try to add column to the primitive column (should fail)
   ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
   ICEBERG_UNWRAP_OR_FAIL(auto update, reloaded->NewUpdateSchema());
   update->AddColumn("primitive_col", "nested_field", string(), "Should fail");
@@ -202,10 +194,8 @@ TEST_F(UpdateSchemaTest, ColumnIdAssignment) {
 
   ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
 
-  // Verify new last column ID is incremented
   EXPECT_EQ(result.new_last_column_id, original_last_id + 2);
 
-  // Verify new columns have sequential IDs
   ICEBERG_UNWRAP_OR_FAIL(auto col1_opt, 
result.schema->FindFieldByName("new_col1"));
   ICEBERG_UNWRAP_OR_FAIL(auto col2_opt, 
result.schema->FindFieldByName("new_col2"));
 
@@ -228,7 +218,6 @@ TEST_F(UpdateSchemaTest, AddNestedStructColumn) {
   ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
   ASSERT_TRUE(result.schema != nullptr);
 
-  // Verify the struct column was added
   ICEBERG_UNWRAP_OR_FAIL(auto struct_field_opt,
                          result.schema->FindFieldByName("complex_struct"));
   ASSERT_TRUE(struct_field_opt.has_value());
@@ -238,7 +227,6 @@ TEST_F(UpdateSchemaTest, AddNestedStructColumn) {
   EXPECT_TRUE(struct_field.type()->is_nested());
   EXPECT_TRUE(struct_field.optional());
 
-  // Verify nested fields exist
   const auto& nested_type = static_cast<const 
StructType&>(*struct_field.type());
   EXPECT_EQ(nested_type.fields().size(), 2);
 
@@ -264,7 +252,6 @@ TEST_F(UpdateSchemaTest, CaseSensitiveColumnNames) {
   ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
   ASSERT_TRUE(result.schema != nullptr);
 
-  // Both columns should exist with case-sensitive search
   ICEBERG_UNWRAP_OR_FAIL(auto upper_opt, 
result.schema->FindFieldByName("Column", true));
   ICEBERG_UNWRAP_OR_FAIL(auto lower_opt, 
result.schema->FindFieldByName("column", true));
 
@@ -294,7 +281,6 @@ TEST_F(UpdateSchemaTest, EmptyUpdate) {
   ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema());
   ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
 
-  // Schema should be unchanged
   EXPECT_EQ(*result.schema, *original_schema);
   EXPECT_EQ(result.new_last_column_id, table_->metadata()->last_column_id);
 }
@@ -306,7 +292,6 @@ TEST_F(UpdateSchemaTest, CommitSuccess) {
 
   EXPECT_THAT(update->Commit(), IsOk());
 
-  // Reload table and verify column exists
   ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
   ICEBERG_UNWRAP_OR_FAIL(auto schema, reloaded->schema());
 
@@ -322,7 +307,6 @@ TEST_F(UpdateSchemaTest, CommitSuccess) {
 
 // Test adding fields to map value and list element structs
 TEST_F(UpdateSchemaTest, AddFieldsToMapAndList) {
-  // Create a schema with map and list of structs
   auto map_key_struct = std::make_shared<StructType>(
       std::vector<SchemaField>{SchemaField(20, "address", string(), false),
                                SchemaField(21, "city", string(), false)});
@@ -346,16 +330,13 @@ TEST_F(UpdateSchemaTest, AddFieldsToMapAndList) {
       .AddColumn("points", list_type, "2-D cartesian points");
   EXPECT_THAT(setup_update->Commit(), IsOk());
 
-  // Reload and add fields to nested structs
   ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
   ICEBERG_UNWRAP_OR_FAIL(auto update, reloaded->NewUpdateSchema());
-  update
-      ->AddColumn("locations", "alt", float32(), "altitude")  // add to map 
value
-      .AddColumn("points", "z", int64(), "z coordinate");     // add to list 
element
+  update->AddColumn("locations", "alt", float32(), "altitude")
+      .AddColumn("points", "z", int64(), "z coordinate");
 
   ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
 
-  // Verify map value has new field
   ICEBERG_UNWRAP_OR_FAIL(auto locations_opt, 
result.schema->FindFieldByName("locations"));
   ASSERT_TRUE(locations_opt.has_value());
   const auto& locations_field = locations_opt->get();
@@ -367,7 +348,6 @@ TEST_F(UpdateSchemaTest, AddFieldsToMapAndList) {
   ASSERT_TRUE(alt_opt.has_value());
   EXPECT_EQ(alt_opt->get().type(), float32());
 
-  // Verify list element has new field
   ICEBERG_UNWRAP_OR_FAIL(auto points_opt, 
result.schema->FindFieldByName("points"));
   ASSERT_TRUE(points_opt.has_value());
   const auto& points_field = points_opt->get();
@@ -382,17 +362,14 @@ TEST_F(UpdateSchemaTest, AddFieldsToMapAndList) {
 
 // Test adding nested struct with ID reassignment
 TEST_F(UpdateSchemaTest, AddNestedStructWithIdReassignment) {
-  // Create a struct with conflicting IDs (will be reassigned)
   auto nested_struct = std::make_shared<StructType>(std::vector<SchemaField>{
-      SchemaField(1, "lat", int32(), false),    // ID 1 conflicts with 
existing schema
-      SchemaField(2, "long", int32(), true)});  // ID 2 conflicts with 
existing schema
+      SchemaField(1, "lat", int32(), false), SchemaField(2, "long", int32(), 
true)});
 
   ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema());
   update->AddColumn("location", nested_struct);
 
   ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
 
-  // Verify the struct was added with reassigned IDs
   ICEBERG_UNWRAP_OR_FAIL(auto location_opt, 
result.schema->FindFieldByName("location"));
   ASSERT_TRUE(location_opt.has_value());
 
@@ -402,14 +379,12 @@ TEST_F(UpdateSchemaTest, 
AddNestedStructWithIdReassignment) {
   const auto& struct_type = static_cast<const 
StructType&>(*location_field.type());
   ASSERT_EQ(struct_type.fields().size(), 2);
 
-  // IDs should be reassigned to avoid conflicts
   ICEBERG_UNWRAP_OR_FAIL(auto lat_opt, struct_type.GetFieldByName("lat"));
   ICEBERG_UNWRAP_OR_FAIL(auto long_opt, struct_type.GetFieldByName("long"));
 
   ASSERT_TRUE(lat_opt.has_value());
   ASSERT_TRUE(long_opt.has_value());
 
-  // IDs should be > 1 (reassigned)
   EXPECT_GT(lat_opt->get().field_id(), 1);
   EXPECT_GT(long_opt->get().field_id(), 1);
 }
@@ -432,7 +407,6 @@ TEST_F(UpdateSchemaTest, AddNestedMapOfStructs) {
 
   ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
 
-  // Verify the map was added with reassigned IDs
   ICEBERG_UNWRAP_OR_FAIL(auto locations_opt, 
result.schema->FindFieldByName("locations"));
   ASSERT_TRUE(locations_opt.has_value());
 
@@ -441,11 +415,9 @@ TEST_F(UpdateSchemaTest, AddNestedMapOfStructs) {
 
   const auto& map = static_cast<const MapType&>(*locations_field.type());
 
-  // Verify key struct fields
   const auto& key_struct_type = static_cast<const 
StructType&>(*map.key().type());
   EXPECT_EQ(key_struct_type.fields().size(), 4);
 
-  // Verify value struct fields
   const auto& value_struct_type = static_cast<const 
StructType&>(*map.value().type());
   EXPECT_EQ(value_struct_type.fields().size(), 2);
 
@@ -469,7 +441,6 @@ TEST_F(UpdateSchemaTest, AddNestedListOfStructs) {
 
   ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
 
-  // Verify the list was added with reassigned IDs
   ICEBERG_UNWRAP_OR_FAIL(auto locations_opt, 
result.schema->FindFieldByName("locations"));
   ASSERT_TRUE(locations_opt.has_value());
 
@@ -491,7 +462,6 @@ TEST_F(UpdateSchemaTest, AddNestedListOfStructs) {
 
 // Test adding field with dots in name to nested struct
 TEST_F(UpdateSchemaTest, AddFieldWithDotsInName) {
-  // First add a struct column
   auto struct_type = std::make_shared<StructType>(
       std::vector<SchemaField>{SchemaField(100, "field1", int32(), true)});
 
@@ -499,14 +469,12 @@ TEST_F(UpdateSchemaTest, AddFieldWithDotsInName) {
   setup_update->AddColumn("struct_col", struct_type);
   EXPECT_THAT(setup_update->Commit(), IsOk());
 
-  // Add a field with dots in its name to the nested struct
   ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
   ICEBERG_UNWRAP_OR_FAIL(auto update, reloaded->NewUpdateSchema());
   update->AddColumn("struct_col", "field.with.dots", int64(), "Field with dots 
in name");
 
   ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
 
-  // Verify the field with dots was added
   ICEBERG_UNWRAP_OR_FAIL(auto struct_field_opt,
                          result.schema->FindFieldByName("struct_col"));
   ASSERT_TRUE(struct_field_opt.has_value());
@@ -523,7 +491,6 @@ TEST_F(UpdateSchemaTest, AddFieldWithDotsInName) {
 
 // Test adding field to map key should fail
 TEST_F(UpdateSchemaTest, AddFieldToMapKeyFails) {
-  // Create a map with struct key
   auto key_struct = std::make_shared<StructType>(
       std::vector<SchemaField>{SchemaField(20, "address", string(), false)});
 
@@ -538,7 +505,6 @@ TEST_F(UpdateSchemaTest, AddFieldToMapKeyFails) {
   setup_update->AddColumn("locations", map_type);
   EXPECT_THAT(setup_update->Commit(), IsOk());
 
-  // Try to add field to map key (should fail)
   ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
   ICEBERG_UNWRAP_OR_FAIL(auto update, reloaded->NewUpdateSchema());
   update->AddColumn("locations.key", "city", string(), "Should fail");
@@ -550,26 +516,22 @@ TEST_F(UpdateSchemaTest, AddFieldToMapKeyFails) {
 
 // Test deleting a column
 TEST_F(UpdateSchemaTest, DeleteColumn) {
-  // First add a column
   ICEBERG_UNWRAP_OR_FAIL(auto setup_update, table_->NewUpdateSchema());
   setup_update->AddColumn("to_delete", string(), "A column to delete");
   EXPECT_THAT(setup_update->Commit(), IsOk());
 
-  // Delete the column
   ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
   ICEBERG_UNWRAP_OR_FAIL(auto update, reloaded->NewUpdateSchema());
   update->DeleteColumn("to_delete");
 
   ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
 
-  // Verify the column was deleted
   ICEBERG_UNWRAP_OR_FAIL(auto field_opt, 
result.schema->FindFieldByName("to_delete"));
   EXPECT_FALSE(field_opt.has_value());
 }
 
 // Test deleting a nested column
 TEST_F(UpdateSchemaTest, DeleteNestedColumn) {
-  // First add a struct with nested fields
   auto struct_type = std::make_shared<StructType>(
       std::vector<SchemaField>{SchemaField(100, "field1", int32(), true),
                                SchemaField(101, "field2", string(), true)});
@@ -578,14 +540,12 @@ TEST_F(UpdateSchemaTest, DeleteNestedColumn) {
   setup_update->AddColumn("struct_col", struct_type);
   EXPECT_THAT(setup_update->Commit(), IsOk());
 
-  // Delete one of the nested fields
   ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
   ICEBERG_UNWRAP_OR_FAIL(auto update, reloaded->NewUpdateSchema());
   update->DeleteColumn("struct_col.field1");
 
   ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
 
-  // Verify field1 was deleted but field2 still exists
   ICEBERG_UNWRAP_OR_FAIL(auto struct_field_opt,
                          result.schema->FindFieldByName("struct_col"));
   ASSERT_TRUE(struct_field_opt.has_value());
@@ -612,20 +572,17 @@ TEST_F(UpdateSchemaTest, DeleteMissingColumnFails) {
 
 // Test delete then add same column
 TEST_F(UpdateSchemaTest, DeleteThenAdd) {
-  // First add a required column
   ICEBERG_UNWRAP_OR_FAIL(auto setup_update, table_->NewUpdateSchema());
   setup_update->AllowIncompatibleChanges().AddRequiredColumn("col", int32(),
                                                              "Required 
column");
   EXPECT_THAT(setup_update->Commit(), IsOk());
 
-  // Delete then add with different type and optional
   ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
   ICEBERG_UNWRAP_OR_FAIL(auto update, reloaded->NewUpdateSchema());
   update->DeleteColumn("col").AddColumn("col", string(), "Now optional 
string");
 
   ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
 
-  // Verify the column was re-added with new properties
   ICEBERG_UNWRAP_OR_FAIL(auto field_opt, 
result.schema->FindFieldByName("col"));
   ASSERT_TRUE(field_opt.has_value());
 
@@ -636,7 +593,6 @@ TEST_F(UpdateSchemaTest, DeleteThenAdd) {
 
 // Test delete then add nested field
 TEST_F(UpdateSchemaTest, DeleteThenAddNested) {
-  // First add a struct with a field
   auto struct_type = std::make_shared<StructType>(
       std::vector<SchemaField>{SchemaField(100, "field1", boolean(), false)});
 
@@ -644,7 +600,6 @@ TEST_F(UpdateSchemaTest, DeleteThenAddNested) {
   setup_update->AddColumn("struct_col", struct_type);
   EXPECT_THAT(setup_update->Commit(), IsOk());
 
-  // Delete then re-add the nested field with different type
   ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
   ICEBERG_UNWRAP_OR_FAIL(auto update, reloaded->NewUpdateSchema());
   update->DeleteColumn("struct_col.field1")
@@ -652,7 +607,6 @@ TEST_F(UpdateSchemaTest, DeleteThenAddNested) {
 
   ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
 
-  // Verify the field was re-added
   ICEBERG_UNWRAP_OR_FAIL(auto struct_field_opt,
                          result.schema->FindFieldByName("struct_col"));
   ASSERT_TRUE(struct_field_opt.has_value());
@@ -667,7 +621,6 @@ TEST_F(UpdateSchemaTest, DeleteThenAddNested) {
 
 // Test add-delete conflict
 TEST_F(UpdateSchemaTest, AddDeleteConflict) {
-  // Try to delete a newly added column (should fail - column doesn't exist in 
schema)
   ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema());
   update->AddColumn("new_col", int32()).DeleteColumn("new_col");
 
@@ -678,7 +631,6 @@ TEST_F(UpdateSchemaTest, AddDeleteConflict) {
 
 // Test delete column that has additions fails
 TEST_F(UpdateSchemaTest, DeleteColumnWithAdditionsFails) {
-  // First add a struct
   auto struct_type = std::make_shared<StructType>(
       std::vector<SchemaField>{SchemaField(100, "field1", int32(), true)});
 
@@ -686,7 +638,6 @@ TEST_F(UpdateSchemaTest, DeleteColumnWithAdditionsFails) {
   setup_update->AddColumn("struct_col", struct_type);
   EXPECT_THAT(setup_update->Commit(), IsOk());
 
-  // Try to add a field to the struct and delete it in the same update
   ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
   ICEBERG_UNWRAP_OR_FAIL(auto update, reloaded->NewUpdateSchema());
   update->AddColumn("struct_col", "field2", 
string()).DeleteColumn("struct_col");
@@ -698,7 +649,6 @@ TEST_F(UpdateSchemaTest, DeleteColumnWithAdditionsFails) {
 
 // Test delete map key fails
 TEST_F(UpdateSchemaTest, DeleteMapKeyFails) {
-  // Create a map
   auto map_type = std::make_shared<MapType>(SchemaField(10, "key", string(), 
false),
                                             SchemaField(11, "value", int32(), 
true));
 
@@ -706,7 +656,6 @@ TEST_F(UpdateSchemaTest, DeleteMapKeyFails) {
   setup_update->AddColumn("map_col", map_type);
   EXPECT_THAT(setup_update->Commit(), IsOk());
 
-  // Try to delete the map key (should fail in Apply)
   ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
   ICEBERG_UNWRAP_OR_FAIL(auto update, reloaded->NewUpdateSchema());
   update->DeleteColumn("map_col.key");
@@ -718,22 +667,914 @@ TEST_F(UpdateSchemaTest, DeleteMapKeyFails) {
 
 // Test case insensitive delete
 TEST_F(UpdateSchemaTest, DeleteColumnCaseInsensitive) {
-  // First add a column
   ICEBERG_UNWRAP_OR_FAIL(auto setup_update, table_->NewUpdateSchema());
   setup_update->AddColumn("MyColumn", string(), "A column with mixed case");
   EXPECT_THAT(setup_update->Commit(), IsOk());
 
-  // Delete using different case
   ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
   ICEBERG_UNWRAP_OR_FAIL(auto update, reloaded->NewUpdateSchema());
   update->CaseSensitive(false).DeleteColumn("mycolumn");
 
   ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
 
-  // Verify the column was deleted
   ICEBERG_UNWRAP_OR_FAIL(auto field_opt,
                          result.schema->FindFieldByName("MyColumn", false));
   EXPECT_FALSE(field_opt.has_value());
 }
 
+// Test renaming a column
+TEST_F(UpdateSchemaTest, RenameColumn) {
+  ICEBERG_UNWRAP_OR_FAIL(auto setup_update, table_->NewUpdateSchema());
+  setup_update->AddColumn("old_name", string(), "A column to rename");
+  EXPECT_THAT(setup_update->Commit(), IsOk());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+  ICEBERG_UNWRAP_OR_FAIL(auto update, reloaded->NewUpdateSchema());
+  update->RenameColumn("old_name", "new_name");
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto old_field_opt, 
result.schema->FindFieldByName("old_name"));
+  ICEBERG_UNWRAP_OR_FAIL(auto new_field_opt, 
result.schema->FindFieldByName("new_name"));
+
+  EXPECT_FALSE(old_field_opt.has_value());
+  ASSERT_TRUE(new_field_opt.has_value());
+  EXPECT_EQ(*new_field_opt->get().type(), *string());
+}
+
+// Test renaming nested column
+TEST_F(UpdateSchemaTest, RenameNestedColumn) {
+  auto struct_type = std::make_shared<StructType>(
+      std::vector<SchemaField>{SchemaField(100, "field1", int32(), true),
+                               SchemaField(101, "field2", string(), true)});
+
+  ICEBERG_UNWRAP_OR_FAIL(auto setup_update, table_->NewUpdateSchema());
+  setup_update->AddColumn("struct_col", struct_type);
+  EXPECT_THAT(setup_update->Commit(), IsOk());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+  ICEBERG_UNWRAP_OR_FAIL(auto update, reloaded->NewUpdateSchema());
+  update->RenameColumn("struct_col.field1", "renamed_field");
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto struct_field_opt,
+                         result.schema->FindFieldByName("struct_col"));
+  ASSERT_TRUE(struct_field_opt.has_value());
+
+  const auto& struct_field = struct_field_opt->get();
+  const auto& nested_struct = static_cast<const 
StructType&>(*struct_field.type());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto field1_opt, 
nested_struct.GetFieldByName("field1"));
+  ICEBERG_UNWRAP_OR_FAIL(auto renamed_opt, 
nested_struct.GetFieldByName("renamed_field"));
+
+  EXPECT_FALSE(field1_opt.has_value());
+  ASSERT_TRUE(renamed_opt.has_value());
+  EXPECT_EQ(*renamed_opt->get().type(), *int32());
+}
+
+// Test renaming column with dots in new name
+TEST_F(UpdateSchemaTest, RenameColumnWithDotsInName) {
+  ICEBERG_UNWRAP_OR_FAIL(auto setup_update, table_->NewUpdateSchema());
+  setup_update->AddColumn("simple_name", string());
+  EXPECT_THAT(setup_update->Commit(), IsOk());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+  ICEBERG_UNWRAP_OR_FAIL(auto update, reloaded->NewUpdateSchema());
+  update->RenameColumn("simple_name", "name.with.dots");
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto new_field_opt,
+                         result.schema->FindFieldByName("name.with.dots"));
+  ASSERT_TRUE(new_field_opt.has_value());
+  EXPECT_EQ(*new_field_opt->get().type(), *string());
+}
+
+// Test rename missing column fails
+TEST_F(UpdateSchemaTest, RenameMissingColumnFails) {
+  ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema());
+  update->RenameColumn("non_existent", "new_name");
+
+  auto result = update->Apply();
+  EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed));
+  EXPECT_THAT(result, HasErrorMessage("Cannot rename missing column"));
+}
+
+// Test rename column that will be deleted fails
+TEST_F(UpdateSchemaTest, RenameDeletedColumnFails) {
+  ICEBERG_UNWRAP_OR_FAIL(auto setup_update, table_->NewUpdateSchema());
+  setup_update->AddColumn("col", string());
+  EXPECT_THAT(setup_update->Commit(), IsOk());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+  ICEBERG_UNWRAP_OR_FAIL(auto update, reloaded->NewUpdateSchema());
+  update->DeleteColumn("col").RenameColumn("col", "new_name");
+
+  auto result = update->Apply();
+  EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed));
+  EXPECT_THAT(result, HasErrorMessage("Cannot rename a column that will be 
deleted"));
+}
+
+// Test case insensitive rename
+TEST_F(UpdateSchemaTest, RenameColumnCaseInsensitive) {
+  ICEBERG_UNWRAP_OR_FAIL(auto setup_update, table_->NewUpdateSchema());
+  setup_update->AddColumn("MyColumn", string(), "A column with mixed case");
+  EXPECT_THAT(setup_update->Commit(), IsOk());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+  ICEBERG_UNWRAP_OR_FAIL(auto update, reloaded->NewUpdateSchema());
+  update->CaseSensitive(false).RenameColumn("mycolumn", "NewName");
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto old_field_opt,
+                         result.schema->FindFieldByName("MyColumn", false));
+  ICEBERG_UNWRAP_OR_FAIL(auto new_field_opt,
+                         result.schema->FindFieldByName("NewName", false));
+
+  EXPECT_FALSE(old_field_opt.has_value());
+  ASSERT_TRUE(new_field_opt.has_value());
+}
+
+// Test rename then delete with old name fails
+TEST_F(UpdateSchemaTest, RenameThenDeleteOldNameFails) {
+  ICEBERG_UNWRAP_OR_FAIL(auto setup_update, table_->NewUpdateSchema());
+  setup_update->AddColumn("old_name", string());
+  EXPECT_THAT(setup_update->Commit(), IsOk());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+  ICEBERG_UNWRAP_OR_FAIL(auto update, reloaded->NewUpdateSchema());
+  update->RenameColumn("old_name", "new_name").DeleteColumn("old_name");
+
+  auto result = update->Apply();
+  EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed));
+  EXPECT_THAT(result, HasErrorMessage("Cannot delete a column that has 
updates"));
+}
+
+// Test rename then delete with new name fails
+TEST_F(UpdateSchemaTest, RenameThenDeleteNewNameFails) {
+  ICEBERG_UNWRAP_OR_FAIL(auto setup_update, table_->NewUpdateSchema());
+  setup_update->AddColumn("old_name", string());
+  EXPECT_THAT(setup_update->Commit(), IsOk());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+  ICEBERG_UNWRAP_OR_FAIL(auto update, reloaded->NewUpdateSchema());
+  update->RenameColumn("old_name", "new_name").DeleteColumn("new_name");
+
+  auto result = update->Apply();
+  EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed));
+  EXPECT_THAT(result, HasErrorMessage("Cannot delete missing column"));
+}
+
+// Test rename then add with old name
+TEST_F(UpdateSchemaTest, RenameThenAddWithOldName) {
+  ICEBERG_UNWRAP_OR_FAIL(auto setup_update, table_->NewUpdateSchema());
+  setup_update->AddColumn("old_name", int32());
+  EXPECT_THAT(setup_update->Commit(), IsOk());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+  ICEBERG_UNWRAP_OR_FAIL(auto update, reloaded->NewUpdateSchema());
+  update->RenameColumn("old_name", "new_name").AddColumn("old_name", string());
+
+  auto result = update->Apply();
+  EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed));
+  EXPECT_THAT(result, HasErrorMessage("Cannot add column, name already 
exists"));
+}
+
+// Test add then rename - should fail because RenameColumn only works on 
existing fields
+TEST_F(UpdateSchemaTest, AddThenRename) {
+  ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema());
+  update->AddColumn("temp_name", string()).RenameColumn("temp_name", 
"final_name");
+
+  auto result = update->Apply();
+  EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed));
+  EXPECT_THAT(result, HasErrorMessage("Cannot rename missing column"));
+}
+
+// Test delete then add then rename - should fail
+TEST_F(UpdateSchemaTest, DeleteThenAddThenRename) {
+  ICEBERG_UNWRAP_OR_FAIL(auto setup_update, table_->NewUpdateSchema());
+  setup_update->AddColumn("col", int32());
+  EXPECT_THAT(setup_update->Commit(), IsOk());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+  ICEBERG_UNWRAP_OR_FAIL(auto update, reloaded->NewUpdateSchema());
+  update->DeleteColumn("col")
+      .AddColumn("col", string(), "New column with same name")
+      .RenameColumn("col", "renamed_col");
+
+  auto result = update->Apply();
+  EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed));
+  EXPECT_THAT(result, HasErrorMessage("Cannot rename a column that will be 
deleted"));
+}
+
+// Test making a column optional
+TEST_F(UpdateSchemaTest, MakeColumnOptional) {
+  ICEBERG_UNWRAP_OR_FAIL(auto setup_update, table_->NewUpdateSchema());
+  setup_update->AllowIncompatibleChanges().AddRequiredColumn("id", int32());
+  EXPECT_THAT(setup_update->Commit(), IsOk());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+  ICEBERG_UNWRAP_OR_FAIL(auto update, reloaded->NewUpdateSchema());
+  update->MakeColumnOptional("id");
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto field_opt, result.schema->FindFieldByName("id"));
+  ASSERT_TRUE(field_opt.has_value());
+  EXPECT_TRUE(field_opt->get().optional());
+}
+
+// Test requiring a column
+TEST_F(UpdateSchemaTest, RequireColumn) {
+  ICEBERG_UNWRAP_OR_FAIL(auto setup_update, table_->NewUpdateSchema());
+  setup_update->AddColumn("id", int32());
+  EXPECT_THAT(setup_update->Commit(), IsOk());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+  ICEBERG_UNWRAP_OR_FAIL(auto update, reloaded->NewUpdateSchema());
+  update->RequireColumn("id");
+
+  auto result = update->Apply();
+  EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed));
+  EXPECT_THAT(result, HasErrorMessage("Cannot change column nullability"));
+  EXPECT_THAT(result, HasErrorMessage("optional -> required"));
+
+  ICEBERG_UNWRAP_OR_FAIL(auto reloaded2, catalog_->LoadTable(table_ident_));
+  ICEBERG_UNWRAP_OR_FAIL(auto update2, reloaded2->NewUpdateSchema());
+  update2->AllowIncompatibleChanges().RequireColumn("id");
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result2, update2->Apply());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto field_opt, 
result2.schema->FindFieldByName("id"));
+  ASSERT_TRUE(field_opt.has_value());
+  EXPECT_FALSE(field_opt->get().optional());
+}
+
+// Test requiring an already required column (noop)
+TEST_F(UpdateSchemaTest, RequireColumnNoop) {
+  ICEBERG_UNWRAP_OR_FAIL(auto setup_update, table_->NewUpdateSchema());
+  setup_update->AllowIncompatibleChanges().AddRequiredColumn("id", int32());
+  EXPECT_THAT(setup_update->Commit(), IsOk());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+  ICEBERG_UNWRAP_OR_FAIL(auto update, reloaded->NewUpdateSchema());
+  update->RequireColumn("id");
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto field_opt, result.schema->FindFieldByName("id"));
+  ASSERT_TRUE(field_opt.has_value());
+  EXPECT_FALSE(field_opt->get().optional());
+}
+
+// Test making an already optional column optional (noop)
+TEST_F(UpdateSchemaTest, MakeColumnOptionalNoop) {
+  ICEBERG_UNWRAP_OR_FAIL(auto setup_update, table_->NewUpdateSchema());
+  setup_update->AddColumn("id", int32());
+  EXPECT_THAT(setup_update->Commit(), IsOk());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+  ICEBERG_UNWRAP_OR_FAIL(auto update, reloaded->NewUpdateSchema());
+  update->MakeColumnOptional("id");
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto field_opt, result.schema->FindFieldByName("id"));
+  ASSERT_TRUE(field_opt.has_value());
+  EXPECT_TRUE(field_opt->get().optional());
+}
+
+// Test case insensitive require column
+TEST_F(UpdateSchemaTest, RequireColumnCaseInsensitive) {
+  ICEBERG_UNWRAP_OR_FAIL(auto setup_update, table_->NewUpdateSchema());
+  setup_update->AddColumn("ID", int32());
+  EXPECT_THAT(setup_update->Commit(), IsOk());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+  ICEBERG_UNWRAP_OR_FAIL(auto update, reloaded->NewUpdateSchema());
+  update->CaseSensitive(false).AllowIncompatibleChanges().RequireColumn("id");
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto field_opt, result.schema->FindFieldByName("ID", 
false));
+  ASSERT_TRUE(field_opt.has_value());
+  EXPECT_FALSE(field_opt->get().optional());
+}
+
+// Test make column optional on missing column fails
+TEST_F(UpdateSchemaTest, MakeColumnOptionalMissingFails) {
+  ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema());
+  update->MakeColumnOptional("non_existent");
+
+  auto result = update->Apply();
+  EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed));
+  EXPECT_THAT(result, HasErrorMessage("Cannot update missing column"));
+}
+
+// Test require column on missing column fails
+TEST_F(UpdateSchemaTest, RequireColumnMissingFails) {
+  ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema());
+  update->AllowIncompatibleChanges().RequireColumn("non_existent");
+
+  auto result = update->Apply();
+  EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed));
+  EXPECT_THAT(result, HasErrorMessage("Cannot update missing column"));
+}
+
+// Test make column optional on deleted column fails
+TEST_F(UpdateSchemaTest, MakeColumnOptionalDeletedFails) {
+  ICEBERG_UNWRAP_OR_FAIL(auto setup_update, table_->NewUpdateSchema());
+  setup_update->AllowIncompatibleChanges().AddRequiredColumn("col", int32());
+  EXPECT_THAT(setup_update->Commit(), IsOk());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+  ICEBERG_UNWRAP_OR_FAIL(auto update, reloaded->NewUpdateSchema());
+  update->DeleteColumn("col").MakeColumnOptional("col");
+
+  auto result = update->Apply();
+  EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed));
+  EXPECT_THAT(result, HasErrorMessage("Cannot update a column that will be 
deleted"));
+}
+
+// Test require column on deleted column fails
+TEST_F(UpdateSchemaTest, RequireColumnDeletedFails) {
+  ICEBERG_UNWRAP_OR_FAIL(auto setup_update, table_->NewUpdateSchema());
+  setup_update->AddColumn("col", int32());
+  EXPECT_THAT(setup_update->Commit(), IsOk());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+  ICEBERG_UNWRAP_OR_FAIL(auto update, reloaded->NewUpdateSchema());
+  update->DeleteColumn("col").AllowIncompatibleChanges().RequireColumn("col");
+
+  auto result = update->Apply();
+  EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed));
+  EXPECT_THAT(result, HasErrorMessage("Cannot update a column that will be 
deleted"));
+}
+
+// Test update column doc (in same transaction as add)
+TEST_F(UpdateSchemaTest, UpdateColumnDoc) {
+  // Add a column and update its doc in the same transaction
+  ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema());
+  update->AddColumn("col", int32(), "original doc");
+  update->UpdateColumnDoc("col", "updated doc");
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+
+  // Verify the doc was updated
+  ICEBERG_UNWRAP_OR_FAIL(auto field_opt, 
result.schema->FindFieldByName("col"));
+  ASSERT_TRUE(field_opt.has_value());
+  EXPECT_EQ(field_opt->get().doc(), "updated doc");
+}
+
+// Test update column doc on missing column fails
+TEST_F(UpdateSchemaTest, UpdateColumnDocMissingFails) {
+  ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema());
+  update->UpdateColumnDoc("non_existent", "some doc");
+
+  auto result = update->Apply();
+  EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed));
+  EXPECT_THAT(result, HasErrorMessage("Cannot update missing column"));
+}
+
+// Test update column doc on deleted column fails
+TEST_F(UpdateSchemaTest, UpdateColumnDocDeletedFails) {
+  ICEBERG_UNWRAP_OR_FAIL(auto setup_update, table_->NewUpdateSchema());
+  setup_update->AddColumn("col", int32(), "original doc");
+  EXPECT_THAT(setup_update->Commit(), IsOk());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+  ICEBERG_UNWRAP_OR_FAIL(auto update, reloaded->NewUpdateSchema());
+  update->DeleteColumn("col").UpdateColumnDoc("col", "new doc");
+
+  auto result = update->Apply();
+  EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed));
+  EXPECT_THAT(result, HasErrorMessage("Cannot update a column that will be 
deleted"));
+}
+
+// Test update column doc noop (same doc)
+TEST_F(UpdateSchemaTest, UpdateColumnDocNoop) {
+  ICEBERG_UNWRAP_OR_FAIL(auto setup_update, table_->NewUpdateSchema());
+  setup_update->AddColumn("col", int32(), "same doc");
+  EXPECT_THAT(setup_update->Commit(), IsOk());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+  ICEBERG_UNWRAP_OR_FAIL(auto update, reloaded->NewUpdateSchema());
+  update->UpdateColumnDoc("col", "same doc");
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto field_opt, 
result.schema->FindFieldByName("col"));
+  ASSERT_TRUE(field_opt.has_value());
+  EXPECT_EQ(field_opt->get().doc(), "same doc");
+}
+
+// Test update column doc with empty string
+TEST_F(UpdateSchemaTest, UpdateColumnDocEmptyString) {
+  // Add a column with a doc and clear it in same transaction
+  ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema());
+  update->AddColumn("col", int32(), "original doc");
+  update->UpdateColumnDoc("col", "");
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+
+  // Verify the doc was cleared
+  ICEBERG_UNWRAP_OR_FAIL(auto field_opt, 
result.schema->FindFieldByName("col"));
+  ASSERT_TRUE(field_opt.has_value());
+  EXPECT_EQ(field_opt->get().doc(), "");
+}
+
+// Test update column int to long
+TEST_F(UpdateSchemaTest, UpdateColumnIntToLong) {
+  // Add an int column and update to long in same transaction
+  ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema());
+  update->AddColumn("id", int32(), "An integer ID");
+  update->UpdateColumn("id", int64());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+
+  // Verify the type was updated
+  ICEBERG_UNWRAP_OR_FAIL(auto field_opt, result.schema->FindFieldByName("id"));
+  ASSERT_TRUE(field_opt.has_value());
+  EXPECT_EQ(*field_opt->get().type(), *int64());
+  EXPECT_EQ(field_opt->get().doc(), "An integer ID");  // Doc preserved
+}
+
+// Test update column float to double
+TEST_F(UpdateSchemaTest, UpdateColumnFloatToDouble) {
+  // Add a float column and update to double in same transaction
+  ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema());
+  update->AddColumn("value", float32(), "A float value");
+  update->UpdateColumn("value", float64());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+
+  // Verify the type was updated
+  ICEBERG_UNWRAP_OR_FAIL(auto field_opt, 
result.schema->FindFieldByName("value"));
+  ASSERT_TRUE(field_opt.has_value());
+  EXPECT_EQ(*field_opt->get().type(), *float64());
+}
+
+// Test update column with same type (noop)
+TEST_F(UpdateSchemaTest, UpdateColumnSameType) {
+  // Add a column and update with same type (should be a noop)
+  ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema());
+  update->AddColumn("id", int32());
+  update->UpdateColumn("id", int32());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+
+  // Verify the type is still int32
+  ICEBERG_UNWRAP_OR_FAIL(auto field_opt, result.schema->FindFieldByName("id"));
+  ASSERT_TRUE(field_opt.has_value());
+  EXPECT_EQ(*field_opt->get().type(), *int32());
+}
+
+// Test update column on missing column fails
+TEST_F(UpdateSchemaTest, UpdateColumnMissingFails) {
+  ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema());
+  update->UpdateColumn("non_existent", int64());
+
+  auto result = update->Apply();
+  EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed));
+  EXPECT_THAT(result, HasErrorMessage("Cannot update missing column"));
+}
+
+// Test update column on deleted column fails
+TEST_F(UpdateSchemaTest, UpdateColumnDeletedFails) {
+  ICEBERG_UNWRAP_OR_FAIL(auto setup_update, table_->NewUpdateSchema());
+  setup_update->AddColumn("col", int32());
+  EXPECT_THAT(setup_update->Commit(), IsOk());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+  ICEBERG_UNWRAP_OR_FAIL(auto update, reloaded->NewUpdateSchema());
+  update->DeleteColumn("col").UpdateColumn("col", int64());
+
+  auto result = update->Apply();
+  EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed));
+  EXPECT_THAT(result, HasErrorMessage("Cannot update a column that will be 
deleted"));
+}
+
+// Test update column with invalid promotion fails (long to int)
+TEST_F(UpdateSchemaTest, UpdateColumnInvalidPromotionFails) {
+  // Add a long column and try to downgrade to int (should fail)
+  ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema());
+  update->AddColumn("id", int64());
+  update->UpdateColumn("id", int32());
+
+  auto result = update->Apply();
+  EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed));
+  EXPECT_THAT(result, HasErrorMessage("Cannot change column type"));
+}
+
+// Test update column with invalid promotion fails (double to float)
+TEST_F(UpdateSchemaTest, UpdateColumnInvalidPromotionDoubleToFloatFails) {
+  // Add a double column and try to downgrade to float (should fail)
+  ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema());
+  update->AddColumn("value", float64());
+  update->UpdateColumn("value", float32());
+
+  auto result = update->Apply();
+  EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed));
+  EXPECT_THAT(result, HasErrorMessage("Cannot change column type"));
+}
+
+// Test update column with incompatible types fails (int to string)
+TEST_F(UpdateSchemaTest, UpdateColumnIncompatibleTypesFails) {
+  // Add an int column and try to change to string (should fail)
+  ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema());
+  update->AddColumn("id", int32());
+  update->UpdateColumn("id", string());
+
+  auto result = update->Apply();
+  EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed));
+  EXPECT_THAT(result, HasErrorMessage("Cannot change column type"));
+}
+
+// Test rename and update column type in same transaction - should fail
+TEST_F(UpdateSchemaTest, RenameAndUpdateColumnInSameTransaction) {
+  ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema());
+  update->AddColumn("old_name", int32());
+  update->UpdateColumn("old_name", int64());
+  update->RenameColumn("old_name", "new_name");
+
+  auto result = update->Apply();
+  EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed));
+  EXPECT_THAT(result, HasErrorMessage("Cannot rename missing column"));
+}
+
+// Test decimal precision widening
+TEST_F(UpdateSchemaTest, UpdateColumnDecimalPrecisionWidening) {
+  // Add a decimal column and widen the precision
+  ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema());
+  auto decimal_10_2 = decimal(10, 2);
+  auto decimal_20_2 = decimal(20, 2);
+  update->AddColumn("price", decimal_10_2);
+  update->UpdateColumn("price", decimal_20_2);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+
+  // Verify the precision was widened
+  ICEBERG_UNWRAP_OR_FAIL(auto field_opt, 
result.schema->FindFieldByName("price"));
+  ASSERT_TRUE(field_opt.has_value());
+  EXPECT_EQ(*field_opt->get().type(), *decimal_20_2);
+}
+
+// Test decimal with different scale fails
+TEST_F(UpdateSchemaTest, UpdateColumnDecimalDifferentScaleFails) {
+  // Try to change decimal scale (should fail)
+  ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema());
+  auto decimal_10_2 = decimal(10, 2);
+  auto decimal_10_3 = decimal(10, 3);
+  update->AddColumn("price", decimal_10_2);
+  update->UpdateColumn("price", decimal_10_3);
+
+  auto result = update->Apply();
+  EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed));
+  EXPECT_THAT(result, HasErrorMessage("Cannot change column type"));
+}
+
+// Test decimal precision narrowing fails
+TEST_F(UpdateSchemaTest, UpdateColumnDecimalPrecisionNarrowingFails) {
+  // Try to narrow decimal precision (should fail)
+  ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema());
+  auto decimal_20_2 = decimal(20, 2);
+  auto decimal_10_2 = decimal(10, 2);
+  update->AddColumn("price", decimal_20_2);
+  update->UpdateColumn("price", decimal_10_2);
+
+  auto result = update->Apply();
+  EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed));
+  EXPECT_THAT(result, HasErrorMessage("Cannot change column type"));
+}
+
+// Test update type preserves other metadata (doc, optional)
+TEST_F(UpdateSchemaTest, UpdateTypePreservesOtherMetadata) {
+  // Add a column with metadata
+  ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema());
+  update->AddColumn("value", int32(), "A counter value");
+
+  // Update type should preserve doc and optional
+  update->UpdateColumn("value", int64());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+
+  // Verify the type was updated but other metadata preserved
+  ICEBERG_UNWRAP_OR_FAIL(auto field_opt, 
result.schema->FindFieldByName("value"));
+  ASSERT_TRUE(field_opt.has_value());
+
+  const auto& field = field_opt->get();
+  EXPECT_EQ(*field.type(), *int64());         // Type updated
+  EXPECT_EQ(field.doc(), "A counter value");  // Doc preserved
+  EXPECT_TRUE(field.optional());              // Optional preserved
+}
+
+// Test update doc preserves other metadata (type, optional)
+TEST_F(UpdateSchemaTest, UpdateDocPreservesOtherMetadata) {
+  // Add a required column with a type
+  ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema());
+  update->AllowIncompatibleChanges().AddRequiredColumn("id", int64(), "old 
doc");
+
+  // Update doc should preserve type and optional
+  update->UpdateColumnDoc("id", "new doc");
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+
+  // Verify the doc was updated but other metadata preserved
+  ICEBERG_UNWRAP_OR_FAIL(auto field_opt, result.schema->FindFieldByName("id"));
+  ASSERT_TRUE(field_opt.has_value());
+
+  const auto& field = field_opt->get();
+  EXPECT_EQ(field.doc(), "new doc");   // Doc updated
+  EXPECT_EQ(*field.type(), *int64());  // Type preserved
+  EXPECT_FALSE(field.optional());      // Optional preserved (still required)
+}
+
+// Test rename-delete conflict
+TEST_F(UpdateSchemaTest, RenameDeleteConflict) {
+  ICEBERG_UNWRAP_OR_FAIL(auto setup_update, table_->NewUpdateSchema());
+  setup_update->AddColumn("col", int32());
+  EXPECT_THAT(setup_update->Commit(), IsOk());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+  ICEBERG_UNWRAP_OR_FAIL(auto update, reloaded->NewUpdateSchema());
+  update->RenameColumn("col", "new_name").DeleteColumn("col");
+
+  auto result = update->Apply();
+  EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed));
+  EXPECT_THAT(result, HasErrorMessage("Cannot delete a column that has 
updates"));
+}
+
+// Test delete-rename conflict
+TEST_F(UpdateSchemaTest, DeleteRenameConflict) {
+  ICEBERG_UNWRAP_OR_FAIL(auto setup_update, table_->NewUpdateSchema());
+  setup_update->AddColumn("col", int32());
+  EXPECT_THAT(setup_update->Commit(), IsOk());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+  ICEBERG_UNWRAP_OR_FAIL(auto update, reloaded->NewUpdateSchema());
+  update->DeleteColumn("col").RenameColumn("col", "new_name");
+
+  auto result = update->Apply();
+  EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed));
+  EXPECT_THAT(result, HasErrorMessage("Cannot rename a column that will be 
deleted"));
+}
+
+// Test case insensitive add then update
+TEST_F(UpdateSchemaTest, CaseInsensitiveAddThenUpdate) {
+  ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema());
+  update->CaseSensitive(false)
+      .AddColumn("Foo", int32(), "A column with uppercase name")
+      .UpdateColumn("foo", int64());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto field_opt, result.schema->FindFieldByName("Foo", 
false));
+  ASSERT_TRUE(field_opt.has_value());
+  EXPECT_EQ(*field_opt->get().type(), *int64());
+}
+
+// Test case insensitive add then update doc
+TEST_F(UpdateSchemaTest, CaseInsensitiveAddThenUpdateDoc) {
+  ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema());
+  update->CaseSensitive(false)
+      .AddColumn("Foo", int32(), "original doc")
+      .UpdateColumnDoc("foo", "updated doc");
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto field_opt, result.schema->FindFieldByName("Foo", 
false));
+  ASSERT_TRUE(field_opt.has_value());
+  EXPECT_EQ(field_opt->get().doc(), "updated doc");
+}
+
+// Test case insensitive add then make optional
+TEST_F(UpdateSchemaTest, CaseInsensitiveAddThenMakeOptional) {
+  ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema());
+  update->CaseSensitive(false)
+      .AllowIncompatibleChanges()
+      .AddRequiredColumn("Foo", int32(), "required column")
+      .MakeColumnOptional("foo");
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto field_opt, result.schema->FindFieldByName("Foo", 
false));
+  ASSERT_TRUE(field_opt.has_value());
+  EXPECT_TRUE(field_opt->get().optional());
+}
+
+// Test case insensitive add then require
+TEST_F(UpdateSchemaTest, CaseInsensitiveAddThenRequire) {
+  ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema());
+  update->CaseSensitive(false)
+      .AllowIncompatibleChanges()
+      .AddColumn("Foo", int32(), "optional column")
+      .RequireColumn("foo");  // Require using lowercase
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+
+  // Verify the column is now required
+  ICEBERG_UNWRAP_OR_FAIL(auto field_opt, result.schema->FindFieldByName("Foo", 
false));
+  ASSERT_TRUE(field_opt.has_value());
+  EXPECT_FALSE(field_opt->get().optional());
+}
+
+// Test mixed changes - comprehensive test combining multiple operations
+TEST_F(UpdateSchemaTest, MixedChanges) {
+  // First, create a complex schema similar to Java's SCHEMA constant
+  // Build the "preferences" struct
+  auto preferences_struct = std::make_shared<StructType>(
+      std::vector<SchemaField>{SchemaField(8, "feature1", boolean(), false),
+                               SchemaField(9, "feature2", boolean(), true)});
+
+  // Build the "locations" map (address struct -> coordinate struct)
+  auto address_struct = std::make_shared<StructType>(std::vector<SchemaField>{
+      SchemaField(20, "address", string(), false),
+      SchemaField(21, "city", string(), false), SchemaField(22, "state", 
string(), false),
+      SchemaField(23, "zip", int32(), false)});
+
+  auto coordinate_struct = std::make_shared<StructType>(
+      std::vector<SchemaField>{SchemaField(12, "lat", float32(), false),
+                               SchemaField(13, "long", float32(), false)});
+
+  auto locations_map =
+      std::make_shared<MapType>(SchemaField(10, "key", address_struct, false),
+                                SchemaField(11, "value", coordinate_struct, 
false));
+
+  // Build the "points" list
+  auto point_struct = std::make_shared<StructType>(std::vector<SchemaField>{
+      SchemaField(15, "x", int64(), false), SchemaField(16, "y", int64(), 
false)});
+
+  auto points_list =
+      std::make_shared<ListType>(SchemaField(14, "element", point_struct, 
true));
+
+  // Build the "doubles" list
+  auto doubles_list =
+      std::make_shared<ListType>(SchemaField(17, "element", float64(), false));
+
+  // Build the "properties" map
+  auto properties_map = std::make_shared<MapType>(
+      SchemaField(18, "key", string(), true), SchemaField(19, "value", 
string(), true));
+
+  // Create the initial schema
+  ICEBERG_UNWRAP_OR_FAIL(auto setup_update, table_->NewUpdateSchema());
+  setup_update->AllowIncompatibleChanges()
+      .AddRequiredColumn("id", int32())
+      .AddRequiredColumn("data", string())
+      .AddColumn("preferences", preferences_struct, "struct of named boolean 
options")
+      .AddRequiredColumn("locations", locations_map, "map of address to 
coordinate")
+      .AddColumn("points", points_list, "2-D cartesian points")
+      .AddRequiredColumn("doubles", doubles_list)
+      .AddColumn("properties", properties_map, "string map of properties");
+  EXPECT_THAT(setup_update->Commit(), IsOk());
+
+  // Now perform the mixed changes in a single transaction
+  ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+  ICEBERG_UNWRAP_OR_FAIL(auto update, reloaded->NewUpdateSchema());
+
+  update->AddColumn("toplevel", decimal(9, 2))
+      .AddColumn("locations", "alt", float32())  // add to map value struct
+      .AddColumn("points", "z", int64())         // add to list element struct
+      .AddColumn("points", "t.t", int64(), "name with '.'")
+      .RenameColumn("data", "json")
+      .RenameColumn("preferences", "options")
+      .RenameColumn("preferences.feature2", "newfeature")  // rename inside 
renamed column
+      .RenameColumn("locations.lat", "latitude")
+      .RenameColumn("points.x", "X")
+      .RenameColumn("points.y", "y.y")  // field name with dots
+      .UpdateColumn("id", int64())
+      .UpdateColumnDoc("id", "unique id")
+      .UpdateColumn("locations.lat", float64())  // use original name before 
rename
+      .UpdateColumnDoc("locations.lat", "latitude")
+      .DeleteColumn("locations.long")
+      .DeleteColumn("properties")
+      .MakeColumnOptional("points.x")
+      .AllowIncompatibleChanges()
+      .RequireColumn("data")
+      .AddRequiredColumn("locations", "description", string(), "Location 
description");
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+
+  // Verify id: int -> long, with doc "unique id"
+  ICEBERG_UNWRAP_OR_FAIL(auto id_opt, result.schema->FindFieldByName("id"));
+  ASSERT_TRUE(id_opt.has_value());
+  EXPECT_EQ(*id_opt->get().type(), *int64());
+  EXPECT_EQ(id_opt->get().doc(), "unique id");
+  EXPECT_FALSE(id_opt->get().optional());  // was required
+
+  // Verify data was renamed to json and is now required
+  ICEBERG_UNWRAP_OR_FAIL(auto data_opt, 
result.schema->FindFieldByName("data"));
+  ICEBERG_UNWRAP_OR_FAIL(auto json_opt, 
result.schema->FindFieldByName("json"));
+  EXPECT_FALSE(data_opt.has_value());
+  ASSERT_TRUE(json_opt.has_value());
+  EXPECT_EQ(*json_opt->get().type(), *string());
+  EXPECT_FALSE(json_opt->get().optional());  // now required
+
+  // Verify preferences was renamed to options, feature2 renamed to newfeature
+  ICEBERG_UNWRAP_OR_FAIL(auto preferences_opt,
+                         result.schema->FindFieldByName("preferences"));
+  ICEBERG_UNWRAP_OR_FAIL(auto options_opt, 
result.schema->FindFieldByName("options"));
+  EXPECT_FALSE(preferences_opt.has_value());
+  ASSERT_TRUE(options_opt.has_value());
+  EXPECT_EQ(options_opt->get().doc(), "struct of named boolean options");
+
+  const auto& options_struct = static_cast<const 
StructType&>(*options_opt->get().type());
+  ICEBERG_UNWRAP_OR_FAIL(auto feature1_opt, 
options_struct.GetFieldByName("feature1"));
+  ICEBERG_UNWRAP_OR_FAIL(auto feature2_opt, 
options_struct.GetFieldByName("feature2"));
+  ICEBERG_UNWRAP_OR_FAIL(auto newfeature_opt,
+                         options_struct.GetFieldByName("newfeature"));
+  EXPECT_TRUE(feature1_opt.has_value());
+  EXPECT_FALSE(feature2_opt.has_value());
+  ASSERT_TRUE(newfeature_opt.has_value());
+  EXPECT_EQ(*newfeature_opt->get().type(), *boolean());
+
+  // Verify locations map changes
+  ICEBERG_UNWRAP_OR_FAIL(auto locations_opt, 
result.schema->FindFieldByName("locations"));
+  ASSERT_TRUE(locations_opt.has_value());
+  EXPECT_EQ(locations_opt->get().doc(), "map of address to coordinate");
+  EXPECT_FALSE(locations_opt->get().optional());  // was required
+
+  const auto& locations_map_type =
+      static_cast<const MapType&>(*locations_opt->get().type());
+  const auto& coord_struct =
+      static_cast<const StructType&>(*locations_map_type.value().type());
+
+  // lat renamed to latitude, type changed to double, doc added
+  ICEBERG_UNWRAP_OR_FAIL(auto lat_opt, coord_struct.GetFieldByName("lat"));
+  ICEBERG_UNWRAP_OR_FAIL(auto latitude_opt, 
coord_struct.GetFieldByName("latitude"));
+  EXPECT_FALSE(lat_opt.has_value());
+  ASSERT_TRUE(latitude_opt.has_value());
+  EXPECT_EQ(*latitude_opt->get().type(), *float64());
+  EXPECT_EQ(latitude_opt->get().doc(), "latitude");
+
+  // long was deleted
+  ICEBERG_UNWRAP_OR_FAIL(auto long_opt, coord_struct.GetFieldByName("long"));
+  EXPECT_FALSE(long_opt.has_value());
+
+  // alt was added
+  ICEBERG_UNWRAP_OR_FAIL(auto alt_opt, coord_struct.GetFieldByName("alt"));
+  ASSERT_TRUE(alt_opt.has_value());
+  EXPECT_EQ(*alt_opt->get().type(), *float32());
+
+  // description was added as required
+  ICEBERG_UNWRAP_OR_FAIL(auto description_opt,
+                         coord_struct.GetFieldByName("description"));
+  ASSERT_TRUE(description_opt.has_value());
+  EXPECT_EQ(*description_opt->get().type(), *string());
+  EXPECT_EQ(description_opt->get().doc(), "Location description");
+  EXPECT_FALSE(description_opt->get().optional());
+
+  // Verify points list changes
+  ICEBERG_UNWRAP_OR_FAIL(auto points_opt, 
result.schema->FindFieldByName("points"));
+  ASSERT_TRUE(points_opt.has_value());
+  EXPECT_EQ(points_opt->get().doc(), "2-D cartesian points");
+
+  const auto& points_list_type = static_cast<const 
ListType&>(*points_opt->get().type());
+  const auto& point_elem_struct =
+      static_cast<const StructType&>(*points_list_type.element().type());
+
+  // x renamed to X and made optional
+  ICEBERG_UNWRAP_OR_FAIL(auto x_opt, point_elem_struct.GetFieldByName("x"));
+  ICEBERG_UNWRAP_OR_FAIL(auto X_opt, point_elem_struct.GetFieldByName("X"));
+  EXPECT_FALSE(x_opt.has_value());
+  ASSERT_TRUE(X_opt.has_value());
+  EXPECT_EQ(*X_opt->get().type(), *int64());
+  EXPECT_TRUE(X_opt->get().optional());  // made optional
+
+  // y renamed to y.y
+  ICEBERG_UNWRAP_OR_FAIL(auto y_opt, point_elem_struct.GetFieldByName("y"));
+  ICEBERG_UNWRAP_OR_FAIL(auto yy_opt, point_elem_struct.GetFieldByName("y.y"));
+  EXPECT_FALSE(y_opt.has_value());
+  ASSERT_TRUE(yy_opt.has_value());
+  EXPECT_EQ(*yy_opt->get().type(), *int64());
+
+  // z was added
+  ICEBERG_UNWRAP_OR_FAIL(auto z_opt, point_elem_struct.GetFieldByName("z"));
+  ASSERT_TRUE(z_opt.has_value());
+  EXPECT_EQ(*z_opt->get().type(), *int64());
+
+  // t.t was added with doc
+  ICEBERG_UNWRAP_OR_FAIL(auto tt_opt, point_elem_struct.GetFieldByName("t.t"));
+  ASSERT_TRUE(tt_opt.has_value());
+  EXPECT_EQ(*tt_opt->get().type(), *int64());
+  EXPECT_EQ(tt_opt->get().doc(), "name with '.'");
+
+  // Verify doubles list unchanged
+  ICEBERG_UNWRAP_OR_FAIL(auto doubles_opt, 
result.schema->FindFieldByName("doubles"));
+  ASSERT_TRUE(doubles_opt.has_value());
+  EXPECT_EQ(doubles_opt->get().type()->type_id(), TypeId::kList);
+
+  // Verify properties was deleted
+  ICEBERG_UNWRAP_OR_FAIL(auto properties_opt,
+                         result.schema->FindFieldByName("properties"));
+  EXPECT_FALSE(properties_opt.has_value());
+
+  // Verify toplevel was added
+  ICEBERG_UNWRAP_OR_FAIL(auto toplevel_opt, 
result.schema->FindFieldByName("toplevel"));
+  ASSERT_TRUE(toplevel_opt.has_value());
+  EXPECT_EQ(*toplevel_opt->get().type(), *decimal(9, 2));
+  EXPECT_TRUE(toplevel_opt->get().optional());
+}
+
 }  // namespace iceberg
diff --git a/src/iceberg/update/update_schema.cc 
b/src/iceberg/update/update_schema.cc
index 327ee0ac..e339ef04 100644
--- a/src/iceberg/update/update_schema.cc
+++ b/src/iceberg/update/update_schema.cc
@@ -37,6 +37,7 @@
 #include "iceberg/util/checked_cast.h"
 #include "iceberg/util/error_collector.h"
 #include "iceberg/util/macros.h"
+#include "iceberg/util/string_util.h"
 #include "iceberg/util/type_util.h"
 #include "iceberg/util/visit_type.h"
 
@@ -67,13 +68,10 @@ class ApplyChangesVisitor {
     std::vector<SchemaField> new_fields;
     bool has_changes = false;
 
-    // Process existing fields
     for (const auto& field : struct_type.fields()) {
-      // Recursively process the field's type first
       ICEBERG_ASSIGN_OR_RAISE(auto field_type_result,
                               ApplyChanges(field.type(), field.field_id()));
 
-      // Process field-level changes (deletes, updates, nested additions)
       ICEBERG_ASSIGN_OR_RAISE(auto processed_field,
                               ProcessField(field, field_type_result));
 
@@ -81,17 +79,14 @@ class ApplyChangesVisitor {
         const auto& new_field = processed_field.value();
         new_fields.push_back(new_field);
 
-        // Check if this field changed
         if (new_field != field) {
           has_changes = true;
         }
       } else {
-        // Field was deleted
         has_changes = true;
       }
     }
 
-    // Add new fields for this struct
     auto adds_it = parent_to_added_ids_.find(parent_id);
     if (adds_it != parent_to_added_ids_.end() && !adds_it->second.empty()) {
       has_changes = true;
@@ -103,7 +98,6 @@ class ApplyChangesVisitor {
       }
     }
 
-    // Return original type if nothing changed
     if (!has_changes) {
       return base_type;
     }
@@ -117,11 +111,9 @@ class ApplyChangesVisitor {
                                           int32_t parent_id) {
     const auto& element = list_type.element();
 
-    // Recursively process element type
     ICEBERG_ASSIGN_OR_RAISE(auto element_type_result,
                             ApplyChanges(element.type(), element.field_id()));
 
-    // Process element field (handles deletes, updates, nested additions)
     ICEBERG_ASSIGN_OR_RAISE(auto processed_element,
                             ProcessField(element, element_type_result));
 
@@ -130,7 +122,6 @@ class ApplyChangesVisitor {
 
     const auto& new_element = processed_element.value();
 
-    // Return unchanged if element didn't change
     if (element == new_element) {
       return base_type;
     }
@@ -145,22 +136,18 @@ class ApplyChangesVisitor {
     const auto& key = map_type.key();
     const auto& value = map_type.value();
 
-    // Check for key modifications (not allowed in Iceberg)
     int32_t key_id = key.field_id();
     ICEBERG_CHECK(!deletes_.contains(key_id), "Cannot delete map keys");
     ICEBERG_CHECK(!updates_.contains(key_id), "Cannot update map keys");
     ICEBERG_CHECK(!parent_to_added_ids_.contains(key_id),
                   "Cannot add fields to map keys");
 
-    // Recursively process key and value types
     ICEBERG_ASSIGN_OR_RAISE(auto key_type_result, ApplyChanges(key.type(), 
key_id));
     ICEBERG_ASSIGN_OR_RAISE(auto value_type_result,
                             ApplyChanges(value.type(), value.field_id()));
 
-    // Key type must not change
     ICEBERG_CHECK(*key_type_result == *key.type(), "Cannot alter map keys");
 
-    // Process value field (handles deletes, updates, nested additions)
     ICEBERG_ASSIGN_OR_RAISE(auto processed_value, ProcessField(value, 
value_type_result));
 
     ICEBERG_CHECK(processed_value.has_value(), "Cannot delete value field from 
map: {}",
@@ -168,7 +155,6 @@ class ApplyChangesVisitor {
 
     const auto& new_value = processed_value.value();
 
-    // Return unchanged if nothing changed
     if (key == map_type.key() && value == new_value) {
       return base_type;
     }
@@ -176,60 +162,45 @@ class ApplyChangesVisitor {
     return std::make_shared<MapType>(key, new_value);
   }
 
-  /// \brief Handle primitive types - return unchanged
   Result<std::shared_ptr<Type>> VisitPrimitive(const PrimitiveType& 
primitive_type,
                                                const std::shared_ptr<Type>& 
base_type,
                                                int32_t parent_id) {
-    // Primitive types are returned as-is
     return base_type;
   }
 
  private:
-  /// \brief Process a field: handle deletes, updates, and nested additions
-  ///
-  /// It processes field-level operations after the field's type has been 
recursively
-  /// processed.
   Result<std::optional<SchemaField>> ProcessField(
       const SchemaField& field, const std::shared_ptr<Type>& 
field_type_result) {
     int32_t field_id = field.field_id();
 
-    // 1. Handle deletes
     if (deletes_.contains(field_id)) {
-      // Field is deleted
       return std::nullopt;
     }
 
-    // 2. Start with the recursively processed type
     std::shared_ptr<Type> result_type = field_type_result;
 
-    // 3. Handle type updates (e.g., type widening)
     // Note: We check the update against the ORIGINAL field type, not the 
recursively
     // processed type, because we want to preserve nested changes from 
recursion
     auto update_it = updates_.find(field_id);
     if (update_it != updates_.end()) {
       const auto& update_field = update_it->second;
-      // If the update specifies a type change, use the new type
-      // Otherwise keep the recursively processed type
       if (update_field->type() != field.type()) {
         result_type = update_field->type();
       }
     }
 
-    // Note: Nested field additions are handled in VisitStruct, not here
-    // to avoid duplication
+    // Note: Child field additions are handled in VisitStruct, not here.
+    // The recursively processed type (field_type_result) already contains
+    // any child fields that were added.
 
-    // 4. Build the result field
     if (update_it != updates_.end()) {
-      // Use update field metadata but with the processed type
       const auto& update_field = update_it->second;
       return SchemaField(field_id, update_field->name(), 
std::move(result_type),
                          update_field->optional(), update_field->doc());
     } else if (result_type != field.type()) {
-      // Type changed but no field-level update
       return SchemaField(field_id, field.name(), std::move(result_type), 
field.optional(),
                          field.doc());
     } else {
-      // No changes
       return field;
     }
   }
@@ -250,18 +221,17 @@ Result<std::shared_ptr<UpdateSchema>> UpdateSchema::Make(
 
 UpdateSchema::UpdateSchema(std::shared_ptr<Transaction> transaction)
     : PendingUpdate(std::move(transaction)) {
-  // Get the current schema
-  auto schema_result = base().Schema();
+  const TableMetadata& base_metadata = transaction_->current();
+
+  auto schema_result = base_metadata.Schema();
   if (!schema_result.has_value()) {
     AddError(schema_result.error());
     return;
   }
   schema_ = std::move(schema_result.value());
 
-  // Initialize last_column_id from base metadata
-  last_column_id_ = base().last_column_id;
+  last_column_id_ = base_metadata.last_column_id;
 
-  // Initialize identifier field names from the current schema
   auto identifier_names_result = schema_->IdentifierFieldNames();
   if (!identifier_names_result.has_value()) {
     AddError(identifier_names_result.error());
@@ -269,7 +239,6 @@ UpdateSchema::UpdateSchema(std::shared_ptr<Transaction> 
transaction)
   }
   identifier_field_names_ = std::move(identifier_names_result.value());
 
-  // Initialize id_to_parent map from the schema
   id_to_parent_ = IndexParents(*schema_);
 }
 
@@ -287,7 +256,6 @@ UpdateSchema& UpdateSchema::CaseSensitive(bool 
case_sensitive) {
 
 UpdateSchema& UpdateSchema::AddColumn(std::string_view name, 
std::shared_ptr<Type> type,
                                       std::string_view doc) {
-  // Check for "." in top-level name
   ICEBERG_BUILDER_CHECK(!name.contains('.'),
                         "Cannot add column with ambiguous name: {}, use "
                         "AddColumn(parent, name, type, doc)",
@@ -306,7 +274,6 @@ UpdateSchema& 
UpdateSchema::AddColumn(std::optional<std::string_view> parent,
 UpdateSchema& UpdateSchema::AddRequiredColumn(std::string_view name,
                                               std::shared_ptr<Type> type,
                                               std::string_view doc) {
-  // Check for "." in top-level name
   ICEBERG_BUILDER_CHECK(!name.contains('.'),
                         "Cannot add column with ambiguous name: {}, use "
                         "AddRequiredColumn(parent, name, type, doc)",
@@ -325,34 +292,114 @@ UpdateSchema& 
UpdateSchema::AddRequiredColumn(std::optional<std::string_view> pa
 
 UpdateSchema& UpdateSchema::UpdateColumn(std::string_view name,
                                          std::shared_ptr<PrimitiveType> 
new_type) {
-  // TODO(Guotao Yu): Implement UpdateColumn
-  AddError(NotImplemented("UpdateSchema::UpdateColumn not implemented"));
+  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto field_opt, FindFieldForUpdate(name));
+  ICEBERG_BUILDER_CHECK(field_opt.has_value(), "Cannot update missing column: 
{}", name);
+
+  const auto& field = field_opt->get();
+  int32_t field_id = field.field_id();
+
+  ICEBERG_BUILDER_CHECK(!deletes_.contains(field_id),
+                        "Cannot update a column that will be deleted: {}", 
field.name());
+
+  if (*field.type() == *new_type) {
+    return *this;
+  }
+
+  ICEBERG_BUILDER_CHECK(IsPromotionAllowed(field.type(), new_type),
+                        "Cannot change column type: {}: {} -> {}", name,
+                        field.type()->ToString(), new_type->ToString());
+
+  updates_[field_id] = std::make_shared<SchemaField>(
+      field.field_id(), field.name(), new_type, field.optional(), field.doc());
+
   return *this;
 }
 
 UpdateSchema& UpdateSchema::UpdateColumnDoc(std::string_view name,
                                             std::string_view new_doc) {
-  // TODO(Guotao Yu): Implement UpdateColumnDoc
-  AddError(NotImplemented("UpdateSchema::UpdateColumnDoc not implemented"));
+  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto field_opt, FindFieldForUpdate(name));
+  ICEBERG_BUILDER_CHECK(field_opt.has_value(), "Cannot update missing column: 
{}", name);
+
+  const auto& field = field_opt->get();
+  int32_t field_id = field.field_id();
+
+  ICEBERG_BUILDER_CHECK(!deletes_.contains(field_id),
+                        "Cannot update a column that will be deleted: {}", 
field.name());
+
+  if (field.doc() == new_doc) {
+    return *this;
+  }
+
+  updates_[field_id] =
+      std::make_shared<SchemaField>(field.field_id(), field.name(), 
field.type(),
+                                    field.optional(), std::string(new_doc));
+
   return *this;
 }
 
 UpdateSchema& UpdateSchema::RenameColumn(std::string_view name,
                                          std::string_view new_name) {
-  // TODO(Guotao Yu): Implement RenameColumn
-  AddError(NotImplemented("UpdateSchema::RenameColumn not implemented"));
+  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto field_opt, FindField(name));
+  ICEBERG_BUILDER_CHECK(field_opt.has_value(), "Cannot rename missing column: 
{}", name);
+  ICEBERG_BUILDER_CHECK(!new_name.empty(), "Cannot rename a column to null");
+
+  const auto& field = field_opt->get();
+  int32_t field_id = field.field_id();
+
+  ICEBERG_BUILDER_CHECK(!deletes_.contains(field_id),
+                        "Cannot rename a column that will be deleted: {}", 
field.name());
+
+  auto update_it = updates_.find(field_id);
+  const SchemaField& base_field =
+      update_it != updates_.end() ? *update_it->second : field;
+
+  updates_[field_id] = std::make_shared<SchemaField>(
+      base_field.field_id(), std::string(new_name), base_field.type(),
+      base_field.optional(), base_field.doc());
+
+  auto it = std::ranges::find(identifier_field_names_, name);
+  if (it != identifier_field_names_.end()) {
+    *it = new_name;
+  }
+
   return *this;
 }
 
 UpdateSchema& UpdateSchema::MakeColumnOptional(std::string_view name) {
-  // TODO(Guotao Yu): Implement MakeColumnOptional
-  AddError(NotImplemented("UpdateSchema::MakeColumnOptional not implemented"));
-  return *this;
+  return UpdateColumnRequirementInternal(name, /*is_optional=*/true);
 }
 
 UpdateSchema& UpdateSchema::RequireColumn(std::string_view name) {
-  // TODO(Guotao Yu): Implement RequireColumn
-  AddError(NotImplemented("UpdateSchema::RequireColumn not implemented"));
+  return UpdateColumnRequirementInternal(name, /*is_optional=*/false);
+}
+
+UpdateSchema& UpdateSchema::UpdateColumnRequirementInternal(std::string_view 
name,
+                                                            bool is_optional) {
+  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto field_opt, FindFieldForUpdate(name));
+  ICEBERG_BUILDER_CHECK(field_opt.has_value(), "Cannot update missing column: 
{}", name);
+
+  const auto& field = field_opt->get();
+
+  if ((!is_optional && !field.optional()) || (is_optional && 
field.optional())) {
+    return *this;
+  }
+
+  // TODO(GuotaoYu): support added column with default value
+  // bool is_defaulted_add = IsAdded(name) && field.initial_default() != null;
+  bool is_defaulted_add = false;
+
+  ICEBERG_BUILDER_CHECK(is_optional || is_defaulted_add || 
allow_incompatible_changes_,
+                        "Cannot change column nullability: {}: optional -> 
required",
+                        name);
+
+  int32_t field_id = field.field_id();
+
+  ICEBERG_BUILDER_CHECK(!deletes_.contains(field_id),
+                        "Cannot update a column that will be deleted: {}", 
field.name());
+
+  updates_[field_id] = std::make_shared<SchemaField>(is_optional ? 
field.AsOptional()
+                                                                 : 
field.AsRequired());
+
   return *this;
 }
 
@@ -368,7 +415,6 @@ UpdateSchema& UpdateSchema::DeleteColumn(std::string_view 
name) {
   ICEBERG_BUILDER_CHECK(!updates_.contains(field_id),
                         "Cannot delete a column that has updates: {}", name);
 
-  // Add to deletes set
   deletes_.insert(field_id);
 
   return *this;
@@ -409,7 +455,6 @@ UpdateSchema& UpdateSchema::SetIdentifierFields(
 Result<UpdateSchema::ApplyResult> UpdateSchema::Apply() {
   ICEBERG_RETURN_UNEXPECTED(CheckErrors());
 
-  // Validate existing identifier fields are not deleted
   for (const auto& name : identifier_field_names_) {
     ICEBERG_ASSIGN_OR_RAISE(auto field_opt, FindField(name));
     if (field_opt.has_value()) {
@@ -421,7 +466,6 @@ Result<UpdateSchema::ApplyResult> UpdateSchema::Apply() {
                     "SetIdentifierFields to update identifier fields.",
                     name);
 
-      // Check no parent of this field is deleted
       auto parent_it = id_to_parent_.find(field_id);
       while (parent_it != id_to_parent_.end()) {
         int32_t parent_id = parent_it->second;
@@ -434,14 +478,11 @@ Result<UpdateSchema::ApplyResult> UpdateSchema::Apply() {
     }
   }
 
-  // Apply changes recursively using the visitor
   ApplyChangesVisitor visitor(deletes_, updates_, parent_to_added_ids_);
   ICEBERG_ASSIGN_OR_RAISE(auto new_type, visitor.ApplyChanges(schema_, 
kTableRootId));
 
-  // Cast result back to StructType and extract fields
   auto new_struct_type = internal::checked_pointer_cast<StructType>(new_type);
 
-  // Convert identifier field names to IDs
   auto temp_schema = new_struct_type->ToSchema();
   std::vector<int32_t> fresh_identifier_ids;
   for (const auto& name : identifier_field_names_) {
@@ -454,7 +495,6 @@ Result<UpdateSchema::ApplyResult> UpdateSchema::Apply() {
     fresh_identifier_ids.push_back(field_opt->get().field_id());
   }
 
-  // Create the new schema
   auto new_fields = temp_schema->fields() | 
std::ranges::to<std::vector<SchemaField>>();
   ICEBERG_ASSIGN_OR_RAISE(
       auto new_schema,
@@ -472,10 +512,8 @@ UpdateSchema& 
UpdateSchema::AddColumnInternal(std::optional<std::string_view> pa
   int32_t parent_id = kTableRootId;
   std::string full_name;
 
-  // Handle parent field
   if (parent.has_value()) {
     ICEBERG_BUILDER_CHECK(!parent->empty(), "Parent name cannot be empty");
-    // Find parent field
     ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto parent_field_opt, 
FindField(*parent));
     ICEBERG_BUILDER_CHECK(parent_field_opt.has_value(), "Cannot find parent 
struct: {}",
                           *parent);
@@ -483,33 +521,27 @@ UpdateSchema& 
UpdateSchema::AddColumnInternal(std::optional<std::string_view> pa
     const SchemaField& parent_field = parent_field_opt->get();
     const auto& parent_type = parent_field.type();
 
-    // Get the actual field to add to (handle map/list)
     const SchemaField* target_field = &parent_field;
 
     if (parent_type->type_id() == TypeId::kMap) {
-      // For maps, add to value field
       const auto& map_type = internal::checked_cast<const 
MapType&>(*parent_type);
       target_field = &map_type.value();
     } else if (parent_type->type_id() == TypeId::kList) {
-      // For lists, add to element field
       const auto& list_type = internal::checked_cast<const 
ListType&>(*parent_type);
       target_field = &list_type.element();
     }
 
-    // Validate target is a struct
     ICEBERG_BUILDER_CHECK(target_field->type()->type_id() == TypeId::kStruct,
                           "Cannot add to non-struct column: {}: {}", *parent,
                           target_field->type()->ToString());
 
     parent_id = target_field->field_id();
 
-    // Check parent is not being deleted
     ICEBERG_BUILDER_CHECK(!deletes_.contains(parent_id),
                           "Cannot add to a column that will be deleted: {}", 
*parent);
 
-    // Check field doesn't already exist (unless it's being deleted)
-    std::string nested_name = std::format("{}.{}", *parent, name);
-    ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto current_field, 
FindField(nested_name));
+    auto current_name = std::format("{}.{}", *parent, name);
+    ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto current_field, 
FindField(current_name));
     ICEBERG_BUILDER_CHECK(
         !current_field.has_value() || 
deletes_.contains(current_field->get().field_id()),
         "Cannot add column, name already exists: {}.{}", *parent, name);
@@ -519,10 +551,8 @@ UpdateSchema& 
UpdateSchema::AddColumnInternal(std::optional<std::string_view> pa
                                      schema_->FindColumnNameById(parent_id));
     ICEBERG_BUILDER_CHECK(parent_name_opt.has_value(),
                           "Cannot find column name for parent id: {}", 
parent_id);
-
     full_name = std::format("{}.{}", *parent_name_opt, name);
   } else {
-    // Top-level field
     ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto current_field, FindField(name));
     ICEBERG_BUILDER_CHECK(
         !current_field.has_value() || 
deletes_.contains(current_field->get().field_id()),
@@ -531,32 +561,25 @@ UpdateSchema& 
UpdateSchema::AddColumnInternal(std::optional<std::string_view> pa
     full_name = std::string(name);
   }
 
-  // V3 supports default values, but this implementation doesn't support them 
yet
-  // Check for incompatible change: adding required column without default
   ICEBERG_BUILDER_CHECK(
       is_optional || allow_incompatible_changes_,
       "Incompatible change: cannot add required column without a default 
value: {}",
       full_name);
 
-  // Assign new column ID
   int32_t new_id = AssignNewColumnId();
 
-  // Update tracking for moves
-  added_name_to_id_[full_name] = new_id;
+  added_name_to_id_[CaseSensitivityAwareName(full_name)] = new_id;
   if (parent_id != kTableRootId) {
     id_to_parent_[new_id] = parent_id;
   }
 
-  // Assign fresh IDs to nested types
   AssignFreshIdVisitor id_assigner([this]() { return AssignNewColumnId(); });
   auto type_with_fresh_ids = id_assigner.Visit(type);
 
-  // Create new field
   auto new_field = std::make_shared<SchemaField>(new_id, std::string(name),
                                                  
std::move(type_with_fresh_ids),
                                                  is_optional, 
std::string(doc));
 
-  // Record the update
   updates_[new_id] = std::move(new_field);
   parent_to_added_ids_[parent_id].push_back(new_id);
 
@@ -570,4 +593,40 @@ Result<std::optional<std::reference_wrapper<const 
SchemaField>>> UpdateSchema::F
   return schema_->FindFieldByName(name, case_sensitive_);
 }
 
+Result<std::optional<std::reference_wrapper<const SchemaField>>>
+UpdateSchema::FindFieldForUpdate(std::string_view name) const {
+  ICEBERG_ASSIGN_OR_RAISE(auto existing_field_opt, FindField(name));
+
+  if (existing_field_opt.has_value()) {
+    const auto& existing_field = existing_field_opt->get();
+    int32_t field_id = existing_field.field_id();
+
+    auto update_it = updates_.find(field_id);
+    if (update_it != updates_.end()) {
+      return std::optional<std::reference_wrapper<const SchemaField>>(
+          std::cref(*update_it->second));
+    }
+
+    return existing_field_opt;
+  }
+
+  auto added_it = added_name_to_id_.find(CaseSensitivityAwareName(name));
+  if (added_it != added_name_to_id_.end()) {
+    int32_t added_id = added_it->second;
+    if (auto update_it = updates_.find(added_id); update_it != updates_.end()) 
{
+      return std::optional<std::reference_wrapper<const SchemaField>>(
+          std::cref(*update_it->second));
+    }
+  }
+
+  return std::nullopt;
+}
+
+std::string UpdateSchema::CaseSensitivityAwareName(std::string_view name) 
const {
+  if (case_sensitive_) {
+    return std::string(name);
+  }
+  return StringUtils::ToLower(name);
+}
+
 }  // namespace iceberg
diff --git a/src/iceberg/update/update_schema.h 
b/src/iceberg/update/update_schema.h
index d68c291d..e87c9bcc 100644
--- a/src/iceberg/update/update_schema.h
+++ b/src/iceberg/update/update_schema.h
@@ -346,6 +346,13 @@ class ICEBERG_EXPORT UpdateSchema : public PendingUpdate {
                                   std::string_view name, bool is_optional,
                                   std::shared_ptr<Type> type, std::string_view 
doc);
 
+  /// \brief Internal implementation for updating column requirement 
(optional/required).
+  ///
+  /// \param name Name of the column to update.
+  /// \param is_optional Whether the column should be optional (true) or 
required (false).
+  /// \return Reference to this for method chaining.
+  UpdateSchema& UpdateColumnRequirementInternal(std::string_view name, bool 
is_optional);
+
   /// \brief Assign a new column ID and increment the counter.
   int32_t AssignNewColumnId();
 
@@ -353,6 +360,26 @@ class ICEBERG_EXPORT UpdateSchema : public PendingUpdate {
   Result<std::optional<std::reference_wrapper<const SchemaField>>> FindField(
       std::string_view name) const;
 
+  /// \brief Find a field for update operations, considering pending changes.
+  ///
+  /// This method checks both the original schema and pending 
updates/additions to
+  /// return the most current view of a field. Used by operations that need to 
work
+  /// with fields that may have been added or modified in the same transaction.
+  ///
+  /// \param name Name of the field to find.
+  /// \return The field if found in schema or pending changes, nullopt 
otherwise.
+  Result<std::optional<std::reference_wrapper<const SchemaField>>> 
FindFieldForUpdate(
+      std::string_view name) const;
+
+  /// \brief Normalize a field name based on case sensitivity setting.
+  ///
+  /// If case_sensitive_ is true, returns the name as-is.
+  /// If case_sensitive_ is false, returns the lowercase version of the name.
+  ///
+  /// \param name The field name to normalize.
+  /// \return The normalized field name.
+  std::string CaseSensitivityAwareName(std::string_view name) const;
+
   // Internal state
   std::shared_ptr<Schema> schema_;
   int32_t last_column_id_;
diff --git a/src/iceberg/util/type_util.cc b/src/iceberg/util/type_util.cc
index 8f6d74b3..b18d09e4 100644
--- a/src/iceberg/util/type_util.cc
+++ b/src/iceberg/util/type_util.cc
@@ -399,4 +399,45 @@ Result<std::shared_ptr<Schema>> AssignFreshIds(int32_t 
schema_id, const Schema&
   return Schema::Make(std::move(fields), schema_id, identifier_field_names);
 }
 
+bool IsPromotionAllowed(const std::shared_ptr<Type>& from_type,
+                        const std::shared_ptr<Type>& to_type) {
+  if (!from_type || !to_type) {
+    return false;
+  }
+
+  // Same type is always allowed
+  if (*from_type == *to_type) {
+    return true;
+  }
+
+  // Both must be primitive types for promotion
+  if (!from_type->is_primitive() || !to_type->is_primitive()) {
+    return false;
+  }
+
+  TypeId from_id = from_type->type_id();
+  TypeId to_id = to_type->type_id();
+
+  // int -> long
+  if (from_id == TypeId::kInt && to_id == TypeId::kLong) {
+    return true;
+  }
+
+  // float -> double
+  if (from_id == TypeId::kFloat && to_id == TypeId::kDouble) {
+    return true;
+  }
+
+  // decimal(P,S) -> decimal(P',S) where P' > P
+  if (from_id == TypeId::kDecimal && to_id == TypeId::kDecimal) {
+    const auto& from_decimal = internal::checked_cast<const 
DecimalType&>(*from_type);
+    const auto& to_decimal = internal::checked_cast<const 
DecimalType&>(*to_type);
+    // Scale must be the same, precision can only increase
+    return from_decimal.scale() == to_decimal.scale() &&
+           from_decimal.precision() < to_decimal.precision();
+  }
+
+  return false;
+}
+
 }  // namespace iceberg
diff --git a/src/iceberg/util/type_util.h b/src/iceberg/util/type_util.h
index c2a219b1..0f71494b 100644
--- a/src/iceberg/util/type_util.h
+++ b/src/iceberg/util/type_util.h
@@ -171,4 +171,17 @@ class AssignFreshIdVisitor {
 ICEBERG_EXPORT Result<std::shared_ptr<Schema>> AssignFreshIds(
     int32_t schema_id, const Schema& schema, std::function<int32_t()> next_id);
 
+/// \brief Check if type promotion from one type to another is allowed.
+///
+/// Type promotion rules:
+/// - int -> long
+/// - float -> double
+/// - decimal(P,S) -> decimal(P',S) where P' > P
+///
+/// \param from_type The original type
+/// \param to_type The target type
+/// \return true if promotion is allowed, false otherwise
+ICEBERG_EXPORT bool IsPromotionAllowed(const std::shared_ptr<Type>& from_type,
+                                       const std::shared_ptr<Type>& to_type);
+
 }  // namespace iceberg

Reply via email to