This is an automated email from the ASF dual-hosted git repository.

wgtmac pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 43eb3271 feat(data): add delete filter support (#650)
43eb3271 is described below

commit 43eb3271126019dff1881771281631a4113ec8cc
Author: Gang Wu <[email protected]>
AuthorDate: Tue May 12 15:18:48 2026 +0800

    feat(data): add delete filter support (#650)
    
    Adds delete filter support for merge-on-read data batches, including
    position deletes, equality deletes, dropped-field lookup through
    historic schemas, optional delete counting, and required schema
    expansion.
---
 src/iceberg/CMakeLists.txt             |    1 +
 src/iceberg/data/delete_filter.cc      |  790 ++++++++++++++++
 src/iceberg/data/delete_filter.h       |  229 +++++
 src/iceberg/data/meson.build           |    1 +
 src/iceberg/meson.build                |    1 +
 src/iceberg/metadata_columns.h         |    2 +-
 src/iceberg/test/CMakeLists.txt        |    1 +
 src/iceberg/test/delete_filter_test.cc | 1625 ++++++++++++++++++++++++++++++++
 8 files changed, 2649 insertions(+), 1 deletion(-)

diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt
index c4e193b8..79298b1a 100644
--- a/src/iceberg/CMakeLists.txt
+++ b/src/iceberg/CMakeLists.txt
@@ -161,6 +161,7 @@ add_iceberg_lib(iceberg
 
 set(ICEBERG_DATA_SOURCES
     data/data_writer.cc
+    data/delete_filter.cc
     data/delete_loader.cc
     data/equality_delete_writer.cc
     data/position_delete_writer.cc
diff --git a/src/iceberg/data/delete_filter.cc 
b/src/iceberg/data/delete_filter.cc
new file mode 100644
index 00000000..876d644e
--- /dev/null
+++ b/src/iceberg/data/delete_filter.cc
@@ -0,0 +1,790 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/data/delete_filter.h"
+
+#include <algorithm>
+#include <map>
+#include <optional>
+#include <set>
+#include <span>
+#include <string_view>
+#include <unordered_set>
+#include <utility>
+#include <vector>
+
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/metadata_columns.h"
+#include "iceberg/result.h"
+#include "iceberg/row/arrow_array_wrapper.h"
+#include "iceberg/schema.h"
+#include "iceberg/schema_field.h"
+#include "iceberg/table_metadata.h"
+#include "iceberg/type.h"
+#include "iceberg/util/checked_cast.h"
+#include "iceberg/util/macros.h"
+#include "iceberg/util/struct_like_set.h"
+
+namespace iceberg {
+
+namespace {
+
+std::optional<size_t> FindFieldIndexById(std::span<const SchemaField> fields,
+                                         int32_t field_id) {
+  for (size_t pos = 0; pos < fields.size(); ++pos) {
+    if (fields[pos].field_id() == field_id) {
+      return pos;
+    }
+  }
+  return std::nullopt;
+}
+
+Result<size_t> RequireFieldIndexById(std::span<const SchemaField> fields,
+                                     int32_t field_id, std::string_view 
context) {
+  auto pos = FindFieldIndexById(fields, field_id);
+  if (pos.has_value()) {
+    return pos.value();
+  }
+  return InvalidSchema("Cannot find field id {} in {}", field_id, context);
+}
+
+// Views a source row through the equality-delete key schema: fields are 
selected by
+// field id, then exposed by position so StructLikeSet can compare only delete 
keys.
+class ProjectedStructLike : public StructLike {
+ public:
+  struct ProjectedField;
+  using ProjectedSubFields = std::vector<ProjectedField>;
+
+  struct ProjectedField {
+    int32_t field_id;
+    size_t source_field_pos;
+    std::shared_ptr<const ProjectedSubFields> nested_projected_fields;
+  };
+
+  explicit ProjectedStructLike(std::shared_ptr<const ProjectedSubFields> 
projected_fields)
+      : projected_fields_(std::move(projected_fields)) {
+    nested_projected_structs_.reserve(projected_fields_->size());
+    for (const auto& projected_field : *projected_fields_) {
+      nested_projected_structs_.push_back(
+          projected_field.nested_projected_fields == nullptr
+              ? nullptr
+              : std::make_shared<ProjectedStructLike>(
+                    projected_field.nested_projected_fields));
+    }
+  }
+
+  /// \brief Build field-id based positions from the source row to the 
equality keys.
+  ///
+  /// \param source_type the schema of wrapped rows
+  /// \param target_type the key schema used by the equality-delete set
+  static Result<std::shared_ptr<const ProjectedSubFields>> BuildProjection(
+      const StructType& source_type, const StructType& target_type) {
+    ProjectedSubFields projected_fields;
+    projected_fields.reserve(target_type.fields().size());
+    for (const auto& target_field : target_type.fields()) {
+      ICEBERG_ASSIGN_OR_RAISE(
+          auto source_field_pos,
+          RequireFieldIndexById(source_type.fields(), target_field.field_id(),
+                                "source projection"));
+      const auto& source_field = source_type.fields()[source_field_pos];
+
+      std::shared_ptr<const ProjectedSubFields> nested_projected_fields;
+      if (*source_field.type() != *target_field.type()) {
+        if (target_field.type()->type_id() == TypeId::kStruct &&
+            source_field.type()->type_id() == TypeId::kStruct) {
+          ICEBERG_ASSIGN_OR_RAISE(
+              nested_projected_fields,
+              BuildProjection(
+                  internal::checked_cast<const 
StructType&>(*source_field.type()),
+                  internal::checked_cast<const 
StructType&>(*target_field.type())));
+        } else if (target_field.type()->is_nested()) {
+          return NotSupported("Cannot project partial non-struct equality 
field id {}",
+                              target_field.field_id());
+        }
+      }
+
+      projected_fields.push_back(ProjectedField{
+          .field_id = target_field.field_id(),
+          .source_field_pos = source_field_pos,
+          .nested_projected_fields = std::move(nested_projected_fields),
+      });
+    }
+    return std::make_shared<const 
ProjectedSubFields>(std::move(projected_fields));
+  }
+
+  void Wrap(const StructLike& row) {
+    owned_row_.reset();
+    row_ = &row;
+  }
+
+  void Wrap(std::shared_ptr<StructLike> row) {
+    owned_row_ = std::move(row);
+    row_ = owned_row_.get();
+  }
+
+  Result<Scalar> GetField(size_t pos) const override {
+    ICEBERG_PRECHECK(row_ != nullptr, "ProjectedStructLike has no wrapped 
row");
+    if (pos >= projected_fields_->size()) {
+      return InvalidArgument("Projected field index {} out of range (size: 
{})", pos,
+                             projected_fields_->size());
+    }
+
+    const auto& projected_field = (*projected_fields_)[pos];
+    ICEBERG_ASSIGN_OR_RAISE(auto scalar,
+                            row_->GetField(projected_field.source_field_pos));
+    if (projected_field.nested_projected_fields == nullptr ||
+        std::holds_alternative<std::monostate>(scalar)) {
+      return scalar;
+    }
+
+    if (!std::holds_alternative<std::shared_ptr<StructLike>>(scalar)) {
+      return InvalidSchema("Expected struct field id {} while projecting 
equality row",
+                           projected_field.field_id);
+    }
+
+    auto child = std::get<std::shared_ptr<StructLike>>(std::move(scalar));
+    if (child == nullptr) {
+      return Scalar{std::monostate{}};
+    }
+
+    auto projected_struct = nested_projected_structs_[pos];
+    projected_struct->Wrap(std::move(child));
+    return 
Scalar{std::static_pointer_cast<StructLike>(std::move(projected_struct))};
+  }
+
+  size_t num_fields() const override { return projected_fields_->size(); }
+
+ private:
+  std::shared_ptr<StructLike> owned_row_;
+  const StructLike* row_ = nullptr;
+  std::shared_ptr<const ProjectedSubFields> projected_fields_;
+  std::vector<std::shared_ptr<ProjectedStructLike>> nested_projected_structs_;
+};
+
+Status ValidateEqualityIds(const DataFile& delete_file) {
+  if (delete_file.equality_ids.empty()) {
+    return InvalidArgument("Equality delete file '{}' has no equality field 
ids",
+                           delete_file.file_path);
+  }
+  return {};
+}
+
+SchemaField WithType(const SchemaField& field, std::shared_ptr<Type> type) {
+  return SchemaField{field.field_id(), std::string(field.name()), 
std::move(type),
+                     field.optional(), std::string(field.doc())};
+}
+
+std::shared_ptr<Type> SortStructFieldsById(const std::shared_ptr<Type>& type) {
+  if (type->type_id() != TypeId::kStruct) {
+    return type;
+  }
+
+  const auto& struct_type = internal::checked_cast<const StructType&>(*type);
+  auto source_fields = struct_type.fields();
+  std::vector<std::shared_ptr<Type>> sorted_types;
+  sorted_types.reserve(source_fields.size());
+  bool changed = false;
+  for (const auto& field : source_fields) {
+    auto sorted_type = SortStructFieldsById(field.type());
+    changed = changed || sorted_type != field.type();
+    sorted_types.push_back(std::move(sorted_type));
+  }
+
+  const bool needs_sort =
+      !std::ranges::is_sorted(source_fields, {}, &SchemaField::field_id);
+  if (!changed && !needs_sort) {
+    return type;
+  }
+
+  std::vector<SchemaField> fields;
+  fields.reserve(source_fields.size());
+  for (size_t pos = 0; pos < source_fields.size(); ++pos) {
+    const auto& field = source_fields[pos];
+    fields.push_back(sorted_types[pos] == field.type()
+                         ? field
+                         : WithType(field, std::move(sorted_types[pos])));
+  }
+
+  if (needs_sort) {
+    std::ranges::sort(fields, {}, &SchemaField::field_id);
+  }
+  return std::make_shared<StructType>(std::move(fields));
+}
+
+void SortFieldsById(std::vector<SchemaField>& fields) {
+  for (auto& field : fields) {
+    auto sorted_type = SortStructFieldsById(field.type());
+    if (sorted_type != field.type()) {
+      field = WithType(field, std::move(sorted_type));
+    }
+  }
+  std::ranges::sort(fields, {}, &SchemaField::field_id);
+}
+
+Result<std::vector<SchemaField>> ProjectFieldsById(
+    const Schema& schema, const std::set<int32_t>& selected_ids) {
+  std::unordered_set<int32_t> unordered_ids(selected_ids.begin(), 
selected_ids.end());
+  ICEBERG_ASSIGN_OR_RAISE(auto projected_schema, 
schema.Project(unordered_ids));
+  std::vector<SchemaField> fields(projected_schema->fields().begin(),
+                                  projected_schema->fields().end());
+  return fields;
+}
+
+Result<std::vector<SchemaField>> ProjectEqualityKeyFields(
+    const Schema& schema, const std::set<int32_t>& selected_ids) {
+  ICEBERG_ASSIGN_OR_RAISE(auto fields, ProjectFieldsById(schema, 
selected_ids));
+  // Equality-delete keys keep the projected struct shape; fields are sorted 
by id
+  // within each struct level, not flattened by nested leaf ids.
+  SortFieldsById(fields);
+  return fields;
+}
+
+bool ContainsFieldId(const SchemaField& field, int32_t field_id);
+
+bool ContainsFieldId(const Type& type, int32_t field_id) {
+  if (!type.is_nested()) {
+    return false;
+  }
+  const auto& nested = internal::checked_cast<const NestedType&>(type);
+  return std::ranges::any_of(nested.fields(), [field_id](const SchemaField& 
field) {
+    return ContainsFieldId(field, field_id);
+  });
+}
+
+bool ContainsFieldId(const SchemaField& field, int32_t field_id) {
+  return field.field_id() == field_id || ContainsFieldId(*field.type(), 
field_id);
+}
+
+Status ValidateEqualityProjectionField(int32_t field_id, const SchemaField& 
field) {
+  if (field.field_id() == field_id) {
+    if (!field.type()->is_primitive()) {
+      return InvalidArgument(
+          "Equality delete field id {} must reference a primitive field", 
field_id);
+    }
+    return {};
+  }
+
+  switch (field.type()->type_id()) {
+    case TypeId::kStruct: {
+      const auto& struct_type = internal::checked_cast<const 
StructType&>(*field.type());
+      for (const auto& child : struct_type.fields()) {
+        if (ContainsFieldId(child, field_id)) {
+          return ValidateEqualityProjectionField(field_id, child);
+        }
+      }
+      break;
+    }
+    case TypeId::kList:
+    case TypeId::kMap:
+      if (ContainsFieldId(*field.type(), field_id)) {
+        return InvalidArgument("Equality delete field id {} must not be nested 
in {}",
+                               field_id, ToString(field.type()->type_id()));
+      }
+      break;
+    default:
+      break;
+  }
+
+  return InvalidSchema("Cannot find equality delete field id {} in projection 
field {}",
+                       field_id, field.field_id());
+}
+
+Result<std::optional<DeleteFilter::FieldLookupResult>> LookupFieldInSchema(
+    const Schema& schema, int32_t field_id) {
+  ICEBERG_ASSIGN_OR_RAISE(auto field, schema.FindFieldById(field_id));
+  if (!field.has_value()) {
+    return std::nullopt;
+  }
+  if (!field->get().type()->is_primitive()) {
+    return InvalidArgument("Equality delete field id {} must reference a 
primitive field",
+                           field_id);
+  }
+
+  std::set<int32_t> selected_ids = {field_id};
+  ICEBERG_ASSIGN_OR_RAISE(auto projected_fields, ProjectFieldsById(schema, 
selected_ids));
+  if (projected_fields.empty()) {
+    return InvalidSchema("Cannot project field id {} from lookup schema", 
field_id);
+  }
+  if (projected_fields.size() != 1) {
+    return InvalidSchema("Expected one top-level projection for field id {} 
but got {}",
+                         field_id, projected_fields.size());
+  }
+
+  return DeleteFilter::FieldLookupResult{
+      .field = field.value().get(),
+      .projection_field = std::move(projected_fields[0]),
+  };
+}
+
+Result<bool> MergeField(SchemaField& existing, const SchemaField& required) {
+  if (existing.field_id() != required.field_id()) {
+    return InvalidSchema("Cannot merge field id {} with field id {}", 
existing.field_id(),
+                         required.field_id());
+  }
+
+  if (*existing.type() == *required.type() || !required.type()->is_nested()) {
+    return false;
+  }
+
+  if (existing.type()->type_id() == TypeId::kStruct &&
+      required.type()->type_id() == TypeId::kStruct) {
+    const auto& existing_struct =
+        internal::checked_cast<const StructType&>(*existing.type());
+    std::vector<SchemaField> fields(existing_struct.fields().begin(),
+                                    existing_struct.fields().end());
+    const auto& required_struct =
+        internal::checked_cast<const StructType&>(*required.type());
+
+    bool changed = false;
+    for (const auto& required_child : required_struct.fields()) {
+      auto existing_pos = FindFieldIndexById(fields, 
required_child.field_id());
+      if (existing_pos.has_value()) {
+        ICEBERG_ASSIGN_OR_RAISE(auto child_changed,
+                                MergeField(fields[existing_pos.value()], 
required_child));
+        changed = changed || child_changed;
+      } else {
+        fields.push_back(required_child);
+        changed = true;
+      }
+    }
+
+    if (!changed) {
+      return false;
+    }
+    existing = SchemaField(existing.field_id(), std::string(existing.name()),
+                           std::make_shared<StructType>(std::move(fields)),
+                           existing.optional(), std::string(existing.doc()));
+    return true;
+  }
+
+  return InvalidArgument(
+      "Cannot merge non-struct nested field id {} into delete projection",
+      required.field_id());
+}
+
+Result<bool> MergeProjectionField(std::vector<SchemaField>& fields,
+                                  const SchemaField& required_projection) {
+  auto existing_pos = FindFieldIndexById(fields, 
required_projection.field_id());
+  if (existing_pos.has_value()) {
+    return MergeField(fields[existing_pos.value()], required_projection);
+  }
+
+  fields.push_back(required_projection);
+  return true;
+}
+
+void AddIdOnce(std::vector<int32_t>& ids, std::unordered_set<int32_t>& seen,
+               int32_t field_id) {
+  if (seen.insert(field_id).second) {
+    ids.push_back(field_id);
+  }
+}
+
+}  // namespace
+
+struct DeleteFilter::EqDeleteGroup {
+  std::unique_ptr<ProjectedStructLike> row_projection;
+  std::unique_ptr<UncheckedStructLikeSet> delete_set;
+};
+
+Result<DeleteFilter::FieldLookup> DeleteFilter::MakeFieldLookup(
+    std::shared_ptr<Schema> table_schema,
+    std::span<const std::shared_ptr<Schema>> schemas) {
+  ICEBERG_PRECHECK(table_schema != nullptr, "Table schema must not be null");
+
+  std::vector<std::shared_ptr<Schema>> lookup_schemas;
+  lookup_schemas.reserve(schemas.size() + 1);
+  const int32_t current_schema_id = table_schema->schema_id();
+  lookup_schemas.push_back(std::move(table_schema));
+
+  std::vector<std::shared_ptr<Schema>> sorted_fallback_schemas;
+  sorted_fallback_schemas.reserve(schemas.size());
+  for (const auto& schema : schemas) {
+    ICEBERG_PRECHECK(schema != nullptr, "Schema must not be null");
+    if (schema->schema_id() != current_schema_id) {
+      sorted_fallback_schemas.push_back(schema);
+    }
+  }
+
+  // Search fallback schemas from latest to oldest so the highest schema_id 
wins.
+  std::ranges::stable_sort(sorted_fallback_schemas, [](const auto& lhs, const 
auto& rhs) {
+    return lhs->schema_id() > rhs->schema_id();
+  });
+
+  std::unordered_set<int32_t> seen_schema_ids;
+  seen_schema_ids.insert(current_schema_id);
+  for (const auto& schema : sorted_fallback_schemas) {
+    if (seen_schema_ids.insert(schema->schema_id()).second) {
+      lookup_schemas.push_back(schema);
+    }
+  }
+
+  return [lookup_schemas = std::move(lookup_schemas)](
+             int32_t field_id) -> Result<std::optional<FieldLookupResult>> {
+    for (const auto& schema : lookup_schemas) {
+      ICEBERG_ASSIGN_OR_RAISE(auto field, LookupFieldInSchema(*schema, 
field_id));
+      if (field.has_value()) {
+        return field;
+      }
+    }
+    return std::nullopt;
+  };
+}
+
+Result<DeleteFilter::FieldLookup> DeleteFilter::MakeFieldLookup(
+    std::shared_ptr<TableMetadata> table_metadata) {
+  ICEBERG_PRECHECK(table_metadata != nullptr, "Table metadata must not be 
null");
+
+  ICEBERG_ASSIGN_OR_RAISE(auto table_schema, table_metadata->Schema());
+  return MakeFieldLookup(std::move(table_schema), table_metadata->schemas);
+}
+
+Result<std::unique_ptr<DeleteFilter>> DeleteFilter::Make(
+    std::string file_path, std::span<const std::shared_ptr<DataFile>> 
delete_files,
+    std::shared_ptr<Schema> table_schema, std::shared_ptr<Schema> 
requested_schema,
+    std::shared_ptr<FileIO> io, bool need_row_pos_col,
+    std::shared_ptr<DeleteCounter> counter) {
+  ICEBERG_ASSIGN_OR_RAISE(auto field_lookup, MakeFieldLookup(table_schema));
+  return Make(std::move(file_path), delete_files, std::move(requested_schema),
+              std::move(io), std::move(field_lookup), need_row_pos_col,
+              std::move(counter));
+}
+
+Result<std::unique_ptr<DeleteFilter>> DeleteFilter::Make(
+    std::string file_path, std::span<const std::shared_ptr<DataFile>> 
delete_files,
+    std::shared_ptr<TableMetadata> table_metadata,
+    std::shared_ptr<Schema> requested_schema, std::shared_ptr<FileIO> io,
+    bool need_row_pos_col, std::shared_ptr<DeleteCounter> counter) {
+  ICEBERG_PRECHECK(table_metadata != nullptr, "Table metadata must not be 
null");
+
+  ICEBERG_ASSIGN_OR_RAISE(auto field_lookup, 
MakeFieldLookup(std::move(table_metadata)));
+  return Make(std::move(file_path), delete_files, std::move(requested_schema),
+              std::move(io), std::move(field_lookup), need_row_pos_col,
+              std::move(counter));
+}
+
+Result<std::unique_ptr<DeleteFilter>> DeleteFilter::Make(
+    std::string file_path, std::span<const std::shared_ptr<DataFile>> 
delete_files,
+    std::shared_ptr<Schema> table_schema, std::shared_ptr<Schema> 
requested_schema,
+    std::shared_ptr<FileIO> io, std::span<const std::shared_ptr<Schema>> 
schemas,
+    bool need_row_pos_col, std::shared_ptr<DeleteCounter> counter) {
+  ICEBERG_ASSIGN_OR_RAISE(auto field_lookup, MakeFieldLookup(table_schema, 
schemas));
+  return Make(std::move(file_path), delete_files, std::move(requested_schema),
+              std::move(io), std::move(field_lookup), need_row_pos_col,
+              std::move(counter));
+}
+
+Result<std::unique_ptr<DeleteFilter>> DeleteFilter::Make(
+    std::string file_path, std::span<const std::shared_ptr<DataFile>> 
delete_files,
+    std::shared_ptr<Schema> requested_schema, std::shared_ptr<FileIO> io,
+    FieldLookup field_lookup, bool need_row_pos_col,
+    std::shared_ptr<DeleteCounter> counter) {
+  ICEBERG_PRECHECK(requested_schema != nullptr, "Requested schema must not be 
null");
+  ICEBERG_PRECHECK(field_lookup != nullptr, "Field lookup must not be null");
+  ICEBERG_PRECHECK(delete_files.empty() || io != nullptr,
+                   "FileIO must not be null when delete files are present");
+
+  auto filter = std::unique_ptr<DeleteFilter>(
+      new DeleteFilter(std::move(file_path), std::move(requested_schema), 
std::move(io),
+                       std::move(field_lookup), need_row_pos_col, 
std::move(counter)));
+  ICEBERG_RETURN_UNEXPECTED(filter->Init(delete_files));
+  return filter;
+}
+
+DeleteFilter::DeleteFilter(std::string file_path,
+                           std::shared_ptr<Schema> requested_schema,
+                           std::shared_ptr<FileIO> io, FieldLookup 
field_lookup,
+                           bool need_row_pos_col, 
std::shared_ptr<DeleteCounter> counter)
+    : file_path_(std::move(file_path)),
+      requested_schema_(std::move(requested_schema)),
+      field_lookup_(std::move(field_lookup)),
+      need_row_pos_col_(need_row_pos_col),
+      counter_(std::move(counter)),
+      delete_loader_(std::move(io)) {}
+
+DeleteFilter::~DeleteFilter() = default;
+
+Status DeleteFilter::Init(std::span<const std::shared_ptr<DataFile>> 
delete_files) {
+  for (const auto& delete_file : delete_files) {
+    ICEBERG_PRECHECK(delete_file != nullptr, "Delete file must not be null");
+
+    switch (delete_file->content) {
+      case DataFile::Content::kPositionDeletes:
+        pos_deletes_.push_back(delete_file);
+        break;
+      case DataFile::Content::kEqualityDeletes:
+        ICEBERG_RETURN_UNEXPECTED(ValidateEqualityIds(*delete_file));
+        eq_deletes_.push_back(delete_file);
+        break;
+      case DataFile::Content::kData:
+        return InvalidArgument("Expected delete file but got data file '{}'",
+                               delete_file->file_path);
+      default:
+        return InvalidArgument("Unknown delete file content type {}",
+                               static_cast<int>(delete_file->content));
+    }
+  }
+
+  ICEBERG_ASSIGN_OR_RAISE(required_schema_, ComputeRequiredSchema());
+
+  // Pre-compute _pos column position for reuse
+  pos_field_position_ = FindFieldIndexById(required_schema_->fields(),
+                                           
MetadataColumns::kFilePositionColumnId);
+
+  return {};
+}
+
+Result<std::shared_ptr<Schema>> DeleteFilter::ComputeRequiredSchema() const {
+  if (!HasPositionDeletes() && !HasEqualityDeletes()) {
+    return requested_schema_;
+  }
+
+  std::vector<int32_t> required_ids;
+  std::unordered_set<int32_t> seen_required_ids;
+  if (HasPositionDeletes() && need_row_pos_col_) {
+    AddIdOnce(required_ids, seen_required_ids, 
MetadataColumns::kFilePositionColumnId);
+  }
+
+  for (const auto& delete_file : eq_deletes_) {
+    for (int32_t field_id : delete_file->equality_ids) {
+      AddIdOnce(required_ids, seen_required_ids, field_id);
+    }
+  }
+
+  std::vector<SchemaField> fields(requested_schema_->fields().begin(),
+                                  requested_schema_->fields().end());
+  bool changed = false;
+
+  for (int32_t field_id : required_ids) {
+    if (field_id == MetadataColumns::kFilePositionColumnId ||
+        field_id == MetadataColumns::kIsDeletedColumnId) {
+      // These columns do not exist in the table schema and will be handled 
later.
+      continue;
+    }
+
+    // Top-level primitive fields already cover equality-delete needs. Nested 
fields
+    // still need lookup so we can validate/merge the required subfield 
projection.
+    auto existing_pos = FindFieldIndexById(fields, field_id);
+    if (existing_pos.has_value() && 
!fields[existing_pos.value()].type()->is_nested()) {
+      continue;
+    }
+
+    ICEBERG_ASSIGN_OR_RAISE(auto lookup, field_lookup_(field_id));
+    if (!lookup.has_value()) {
+      return InvalidArgument("Cannot find equality delete field id {}", 
field_id);
+    }
+    ICEBERG_RETURN_UNEXPECTED(
+        ValidateEqualityProjectionField(field_id, lookup->projection_field));
+
+    ICEBERG_ASSIGN_OR_RAISE(auto merged,
+                            MergeProjectionField(fields, 
lookup->projection_field));
+    changed = changed || merged;
+  }
+
+  const bool needs_pos =
+      HasPositionDeletes() && need_row_pos_col_ &&
+      !FindFieldIndexById(fields, 
MetadataColumns::kFilePositionColumnId).has_value();
+  if (needs_pos) {
+    fields.push_back(MetadataColumns::kRowPosition);
+    changed = true;
+  }
+
+  if (!changed) {
+    return requested_schema_;
+  }
+
+  return std::make_shared<Schema>(std::move(fields));
+}
+
+const std::shared_ptr<Schema>& DeleteFilter::RequiredSchema() const {
+  return required_schema_;
+}
+
+bool DeleteFilter::HasPositionDeletes() const { return !pos_deletes_.empty(); }
+
+bool DeleteFilter::HasEqualityDeletes() const { return !eq_deletes_.empty(); }
+
+Status DeleteFilter::EnsurePositionDeletesLoaded() const {
+  if (!HasPositionDeletes()) {
+    return {};
+  }
+
+  std::lock_guard lock(pos_mutex_);
+  if (pos_loaded_) {
+    return {};
+  }
+
+  ICEBERG_ASSIGN_OR_RAISE(pos_index_,
+                          delete_loader_.LoadPositionDeletes(pos_deletes_, 
file_path_));
+  pos_loaded_ = true;
+  return {};
+}
+
+Status DeleteFilter::EnsureEqualityDeletesLoaded() const {
+  if (!HasEqualityDeletes()) {
+    return {};
+  }
+
+  std::lock_guard lock(eq_mutex_);
+  if (eq_loaded_) {
+    return {};
+  }
+
+  std::map<std::set<int32_t>, std::vector<std::shared_ptr<DataFile>>> 
files_by_ids;
+  for (const auto& delete_file : eq_deletes_) {
+    // equality_ids were already validated in Init, build the grouping key 
directly.
+    std::set<int32_t> ids(delete_file->equality_ids.begin(),
+                          delete_file->equality_ids.end());
+    files_by_ids[std::move(ids)].push_back(delete_file);
+  }
+
+  std::vector<std::unique_ptr<EqDeleteGroup>> groups;
+  groups.reserve(files_by_ids.size());
+
+  for (auto& [field_ids, files] : files_by_ids) {
+    ICEBERG_ASSIGN_OR_RAISE(auto fields,
+                            ProjectEqualityKeyFields(*required_schema_, 
field_ids));
+    auto equality_type = std::make_shared<StructType>(std::move(fields));
+
+    ICEBERG_ASSIGN_OR_RAISE(auto row_projection, 
ProjectedStructLike::BuildProjection(
+                                                     *required_schema_, 
*equality_type));
+    auto project_row = 
std::make_unique<ProjectedStructLike>(std::move(row_projection));
+    ICEBERG_ASSIGN_OR_RAISE(auto delete_set,
+                            delete_loader_.LoadEqualityDeletes(files, 
*equality_type));
+    groups.push_back(std::make_unique<EqDeleteGroup>(EqDeleteGroup{
+        .row_projection = std::move(project_row),
+        .delete_set = std::move(delete_set),
+    }));
+  }
+
+  eq_groups_ = std::move(groups);
+  eq_loaded_ = true;
+  return {};
+}
+
+const std::shared_ptr<Schema>& DeleteFilter::ExpectedSchema() const {
+  return requested_schema_;
+}
+
+void DeleteFilter::IncrementDeleteCount(int64_t count) {
+  if (counter_ != nullptr) {
+    counter_->Increment(count);
+  }
+}
+
+Result<const PositionDeleteIndex*> DeleteFilter::DeletedRowPositions() const {
+  if (!HasPositionDeletes()) {
+    return nullptr;
+  }
+  ICEBERG_RETURN_UNEXPECTED(EnsurePositionDeletesLoaded());
+  return &pos_index_;
+}
+
+Result<std::function<Result<bool>(const StructLike&)>> 
DeleteFilter::EqDeletedRowFilter()
+    const {
+  if (!HasEqualityDeletes()) {
+    // No equality deletes: every row is alive.
+    return [](const StructLike&) -> Result<bool> { return true; };
+  }
+  ICEBERG_RETURN_UNEXPECTED(EnsureEqualityDeletesLoaded());
+  std::lock_guard lock(eq_mutex_);
+  if (!eq_deleted_row_filter_cache_) {
+    eq_deleted_row_filter_cache_ = [this](const StructLike& row) -> 
Result<bool> {
+      for (const auto& group : eq_groups_) {
+        auto& projected_row = *group->row_projection;
+        projected_row.Wrap(row);
+        ICEBERG_ASSIGN_OR_RAISE(auto matched, 
group->delete_set->Contains(projected_row));
+        if (matched) {
+          return false;
+        }
+      }
+      return true;
+    };
+  }
+  return eq_deleted_row_filter_cache_;
+}
+
+Result<std::function<Result<bool>(const StructLike&)>>
+DeleteFilter::FindEqualityDeleteRows() const {
+  if (!HasEqualityDeletes()) {
+    // No equality deletes: no row is deleted.
+    return [](const StructLike&) -> Result<bool> { return false; };
+  }
+  ICEBERG_ASSIGN_OR_RAISE(auto alive_filter, EqDeletedRowFilter());
+  return [alive_filter = std::move(alive_filter)](const StructLike& row) -> 
Result<bool> {
+    ICEBERG_ASSIGN_OR_RAISE(auto alive, alive_filter(row));
+    return !alive;
+  };
+}
+
+Result<AliveRowSelection> DeleteFilter::ComputeAliveRows(const ArrowSchema& 
batch_schema,
+                                                         const ArrowArray& 
batch) const {
+  ICEBERG_PRECHECK(batch.length >= 0, "Batch length must be non-negative");
+
+  ICEBERG_RETURN_UNEXPECTED(EnsurePositionDeletesLoaded());
+  ICEBERG_RETURN_UNEXPECTED(EnsureEqualityDeletesLoaded());
+
+  AliveRowSelection result;
+  if (batch.length == 0) {
+    return result;
+  }
+
+  result.indices.reserve(batch.length);
+  ICEBERG_ASSIGN_OR_RAISE(auto row, ArrowArrayStructLike::Make(batch_schema, 
batch));
+
+  for (int64_t i = 0; i < batch.length; ++i) {
+    if (i > 0) {
+      ICEBERG_RETURN_UNEXPECTED(row->Reset(i));
+    }
+
+    bool deleted = false;
+    if (pos_field_position_.has_value()) {
+      ICEBERG_ASSIGN_OR_RAISE(auto pos_scalar,
+                              row->GetField(pos_field_position_.value()));
+      auto* pos = std::get_if<int64_t>(&pos_scalar);
+      if (pos == nullptr) {
+        return InvalidArrowData("Position delete filtering requires non-null 
int64 _pos");
+      }
+      deleted = pos_index_.IsDeleted(*pos);
+    }
+
+    if (!deleted) {
+      for (const auto& eq_group : eq_groups_) {
+        auto& projected_row = *eq_group->row_projection;
+        projected_row.Wrap(*row);
+        ICEBERG_ASSIGN_OR_RAISE(auto matched,
+                                eq_group->delete_set->Contains(projected_row));
+        if (matched) {
+          deleted = true;
+          break;
+        }
+      }
+    }
+
+    if (!deleted) {
+      result.indices.push_back(static_cast<int32_t>(i));
+    } else if (counter_ != nullptr) {
+      counter_->Increment();
+    }
+  }
+
+  return result;
+}
+
+}  // namespace iceberg
diff --git a/src/iceberg/data/delete_filter.h b/src/iceberg/data/delete_filter.h
new file mode 100644
index 00000000..4cb9bace
--- /dev/null
+++ b/src/iceberg/data/delete_filter.h
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+/// \file iceberg/data/delete_filter.h
+/// Delete-aware filtering for Arrow C Data batches.
+
+#include <atomic>
+#include <cstdint>
+#include <functional>
+#include <memory>
+#include <mutex>
+#include <optional>
+#include <span>
+#include <string>
+#include <vector>
+
+#include "iceberg/arrow_c_data.h"
+#include "iceberg/data/delete_loader.h"
+#include "iceberg/deletes/position_delete_index.h"
+#include "iceberg/iceberg_data_export.h"
+#include "iceberg/result.h"
+#include "iceberg/schema_field.h"
+#include "iceberg/type_fwd.h"
+
+namespace iceberg {
+
+/// \brief Result of ComputeAliveRows: indices of rows not matched by any 
delete.
+struct ICEBERG_DATA_EXPORT AliveRowSelection {
+  /// Zero-based row indices within the batch that are alive (not deleted).
+  std::vector<int32_t> indices;
+
+  /// Number of alive rows (convenience accessor to avoid size_t casts).
+  int64_t alive_count() const { return static_cast<int64_t>(indices.size()); }
+
+  bool empty() const { return indices.empty(); }
+};
+
+/// \brief Counts rows removed by delete filters.
+class ICEBERG_DATA_EXPORT DeleteCounter {
+ public:
+  void Increment(int64_t count = 1) {
+    count_.fetch_add(count, std::memory_order_relaxed);
+  }
+  int64_t Get() const { return count_.load(std::memory_order_relaxed); }
+
+ private:
+  std::atomic<int64_t> count_{0};
+};
+
+/// \brief Concrete batch-oriented delete filter for merge-on-read data 
batches.
+class ICEBERG_DATA_EXPORT DeleteFilter {
+ public:
+  /// \brief Field lookup output for current or fallback equality-delete 
fields.
+  ///
+  /// `field` is the exact field for validation. `projection_field` is the
+  /// top-level field, possibly with a pruned nested struct path, that must be
+  /// merged into RequiredSchema so the data reader can materialize the delete
+  /// column.
+  struct FieldLookupResult {
+    SchemaField field;
+    SchemaField projection_field;
+  };
+
+  /// \brief Lookup a field by ID, including fields from table schema 
fallbacks.
+  using FieldLookup = 
std::function<Result<std::optional<FieldLookupResult>>(int32_t)>;
+
+  /// \brief Build a lookup from the current schema and optional table schemas.
+  ///
+  /// The current table schema is searched first. `schemas` is the table 
metadata
+  /// schema list and may contain `table_schema`; current schema duplicates 
are ignored
+  /// and fallback schemas are searched from latest schema id to oldest.
+  static Result<FieldLookup> MakeFieldLookup(
+      std::shared_ptr<Schema> table_schema,
+      std::span<const std::shared_ptr<Schema>> schemas = {});
+
+  /// \brief Build a lookup from table metadata which uses the current schema 
first,
+  /// then table metadata schemas as fallback.
+  static Result<FieldLookup> MakeFieldLookup(
+      std::shared_ptr<TableMetadata> table_metadata);
+
+  /// \brief Create a DeleteFilter with current schema only field lookup.
+  ///
+  /// \param need_row_pos_col If true, `_pos` is added to `RequiredSchema` when
+  ///   position deletes are present so `ComputeAliveRows` can apply them.
+  ///   Pass false when the caller owns position filtering externally (e.g. a 
vectorised
+  ///   reader that applies the position delete index directly to Arrow column 
buffers).
+  ///   Note that when `need_row_pos_col` is false, `HasPositionDeletes()` may
+  ///   return true but `ComputeAliveRows` will not apply position deletes 
because `_pos`
+  ///   is absent from `RequiredSchema`. The caller is responsible for 
applying them.
+  /// \param counter Optional counter incremented for each deleted row.
+  static Result<std::unique_ptr<DeleteFilter>> Make(
+      std::string file_path, std::span<const std::shared_ptr<DataFile>> 
delete_files,
+      std::shared_ptr<Schema> table_schema, std::shared_ptr<Schema> 
requested_schema,
+      std::shared_ptr<FileIO> io, bool need_row_pos_col = true,
+      std::shared_ptr<DeleteCounter> counter = nullptr);
+
+  /// \brief Create a DeleteFilter using table metadata for schema-aware field 
lookup.
+  static Result<std::unique_ptr<DeleteFilter>> Make(
+      std::string file_path, std::span<const std::shared_ptr<DataFile>> 
delete_files,
+      std::shared_ptr<TableMetadata> table_metadata,
+      std::shared_ptr<Schema> requested_schema, std::shared_ptr<FileIO> io,
+      bool need_row_pos_col = true, std::shared_ptr<DeleteCounter> counter = 
nullptr);
+
+  /// \brief Create a DeleteFilter with table schemas for dropped equality 
fields.
+  static Result<std::unique_ptr<DeleteFilter>> Make(
+      std::string file_path, std::span<const std::shared_ptr<DataFile>> 
delete_files,
+      std::shared_ptr<Schema> table_schema, std::shared_ptr<Schema> 
requested_schema,
+      std::shared_ptr<FileIO> io, std::span<const std::shared_ptr<Schema>> 
schemas,
+      bool need_row_pos_col = true, std::shared_ptr<DeleteCounter> counter = 
nullptr);
+
+  /// \brief Create a DeleteFilter with a custom field lookup.
+  static Result<std::unique_ptr<DeleteFilter>> Make(
+      std::string file_path, std::span<const std::shared_ptr<DataFile>> 
delete_files,
+      std::shared_ptr<Schema> requested_schema, std::shared_ptr<FileIO> io,
+      FieldLookup field_lookup, bool need_row_pos_col = true,
+      std::shared_ptr<DeleteCounter> counter = nullptr);
+
+  ~DeleteFilter();
+
+  /// \brief Schema required from the underlying data file reader.
+  const std::shared_ptr<Schema>& RequiredSchema() const;
+
+  /// \brief The original schema requested by the caller, before delete 
columns were
+  /// added.
+  const std::shared_ptr<Schema>& ExpectedSchema() const;
+
+  /// \brief Increment the delete counter by the given count.
+  ///
+  /// Allows callers to record deletes that occur outside `ComputeAliveRows` 
(e.g. when
+  /// applying deletes in a vectorised path).
+  void IncrementDeleteCount(int64_t count = 1);
+
+  /// \brief Expose the loaded position delete index for external use.
+  ///
+  /// Triggers lazy loading of position delete files on first call. Returns 
nullptr
+  /// when there are no position deletes. Returns an error if loading fails.
+  ///
+  /// The returned pointer is valid only for the lifetime of this DeleteFilter.
+  Result<const PositionDeleteIndex*> DeletedRowPositions() const;
+
+  /// \brief Returns a predicate that is true for rows NOT matched by any 
equality delete.
+  ///
+  /// The returned function is valid for the lifetime of this DeleteFilter and 
is cached
+  /// after the first call. When there are no equality deletes, returns a 
predicate that
+  /// always returns true (every row is alive).
+  ///
+  /// \note The returned predicate is NOT thread-safe: it mutates internal 
projection
+  /// state on each call. Do not invoke it concurrently from multiple threads.
+  Result<std::function<Result<bool>(const StructLike&)>> EqDeletedRowFilter() 
const;
+
+  /// \brief Returns a predicate that is true for rows matched by any equality 
delete.
+  ///
+  /// Inverse of `EqDeletedRowFilter()`. When there are no equality deletes, 
returns a
+  /// predicate that always returns false (no row is deleted).
+  Result<std::function<Result<bool>(const StructLike&)>> 
FindEqualityDeleteRows() const;
+
+  /// \brief Compute alive rows relative to the supplied Arrow C Data batch.
+  ///
+  /// Returns the indices (zero-based, relative to the batch) of rows not 
matched by
+  /// any delete. Deleted-row counts are forwarded to the DeleteCounter 
supplied at
+  /// construction.
+  Result<AliveRowSelection> ComputeAliveRows(const ArrowSchema& batch_schema,
+                                             const ArrowArray& batch) const;
+
+  bool HasPositionDeletes() const;
+  bool HasEqualityDeletes() const;
+
+  DeleteFilter(const DeleteFilter&) = delete;
+  DeleteFilter& operator=(const DeleteFilter&) = delete;
+
+ private:
+  struct EqDeleteGroup;
+
+  DeleteFilter(std::string file_path, std::shared_ptr<Schema> requested_schema,
+               std::shared_ptr<FileIO> io, FieldLookup field_lookup,
+               bool need_row_pos_col, std::shared_ptr<DeleteCounter> counter);
+
+  Status Init(std::span<const std::shared_ptr<DataFile>> delete_files);
+  Result<std::shared_ptr<Schema>> ComputeRequiredSchema() const;
+  Status EnsurePositionDeletesLoaded() const;
+  Status EnsureEqualityDeletesLoaded() const;
+
+  const std::string file_path_;
+  std::vector<std::shared_ptr<DataFile>> pos_deletes_;
+  std::vector<std::shared_ptr<DataFile>> eq_deletes_;
+
+  std::shared_ptr<Schema> requested_schema_;
+  std::shared_ptr<Schema> required_schema_;
+  FieldLookup field_lookup_;
+
+  const bool need_row_pos_col_;
+  // Position of `_pos` in required_schema_ when existent
+  std::optional<size_t> pos_field_position_;
+  std::shared_ptr<DeleteCounter> counter_;
+
+  // TODO(gangwu): expose a factory hook (e.g. a std::function<DeleteLoader()> 
or a
+  // virtual newDeleteLoader()) so callers can inject a caching DeleteLoader 
(analogous to
+  // SparkDeleteFilter.CachingDeleteLoader in Java).
+  DeleteLoader delete_loader_;
+
+  mutable std::mutex pos_mutex_;
+  mutable bool pos_loaded_ = false;
+  mutable PositionDeleteIndex pos_index_;
+
+  mutable std::mutex eq_mutex_;
+  mutable bool eq_loaded_ = false;
+  mutable std::vector<std::unique_ptr<EqDeleteGroup>> eq_groups_;
+  mutable std::function<Result<bool>(const StructLike&)> 
eq_deleted_row_filter_cache_;
+};
+
+}  // namespace iceberg
diff --git a/src/iceberg/data/meson.build b/src/iceberg/data/meson.build
index 0f68decc..f0877ec6 100644
--- a/src/iceberg/data/meson.build
+++ b/src/iceberg/data/meson.build
@@ -18,6 +18,7 @@
 install_headers(
     [
         'data_writer.h',
+        'delete_filter.h',
         'delete_loader.h',
         'equality_delete_writer.h',
         'position_delete_writer.h',
diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build
index c2947f3f..0b5f269d 100644
--- a/src/iceberg/meson.build
+++ b/src/iceberg/meson.build
@@ -142,6 +142,7 @@ iceberg_sources = files(
 
 iceberg_data_sources = files(
     'data/data_writer.cc',
+    'data/delete_filter.cc',
     'data/delete_loader.cc',
     'data/equality_delete_writer.cc',
     'data/position_delete_writer.cc',
diff --git a/src/iceberg/metadata_columns.h b/src/iceberg/metadata_columns.h
index 61f07c48..b390a50e 100644
--- a/src/iceberg/metadata_columns.h
+++ b/src/iceberg/metadata_columns.h
@@ -50,7 +50,7 @@ struct ICEBERG_EXPORT MetadataColumns {
 
   constexpr static int32_t kIsDeletedColumnId = kInt32Max - 3;
   inline static const SchemaField kIsDeleted = SchemaField::MakeRequired(
-      kIsDeletedColumnId, "_deleted", binary(), "Whether the row has been 
deleted");
+      kIsDeletedColumnId, "_deleted", boolean(), "Whether the row has been 
deleted");
 
   constexpr static int32_t kSpecIdColumnId = kInt32Max - 4;
   inline static const SchemaField kSpecId =
diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt
index 1d80b29a..afafc4c1 100644
--- a/src/iceberg/test/CMakeLists.txt
+++ b/src/iceberg/test/CMakeLists.txt
@@ -220,6 +220,7 @@ if(ICEBERG_BUILD_BUNDLE)
                    USE_BUNDLE
                    SOURCES
                    data_writer_test.cc
+                   delete_filter_test.cc
                    delete_loader_test.cc)
 
 endif()
diff --git a/src/iceberg/test/delete_filter_test.cc 
b/src/iceberg/test/delete_filter_test.cc
new file mode 100644
index 00000000..89d1b6b8
--- /dev/null
+++ b/src/iceberg/test/delete_filter_test.cc
@@ -0,0 +1,1625 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/data/delete_filter.h"
+
+#include <format>
+#include <memory>
+#include <optional>
+#include <string>
+#include <unordered_map>
+#include <unordered_set>
+#include <vector>
+
+#include <arrow/array.h>
+#include <arrow/c/bridge.h>
+#include <arrow/json/from_string.h>
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include "iceberg/arrow/arrow_io_internal.h"
+#include "iceberg/data/equality_delete_writer.h"
+#include "iceberg/data/position_delete_writer.h"
+#include "iceberg/file_format.h"
+#include "iceberg/file_reader.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/metadata_columns.h"
+#include "iceberg/parquet/parquet_register.h"
+#include "iceberg/partition_spec.h"
+#include "iceberg/row/arrow_array_wrapper.h"
+#include "iceberg/row/partition_values.h"
+#include "iceberg/schema.h"
+#include "iceberg/schema_field.h"
+#include "iceberg/schema_internal.h"
+#include "iceberg/table_metadata.h"
+#include "iceberg/test/matchers.h"
+#include "iceberg/type.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+namespace {
+
+struct ExportedBatch {
+  ArrowSchema schema{};
+  ArrowArray array{};
+
+  ExportedBatch() = default;
+  ~ExportedBatch() {
+    if (array.release != nullptr) {
+      array.release(&array);
+    }
+    if (schema.release != nullptr) {
+      schema.release(&schema);
+    }
+  }
+
+  ExportedBatch(const ExportedBatch&) = delete;
+  ExportedBatch& operator=(const ExportedBatch&) = delete;
+
+  ExportedBatch(ExportedBatch&& other) noexcept
+      : schema(other.schema), array(other.array) {
+    other.schema.release = nullptr;
+    other.array.release = nullptr;
+  }
+  ExportedBatch& operator=(ExportedBatch&& other) noexcept = delete;
+};
+
+std::vector<std::string> FieldNames(const Schema& schema) {
+  std::vector<std::string> names;
+  for (const auto& field : schema.fields()) {
+    names.emplace_back(field.name());
+  }
+  return names;
+}
+
+std::vector<int32_t> FieldIds(const Schema& schema) {
+  std::vector<int32_t> ids;
+  for (const auto& field : schema.fields()) {
+    ids.push_back(field.field_id());
+  }
+  return ids;
+}
+
+std::vector<int32_t> StructFieldIds(const StructType& struct_type) {
+  std::vector<int32_t> ids;
+  for (const auto& field : struct_type.fields()) {
+    ids.push_back(field.field_id());
+  }
+  return ids;
+}
+
+void ExpectAliveRows(const AliveRowSelection& alive,
+                     const std::vector<int32_t>& expected) {
+  ASSERT_EQ(alive.alive_count(), static_cast<int64_t>(expected.size()));
+  EXPECT_EQ(alive.indices, expected);
+}
+
+class CapturingReader : public Reader {
+ public:
+  explicit CapturingReader(std::shared_ptr<iceberg::Schema>* projection)
+      : projection_(projection) {}
+
+  Status Open(const ReaderOptions& options) override {
+    *projection_ = options.projection;
+    return {};
+  }
+
+  Status Close() override { return {}; }
+
+  Result<std::optional<ArrowArray>> Next() override { return std::nullopt; }
+
+  Result<ArrowSchema> Schema() override {
+    ArrowSchema schema;
+    ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(**projection_, &schema));
+    return schema;
+  }
+
+  Result<std::unordered_map<std::string, std::string>> Metadata() override {
+    return std::unordered_map<std::string, std::string>{};
+  }
+
+ private:
+  std::shared_ptr<iceberg::Schema>* projection_;
+};
+
+class ScopedReaderFactory {
+ public:
+  ScopedReaderFactory(FileFormatType format_type, ReaderFactory factory)
+      : format_type_(format_type),
+        previous_(ReaderFactoryRegistry::GetFactory(format_type)) {
+    ReaderFactoryRegistry::GetFactory(format_type_) = std::move(factory);
+  }
+
+  ~ScopedReaderFactory() {
+    ReaderFactoryRegistry::GetFactory(format_type_) = std::move(previous_);
+  }
+
+ private:
+  FileFormatType format_type_;
+  ReaderFactory previous_;
+};
+
+}  // namespace
+
+class DeleteFilterTest : public ::testing::Test {
+ protected:
+  static void SetUpTestSuite() { parquet::RegisterAll(); }
+
+  void SetUp() override {
+    file_io_ = arrow::ArrowFileSystemFileIO::MakeMockFileIO();
+    table_schema_ = std::make_shared<Schema>(
+        std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32()),
+                                 SchemaField::MakeOptional(2, "name", 
string()),
+                                 SchemaField::MakeOptional(3, "category", 
string())});
+    partition_spec_ = PartitionSpec::Unpartitioned();
+  }
+
+  std::shared_ptr<Schema> Project(std::initializer_list<int32_t> field_ids) 
const {
+    std::unordered_set<int32_t> ids(field_ids.begin(), field_ids.end());
+    auto result = table_schema_->Project(ids);
+    EXPECT_TRUE(result.has_value()) << "Projection failed: " << 
result.error().message;
+    return std::move(result.value());
+  }
+
+  Result<ExportedBatch> MakeBatch(const Schema& schema,
+                                  const std::string& json_data) const {
+    ArrowSchema type_schema;
+    ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(schema, &type_schema));
+    auto arrow_type_result = ::arrow::ImportType(&type_schema);
+    if (!arrow_type_result.ok()) {
+      return UnknownError(arrow_type_result.status().ToString());
+    }
+    auto struct_type = 
::arrow::struct_(arrow_type_result.MoveValueUnsafe()->fields());
+    auto array_result = ::arrow::json::ArrayFromJSONString(struct_type, 
json_data);
+    if (!array_result.ok()) {
+      return UnknownError(array_result.status().ToString());
+    }
+
+    ExportedBatch batch;
+    ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(schema, &batch.schema));
+    auto export_status =
+        ::arrow::ExportArray(*array_result.MoveValueUnsafe(), &batch.array);
+    if (!export_status.ok()) {
+      return UnknownError(export_status.ToString());
+    }
+    return batch;
+  }
+
+  Result<std::shared_ptr<DataFile>> PositionDeleteFile(
+      const std::string& path, const std::vector<int64_t>& positions,
+      const std::string& data_path = std::string(kDataPath)) {
+    PositionDeleteWriterOptions options{
+        .path = path,
+        .schema = table_schema_,
+        .spec = partition_spec_,
+        .partition = PartitionValues{},
+        .format = FileFormatType::kParquet,
+        .io = file_io_,
+        .flush_threshold = 10000,
+        .properties = {{"write.parquet.compression-codec", "uncompressed"}},
+    };
+
+    ICEBERG_ASSIGN_OR_RAISE(auto writer, PositionDeleteWriter::Make(options));
+    for (int64_t pos : positions) {
+      ICEBERG_RETURN_UNEXPECTED(writer->WriteDelete(data_path, pos));
+    }
+    ICEBERG_RETURN_UNEXPECTED(writer->Close());
+    ICEBERG_ASSIGN_OR_RAISE(auto metadata, writer->Metadata());
+    return metadata.data_files[0];
+  }
+
+  Result<std::shared_ptr<DataFile>> EqualityDeleteFile(
+      const std::string& path, const std::string& json_data,
+      std::vector<int32_t> equality_field_ids) {
+    EqualityDeleteWriterOptions options{
+        .path = path,
+        .schema = table_schema_,
+        .spec = partition_spec_,
+        .partition = PartitionValues{},
+        .format = FileFormatType::kParquet,
+        .io = file_io_,
+        .equality_field_ids = std::move(equality_field_ids),
+        .properties = {{"write.parquet.compression-codec", "uncompressed"}},
+    };
+
+    ICEBERG_ASSIGN_OR_RAISE(auto writer, EqualityDeleteWriter::Make(options));
+    ICEBERG_ASSIGN_OR_RAISE(auto batch, MakeBatch(*table_schema_, json_data));
+    ICEBERG_RETURN_UNEXPECTED(writer->Write(&batch.array));
+    ICEBERG_RETURN_UNEXPECTED(writer->Close());
+    ICEBERG_ASSIGN_OR_RAISE(auto metadata, writer->Metadata());
+    return metadata.data_files[0];
+  }
+
+  static constexpr std::string_view kDataPath = "data.parquet";
+
+  std::shared_ptr<FileIO> file_io_;
+  std::shared_ptr<Schema> table_schema_;
+  std::shared_ptr<PartitionSpec> partition_spec_;
+};
+
+enum class RequiredSchemaRequest {
+  kProjectFields,
+  kIdAndRowPos,
+};
+
+struct RequiredSchemaCase {
+  const char* name;
+  RequiredSchemaRequest request;
+  std::vector<int32_t> requested_field_ids;
+  std::vector<std::vector<int32_t>> equality_ids_by_file;
+  bool has_pos_delete;
+  bool need_row_pos_col;
+  std::vector<int32_t> expected_field_ids;
+  std::vector<std::string> expected_field_names;
+  bool expected_has_position_deletes;
+  bool expected_has_equality_deletes;
+};
+
+template <typename Param>
+std::string ParamName(const testing::TestParamInfo<Param>& info) {
+  return info.param.name;
+}
+
+class DeleteFilterRequiredSchemaTest
+    : public DeleteFilterTest,
+      public testing::WithParamInterface<RequiredSchemaCase> {
+ protected:
+  std::shared_ptr<Schema> RequestedSchema(const RequiredSchemaCase& test_case) 
{
+    switch (test_case.request) {
+      case RequiredSchemaRequest::kProjectFields: {
+        std::unordered_set<int32_t> ids(test_case.requested_field_ids.begin(),
+                                        test_case.requested_field_ids.end());
+        auto result = table_schema_->Project(ids);
+        EXPECT_TRUE(result.has_value())
+            << "Projection failed: " << result.error().message;
+        return std::move(result.value());
+      }
+      case RequiredSchemaRequest::kIdAndRowPos:
+        return std::make_shared<Schema>(std::vector<SchemaField>{
+            SchemaField::MakeRequired(1, "id", int32()), 
MetadataColumns::kRowPosition});
+    }
+    return nullptr;
+  }
+
+  std::vector<std::shared_ptr<DataFile>> DeleteFiles(
+      const RequiredSchemaCase& test_case) {
+    std::vector<std::shared_ptr<DataFile>> delete_files;
+    for (size_t index = 0; index < test_case.equality_ids_by_file.size(); 
++index) {
+      delete_files.push_back(std::make_shared<DataFile>(DataFile{
+          .content = DataFile::Content::kEqualityDeletes,
+          .file_path = std::format("{}-eq-{}.parquet", test_case.name, index),
+          .file_format = FileFormatType::kParquet,
+          .equality_ids = test_case.equality_ids_by_file[index],
+      }));
+    }
+    if (test_case.has_pos_delete) {
+      delete_files.push_back(std::make_shared<DataFile>(DataFile{
+          .content = DataFile::Content::kPositionDeletes,
+          .file_path = std::format("{}-pos.parquet", test_case.name),
+          .file_format = FileFormatType::kParquet,
+      }));
+    }
+    return delete_files;
+  }
+};
+
+TEST_P(DeleteFilterRequiredSchemaTest, ComputesRequiredSchema) {
+  const auto& test_case = GetParam();
+  auto delete_files = DeleteFiles(test_case);
+  auto requested_schema = RequestedSchema(test_case);
+
+  auto filter =
+      DeleteFilter::Make(std::string(kDataPath), delete_files, table_schema_,
+                         requested_schema, file_io_, 
test_case.need_row_pos_col);
+
+  ASSERT_THAT(filter, IsOk());
+  EXPECT_EQ(filter.value()->HasPositionDeletes(),
+            test_case.expected_has_position_deletes);
+  EXPECT_EQ(filter.value()->HasEqualityDeletes(),
+            test_case.expected_has_equality_deletes);
+  EXPECT_THAT(FieldIds(*filter.value()->RequiredSchema()),
+              testing::ElementsAreArray(test_case.expected_field_ids));
+  EXPECT_THAT(FieldNames(*filter.value()->RequiredSchema()),
+              testing::ElementsAreArray(test_case.expected_field_names));
+}
+
+INSTANTIATE_TEST_SUITE_P(
+    RequiredSchema, DeleteFilterRequiredSchemaTest,
+    testing::Values(
+        RequiredSchemaCase{
+            .name = "UnchangedWithoutDeletes",
+            .request = RequiredSchemaRequest::kProjectFields,
+            .requested_field_ids = {2, 1},
+            .equality_ids_by_file = {},
+            .has_pos_delete = false,
+            .need_row_pos_col = true,
+            .expected_field_ids = {1, 2},
+            .expected_field_names = {"id", "name"},
+            .expected_has_position_deletes = false,
+            .expected_has_equality_deletes = false,
+        },
+        RequiredSchemaCase{
+            .name = "AddsEqualityFieldsAndRowPos",
+            .request = RequiredSchemaRequest::kProjectFields,
+            .requested_field_ids = {1},
+            .equality_ids_by_file = {{2}, {3, 1}},
+            .has_pos_delete = true,
+            .need_row_pos_col = true,
+            .expected_field_ids = {1, 2, 3, 
MetadataColumns::kFilePositionColumnId},
+            .expected_field_names = {"id", "name", "category",
+                                     
std::string(MetadataColumns::kRowPosition.name())},
+            .expected_has_position_deletes = true,
+            .expected_has_equality_deletes = true,
+        },
+        RequiredSchemaCase{
+            .name = "AddsEqualityFieldsInDeclaredOrder",
+            .request = RequiredSchemaRequest::kProjectFields,
+            .requested_field_ids = {1},
+            .equality_ids_by_file = {{3, 2}},
+            .has_pos_delete = false,
+            .need_row_pos_col = true,
+            .expected_field_ids = {1, 3, 2},
+            .expected_field_names = {"id", "category", "name"},
+            .expected_has_position_deletes = false,
+            .expected_has_equality_deletes = true,
+        },
+        RequiredSchemaCase{
+            .name = "DeduplicatesRowPos",
+            .request = RequiredSchemaRequest::kIdAndRowPos,
+            .requested_field_ids = {},
+            .equality_ids_by_file = {},
+            .has_pos_delete = true,
+            .need_row_pos_col = true,
+            .expected_field_ids = {1, MetadataColumns::kFilePositionColumnId},
+            .expected_field_names = {"id",
+                                     
std::string(MetadataColumns::kRowPosition.name())},
+            .expected_has_position_deletes = true,
+            .expected_has_equality_deletes = false,
+        },
+        RequiredSchemaCase{
+            .name = "NeedRowPosColFalseOmitsPos",
+            .request = RequiredSchemaRequest::kProjectFields,
+            .requested_field_ids = {1},
+            .equality_ids_by_file = {},
+            .has_pos_delete = true,
+            .need_row_pos_col = false,
+            .expected_field_ids = {1},
+            .expected_field_names = {"id"},
+            .expected_has_position_deletes = true,
+            .expected_has_equality_deletes = false,
+        },
+        RequiredSchemaCase{
+            .name = "NeedRowPosColTrueAppendsPos",
+            .request = RequiredSchemaRequest::kProjectFields,
+            .requested_field_ids = {1},
+            .equality_ids_by_file = {},
+            .has_pos_delete = true,
+            .need_row_pos_col = true,
+            .expected_field_ids = {1, MetadataColumns::kFilePositionColumnId},
+            .expected_field_names = {"id",
+                                     
std::string(MetadataColumns::kRowPosition.name())},
+            .expected_has_position_deletes = true,
+            .expected_has_equality_deletes = false,
+        },
+        RequiredSchemaCase{
+            .name = "AddsFieldsInJavaOrder",
+            .request = RequiredSchemaRequest::kProjectFields,
+            .requested_field_ids = {1},
+            .equality_ids_by_file = {{2}, {3}},
+            .has_pos_delete = true,
+            .need_row_pos_col = true,
+            .expected_field_ids = {1, 2, 3, 
MetadataColumns::kFilePositionColumnId},
+            .expected_field_names = {"id", "name", "category",
+                                     
std::string(MetadataColumns::kRowPosition.name())},
+            .expected_has_position_deletes = true,
+            .expected_has_equality_deletes = true,
+        }),
+    ParamName<RequiredSchemaCase>);
+
+TEST_F(DeleteFilterTest, 
EqualityFieldsCanBeTopLevelPrimitiveOrNestedPrimitive) {
+  auto nested_schema = std::make_shared<Schema>(std::vector<SchemaField>{
+      SchemaField::MakeRequired(1, "id", int32()),
+      SchemaField::MakeOptional(
+          4, "info", struct_({SchemaField::MakeOptional(5, "city", 
string())}))});
+  auto requested_schema = std::make_shared<Schema>(
+      std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32())});
+  auto eq_by_struct = std::make_shared<DataFile>(DataFile{
+      .content = DataFile::Content::kEqualityDeletes,
+      .file_path = "eq-id.parquet",
+      .file_format = FileFormatType::kParquet,
+      .equality_ids = {1},
+  });
+
+  std::vector<std::shared_ptr<DataFile>> top_level_primitive_delete = 
{eq_by_struct};
+  auto top_level_filter =
+      DeleteFilter::Make(std::string(kDataPath), top_level_primitive_delete,
+                         nested_schema, requested_schema, file_io_);
+  ASSERT_THAT(top_level_filter, IsOk());
+  EXPECT_THAT(FieldIds(*top_level_filter.value()->RequiredSchema()),
+              testing::ElementsAre(1));
+
+  auto eq_by_nested_field = std::make_shared<DataFile>(*eq_by_struct);
+  eq_by_nested_field->equality_ids = {5};
+  std::vector<std::shared_ptr<DataFile>> nested_delete = {eq_by_nested_field};
+
+  auto nested_filter = DeleteFilter::Make(std::string(kDataPath), 
nested_delete,
+                                          nested_schema, requested_schema, 
file_io_);
+
+  ASSERT_THAT(nested_filter, IsOk());
+  EXPECT_THAT(FieldIds(*nested_filter.value()->RequiredSchema()),
+              testing::ElementsAre(1, 4));
+}
+
+TEST_F(DeleteFilterTest, RequiredSchemaMergesNestedSibling) {
+  auto nested_schema = std::make_shared<Schema>(std::vector<SchemaField>{
+      SchemaField::MakeRequired(1, "id", int32()),
+      SchemaField::MakeOptional(
+          4, "info",
+          struct_({SchemaField::MakeOptional(5, "city", string()),
+                   SchemaField::MakeOptional(6, "state", string())}))});
+  auto requested_schema = std::shared_ptr<Schema>(
+      nested_schema->Project(std::unordered_set<int32_t>{1, 5}).value());
+  auto eq_by_state = std::make_shared<DataFile>(DataFile{
+      .content = DataFile::Content::kEqualityDeletes,
+      .file_path = "eq-state.parquet",
+      .file_format = FileFormatType::kParquet,
+      .equality_ids = {6},
+  });
+  std::vector<std::shared_ptr<DataFile>> delete_files = {eq_by_state};
+
+  auto filter = DeleteFilter::Make(std::string(kDataPath), delete_files, 
nested_schema,
+                                   requested_schema, file_io_);
+
+  ASSERT_THAT(filter, IsOk());
+  EXPECT_THAT(FieldIds(*filter.value()->RequiredSchema()), 
testing::ElementsAre(1, 4));
+  const auto& info = filter.value()->RequiredSchema()->fields()[1];
+  auto info_type = std::dynamic_pointer_cast<StructType>(info.type());
+  ASSERT_NE(info_type, nullptr);
+  EXPECT_THAT(StructFieldIds(*info_type), testing::ElementsAre(5, 6));
+}
+
+TEST_F(DeleteFilterTest, StructEqualityFieldErrors) {
+  auto nested_schema = std::make_shared<Schema>(std::vector<SchemaField>{
+      SchemaField::MakeRequired(1, "id", int32()),
+      SchemaField::MakeOptional(
+          4, "info",
+          struct_({SchemaField::MakeOptional(5, "city", string()),
+                   SchemaField::MakeOptional(6, "state", string())}))});
+  auto requested_schema = std::shared_ptr<Schema>(
+      nested_schema->Project(std::unordered_set<int32_t>{1, 5}).value());
+  auto eq_by_info = std::make_shared<DataFile>(DataFile{
+      .content = DataFile::Content::kEqualityDeletes,
+      .file_path = "eq-info.parquet",
+      .file_format = FileFormatType::kParquet,
+      .equality_ids = {4},
+  });
+  std::vector<std::shared_ptr<DataFile>> delete_files = {eq_by_info};
+
+  auto filter = DeleteFilter::Make(std::string(kDataPath), delete_files, 
nested_schema,
+                                   requested_schema, file_io_);
+
+  EXPECT_THAT(filter, IsError(ErrorKind::kInvalidArgument));
+  EXPECT_THAT(filter, HasErrorMessage("must reference a primitive field"));
+}
+
+enum class AliveRowsDeleteKind {
+  kNone,
+  kPosition,
+  kEqualityName,
+  kEqualityNameAndCategory,
+  kMixedPositionAndEqualityId,
+};
+
+enum class CounterMode {
+  kNone,
+  kAttachCounter,
+  kNullCounter,
+};
+
+struct AliveRowsCase {
+  const char* name;
+  AliveRowsDeleteKind delete_kind;
+  std::vector<int64_t> position_delete_positions;
+  std::string position_delete_data_path;
+  bool need_row_pos_col;
+  CounterMode counter_mode;
+  std::string batch_json;
+  std::vector<int32_t> expected_alive_rows;
+  std::optional<int64_t> expected_delete_count;
+};
+
+class DeleteFilterAliveRowsTest : public DeleteFilterTest,
+                                  public 
testing::WithParamInterface<AliveRowsCase> {};
+
+TEST_P(DeleteFilterAliveRowsTest, ComputesAliveRows) {
+  const auto& test_case = GetParam();
+  std::vector<std::shared_ptr<DataFile>> delete_files;
+  switch (test_case.delete_kind) {
+    case AliveRowsDeleteKind::kNone:
+      break;
+    case AliveRowsDeleteKind::kPosition: {
+      auto data_path = test_case.position_delete_data_path.empty()
+                           ? std::string(kDataPath)
+                           : test_case.position_delete_data_path;
+      ICEBERG_UNWRAP_OR_FAIL(
+          auto pos_delete,
+          PositionDeleteFile(std::format("{}-pos.parquet", test_case.name),
+                             test_case.position_delete_positions, data_path));
+      delete_files.push_back(pos_delete);
+      break;
+    }
+    case AliveRowsDeleteKind::kEqualityName: {
+      ICEBERG_UNWRAP_OR_FAIL(
+          auto eq_by_name,
+          EqualityDeleteFile(std::format("{}-eq-name.parquet", test_case.name),
+                             R"([[0, "Bob", "unused"]])", {2}));
+      delete_files.push_back(eq_by_name);
+      break;
+    }
+    case AliveRowsDeleteKind::kEqualityNameAndCategory: {
+      ICEBERG_UNWRAP_OR_FAIL(
+          auto eq_by_name,
+          EqualityDeleteFile(std::format("{}-eq-name.parquet", test_case.name),
+                             R"([[0, "Bob", "unused"]])", {2}));
+      ICEBERG_UNWRAP_OR_FAIL(
+          auto eq_by_category,
+          EqualityDeleteFile(std::format("{}-eq-category.parquet", 
test_case.name),
+                             R"([[0, "unused", "red"]])", {3}));
+      delete_files.push_back(eq_by_name);
+      delete_files.push_back(eq_by_category);
+      break;
+    }
+    case AliveRowsDeleteKind::kMixedPositionAndEqualityId: {
+      ICEBERG_UNWRAP_OR_FAIL(
+          auto pos_delete,
+          PositionDeleteFile(std::format("{}-pos.parquet", test_case.name),
+                             test_case.position_delete_positions));
+      ICEBERG_UNWRAP_OR_FAIL(
+          auto eq_by_id,
+          EqualityDeleteFile(std::format("{}-eq-id.parquet", test_case.name),
+                             R"([[3, "unused", "unused"]])", {1}));
+      delete_files.push_back(pos_delete);
+      delete_files.push_back(eq_by_id);
+      break;
+    }
+  }
+
+  auto requested_schema = Project({1});
+  std::shared_ptr<DeleteCounter> counter;
+  if (test_case.counter_mode == CounterMode::kAttachCounter) {
+    counter = std::make_shared<DeleteCounter>();
+  }
+  auto filter =
+      DeleteFilter::Make(std::string(kDataPath), delete_files, table_schema_,
+                         requested_schema, file_io_, 
test_case.need_row_pos_col, counter);
+  ASSERT_THAT(filter, IsOk());
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto batch, MakeBatch(*filter.value()->RequiredSchema(), 
test_case.batch_json));
+
+  auto alive = filter.value()->ComputeAliveRows(batch.schema, batch.array);
+
+  ASSERT_THAT(alive, IsOk());
+  ExpectAliveRows(alive.value(), test_case.expected_alive_rows);
+  if (test_case.expected_delete_count.has_value()) {
+    ASSERT_NE(counter, nullptr);
+    EXPECT_EQ(counter->Get(), test_case.expected_delete_count.value());
+  }
+}
+
+INSTANTIATE_TEST_SUITE_P(
+    AliveRows, DeleteFilterAliveRowsTest,
+    testing::Values(
+        AliveRowsCase{
+            .name = "AllReturnedWithoutDeletes",
+            .delete_kind = AliveRowsDeleteKind::kNone,
+            .position_delete_positions = {},
+            .position_delete_data_path = "",
+            .need_row_pos_col = true,
+            .counter_mode = CounterMode::kNone,
+            .batch_json = R"([[1], [2], [3]])",
+            .expected_alive_rows = {0, 1, 2},
+            .expected_delete_count = std::nullopt,
+        },
+        AliveRowsCase{
+            .name = "PositionDeletesFilterByRowPos",
+            .delete_kind = AliveRowsDeleteKind::kPosition,
+            .position_delete_positions = {1, 3},
+            .position_delete_data_path = "",
+            .need_row_pos_col = true,
+            .counter_mode = CounterMode::kNone,
+            .batch_json = R"([[10, 0], [20, 1], [30, 2], [40, 3]])",
+            .expected_alive_rows = {0, 2},
+            .expected_delete_count = std::nullopt,
+        },
+        AliveRowsCase{
+            .name = "EqualityDeletesApplyOrSemantics",
+            .delete_kind = AliveRowsDeleteKind::kEqualityNameAndCategory,
+            .position_delete_positions = {},
+            .position_delete_data_path = "",
+            .need_row_pos_col = true,
+            .counter_mode = CounterMode::kNone,
+            .batch_json =
+                R"([[1, "Alice", "blue"], [2, "Bob", "blue"], [3, "Carol", 
"red"], [4, "Dan", "green"]])",
+            .expected_alive_rows = {0, 3},
+            .expected_delete_count = std::nullopt,
+        },
+        AliveRowsCase{
+            .name = "MixedDeletesPosBeforeEqCanDeleteAll",
+            .delete_kind = AliveRowsDeleteKind::kMixedPositionAndEqualityId,
+            .position_delete_positions = {0, 1},
+            .position_delete_data_path = "",
+            .need_row_pos_col = true,
+            .counter_mode = CounterMode::kNone,
+            .batch_json = R"([[1, 0], [2, 1], [3, 2]])",
+            .expected_alive_rows = {},
+            .expected_delete_count = std::nullopt,
+        },
+        AliveRowsCase{
+            .name = "EmptyBatchReturnsEmptyBitmap",
+            .delete_kind = AliveRowsDeleteKind::kNone,
+            .position_delete_positions = {},
+            .position_delete_data_path = "",
+            .need_row_pos_col = true,
+            .counter_mode = CounterMode::kNone,
+            .batch_json = R"([])",
+            .expected_alive_rows = {},
+            .expected_delete_count = std::nullopt,
+        },
+        AliveRowsCase{
+            .name = "NeedRowPosColFalseSkipsPosFiltering",
+            .delete_kind = AliveRowsDeleteKind::kPosition,
+            .position_delete_positions = {0, 1},
+            .position_delete_data_path = "",
+            .need_row_pos_col = false,
+            .counter_mode = CounterMode::kNone,
+            .batch_json = R"([[10], [20], [30]])",
+            .expected_alive_rows = {0, 1, 2},
+            .expected_delete_count = std::nullopt,
+        },
+        AliveRowsCase{
+            .name = "CounterCountsPosDeletes",
+            .delete_kind = AliveRowsDeleteKind::kPosition,
+            .position_delete_positions = {0, 2},
+            .position_delete_data_path = "",
+            .need_row_pos_col = true,
+            .counter_mode = CounterMode::kAttachCounter,
+            .batch_json = R"([[10, 0], [20, 1], [30, 2], [40, 3]])",
+            .expected_alive_rows = {1, 3},
+            .expected_delete_count = 2,
+        },
+        AliveRowsCase{
+            .name = "CounterCountsEqDeletes",
+            .delete_kind = AliveRowsDeleteKind::kEqualityName,
+            .position_delete_positions = {},
+            .position_delete_data_path = "",
+            .need_row_pos_col = true,
+            .counter_mode = CounterMode::kAttachCounter,
+            .batch_json = R"([[1, "Alice"], [2, "Bob"], [3, "Bob"], [4, 
"Dan"]])",
+            .expected_alive_rows = {0, 3},
+            .expected_delete_count = 2,
+        },
+        AliveRowsCase{
+            .name = "NullCounterIsNoOp",
+            .delete_kind = AliveRowsDeleteKind::kPosition,
+            .position_delete_positions = {0},
+            .position_delete_data_path = "",
+            .need_row_pos_col = true,
+            .counter_mode = CounterMode::kNullCounter,
+            .batch_json = R"([[10, 0], [20, 1]])",
+            .expected_alive_rows = {1},
+            .expected_delete_count = std::nullopt,
+        },
+        AliveRowsCase{
+            .name = "PosDeleteOnlyFiltersMatchingPath",
+            .delete_kind = AliveRowsDeleteKind::kPosition,
+            .position_delete_positions = {0, 1, 2},
+            .position_delete_data_path = "other-data.parquet",
+            .need_row_pos_col = true,
+            .counter_mode = CounterMode::kNone,
+            .batch_json = R"([[10, 0], [20, 1], [30, 2]])",
+            .expected_alive_rows = {0, 1, 2},
+            .expected_delete_count = std::nullopt,
+        }),
+    ParamName<AliveRowsCase>);
+
+TEST_F(DeleteFilterTest, TopLevelStructEqualityErrors) {
+  auto nested_schema = std::make_shared<Schema>(std::vector<SchemaField>{
+      SchemaField::MakeRequired(1, "id", int32()),
+      SchemaField::MakeOptional(
+          4, "info", struct_({SchemaField::MakeOptional(5, "city", 
string())}))});
+  auto requested_schema = std::make_shared<Schema>(
+      std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32())});
+
+  auto eq_by_info = std::make_shared<DataFile>(DataFile{
+      .content = DataFile::Content::kEqualityDeletes,
+      .file_path = "eq-info.parquet",
+      .file_format = FileFormatType::kParquet,
+      .equality_ids = {4},
+  });
+  std::vector<std::shared_ptr<DataFile>> delete_files = {eq_by_info};
+  auto filter = DeleteFilter::Make(std::string(kDataPath), delete_files, 
nested_schema,
+                                   requested_schema, file_io_);
+
+  EXPECT_THAT(filter, IsError(ErrorKind::kInvalidArgument));
+  EXPECT_THAT(filter, HasErrorMessage("must reference a primitive field"));
+}
+
+TEST_F(DeleteFilterTest, NestedStructFieldEqualityFiltersRows) {
+  auto nested_schema = std::make_shared<Schema>(std::vector<SchemaField>{
+      SchemaField::MakeRequired(1, "id", int32()),
+      SchemaField::MakeOptional(
+          4, "info",
+          struct_({SchemaField::MakeOptional(5, "city", string()),
+                   SchemaField::MakeOptional(6, "state", string())}))});
+  auto requested_schema = std::make_shared<Schema>(
+      std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32())});
+
+  EqualityDeleteWriterOptions options{
+      .path = "eq-city.parquet",
+      .schema = nested_schema,
+      .spec = partition_spec_,
+      .partition = PartitionValues{},
+      .format = FileFormatType::kParquet,
+      .io = file_io_,
+      .equality_field_ids = {5},
+      .properties = {{"write.parquet.compression-codec", "uncompressed"}},
+  };
+  ICEBERG_UNWRAP_OR_FAIL(auto writer, EqualityDeleteWriter::Make(options));
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto delete_batch,
+      MakeBatch(*nested_schema,
+                R"([{"id": 0, "info": {"city": "Paris", "state": "FR"}}])"));
+  ASSERT_THAT(writer->Write(&delete_batch.array), IsOk());
+  ASSERT_THAT(writer->Close(), IsOk());
+  ICEBERG_UNWRAP_OR_FAIL(auto eq_by_city_meta, writer->Metadata());
+  auto eq_by_city = eq_by_city_meta.data_files[0];
+
+  std::vector<std::shared_ptr<DataFile>> delete_files = {eq_by_city};
+  auto filter = DeleteFilter::Make(std::string(kDataPath), delete_files, 
nested_schema,
+                                   requested_schema, file_io_);
+  ASSERT_THAT(filter, IsOk());
+  EXPECT_THAT(FieldIds(*filter.value()->RequiredSchema()), 
testing::ElementsAre(1, 4));
+
+  ICEBERG_UNWRAP_OR_FAIL(auto data_batch,
+                         MakeBatch(*filter.value()->RequiredSchema(),
+                                   R"([{"id": 1, "info": {"city": "London"}},
+                                  {"id": 2, "info": {"city": "Paris"}},
+                                  {"id": 3, "info": null}])"));
+
+  auto alive = filter.value()->ComputeAliveRows(data_batch.schema, 
data_batch.array);
+
+  ASSERT_THAT(alive, IsOk());
+  ExpectAliveRows(alive.value(), {0, 2});
+}
+
+TEST_F(DeleteFilterTest, NestedEqualityWithPartialStructNoOverDelete) {
+  auto nested_schema = std::make_shared<Schema>(std::vector<SchemaField>{
+      SchemaField::MakeRequired(1, "id", int32()),
+      SchemaField::MakeOptional(
+          4, "info",
+          struct_({SchemaField::MakeOptional(5, "city", string()),
+                   SchemaField::MakeOptional(6, "state", string())}))});
+  auto requested_schema = std::shared_ptr<Schema>(
+      nested_schema->Project(std::unordered_set<int32_t>{1, 5}).value());
+
+  EqualityDeleteWriterOptions options{
+      .path = "eq-state-partial.parquet",
+      .schema = nested_schema,
+      .spec = partition_spec_,
+      .partition = PartitionValues{},
+      .format = FileFormatType::kParquet,
+      .io = file_io_,
+      .equality_field_ids = {6},
+      .properties = {{"write.parquet.compression-codec", "uncompressed"}},
+  };
+  ICEBERG_UNWRAP_OR_FAIL(auto writer, EqualityDeleteWriter::Make(options));
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto delete_batch,
+      MakeBatch(*nested_schema,
+                R"([{"id": 0, "info": {"city": "ignored", "state": "CA"}}])"));
+  ASSERT_THAT(writer->Write(&delete_batch.array), IsOk());
+  ASSERT_THAT(writer->Close(), IsOk());
+  ICEBERG_UNWRAP_OR_FAIL(auto eq_by_state_meta, writer->Metadata());
+  auto eq_by_state = eq_by_state_meta.data_files[0];
+
+  std::vector<std::shared_ptr<DataFile>> delete_files = {eq_by_state};
+  auto filter = DeleteFilter::Make(std::string(kDataPath), delete_files, 
nested_schema,
+                                   requested_schema, file_io_);
+  ASSERT_THAT(filter, IsOk());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto data_batch,
+                         MakeBatch(*filter.value()->RequiredSchema(),
+                                   R"([{"id": 1, "info": {"city": "SF", 
"state": "CA"}},
+                                  {"id": 2, "info": {"city": "NYC", "state": 
"NY"}},
+                                  {"id": 3, "info": null}])"));
+
+  auto alive = filter.value()->ComputeAliveRows(data_batch.schema, 
data_batch.array);
+
+  ASSERT_THAT(alive, IsOk());
+  ExpectAliveRows(alive.value(), {1, 2});
+}
+
+TEST_F(DeleteFilterTest, EqualityDeleteProjectionSortsNestedFieldsById) {
+  auto nested_schema = std::make_shared<Schema>(std::vector<SchemaField>{
+      SchemaField::MakeRequired(1, "id", int32()),
+      SchemaField::MakeOptional(
+          4, "info",
+          struct_({SchemaField::MakeOptional(6, "state", string()),
+                   SchemaField::MakeOptional(5, "city", string())}))});
+  auto requested_schema = std::shared_ptr<Schema>(
+      nested_schema->Project(std::unordered_set<int32_t>{1}).value());
+  auto eq_delete = std::make_shared<DataFile>(DataFile{
+      .content = DataFile::Content::kEqualityDeletes,
+      .file_path = "eq-city-state.orc",
+      .file_format = FileFormatType::kOrc,
+      .equality_ids = {6, 5},
+  });
+  std::vector<std::shared_ptr<DataFile>> delete_files = {eq_delete};
+
+  std::shared_ptr<Schema> captured_projection;
+  ScopedReaderFactory reader_factory(
+      FileFormatType::kOrc, [&captured_projection]() -> 
Result<std::unique_ptr<Reader>> {
+        return std::make_unique<CapturingReader>(&captured_projection);
+      });
+
+  auto filter = DeleteFilter::Make(std::string(kDataPath), delete_files, 
nested_schema,
+                                   requested_schema, file_io_);
+  ASSERT_THAT(filter, IsOk());
+  ASSERT_THAT(filter.value()->EqDeletedRowFilter(), IsOk());
+
+  ASSERT_NE(captured_projection, nullptr);
+  ASSERT_EQ(captured_projection->fields().size(), 1);
+  auto info_type =
+      
std::dynamic_pointer_cast<StructType>(captured_projection->fields()[0].type());
+  ASSERT_NE(info_type, nullptr);
+  EXPECT_THAT(StructFieldIds(*info_type), testing::ElementsAre(5, 6));
+}
+
+TEST_F(DeleteFilterTest, DroppedTopLevelFieldResolvedBySchemas) {
+  auto current_schema = std::make_shared<Schema>(
+      std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32())},
+      /*schema_id=*/2);
+  auto historic_schema = std::make_shared<Schema>(
+      std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32()),
+                               SchemaField::MakeOptional(7, "dropped_value", 
string())},
+      /*schema_id=*/1);
+  auto requested_schema = current_schema;
+  auto eq_by_dropped = std::make_shared<DataFile>(DataFile{
+      .content = DataFile::Content::kEqualityDeletes,
+      .file_path = "eq-dropped.parquet",
+      .file_format = FileFormatType::kParquet,
+      .equality_ids = {7},
+  });
+  std::vector<std::shared_ptr<DataFile>> delete_files = {eq_by_dropped};
+  std::vector<std::shared_ptr<Schema>> schemas = {current_schema, 
historic_schema};
+
+  auto filter = DeleteFilter::Make(std::string(kDataPath), delete_files, 
current_schema,
+                                   requested_schema, file_io_, schemas);
+
+  ASSERT_THAT(filter, IsOk());
+  EXPECT_THAT(FieldIds(*filter.value()->RequiredSchema()), 
testing::ElementsAre(1, 7));
+  EXPECT_THAT(FieldNames(*filter.value()->RequiredSchema()),
+              testing::ElementsAre("id", "dropped_value"));
+}
+
+TEST_F(DeleteFilterTest, MakeFieldLookupSchemasMayIncludeCurrent) {
+  auto current_schema = std::make_shared<Schema>(
+      std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32()),
+                               SchemaField::MakeOptional(7, "current_value", 
string())},
+      /*schema_id=*/2);
+  auto old_schema = std::make_shared<Schema>(
+      std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32()),
+                               SchemaField::MakeOptional(7, "old_value", 
int32())},
+      /*schema_id=*/1);
+  std::vector<std::shared_ptr<Schema>> schemas = {old_schema, current_schema};
+
+  auto lookup_result = DeleteFilter::MakeFieldLookup(current_schema, schemas);
+  ASSERT_THAT(lookup_result, IsOk());
+
+  auto field_result = lookup_result.value()(7);
+  ASSERT_THAT(field_result, IsOk());
+  ASSERT_TRUE(field_result.value().has_value());
+  EXPECT_EQ(field_result.value()->field.name(), "current_value");
+  EXPECT_EQ(field_result.value()->field.type()->type_id(), TypeId::kString);
+}
+
+TEST_F(DeleteFilterTest, MakeFieldLookupCurrentSchemaWins) {
+  auto current_schema = std::make_shared<Schema>(
+      std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32()),
+                               SchemaField::MakeOptional(7, "current_value", 
string())},
+      /*schema_id=*/3);
+  auto fallback_schema = std::make_shared<Schema>(
+      std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32()),
+                               SchemaField::MakeOptional(7, "fallback_value", 
int32())},
+      /*schema_id=*/2);
+  std::vector<std::shared_ptr<Schema>> schemas = {fallback_schema};
+
+  auto lookup_result = DeleteFilter::MakeFieldLookup(current_schema, schemas);
+  ASSERT_THAT(lookup_result, IsOk());
+
+  auto field_result = lookup_result.value()(7);
+  ASSERT_THAT(field_result, IsOk());
+  ASSERT_TRUE(field_result.value().has_value());
+  EXPECT_EQ(field_result.value()->field.name(), "current_value");
+  EXPECT_EQ(field_result.value()->field.type()->type_id(), TypeId::kString);
+}
+
+TEST_F(DeleteFilterTest, MakeFieldLookupLatestFallbackSchemaWins) {
+  auto current_schema = std::make_shared<Schema>(
+      std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32())},
+      /*schema_id=*/3);
+  auto older_schema = std::make_shared<Schema>(
+      std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32()),
+                               SchemaField::MakeOptional(7, "old_value", 
int32())},
+      /*schema_id=*/1);
+  auto newer_schema = std::make_shared<Schema>(
+      std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32()),
+                               SchemaField::MakeOptional(7, "new_value", 
string())},
+      /*schema_id=*/2);
+  std::vector<std::shared_ptr<Schema>> schemas = {older_schema, newer_schema};
+
+  auto lookup_result = DeleteFilter::MakeFieldLookup(current_schema, schemas);
+  ASSERT_THAT(lookup_result, IsOk());
+
+  auto field_result = lookup_result.value()(7);
+  ASSERT_THAT(field_result, IsOk());
+  ASSERT_TRUE(field_result.value().has_value());
+  EXPECT_EQ(field_result.value()->field.name(), "new_value");
+  EXPECT_EQ(field_result.value()->field.type()->type_id(), TypeId::kString);
+}
+
+TEST_F(DeleteFilterTest, DroppedNestedFieldResolvedBySchemas) {
+  auto current_schema = std::make_shared<Schema>(
+      std::vector<SchemaField>{
+          SchemaField::MakeRequired(1, "id", int32()),
+          SchemaField::MakeOptional(
+              4, "info", struct_({SchemaField::MakeOptional(5, "city", 
string())}))},
+      /*schema_id=*/2);
+  auto historic_schema = std::make_shared<Schema>(
+      std::vector<SchemaField>{
+          SchemaField::MakeRequired(1, "id", int32()),
+          SchemaField::MakeOptional(
+              4, "info",
+              struct_({SchemaField::MakeOptional(5, "city", string()),
+                       SchemaField::MakeOptional(6, "state", string())}))},
+      /*schema_id=*/1);
+  auto requested_schema = std::make_shared<Schema>(
+      std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32())});
+  auto eq_by_dropped_nested = std::make_shared<DataFile>(DataFile{
+      .content = DataFile::Content::kEqualityDeletes,
+      .file_path = "eq-dropped-state.parquet",
+      .file_format = FileFormatType::kParquet,
+      .equality_ids = {6},
+  });
+  std::vector<std::shared_ptr<DataFile>> delete_files = {eq_by_dropped_nested};
+  std::vector<std::shared_ptr<Schema>> schemas = {current_schema, 
historic_schema};
+
+  auto filter = DeleteFilter::Make(std::string(kDataPath), delete_files, 
current_schema,
+                                   requested_schema, file_io_, schemas);
+
+  ASSERT_THAT(filter, IsOk());
+  EXPECT_THAT(FieldIds(*filter.value()->RequiredSchema()), 
testing::ElementsAre(1, 4));
+  const auto& info = filter.value()->RequiredSchema()->fields()[1];
+  auto info_type = std::dynamic_pointer_cast<StructType>(info.type());
+  ASSERT_NE(info_type, nullptr);
+  EXPECT_THAT(StructFieldIds(*info_type), testing::ElementsAre(6));
+}
+
+TEST_F(DeleteFilterTest, MetadataLookupUsesSchemas) {
+  auto current_schema = std::make_shared<Schema>(
+      std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32())},
+      /*schema_id=*/2);
+  auto historic_schema = std::make_shared<Schema>(
+      std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32()),
+                               SchemaField::MakeOptional(7, "dropped_value", 
string())},
+      /*schema_id=*/1);
+  auto metadata = std::make_shared<TableMetadata>(TableMetadata{
+      .format_version = TableMetadata::kDefaultTableFormatVersion,
+      .schemas = {historic_schema, current_schema},
+      .current_schema_id = current_schema->schema_id(),
+  });
+  auto requested_schema = current_schema;
+  auto eq_by_dropped = std::make_shared<DataFile>(DataFile{
+      .content = DataFile::Content::kEqualityDeletes,
+      .file_path = "eq-dropped.parquet",
+      .file_format = FileFormatType::kParquet,
+      .equality_ids = {7},
+  });
+  std::vector<std::shared_ptr<DataFile>> delete_files = {eq_by_dropped};
+
+  auto filter = DeleteFilter::Make(std::string(kDataPath), delete_files, 
metadata,
+                                   requested_schema, file_io_);
+
+  ASSERT_THAT(filter, IsOk());
+  EXPECT_THAT(FieldIds(*filter.value()->RequiredSchema()), 
testing::ElementsAre(1, 7));
+}
+
+TEST_F(DeleteFilterTest, MetadataLookupPrefersLatestFallbackSchema) {
+  auto current_schema = std::make_shared<Schema>(
+      std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32())},
+      /*schema_id=*/3);
+  auto older_historic_schema = std::make_shared<Schema>(
+      std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32()),
+                               SchemaField::MakeOptional(7, "old_name", 
int32())},
+      /*schema_id=*/1);
+  auto newer_historic_schema = std::make_shared<Schema>(
+      std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32()),
+                               SchemaField::MakeOptional(7, "new_name", 
string())},
+      /*schema_id=*/2);
+  auto metadata = std::make_shared<TableMetadata>(TableMetadata{
+      .format_version = TableMetadata::kDefaultTableFormatVersion,
+      .schemas = {older_historic_schema, newer_historic_schema, 
current_schema},
+      .current_schema_id = current_schema->schema_id(),
+  });
+  auto eq_by_dropped = std::make_shared<DataFile>(DataFile{
+      .content = DataFile::Content::kEqualityDeletes,
+      .file_path = "eq-dropped.parquet",
+      .file_format = FileFormatType::kParquet,
+      .equality_ids = {7},
+  });
+  std::vector<std::shared_ptr<DataFile>> delete_files = {eq_by_dropped};
+
+  auto filter = DeleteFilter::Make(std::string(kDataPath), delete_files, 
metadata,
+                                   current_schema, file_io_);
+
+  ASSERT_THAT(filter, IsOk());
+  ASSERT_THAT(FieldIds(*filter.value()->RequiredSchema()), 
testing::ElementsAre(1, 7));
+  const auto& dropped_field = filter.value()->RequiredSchema()->fields()[1];
+  EXPECT_EQ(dropped_field.name(), "new_name");
+  EXPECT_EQ(dropped_field.type()->type_id(), TypeId::kString);
+}
+
+TEST_F(DeleteFilterTest, DroppedNestedFieldFiltersRowsWithSchemas) {
+  auto current_schema = std::make_shared<Schema>(
+      std::vector<SchemaField>{
+          SchemaField::MakeRequired(1, "id", int32()),
+          SchemaField::MakeOptional(
+              4, "info", struct_({SchemaField::MakeOptional(5, "city", 
string())}))},
+      /*schema_id=*/2);
+  auto historic_schema = std::make_shared<Schema>(
+      std::vector<SchemaField>{
+          SchemaField::MakeRequired(1, "id", int32()),
+          SchemaField::MakeOptional(
+              4, "info",
+              struct_({SchemaField::MakeOptional(5, "city", string()),
+                       SchemaField::MakeOptional(6, "state", string())}))},
+      /*schema_id=*/1);
+  auto requested_schema = current_schema;
+
+  EqualityDeleteWriterOptions options{
+      .path = "eq-dropped-state-filter.parquet",
+      .schema = historic_schema,
+      .spec = partition_spec_,
+      .partition = PartitionValues{},
+      .format = FileFormatType::kParquet,
+      .io = file_io_,
+      .equality_field_ids = {6},
+      .properties = {{"write.parquet.compression-codec", "uncompressed"}},
+  };
+  ICEBERG_UNWRAP_OR_FAIL(auto writer, EqualityDeleteWriter::Make(options));
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto delete_batch,
+      MakeBatch(*historic_schema,
+                R"([{"id": 0, "info": {"city": "ignored", "state": "CA"}}])"));
+  ASSERT_THAT(writer->Write(&delete_batch.array), IsOk());
+  ASSERT_THAT(writer->Close(), IsOk());
+  ICEBERG_UNWRAP_OR_FAIL(auto eq_by_state_meta, writer->Metadata());
+  auto eq_by_state = eq_by_state_meta.data_files[0];
+  std::vector<std::shared_ptr<DataFile>> delete_files = {eq_by_state};
+  std::vector<std::shared_ptr<Schema>> schemas = {current_schema, 
historic_schema};
+  auto filter = DeleteFilter::Make(std::string(kDataPath), delete_files, 
current_schema,
+                                   requested_schema, file_io_, schemas);
+  ASSERT_THAT(filter, IsOk());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto data_batch,
+                         MakeBatch(*filter.value()->RequiredSchema(),
+                                   R"([{"id": 1, "info": {"city": "SF", 
"state": "CA"}},
+                                  {"id": 2, "info": {"city": "NYC", "state": 
"NY"}},
+                                  {"id": 3, "info": null}])"));
+
+  auto alive = filter.value()->ComputeAliveRows(data_batch.schema, 
data_batch.array);
+
+  ASSERT_THAT(alive, IsOk());
+  ExpectAliveRows(alive.value(), {1, 2});
+}
+
+TEST_F(DeleteFilterTest, DeletionVectorErrorPropagatesFromCompute) {
+  auto dv_file = std::make_shared<DataFile>(DataFile{
+      .content = DataFile::Content::kPositionDeletes,
+      .file_path = "dv.puffin",
+      .file_format = FileFormatType::kPuffin,
+  });
+  std::vector<std::shared_ptr<DataFile>> delete_files = {dv_file};
+  auto requested_schema = Project({1});
+
+  auto filter = DeleteFilter::Make(std::string(kDataPath), delete_files, 
table_schema_,
+                                   requested_schema, file_io_);
+
+  ASSERT_THAT(filter, IsOk());
+  ICEBERG_UNWRAP_OR_FAIL(auto batch,
+                         MakeBatch(*filter.value()->RequiredSchema(), R"([[1, 
0]])"));
+  auto alive = filter.value()->ComputeAliveRows(batch.schema, batch.array);
+  ASSERT_THAT(alive, IsError(ErrorKind::kNotSupported));
+}
+
+TEST_F(DeleteFilterTest, EmptyBatchPropagatesDeleteLoadErrors) {
+  auto dv_file = std::make_shared<DataFile>(DataFile{
+      .content = DataFile::Content::kPositionDeletes,
+      .file_path = "dv-empty.puffin",
+      .file_format = FileFormatType::kPuffin,
+  });
+  std::vector<std::shared_ptr<DataFile>> delete_files = {dv_file};
+  auto requested_schema = Project({1});
+  auto filter = DeleteFilter::Make(std::string(kDataPath), delete_files, 
table_schema_,
+                                   requested_schema, file_io_);
+  ASSERT_THAT(filter, IsOk());
+  ICEBERG_UNWRAP_OR_FAIL(auto batch,
+                         MakeBatch(*filter.value()->RequiredSchema(), 
R"([])"));
+
+  auto alive = filter.value()->ComputeAliveRows(batch.schema, batch.array);
+
+  ASSERT_THAT(alive, IsError(ErrorKind::kNotSupported));
+}
+
+TEST_F(DeleteFilterTest, CounterAccumulatesAcrossBatches) {
+  ICEBERG_UNWRAP_OR_FAIL(auto pos_delete,
+                         PositionDeleteFile("pos-multi-batch.parquet", {1}));
+  std::vector<std::shared_ptr<DataFile>> delete_files = {pos_delete};
+  auto requested_schema = Project({1});
+  auto counter = std::make_shared<DeleteCounter>();
+  auto filter = DeleteFilter::Make(std::string(kDataPath), delete_files, 
table_schema_,
+                                   requested_schema, file_io_,
+                                   /*need_row_pos_col=*/true, counter);
+  ASSERT_THAT(filter, IsOk());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto batch1, 
MakeBatch(*filter.value()->RequiredSchema(),
+                                                R"([[10, 0], [20, 1], [30, 
2]])"));
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto batch2, MakeBatch(*filter.value()->RequiredSchema(), R"([[40, 3], 
[50, 4]])"));
+
+  ASSERT_THAT(filter.value()->ComputeAliveRows(batch1.schema, batch1.array), 
IsOk());
+  ASSERT_THAT(filter.value()->ComputeAliveRows(batch2.schema, batch2.array), 
IsOk());
+  EXPECT_EQ(counter->Get(), 1);
+}
+
+enum class MakeErrorDeleteKind {
+  kNullDeleteFile,
+  kDataFile,
+  kEqualityDeleteWithEmptyIds,
+  kUnknownEqualityFieldId,
+};
+
+struct MakeErrorCase {
+  const char* name;
+  MakeErrorDeleteKind delete_kind;
+};
+
+class DeleteFilterMakeErrorTest : public DeleteFilterTest,
+                                  public 
testing::WithParamInterface<MakeErrorCase> {};
+
+TEST_P(DeleteFilterMakeErrorTest, InvalidDeleteFilesError) {
+  const auto& test_case = GetParam();
+  std::vector<std::shared_ptr<DataFile>> delete_files;
+  switch (test_case.delete_kind) {
+    case MakeErrorDeleteKind::kNullDeleteFile:
+      delete_files.push_back(nullptr);
+      break;
+    case MakeErrorDeleteKind::kDataFile:
+      delete_files.push_back(std::make_shared<DataFile>(DataFile{
+          .content = DataFile::Content::kData,
+          .file_path = "data.parquet",
+          .file_format = FileFormatType::kParquet,
+      }));
+      break;
+    case MakeErrorDeleteKind::kEqualityDeleteWithEmptyIds:
+      delete_files.push_back(std::make_shared<DataFile>(DataFile{
+          .content = DataFile::Content::kEqualityDeletes,
+          .file_path = "eq-no-ids.parquet",
+          .file_format = FileFormatType::kParquet,
+          .equality_ids = {},
+      }));
+      break;
+    case MakeErrorDeleteKind::kUnknownEqualityFieldId:
+      delete_files.push_back(std::make_shared<DataFile>(DataFile{
+          .content = DataFile::Content::kEqualityDeletes,
+          .file_path = "eq-unknown.parquet",
+          .file_format = FileFormatType::kParquet,
+          .equality_ids = {999},
+      }));
+      break;
+  }
+
+  auto filter = DeleteFilter::Make(std::string(kDataPath), delete_files, 
table_schema_,
+                                   table_schema_, file_io_);
+
+  EXPECT_THAT(filter, IsError(ErrorKind::kInvalidArgument));
+}
+
+INSTANTIATE_TEST_SUITE_P(
+    MakeErrors, DeleteFilterMakeErrorTest,
+    testing::Values(
+        MakeErrorCase{
+            .name = "NullDeleteFile",
+            .delete_kind = MakeErrorDeleteKind::kNullDeleteFile,
+        },
+        MakeErrorCase{
+            .name = "DataFileAsDeleteFile",
+            .delete_kind = MakeErrorDeleteKind::kDataFile,
+        },
+        MakeErrorCase{
+            .name = "EqualityDeleteWithEmptyIds",
+            .delete_kind = MakeErrorDeleteKind::kEqualityDeleteWithEmptyIds,
+        },
+        MakeErrorCase{
+            .name = "UnknownEqualityFieldId",
+            .delete_kind = MakeErrorDeleteKind::kUnknownEqualityFieldId,
+        }),
+    ParamName<MakeErrorCase>);
+
+TEST_F(DeleteFilterTest, EqualityFieldNestedInListOrMapErrors) {
+  auto schema = std::make_shared<Schema>(std::vector<SchemaField>{
+      SchemaField::MakeRequired(1, "id", int32()),
+      SchemaField::MakeOptional(4, "tags",
+                                list(SchemaField::MakeRequired(5, "element", 
string()))),
+      SchemaField::MakeOptional(6, "attrs",
+                                map(SchemaField::MakeRequired(7, "key", 
string()),
+                                    SchemaField::MakeOptional(8, "value", 
string())))});
+  auto requested_schema = std::make_shared<Schema>(
+      std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32())});
+
+  for (const auto& [field_id, nested_type] :
+       {std::pair{5, std::string_view("list")}, std::pair{8, 
std::string_view("map")}}) {
+    auto eq_delete = std::make_shared<DataFile>(DataFile{
+        .content = DataFile::Content::kEqualityDeletes,
+        .file_path = "eq-nested-container.parquet",
+        .file_format = FileFormatType::kParquet,
+        .equality_ids = {field_id},
+    });
+    std::vector<std::shared_ptr<DataFile>> delete_files = {eq_delete};
+
+    auto filter = DeleteFilter::Make(std::string(kDataPath), delete_files, 
schema,
+                                     requested_schema, file_io_);
+
+    EXPECT_THAT(filter, IsError(ErrorKind::kInvalidArgument));
+    EXPECT_THAT(filter,
+                HasErrorMessage(std::format("must not be nested in {}", 
nested_type)));
+  }
+}
+
+TEST_F(DeleteFilterTest, NullPosInBatchErrors) {
+  ICEBERG_UNWRAP_OR_FAIL(auto pos_delete,
+                         PositionDeleteFile("pos-null-pos.parquet", {0}));
+  std::vector<std::shared_ptr<DataFile>> delete_files = {pos_delete};
+  auto requested_schema = Project({1});
+  auto filter = DeleteFilter::Make(std::string(kDataPath), delete_files, 
table_schema_,
+                                   requested_schema, file_io_);
+  ASSERT_THAT(filter, IsOk());
+  ICEBERG_UNWRAP_OR_FAIL(auto batch, 
MakeBatch(*filter.value()->RequiredSchema(),
+                                               R"([[10, null], [20, null]])"));
+
+  auto alive = filter.value()->ComputeAliveRows(batch.schema, batch.array);
+
+  EXPECT_THAT(alive, IsError(ErrorKind::kInvalidArrowData));
+}
+
+TEST_F(DeleteFilterTest, ExpectedSchemaIsRequestedSchema) {
+  auto requested_schema = Project({1});
+  auto eq_by_name = std::make_shared<DataFile>(DataFile{
+      .content = DataFile::Content::kEqualityDeletes,
+      .file_path = "eq-name.parquet",
+      .file_format = FileFormatType::kParquet,
+      .equality_ids = {2},
+  });
+  std::vector<std::shared_ptr<DataFile>> delete_files = {eq_by_name};
+  auto filter = DeleteFilter::Make(std::string(kDataPath), delete_files, 
table_schema_,
+                                   requested_schema, file_io_);
+  ASSERT_THAT(filter, IsOk());
+  EXPECT_EQ(filter.value()->ExpectedSchema(), requested_schema);
+  EXPECT_NE(filter.value()->RequiredSchema(), requested_schema);
+}
+
+TEST_F(DeleteFilterTest, IncrementDeleteCountForwardsToCounter) {
+  std::vector<std::shared_ptr<DataFile>> delete_files;
+  auto counter = std::make_shared<DeleteCounter>();
+  auto filter = DeleteFilter::Make(std::string(kDataPath), delete_files, 
table_schema_,
+                                   table_schema_, file_io_,
+                                   /*need_row_pos_col=*/true, counter);
+  ASSERT_THAT(filter, IsOk());
+
+  filter.value()->IncrementDeleteCount(3);
+  filter.value()->IncrementDeleteCount();
+
+  EXPECT_EQ(counter->Get(), 4);
+}
+
+TEST_F(DeleteFilterTest, DeletedRowPositionsNullWithNoPosDeletes) {
+  std::vector<std::shared_ptr<DataFile>> delete_files;
+  auto filter = DeleteFilter::Make(std::string(kDataPath), delete_files, 
table_schema_,
+                                   table_schema_, file_io_);
+  ASSERT_THAT(filter, IsOk());
+
+  auto index = filter.value()->DeletedRowPositions();
+
+  ASSERT_THAT(index, IsOk());
+  EXPECT_EQ(index.value(), nullptr);
+}
+
+TEST_F(DeleteFilterTest, DeletedRowPositionsLazyLoads) {
+  ICEBERG_UNWRAP_OR_FAIL(auto pos_delete,
+                         PositionDeleteFile("pos-index.parquet", {1, 3}));
+  std::vector<std::shared_ptr<DataFile>> delete_files = {pos_delete};
+  auto requested_schema = Project({1});
+  auto filter = DeleteFilter::Make(std::string(kDataPath), delete_files, 
table_schema_,
+                                   requested_schema, file_io_);
+  ASSERT_THAT(filter, IsOk());
+
+  auto index = filter.value()->DeletedRowPositions();
+
+  ASSERT_THAT(index, IsOk());
+  ASSERT_NE(index.value(), nullptr);
+  EXPECT_TRUE(index.value()->IsDeleted(1));
+  EXPECT_TRUE(index.value()->IsDeleted(3));
+  EXPECT_FALSE(index.value()->IsDeleted(0));
+  EXPECT_FALSE(index.value()->IsDeleted(2));
+}
+
+TEST_F(DeleteFilterTest, EqDeletedRowFilterTrueWithNoEqDeletes) {
+  std::vector<std::shared_ptr<DataFile>> delete_files;
+  auto filter = DeleteFilter::Make(std::string(kDataPath), delete_files, 
table_schema_,
+                                   table_schema_, file_io_);
+  ASSERT_THAT(filter, IsOk());
+
+  auto predicate_result = filter.value()->EqDeletedRowFilter();
+
+  ASSERT_THAT(predicate_result, IsOk());
+  ASSERT_TRUE(static_cast<bool>(predicate_result.value()));
+
+  ICEBERG_UNWRAP_OR_FAIL(auto batch, 
MakeBatch(*filter.value()->RequiredSchema(),
+                                               R"([[1, "Alice", "blue"]])"));
+  ICEBERG_UNWRAP_OR_FAIL(auto row, ArrowArrayStructLike::Make(batch.schema, 
batch.array));
+  ICEBERG_UNWRAP_OR_FAIL(auto alive, predicate_result.value()(*row));
+  EXPECT_TRUE(alive);
+}
+
+TEST_F(DeleteFilterTest, EqDeletedRowFilterReturnsTrueForAliveRows) {
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto eq_by_name,
+      EqualityDeleteFile("eq-filter.parquet", R"([[0, "Bob", "unused"]])", 
{2}));
+  std::vector<std::shared_ptr<DataFile>> delete_files = {eq_by_name};
+  auto requested_schema = Project({1});
+  auto filter = DeleteFilter::Make(std::string(kDataPath), delete_files, 
table_schema_,
+                                   requested_schema, file_io_);
+  ASSERT_THAT(filter, IsOk());
+
+  auto predicate_result = filter.value()->EqDeletedRowFilter();
+  ASSERT_THAT(predicate_result, IsOk());
+  auto& predicate = predicate_result.value();
+  ASSERT_TRUE(static_cast<bool>(predicate));
+
+  ICEBERG_UNWRAP_OR_FAIL(auto batch,
+                         MakeBatch(*filter.value()->RequiredSchema(),
+                                   R"([[1, "Alice"], [2, "Bob"], [3, 
"Carol"]])"));
+  ICEBERG_UNWRAP_OR_FAIL(auto row, ArrowArrayStructLike::Make(batch.schema, 
batch.array));
+
+  ICEBERG_UNWRAP_OR_FAIL(auto alice_alive, predicate(*row));
+  EXPECT_TRUE(alice_alive);
+
+  ASSERT_THAT(row->Reset(1), IsOk());
+  ICEBERG_UNWRAP_OR_FAIL(auto bob_alive, predicate(*row));
+  EXPECT_FALSE(bob_alive);
+
+  ASSERT_THAT(row->Reset(2), IsOk());
+  ICEBERG_UNWRAP_OR_FAIL(auto carol_alive, predicate(*row));
+  EXPECT_TRUE(carol_alive);
+}
+
+TEST_F(DeleteFilterTest, EqDeletedRowFilterIsCached) {
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto eq_by_name,
+      EqualityDeleteFile("eq-cache.parquet", R"([[0, "Bob", "unused"]])", 
{2}));
+  std::vector<std::shared_ptr<DataFile>> delete_files = {eq_by_name};
+  auto filter = DeleteFilter::Make(std::string(kDataPath), delete_files, 
table_schema_,
+                                   table_schema_, file_io_);
+  ASSERT_THAT(filter, IsOk());
+
+  auto result1 = filter.value()->EqDeletedRowFilter();
+  auto result2 = filter.value()->EqDeletedRowFilter();
+  ASSERT_THAT(result1, IsOk());
+  ASSERT_THAT(result2, IsOk());
+  EXPECT_TRUE(static_cast<bool>(result1.value()));
+  EXPECT_TRUE(static_cast<bool>(result2.value()));
+}
+
+TEST_F(DeleteFilterTest, FindEqDeleteRowsTrueForDeleted) {
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto eq_by_name,
+      EqualityDeleteFile("eq-find.parquet", R"([[0, "Bob", "unused"]])", {2}));
+  std::vector<std::shared_ptr<DataFile>> delete_files = {eq_by_name};
+  auto requested_schema = Project({1});
+  auto filter = DeleteFilter::Make(std::string(kDataPath), delete_files, 
table_schema_,
+                                   requested_schema, file_io_);
+  ASSERT_THAT(filter, IsOk());
+
+  auto predicate_result = filter.value()->FindEqualityDeleteRows();
+  ASSERT_THAT(predicate_result, IsOk());
+  auto& predicate = predicate_result.value();
+  ASSERT_TRUE(static_cast<bool>(predicate));
+
+  ICEBERG_UNWRAP_OR_FAIL(auto batch,
+                         MakeBatch(*filter.value()->RequiredSchema(),
+                                   R"([[1, "Alice"], [2, "Bob"], [3, 
"Carol"]])"));
+  ICEBERG_UNWRAP_OR_FAIL(auto row, ArrowArrayStructLike::Make(batch.schema, 
batch.array));
+
+  ICEBERG_UNWRAP_OR_FAIL(auto alice_deleted, predicate(*row));
+  EXPECT_FALSE(alice_deleted);
+
+  ASSERT_THAT(row->Reset(1), IsOk());
+  ICEBERG_UNWRAP_OR_FAIL(auto bob_deleted, predicate(*row));
+  EXPECT_TRUE(bob_deleted);
+}
+
+TEST_F(DeleteFilterTest, FindEqDeleteRowsFalseWithNoEqDeletes) {
+  std::vector<std::shared_ptr<DataFile>> delete_files;
+  auto filter = DeleteFilter::Make(std::string(kDataPath), delete_files, 
table_schema_,
+                                   table_schema_, file_io_);
+  ASSERT_THAT(filter, IsOk());
+
+  auto predicate_result = filter.value()->FindEqualityDeleteRows();
+
+  ASSERT_THAT(predicate_result, IsOk());
+  ASSERT_TRUE(static_cast<bool>(predicate_result.value()));
+
+  ICEBERG_UNWRAP_OR_FAIL(auto batch, 
MakeBatch(*filter.value()->RequiredSchema(),
+                                               R"([[1, "Alice", "blue"]])"));
+  ICEBERG_UNWRAP_OR_FAIL(auto row, ArrowArrayStructLike::Make(batch.schema, 
batch.array));
+  ICEBERG_UNWRAP_OR_FAIL(auto deleted, predicate_result.value()(*row));
+  EXPECT_FALSE(deleted);
+}
+
+TEST_F(DeleteFilterTest, ExplicitFieldLookupFiltersRows) {
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto eq_by_name,
+      EqualityDeleteFile("eq-lookup.parquet", R"([[0, "Bob", "unused"]])", 
{2}));
+  std::vector<std::shared_ptr<DataFile>> delete_files = {eq_by_name};
+  auto requested_schema = Project({1});
+
+  ICEBERG_UNWRAP_OR_FAIL(auto base_lookup, 
DeleteFilter::MakeFieldLookup(table_schema_));
+  DeleteFilter::FieldLookup custom_lookup =
+      [base_lookup = std::move(base_lookup)](
+          int32_t field_id) -> 
Result<std::optional<DeleteFilter::FieldLookupResult>> {
+    if (field_id == 2) {
+      return base_lookup(field_id);
+    }
+    return std::nullopt;
+  };
+
+  auto filter = DeleteFilter::Make(std::string(kDataPath), delete_files, 
requested_schema,
+                                   file_io_, std::move(custom_lookup));
+
+  ASSERT_THAT(filter, IsOk());
+  ICEBERG_UNWRAP_OR_FAIL(auto batch,
+                         MakeBatch(*filter.value()->RequiredSchema(),
+                                   R"([[1, "Alice"], [2, "Bob"], [3, 
"Carol"]])"));
+
+  auto alive = filter.value()->ComputeAliveRows(batch.schema, batch.array);
+
+  ASSERT_THAT(alive, IsOk());
+  ExpectAliveRows(alive.value(), {0, 2});
+}
+
+TEST_F(DeleteFilterTest, ExplicitFieldLookupNulloptErrors) {
+  // A lookup that returns nullopt for the equality field must produce an error
+  // at Make() time (during ComputeRequiredSchema), not silently skip the 
field.
+  auto eq_by_name = std::make_shared<DataFile>(DataFile{
+      .content = DataFile::Content::kEqualityDeletes,
+      .file_path = "eq-missing.parquet",
+      .file_format = FileFormatType::kParquet,
+      .equality_ids = {2},
+  });
+  std::vector<std::shared_ptr<DataFile>> delete_files = {eq_by_name};
+  auto requested_schema = Project({1});
+
+  DeleteFilter::FieldLookup empty_lookup =
+      [](int32_t) -> Result<std::optional<DeleteFilter::FieldLookupResult>> {
+    return std::nullopt;
+  };
+
+  auto filter = DeleteFilter::Make(std::string(kDataPath), delete_files, 
requested_schema,
+                                   file_io_, std::move(empty_lookup));
+
+  EXPECT_THAT(filter, IsError(ErrorKind::kInvalidArgument));
+}
+
+TEST_F(DeleteFilterTest, ExplicitFieldLookupRejectsListOrMapProjection) {
+  auto eq_by_element = std::make_shared<DataFile>(DataFile{
+      .content = DataFile::Content::kEqualityDeletes,
+      .file_path = "eq-element.parquet",
+      .file_format = FileFormatType::kParquet,
+      .equality_ids = {5},
+  });
+  std::vector<std::shared_ptr<DataFile>> delete_files = {eq_by_element};
+  auto requested_schema = Project({1});
+
+  DeleteFilter::FieldLookup list_lookup =
+      [](int32_t field_id) -> 
Result<std::optional<DeleteFilter::FieldLookupResult>> {
+    auto element = SchemaField::MakeRequired(5, "element", string());
+    return DeleteFilter::FieldLookupResult{
+        .field = element,
+        .projection_field = SchemaField::MakeOptional(4, "tags", 
list(element)),
+    };
+  };
+
+  auto filter = DeleteFilter::Make(std::string(kDataPath), delete_files, 
requested_schema,
+                                   file_io_, std::move(list_lookup));
+
+  EXPECT_THAT(filter, IsError(ErrorKind::kInvalidArgument));
+  EXPECT_THAT(filter, HasErrorMessage("must not be nested in list"));
+}
+
+TEST_F(DeleteFilterTest, ExplicitFieldLookupSkipsExistingFields) {
+  // When the equality field is already in requested_schema, the custom lookup
+  // must NOT be called
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto eq_by_name,
+      EqualityDeleteFile("eq-already-present.parquet", R"([[0, "Bob", 
"unused"]])", {2}));
+  std::vector<std::shared_ptr<DataFile>> delete_files = {eq_by_name};
+  auto requested_schema = Project({1, 2});
+
+  bool lookup_called = false;
+  DeleteFilter::FieldLookup tracking_lookup =
+      [&lookup_called](
+          int32_t) -> Result<std::optional<DeleteFilter::FieldLookupResult>> {
+    lookup_called = true;
+    return std::nullopt;  // would fail if called
+  };
+
+  auto filter = DeleteFilter::Make(std::string(kDataPath), delete_files, 
requested_schema,
+                                   file_io_, std::move(tracking_lookup));
+
+  ASSERT_THAT(filter, IsOk());
+  EXPECT_FALSE(lookup_called);
+}
+
+TEST_F(DeleteFilterTest, SchemasLookupDeduplicatesCurrentSchemaId) {
+  auto current_schema = std::make_shared<Schema>(
+      std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32())},
+      /*schema_id=*/2);
+  auto same_id_historic_schema = std::make_shared<Schema>(
+      std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32()),
+                               SchemaField::MakeOptional(7, "not_historic", 
string())},
+      /*schema_id=*/2);
+  auto eq_by_dropped = std::make_shared<DataFile>(DataFile{
+      .content = DataFile::Content::kEqualityDeletes,
+      .file_path = "eq-dropped.parquet",
+      .file_format = FileFormatType::kParquet,
+      .equality_ids = {7},
+  });
+  std::vector<std::shared_ptr<DataFile>> delete_files = {eq_by_dropped};
+  std::vector<std::shared_ptr<Schema>> schemas = {current_schema,
+                                                  same_id_historic_schema};
+
+  auto filter = DeleteFilter::Make(std::string(kDataPath), delete_files, 
current_schema,
+                                   current_schema, file_io_, schemas);
+
+  EXPECT_THAT(filter, IsError(ErrorKind::kInvalidArgument));
+  EXPECT_THAT(filter, HasErrorMessage("Cannot find equality delete field id 
7"));
+}
+
+}  // namespace iceberg

Reply via email to