wgtmac commented on code in PR #486:
URL: https://github.com/apache/iceberg-cpp/pull/486#discussion_r2663905033


##########
src/iceberg/update/update_schema.cc:
##########
@@ -195,13 +391,191 @@ UpdateSchema& 
UpdateSchema::UnionByNameWith(std::shared_ptr<Schema> new_schema)
 
 UpdateSchema& UpdateSchema::SetIdentifierFields(
     const std::span<std::string_view>& names) {
-  identifier_field_names_ = names | 
std::ranges::to<std::unordered_set<std::string>>();
+  identifier_field_names_ = names | 
std::ranges::to<std::vector<std::string>>();
   return *this;
 }
 
 Result<UpdateSchema::ApplyResult> UpdateSchema::Apply() {
-  // TODO(Guotao Yu): Implement Apply
-  return NotImplemented("UpdateSchema::Apply not implemented");
+  ICEBERG_RETURN_UNEXPECTED(CheckErrors());
+
+  // Validate existing identifier fields are not deleted
+  for (const auto& name : identifier_field_names_) {
+    ICEBERG_ASSIGN_OR_RAISE(auto field_opt, FindField(name));
+    if (field_opt.has_value()) {
+      const auto& field = field_opt->get();
+      auto field_id = field.field_id();
+
+      // Check the field itself is not deleted

Review Comment:
   ```suggestion
   ```



##########
src/iceberg/update/update_schema.cc:
##########
@@ -195,13 +391,191 @@ UpdateSchema& 
UpdateSchema::UnionByNameWith(std::shared_ptr<Schema> new_schema)
 
 UpdateSchema& UpdateSchema::SetIdentifierFields(
     const std::span<std::string_view>& names) {
-  identifier_field_names_ = names | 
std::ranges::to<std::unordered_set<std::string>>();
+  identifier_field_names_ = names | 
std::ranges::to<std::vector<std::string>>();
   return *this;
 }
 
 Result<UpdateSchema::ApplyResult> UpdateSchema::Apply() {
-  // TODO(Guotao Yu): Implement Apply
-  return NotImplemented("UpdateSchema::Apply not implemented");
+  ICEBERG_RETURN_UNEXPECTED(CheckErrors());
+
+  // Validate existing identifier fields are not deleted
+  for (const auto& name : identifier_field_names_) {
+    ICEBERG_ASSIGN_OR_RAISE(auto field_opt, FindField(name));
+    if (field_opt.has_value()) {
+      const auto& field = field_opt->get();
+      auto field_id = field.field_id();
+
+      // Check the field itself is not deleted
+      ICEBERG_CHECK(!deletes_.contains(field_id),
+                    "Cannot delete identifier field {}. To force deletion, 
also call "
+                    "SetIdentifierFields to update identifier fields.",
+                    name);
+
+      // Check no parent of this field is deleted
+      auto parent_it = id_to_parent_.find(field_id);
+      while (parent_it != id_to_parent_.end()) {
+        int32_t parent_id = parent_it->second;
+        ICEBERG_ASSIGN_OR_RAISE(auto parent_field_opt, 
schema_->FindFieldById(parent_id));
+        ICEBERG_CHECK(
+            !deletes_.contains(parent_id),
+            "Cannot delete field {} as it will delete nested identifier field 
{}",
+            parent_field_opt.has_value() ? 
std::string(parent_field_opt->get().name())
+                                         : std::to_string(parent_id),
+            name);
+        parent_it = id_to_parent_.find(parent_id);
+      }
+    }
+  }
+
+  // Apply schema changes using visitor pattern
+  // Create a temporary struct type from the schema to use with the visitor
+  auto schema_struct_type = std::make_shared<StructType>(
+      schema_->fields() | std::ranges::to<std::vector<SchemaField>>());
+
+  // Apply changes recursively using the visitor
+  ApplyChangesVisitor visitor(deletes_, updates_, parent_to_added_ids_);
+  ICEBERG_ASSIGN_OR_RAISE(auto new_type,
+                          visitor.ApplyChanges(schema_struct_type, 
kTableRootId));
+
+  // Cast result back to StructType and extract fields
+  auto new_struct_type = internal::checked_pointer_cast<StructType>(new_type);
+  std::vector<SchemaField> new_fields(new_struct_type->fields() |
+                                      
std::ranges::to<std::vector<SchemaField>>());
+
+  // Convert identifier field names to IDs
+  auto temp_schema = std::make_shared<Schema>(new_fields);
+  std::vector<int32_t> fresh_identifier_ids;
+  for (const auto& name : identifier_field_names_) {
+    ICEBERG_ASSIGN_OR_RAISE(auto field_opt,
+                            temp_schema->FindFieldByName(name, 
case_sensitive_));
+    ICEBERG_CHECK(field_opt.has_value(),
+                  "Cannot add field {} as an identifier field: not found in 
current "
+                  "schema or added columns",
+                  name);
+    fresh_identifier_ids.push_back(field_opt->get().field_id());
+  }
+
+  // Create the new schema
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto new_schema,
+      Schema::Make(std::move(new_fields), schema_->schema_id(), 
fresh_identifier_ids));
+
+  return ApplyResult{.schema = std::move(new_schema),
+                     .new_last_column_id = last_column_id_};
+}
+
+UpdateSchema& UpdateSchema::AddColumnInternal(std::optional<std::string_view> 
parent,
+                                              std::string_view name, bool 
is_optional,
+                                              std::shared_ptr<Type> type,
+                                              std::string_view doc) {
+  int32_t parent_id = kTableRootId;
+  std::string full_name;
+
+  // Handle parent field
+  if (parent.has_value() && !parent->empty()) {

Review Comment:
   Can we directly use `std::string_view parent`, where an empty string means 
no parent?



##########
src/iceberg/update/update_schema.cc:
##########
@@ -195,13 +391,191 @@ UpdateSchema& 
UpdateSchema::UnionByNameWith(std::shared_ptr<Schema> new_schema)
 
 UpdateSchema& UpdateSchema::SetIdentifierFields(
     const std::span<std::string_view>& names) {
-  identifier_field_names_ = names | 
std::ranges::to<std::unordered_set<std::string>>();
+  identifier_field_names_ = names | 
std::ranges::to<std::vector<std::string>>();
   return *this;
 }
 
 Result<UpdateSchema::ApplyResult> UpdateSchema::Apply() {
-  // TODO(Guotao Yu): Implement Apply
-  return NotImplemented("UpdateSchema::Apply not implemented");
+  ICEBERG_RETURN_UNEXPECTED(CheckErrors());
+
+  // Validate existing identifier fields are not deleted
+  for (const auto& name : identifier_field_names_) {
+    ICEBERG_ASSIGN_OR_RAISE(auto field_opt, FindField(name));
+    if (field_opt.has_value()) {
+      const auto& field = field_opt->get();
+      auto field_id = field.field_id();
+
+      // Check the field itself is not deleted
+      ICEBERG_CHECK(!deletes_.contains(field_id),
+                    "Cannot delete identifier field {}. To force deletion, 
also call "
+                    "SetIdentifierFields to update identifier fields.",
+                    name);
+
+      // Check no parent of this field is deleted
+      auto parent_it = id_to_parent_.find(field_id);
+      while (parent_it != id_to_parent_.end()) {
+        int32_t parent_id = parent_it->second;
+        ICEBERG_ASSIGN_OR_RAISE(auto parent_field_opt, 
schema_->FindFieldById(parent_id));
+        ICEBERG_CHECK(
+            !deletes_.contains(parent_id),
+            "Cannot delete field {} as it will delete nested identifier field 
{}",
+            parent_field_opt.has_value() ? 
std::string(parent_field_opt->get().name())
+                                         : std::to_string(parent_id),
+            name);
+        parent_it = id_to_parent_.find(parent_id);
+      }
+    }
+  }
+
+  // Apply schema changes using visitor pattern
+  // Create a temporary struct type from the schema to use with the visitor
+  auto schema_struct_type = std::make_shared<StructType>(

Review Comment:
   We don't need this conversion because `Schema` is also a `StructType`.



##########
src/iceberg/update/update_schema.cc:
##########
@@ -195,13 +391,191 @@ UpdateSchema& 
UpdateSchema::UnionByNameWith(std::shared_ptr<Schema> new_schema)
 
 UpdateSchema& UpdateSchema::SetIdentifierFields(
     const std::span<std::string_view>& names) {
-  identifier_field_names_ = names | 
std::ranges::to<std::unordered_set<std::string>>();
+  identifier_field_names_ = names | 
std::ranges::to<std::vector<std::string>>();
   return *this;
 }
 
 Result<UpdateSchema::ApplyResult> UpdateSchema::Apply() {
-  // TODO(Guotao Yu): Implement Apply
-  return NotImplemented("UpdateSchema::Apply not implemented");
+  ICEBERG_RETURN_UNEXPECTED(CheckErrors());
+
+  // Validate existing identifier fields are not deleted
+  for (const auto& name : identifier_field_names_) {
+    ICEBERG_ASSIGN_OR_RAISE(auto field_opt, FindField(name));
+    if (field_opt.has_value()) {
+      const auto& field = field_opt->get();
+      auto field_id = field.field_id();
+
+      // Check the field itself is not deleted
+      ICEBERG_CHECK(!deletes_.contains(field_id),
+                    "Cannot delete identifier field {}. To force deletion, 
also call "
+                    "SetIdentifierFields to update identifier fields.",
+                    name);
+
+      // Check no parent of this field is deleted
+      auto parent_it = id_to_parent_.find(field_id);
+      while (parent_it != id_to_parent_.end()) {
+        int32_t parent_id = parent_it->second;
+        ICEBERG_ASSIGN_OR_RAISE(auto parent_field_opt, 
schema_->FindFieldById(parent_id));
+        ICEBERG_CHECK(
+            !deletes_.contains(parent_id),
+            "Cannot delete field {} as it will delete nested identifier field 
{}",
+            parent_field_opt.has_value() ? 
std::string(parent_field_opt->get().name())
+                                         : std::to_string(parent_id),
+            name);
+        parent_it = id_to_parent_.find(parent_id);
+      }
+    }
+  }
+
+  // Apply schema changes using visitor pattern
+  // Create a temporary struct type from the schema to use with the visitor
+  auto schema_struct_type = std::make_shared<StructType>(
+      schema_->fields() | std::ranges::to<std::vector<SchemaField>>());
+
+  // Apply changes recursively using the visitor
+  ApplyChangesVisitor visitor(deletes_, updates_, parent_to_added_ids_);
+  ICEBERG_ASSIGN_OR_RAISE(auto new_type,
+                          visitor.ApplyChanges(schema_struct_type, 
kTableRootId));
+
+  // Cast result back to StructType and extract fields
+  auto new_struct_type = internal::checked_pointer_cast<StructType>(new_type);
+  std::vector<SchemaField> new_fields(new_struct_type->fields() |
+                                      
std::ranges::to<std::vector<SchemaField>>());
+
+  // Convert identifier field names to IDs
+  auto temp_schema = std::make_shared<Schema>(new_fields);
+  std::vector<int32_t> fresh_identifier_ids;
+  for (const auto& name : identifier_field_names_) {
+    ICEBERG_ASSIGN_OR_RAISE(auto field_opt,
+                            temp_schema->FindFieldByName(name, 
case_sensitive_));
+    ICEBERG_CHECK(field_opt.has_value(),
+                  "Cannot add field {} as an identifier field: not found in 
current "
+                  "schema or added columns",
+                  name);
+    fresh_identifier_ids.push_back(field_opt->get().field_id());
+  }
+
+  // Create the new schema
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto new_schema,
+      Schema::Make(std::move(new_fields), schema_->schema_id(), 
fresh_identifier_ids));
+
+  return ApplyResult{.schema = std::move(new_schema),
+                     .new_last_column_id = last_column_id_};
+}
+
+UpdateSchema& UpdateSchema::AddColumnInternal(std::optional<std::string_view> 
parent,
+                                              std::string_view name, bool 
is_optional,
+                                              std::shared_ptr<Type> type,
+                                              std::string_view doc) {
+  int32_t parent_id = kTableRootId;
+  std::string full_name;
+
+  // Handle parent field
+  if (parent.has_value() && !parent->empty()) {
+    // Find parent field
+    ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto parent_field_opt, 
FindField(*parent));
+    ICEBERG_BUILDER_CHECK(parent_field_opt.has_value(), "Cannot find parent 
struct: {}",
+                          *parent);
+
+    const SchemaField& parent_field = parent_field_opt->get();
+    const auto& parent_type = parent_field.type();
+
+    // Get the actual field to add to (handle map/list)
+    const SchemaField* target_field = &parent_field;
+    if (parent_type->is_nested()) {
+      const auto& nested = internal::checked_cast<const 
NestedType&>(*parent_type);
+      if (nested.type_id() == TypeId::kMap) {
+        // For maps, add to value field
+        const auto& map_type = internal::checked_cast<const MapType&>(nested);
+        target_field = &map_type.value();
+      } else if (nested.type_id() == TypeId::kList) {
+        // For lists, add to element field
+        const auto& list_type = internal::checked_cast<const 
ListType&>(nested);
+        target_field = &list_type.element();
+      }
+    }
+
+    // Validate target is a struct
+    ICEBERG_BUILDER_CHECK(target_field->type()->is_nested() &&
+                              target_field->type()->type_id() == 
TypeId::kStruct,

Review Comment:
   ```suggestion
       ICEBERG_BUILDER_CHECK(target_field->type()->type_id() == TypeId::kStruct,
   ```



##########
src/iceberg/update/update_schema.cc:
##########
@@ -195,13 +391,191 @@ UpdateSchema& 
UpdateSchema::UnionByNameWith(std::shared_ptr<Schema> new_schema)
 
 UpdateSchema& UpdateSchema::SetIdentifierFields(
     const std::span<std::string_view>& names) {
-  identifier_field_names_ = names | 
std::ranges::to<std::unordered_set<std::string>>();
+  identifier_field_names_ = names | 
std::ranges::to<std::vector<std::string>>();
   return *this;
 }
 
 Result<UpdateSchema::ApplyResult> UpdateSchema::Apply() {
-  // TODO(Guotao Yu): Implement Apply
-  return NotImplemented("UpdateSchema::Apply not implemented");
+  ICEBERG_RETURN_UNEXPECTED(CheckErrors());
+
+  // Validate existing identifier fields are not deleted
+  for (const auto& name : identifier_field_names_) {
+    ICEBERG_ASSIGN_OR_RAISE(auto field_opt, FindField(name));
+    if (field_opt.has_value()) {
+      const auto& field = field_opt->get();
+      auto field_id = field.field_id();
+
+      // Check the field itself is not deleted
+      ICEBERG_CHECK(!deletes_.contains(field_id),
+                    "Cannot delete identifier field {}. To force deletion, 
also call "
+                    "SetIdentifierFields to update identifier fields.",
+                    name);
+
+      // Check no parent of this field is deleted
+      auto parent_it = id_to_parent_.find(field_id);
+      while (parent_it != id_to_parent_.end()) {
+        int32_t parent_id = parent_it->second;
+        ICEBERG_ASSIGN_OR_RAISE(auto parent_field_opt, 
schema_->FindFieldById(parent_id));
+        ICEBERG_CHECK(
+            !deletes_.contains(parent_id),
+            "Cannot delete field {} as it will delete nested identifier field 
{}",
+            parent_field_opt.has_value() ? 
std::string(parent_field_opt->get().name())
+                                         : std::to_string(parent_id),
+            name);
+        parent_it = id_to_parent_.find(parent_id);
+      }
+    }
+  }
+
+  // Apply schema changes using visitor pattern
+  // Create a temporary struct type from the schema to use with the visitor
+  auto schema_struct_type = std::make_shared<StructType>(
+      schema_->fields() | std::ranges::to<std::vector<SchemaField>>());
+
+  // Apply changes recursively using the visitor
+  ApplyChangesVisitor visitor(deletes_, updates_, parent_to_added_ids_);
+  ICEBERG_ASSIGN_OR_RAISE(auto new_type,
+                          visitor.ApplyChanges(schema_struct_type, 
kTableRootId));
+
+  // Cast result back to StructType and extract fields
+  auto new_struct_type = internal::checked_pointer_cast<StructType>(new_type);
+  std::vector<SchemaField> new_fields(new_struct_type->fields() |
+                                      
std::ranges::to<std::vector<SchemaField>>());
+
+  // Convert identifier field names to IDs
+  auto temp_schema = std::make_shared<Schema>(new_fields);

Review Comment:
   ```suggestion
     auto temp_schema = new_struct_type->ToSchema();
   ```



##########
src/iceberg/update/update_schema.cc:
##########
@@ -64,8 +253,10 @@ UpdateSchema::UpdateSchema(std::shared_ptr<Transaction> 
transaction)
     AddError(identifier_names_result.error());
     return;
   }
-  identifier_field_names_ = identifier_names_result.value() |
-                            std::ranges::to<std::unordered_set<std::string>>();
+  identifier_field_names_ = identifier_names_result.value();

Review Comment:
   ```suggestion
     identifier_field_names_ = std::move(identifier_names_result.value());
   ```



##########
src/iceberg/util/visit_type.h:
##########
@@ -124,4 +124,40 @@ inline Status VisitTypeIdInline(TypeId id, VISITOR* 
visitor, ARGS&&... args) {
 
 #undef TYPE_ID_VISIT_INLINE
 
+/// \brief Visit a type using a schema visitor pattern
+///
+/// This function provides a simplified visitor interface that groups Iceberg 
types into
+/// four categories based on their structural properties:
+///
+/// - **Struct types**: Complex types with named fields (StructType)
+/// - **List types**: Sequential container types (ListType)
+/// - **Map types**: Key-value container types (MapType)
+/// - **Primitive types**: All leaf types without nested structure (14 
primitive types)
+///
+/// This grouping is useful for algorithms that need to distinguish between 
container
+/// types and leaf types, but don't require separate handling for each 
primitive type
+/// variant (e.g., Int vs Long vs String).
+///
+/// \tparam VISITOR Visitor class that must implement four Visit methods:
+///   - `VisitStruct(const StructType&, ARGS...)` for struct types
+///   - `VisitList(const ListType&, ARGS...)` for list types
+///   - `VisitMap(const MapType&, ARGS...)` for map types
+///   - `VisitPrimitive(const PrimitiveType&, ARGS...)` for all primitive types
+/// \tparam ARGS Additional argument types forwarded to Visit methods
+/// \param type The type to visit
+/// \param visitor Pointer to the visitor instance
+/// \param args Additional arguments forwarded to the Visit methods
+/// \return The return value from the invoked Visit method
+template <typename VISITOR, typename... ARGS>
+inline auto VisitSchemaInline(const Type& type, VISITOR* visitor, ARGS&&... 
args) {

Review Comment:
   I'm hesitant to use `schema` in the name because it only operates on types, not schemas.



##########
src/iceberg/update/update_schema.cc:
##########
@@ -195,13 +391,191 @@ UpdateSchema& 
UpdateSchema::UnionByNameWith(std::shared_ptr<Schema> new_schema)
 
 UpdateSchema& UpdateSchema::SetIdentifierFields(
     const std::span<std::string_view>& names) {
-  identifier_field_names_ = names | 
std::ranges::to<std::unordered_set<std::string>>();
+  identifier_field_names_ = names | 
std::ranges::to<std::vector<std::string>>();
   return *this;
 }
 
 Result<UpdateSchema::ApplyResult> UpdateSchema::Apply() {
-  // TODO(Guotao Yu): Implement Apply
-  return NotImplemented("UpdateSchema::Apply not implemented");
+  ICEBERG_RETURN_UNEXPECTED(CheckErrors());
+
+  // Validate existing identifier fields are not deleted
+  for (const auto& name : identifier_field_names_) {
+    ICEBERG_ASSIGN_OR_RAISE(auto field_opt, FindField(name));
+    if (field_opt.has_value()) {
+      const auto& field = field_opt->get();
+      auto field_id = field.field_id();
+
+      // Check the field itself is not deleted
+      ICEBERG_CHECK(!deletes_.contains(field_id),
+                    "Cannot delete identifier field {}. To force deletion, 
also call "
+                    "SetIdentifierFields to update identifier fields.",
+                    name);
+
+      // Check no parent of this field is deleted
+      auto parent_it = id_to_parent_.find(field_id);
+      while (parent_it != id_to_parent_.end()) {
+        int32_t parent_id = parent_it->second;
+        ICEBERG_ASSIGN_OR_RAISE(auto parent_field_opt, 
schema_->FindFieldById(parent_id));
+        ICEBERG_CHECK(
+            !deletes_.contains(parent_id),
+            "Cannot delete field {} as it will delete nested identifier field 
{}",
+            parent_field_opt.has_value() ? 
std::string(parent_field_opt->get().name())
+                                         : std::to_string(parent_id),
+            name);

Review Comment:
   ```suggestion
           ICEBERG_CHECK(
               !deletes_.contains(parent_id),
               "Cannot delete field with id {} as it will delete nested 
identifier field {}",
               parent_id,
               name);
   ```
   
   I know Java prints the field's string representation in the error message, 
but let's simplify this so that we don't issue another `FindFieldById` call.



##########
src/iceberg/update/update_schema.cc:
##########
@@ -162,8 +343,23 @@ UpdateSchema& UpdateSchema::RequireColumn(std::string_view 
name) {
 }
 
 UpdateSchema& UpdateSchema::DeleteColumn(std::string_view name) {
-  // TODO(Guotao Yu): Implement DeleteColumn
-  AddError(NotImplemented("UpdateSchema::DeleteColumn not implemented"));
+  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto field_opt, FindField(name));
+  ICEBERG_BUILDER_CHECK(field_opt.has_value(), "Cannot delete missing column: 
{}", name);
+
+  const auto& field = field_opt->get();
+  int32_t field_id = field.field_id();
+
+  // Check the field doesn't have additions
+  ICEBERG_BUILDER_CHECK(!parent_to_added_ids_.contains(field_id),
+                        "Cannot delete a column that has additions: {}", name);
+
+  // Check the field doesn't have updates
+  ICEBERG_BUILDER_CHECK(!updates_.contains(field_id),
+                        "Cannot delete a column that has updates: {}", name);
+
+  // Add to deletes set

Review Comment:
   ```suggestion
     ICEBERG_BUILDER_CHECK(!parent_to_added_ids_.contains(field_id),
                           "Cannot delete a column that has additions: {}", 
name);
     ICEBERG_BUILDER_CHECK(!updates_.contains(field_id),
                           "Cannot delete a column that has updates: {}", name);
   ```
   
   IMO, these error messages are clear enough so we don't need these comments.



##########
src/iceberg/util/visitor_generate.h:
##########
@@ -40,4 +40,21 @@ namespace iceberg {
   ACTION(List);                                \
   ACTION(Map);
 
+/// \brief Generate switch-case for schema visitor pattern
+///
+/// This macro generates switch cases that dispatch to visitor methods based 
on type:
+/// - Struct types -> calls ACTION with Struct
+/// - List types -> calls ACTION with List
+/// - Map types -> calls ACTION with Map
+/// - All primitive types (default) -> calls ACTION with Primitive
+#define ICEBERG_GENERATE_SCHEMA_VISITOR_CASES(ACTION) \

Review Comment:
   Perhaps `ICEBERG_TYPE_SWITCH_WITH_PRIMITIVE_DEFAULT` is a better name?



##########
src/iceberg/update/update_schema.cc:
##########
@@ -195,13 +391,191 @@ UpdateSchema& 
UpdateSchema::UnionByNameWith(std::shared_ptr<Schema> new_schema)
 
 UpdateSchema& UpdateSchema::SetIdentifierFields(
     const std::span<std::string_view>& names) {
-  identifier_field_names_ = names | 
std::ranges::to<std::unordered_set<std::string>>();
+  identifier_field_names_ = names | 
std::ranges::to<std::vector<std::string>>();
   return *this;
 }
 
 Result<UpdateSchema::ApplyResult> UpdateSchema::Apply() {
-  // TODO(Guotao Yu): Implement Apply
-  return NotImplemented("UpdateSchema::Apply not implemented");
+  ICEBERG_RETURN_UNEXPECTED(CheckErrors());
+
+  // Validate existing identifier fields are not deleted
+  for (const auto& name : identifier_field_names_) {
+    ICEBERG_ASSIGN_OR_RAISE(auto field_opt, FindField(name));
+    if (field_opt.has_value()) {
+      const auto& field = field_opt->get();
+      auto field_id = field.field_id();
+
+      // Check the field itself is not deleted
+      ICEBERG_CHECK(!deletes_.contains(field_id),
+                    "Cannot delete identifier field {}. To force deletion, 
also call "
+                    "SetIdentifierFields to update identifier fields.",
+                    name);
+
+      // Check no parent of this field is deleted
+      auto parent_it = id_to_parent_.find(field_id);
+      while (parent_it != id_to_parent_.end()) {
+        int32_t parent_id = parent_it->second;
+        ICEBERG_ASSIGN_OR_RAISE(auto parent_field_opt, 
schema_->FindFieldById(parent_id));
+        ICEBERG_CHECK(
+            !deletes_.contains(parent_id),
+            "Cannot delete field {} as it will delete nested identifier field 
{}",
+            parent_field_opt.has_value() ? 
std::string(parent_field_opt->get().name())
+                                         : std::to_string(parent_id),
+            name);
+        parent_it = id_to_parent_.find(parent_id);
+      }
+    }
+  }
+
+  // Apply schema changes using visitor pattern
+  // Create a temporary struct type from the schema to use with the visitor
+  auto schema_struct_type = std::make_shared<StructType>(
+      schema_->fields() | std::ranges::to<std::vector<SchemaField>>());
+
+  // Apply changes recursively using the visitor
+  ApplyChangesVisitor visitor(deletes_, updates_, parent_to_added_ids_);
+  ICEBERG_ASSIGN_OR_RAISE(auto new_type,
+                          visitor.ApplyChanges(schema_struct_type, 
kTableRootId));
+
+  // Cast result back to StructType and extract fields
+  auto new_struct_type = internal::checked_pointer_cast<StructType>(new_type);
+  std::vector<SchemaField> new_fields(new_struct_type->fields() |
+                                      
std::ranges::to<std::vector<SchemaField>>());
+
+  // Convert identifier field names to IDs
+  auto temp_schema = std::make_shared<Schema>(new_fields);
+  std::vector<int32_t> fresh_identifier_ids;
+  for (const auto& name : identifier_field_names_) {
+    ICEBERG_ASSIGN_OR_RAISE(auto field_opt,
+                            temp_schema->FindFieldByName(name, 
case_sensitive_));
+    ICEBERG_CHECK(field_opt.has_value(),
+                  "Cannot add field {} as an identifier field: not found in 
current "
+                  "schema or added columns",
+                  name);
+    fresh_identifier_ids.push_back(field_opt->get().field_id());
+  }
+
+  // Create the new schema
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto new_schema,
+      Schema::Make(std::move(new_fields), schema_->schema_id(), 
fresh_identifier_ids));
+
+  return ApplyResult{.schema = std::move(new_schema),
+                     .new_last_column_id = last_column_id_};
+}
+
+UpdateSchema& UpdateSchema::AddColumnInternal(std::optional<std::string_view> 
parent,
+                                              std::string_view name, bool 
is_optional,
+                                              std::shared_ptr<Type> type,
+                                              std::string_view doc) {
+  int32_t parent_id = kTableRootId;
+  std::string full_name;
+
+  // Handle parent field
+  if (parent.has_value() && !parent->empty()) {
+    // Find parent field
+    ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto parent_field_opt, 
FindField(*parent));
+    ICEBERG_BUILDER_CHECK(parent_field_opt.has_value(), "Cannot find parent 
struct: {}",
+                          *parent);
+
+    const SchemaField& parent_field = parent_field_opt->get();
+    const auto& parent_type = parent_field.type();
+
+    // Get the actual field to add to (handle map/list)
+    const SchemaField* target_field = &parent_field;
+    if (parent_type->is_nested()) {
+      const auto& nested = internal::checked_cast<const 
NestedType&>(*parent_type);
+      if (nested.type_id() == TypeId::kMap) {
+        // For maps, add to value field
+        const auto& map_type = internal::checked_cast<const MapType&>(nested);
+        target_field = &map_type.value();
+      } else if (nested.type_id() == TypeId::kList) {
+        // For lists, add to element field
+        const auto& list_type = internal::checked_cast<const 
ListType&>(nested);
+        target_field = &list_type.element();
+      }
+    }

Review Comment:
   ```suggestion
       if (parent_type->type_id() == TypeId::kMap) {
         // For maps, add to value field
         const auto& map_type = internal::checked_cast<const 
MapType&>(*parent_type);
         target_field = &map_type.value();
       } else if (parent_type->type_id() == TypeId::kList) {
         // For lists, add to element field
         const auto& list_type = internal::checked_cast<const 
ListType&>(*parent_type);
         target_field = &list_type.element();
       }
   ```



##########
src/iceberg/update/update_schema.cc:
##########
@@ -195,13 +391,191 @@ UpdateSchema& 
UpdateSchema::UnionByNameWith(std::shared_ptr<Schema> new_schema)
 
 UpdateSchema& UpdateSchema::SetIdentifierFields(
     const std::span<std::string_view>& names) {
-  identifier_field_names_ = names | 
std::ranges::to<std::unordered_set<std::string>>();
+  identifier_field_names_ = names | 
std::ranges::to<std::vector<std::string>>();
   return *this;
 }
 
 Result<UpdateSchema::ApplyResult> UpdateSchema::Apply() {
-  // TODO(Guotao Yu): Implement Apply
-  return NotImplemented("UpdateSchema::Apply not implemented");
+  ICEBERG_RETURN_UNEXPECTED(CheckErrors());
+
+  // Validate existing identifier fields are not deleted
+  for (const auto& name : identifier_field_names_) {
+    ICEBERG_ASSIGN_OR_RAISE(auto field_opt, FindField(name));
+    if (field_opt.has_value()) {
+      const auto& field = field_opt->get();
+      auto field_id = field.field_id();
+
+      // Check the field itself is not deleted
+      ICEBERG_CHECK(!deletes_.contains(field_id),
+                    "Cannot delete identifier field {}. To force deletion, 
also call "
+                    "SetIdentifierFields to update identifier fields.",
+                    name);
+
+      // Check no parent of this field is deleted
+      auto parent_it = id_to_parent_.find(field_id);
+      while (parent_it != id_to_parent_.end()) {
+        int32_t parent_id = parent_it->second;
+        ICEBERG_ASSIGN_OR_RAISE(auto parent_field_opt, 
schema_->FindFieldById(parent_id));
+        ICEBERG_CHECK(
+            !deletes_.contains(parent_id),
+            "Cannot delete field {} as it will delete nested identifier field 
{}",
+            parent_field_opt.has_value() ? 
std::string(parent_field_opt->get().name())
+                                         : std::to_string(parent_id),
+            name);
+        parent_it = id_to_parent_.find(parent_id);
+      }
+    }
+  }
+
+  // Apply schema changes using visitor pattern
+  // Create a temporary struct type from the schema to use with the visitor
+  auto schema_struct_type = std::make_shared<StructType>(
+      schema_->fields() | std::ranges::to<std::vector<SchemaField>>());
+
+  // Apply changes recursively using the visitor
+  ApplyChangesVisitor visitor(deletes_, updates_, parent_to_added_ids_);
+  ICEBERG_ASSIGN_OR_RAISE(auto new_type,
+                          visitor.ApplyChanges(schema_struct_type, 
kTableRootId));
+
+  // Cast result back to StructType and extract fields
+  auto new_struct_type = internal::checked_pointer_cast<StructType>(new_type);
+  std::vector<SchemaField> new_fields(new_struct_type->fields() |
+                                      
std::ranges::to<std::vector<SchemaField>>());
+
+  // Convert identifier field names to IDs
+  auto temp_schema = std::make_shared<Schema>(new_fields);
+  std::vector<int32_t> fresh_identifier_ids;
+  for (const auto& name : identifier_field_names_) {
+    ICEBERG_ASSIGN_OR_RAISE(auto field_opt,
+                            temp_schema->FindFieldByName(name, 
case_sensitive_));
+    ICEBERG_CHECK(field_opt.has_value(),
+                  "Cannot add field {} as an identifier field: not found in 
current "
+                  "schema or added columns",
+                  name);
+    fresh_identifier_ids.push_back(field_opt->get().field_id());
+  }
+
+  // Create the new schema
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto new_schema,
+      Schema::Make(std::move(new_fields), schema_->schema_id(), 
fresh_identifier_ids));
+
+  return ApplyResult{.schema = std::move(new_schema),
+                     .new_last_column_id = last_column_id_};
+}
+
+UpdateSchema& UpdateSchema::AddColumnInternal(std::optional<std::string_view> 
parent,
+                                              std::string_view name, bool 
is_optional,
+                                              std::shared_ptr<Type> type,
+                                              std::string_view doc) {
+  int32_t parent_id = kTableRootId;
+  std::string full_name;
+
+  // Handle parent field
+  if (parent.has_value() && !parent->empty()) {
+    // Find parent field
+    ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto parent_field_opt, 
FindField(*parent));
+    ICEBERG_BUILDER_CHECK(parent_field_opt.has_value(), "Cannot find parent 
struct: {}",
+                          *parent);
+
+    const SchemaField& parent_field = parent_field_opt->get();
+    const auto& parent_type = parent_field.type();
+
+    // Get the actual field to add to (handle map/list)
+    const SchemaField* target_field = &parent_field;
+    if (parent_type->is_nested()) {
+      const auto& nested = internal::checked_cast<const 
NestedType&>(*parent_type);
+      if (nested.type_id() == TypeId::kMap) {
+        // For maps, add to value field
+        const auto& map_type = internal::checked_cast<const MapType&>(nested);
+        target_field = &map_type.value();
+      } else if (nested.type_id() == TypeId::kList) {
+        // For lists, add to element field
+        const auto& list_type = internal::checked_cast<const 
ListType&>(nested);
+        target_field = &list_type.element();
+      }
+    }
+
+    // Validate target is a struct
+    ICEBERG_BUILDER_CHECK(target_field->type()->is_nested() &&
+                              target_field->type()->type_id() == 
TypeId::kStruct,
+                          "Cannot add to non-struct column: {}: {}", *parent,
+                          target_field->type()->ToString());
+
+    parent_id = target_field->field_id();
+
+    // Check parent is not being deleted
+    ICEBERG_BUILDER_CHECK(!deletes_.contains(parent_id),
+                          "Cannot add to a column that will be deleted: {}", 
*parent);
+
+    // Check field doesn't already exist (unless it's being deleted)
+    std::string nested_name = std::format("{}.{}", *parent, name);
+    ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto current_field_opt, 
FindField(nested_name));
+    if (current_field_opt.has_value()) {
+      const auto& current_field = current_field_opt->get();
+      ICEBERG_BUILDER_CHECK(deletes_.contains(current_field.field_id()),
+                            "Cannot add column, name already exists: {}.{}", 
*parent,
+                            name);
+    }
+
+    // Build full name using canonical name of parent
+    ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto parent_name_opt,
+                                     schema_->FindColumnNameById(parent_id));
+    ICEBERG_BUILDER_CHECK(parent_name_opt.has_value(),
+                          "Cannot find column name for parent id: {}", 
parent_id);
+    full_name = std::format("{}.{}", *parent_name_opt, name);
+  } else {
+    // Top-level field
+    ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto current_field_opt, FindField(name));
+    if (current_field_opt.has_value()) {
+      const auto& current_field = current_field_opt->get();
+      ICEBERG_BUILDER_CHECK(deletes_.contains(current_field.field_id()),
+                            "Cannot add column, name already exists: {}", 
name);
+    }
+    full_name = std::string(name);
+  }
+
+  // V3 supports default values, but this implementation doesn't support them 
yet
+  // Check for incompatible change: adding required column without default
+  ICEBERG_BUILDER_CHECK(
+      is_optional || allow_incompatible_changes_,
+      "Incompatible change: cannot add required column without a default 
value: {}",
+      full_name);
+
+  // Assign new column ID
+  int32_t new_id = AssignNewColumnId();
+
+  // Update tracking for moves
+  added_name_to_id_[full_name] = new_id;
+  if (parent_id != kTableRootId) {
+    id_to_parent_[new_id] = parent_id;
+  }
+
+  // Assign fresh IDs to nested types
+  AssignFreshIdVisitor id_assigner([this]() { return AssignNewColumnId(); });
+  auto type_with_fresh_ids = id_assigner.Visit(type);
+
+  // Create new field
+  auto new_field = std::make_shared<SchemaField>(new_id, std::string(name),
+                                                 
std::move(type_with_fresh_ids),
+                                                 is_optional, 
std::string(doc));
+
+  // Record the update
+  updates_[new_id] = std::move(new_field);
+  parent_to_added_ids_[parent_id].push_back(new_id);
+
+  return *this;
+}
+
+int32_t UpdateSchema::AssignNewColumnId() {
+  int32_t next = last_column_id_ + 1;
+  last_column_id_ = next;
+  return next;

Review Comment:
   ```suggestion
     return ++last_column_id_;
   ```



##########
src/iceberg/update/update_schema.cc:
##########
@@ -195,13 +391,191 @@ UpdateSchema& 
UpdateSchema::UnionByNameWith(std::shared_ptr<Schema> new_schema)
 
 UpdateSchema& UpdateSchema::SetIdentifierFields(
     const std::span<std::string_view>& names) {
-  identifier_field_names_ = names | 
std::ranges::to<std::unordered_set<std::string>>();
+  identifier_field_names_ = names | 
std::ranges::to<std::vector<std::string>>();
   return *this;
 }
 
 Result<UpdateSchema::ApplyResult> UpdateSchema::Apply() {
-  // TODO(Guotao Yu): Implement Apply
-  return NotImplemented("UpdateSchema::Apply not implemented");
+  ICEBERG_RETURN_UNEXPECTED(CheckErrors());
+
+  // Validate existing identifier fields are not deleted
+  for (const auto& name : identifier_field_names_) {
+    ICEBERG_ASSIGN_OR_RAISE(auto field_opt, FindField(name));
+    if (field_opt.has_value()) {
+      const auto& field = field_opt->get();
+      auto field_id = field.field_id();
+
+      // Check the field itself is not deleted
+      ICEBERG_CHECK(!deletes_.contains(field_id),
+                    "Cannot delete identifier field {}. To force deletion, 
also call "
+                    "SetIdentifierFields to update identifier fields.",
+                    name);
+
+      // Check no parent of this field is deleted
+      auto parent_it = id_to_parent_.find(field_id);
+      while (parent_it != id_to_parent_.end()) {
+        int32_t parent_id = parent_it->second;
+        ICEBERG_ASSIGN_OR_RAISE(auto parent_field_opt, 
schema_->FindFieldById(parent_id));
+        ICEBERG_CHECK(
+            !deletes_.contains(parent_id),
+            "Cannot delete field {} as it will delete nested identifier field 
{}",
+            parent_field_opt.has_value() ? 
std::string(parent_field_opt->get().name())
+                                         : std::to_string(parent_id),
+            name);
+        parent_it = id_to_parent_.find(parent_id);
+      }
+    }
+  }
+
+  // Apply schema changes using visitor pattern
+  // Create a temporary struct type from the schema to use with the visitor
+  auto schema_struct_type = std::make_shared<StructType>(
+      schema_->fields() | std::ranges::to<std::vector<SchemaField>>());
+
+  // Apply changes recursively using the visitor
+  ApplyChangesVisitor visitor(deletes_, updates_, parent_to_added_ids_);
+  ICEBERG_ASSIGN_OR_RAISE(auto new_type,
+                          visitor.ApplyChanges(schema_struct_type, 
kTableRootId));
+
+  // Cast result back to StructType and extract fields
+  auto new_struct_type = internal::checked_pointer_cast<StructType>(new_type);
+  std::vector<SchemaField> new_fields(new_struct_type->fields() |
+                                      
std::ranges::to<std::vector<SchemaField>>());
+
+  // Convert identifier field names to IDs
+  auto temp_schema = std::make_shared<Schema>(new_fields);
+  std::vector<int32_t> fresh_identifier_ids;
+  for (const auto& name : identifier_field_names_) {
+    ICEBERG_ASSIGN_OR_RAISE(auto field_opt,
+                            temp_schema->FindFieldByName(name, 
case_sensitive_));
+    ICEBERG_CHECK(field_opt.has_value(),
+                  "Cannot add field {} as an identifier field: not found in 
current "
+                  "schema or added columns",
+                  name);
+    fresh_identifier_ids.push_back(field_opt->get().field_id());
+  }
+
+  // Create the new schema
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto new_schema,
+      Schema::Make(std::move(new_fields), schema_->schema_id(), 
fresh_identifier_ids));
+
+  return ApplyResult{.schema = std::move(new_schema),
+                     .new_last_column_id = last_column_id_};
+}
+
+UpdateSchema& UpdateSchema::AddColumnInternal(std::optional<std::string_view> 
parent,

Review Comment:
   nit: add a TODO comment above indicating that v3 default values are not yet 
supported.



##########
src/iceberg/update/update_schema.cc:
##########
@@ -195,13 +391,191 @@ UpdateSchema& 
UpdateSchema::UnionByNameWith(std::shared_ptr<Schema> new_schema)
 
 UpdateSchema& UpdateSchema::SetIdentifierFields(
     const std::span<std::string_view>& names) {
-  identifier_field_names_ = names | 
std::ranges::to<std::unordered_set<std::string>>();
+  identifier_field_names_ = names | 
std::ranges::to<std::vector<std::string>>();
   return *this;
 }
 
 Result<UpdateSchema::ApplyResult> UpdateSchema::Apply() {
-  // TODO(Guotao Yu): Implement Apply
-  return NotImplemented("UpdateSchema::Apply not implemented");
+  ICEBERG_RETURN_UNEXPECTED(CheckErrors());
+
+  // Validate existing identifier fields are not deleted
+  for (const auto& name : identifier_field_names_) {
+    ICEBERG_ASSIGN_OR_RAISE(auto field_opt, FindField(name));
+    if (field_opt.has_value()) {
+      const auto& field = field_opt->get();
+      auto field_id = field.field_id();
+
+      // Check the field itself is not deleted
+      ICEBERG_CHECK(!deletes_.contains(field_id),
+                    "Cannot delete identifier field {}. To force deletion, 
also call "
+                    "SetIdentifierFields to update identifier fields.",
+                    name);
+
+      // Check no parent of this field is deleted
+      auto parent_it = id_to_parent_.find(field_id);
+      while (parent_it != id_to_parent_.end()) {
+        int32_t parent_id = parent_it->second;
+        ICEBERG_ASSIGN_OR_RAISE(auto parent_field_opt, 
schema_->FindFieldById(parent_id));
+        ICEBERG_CHECK(
+            !deletes_.contains(parent_id),
+            "Cannot delete field {} as it will delete nested identifier field 
{}",
+            parent_field_opt.has_value() ? 
std::string(parent_field_opt->get().name())
+                                         : std::to_string(parent_id),
+            name);
+        parent_it = id_to_parent_.find(parent_id);
+      }
+    }
+  }
+
+  // Apply schema changes using visitor pattern
+  // Create a temporary struct type from the schema to use with the visitor
+  auto schema_struct_type = std::make_shared<StructType>(
+      schema_->fields() | std::ranges::to<std::vector<SchemaField>>());
+
+  // Apply changes recursively using the visitor
+  ApplyChangesVisitor visitor(deletes_, updates_, parent_to_added_ids_);
+  ICEBERG_ASSIGN_OR_RAISE(auto new_type,
+                          visitor.ApplyChanges(schema_struct_type, 
kTableRootId));
+
+  // Cast result back to StructType and extract fields
+  auto new_struct_type = internal::checked_pointer_cast<StructType>(new_type);
+  std::vector<SchemaField> new_fields(new_struct_type->fields() |
+                                      
std::ranges::to<std::vector<SchemaField>>());
+
+  // Convert identifier field names to IDs
+  auto temp_schema = std::make_shared<Schema>(new_fields);
+  std::vector<int32_t> fresh_identifier_ids;
+  for (const auto& name : identifier_field_names_) {
+    ICEBERG_ASSIGN_OR_RAISE(auto field_opt,
+                            temp_schema->FindFieldByName(name, 
case_sensitive_));
+    ICEBERG_CHECK(field_opt.has_value(),
+                  "Cannot add field {} as an identifier field: not found in 
current "
+                  "schema or added columns",
+                  name);
+    fresh_identifier_ids.push_back(field_opt->get().field_id());
+  }
+
+  // Create the new schema
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto new_schema,
+      Schema::Make(std::move(new_fields), schema_->schema_id(), 
fresh_identifier_ids));
+
+  return ApplyResult{.schema = std::move(new_schema),
+                     .new_last_column_id = last_column_id_};
+}
+
+UpdateSchema& UpdateSchema::AddColumnInternal(std::optional<std::string_view> 
parent,
+                                              std::string_view name, bool 
is_optional,
+                                              std::shared_ptr<Type> type,
+                                              std::string_view doc) {
+  int32_t parent_id = kTableRootId;
+  std::string full_name;
+
+  // Handle parent field
+  if (parent.has_value() && !parent->empty()) {
+    // Find parent field
+    ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto parent_field_opt, 
FindField(*parent));
+    ICEBERG_BUILDER_CHECK(parent_field_opt.has_value(), "Cannot find parent 
struct: {}",
+                          *parent);
+
+    const SchemaField& parent_field = parent_field_opt->get();
+    const auto& parent_type = parent_field.type();
+
+    // Get the actual field to add to (handle map/list)
+    const SchemaField* target_field = &parent_field;
+    if (parent_type->is_nested()) {
+      const auto& nested = internal::checked_cast<const 
NestedType&>(*parent_type);
+      if (nested.type_id() == TypeId::kMap) {
+        // For maps, add to value field
+        const auto& map_type = internal::checked_cast<const MapType&>(nested);
+        target_field = &map_type.value();
+      } else if (nested.type_id() == TypeId::kList) {
+        // For lists, add to element field
+        const auto& list_type = internal::checked_cast<const 
ListType&>(nested);
+        target_field = &list_type.element();
+      }
+    }
+
+    // Validate target is a struct
+    ICEBERG_BUILDER_CHECK(target_field->type()->is_nested() &&
+                              target_field->type()->type_id() == 
TypeId::kStruct,
+                          "Cannot add to non-struct column: {}: {}", *parent,
+                          target_field->type()->ToString());
+
+    parent_id = target_field->field_id();
+
+    // Check parent is not being deleted
+    ICEBERG_BUILDER_CHECK(!deletes_.contains(parent_id),
+                          "Cannot add to a column that will be deleted: {}", 
*parent);
+
+    // Check field doesn't already exist (unless it's being deleted)
+    std::string nested_name = std::format("{}.{}", *parent, name);
+    ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto current_field_opt, 
FindField(nested_name));
+    if (current_field_opt.has_value()) {
+      const auto& current_field = current_field_opt->get();
+      ICEBERG_BUILDER_CHECK(deletes_.contains(current_field.field_id()),
+                            "Cannot add column, name already exists: {}.{}", 
*parent,
+                            name);
+    }

Review Comment:
   ```suggestion
       ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto current_field, 
FindField(nested_name));
       ICEBERG_BUILDER_CHECK(
           !current_field.has_value() || 
deletes_.contains(current_field->get().field_id()),
           "Cannot add column, name already exists: {}.{}", *parent, name);
   ```



##########
src/iceberg/update/update_schema.cc:
##########
@@ -195,13 +391,191 @@ UpdateSchema& 
UpdateSchema::UnionByNameWith(std::shared_ptr<Schema> new_schema)
 
 UpdateSchema& UpdateSchema::SetIdentifierFields(
     const std::span<std::string_view>& names) {
-  identifier_field_names_ = names | 
std::ranges::to<std::unordered_set<std::string>>();
+  identifier_field_names_ = names | 
std::ranges::to<std::vector<std::string>>();
   return *this;
 }
 
 Result<UpdateSchema::ApplyResult> UpdateSchema::Apply() {
-  // TODO(Guotao Yu): Implement Apply
-  return NotImplemented("UpdateSchema::Apply not implemented");
+  ICEBERG_RETURN_UNEXPECTED(CheckErrors());
+
+  // Validate existing identifier fields are not deleted
+  for (const auto& name : identifier_field_names_) {
+    ICEBERG_ASSIGN_OR_RAISE(auto field_opt, FindField(name));
+    if (field_opt.has_value()) {
+      const auto& field = field_opt->get();
+      auto field_id = field.field_id();
+
+      // Check the field itself is not deleted
+      ICEBERG_CHECK(!deletes_.contains(field_id),
+                    "Cannot delete identifier field {}. To force deletion, 
also call "
+                    "SetIdentifierFields to update identifier fields.",
+                    name);
+
+      // Check no parent of this field is deleted
+      auto parent_it = id_to_parent_.find(field_id);
+      while (parent_it != id_to_parent_.end()) {
+        int32_t parent_id = parent_it->second;
+        ICEBERG_ASSIGN_OR_RAISE(auto parent_field_opt, 
schema_->FindFieldById(parent_id));
+        ICEBERG_CHECK(
+            !deletes_.contains(parent_id),
+            "Cannot delete field {} as it will delete nested identifier field 
{}",
+            parent_field_opt.has_value() ? 
std::string(parent_field_opt->get().name())
+                                         : std::to_string(parent_id),
+            name);
+        parent_it = id_to_parent_.find(parent_id);
+      }
+    }
+  }
+
+  // Apply schema changes using visitor pattern
+  // Create a temporary struct type from the schema to use with the visitor
+  auto schema_struct_type = std::make_shared<StructType>(
+      schema_->fields() | std::ranges::to<std::vector<SchemaField>>());
+
+  // Apply changes recursively using the visitor
+  ApplyChangesVisitor visitor(deletes_, updates_, parent_to_added_ids_);
+  ICEBERG_ASSIGN_OR_RAISE(auto new_type,
+                          visitor.ApplyChanges(schema_struct_type, 
kTableRootId));
+
+  // Cast result back to StructType and extract fields
+  auto new_struct_type = internal::checked_pointer_cast<StructType>(new_type);
+  std::vector<SchemaField> new_fields(new_struct_type->fields() |
+                                      
std::ranges::to<std::vector<SchemaField>>());
+
+  // Convert identifier field names to IDs
+  auto temp_schema = std::make_shared<Schema>(new_fields);
+  std::vector<int32_t> fresh_identifier_ids;
+  for (const auto& name : identifier_field_names_) {
+    ICEBERG_ASSIGN_OR_RAISE(auto field_opt,
+                            temp_schema->FindFieldByName(name, 
case_sensitive_));
+    ICEBERG_CHECK(field_opt.has_value(),
+                  "Cannot add field {} as an identifier field: not found in 
current "
+                  "schema or added columns",
+                  name);
+    fresh_identifier_ids.push_back(field_opt->get().field_id());
+  }
+
+  // Create the new schema
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto new_schema,
+      Schema::Make(std::move(new_fields), schema_->schema_id(), 
fresh_identifier_ids));
+
+  return ApplyResult{.schema = std::move(new_schema),
+                     .new_last_column_id = last_column_id_};
+}
+
+UpdateSchema& UpdateSchema::AddColumnInternal(std::optional<std::string_view> 
parent,
+                                              std::string_view name, bool 
is_optional,
+                                              std::shared_ptr<Type> type,
+                                              std::string_view doc) {
+  int32_t parent_id = kTableRootId;
+  std::string full_name;
+
+  // Handle parent field
+  if (parent.has_value() && !parent->empty()) {
+    // Find parent field
+    ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto parent_field_opt, 
FindField(*parent));
+    ICEBERG_BUILDER_CHECK(parent_field_opt.has_value(), "Cannot find parent 
struct: {}",
+                          *parent);
+
+    const SchemaField& parent_field = parent_field_opt->get();
+    const auto& parent_type = parent_field.type();
+
+    // Get the actual field to add to (handle map/list)
+    const SchemaField* target_field = &parent_field;
+    if (parent_type->is_nested()) {
+      const auto& nested = internal::checked_cast<const 
NestedType&>(*parent_type);
+      if (nested.type_id() == TypeId::kMap) {
+        // For maps, add to value field
+        const auto& map_type = internal::checked_cast<const MapType&>(nested);
+        target_field = &map_type.value();
+      } else if (nested.type_id() == TypeId::kList) {
+        // For lists, add to element field
+        const auto& list_type = internal::checked_cast<const 
ListType&>(nested);
+        target_field = &list_type.element();
+      }
+    }
+
+    // Validate target is a struct
+    ICEBERG_BUILDER_CHECK(target_field->type()->is_nested() &&
+                              target_field->type()->type_id() == 
TypeId::kStruct,
+                          "Cannot add to non-struct column: {}: {}", *parent,
+                          target_field->type()->ToString());
+
+    parent_id = target_field->field_id();
+
+    // Check parent is not being deleted
+    ICEBERG_BUILDER_CHECK(!deletes_.contains(parent_id),
+                          "Cannot add to a column that will be deleted: {}", 
*parent);
+
+    // Check field doesn't already exist (unless it's being deleted)
+    std::string nested_name = std::format("{}.{}", *parent, name);
+    ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto current_field_opt, 
FindField(nested_name));
+    if (current_field_opt.has_value()) {
+      const auto& current_field = current_field_opt->get();
+      ICEBERG_BUILDER_CHECK(deletes_.contains(current_field.field_id()),
+                            "Cannot add column, name already exists: {}.{}", 
*parent,
+                            name);
+    }
+
+    // Build full name using canonical name of parent
+    ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto parent_name_opt,
+                                     schema_->FindColumnNameById(parent_id));
+    ICEBERG_BUILDER_CHECK(parent_name_opt.has_value(),
+                          "Cannot find column name for parent id: {}", 
parent_id);
+    full_name = std::format("{}.{}", *parent_name_opt, name);
+  } else {
+    // Top-level field
+    ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto current_field_opt, FindField(name));
+    if (current_field_opt.has_value()) {
+      const auto& current_field = current_field_opt->get();
+      ICEBERG_BUILDER_CHECK(deletes_.contains(current_field.field_id()),
+                            "Cannot add column, name already exists: {}", 
name);
+    }

Review Comment:
   ```suggestion
       ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto current_field, FindField(name));
       ICEBERG_BUILDER_CHECK(
           !current_field.has_value() || 
deletes_.contains(current_field->get().field_id()),
           "Cannot add column, name already exists: {}", name);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to