wgtmac commented on code in PR #207:
URL: https://github.com/apache/iceberg-cpp/pull/207#discussion_r2341495846


##########
src/iceberg/schema.cc:
##########
@@ -257,4 +258,149 @@ void NameToIdVisitor::Finish() {
   }
 }
 
+/// \brief Visitor class for pruning schema columns based on selected field 
IDs.
+///
+/// This visitor traverses a schema and creates a projected version containing 
only
+/// the specified fields. When `select_full_types` is true, a field with all 
its
+/// sub-fields are selected if its field-id has been selected; otherwise, only 
leaf
+/// fields of selected field-ids are selected.
+///
+/// \note It returns an error when projection is not successful.
+class PruneColumnVisitor {
+ public:
+  PruneColumnVisitor(const std::unordered_set<int32_t>& selected_ids,
+                     bool select_full_types)
+      : selected_ids_(selected_ids), select_full_types_(select_full_types) {}
+
+  Result<std::shared_ptr<Type>> Visit(const std::shared_ptr<Type>& type) const 
{
+    switch (type->type_id()) {
+      case TypeId::kStruct:
+        return Visit(internal::checked_pointer_cast<StructType>(type));
+      case TypeId::kList:
+        return Visit(internal::checked_pointer_cast<ListType>(type));
+      case TypeId::kMap:
+        return Visit(internal::checked_pointer_cast<MapType>(type));
+      default:
+        return nullptr;
+    }
+  }
+
+  Result<std::shared_ptr<Type>> Visit(const SchemaField& field) const {
+    if (selected_ids_.contains(field.field_id())) {
+      return (select_full_types_ || field.type()->is_primitive()) ? 
field.type()
+                                                                  : 
Visit(field.type());
+    }
+    return Visit(field.type());
+  }
+
+  static SchemaField MakeField(const SchemaField& field, std::shared_ptr<Type> 
type) {
+    return {field.field_id(), std::string(field.name()), std::move(type),
+            field.optional(), std::string(field.doc())};
+  }
+
+  Result<std::shared_ptr<Type>> Visit(const std::shared_ptr<StructType>& type) 
const {
+    bool same_types = true;
+    std::vector<SchemaField> selected_fields;
+    for (const auto& field : type->fields()) {
+      ICEBERG_ASSIGN_OR_RAISE(auto child_type, Visit(field));
+      if (child_type) {
+        same_types = same_types && (child_type == field.type());
+        selected_fields.emplace_back(MakeField(field, std::move(child_type)));
+      }
+    }
+
+    if (selected_fields.empty()) {
+      return nullptr;
+    } else if (same_types and selected_fields.size() == type->fields().size()) 
{
+      return type;
+    }
+    return std::make_shared<StructType>(std::move(selected_fields));
+  }
+
+  Result<std::shared_ptr<Type>> Visit(const std::shared_ptr<ListType>& type) 
const {
+    const auto& elem_field = type->fields()[0];
+    ICEBERG_ASSIGN_OR_RAISE(auto elem_type, Visit(elem_field));
+    if (elem_type == nullptr) {
+      return nullptr;
+    } else if (elem_type == elem_field.type()) {
+      return type;
+    }
+    return std::make_shared<ListType>(MakeField(elem_field, 
std::move(elem_type)));
+  }
+
+  Result<std::shared_ptr<Type>> Visit(const std::shared_ptr<MapType>& type) 
const {
+    const auto& key_field = type->fields()[0];
+    const auto& value_field = type->fields()[1];
+    ICEBERG_ASSIGN_OR_RAISE(auto key_type, Visit(key_field));
+    ICEBERG_ASSIGN_OR_RAISE(auto value_type, Visit(value_field));
+
+    if (key_type == nullptr && value_type == nullptr) {
+      return nullptr;
+    } else if (value_type == value_field.type() &&
+               (key_type == key_field.type() || key_type == nullptr)) {
+      return type;
+    } else if (value_type == nullptr) {
+      return InvalidArgument("Cannot project Map without value field");
+    }
+    return std::make_shared<MapType>(
+        (key_type == nullptr ? key_field : MakeField(key_field, 
std::move(key_type))),
+        MakeField(value_field, std::move(value_type)));
+  }
+
+ private:
+  const std::unordered_set<int32_t>& selected_ids_;
+  const bool select_full_types_;
+};
+
+Result<std::unique_ptr<Schema>> Schema::Select(std::span<const std::string> 
names,
+                                               bool case_sensitive) const {
+  const std::string kAllColumns = "*";
+  if (std::ranges::find(names, kAllColumns) != names.end()) {
+    return std::make_unique<Schema>(*this);
+  }
+
+  std::unordered_set<int32_t> selected_ids;
+  for (const auto& name : names) {
+    ICEBERG_ASSIGN_OR_RAISE(auto result, FindFieldByName(name, 
case_sensitive));
+    if (result.has_value()) {
+      selected_ids.insert(result.value().get().field_id());
+    }
+  }
+
+  PruneColumnVisitor visitor(selected_ids, /*select_full_types=*/true);
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto result, 
visitor.Visit(std::shared_ptr<StructType>(ToStructType(*this))));
+
+  if (!result) {
+    return std::make_unique<Schema>(std::vector<SchemaField>{}, schema_id_);
+  }
+
+  if (result->type_id() != TypeId::kStruct) {
+    return InvalidSchema("Projected type must be a struct type");
+  }
+
+  return FromStructType(std::move(const_cast<StructType&>(
+                            internal::checked_cast<const 
StructType&>(*result))),
+                        schema_id_);
+}
+
+Result<std::unique_ptr<Schema>> Schema::Project(
+    const std::unordered_set<int32_t>& field_ids) const {
+  PruneColumnVisitor visitor(field_ids, /*select_full_types=*/false);
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto result, 
visitor.Visit(std::shared_ptr<StructType>(ToStructType(*this))));
+
+  if (!result) {
+    return std::make_unique<Schema>(std::vector<SchemaField>{}, schema_id_);
+  }
+
+  if (result->type_id() != TypeId::kStruct) {
+    return InvalidSchema("Projected type must be a struct type");
+  }
+
+  return FromStructType(std::move(const_cast<StructType&>(

Review Comment:
   Ditto: as in `Schema::Select`, drop the `const_cast` and pass `std::nullopt` instead of `schema_id_`.



##########
src/iceberg/schema.cc:
##########
@@ -257,4 +258,149 @@ void NameToIdVisitor::Finish() {
   }
 }
 
+/// \brief Visitor class for pruning schema columns based on selected field 
IDs.
+///
+/// This visitor traverses a schema and creates a projected version containing 
only
+/// the specified fields. When `select_full_types` is true, a field with all 
its
+/// sub-fields are selected if its field-id has been selected; otherwise, only 
leaf
+/// fields of selected field-ids are selected.
+///
+/// \note It returns an error when projection is not successful.
+class PruneColumnVisitor {
+ public:
+  PruneColumnVisitor(const std::unordered_set<int32_t>& selected_ids,
+                     bool select_full_types)
+      : selected_ids_(selected_ids), select_full_types_(select_full_types) {}
+
+  Result<std::shared_ptr<Type>> Visit(const std::shared_ptr<Type>& type) const 
{
+    switch (type->type_id()) {
+      case TypeId::kStruct:
+        return Visit(internal::checked_pointer_cast<StructType>(type));
+      case TypeId::kList:
+        return Visit(internal::checked_pointer_cast<ListType>(type));
+      case TypeId::kMap:
+        return Visit(internal::checked_pointer_cast<MapType>(type));
+      default:
+        return nullptr;
+    }
+  }
+
+  Result<std::shared_ptr<Type>> Visit(const SchemaField& field) const {
+    if (selected_ids_.contains(field.field_id())) {
+      return (select_full_types_ || field.type()->is_primitive()) ? 
field.type()
+                                                                  : 
Visit(field.type());
+    }
+    return Visit(field.type());
+  }
+
+  static SchemaField MakeField(const SchemaField& field, std::shared_ptr<Type> 
type) {
+    return {field.field_id(), std::string(field.name()), std::move(type),
+            field.optional(), std::string(field.doc())};
+  }
+
+  Result<std::shared_ptr<Type>> Visit(const std::shared_ptr<StructType>& type) 
const {
+    bool same_types = true;
+    std::vector<SchemaField> selected_fields;
+    for (const auto& field : type->fields()) {
+      ICEBERG_ASSIGN_OR_RAISE(auto child_type, Visit(field));
+      if (child_type) {
+        same_types = same_types && (child_type == field.type());
+        selected_fields.emplace_back(MakeField(field, std::move(child_type)));
+      }
+    }
+
+    if (selected_fields.empty()) {
+      return nullptr;
+    } else if (same_types and selected_fields.size() == type->fields().size()) 
{
+      return type;
+    }
+    return std::make_shared<StructType>(std::move(selected_fields));
+  }
+
+  Result<std::shared_ptr<Type>> Visit(const std::shared_ptr<ListType>& type) 
const {
+    const auto& elem_field = type->fields()[0];
+    ICEBERG_ASSIGN_OR_RAISE(auto elem_type, Visit(elem_field));
+    if (elem_type == nullptr) {
+      return nullptr;
+    } else if (elem_type == elem_field.type()) {
+      return type;
+    }
+    return std::make_shared<ListType>(MakeField(elem_field, 
std::move(elem_type)));
+  }
+
+  Result<std::shared_ptr<Type>> Visit(const std::shared_ptr<MapType>& type) 
const {
+    const auto& key_field = type->fields()[0];
+    const auto& value_field = type->fields()[1];
+    ICEBERG_ASSIGN_OR_RAISE(auto key_type, Visit(key_field));
+    ICEBERG_ASSIGN_OR_RAISE(auto value_type, Visit(value_field));
+
+    if (key_type == nullptr && value_type == nullptr) {
+      return nullptr;
+    } else if (value_type == value_field.type() &&
+               (key_type == key_field.type() || key_type == nullptr)) {
+      return type;
+    } else if (value_type == nullptr) {
+      return InvalidArgument("Cannot project Map without value field");
+    }
+    return std::make_shared<MapType>(
+        (key_type == nullptr ? key_field : MakeField(key_field, 
std::move(key_type))),
+        MakeField(value_field, std::move(value_type)));
+  }
+
+ private:
+  const std::unordered_set<int32_t>& selected_ids_;
+  const bool select_full_types_;
+};
+
+Result<std::unique_ptr<Schema>> Schema::Select(std::span<const std::string> 
names,
+                                               bool case_sensitive) const {
+  const std::string kAllColumns = "*";
+  if (std::ranges::find(names, kAllColumns) != names.end()) {
+    return std::make_unique<Schema>(*this);
+  }
+
+  std::unordered_set<int32_t> selected_ids;
+  for (const auto& name : names) {
+    ICEBERG_ASSIGN_OR_RAISE(auto result, FindFieldByName(name, 
case_sensitive));
+    if (result.has_value()) {
+      selected_ids.insert(result.value().get().field_id());
+    }
+  }
+
+  PruneColumnVisitor visitor(selected_ids, /*select_full_types=*/true);
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto result, 
visitor.Visit(std::shared_ptr<StructType>(ToStructType(*this))));
+
+  if (!result) {
+    return std::make_unique<Schema>(std::vector<SchemaField>{}, schema_id_);
+  }
+
+  if (result->type_id() != TypeId::kStruct) {
+    return InvalidSchema("Projected type must be a struct type");
+  }
+
+  return FromStructType(std::move(const_cast<StructType&>(
+                            internal::checked_cast<const 
StructType&>(*result))),
+                        schema_id_);
+}
+
+Result<std::unique_ptr<Schema>> Schema::Project(
+    const std::unordered_set<int32_t>& field_ids) const {
+  PruneColumnVisitor visitor(field_ids, /*select_full_types=*/false);
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto result, 
visitor.Visit(std::shared_ptr<StructType>(ToStructType(*this))));
+
+  if (!result) {
+    return std::make_unique<Schema>(std::vector<SchemaField>{}, schema_id_);

Review Comment:
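   Ditto: as in `Schema::Select`, pass `std::nullopt` here, because a projected schema is no longer the original schema and cannot share its schema id.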



##########
test/schema_test.cc:
##########
@@ -491,3 +493,688 @@ TEST(SchemaTest, NestedDuplicateFieldIdError) {
   EXPECT_THAT(result.error().message,
               ::testing::HasSubstr("Duplicate field id found: 1"));
 }
+
+namespace {
+
+struct TestFieldFactory {
+  static std::unique_ptr<iceberg::SchemaField> Id(int32_t field_id = 1,

Review Comment:
   What I have in mind is something like the following:
   
   ```cpp
   namespace {
   
   iceberg::SchemaField MakeIdField() { return {1, "id", iceberg::int32(), 
true}; }
   
   iceberg::SchemaField MakeNameField() { return {2, "name", iceberg::string(), 
false}; }
   
   template <typename... Args>
   std::shared_ptr<iceberg::StructType> MakeStructType(Args... args) {
     return std::make_shared<iceberg::StructType>(
         std::vector<iceberg::SchemaField>{args...});
   }
   
   template <typename... Args>
   std::shared_ptr<iceberg::Schema> MakeSchema(Args... args) {
     return 
std::make_shared<iceberg::Schema>(std::vector<iceberg::SchemaField>{args...},
                                              std::nullopt);
   }
   
   }  // namespace
   
   TEST(SchemaTest, Dummy) {
     auto schema = MakeSchema(MakeIdField(), MakeNameField());
     auto struct_type = MakeStructType(MakeIdField(), MakeNameField());
   }
   ```



##########
test/schema_test.cc:
##########
@@ -491,3 +493,688 @@ TEST(SchemaTest, NestedDuplicateFieldIdError) {
   EXPECT_THAT(result.error().message,
               ::testing::HasSubstr("Duplicate field id found: 1"));
 }
+
+namespace {
+
+struct TestFieldFactory {
+  static std::unique_ptr<iceberg::SchemaField> Id(int32_t field_id = 1,

Review Comment:
   The main point of adding these convenience functions is to make lines 
shorter. However, these functions are now significantly longer than simply 
writing `SchemaField field{1, "id", iceberg::int32(), true};`.



##########
test/schema_test.cc:
##########
@@ -491,3 +493,688 @@ TEST(SchemaTest, NestedDuplicateFieldIdError) {
   EXPECT_THAT(result.error().message,
               ::testing::HasSubstr("Duplicate field id found: 1"));
 }
+
+namespace {
+
+struct TestFieldFactory {
+  static std::unique_ptr<iceberg::SchemaField> Id(int32_t field_id = 1,

Review Comment:
   I have seen this kind of misuse multiple times. Please note that 
`NestedType` accepts `SchemaField`, not `std::shared_ptr<SchemaField>` or 
`std::unique_ptr<SchemaField>`, so it is pointless to create a smart pointer to 
it and then make a copy by dereferencing it. In short, you should never create 
a smart pointer to `SchemaField`.



##########
src/iceberg/schema.cc:
##########
@@ -257,4 +258,149 @@ void NameToIdVisitor::Finish() {
   }
 }
 
+/// \brief Visitor class for pruning schema columns based on selected field 
IDs.
+///
+/// This visitor traverses a schema and creates a projected version containing 
only
+/// the specified fields. When `select_full_types` is true, a field with all 
its
+/// sub-fields are selected if its field-id has been selected; otherwise, only 
leaf
+/// fields of selected field-ids are selected.
+///
+/// \note It returns an error when projection is not successful.
+class PruneColumnVisitor {
+ public:
+  PruneColumnVisitor(const std::unordered_set<int32_t>& selected_ids,
+                     bool select_full_types)
+      : selected_ids_(selected_ids), select_full_types_(select_full_types) {}
+
+  Result<std::shared_ptr<Type>> Visit(const std::shared_ptr<Type>& type) const 
{
+    switch (type->type_id()) {
+      case TypeId::kStruct:
+        return Visit(internal::checked_pointer_cast<StructType>(type));
+      case TypeId::kList:
+        return Visit(internal::checked_pointer_cast<ListType>(type));
+      case TypeId::kMap:
+        return Visit(internal::checked_pointer_cast<MapType>(type));
+      default:
+        return nullptr;
+    }
+  }
+
+  Result<std::shared_ptr<Type>> Visit(const SchemaField& field) const {
+    if (selected_ids_.contains(field.field_id())) {
+      return (select_full_types_ || field.type()->is_primitive()) ? 
field.type()
+                                                                  : 
Visit(field.type());
+    }
+    return Visit(field.type());
+  }
+
+  static SchemaField MakeField(const SchemaField& field, std::shared_ptr<Type> 
type) {
+    return {field.field_id(), std::string(field.name()), std::move(type),
+            field.optional(), std::string(field.doc())};
+  }
+
+  Result<std::shared_ptr<Type>> Visit(const std::shared_ptr<StructType>& type) 
const {
+    bool same_types = true;
+    std::vector<SchemaField> selected_fields;
+    for (const auto& field : type->fields()) {
+      ICEBERG_ASSIGN_OR_RAISE(auto child_type, Visit(field));
+      if (child_type) {
+        same_types = same_types && (child_type == field.type());
+        selected_fields.emplace_back(MakeField(field, std::move(child_type)));
+      }
+    }
+
+    if (selected_fields.empty()) {
+      return nullptr;
+    } else if (same_types and selected_fields.size() == type->fields().size()) 
{
+      return type;
+    }
+    return std::make_shared<StructType>(std::move(selected_fields));
+  }
+
+  Result<std::shared_ptr<Type>> Visit(const std::shared_ptr<ListType>& type) 
const {
+    const auto& elem_field = type->fields()[0];
+    ICEBERG_ASSIGN_OR_RAISE(auto elem_type, Visit(elem_field));
+    if (elem_type == nullptr) {
+      return nullptr;
+    } else if (elem_type == elem_field.type()) {
+      return type;
+    }
+    return std::make_shared<ListType>(MakeField(elem_field, 
std::move(elem_type)));
+  }
+
+  Result<std::shared_ptr<Type>> Visit(const std::shared_ptr<MapType>& type) 
const {
+    const auto& key_field = type->fields()[0];
+    const auto& value_field = type->fields()[1];
+    ICEBERG_ASSIGN_OR_RAISE(auto key_type, Visit(key_field));
+    ICEBERG_ASSIGN_OR_RAISE(auto value_type, Visit(value_field));
+
+    if (key_type == nullptr && value_type == nullptr) {
+      return nullptr;
+    } else if (value_type == value_field.type() &&
+               (key_type == key_field.type() || key_type == nullptr)) {
+      return type;
+    } else if (value_type == nullptr) {
+      return InvalidArgument("Cannot project Map without value field");
+    }
+    return std::make_shared<MapType>(
+        (key_type == nullptr ? key_field : MakeField(key_field, 
std::move(key_type))),
+        MakeField(value_field, std::move(value_type)));
+  }
+
+ private:
+  const std::unordered_set<int32_t>& selected_ids_;
+  const bool select_full_types_;
+};
+
+Result<std::unique_ptr<Schema>> Schema::Select(std::span<const std::string> 
names,
+                                               bool case_sensitive) const {
+  const std::string kAllColumns = "*";
+  if (std::ranges::find(names, kAllColumns) != names.end()) {
+    return std::make_unique<Schema>(*this);
+  }
+
+  std::unordered_set<int32_t> selected_ids;
+  for (const auto& name : names) {
+    ICEBERG_ASSIGN_OR_RAISE(auto result, FindFieldByName(name, 
case_sensitive));
+    if (result.has_value()) {
+      selected_ids.insert(result.value().get().field_id());
+    }
+  }
+
+  PruneColumnVisitor visitor(selected_ids, /*select_full_types=*/true);
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto result, 
visitor.Visit(std::shared_ptr<StructType>(ToStructType(*this))));
+
+  if (!result) {
+    return std::make_unique<Schema>(std::vector<SchemaField>{}, schema_id_);
+  }
+
+  if (result->type_id() != TypeId::kStruct) {
+    return InvalidSchema("Projected type must be a struct type");
+  }
+
+  return FromStructType(std::move(const_cast<StructType&>(
+                            internal::checked_cast<const 
StructType&>(*result))),
+                        schema_id_);

Review Comment:
   ```suggestion
     return 
FromStructType(std::move(internal::checked_cast<StructType&>(*pruned_type)),
                           std::nullopt);
   ```



##########
src/iceberg/schema.cc:
##########
@@ -257,4 +258,149 @@ void NameToIdVisitor::Finish() {
   }
 }
 
+/// \brief Visitor class for pruning schema columns based on selected field 
IDs.

Review Comment:
   ```suggestion
   /// \brief Visitor for pruning columns based on selected field IDs.
   ```



##########
src/iceberg/schema.cc:
##########
@@ -257,4 +258,149 @@ void NameToIdVisitor::Finish() {
   }
 }
 
+/// \brief Visitor class for pruning schema columns based on selected field 
IDs.
+///
+/// This visitor traverses a schema and creates a projected version containing 
only
+/// the specified fields. When `select_full_types` is true, a field with all 
its
+/// sub-fields are selected if its field-id has been selected; otherwise, only 
leaf
+/// fields of selected field-ids are selected.
+///
+/// \note It returns an error when projection is not successful.
+class PruneColumnVisitor {
+ public:
+  PruneColumnVisitor(const std::unordered_set<int32_t>& selected_ids,
+                     bool select_full_types)
+      : selected_ids_(selected_ids), select_full_types_(select_full_types) {}
+
+  Result<std::shared_ptr<Type>> Visit(const std::shared_ptr<Type>& type) const 
{
+    switch (type->type_id()) {
+      case TypeId::kStruct:
+        return Visit(internal::checked_pointer_cast<StructType>(type));
+      case TypeId::kList:
+        return Visit(internal::checked_pointer_cast<ListType>(type));
+      case TypeId::kMap:
+        return Visit(internal::checked_pointer_cast<MapType>(type));
+      default:
+        return nullptr;
+    }
+  }
+
+  Result<std::shared_ptr<Type>> Visit(const SchemaField& field) const {
+    if (selected_ids_.contains(field.field_id())) {
+      return (select_full_types_ || field.type()->is_primitive()) ? 
field.type()
+                                                                  : 
Visit(field.type());
+    }
+    return Visit(field.type());
+  }
+
+  static SchemaField MakeField(const SchemaField& field, std::shared_ptr<Type> 
type) {
+    return {field.field_id(), std::string(field.name()), std::move(type),
+            field.optional(), std::string(field.doc())};
+  }
+
+  Result<std::shared_ptr<Type>> Visit(const std::shared_ptr<StructType>& type) 
const {
+    bool same_types = true;
+    std::vector<SchemaField> selected_fields;
+    for (const auto& field : type->fields()) {
+      ICEBERG_ASSIGN_OR_RAISE(auto child_type, Visit(field));
+      if (child_type) {
+        same_types = same_types && (child_type == field.type());
+        selected_fields.emplace_back(MakeField(field, std::move(child_type)));
+      }
+    }
+
+    if (selected_fields.empty()) {
+      return nullptr;
+    } else if (same_types and selected_fields.size() == type->fields().size()) 
{
+      return type;
+    }
+    return std::make_shared<StructType>(std::move(selected_fields));
+  }
+
+  Result<std::shared_ptr<Type>> Visit(const std::shared_ptr<ListType>& type) 
const {
+    const auto& elem_field = type->fields()[0];
+    ICEBERG_ASSIGN_OR_RAISE(auto elem_type, Visit(elem_field));
+    if (elem_type == nullptr) {
+      return nullptr;
+    } else if (elem_type == elem_field.type()) {
+      return type;
+    }
+    return std::make_shared<ListType>(MakeField(elem_field, 
std::move(elem_type)));
+  }
+
+  Result<std::shared_ptr<Type>> Visit(const std::shared_ptr<MapType>& type) 
const {
+    const auto& key_field = type->fields()[0];
+    const auto& value_field = type->fields()[1];
+    ICEBERG_ASSIGN_OR_RAISE(auto key_type, Visit(key_field));
+    ICEBERG_ASSIGN_OR_RAISE(auto value_type, Visit(value_field));
+
+    if (key_type == nullptr && value_type == nullptr) {
+      return nullptr;
+    } else if (value_type == value_field.type() &&
+               (key_type == key_field.type() || key_type == nullptr)) {
+      return type;
+    } else if (value_type == nullptr) {
+      return InvalidArgument("Cannot project Map without value field");
+    }
+    return std::make_shared<MapType>(
+        (key_type == nullptr ? key_field : MakeField(key_field, 
std::move(key_type))),
+        MakeField(value_field, std::move(value_type)));
+  }
+
+ private:
+  const std::unordered_set<int32_t>& selected_ids_;
+  const bool select_full_types_;
+};
+
+Result<std::unique_ptr<Schema>> Schema::Select(std::span<const std::string> 
names,
+                                               bool case_sensitive) const {
+  const std::string kAllColumns = "*";
+  if (std::ranges::find(names, kAllColumns) != names.end()) {
+    return std::make_unique<Schema>(*this);
+  }
+
+  std::unordered_set<int32_t> selected_ids;
+  for (const auto& name : names) {
+    ICEBERG_ASSIGN_OR_RAISE(auto result, FindFieldByName(name, 
case_sensitive));
+    if (result.has_value()) {
+      selected_ids.insert(result.value().get().field_id());
+    }
+  }
+
+  PruneColumnVisitor visitor(selected_ids, /*select_full_types=*/true);
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto result, 
visitor.Visit(std::shared_ptr<StructType>(ToStructType(*this))));
+
+  if (!result) {
+    return std::make_unique<Schema>(std::vector<SchemaField>{}, schema_id_);

Review Comment:
   ```suggestion
       return std::make_unique<Schema>(std::vector<SchemaField>{}, 
std::nullopt);
   ```
   
   This is no longer the original schema so it cannot share the schema id.



##########
src/iceberg/schema.cc:
##########
@@ -257,4 +258,149 @@ void NameToIdVisitor::Finish() {
   }
 }
 
+/// \brief Visitor class for pruning schema columns based on selected field 
IDs.
+///
+/// This visitor traverses a schema and creates a projected version containing 
only
+/// the specified fields. When `select_full_types` is true, a field with all 
its
+/// sub-fields are selected if its field-id has been selected; otherwise, only 
leaf
+/// fields of selected field-ids are selected.
+///
+/// \note It returns an error when projection is not successful.
+class PruneColumnVisitor {
+ public:
+  PruneColumnVisitor(const std::unordered_set<int32_t>& selected_ids,
+                     bool select_full_types)
+      : selected_ids_(selected_ids), select_full_types_(select_full_types) {}
+
+  Result<std::shared_ptr<Type>> Visit(const std::shared_ptr<Type>& type) const 
{
+    switch (type->type_id()) {
+      case TypeId::kStruct:
+        return Visit(internal::checked_pointer_cast<StructType>(type));
+      case TypeId::kList:
+        return Visit(internal::checked_pointer_cast<ListType>(type));
+      case TypeId::kMap:
+        return Visit(internal::checked_pointer_cast<MapType>(type));
+      default:
+        return nullptr;
+    }
+  }
+
+  Result<std::shared_ptr<Type>> Visit(const SchemaField& field) const {
+    if (selected_ids_.contains(field.field_id())) {
+      return (select_full_types_ || field.type()->is_primitive()) ? 
field.type()
+                                                                  : 
Visit(field.type());
+    }
+    return Visit(field.type());
+  }
+
+  static SchemaField MakeField(const SchemaField& field, std::shared_ptr<Type> 
type) {
+    return {field.field_id(), std::string(field.name()), std::move(type),
+            field.optional(), std::string(field.doc())};
+  }
+
+  Result<std::shared_ptr<Type>> Visit(const std::shared_ptr<StructType>& type) 
const {
+    bool same_types = true;
+    std::vector<SchemaField> selected_fields;
+    for (const auto& field : type->fields()) {
+      ICEBERG_ASSIGN_OR_RAISE(auto child_type, Visit(field));
+      if (child_type) {
+        same_types = same_types && (child_type == field.type());
+        selected_fields.emplace_back(MakeField(field, std::move(child_type)));
+      }
+    }
+
+    if (selected_fields.empty()) {
+      return nullptr;
+    } else if (same_types and selected_fields.size() == type->fields().size()) 
{

Review Comment:
   ```suggestion
       } else if (same_types && selected_fields.size() == 
type->fields().size()) {
   ```
   
   We don't use `and` across the repo so please be consistent.



##########
test/schema_test.cc:
##########
@@ -491,3 +493,688 @@ TEST(SchemaTest, NestedDuplicateFieldIdError) {
   EXPECT_THAT(result.error().message,
               ::testing::HasSubstr("Duplicate field id found: 1"));
 }
+
+namespace {
+
+struct TestFieldFactory {
+  static std::unique_ptr<iceberg::SchemaField> Id(int32_t field_id = 1,

Review Comment:
   For readability, the functions that create `Id`, `Name`, and the like 
should not accept any input parameters, because their definitions should be 
consistent across these test cases. Otherwise, simply writing 
`SchemaField field{1, "id", iceberg::int32(), true}` looks better.



##########
src/iceberg/schema.cc:
##########
@@ -257,4 +258,149 @@ void NameToIdVisitor::Finish() {
   }
 }
 
+/// \brief Visitor class for pruning schema columns based on selected field 
IDs.
+///
+/// This visitor traverses a schema and creates a projected version containing 
only
+/// the specified fields. When `select_full_types` is true, a field with all 
its
+/// sub-fields are selected if its field-id has been selected; otherwise, only 
leaf
+/// fields of selected field-ids are selected.
+///
+/// \note It returns an error when projection is not successful.
+class PruneColumnVisitor {
+ public:
+  PruneColumnVisitor(const std::unordered_set<int32_t>& selected_ids,
+                     bool select_full_types)
+      : selected_ids_(selected_ids), select_full_types_(select_full_types) {}
+
+  Result<std::shared_ptr<Type>> Visit(const std::shared_ptr<Type>& type) const 
{
+    switch (type->type_id()) {
+      case TypeId::kStruct:
+        return Visit(internal::checked_pointer_cast<StructType>(type));
+      case TypeId::kList:
+        return Visit(internal::checked_pointer_cast<ListType>(type));
+      case TypeId::kMap:
+        return Visit(internal::checked_pointer_cast<MapType>(type));
+      default:
+        return nullptr;
+    }
+  }
+
+  Result<std::shared_ptr<Type>> Visit(const SchemaField& field) const {
+    if (selected_ids_.contains(field.field_id())) {
+      return (select_full_types_ || field.type()->is_primitive()) ? 
field.type()
+                                                                  : 
Visit(field.type());
+    }
+    return Visit(field.type());
+  }
+
+  static SchemaField MakeField(const SchemaField& field, std::shared_ptr<Type> 
type) {
+    return {field.field_id(), std::string(field.name()), std::move(type),
+            field.optional(), std::string(field.doc())};
+  }
+
+  Result<std::shared_ptr<Type>> Visit(const std::shared_ptr<StructType>& type) 
const {
+    bool same_types = true;
+    std::vector<SchemaField> selected_fields;
+    for (const auto& field : type->fields()) {
+      ICEBERG_ASSIGN_OR_RAISE(auto child_type, Visit(field));
+      if (child_type) {
+        same_types = same_types && (child_type == field.type());
+        selected_fields.emplace_back(MakeField(field, std::move(child_type)));
+      }
+    }
+
+    if (selected_fields.empty()) {
+      return nullptr;
+    } else if (same_types and selected_fields.size() == type->fields().size()) 
{
+      return type;
+    }
+    return std::make_shared<StructType>(std::move(selected_fields));
+  }
+
+  Result<std::shared_ptr<Type>> Visit(const std::shared_ptr<ListType>& type) 
const {
+    const auto& elem_field = type->fields()[0];
+    ICEBERG_ASSIGN_OR_RAISE(auto elem_type, Visit(elem_field));
+    if (elem_type == nullptr) {
+      return nullptr;
+    } else if (elem_type == elem_field.type()) {
+      return type;
+    }
+    return std::make_shared<ListType>(MakeField(elem_field, 
std::move(elem_type)));
+  }
+
+  Result<std::shared_ptr<Type>> Visit(const std::shared_ptr<MapType>& type) 
const {
+    const auto& key_field = type->fields()[0];
+    const auto& value_field = type->fields()[1];
+    ICEBERG_ASSIGN_OR_RAISE(auto key_type, Visit(key_field));
+    ICEBERG_ASSIGN_OR_RAISE(auto value_type, Visit(value_field));
+
+    if (key_type == nullptr && value_type == nullptr) {
+      return nullptr;
+    } else if (value_type == value_field.type() &&
+               (key_type == key_field.type() || key_type == nullptr)) {
+      return type;
+    } else if (value_type == nullptr) {
+      return InvalidArgument("Cannot project Map without value field");
+    }
+    return std::make_shared<MapType>(
+        (key_type == nullptr ? key_field : MakeField(key_field, 
std::move(key_type))),
+        MakeField(value_field, std::move(value_type)));
+  }
+
+ private:
+  const std::unordered_set<int32_t>& selected_ids_;
+  const bool select_full_types_;
+};
+
+Result<std::unique_ptr<Schema>> Schema::Select(std::span<const std::string> 
names,
+                                               bool case_sensitive) const {
+  const std::string kAllColumns = "*";
+  if (std::ranges::find(names, kAllColumns) != names.end()) {
+    return std::make_unique<Schema>(*this);
+  }
+
+  std::unordered_set<int32_t> selected_ids;
+  for (const auto& name : names) {
+    ICEBERG_ASSIGN_OR_RAISE(auto result, FindFieldByName(name, 
case_sensitive));
+    if (result.has_value()) {
+      selected_ids.insert(result.value().get().field_id());
+    }
+  }
+
+  PruneColumnVisitor visitor(selected_ids, /*select_full_types=*/true);
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto result, 
visitor.Visit(std::shared_ptr<StructType>(ToStructType(*this))));

Review Comment:
   ```suggestion
         auto pruned_type, 
visitor.Visit(std::shared_ptr<StructType>(ToStructType(*this))));
   ```
   
   Let's avoid vague variable names.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to